thing.py 16.7 KB
Newer Older
C. Albrecht's avatar
WIP  
C. Albrecht committed
1
2
3
4
5
import ast
import threading
import websocket
import json
import uuid
Jiahang Chen's avatar
Jiahang Chen committed
6
7
from s3i import IdentityProvider, TokenType, GetValueReply, Directory, Repository
from s3i.broker import Broker, BrokerREST
Jiahang Chen's avatar
Jiahang Chen committed
8
from s3i.messages import ServiceReply
C. Albrecht's avatar
WIP  
C. Albrecht committed
9
from ml.tools import BColors
Jiahang Chen's avatar
Jiahang Chen committed
10
from ml.tools import find_broker_endpoint
C. Albrecht's avatar
WIP  
C. Albrecht committed
11
from ml.app_logger import APP_LOGGER
Jiahang Chen's avatar
Jiahang Chen committed
12
13
import time
import copy
Jiahang Chen's avatar
Jiahang Chen committed
14
15


C. Albrecht's avatar
WIP  
C. Albrecht committed
16
17
18
19
20
21
22
23
24
class BaseVariable:
    """Fixed endpoints of the S3I services this module connects to."""

    # Identity provider: base URL and realm.
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"

    # Host name handed to the broker client.
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"

    # Repository: WebSocket endpoint and HTTP API.
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"

    # Directory HTTP API.
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


Jiahang Chen's avatar
Jiahang Chen committed
25
class Thing:
C. Albrecht's avatar
WIP  
C. Albrecht committed
26
    def __init__(
Jiahang Chen's avatar
Jiahang Chen committed
27
28
            self,
            model: dict,
Jiahang Chen's avatar
Jiahang Chen committed
29
30
31
32
            client_secret="",
            grant_type="password",
            is_broker=False,
            is_repo=False,
Jiahang Chen's avatar
Jiahang Chen committed
33
34
            is_broker_rest=True,
            username=None,
Jiahang Chen's avatar
Jiahang Chen committed
35
            password=None,
C. Albrecht's avatar
WIP  
C. Albrecht committed
36
    ):
Jiahang Chen's avatar
Jiahang Chen committed
37
        self.__model = model
C. Albrecht's avatar
WIP  
C. Albrecht committed
38
39
40
41
42
43
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
Jiahang Chen's avatar
Jiahang Chen committed
44

C. Albrecht's avatar
WIP  
C. Albrecht committed
45
        self.__is_broker = is_broker
Jiahang Chen's avatar
Jiahang Chen committed
46
        self.__is_broker_rest = is_broker_rest
C. Albrecht's avatar
WIP  
C. Albrecht committed
47
        self.__is_repo = is_repo
Jiahang Chen's avatar
Jiahang Chen committed
48

C. Albrecht's avatar
WIP  
C. Albrecht committed
49
        self.__access_token = ""
Jiahang Chen's avatar
Jiahang Chen committed
50
51
52

        self.__endpoint = ""

C. Albrecht's avatar
WIP  
C. Albrecht committed
53
        self.__ws_connected = False
Jiahang Chen's avatar
Jiahang Chen committed
54
55
56
        self.broker = None
        self.ws = None
        self.dir = None
Jiahang Chen's avatar
Jiahang Chen committed
57
58
59
60
        self.repo = None

        self.repo_json = dict()
        self.dir_json = dict()
C. Albrecht's avatar
WIP  
C. Albrecht committed
61
62
63

        attributes = model.get("attributes", None)
        self.__name = ""
Jiahang Chen's avatar
Jiahang Chen committed
64
        self.__roles = {}
Jiahang Chen's avatar
Jiahang Chen committed
65
        self.__features = {}
Jiahang Chen's avatar
Jiahang Chen committed
66
        self.__resGetValue = list()
C. Albrecht's avatar
WIP  
C. Albrecht committed
67
68
        if attributes:
            self.__name = attributes.get("name", "")
Jiahang Chen's avatar
Jiahang Chen committed
69
70
71
72

    @property
    def model(self):
        """Return the model dict this thing was constructed with."""
        return self.__model
Jiahang Chen's avatar
Jiahang Chen committed
73
74
75
76
77
78
79
80
81

    @property
    def features(self):
        """Return the features attached to this thing (a dict, initially empty)."""
        return self.__features

    @features.setter
    def features(self, value):
        # Replaces the stored object wholesale; nothing is validated or copied.
        self.__features = value

Jiahang Chen's avatar
Jiahang Chen committed
82
83
84
85
86
87
88
89
    @property
    def roles(self):
        """Return the roles attached to this thing (a dict, initially empty)."""
        return self.__roles

    @roles.setter
    def roles(self, value):
        # Replaces the stored object wholesale; nothing is validated or copied.
        self.__roles = value

C. Albrecht's avatar
WIP  
C. Albrecht committed
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
    @property
    def client_secret(self):
        """Return the client secret given to the constructor."""
        return self.__client_secret

    @property
    def grant_type(self):
        """Return the grant type given to the constructor."""
        return self.__grant_type

    @property
    def access_token(self):
        """Return the last access token received (``""`` before the first one)."""
        return self.__access_token

    @property
    def name(self):
        """Return the ``name`` attribute read from the model (``""`` if absent)."""
        return self.__name

    @property
    def thing_id(self):
        """Return the ``thingId`` read from the model (``""`` if absent)."""
        return self.__thing_id

Jiahang Chen's avatar
Jiahang Chen committed
110
111
112
113
    @property
    def policy_id(self):
        """Return the ``policyId`` read from the model (``""`` if absent)."""
        return self.__policy_id

C. Albrecht's avatar
WIP  
C. Albrecht committed
114
    def run_forever(self):
        """Announce the thing, connect to the IdP and start the sync threads."""
        banner = "[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC)
        print(banner)
        self.__connect_with_idp()

        # The directory loop always runs; the repository loop only when the
        # thing was created with is_repo=True.
        sync_loops = [self.__dir_syn]
        if self.__is_repo:
            sync_loops.append(self.__repo_syn)
        for sync_loop in sync_loops:
            threading.Thread(target=sync_loop).start()

    @staticmethod
    def add_user_def(func):
        threading.Thread(target=func).start()

    def __dir_syn(self):
        """Keep the directory entry in step with the local state; never returns.

        Every 0.1 s the entry is rebuilt with ``to_dir_json``. When the result
        differs from the previous one it is pushed with
        ``updateThingIDBased``. Failures are ignored so the loop keeps
        running.
        """
        while True:
            time.sleep(0.1)
            if self.dir is None:
                # The directory client only exists after the first token.
                continue
            try:
                previous = self.dir_json
                self.to_dir_json()
                if self.dir_json != previous:
                    self.dir.updateThingIDBased(thingID=self.thing_id, payload=self.dir_json)
            except Exception:
                # Best effort: retry on the next tick. Catching Exception
                # rather than everything lets SystemExit and
                # KeyboardInterrupt propagate.
                continue

    def __repo_syn(self):
        """Keep the repository entry in step with the local state.

        Runs while the thing is flagged as a repository thing. Every 0.1 s
        the document is rebuilt with ``to_repo_json``. When it differs from
        the previous one it is pushed with ``updateThingIDBased``. Failures
        are ignored so the loop keeps running.
        """
        while self.__is_repo:
            time.sleep(0.1)
            if self.repo is None:
                # The repository client only exists after the first token.
                continue
            try:
                previous = self.repo_json
                self.to_repo_json()
                if self.repo_json != previous:
                    self.repo.updateThingIDBased(thingID=self.thing_id, payload=self.repo_json)
            except Exception:
                # Best effort: retry on the next tick. Catching Exception
                # rather than everything lets SystemExit and
                # KeyboardInterrupt propagate.
                continue

C. Albrecht's avatar
WIP  
C. Albrecht committed
152
    def __connect_with_idp(self):
        """Create the identity-provider client and start its token handling.

        Access tokens are handed to ``__on_token``.
        """
        print("{}[S³I][IdP]{}: Connect with S3I-IdentityProvider".format(BColors.OKBLUE, BColors.ENDC))

        credentials = {
            "grant_type": self.__grant_type,
            "client_id": self.__thing_id,
            "username": self.__username,
            "password": self.__password,
            "client_secret": self.__client_secret,
        }
        idp = IdentityProvider(
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
            **credentials
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
        """Store a new access token and create the service clients with it."""
        self.__access_token = token

        # Directory and repository are always connected; the broker only
        # when the thing was created with is_broker=True.
        connectors = [self.__connect_with_dir, self.__connect_with_repo]
        if self.__is_broker:
            connectors.append(self.__connect_with_broker)
        for connect in connectors:
            connect()
Jiahang Chen's avatar
Jiahang Chen committed
173

Jiahang Chen's avatar
Jiahang Chen committed
174
175
    def __connect_with_dir(self):
        """Create the directory client using the current access token."""
        prefix = "{}[S³I][Dir]{}".format(BColors.OKBLUE, BColors.ENDC)
        print(prefix + ": Connect with S3I-Directory")
        self.dir = Directory(
            s3i_dir_url=BaseVariable.DIR_URL,
            token=self.__access_token,
        )
Jiahang Chen's avatar
Jiahang Chen committed
182

C. Albrecht's avatar
WIP  
C. Albrecht committed
183
    def __connect_with_repo(self):
        """Create the repository client and open the repository WebSocket.

        The socket is stored in ``self._ws`` and served by its own thread.
        Both the HTTP client and the socket authenticate with the current
        access token.
        """
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + ": Connect with S3I-Repository"
        )
        self.repo = Repository(s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token)

        self._ws = websocket.WebSocketApp(
            BaseVariable.REPO_WWS_URL,
            # websocket-client documents ``header`` as a list or a dict; the
            # former set literal was outside that contract.
            header=["Authorization: Bearer {}".format(self.__access_token)],
            on_message=self.__on_new_websocket_message,
            on_error=self.__on_new_websocket_error,
            on_open=self.__on_websocket_connection_opened,
            on_close=self.__on_websocket_connection_closed,
        )

        # NOTE(review): every call starts another socket thread and the
        # previous socket is not closed - confirm how often the identity
        # provider invokes the token callback.
        threading.Thread(target=self._ws.run_forever).start()

    @staticmethod
    def __on_new_websocket_message(ws, msg):
        """Receive a message from the repository WebSocket; currently a no-op."""
        pass

    @staticmethod
    def __on_new_websocket_error(ws, error):
        """Print an error reported on the repository WebSocket.

        :param ws: the WebSocket application that reported the error (unused).
        :param error: the reported error; included in the printed message so
            the cause is not lost.
        """
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + " : Websocket error: {}".format(error)
        )

    def __on_websocket_connection_opened(self, *_callback_args):
        """Mark the WebSocket as connected and ask the repository for messages.

        Extra positional arguments are accepted and ignored: depending on the
        websocket-client release the callback is invoked either without
        arguments or with the WebSocket application object, and a fixed
        zero-argument signature raises ``TypeError`` in the latter case.
        """
        self.__ws_connected = True
        self._ws.send("START-SEND-MESSAGES")
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + ": Websocket connection built"
        )

    def __on_websocket_connection_closed(self, *_callback_args):
        """Mark the WebSocket as disconnected.

        Extra positional arguments are accepted and ignored: depending on the
        websocket-client release the callback is invoked without arguments,
        with the WebSocket application object, or additionally with a close
        status code and message.
        """
        self.__ws_connected = False
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + ": Websocket connection closed"
        )

    def sync_with_repo(self, path, topic):
231
        # TODO
C. Albrecht's avatar
WIP  
C. Albrecht committed
232
233
        if not self.__ws_connected:
            return None
Jiahang Chen's avatar
Jiahang Chen committed
234
        msg = {"topic": topic, "path": path, "value": self.repo_json[path]}
C. Albrecht's avatar
WIP  
C. Albrecht committed
235
236
237
        self._ws.send(json.dumps(msg))

    def __connect_with_broker(self):
        """Create the broker client and start receiving on the thing's endpoint.

        The endpoint is looked up with ``find_broker_endpoint``. In REST mode
        a polling thread fetches one message every 0.1 s and passes it, parsed
        from JSON, to ``__on_broker_callback``. Otherwise the ``Broker``
        client's own ``receive`` runs in a thread with that callback.
        """
        print(BColors.OKBLUE + "[S³I][Broker]" + BColors.ENDC + ": Connect with S3I-Broker")
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)

            def receive():
                while True:
                    time.sleep(0.1)
                    try:
                        msg_str = self.broker.receive_once(self.__endpoint)
                        if msg_str == "":
                            continue
                        self.__on_broker_callback(ch=None, method=None, properties=None,
                                                  body=json.loads(msg_str))
                    except Exception:
                        # Best effort: a failed poll or a failing handler must
                        # not end the loop. Catching Exception rather than
                        # everything lets SystemExit and KeyboardInterrupt
                        # propagate.
                        continue

            threading.Thread(
                target=receive
            ).start()

        else:
            self.broker = Broker(
                auth_form="Username/Password",
                username=" ",
                password=self.__access_token,
                host=BaseVariable.BROKER_HOST,
            )

            threading.Thread(
                target=self.broker.receive,
                args=(self.__endpoint, self.__on_broker_callback),
            ).start()
C. Albrecht's avatar
WIP  
C. Albrecht committed
272
273

    def __on_broker_callback(self, ch, method, properties, body):
Jiahang Chen's avatar
Jiahang Chen committed
274
275
276
277
        if isinstance(body, bytes):
            body = ast.literal_eval(body.decode("utf-8"))
        elif isinstance(body, int):
            return
Jiahang Chen's avatar
Jiahang Chen committed
278
        elif isinstance(body, str):
Jiahang Chen's avatar
Jiahang Chen committed
279
280
281
282
283
284
285
286
287
288
289
290
291
            return

        message_type = body.get("messageType")
        if message_type == "userMessage":
            self.on_user_message(body)
        elif message_type == "serviceRequest":
            self.on_service_request(body)
        elif message_type == "getValueRequest":
            self.on_get_value_request(body)
        elif message_type == "getValueReply":
            self.on_get_value_reply(body)
        elif message_type == "serviceReply":
            self.on_service_reply(body)
Jiahang Chen's avatar
Jiahang Chen committed
292
        else:
Jiahang Chen's avatar
Jiahang Chen committed
293
294
            ### TODO send user message reply back
            pass
Jiahang Chen's avatar
Jiahang Chen committed
295
296

    def on_user_message(self, msg):
        """Print a received user message as indented JSON."""
        prefix = "{}[S³I][Broker]{}".format(BColors.OKBLUE, BColors.ENDC)
        rendered = json.dumps(msg, indent=2)
        print(prefix + ": You have received a user message" + rendered)
C. Albrecht's avatar
WIP  
C. Albrecht committed
304
305

    def on_get_value_request(self, msg):
        """Answer a get-value request with the value at its attribute path.

        The value is resolved with ``_uriToData``, serialised as JSON and
        sent in a ``GetValueReply`` to the request's ``replyToEndpoint``.
        """
        reply = GetValueReply()
        try:
            value = self._uriToData(msg.get("attributePath"))
        except KeyError:
            value = "invalid attribute path"

        reply.fillGetValueReply(
            senderUUID=self.thing_id,
            receiverUUID=[msg.get("sender")],
            results=json.dumps(value),
            msgUUID="s3i:" + str(uuid.uuid4()),
            replyingToUUID=msg.get("identifier"),
        )
        target_endpoint = msg.get("replyToEndpoint")
        self.broker.send(
            receiver_endpoints=[target_endpoint],
            msg=json.dumps(reply.msg),
        )

    def _uriToData(self, uri):
        if uri == "":
            return self.repo_json
        else:
            uri_list = uri.split("/")
            if uri_list[0] == "features":
                try:
                    return self.repo_json[uri]
                except KeyError:
                    return "invalid attribute path"

            try:
                self._getValue(self.repo_json, uri_list)
            except:
                return "invalid attribute path"
            if self.__resGetValue.__len__() == 0:
                return "invalid attribute path"
            response = copy.deepcopy(self.__resGetValue)
            self.__resGetValue.clear()
            if response.__len__() == 1:
                return response[0]
            else:
                return response

    def _getValue(self, source, uri_list):
        """Walk ``source`` along ``uri_list`` and collect the matching values.

        Results are appended to the private ``__resGetValue`` list rather than
        returned; callers read and clear that list afterwards.

        :param source: the dict (or other indexable) to look into.
        :param uri_list: remaining path segments; modified in place when the
            current value is a dict.
        :raises KeyError: when a segment is missing from ``source``; other
            lookup errors propagate as well.
        """
        value = source[uri_list[0]]
        if uri_list.__len__() == 1:
            # Last segment. A string of the form "ditto-feature:<id>" is
            # replaced by repo_json["features"][<id>]["properties"][<segment>];
            # if that lookup fails the raw string is kept.
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
                        value = self.repo_json["features"][stringValue_split[1]]["properties"][uri_list[0]]
                except:
                    pass
            self.__resGetValue.append(value)
            return
        if isinstance(value, dict):
            # Descend: drop the consumed segment and continue inside the dict.
            del uri_list[0]
            self._getValue(value, uri_list)
        if isinstance(value, list):
            # NOTE(review): value[0] raises IndexError on an empty list.
            if isinstance(value[0], (str, int, float, bool, list)):
                # NOTE(review): the value is returned but not appended to the
                # result list, and callers ignore the return value - confirm
                # whether such paths are meant to yield nothing.
                return value
            if isinstance(value[0], dict):
                # List of dicts: the next segment selects entries by value.
                for item in value:
                    if item["class"] == "ml40::Thing":
                        # Nested things are selected through their roles: any
                        # role dict holding the segment as a value matches.
                        for i in item["roles"]:
                            if self._findValue(i, uri_list[1]):
                                uri_list_1 = copy.deepcopy(uri_list)
                                del uri_list_1[:2]
                                # NOTE(review): if the path ends at the
                                # selector, uri_list_1 is empty here and the
                                # recursive call fails on uri_list[0].
                                self._getValue(item, uri_list_1)
                    else:
                        # Other entries match when any of their own values
                        # equals the segment.
                        if self._findValue(item, uri_list[1]):
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            if not uri_list_1:
                                # Path ends at the entry itself; the first
                                # match ends the search.
                                self.__resGetValue.append(item)
                                return
                            else:
                                self._getValue(item, uri_list_1)
        if isinstance(value, (str, int, float, bool)):
            # A scalar reached with segments still remaining: resolve a
            # "ditto-feature:<id>" reference as above, then collect it.
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
                        value = self.repo_json["features"][stringValue_split[1]][
                            "properties"
                        ][uri_list[0]]
                except:
                    pass
            self.__resGetValue.append(value)

    def _findValue(self, json, value):
        for item in json:
            if json[item] == value:
                # print("Parameter: ", json[item])
                return True
        return False
C. Albrecht's avatar
WIP  
C. Albrecht committed
408
409
410

    def on_service_request(self, body_json):
        """Run the requested service method and send its result back.

        ``serviceType`` is read as ``"<functionality>/<method>"``. The
        functionality is looked up in ``self.features`` and the method is
        called with the request's ``parameters`` as keyword arguments. A
        ``bool`` result is wrapped as ``{"ok": result}``. The reply goes to
        the request's ``replyToEndpoint``.

        Requests that name an unknown functionality or method are logged and
        dropped rather than raising out of the broker callback.

        :param body_json: the decoded service request message.
        """
        service_type = body_json.get("serviceType") or ""
        # A request without parameters calls the method without arguments
        # instead of failing on ``**None``.
        parameters = body_json.get("parameters") or {}

        segments = service_type.split('/')
        service_functionality = segments[0]
        method_name = segments[1] if len(segments) > 1 else ""

        service_functionality_obj = self.features.get(service_functionality)
        if service_functionality_obj is None:
            APP_LOGGER.critical(
                "Functionality %s is not one of the built-in functionalities in %s!" % (
                    service_functionality, self.name)
            )
            return

        # TODO: Call right functionality.
        method = getattr(service_functionality_obj, method_name, None) if method_name else None
        if not callable(method):
            APP_LOGGER.critical(
                "Functionality %s in %s has no callable service method %r!" % (
                    service_functionality, self.name, method_name)
            )
            return

        result = method(**parameters)
        if isinstance(result, bool):
            result = {"ok": result}

        service_reply = ServiceReply()
        service_reply.fillServiceReply(
            senderUUID=self.thing_id,
            receiverUUID=body_json.get("sender", None),
            serviceType=body_json.get("serviceType", None),
            results=result,
            replyingToUUID=body_json.get("identifier", None),
            msgUUID="s3i:{}".format(uuid.uuid4())
        )
        self.broker.send(receiver_endpoints=[body_json.get("replyToEndpoint", None)],
                         msg=json.dumps(service_reply.msg))
Jiahang Chen's avatar
Jiahang Chen committed
438
439

    def on_get_value_reply(self, msg):
        """Print a received get-value reply as indented JSON."""
        prefix = "{}[S³I][Broker]{}".format(BColors.OKBLUE, BColors.ENDC)
        rendered = json.dumps(msg, indent=2)
        print(prefix + ": You have received a get value reply" + rendered)
Jiahang Chen's avatar
Jiahang Chen committed
447
448

    def on_service_reply(self, msg):
        """Print a received service reply as indented JSON."""
        prefix = "{}[S³I][Broker]{}".format(BColors.OKBLUE, BColors.ENDC)
        rendered = json.dumps(msg, indent=2)
        print(prefix + ": You have received a service reply" + rendered)

    def to_dir_json(self):
        self.dir_json = self.dir.queryThingIDBased(self.thing_id)
        if self.thing_id is not None:
            self.dir_json["thingId"] = self.thing_id
        if self.policy_id is not None:
            self.dir_json["policyId"] = self.policy_id
        self.dir_json["attributes"]["class"] = "ml40::Thing"
464

Jiahang Chen's avatar
Jiahang Chen committed
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
        self.dir_json["attributes"]["name"] = self.name
        if self.roles:
            self.dir_json["attributes"]["roles"] = list()
        if self.features:
            self.dir_json["attributes"]["features"] = list()

        for key in self.roles.keys():
            self.dir_json["attributes"]["roles"].append(self.roles[key].to_json())
        for key in self.features.keys():
            self.dir_json["attributes"]["features"].append(self.features[key].to_json())

        return self.dir_json

    def to_repo_json(self, path=None, value=None):
        if path is None and value is None:
            self.repo_json = {
                "thingId": self.thing_id,
                "policyId": self.policy_id,
                "attributes": {
                    "class": "ml40::Thing",
                    "name": self.name,
                }
            }
            if self.roles:
                self.repo_json["attributes"]["roles"] = list()
            if self.features:
                self.repo_json["attributes"]["features"] = list()
            for key in self.roles.keys():
                self.repo_json["attributes"]["roles"].append(self.roles[key].to_json())
            for key in self.features:
                self.repo_json["attributes"]["features"].append(self.features[key].to_json())
        return self.repo_json

    def to_subthing_json(self):
        json_out = {
            "class": "ml40::Thing",
            "name": self.name,
            "roles": [],
            "features": []
        }
        for key in self.roles.keys():
            json_out["roles"].append(self.roles[key].to_json())
        for key in self.features.keys():
            json_out["features"].append(self.features[key].to_json())
        return json_out