thing.py 10.3 KB
Newer Older
C. Albrecht's avatar
WIP  
C. Albrecht committed
1
2
3
4
5
import ast
import threading
import websocket
import json
import uuid
Jiahang Chen's avatar
Jiahang Chen committed
6
7
8
from s3i import IdentityProvider, TokenType, Broker, GetValueReply, Directory
from s3i.broker import BrokerREST
from s3i.messages import ServiceReply
C. Albrecht's avatar
WIP  
C. Albrecht committed
9
10
11
12

from ml.managing_actor import ManagingActor
from ml.tools import BColors
from ml.tools import send_message
Jiahang Chen's avatar
Jiahang Chen committed
13
from ml.tools import find_broker_endpoint
C. Albrecht's avatar
WIP  
C. Albrecht committed
14
from ml.app_logger import APP_LOGGER
Jiahang Chen's avatar
Jiahang Chen committed
15
16


C. Albrecht's avatar
WIP  
C. Albrecht committed
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class BaseVariable:
    """Namespace holding the fixed service endpoints used by this module."""

    # Identity provider: base URL and realm handed to ``IdentityProvider``.
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"

    # Host handed to the ``Broker`` client.
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"

    # Repository endpoints: websocket URL and HTTP API URL.
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"

    # Directory HTTP API URL.
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


def get_sensor_uuid(body_json):
    """Return the ``sensor_uuid`` entry of a message's ``parameters``.

    Args:
        body_json: Decoded message body (a mapping).

    Returns:
        The value stored under ``parameters["sensor_uuid"]``, or ``""`` when
        ``parameters`` is missing, ``None``/empty, or has no ``sensor_uuid``.
    """
    # ``or {}`` also covers an explicit ``"parameters": None``, which a
    # ``.get("parameters", {})`` default would hand back unchanged.
    parameters = body_json.get("parameters") or {}
    return parameters.get("sensor_uuid", "")


class Thing(ManagingActor):
    def __init__(
        self,
        client_secret,
        model: dict,
        grant_type: str,
        is_broker: bool,
        is_repo: bool,
        is_broker_rest=True,
        username=None,
        password=None,
    ):
        """Store credentials, connection flags and the model's metadata.

        No connection is opened here; ``run_forever`` starts that.

        Args:
            client_secret: Client secret handed to the identity provider.
            model: Thing description. ``thingId``, ``policyId`` and the
                ``attributes`` entries ``name``, ``class`` and ``type`` are
                read; each falls back to ``""`` when absent.
            grant_type: Grant type handed to the identity provider.
            is_broker: Connect to the broker once a token has arrived.
            is_repo: Connect to the repository websocket once a token has
                arrived.
            is_broker_rest: Use ``BrokerREST`` instead of ``Broker``.
            username: User name handed to the identity provider.
            password: Password handed to the identity provider.
        """
        super().__init__()

        # Identity and credentials.
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret

        # Which connections to establish after the first token arrives.
        self.__is_broker = is_broker
        self.__is_broker_rest = is_broker_rest
        self.__is_repo = is_repo

        # Connection state, filled in by the ``__connect_with_*`` methods.
        self.__access_token = ""
        self.__endpoint = ""
        self.__ws_connected = False
        self.broker = None
        self.ws = None
        # The websocket methods of this class use ``_ws``; define it up front
        # so the attribute exists before the repository connection is made.
        self._ws = None
        self.dir = None

        # Descriptive metadata from the model; "" when not provided.
        attributes = model.get("attributes") or {}
        self.__name = attributes.get("name", "")
        self.__class_name = attributes.get("class", "")
        self.__type_name = attributes.get("type", "")

        self.roles = []
        self.__features = {}
Jiahang Chen's avatar
Jiahang Chen committed
76
77
78
79
80
81
82
83
84

    @property
    def features(self):
        """Mapping of feature name to the object registered for it."""
        return self.__features

    @features.setter
    def features(self, value):
        # Replaces the whole mapping; the new value is stored as given.
        self.__features = value

C. Albrecht's avatar
WIP  
C. Albrecht committed
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
    @property
    def class_name(self):
        """The ``class`` entry of the model's attributes (``""`` if absent)."""
        return self.__class_name

    @property
    def client_secret(self):
        """The client secret given to the constructor."""
        return self.__client_secret

    @property
    def grant_type(self):
        """The grant type given to the constructor."""
        return self.__grant_type

    @property
    def access_token(self):
        """The most recently received access token (``""`` before the first one)."""
        return self.__access_token

    @property
    def name(self):
        """The ``name`` entry of the model's attributes (``""`` if absent)."""
        return self.__name

    @property
    def thing_id(self):
        """The model's ``thingId`` (``""`` if absent)."""
        return self.__thing_id

    def run_forever(self):
        """Print a launch banner and start the identity-provider connection.

        The remaining connections are opened from the token callback once an
        access token has been received.
        """
        print(f"[S³I]: Launch {BColors.OKGREEN}{self.name}{BColors.ENDC}")
        self.__connect_with_idp()

    def __connect_with_idp(self):
        """Create the identity-provider client and start requesting tokens.

        The client is built from the credentials stored by the constructor,
        with this thing's id as client id. ``__on_token`` is registered as its
        ``on_new_token`` callback.
        """
        print(BColors.OKBLUE + "[S³I][IdP]" + BColors.ENDC + ": Connect with S3I-IdentityProvider")
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
        """Store a new access token and open the configured connections.

        The directory client is always created; the broker and repository
        connections are opened only when enabled in the constructor.

        Args:
            token: Access token delivered by the identity-provider client.
        """
        self.__access_token = token
        # The directory must exist first: the broker connection looks up its
        # endpoint through ``self.dir``.
        self.__connect_with_dir()
        # NOTE(review): if this callback fires again (e.g. on a token refresh),
        # each call below starts another receiver thread / websocket — confirm
        # whether earlier connections need to be closed first.
        if self.__is_broker:
            self.__connect_with_broker()
        if self.__is_repo:
            self.__connect_with_repo()

Jiahang Chen's avatar
Jiahang Chen committed
136
137
    def __connect_with_dir(self):
        """Create the directory client with the current access token.

        The client is stored in ``self.dir``.
        """
        print(
            BColors.OKBLUE
            + "[S³I][Dir]"
            + BColors.ENDC
            + ": Connect with S3I-Directory"
        )
        # Use the shared constant rather than a second copy of the same URL.
        self.dir = Directory(s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token)

C. Albrecht's avatar
WIP  
C. Albrecht committed
145
    def __connect_with_repo(self):
        """Open the repository websocket and run it on its own thread.

        The current access token is sent as a bearer ``Authorization`` header.
        The connection object is kept in ``self._ws``.
        """
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + ": Connect with S3I-Repository"
        )

        self._ws = websocket.WebSocketApp(
            BaseVariable.REPO_WWS_URL,
            # Headers go in as a list of "Name: value" strings; the earlier
            # one-element set literal is not a documented header type.
            header=["Authorization: Bearer {}".format(self.__access_token)],
            on_message=self.__on_new_websocket_message,
            on_error=self.__on_new_websocket_error,
            on_open=self.__on_websocket_connection_opened,
            on_close=self.__on_websocket_connection_closed,
        )

        threading.Thread(target=self._ws.run_forever).start()

    @staticmethod
    def __on_new_websocket_message(ws, msg):
        """Websocket ``on_message`` callback; incoming messages are ignored."""
        pass

    @staticmethod
    def __on_new_websocket_error(ws, error):
        """Websocket ``on_error`` callback: report the error on stdout.

        Args:
            ws: The websocket the error belongs to (unused).
            error: The error reported by the websocket library.
        """
        # Include the error itself so the failure can be diagnosed.
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + " : Websocket error: {}".format(error)
        )

    def __on_websocket_connection_opened(self, *args):
        """Websocket ``on_open`` callback: mark the link up and start streaming.

        Args:
            *args: Whatever positional arguments the websocket library passes
                to the callback (e.g. the ``WebSocketApp`` instance); unused.
                Accepting them keeps the callback from raising ``TypeError``
                when the library supplies them.
        """
        self.__ws_connected = True
        self._ws.send("START-SEND-MESSAGES")
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + ": Websocket connection built"
        )

    def __on_websocket_connection_closed(self, *args):
        """Websocket ``on_close`` callback: mark the link as down.

        Args:
            *args: Whatever positional arguments the websocket library passes
                to the callback (e.g. the ``WebSocketApp`` instance and close
                status information); unused. Accepting them keeps the callback
                from raising ``TypeError`` when the library supplies them.
        """
        self.__ws_connected = False
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + ": Websocket connection closed"
        )

    def sync_with_repo(self, path, topic):
        if not self.__ws_connected:
            return None
        msg = {"topic": topic, "path": path, "value": self.fml40_data_model[path]}
        self._ws.send(json.dumps(msg))

    def __connect_with_broker(self):
        """Look up this thing's broker endpoint and start receiving messages.

        Depending on ``is_broker_rest`` either a ``BrokerREST`` client is
        polled from a background thread, or a ``Broker`` client's ``receive``
        is run on a background thread. In both cases messages end up in
        ``__on_broker_callback``.
        """
        print(BColors.OKBLUE + "[S³I][Broker]" + BColors.ENDC + ": Connect with S3I-Broker")
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)

        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)
            receiver = threading.Thread(target=self.__poll_broker_rest)
        else:
            # The access token is passed as the password, with a blank user name.
            self.broker = Broker(
                auth_form="Username/Password",
                username=" ",
                password=self.__access_token,
                host=BaseVariable.BROKER_HOST,
            )
            receiver = threading.Thread(
                target=self.broker.receive,
                args=(self.__endpoint, self.__on_broker_callback),
            )

        receiver.start()

    def __poll_broker_rest(self):
        """Poll the REST broker forever and dispatch each received message."""
        while True:
            msg_str = self.broker.receive_once(self.__endpoint)
            # Nothing waiting (empty string, or no reply at all): ask again.
            if not msg_str:
                continue
            try:
                body = json.loads(msg_str)
            except ValueError:
                # One malformed message must not end the receive loop.
                APP_LOGGER.critical(
                    "Received message is not valid JSON: %s" % msg_str
                )
                continue
            self.__on_broker_callback(
                ch=None, method=None, properties=None, body=body
            )
C. Albrecht's avatar
WIP  
C. Albrecht committed
229
230

    def __on_broker_callback(self, ch, method, properties, body):
Jiahang Chen's avatar
Jiahang Chen committed
231
232
233
        if isinstance(body, bytes):
            body = ast.literal_eval(body.decode("utf-8"))
        elif isinstance(body, int):
Jiahang Chen's avatar
Jiahang Chen committed
234
            print(body)
Jiahang Chen's avatar
Jiahang Chen committed
235
            return
Jiahang Chen's avatar
Jiahang Chen committed
236
237
        elif isinstance(body, str):
            print(body)
Jiahang Chen's avatar
Jiahang Chen committed
238
239
240
241
242
243
244
245
246
247
248
249
250
            return

        message_type = body.get("messageType")
        if message_type == "userMessage":
            self.on_user_message(body)
        elif message_type == "serviceRequest":
            self.on_service_request(body)
        elif message_type == "getValueRequest":
            self.on_get_value_request(body)
        elif message_type == "getValueReply":
            self.on_get_value_reply(body)
        elif message_type == "serviceReply":
            self.on_service_reply(body)
Jiahang Chen's avatar
Jiahang Chen committed
251
        else:
Jiahang Chen's avatar
Jiahang Chen committed
252
253
            ### TODO send user message reply back
            pass
Jiahang Chen's avatar
Jiahang Chen committed
254
255

    def on_user_message(self, msg):
        """Handle a message whose ``messageType`` is ``userMessage``.

        Does nothing in this class.

        Args:
            msg: The decoded message.
        """
C. Albrecht's avatar
WIP  
C. Albrecht committed
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281

    def on_get_value_request(self, msg):
        """Answer a ``getValueRequest`` with a ``getValueReply``.

        Only the attribute path ``features/moisture`` is resolved; it reads
        the humidity from the ``ml40::Moisture`` feature. For every other
        path, or when that feature is not registered, the reply carries
        ``None`` as result.

        Args:
            msg: The decoded request. ``sender``, ``identifier``,
                ``replyToEndpoint`` and ``attributePath`` are read.
        """
        get_value_reply = GetValueReply()
        request_sender = msg.get("sender", "")
        request_msg_id = msg.get("identifier")
        reply_to = msg.get("replyToEndpoint")
        request_sender_endpoint = f"s3ib://{reply_to}"
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())

        # TODO: Get correct value.
        value = None
        if attribute_path == "features/moisture":
            moisture = self.features.get("ml40::Moisture")
            # Without a registered moisture feature the reply carries None
            # rather than failing on ``None.proxy()``.
            if moisture is not None:
                value = moisture.proxy().humidity.get()

        get_value_reply.fillGetValueReply(
            senderUUID=self.__thing_id,
            receiverUUID=[request_sender],
            results=value,
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )
        self.broker.send(
            receiver_endpoints=[request_sender_endpoint],
            msg=str(get_value_reply.msg),
        )

    def on_service_request(self, body_json):
        """Run the requested service function and send back a ``serviceReply``.

        ``serviceType`` is expected as ``"<functionality>/<method>"``. The
        functionality is looked up in ``self.features`` and the method is
        called on its proxy. Requests with a malformed ``serviceType`` or an
        unknown functionality are logged and dropped.

        Args:
            body_json: The decoded request. ``serviceType``, ``sender`` and
                ``identifier`` are read.
        """
        service_type = body_json.get("serviceType")
        parts = service_type.split("/") if isinstance(service_type, str) else []
        if not parts:
            APP_LOGGER.critical(
                "Service type %s is not of the form <functionality>/<method>!"
                % service_type
            )
            return

        service_functionality = parts[0]
        service_functionality_obj = self.features.get(service_functionality, None)
        if service_functionality_obj is None:
            APP_LOGGER.critical(
                "Functionality %s is not one of the built-in functionalities in %s!"
                % (service_functionality, self.name)
            )
            return

        if len(parts) < 2:
            # Functionality is known but no method was named after the slash.
            APP_LOGGER.critical(
                "Service type %s is not of the form <functionality>/<method>!"
                % service_type
            )
            return

        func_proxy = service_functionality_obj.proxy()
        method = getattr(func_proxy, parts[1])
        # TODO: Call right functionality. The argument is still a fixed
        # placeholder rather than something taken from the request.
        result = {"ok": method("test_job").get()}

        service_reply = ServiceReply()
        # NOTE(review): ``receiverUUID`` is a bare value here, while
        # ``on_get_value_request`` passes a one-element list — confirm which
        # form ``fillServiceReply`` / ``send_message`` expect.
        service_reply.fillServiceReply(
            senderUUID=self.thing_id,
            receiverUUID=body_json.get("sender", None),
            serviceType=body_json.get("serviceType", None),
            results=result,
            replyingToUUID=body_json.get("identifier", None),
            msgUUID=str(uuid.uuid4()),
        )
        send_message(self.broker, self.dir, service_reply.msg)
Jiahang Chen's avatar
Jiahang Chen committed
312
313

    def on_get_value_reply(self, msg):
        """Handle a ``getValueReply`` message by printing it.

        Args:
            msg: The decoded message.
        """
        print(msg)
Jiahang Chen's avatar
Jiahang Chen committed
315
316

    def on_service_reply(self, msg):
        """Handle a ``serviceReply`` message by printing it.

        Args:
            msg: The decoded message.
        """
        print(msg)
Jiahang Chen's avatar
Jiahang Chen committed
318
319
320
321
322
323

    def add_function_impl(self, obj, feature_name):
        feature = self.__features.get(feature_name, None)

        if feature is None:
            APP_LOGGER.critical(
Jiahang Chen's avatar
Jiahang Chen committed
324
                "Functionality %s is not one of the build-in functionalities" % feature_name
Jiahang Chen's avatar
Jiahang Chen committed
325
326
327
            )
        else:
            self.__features[feature_name] = obj.start("", self.actor_ref)