thing.py 31.9 KB
Newer Older
1
2
3
"""This module implements the thing class which is the core element of
this package."""

C. Albrecht's avatar
WIP    
C. Albrecht committed
4
5
6
import threading
import json
import uuid
7
8
9
10
import time
import copy
from s3i import IdentityProvider, TokenType, GetValueReply, Directory
from s3i import Repository
11
import s3i.exception
Jiahang Chen's avatar
Jiahang Chen committed
12
from s3i.broker import Broker, BrokerREST
Jiahang Chen's avatar
Jiahang Chen committed
13
from s3i.messages import ServiceReply
C. Albrecht's avatar
WIP    
C. Albrecht committed
14
from ml.tools import BColors
Jiahang Chen's avatar
Jiahang Chen committed
15
from ml.tools import find_broker_endpoint
C. Albrecht's avatar
WIP    
C. Albrecht committed
16
from ml.app_logger import APP_LOGGER
Jiahang Chen's avatar
Jiahang Chen committed
17
18


19
20
21
class BaseVariable:
    """The BaseVariable class holds various urls to s3i services."""

C. Albrecht's avatar
WIP    
C. Albrecht committed
22
23
24
25
26
27
28
29
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


Jiahang Chen's avatar
Jiahang Chen committed
30
class Thing:
31
    """The thing class represents a customizable runtime environment for
Jiahang Chen's avatar
Jiahang Chen committed
32
    operating Digital Twins complying the Forest Modeling Language (fml40)."""
33

C. Albrecht's avatar
WIP    
C. Albrecht committed
34
    def __init__(
35
36
37
38
39
40
41
42
43
        self,
        model: dict,
        client_secret="",
        grant_type="password",
        is_broker=False,
        is_repo=False,
        is_broker_rest=True,
        username=None,
        password=None,
C. Albrecht's avatar
WIP    
C. Albrecht committed
44
    ):
Jiahang Chen's avatar
Jiahang Chen committed
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
        """
        Constructor

        :param model: edge-device or S³I Repository specified JSON entry, like config file for Digital Twins
        :type model: dict
        :param client_secret: OAuth 2.0 specified client secret, generated in the S³I IdentityProvider
        :type client_secret: str
        :param grant_type: OAuth 2.0 specified grant type to issue a JWT. Here the grant type can be password or client_credentials
        :type grant_type: str
        :param is_broker: whether broker interface is enabled in the ml40::thing instance
        :type is_broker: bool
        :param is_broker_rest: Whether the connection with the S³I Broker is established via HTTP REST
        :type is_broker_rest: bool
        :param is_repo: Whether the thing uses the S³I Repository to launch its Digital Twin in the cloud
        :type is_repo: bool
        :param username: OAuth 2.0 specified username, registered in the S³I IdentityProvider. If the grant_type is set as password, the username is required
        :type username: str
        :param password: OAuth 2.0 specified password, registered in the S³I IdentityProvider. If the grant_type is set as password, the password is required

        """
65

Jiahang Chen's avatar
Jiahang Chen committed
66
        self.__model = model
C. Albrecht's avatar
WIP    
C. Albrecht committed
67
68
69
70
71
72
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
Jiahang Chen's avatar
Jiahang Chen committed
73

C. Albrecht's avatar
WIP    
C. Albrecht committed
74
        self.__is_broker = is_broker
Jiahang Chen's avatar
Jiahang Chen committed
75
        self.__is_broker_rest = is_broker_rest
C. Albrecht's avatar
WIP    
C. Albrecht committed
76
        self.__is_repo = is_repo
Jiahang Chen's avatar
Jiahang Chen committed
77

C. Albrecht's avatar
WIP    
C. Albrecht committed
78
        self.__access_token = ""
Jiahang Chen's avatar
Jiahang Chen committed
79
80
        self.__endpoint = ""

C. Albrecht's avatar
WIP    
C. Albrecht committed
81
        self.__ws_connected = False
Jiahang Chen's avatar
Jiahang Chen committed
82
83
        self.broker = None
        self.ws = None
84
85
        # TODO: Change the variable name dir, because it is a builtin
        # name.
Jiahang Chen's avatar
Jiahang Chen committed
86
        self.dir = None
Jiahang Chen's avatar
Jiahang Chen committed
87
88
        self.repo = None

89
        # : Dict[<str>, <str>]
Jiahang Chen's avatar
Jiahang Chen committed
90
91
        self.repo_json = dict()
        self.dir_json = dict()
92
        self.dt_json = dict()
C. Albrecht's avatar
WIP    
C. Albrecht committed
93
94
95

        attributes = model.get("attributes", None)
        self.__name = ""
Jiahang Chen's avatar
Jiahang Chen committed
96
        self.__roles = {}
Jiahang Chen's avatar
Jiahang Chen committed
97
        self.__features = {}
Jiahang Chen's avatar
Jiahang Chen committed
98
        self.__ditto_features = {}
99
        # ??? Is this property necessary? Only used as return value in _getValue()
Jiahang Chen's avatar
Jiahang Chen committed
100
        self.__resGetValue = list()
C. Albrecht's avatar
WIP    
C. Albrecht committed
101
102
        if attributes:
            self.__name = attributes.get("name", "")
Jiahang Chen's avatar
Jiahang Chen committed
103
104
105

    @property
    def model(self):
Jiahang Chen's avatar
Jiahang Chen committed
106
        """Returns the specification JSON from which this thing has been constructed from.
107
108
109
110
111
112

        :returns: Representation of a ml40 compliant thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
113
        return self.__model
Jiahang Chen's avatar
Jiahang Chen committed
114

Jiahang Chen's avatar
Jiahang Chen committed
115
116
117
118
119
120
121
122
123
    @property
    def ditto_features(self):
        return self.__ditto_features

    @ditto_features.setter
    def ditto_features(self, value):
        self.__ditto_features = value


Jiahang Chen's avatar
Jiahang Chen committed
124
125
    @property
    def features(self):
Jiahang Chen's avatar
Jiahang Chen committed
126
        """Returns thing's features.
127

128
129
        :returns: Features
        :rtype: dict
130
131
132

        """

Jiahang Chen's avatar
Jiahang Chen committed
133
134
135
136
        return self.__features

    @features.setter
    def features(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
137
        """Replaces thing's features with value.
138
139
140
141
142

        :param value: New collection of features

        """

Jiahang Chen's avatar
Jiahang Chen committed
143
144
        self.__features = value

Jiahang Chen's avatar
Jiahang Chen committed
145
146
    @property
    def roles(self):
147
        """Returns the thing's roles.
148
149
150

        :returns: ml40 roles
        :rtype: dict
151

152
153
        """

Jiahang Chen's avatar
Jiahang Chen committed
154
155
156
157
        return self.__roles

    @roles.setter
    def roles(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
158
        """Replaces thing's roles with value
159

160
        :param value: New collection of roles
161
162
        """

Jiahang Chen's avatar
Jiahang Chen committed
163
164
        self.__roles = value

C. Albrecht's avatar
WIP    
C. Albrecht committed
165
166
    @property
    def client_secret(self):
167
168
169
170
        """Returns the client secret.

        :returns: Client secret
        :rtype: str
171

172
173
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
174
175
176
177
        return self.__client_secret

    @property
    def grant_type(self):
Jiahang Chen's avatar
Jiahang Chen committed
178
        """Returns the method used to obtain JSON Web Tokens from the S³I IdentityProvider
179

Jiahang Chen's avatar
Jiahang Chen committed
180
        :returns: OAuth2 specified grant type [password, client_credentials]
181
        :rtype: str
182

183
184
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
185
186
187
188
        return self.__grant_type

    @property
    def access_token(self):
Jiahang Chen's avatar
Jiahang Chen committed
189
        """Returns the current JSON Web token.
190

Jiahang Chen's avatar
Jiahang Chen committed
191
        :returns: JSON Web token
192
        :rtype: str
193

194
        """
C. Albrecht's avatar
WIP    
C. Albrecht committed
195
196
197
198
        return self.__access_token

    @property
    def name(self):
199
200
201
202
        """Returns the name of this thing.

        :returns: name
        :rtype: str
203

204
205
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
206
207
208
209
        return self.__name

    @property
    def thing_id(self):
210
211
212
213
        """Returns the identifier of this thing.

        :returns: identifier
        :rtype: str
214

215
216
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
217
218
        return self.__thing_id

Jiahang Chen's avatar
Jiahang Chen committed
219
220
    @property
    def policy_id(self):
221
222
223
224
225
226
227
        """Returns the identifier of this thing's policy.

        :returns: identifier
        :rtype: str

        """

Jiahang Chen's avatar
Jiahang Chen committed
228
229
        return self.__policy_id

C. Albrecht's avatar
WIP    
C. Albrecht committed
230
    def run_forever(self):
231
        """Starts the thing in permanent mode.
232

233
234
235
        """

        # TODO: Use logger instead!
Jiahang Chen's avatar
Jiahang Chen committed
236
        print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC))
C. Albrecht's avatar
WIP    
C. Albrecht committed
237
238
        self.__connect_with_idp()

239
        threading.Thread(target=self.__json_syn).start()
Jiahang Chen's avatar
Jiahang Chen committed
240
241
242
243
244
245
        threading.Thread(target=self.__dir_syn).start()
        if self.__is_repo:
            threading.Thread(target=self.__repo_syn).start()

    @staticmethod
    def add_user_def(func):
Jiahang Chen's avatar
Jiahang Chen committed
246
        """Insert user-specified function in the thing object.
247

Jiahang Chen's avatar
Jiahang Chen committed
248
        :param func: external defined function to be executed.
249
250
251

        """

Jiahang Chen's avatar
Jiahang Chen committed
252
253
        threading.Thread(target=func).start()

254
    def __json_syn(self, freq=0.1):
Jiahang Chen's avatar
Jiahang Chen committed
255
256
257
258
259
260
        """
        Applies local changes to the original model in the thing object

        :param freq: Frequency of the update
        :type freq: float
        """
Jiahang Chen's avatar
Jiahang Chen committed
261
        while True:
262
            try:
263
264
265
266
267
                time.sleep(freq)
                self.to_json()
            except:
                continue

268
269
    def __dir_syn(self):
        """Applies local changes to the directory entry in the cloud only once.
270
271

        """
272
273
274
275
276
        self.to_dir_json()
        if self.dir_json is not None:
            self.dir.updateThingIDBased(
                thingID=self.thing_id, payload=self.dir_json
            )
Jiahang Chen's avatar
Jiahang Chen committed
277

278
    def __repo_syn(self, freq=0.1):
279
        """Applies local changes to the repository entry in the cloud.
280
281

        :param freq: Frequency of the update.
Jiahang Chen's avatar
Jiahang Chen committed
282
        :type freq: float
283
        """
Jiahang Chen's avatar
Jiahang Chen committed
284
        while self.__is_repo:
285
            try:
286
                time.sleep(freq)
287
288
                old_repo_json = self.repo_json
                self.to_repo_json()
289
                # TODO: Clean up this reqion
290
291
292
                if self.repo_json == old_repo_json:
                    continue
                else:
293
294
295
                    self.repo.updateThingIDBased(
                        thingID=self.thing_id, payload=self.repo_json
                    )
296
            except:
Jiahang Chen's avatar
Jiahang Chen committed
297
298
                continue

C. Albrecht's avatar
WIP    
C. Albrecht committed
299
    def __connect_with_idp(self):
Jiahang Chen's avatar
Jiahang Chen committed
300
301
        """Establishes a connection to the S³I IdentityProvider which guarantees,
        that the JSON web token needed to use s3i services.
302
        be renewed if it has expired.
303
304
305

        """

Jiahang Chen's avatar
Jiahang Chen committed
306
        #TODO: Use logger!
307
308
309
310
311
312
        print(
            BColors.OKBLUE
            + "[S³I][IdP]"
            + BColors.ENDC
            + ": Connect with S3I IdentityProvider"
        )
C. Albrecht's avatar
WIP    
C. Albrecht committed
313
314
315
316
317
318
319
320
321
322
323
324
325
326
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
Jiahang Chen's avatar
Jiahang Chen committed
327
328
        """Updates the JSON Web Token with token and reestablishes connections
        to the s3i services .
329

Jiahang Chen's avatar
Jiahang Chen committed
330
331
        :param token: New JSON Web token
        :type token: str
332
333
334

        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
335
        self.__access_token = token
Jiahang Chen's avatar
Jiahang Chen committed
336
        self.__connect_with_dir()
Jiahang Chen's avatar
Jiahang Chen committed
337
        self.__connect_with_repo()
C. Albrecht's avatar
WIP    
C. Albrecht committed
338
339
        if self.__is_broker:
            self.__connect_with_broker()
Jiahang Chen's avatar
Jiahang Chen committed
340

Jiahang Chen's avatar
Jiahang Chen committed
341
    def __connect_with_dir(self):
342
        """Initializes the property dir with a Directory object which can be
Jiahang Chen's avatar
Jiahang Chen committed
343
        used to access the s3i Directory.
344
345
346
347
348
349
350

        :returns:
        :rtype:

        """

        # TODO: Use logger
Jiahang Chen's avatar
Jiahang Chen committed
351
        print(
Jiahang Chen's avatar
Jiahang Chen committed
352
            BColors.OKBLUE
Jiahang Chen's avatar
Jiahang Chen committed
353
            + "[S³I][Dir]"
Jiahang Chen's avatar
Jiahang Chen committed
354
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
355
            + ": Connect with S3I Directory"
Jiahang Chen's avatar
Jiahang Chen committed
356
        )
357
358
359
        self.dir = Directory(
            s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
360

C. Albrecht's avatar
WIP    
C. Albrecht committed
361
    def __connect_with_repo(self):
362
        """Initializes the property repo whit a Repository object which can be
Jiahang Chen's avatar
Jiahang Chen committed
363
        used to access the s3i Repository.
364
365
366
367
368
369
370

        :returns:
        :rtype:

        """

        # TODO: Use logger
Jiahang Chen's avatar
Jiahang Chen committed
371
        print(
Jiahang Chen's avatar
Jiahang Chen committed
372
            BColors.OKBLUE
Jiahang Chen's avatar
Jiahang Chen committed
373
            + "[S³I][Repo]"
Jiahang Chen's avatar
Jiahang Chen committed
374
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
375
            + ": Connect with S3I Repository"
Jiahang Chen's avatar
Jiahang Chen committed
376
        )
377
378
379
        self.repo = Repository(
            s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
380

C. Albrecht's avatar
WIP    
C. Albrecht committed
381
    def __connect_with_broker(self):
382
        """Initializes the property broker with a Broker object. Additionally
Jiahang Chen's avatar
Jiahang Chen committed
383
        a callback function is registered which handles incoming S³I-B Messages
384
385
386
387
388
        messages.

        """

        # TODO: Use logger
389
390
391
392
393
394
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": Connect with S3I Broker"
        )
Jiahang Chen's avatar
Jiahang Chen committed
395
396
397
398
399
400
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)

            def receive():
                while True:
401
402
403
404
405
406
                    try:
                        time.sleep(0.1)
                        msg_str = self.broker.receive_once(self.__endpoint)
                        if msg_str == "":
                            continue
                        else:
407
408
409
410
411
412
                            self.__on_broker_callback(
                                ch=None,
                                method=None,
                                properties=None,
                                body=json.loads(msg_str),
                            )
413
                    except:
Jiahang Chen's avatar
Jiahang Chen committed
414
                        continue
415

416
            threading.Thread(target=receive).start()
Jiahang Chen's avatar
Jiahang Chen committed
417
418

        else:
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
            if self.broker is None:
                # first time to build a broker instance
                self.broker = Broker(
                    auth_form="Username/Password",
                    username=" ",
                    password=self.__access_token,
                    host=BaseVariable.BROKER_HOST,
                )

                threading.Thread(
                    target=self.broker.receive,
                    args=(self.__endpoint, self.__on_broker_callback),
                ).start()
            else:
                self.broker.update_token(self.access_token)
C. Albrecht's avatar
WIP    
C. Albrecht committed
434
435

    def __on_broker_callback(self, ch, method, properties, body):
436
437
438
439
440
441
442
443
        """Parses body (content of a S3I-B message) and delegates the
        processing of the message to a separate method. The method is
        selected according to the message's type.

        :param body: S3I-B message

        """

Jiahang Chen's avatar
Jiahang Chen committed
444
        if isinstance(body, bytes):
Jiahang Chen's avatar
Jiahang Chen committed
445
446
447
448
449
            body_str = body.decode('utf8').replace("'", '"')
            try:
                body = json.loads(body_str)
            except ValueError:
                pass
450

Jiahang Chen's avatar
Jiahang Chen committed
451
452
        elif isinstance(body, int):
            return
Jiahang Chen's avatar
Jiahang Chen committed
453
        elif isinstance(body, str):
Jiahang Chen's avatar
Jiahang Chen committed
454
455
            return

Jiahang Chen's avatar
Jiahang Chen committed
456
        ch.basic_ack(method.delivery_tag)
Jiahang Chen's avatar
Jiahang Chen committed
457
458
459
460
461
462
463
464
465
466
467
        message_type = body.get("messageType")
        if message_type == "userMessage":
            self.on_user_message(body)
        elif message_type == "serviceRequest":
            self.on_service_request(body)
        elif message_type == "getValueRequest":
            self.on_get_value_request(body)
        elif message_type == "getValueReply":
            self.on_get_value_reply(body)
        elif message_type == "serviceReply":
            self.on_service_reply(body)
Jiahang Chen's avatar
Jiahang Chen committed
468
        else:
Jiahang Chen's avatar
Jiahang Chen committed
469
470
            ### TODO send user message reply back
            pass
Jiahang Chen's avatar
Jiahang Chen committed
471

472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
    def __send_message_to_broker(self, receiver_endpoints, msg):
        try:
            res = self.broker.send(
                receiver_endpoints=receiver_endpoints,
                msg=json.dumps(msg)
            )
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": Send a S³I-B GetValueReply back to the requester  "
            )
            return res

        except s3i.exception.S3IBrokerAMQPError:
            value = "invalid request sender endpoint {}".format(receiver_endpoints)
            APP_LOGGER.critical(value)

Jiahang Chen's avatar
Jiahang Chen committed
490
    def on_user_message(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
491
        """Handles incoming S³I-B UserMessages.
492

Jiahang Chen's avatar
Jiahang Chen committed
493
        :param msg: S³I-B UserMessages
494
495
496

        """

Jiahang Chen's avatar
Jiahang Chen committed
497
498
499
500
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
501
            + ": You have received a S³I-B UserMessage"
Jiahang Chen's avatar
Jiahang Chen committed
502
503
            + json.dumps(msg, indent=2)
        )
C. Albrecht's avatar
WIP    
C. Albrecht committed
504
505

    def on_get_value_request(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
506
        """Handles incoming GetValueRequest message. Looks up the value specified in msg and
507
508
509
510
511
512
        sends a GetValueReply message back to the sender.

        :param msg: GetValueRequest

        """

Jiahang Chen's avatar
Jiahang Chen committed
513
514
515
516
517
518
519
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B GetValueRequest"
            + json.dumps(msg, indent=2)
        )
Jiahang Chen's avatar
Jiahang Chen committed
520
521
522
523
524
525
526
        get_value_reply = GetValueReply()
        request_sender = msg.get("sender")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = msg.get("replyToEndpoint")
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())
        try:
Jiahang Chen's avatar
Jiahang Chen committed
527
528
529
530
531
532
533
            print(
                BColors.OKBLUE
                + "[S³I]"
                + BColors.ENDC
                + ": Search the attribute with path: "
                + attribute_path
            )
Jiahang Chen's avatar
Jiahang Chen committed
534
535
536
            value = self._uriToData(attribute_path)
        except KeyError:
            value = "invalid attribute path"
537
            APP_LOGGER.critical(value)
Jiahang Chen's avatar
Jiahang Chen committed
538
539
540

        get_value_reply.fillGetValueReply(
            senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
541
            receiverUUIDs=[request_sender],
542
            results=value,
Jiahang Chen's avatar
Jiahang Chen committed
543
544
545
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )
Jiahang Chen's avatar
Jiahang Chen committed
546

547
        res = self.__send_message_to_broker(
Jiahang Chen's avatar
Jiahang Chen committed
548
            receiver_endpoints=[request_sender_endpoint],
549
            msg=get_value_reply.msg
Jiahang Chen's avatar
Jiahang Chen committed
550
        )
551

552
553
554
555
556
557
558
559
560
561
562
563
564
        if self.__is_broker_rest:
            if res.status_code == 201:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Send a S³I-B GetValueReply back to the requester  "
                )
            else:
                print(BColors.OKBLUE
                      + "[S³I][Broker]"
                      + BColors.ENDC
                      + res.text)
Jiahang Chen's avatar
Jiahang Chen committed
565
566

    def _uriToData(self, uri):
567
568
569
570
571
572
573
        """Returns a copy of the value found at uri.

        :param uri: Path to value
        :rtype: Feature

        """

Jiahang Chen's avatar
Jiahang Chen committed
574
        if uri == "":
575
            return self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
576
577
578
579
        else:
            uri_list = uri.split("/")
            if uri_list[0] == "features":
                try:
580
                    return self.dt_json[uri]
Jiahang Chen's avatar
Jiahang Chen committed
581
582
583
584
                except KeyError:
                    return "invalid attribute path"

            try:
585
                self._getValue(self.dt_json, uri_list)
Jiahang Chen's avatar
Jiahang Chen committed
586
587
588
589
590
591
592
593
594
595
596
597
            except:
                return "invalid attribute path"
            if self.__resGetValue.__len__() == 0:
                return "invalid attribute path"
            response = copy.deepcopy(self.__resGetValue)
            self.__resGetValue.clear()
            if response.__len__() == 1:
                return response[0]
            else:
                return response

    def _getValue(self, source, uri_list):
598
599
600
601
602
603
604
605
606
607
608
        """Searches for the value specified by uri_list in source and stores
        the result in __resGetValue.

        :param source: Object that is scanned
        :param uri_list: List containing path

        """

        # ??? What if the uri points to a Value object?
        # Shouldn't it be serialized?!

Jiahang Chen's avatar
Jiahang Chen committed
609
610
611
612
613
614
615
        value = source[uri_list[0]]
        if uri_list.__len__() == 1:
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
616
617
618
                        value = self.dt_json["features"][stringValue_split[1]][
                            "properties"
                        ][uri_list[0]]
Jiahang Chen's avatar
Jiahang Chen committed
619
620
621
622
623
                except:
                    pass
            self.__resGetValue.append(value)
            return
        if isinstance(value, dict):
624
            # ??? uri_list.pop(0) better?!
Jiahang Chen's avatar
Jiahang Chen committed
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
            del uri_list[0]
            self._getValue(value, uri_list)
        if isinstance(value, list):
            if isinstance(value[0], (str, int, float, bool, list)):
                return value
            if isinstance(value[0], dict):
                for item in value:
                    if item["class"] == "ml40::Thing":
                        for i in item["roles"]:
                            if self._findValue(i, uri_list[1]):
                                uri_list_1 = copy.deepcopy(uri_list)
                                del uri_list_1[:2]
                                self._getValue(item, uri_list_1)
                    else:
                        if self._findValue(item, uri_list[1]):
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            if not uri_list_1:
                                self.__resGetValue.append(item)
                                return
                            else:
                                self._getValue(item, uri_list_1)
        if isinstance(value, (str, int, float, bool)):
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
653
                        value = self.dt_json["features"][stringValue_split[1]][
Jiahang Chen's avatar
Jiahang Chen committed
654
655
656
657
658
659
660
                            "properties"
                        ][uri_list[0]]
                except:
                    pass
            self.__resGetValue.append(value)

    def _findValue(self, json, value):
661
662
663
664
665
666
667
668
669
670
        """Returns true if value has been found in json, otherwise returns false.

        :param json: dictionary
        :param value:
        :returns:
        :rtype:

        """

        # TODO: Simplify: value in json.values()
Jiahang Chen's avatar
Jiahang Chen committed
671
672
673
674
675
        for item in json:
            if json[item] == value:
                # print("Parameter: ", json[item])
                return True
        return False
C. Albrecht's avatar
WIP    
C. Albrecht committed
676
677

    def on_service_request(self, body_json):
Jiahang Chen's avatar
Jiahang Chen committed
678
679
        """Handles S³I-B ServiceRequests. Executes the method of the
        functionality specified in serviceType and send a ServiceReply
680
681
682
683
684
685
        back to the sender.

        :param body_json: ServiceRequest

        """

Jiahang Chen's avatar
Jiahang Chen committed
686
687
688
689
690
691
692
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B ServiceRequest "
            + json.dumps(body_json, indent=2)
        )
C. Albrecht's avatar
WIP    
C. Albrecht committed
693
        service_type = body_json.get("serviceType")
694
        parameters = body_json.get("parameters")
695
        service_reply = ServiceReply()
Jiahang Chen's avatar
Jiahang Chen committed
696
        service_functionality = service_type.split('/')[0]
Jiahang Chen's avatar
Jiahang Chen committed
697
        service_functionality_obj = self.features.get(service_functionality)
Jiahang Chen's avatar
Jiahang Chen committed
698
        if service_functionality_obj is None:
C. Albrecht's avatar
WIP    
C. Albrecht committed
699
            APP_LOGGER.critical(
700
701
                "Functionality %s is not one of the built-in functionalities in %s!"
                % (service_functionality, self.name)
C. Albrecht's avatar
WIP    
C. Albrecht committed
702
            )
GromeTT's avatar
GromeTT committed
703
704
            service_reply.fillServiceReply(
                senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
705
                receiverUUIDs=[body_json.get("sender", None)],
GromeTT's avatar
GromeTT committed
706
707
708
709
710
                serviceType=body_json.get("serviceType", None),
                results={"error": "invalid functionalities (serviceType) {}".format(service_functionality)},
                replyingToUUID=body_json.get("identifier", None),
                msgUUID="s3i:{}".format(uuid.uuid4())
            )
711
712
713
714
715
716
        else:
            # TODO: Call right functionality.
            try:
                method = getattr(service_functionality_obj, service_type.split('/')[1])
            except AttributeError:
                APP_LOGGER.critical(
717
                    "Method %s is not one of the built-in functionalities in %s!" % (
718
719
720
721
                        service_type.split('/')[1], self.name)
                )
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
722
                    receiverUUIDs=[body_json.get("sender", None)],
723
                    serviceType=body_json.get("serviceType", None),
724
725
726
727
728
729
730
731
732
733
734
735
736
                    results={"error": "invalid method {}".format(service_type.split('/')[1])},
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
            except IndexError:
                APP_LOGGER.critical(
                    "ServiceType consists of functionality and method name."
                )
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
                    receiverUUIDs=[body_json.get("sender", None)],
                    serviceType=body_json.get("serviceType", None),
                    results={"error": "method missing"},
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
            else:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Execute the function {0} of the class {1}.".format(service_type.split('/')[1],
                                                                            service_type.split('/')[0])
                )
                try:
                    result = method(**parameters)
                except TypeError:
                    APP_LOGGER.critical("Invalid function arguments")
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
754
                        receiverUUIDs=[body_json.get("sender", None)],
755
756
757
758
                        serviceType=body_json.get("serviceType", None),
                        results={"error": "invalid function arguments (parameters)"},
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
759
760
                    )
                else:
761
762
                    if isinstance(result, bool):
                        result = {"ok": result}
763
                    elif result is None:
764
                        return
765
766
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
767
                        receiverUUIDs=[body_json.get("sender", None)],
768
769
770
771
772
773
                        serviceType=body_json.get("serviceType", None),
                        results=result,
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
                    )

774
775
776
777
778
        res = self.__send_message_to_broker(
            receiver_endpoints=[body_json.get("replyToEndpoint", None)],
            msg=service_reply.msg
        )

779
780
781
782
783
784
785
786
787
788
789
790
791
        if self.__is_broker_rest:
            if res.status_code == 201:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Send a S³I-B ServiceReply back to the requester  "
                )
            else:
                print( BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + res.text)
Jiahang Chen's avatar
Jiahang Chen committed
792
793

    def on_get_value_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
794
        """Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout.
795
796
797
798
799
800
801

        :param msg: GetValueReply

        """

        # ???: Behavior should be defined by the user! Maybe he want
        # to process the result!
Jiahang Chen's avatar
Jiahang Chen committed
802
803
804
805
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
806
            + ": You have received a S³I-B GetValueReply"
Jiahang Chen's avatar
Jiahang Chen committed
807
808
            + json.dumps(msg, indent=2)
        )
Jiahang Chen's avatar
Jiahang Chen committed
809
810
811
812
813
814
815
816
        value = msg.get("value", None)
        if isinstance(value, dict):
            value = json.dumps(value, indent=2)

        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
817
818
819
            + ": The queried value is: {0}{1}{2}".format(
                BColors.OKGREEN, value, BColors.ENDC
            )
Jiahang Chen's avatar
Jiahang Chen committed
820
821
        )

Jiahang Chen's avatar
Jiahang Chen committed
822
    def on_service_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
823
        """Handles incoming S³I-B ServiceReply. Prints the content of msg to stdout.
824
825
826
827
828

        :param msg: ServiceReply

        """

Jiahang Chen's avatar
Jiahang Chen committed
829
830
831
832
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
833
            + ": You have received a S³I-B ServiceReply"
Jiahang Chen's avatar
Jiahang Chen committed
834
835
            + json.dumps(msg, indent=2)
        )
Jiahang Chen's avatar
Jiahang Chen committed
836
837
838
839
840
841
842
843
        results = msg.get("results", None)
        if isinstance(results, dict):
            results = json.dumps(results, indent=2)

        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
844
845
846
            + ": The result is: {0}{1}{2}".format(
                BColors.OKGREEN, results, BColors.ENDC
            )
Jiahang Chen's avatar
Jiahang Chen committed
847
        )
Jiahang Chen's avatar
Jiahang Chen committed
848
849

    def to_dir_json(self):
850
851
852
853
854
855
856
        """Returns a dictionary representing this thing's directory entry.

        :returns: Directory representation of this object
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
857
858
859
860
861
        self.dir_json = self.dir.queryThingIDBased(self.thing_id)
        if self.thing_id is not None:
            self.dir_json["thingId"] = self.thing_id
        if self.policy_id is not None:
            self.dir_json["policyId"] = self.policy_id
Jiahang Chen's avatar
Jiahang Chen committed
862
863
864
865
866
        if self.name is not None:
            self.dir_json["attributes"]["name"] = self.name
        if self.features.get("ml40::Location") is not None:
            self.dir_json["attributes"]["location"] = {
                "longitude": self.features.get("ml40::Location").to_json()["longitude"],
GromeTT's avatar
GromeTT committed
867
                "latitude": self.features.get("ml40::Location").to_json()["latitude"]
Jiahang Chen's avatar
Jiahang Chen committed
868
869
870
871
            }
        self.dir_json["attributes"]["dataModel"] = "fml40"
        self.dir_json["attributes"]["thingStructure"] = {
            "class": "ml40::Thing",
GromeTT's avatar
GromeTT committed
872
            "links": []
Jiahang Chen's avatar
Jiahang Chen committed
873
        }
Jiahang Chen's avatar
Jiahang Chen committed
874
        for key in self.roles.keys():
GromeTT's avatar
GromeTT committed
875
876
877
878
            role_entry = {
                "association": "roles",
                "target": self.roles[key].to_json()
            }
Jiahang Chen's avatar
Jiahang Chen committed
879
            self.dir_json["attributes"]["thingStructure"]["links"].append(role_entry)
Jiahang Chen's avatar
Jiahang Chen committed
880

Jiahang Chen's avatar
Jiahang Chen committed
881
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
882
883
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
884
            }
Jiahang Chen's avatar
Jiahang Chen committed
885
886
887
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]

888
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
889
            # if the feature has targets, like ml40::Composite
890
            if hasattr(self.features[key], "targets"):
Jiahang Chen's avatar
Jiahang Chen committed
891
892
                feature_entry["target"]["links"] = list()
                for target in self.features[key].targets.keys():
893
894
895
                    target_json = (
                        self.features[key].targets[target].to_subthing_dir_json()
                    )
Jiahang Chen's avatar
Jiahang Chen committed
896
897
                    feature_entry["target"]["links"].append(target_json)
            self.dir_json["attributes"]["thingStructure"]["links"].append(feature_entry)
Jiahang Chen's avatar
Jiahang Chen committed
898
899
        return self.dir_json

900
    def to_repo_json(self):
901
902
903
904
905
906
907
        """Returns a dictionary representing this thing's repository entry.

        :returns: Repository representation of this object
        :rtype: dict

        """

908
        self.repo_json = self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
909
910
        return self.repo_json

911
    def to_json(self):
912
913
914
915
916
917
918
        """Returns a dictionary representing this thing in it's current state.

        :returns: Representation of this object
        :rtype: dict

        """

919
920
921
922
923
924
        self.dt_json = {
            "thingId": self.thing_id,
            "policyId": self.policy_id,
            "attributes": {
                "class": "ml40::Thing",
                "name": self.name,
925
            },
926
927
928
929
930
        }
        if self.roles:
            self.dt_json["attributes"]["roles"] = list()
        if self.features:
            self.dt_json["attributes"]["features"] = list()
Jiahang Chen's avatar
Jiahang Chen committed
931
        if self.ditto_features:
Jiahang Chen's avatar
Jiahang Chen committed
932
            self.dt_json["features"] = dict()
933
934
        for key in self.roles.keys():
            self.dt_json["attributes"]["roles"].append(self.roles[key].to_json())
Jiahang Chen's avatar
Jiahang Chen committed
935
        for key in self.features.keys():
936
            self.dt_json["attributes"]["features"].append(self.features[key].to_json())
Jiahang Chen's avatar
Jiahang Chen committed
937
938
        for key in self.ditto_features.keys():
            self.dt_json["features"][key] = self.ditto_features[key].to_json()
939
940
        return self.dt_json

Jiahang Chen's avatar
Jiahang Chen committed
941
    def to_subthing_json(self):
942
943
944
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i repository entries.
945

946
947
        :returns: Representation of this object as a subordinate thing
        :rtype: dict
948
949
950

        """

Jiahang Chen's avatar
Jiahang Chen committed
951
952
953
954
        json_out = {
            "class": "ml40::Thing",
            "name": self.name,
            "roles": [],
955
            "features": [],
Jiahang Chen's avatar
Jiahang Chen committed
956
        }
Jiahang Chen's avatar
Jiahang Chen committed
957
958
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
959
960
961
962
963
        for key in self.roles.keys():
            json_out["roles"].append(self.roles[key].to_json())
        for key in self.features.keys():
            json_out["features"].append(self.features[key].to_json())
        return json_out
Jiahang Chen's avatar
Jiahang Chen committed
964
965

    def to_subthing_dir_json(self):
966
967
968
969
970
971
972
973
974
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i directory entries.

        :returns: Representation of this object as a subordinate thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
975
976
977
        json_out = {"class": "ml40::Thing", "links": []}
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
978
        for key in self.roles.keys():
979
            role_entry = {"association": "roles", "target": self.roles[key].to_json()}
Jiahang Chen's avatar
Jiahang Chen committed
980
981
            json_out["links"].append(role_entry)
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
982
983
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
984
            }
Jiahang Chen's avatar
Jiahang Chen committed
985
986
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]
987
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
988
989
            json_out["links"].append(feature_entry)
        return json_out