thing.py 30.3 KB
Newer Older
1
2
3
"""This module implements the thing class which is the core element of
this package."""

C. Albrecht's avatar
WIP  
C. Albrecht committed
4
5
6
7
import ast
import threading
import json
import uuid
8
9
10
11
import time
import copy
from s3i import IdentityProvider, TokenType, GetValueReply, Directory
from s3i import Repository
Jiahang Chen's avatar
Jiahang Chen committed
12
from s3i.broker import Broker, BrokerREST
Jiahang Chen's avatar
Jiahang Chen committed
13
from s3i.messages import ServiceReply
Jiahang Chen's avatar
Jiahang Chen committed
14
from ml.identifier import ID
C. Albrecht's avatar
WIP  
C. Albrecht committed
15
from ml.tools import BColors
Jiahang Chen's avatar
Jiahang Chen committed
16
from ml.tools import find_broker_endpoint
C. Albrecht's avatar
WIP  
C. Albrecht committed
17
from ml.app_logger import APP_LOGGER
Jiahang Chen's avatar
Jiahang Chen committed
18
19


20
21
22
class BaseVariable:
    """The BaseVariable class holds various urls to s3i services."""

C. Albrecht's avatar
WIP  
C. Albrecht committed
23
24
25
26
27
28
29
30
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


Jiahang Chen's avatar
Jiahang Chen committed
31
class Thing:
32
    """The thing class represents a customizable runtime environment for
Jiahang Chen's avatar
Jiahang Chen committed
33
    operating Digital Twins complying the Forest Modeling Language (fml40)."""
34

C. Albrecht's avatar
WIP  
C. Albrecht committed
35
    def __init__(
36
37
38
39
40
41
42
43
44
        self,
        model: dict,
        client_secret="",
        grant_type="password",
        is_broker=False,
        is_repo=False,
        is_broker_rest=True,
        username=None,
        password=None,
C. Albrecht's avatar
WIP  
C. Albrecht committed
45
    ):
Jiahang Chen's avatar
Jiahang Chen committed
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
        """
        Constructor

        :param model: edge-device or S³I Repository specified JSON entry, like config file for Digital Twins
        :type model: dict
        :param client_secret: OAuth 2.0 specified client secret, generated in the S³I IdentityProvider
        :type client_secret: str
        :param grant_type: OAuth 2.0 specified grant type to issue a JWT. Here the grant type can be password or client_credentials
        :type grant_type: str
        :param is_broker: whether broker interface is enabled in the ml40::thing instance
        :type is_broker: bool
        :param is_broker_rest: Whether the connection with the S³I Broker is established via HTTP REST
        :type is_broker_rest: bool
        :param is_repo: Whether the thing uses the S³I Repository to launch its Digital Twin in the cloud
        :type is_repo: bool
        :param username: OAuth 2.0 specified username, registered in the S³I IdentityProvider. If the grant_type is set as password, the username is required
        :type username: str
        :param password: OAuth 2.0 specified password, registered in the S³I IdentityProvider. If the grant_type is set as password, the password is required

        """
66

Jiahang Chen's avatar
Jiahang Chen committed
67
        self.__model = model
C. Albrecht's avatar
WIP  
C. Albrecht committed
68
69
70
71
72
73
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
Jiahang Chen's avatar
Jiahang Chen committed
74

C. Albrecht's avatar
WIP  
C. Albrecht committed
75
        self.__is_broker = is_broker
Jiahang Chen's avatar
Jiahang Chen committed
76
        self.__is_broker_rest = is_broker_rest
C. Albrecht's avatar
WIP  
C. Albrecht committed
77
        self.__is_repo = is_repo
Jiahang Chen's avatar
Jiahang Chen committed
78

C. Albrecht's avatar
WIP  
C. Albrecht committed
79
        self.__access_token = ""
Jiahang Chen's avatar
Jiahang Chen committed
80
81
        self.__endpoint = ""

C. Albrecht's avatar
WIP  
C. Albrecht committed
82
        self.__ws_connected = False
Jiahang Chen's avatar
Jiahang Chen committed
83
84
        self.broker = None
        self.ws = None
85
86
        # TODO: Change the variable name dir, because it is a builtin
        # name.
Jiahang Chen's avatar
Jiahang Chen committed
87
        self.dir = None
Jiahang Chen's avatar
Jiahang Chen committed
88
89
        self.repo = None

90
        # : Dict[<str>, <str>]
Jiahang Chen's avatar
Jiahang Chen committed
91
92
        self.repo_json = dict()
        self.dir_json = dict()
93
        self.dt_json = dict()
C. Albrecht's avatar
WIP  
C. Albrecht committed
94
95
96

        attributes = model.get("attributes", None)
        self.__name = ""
Jiahang Chen's avatar
Jiahang Chen committed
97
        self.__roles = {}
Jiahang Chen's avatar
Jiahang Chen committed
98
        self.__features = {}
99
        # ??? Is this property necessary? Only used as return value in _getValue()
Jiahang Chen's avatar
Jiahang Chen committed
100
        self.__resGetValue = list()
C. Albrecht's avatar
WIP  
C. Albrecht committed
101
102
        if attributes:
            self.__name = attributes.get("name", "")
Jiahang Chen's avatar
Jiahang Chen committed
103
104
105

    @property
    def model(self):
Jiahang Chen's avatar
Jiahang Chen committed
106
        """Returns the specification JSON from which this thing has been constructed from.
107
108
109
110
111
112

        :returns: Representation of a ml40 compliant thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
113
        return self.__model
Jiahang Chen's avatar
Jiahang Chen committed
114
115
116

    @property
    def features(self):
Jiahang Chen's avatar
Jiahang Chen committed
117
        """Returns thing's features.
118

119
120
        :returns: Features
        :rtype: dict
121
122
123

        """

Jiahang Chen's avatar
Jiahang Chen committed
124
125
126
127
        return self.__features

    @features.setter
    def features(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
128
        """Replaces thing's features with value.
129
130
131
132
133

        :param value: New collection of features

        """

Jiahang Chen's avatar
Jiahang Chen committed
134
135
        self.__features = value

Jiahang Chen's avatar
Jiahang Chen committed
136
137
    @property
    def roles(self):
138
        """Returns the thing's roles.
139
140
141

        :returns: ml40 roles
        :rtype: dict
142

143
144
        """

Jiahang Chen's avatar
Jiahang Chen committed
145
146
147
148
        return self.__roles

    @roles.setter
    def roles(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
149
        """Replaces thing's roles with value
150

151
        :param value: New collection of roles
152
153
        """

Jiahang Chen's avatar
Jiahang Chen committed
154
155
        self.__roles = value

C. Albrecht's avatar
WIP  
C. Albrecht committed
156
157
    @property
    def client_secret(self):
158
159
160
161
        """Returns the client secret.

        :returns: Client secret
        :rtype: str
162

163
164
        """

C. Albrecht's avatar
WIP  
C. Albrecht committed
165
166
167
168
        return self.__client_secret

    @property
    def grant_type(self):
Jiahang Chen's avatar
Jiahang Chen committed
169
        """Returns the method used to obtain JSON Web Tokens from the S³I IdentityProvider
170

Jiahang Chen's avatar
Jiahang Chen committed
171
        :returns: OAuth2 specified grant type [password, client_credentials]
172
        :rtype: str
173

174
175
        """

C. Albrecht's avatar
WIP  
C. Albrecht committed
176
177
178
179
        return self.__grant_type

    @property
    def access_token(self):
Jiahang Chen's avatar
Jiahang Chen committed
180
        """Returns the current JSON Web token.
181

Jiahang Chen's avatar
Jiahang Chen committed
182
        :returns: JSON Web token
183
        :rtype: str
184

185
        """
C. Albrecht's avatar
WIP  
C. Albrecht committed
186
187
188
189
        return self.__access_token

    @property
    def name(self):
190
191
192
193
        """Returns the name of this thing.

        :returns: name
        :rtype: str
194

195
196
        """

C. Albrecht's avatar
WIP  
C. Albrecht committed
197
198
199
200
        return self.__name

    @property
    def thing_id(self):
201
202
203
204
        """Returns the identifier of this thing.

        :returns: identifier
        :rtype: str
205

206
207
        """

C. Albrecht's avatar
WIP  
C. Albrecht committed
208
209
        return self.__thing_id

Jiahang Chen's avatar
Jiahang Chen committed
210
211
    @property
    def policy_id(self):
212
213
214
215
216
217
218
        """Returns the identifier of this thing's policy.

        :returns: identifier
        :rtype: str

        """

Jiahang Chen's avatar
Jiahang Chen committed
219
220
        return self.__policy_id

C. Albrecht's avatar
WIP  
C. Albrecht committed
221
    def run_forever(self):
222
        """Starts the thing in permanent mode.
223

224
225
226
        """

        # TODO: Use logger instead!
Jiahang Chen's avatar
Jiahang Chen committed
227
        print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC))
C. Albrecht's avatar
WIP  
C. Albrecht committed
228
229
        self.__connect_with_idp()

230
        threading.Thread(target=self.__json_syn).start()
Jiahang Chen's avatar
Jiahang Chen committed
231
232
233
234
235
236
        threading.Thread(target=self.__dir_syn).start()
        if self.__is_repo:
            threading.Thread(target=self.__repo_syn).start()

    @staticmethod
    def add_user_def(func):
Jiahang Chen's avatar
Jiahang Chen committed
237
        """Insert user-specified function in the thing object.
238

Jiahang Chen's avatar
Jiahang Chen committed
239
        :param func: external defined function to be executed.
240
241
242

        """

Jiahang Chen's avatar
Jiahang Chen committed
243
244
        threading.Thread(target=func).start()

245
    def __json_syn(self, freq=0.1):
Jiahang Chen's avatar
Jiahang Chen committed
246
247
248
249
250
251
        """
        Applies local changes to the original model in the thing object

        :param freq: Frequency of the update
        :type freq: float
        """
Jiahang Chen's avatar
Jiahang Chen committed
252
        while True:
253
            try:
254
255
256
257
258
259
                time.sleep(freq)
                self.to_json()
            except:
                continue

    def __dir_syn(self, freq=0.1):
260
        """Applies local changes to the directory entry in the cloud.
261
262

        :param freq: Frequency of the update.
Jiahang Chen's avatar
Jiahang Chen committed
263
        :type freq: float
264
        """
265
        while True:
266
267
268
269
270
271
272
273
274
275
276
277
            # try:
            time.sleep(freq)
            old_dir_json = self.dir_json
            self.to_dir_json()
            if self.dir_json == old_dir_json:
                continue
            else:
                self.dir.updateThingIDBased(
                    thingID=self.thing_id, payload=self.dir_json
                )
        # except:
        #    continue
Jiahang Chen's avatar
Jiahang Chen committed
278

279
    def __repo_syn(self, freq=0.1):
280
        """Applies local changes to the repository entry in the cloud.
281
282

        :param freq: Frequency of the update.
Jiahang Chen's avatar
Jiahang Chen committed
283
        :type freq: float
284
        """
Jiahang Chen's avatar
Jiahang Chen committed
285
        while self.__is_repo:
286
            try:
287
                time.sleep(freq)
288
289
                old_repo_json = self.repo_json
                self.to_repo_json()
290
                # TODO: Clean up this reqion
291
292
293
                if self.repo_json == old_repo_json:
                    continue
                else:
294
295
296
                    self.repo.updateThingIDBased(
                        thingID=self.thing_id, payload=self.repo_json
                    )
297
            except:
Jiahang Chen's avatar
Jiahang Chen committed
298
299
                continue

C. Albrecht's avatar
WIP  
C. Albrecht committed
300
    def __connect_with_idp(self):
Jiahang Chen's avatar
Jiahang Chen committed
301
302
        """Establishes a connection to the S³I IdentityProvider which guarantees,
        that the JSON web token needed to use s3i services.
303
        be renewed if it has expired.
304
305
306

        """

Jiahang Chen's avatar
Jiahang Chen committed
307
        #TODO: Use logger!
308
309
310
311
312
313
        print(
            BColors.OKBLUE
            + "[S³I][IdP]"
            + BColors.ENDC
            + ": Connect with S3I IdentityProvider"
        )
C. Albrecht's avatar
WIP  
C. Albrecht committed
314
315
316
317
318
319
320
321
322
323
324
325
326
327
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
Jiahang Chen's avatar
Jiahang Chen committed
328
329
        """Updates the JSON Web Token with token and reestablishes connections
        to the s3i services .
330

Jiahang Chen's avatar
Jiahang Chen committed
331
332
        :param token: New JSON Web token
        :type token: str
333
334
335

        """

C. Albrecht's avatar
WIP  
C. Albrecht committed
336
        self.__access_token = token
Jiahang Chen's avatar
Jiahang Chen committed
337
        self.__connect_with_dir()
Jiahang Chen's avatar
Jiahang Chen committed
338
        self.__connect_with_repo()
C. Albrecht's avatar
WIP  
C. Albrecht committed
339
340
        if self.__is_broker:
            self.__connect_with_broker()
Jiahang Chen's avatar
Jiahang Chen committed
341

Jiahang Chen's avatar
Jiahang Chen committed
342
    def __connect_with_dir(self):
343
        """Initializes the property dir with a Directory object which can be
Jiahang Chen's avatar
Jiahang Chen committed
344
        used to access the s3i Directory.
345
346
347
348
349
350
351

        :returns:
        :rtype:

        """

        # TODO: Use logger
Jiahang Chen's avatar
Jiahang Chen committed
352
        print(
Jiahang Chen's avatar
Jiahang Chen committed
353
            BColors.OKBLUE
Jiahang Chen's avatar
Jiahang Chen committed
354
            + "[S³I][Dir]"
Jiahang Chen's avatar
Jiahang Chen committed
355
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
356
            + ": Connect with S3I Directory"
Jiahang Chen's avatar
Jiahang Chen committed
357
        )
358
359
360
        self.dir = Directory(
            s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
361

C. Albrecht's avatar
WIP  
C. Albrecht committed
362
    def __connect_with_repo(self):
363
        """Initializes the property repo whit a Repository object which can be
Jiahang Chen's avatar
Jiahang Chen committed
364
        used to access the s3i Repository.
365
366
367
368
369
370
371

        :returns:
        :rtype:

        """

        # TODO: Use logger
Jiahang Chen's avatar
Jiahang Chen committed
372
        print(
Jiahang Chen's avatar
Jiahang Chen committed
373
            BColors.OKBLUE
Jiahang Chen's avatar
Jiahang Chen committed
374
            + "[S³I][Repo]"
Jiahang Chen's avatar
Jiahang Chen committed
375
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
376
            + ": Connect with S3I Repository"
Jiahang Chen's avatar
Jiahang Chen committed
377
        )
378
379
380
        self.repo = Repository(
            s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
381

C. Albrecht's avatar
WIP  
C. Albrecht committed
382
    def __connect_with_broker(self):
383
        """Initializes the property broker with a Broker object. Additionally
Jiahang Chen's avatar
Jiahang Chen committed
384
        a callback function is registered which handles incoming S³I-B Messages
385
386
387
388
389
        messages.

        """

        # TODO: Use logger
390
391
392
393
394
395
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": Connect with S3I Broker"
        )
Jiahang Chen's avatar
Jiahang Chen committed
396
397
398
399
400
401
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)

            def receive():
                while True:
402
403
404
405
406
407
                    try:
                        time.sleep(0.1)
                        msg_str = self.broker.receive_once(self.__endpoint)
                        if msg_str == "":
                            continue
                        else:
408
409
410
411
412
413
                            self.__on_broker_callback(
                                ch=None,
                                method=None,
                                properties=None,
                                body=json.loads(msg_str),
                            )
414
                    except:
Jiahang Chen's avatar
Jiahang Chen committed
415
                        continue
416

417
            threading.Thread(target=receive).start()
Jiahang Chen's avatar
Jiahang Chen committed
418
419
420
421
422
423
424
425
426
427
428
429
430

        else:
            self.broker = Broker(
                auth_form="Username/Password",
                username=" ",
                password=self.__access_token,
                host=BaseVariable.BROKER_HOST,
            )

            threading.Thread(
                target=self.broker.receive,
                args=(self.__endpoint, self.__on_broker_callback),
            ).start()
C. Albrecht's avatar
WIP  
C. Albrecht committed
431
432

    def __on_broker_callback(self, ch, method, properties, body):
433
434
435
436
437
438
439
440
        """Parses body (content of a S3I-B message) and delegates the
        processing of the message to a separate method. The method is
        selected according to the message's type.

        :param body: S3I-B message

        """

Jiahang Chen's avatar
Jiahang Chen committed
441
        if isinstance(body, bytes):
Jiahang Chen's avatar
Jiahang Chen committed
442
443
444
445
446
            body_str = body.decode('utf8').replace("'", '"')
            try:
                body = json.loads(body_str)
            except ValueError:
                pass
447

Jiahang Chen's avatar
Jiahang Chen committed
448
449
        elif isinstance(body, int):
            return
Jiahang Chen's avatar
Jiahang Chen committed
450
        elif isinstance(body, str):
Jiahang Chen's avatar
Jiahang Chen committed
451
452
453
454
455
456
457
458
459
460
461
462
463
            return

        message_type = body.get("messageType")
        if message_type == "userMessage":
            self.on_user_message(body)
        elif message_type == "serviceRequest":
            self.on_service_request(body)
        elif message_type == "getValueRequest":
            self.on_get_value_request(body)
        elif message_type == "getValueReply":
            self.on_get_value_reply(body)
        elif message_type == "serviceReply":
            self.on_service_reply(body)
Jiahang Chen's avatar
Jiahang Chen committed
464
        else:
Jiahang Chen's avatar
Jiahang Chen committed
465
466
            ### TODO send user message reply back
            pass
Jiahang Chen's avatar
Jiahang Chen committed
467
468

    def on_user_message(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
469
        """Handles incoming S³I-B UserMessages.
470

Jiahang Chen's avatar
Jiahang Chen committed
471
        :param msg: S³I-B UserMessages
472
473
474

        """

Jiahang Chen's avatar
Jiahang Chen committed
475
476
477
478
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
479
            + ": You have received a S³I-B UserMessage"
Jiahang Chen's avatar
Jiahang Chen committed
480
481
            + json.dumps(msg, indent=2)
        )
C. Albrecht's avatar
WIP  
C. Albrecht committed
482
483

    def on_get_value_request(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
484
        """Handles incoming GetValueRequest message. Looks up the value specified in msg and
485
486
487
488
489
490
        sends a GetValueReply message back to the sender.

        :param msg: GetValueRequest

        """

Jiahang Chen's avatar
Jiahang Chen committed
491
492
493
494
495
496
497
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B GetValueRequest"
            + json.dumps(msg, indent=2)
        )
Jiahang Chen's avatar
Jiahang Chen committed
498
499
500
501
502
503
504
        get_value_reply = GetValueReply()
        request_sender = msg.get("sender")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = msg.get("replyToEndpoint")
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())
        try:
Jiahang Chen's avatar
Jiahang Chen committed
505
506
507
508
509
510
511
            print(
                BColors.OKBLUE
                + "[S³I]"
                + BColors.ENDC
                + ": Search the attribute with path: "
                + attribute_path
            )
Jiahang Chen's avatar
Jiahang Chen committed
512
513
514
            value = self._uriToData(attribute_path)
        except KeyError:
            value = "invalid attribute path"
515
            APP_LOGGER.critical(value)
Jiahang Chen's avatar
Jiahang Chen committed
516
517
518
519

        get_value_reply.fillGetValueReply(
            senderUUID=self.thing_id,
            receiverUUID=[request_sender],
520
            results=value,
Jiahang Chen's avatar
Jiahang Chen committed
521
522
523
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )
Jiahang Chen's avatar
Jiahang Chen committed
524
525

        res = self.broker.send(
Jiahang Chen's avatar
Jiahang Chen committed
526
527
528
            receiver_endpoints=[request_sender_endpoint],
            msg=json.dumps(get_value_reply.msg),
        )
529
530
531
532
533
534
535
536
537
538
539
540
541
        if self.__is_broker_rest:
            if res.status_code == 201:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Send a S³I-B GetValueReply back to the requester  "
                )
            else:
                print(BColors.OKBLUE
                      + "[S³I][Broker]"
                      + BColors.ENDC
                      + res.text)
Jiahang Chen's avatar
Jiahang Chen committed
542
543

    def _uriToData(self, uri):
544
545
546
547
548
549
550
        """Returns a copy of the value found at uri.

        :param uri: Path to value
        :rtype: Feature

        """

Jiahang Chen's avatar
Jiahang Chen committed
551
        if uri == "":
552
            return self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
553
554
555
556
        else:
            uri_list = uri.split("/")
            if uri_list[0] == "features":
                try:
557
                    return self.dt_json[uri]
Jiahang Chen's avatar
Jiahang Chen committed
558
559
560
561
                except KeyError:
                    return "invalid attribute path"

            try:
562
                self._getValue(self.dt_json, uri_list)
Jiahang Chen's avatar
Jiahang Chen committed
563
564
565
566
567
568
569
570
571
572
573
574
            except:
                return "invalid attribute path"
            if self.__resGetValue.__len__() == 0:
                return "invalid attribute path"
            response = copy.deepcopy(self.__resGetValue)
            self.__resGetValue.clear()
            if response.__len__() == 1:
                return response[0]
            else:
                return response

    def _getValue(self, source, uri_list):
575
576
577
578
579
580
581
582
583
584
585
        """Searches for the value specified by uri_list in source and stores
        the result in __resGetValue.

        :param source: Object that is scanned
        :param uri_list: List containing path

        """

        # ??? What if the uri points to a Value object?
        # Shouldn't it be serialized?!

Jiahang Chen's avatar
Jiahang Chen committed
586
587
588
589
590
591
592
        value = source[uri_list[0]]
        if uri_list.__len__() == 1:
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
593
594
595
                        value = self.dt_json["features"][stringValue_split[1]][
                            "properties"
                        ][uri_list[0]]
Jiahang Chen's avatar
Jiahang Chen committed
596
597
598
599
600
                except:
                    pass
            self.__resGetValue.append(value)
            return
        if isinstance(value, dict):
601
            # ??? uri_list.pop(0) better?!
Jiahang Chen's avatar
Jiahang Chen committed
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
            del uri_list[0]
            self._getValue(value, uri_list)
        if isinstance(value, list):
            if isinstance(value[0], (str, int, float, bool, list)):
                return value
            if isinstance(value[0], dict):
                for item in value:
                    if item["class"] == "ml40::Thing":
                        for i in item["roles"]:
                            if self._findValue(i, uri_list[1]):
                                uri_list_1 = copy.deepcopy(uri_list)
                                del uri_list_1[:2]
                                self._getValue(item, uri_list_1)
                    else:
                        if self._findValue(item, uri_list[1]):
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            if not uri_list_1:
                                self.__resGetValue.append(item)
                                return
                            else:
                                self._getValue(item, uri_list_1)
        if isinstance(value, (str, int, float, bool)):
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
630
                        value = self.dt_json["features"][stringValue_split[1]][
Jiahang Chen's avatar
Jiahang Chen committed
631
632
633
634
635
636
637
                            "properties"
                        ][uri_list[0]]
                except:
                    pass
            self.__resGetValue.append(value)

    def _findValue(self, json, value):
638
639
640
641
642
643
644
645
646
647
        """Returns true if value has been found in json, otherwise returns false.

        :param json: dictionary
        :param value:
        :returns:
        :rtype:

        """

        # TODO: Simplify: value in json.values()
Jiahang Chen's avatar
Jiahang Chen committed
648
649
650
651
652
        for item in json:
            if json[item] == value:
                # print("Parameter: ", json[item])
                return True
        return False
C. Albrecht's avatar
WIP  
C. Albrecht committed
653
654

    def on_service_request(self, body_json):
Jiahang Chen's avatar
Jiahang Chen committed
655
656
        """Handles S³I-B ServiceRequests. Executes the method of the
        functionality specified in serviceType and send a ServiceReply
657
658
659
660
661
662
        back to the sender.

        :param body_json: ServiceRequest

        """

Jiahang Chen's avatar
Jiahang Chen committed
663
664
665
666
667
668
669
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B ServiceRequest "
            + json.dumps(body_json, indent=2)
        )
C. Albrecht's avatar
WIP  
C. Albrecht committed
670
        service_type = body_json.get("serviceType")
671
        parameters = body_json.get("parameters")
672
        service_reply = ServiceReply()
Jiahang Chen's avatar
Jiahang Chen committed
673
        service_functionality = service_type.split('/')[0]
Jiahang Chen's avatar
Jiahang Chen committed
674
        service_functionality_obj = self.features.get(service_functionality)
Jiahang Chen's avatar
Jiahang Chen committed
675
        if service_functionality_obj is None:
C. Albrecht's avatar
WIP  
C. Albrecht committed
676
            APP_LOGGER.critical(
677
678
                "Functionality %s is not one of the built-in functionalities in %s!"
                % (service_functionality, self.name)
C. Albrecht's avatar
WIP  
C. Albrecht committed
679
            )
GromeTT's avatar
GromeTT committed
680
681
682
683
684
685
686
687
            service_reply.fillServiceReply(
                senderUUID=self.thing_id,
                receiverUUID=body_json.get("sender", None),
                serviceType=body_json.get("serviceType", None),
                results={"error": "invalid functionalities (serviceType) {}".format(service_functionality)},
                replyingToUUID=body_json.get("identifier", None),
                msgUUID="s3i:{}".format(uuid.uuid4())
            )
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
        else:
            # TODO: Call right functionality.
            try:
                method = getattr(service_functionality_obj, service_type.split('/')[1])
            except AttributeError:
                APP_LOGGER.critical(
                    "Functionality %s is not one of the built-in functionalities in %s!" % (
                        service_type.split('/')[1], self.name)
                )
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
                    receiverUUID=body_json.get("sender", None),
                    serviceType=body_json.get("serviceType", None),
                    results={"error": "invalid functionalities (serviceType) {}".format(service_type.split('/')[1])},
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
            else:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Execute the function {0} of the class {1}.".format(service_type.split('/')[1],
                                                                            service_type.split('/')[0])
                )
                try:
                    result = method(**parameters)
                except TypeError:
                    APP_LOGGER.critical("Invalid function arguments")
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
                        receiverUUID=body_json.get("sender", None),
                        serviceType=body_json.get("serviceType", None),
                        results={"error": "invalid function arguments (parameters)"},
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
724
725
                    )
                else:
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
                    if isinstance(result, bool):
                        result = {"ok": result}
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
                        receiverUUID=body_json.get("sender", None),
                        serviceType=body_json.get("serviceType", None),
                        results=result,
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
                    )

        res = self.broker.send(receiver_endpoints=[body_json.get("replyToEndpoint", None)],
                               msg=json.dumps(service_reply.msg))
        if self.__is_broker_rest:
            if res.status_code == 201:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Send a S³I-B ServiceReply back to the requester  "
                )
            else:
                print( BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + res.text)
Jiahang Chen's avatar
Jiahang Chen committed
752
753

    def on_get_value_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
754
        """Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout.
755
756
757
758
759
760
761

        :param msg: GetValueReply

        """

        # ???: Behavior should be defined by the user! Maybe he want
        # to process the result!
Jiahang Chen's avatar
Jiahang Chen committed
762
763
764
765
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
766
            + ": You have received a S³I-B GetValueReply"
Jiahang Chen's avatar
Jiahang Chen committed
767
768
            + json.dumps(msg, indent=2)
        )
Jiahang Chen's avatar
Jiahang Chen committed
769
770
771
772
773
774
775
776
        value = msg.get("value", None)
        if isinstance(value, dict):
            value = json.dumps(value, indent=2)

        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
777
778
779
            + ": The queried value is: {0}{1}{2}".format(
                BColors.OKGREEN, value, BColors.ENDC
            )
Jiahang Chen's avatar
Jiahang Chen committed
780
781
        )

Jiahang Chen's avatar
Jiahang Chen committed
782
    def on_service_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
783
        """Handles incoming S³I-B ServiceReply. Prints the content of msg to stdout.
784
785
786
787
788

        :param msg: ServiceReply

        """

Jiahang Chen's avatar
Jiahang Chen committed
789
790
791
792
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
Jiahang Chen's avatar
Jiahang Chen committed
793
            + ": You have received a S³I-B ServiceReply"
Jiahang Chen's avatar
Jiahang Chen committed
794
795
            + json.dumps(msg, indent=2)
        )
Jiahang Chen's avatar
Jiahang Chen committed
796
797
798
799
800
801
802
803
        results = msg.get("results", None)
        if isinstance(results, dict):
            results = json.dumps(results, indent=2)

        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
804
805
806
            + ": The result is: {0}{1}{2}".format(
                BColors.OKGREEN, results, BColors.ENDC
            )
Jiahang Chen's avatar
Jiahang Chen committed
807
        )
Jiahang Chen's avatar
Jiahang Chen committed
808
809

    def to_dir_json(self):
810
811
812
813
814
815
816
        """Returns a dictionary representing this thing's directory entry.

        :returns: Directory representation of this object
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
817
818
819
820
821
        self.dir_json = self.dir.queryThingIDBased(self.thing_id)
        if self.thing_id is not None:
            self.dir_json["thingId"] = self.thing_id
        if self.policy_id is not None:
            self.dir_json["policyId"] = self.policy_id
Jiahang Chen's avatar
Jiahang Chen committed
822
823
824
825
826
        if self.name is not None:
            self.dir_json["attributes"]["name"] = self.name
        if self.features.get("ml40::Location") is not None:
            self.dir_json["attributes"]["location"] = {
                "longitude": self.features.get("ml40::Location").to_json()["longitude"],
GromeTT's avatar
GromeTT committed
827
                "latitude": self.features.get("ml40::Location").to_json()["latitude"]
Jiahang Chen's avatar
Jiahang Chen committed
828
829
830
831
            }
        self.dir_json["attributes"]["dataModel"] = "fml40"
        self.dir_json["attributes"]["thingStructure"] = {
            "class": "ml40::Thing",
GromeTT's avatar
GromeTT committed
832
            "links": []
Jiahang Chen's avatar
Jiahang Chen committed
833
        }
Jiahang Chen's avatar
Jiahang Chen committed
834
        for key in self.roles.keys():
GromeTT's avatar
GromeTT committed
835
836
837
838
            role_entry = {
                "association": "roles",
                "target": self.roles[key].to_json()
            }
Jiahang Chen's avatar
Jiahang Chen committed
839
            self.dir_json["attributes"]["thingStructure"]["links"].append(role_entry)
Jiahang Chen's avatar
Jiahang Chen committed
840

Jiahang Chen's avatar
Jiahang Chen committed
841
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
842
843
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
844
            }
Jiahang Chen's avatar
Jiahang Chen committed
845
846
847
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]

848
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
849
            # if the feature has targets, like ml40::Composite
850
            if hasattr(self.features[key], "targets"):
Jiahang Chen's avatar
Jiahang Chen committed
851
852
                feature_entry["target"]["links"] = list()
                for target in self.features[key].targets.keys():
853
854
855
                    target_json = (
                        self.features[key].targets[target].to_subthing_dir_json()
                    )
Jiahang Chen's avatar
Jiahang Chen committed
856
857
                    feature_entry["target"]["links"].append(target_json)
            self.dir_json["attributes"]["thingStructure"]["links"].append(feature_entry)
Jiahang Chen's avatar
Jiahang Chen committed
858
859
        return self.dir_json

860
    def to_repo_json(self):
861
862
863
864
865
866
867
        """Returns a dictionary representing this thing's repository entry.

        :returns: Repository representation of this object
        :rtype: dict

        """

868
        self.repo_json = self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
869
870
        return self.repo_json

871
    def to_json(self):
872
873
874
875
876
877
878
        """Returns a dictionary representing this thing in it's current state.

        :returns: Representation of this object
        :rtype: dict

        """

879
880
881
882
883
884
        self.dt_json = {
            "thingId": self.thing_id,
            "policyId": self.policy_id,
            "attributes": {
                "class": "ml40::Thing",
                "name": self.name,
885
            },
886
887
888
889
890
891
892
        }
        if self.roles:
            self.dt_json["attributes"]["roles"] = list()
        if self.features:
            self.dt_json["attributes"]["features"] = list()
        for key in self.roles.keys():
            self.dt_json["attributes"]["roles"].append(self.roles[key].to_json())
Jiahang Chen's avatar
Jiahang Chen committed
893
        for key in self.features.keys():
894
895
896
            self.dt_json["attributes"]["features"].append(self.features[key].to_json())
        return self.dt_json

Jiahang Chen's avatar
Jiahang Chen committed
897
    def to_subthing_json(self):
898
899
900
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i repository entries.
901

902
903
        :returns: Representation of this object as a subordinate thing
        :rtype: dict
904
905
906

        """

Jiahang Chen's avatar
Jiahang Chen committed
907
908
909
910
        json_out = {
            "class": "ml40::Thing",
            "name": self.name,
            "roles": [],
911
            "features": [],
Jiahang Chen's avatar
Jiahang Chen committed
912
        }
Jiahang Chen's avatar
Jiahang Chen committed
913
914
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
915
916
917
918
919
        for key in self.roles.keys():
            json_out["roles"].append(self.roles[key].to_json())
        for key in self.features.keys():
            json_out["features"].append(self.features[key].to_json())
        return json_out
Jiahang Chen's avatar
Jiahang Chen committed
920
921

    def to_subthing_dir_json(self):
922
923
924
925
926
927
928
929
930
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i directory entries.

        :returns: Representation of this object as a subordinate thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
931
932
933
        json_out = {"class": "ml40::Thing", "links": []}
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
934
        for key in self.roles.keys():
935
            role_entry = {"association": "roles", "target": self.roles[key].to_json()}
Jiahang Chen's avatar
Jiahang Chen committed
936
937
            json_out["links"].append(role_entry)
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
938
939
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
940
            }
Jiahang Chen's avatar
Jiahang Chen committed
941
942
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]
943
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
944
945
            json_out["links"].append(feature_entry)
        return json_out