thing.py 31.1 KB
Newer Older
1
2
3
"""This module implements the thing class which is the core element of
this package."""

C. Albrecht's avatar
WIP    
C. Albrecht committed
4
5
6
import threading
import json
import uuid
7
8
9
10
import time
import copy
from s3i import IdentityProvider, TokenType, GetValueReply, Directory
from s3i import Repository
11
import s3i.exception
Jiahang Chen's avatar
Jiahang Chen committed
12
from s3i.broker import Broker, BrokerREST
Jiahang Chen's avatar
Jiahang Chen committed
13
14
from s3i.messages import ServiceReply
from ml.tools import find_broker_endpoint
C. Albrecht's avatar
WIP    
C. Albrecht committed
15
from ml.app_logger import APP_LOGGER
Jiahang Chen's avatar
Jiahang Chen committed
16
17


18
19
20
class BaseVariable:
    """The BaseVariable class holds various urls to s3i services."""

C. Albrecht's avatar
WIP    
C. Albrecht committed
21
22
23
24
25
26
27
28
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


Jiahang Chen's avatar
Jiahang Chen committed
29
class Thing:
30
    """The thing class represents a customizable runtime environment for
Jiahang Chen's avatar
Jiahang Chen committed
31
    operating Digital Twins complying the Forest Modeling Language (fml40)."""
32

C. Albrecht's avatar
WIP    
C. Albrecht committed
33
    def __init__(
Jiahang Chen's avatar
Jiahang Chen committed
34
35
36
37
38
39
40
41
42
            self,
            model: dict,
            client_secret="",
            grant_type="password",
            is_broker=False,
            is_repo=False,
            is_broker_rest=True,
            username=None,
            password=None,
C. Albrecht's avatar
WIP    
C. Albrecht committed
43
    ):
Jiahang Chen's avatar
Jiahang Chen committed
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
        """
        Constructor

        :param model: edge-device or S³I Repository specified JSON entry, like config file for Digital Twins
        :type model: dict
        :param client_secret: OAuth 2.0 specified client secret, generated in the S³I IdentityProvider
        :type client_secret: str
        :param grant_type: OAuth 2.0 specified grant type to issue a JWT. Here the grant type can be password or client_credentials
        :type grant_type: str
        :param is_broker: whether broker interface is enabled in the ml40::thing instance
        :type is_broker: bool
        :param is_broker_rest: Whether the connection with the S³I Broker is established via HTTP REST
        :type is_broker_rest: bool
        :param is_repo: Whether the thing uses the S³I Repository to launch its Digital Twin in the cloud
        :type is_repo: bool
        :param username: OAuth 2.0 specified username, registered in the S³I IdentityProvider. If the grant_type is set as password, the username is required
        :type username: str
        :param password: OAuth 2.0 specified password, registered in the S³I IdentityProvider. If the grant_type is set as password, the password is required

        """
64

Jiahang Chen's avatar
Jiahang Chen committed
65
        self.__model = model
C. Albrecht's avatar
WIP    
C. Albrecht committed
66
67
68
69
70
71
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
Jiahang Chen's avatar
Jiahang Chen committed
72

C. Albrecht's avatar
WIP    
C. Albrecht committed
73
        self.__is_broker = is_broker
Jiahang Chen's avatar
Jiahang Chen committed
74
        self.__is_broker_rest = is_broker_rest
C. Albrecht's avatar
WIP    
C. Albrecht committed
75
        self.__is_repo = is_repo
Jiahang Chen's avatar
Jiahang Chen committed
76

C. Albrecht's avatar
WIP    
C. Albrecht committed
77
        self.__access_token = ""
Jiahang Chen's avatar
Jiahang Chen committed
78
79
        self.__endpoint = ""

C. Albrecht's avatar
WIP    
C. Albrecht committed
80
        self.__ws_connected = False
Jiahang Chen's avatar
Jiahang Chen committed
81
82
        self.broker = None
        self.ws = None
83
84
        # TODO: Change the variable name dir, because it is a builtin
        # name.
Jiahang Chen's avatar
Jiahang Chen committed
85
        self.dir = None
Jiahang Chen's avatar
Jiahang Chen committed
86
87
        self.repo = None

88
        # : Dict[<str>, <str>]
Jiahang Chen's avatar
Jiahang Chen committed
89
90
        self.repo_json = dict()
        self.dir_json = dict()
91
        self.dt_json = dict()
C. Albrecht's avatar
WIP    
C. Albrecht committed
92
93
94

        attributes = model.get("attributes", None)
        self.__name = ""
Jiahang Chen's avatar
Jiahang Chen committed
95
        self.__roles = {}
Jiahang Chen's avatar
Jiahang Chen committed
96
        self.__features = {}
Jiahang Chen's avatar
Jiahang Chen committed
97
        self.__ditto_features = {}
98
        # ??? Is this property necessary? Only used as return value in _getValue()
Jiahang Chen's avatar
Jiahang Chen committed
99
        self.__resGetValue = list()
C. Albrecht's avatar
WIP    
C. Albrecht committed
100
101
        if attributes:
            self.__name = attributes.get("name", "")
Jiahang Chen's avatar
Jiahang Chen committed
102
103
104

    @property
    def model(self):
Jiahang Chen's avatar
Jiahang Chen committed
105
        """Returns the specification JSON from which this thing has been constructed from.
106
107
108
109
110
111

        :returns: Representation of a ml40 compliant thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
112
        return self.__model
Jiahang Chen's avatar
Jiahang Chen committed
113

Jiahang Chen's avatar
Jiahang Chen committed
114
115
116
117
118
119
120
121
    @property
    def ditto_features(self):
        return self.__ditto_features

    @ditto_features.setter
    def ditto_features(self, value):
        self.__ditto_features = value

Jiahang Chen's avatar
Jiahang Chen committed
122
123
    @property
    def features(self):
Jiahang Chen's avatar
Jiahang Chen committed
124
        """Returns thing's features.
125

126
127
        :returns: Features
        :rtype: dict
128
129
130

        """

Jiahang Chen's avatar
Jiahang Chen committed
131
132
133
134
        return self.__features

    @features.setter
    def features(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
135
        """Replaces thing's features with value.
136
137
138
139
140

        :param value: New collection of features

        """

Jiahang Chen's avatar
Jiahang Chen committed
141
142
        self.__features = value

Jiahang Chen's avatar
Jiahang Chen committed
143
144
    @property
    def roles(self):
145
        """Returns the thing's roles.
146
147
148

        :returns: ml40 roles
        :rtype: dict
149

150
151
        """

Jiahang Chen's avatar
Jiahang Chen committed
152
153
154
155
        return self.__roles

    @roles.setter
    def roles(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
156
        """Replaces thing's roles with value
157

158
        :param value: New collection of roles
159
160
        """

Jiahang Chen's avatar
Jiahang Chen committed
161
162
        self.__roles = value

C. Albrecht's avatar
WIP    
C. Albrecht committed
163
164
    @property
    def client_secret(self):
165
166
167
168
        """Returns the client secret.

        :returns: Client secret
        :rtype: str
169

170
171
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
172
173
174
175
        return self.__client_secret

    @property
    def grant_type(self):
Jiahang Chen's avatar
Jiahang Chen committed
176
        """Returns the method used to obtain JSON Web Tokens from the S³I IdentityProvider
177

Jiahang Chen's avatar
Jiahang Chen committed
178
        :returns: OAuth2 specified grant type [password, client_credentials]
179
        :rtype: str
180

181
182
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
183
184
185
186
        return self.__grant_type

    @property
    def access_token(self):
Jiahang Chen's avatar
Jiahang Chen committed
187
        """Returns the current JSON Web token.
188

Jiahang Chen's avatar
Jiahang Chen committed
189
        :returns: JSON Web token
190
        :rtype: str
191

192
        """
C. Albrecht's avatar
WIP    
C. Albrecht committed
193
194
195
196
        return self.__access_token

    @property
    def name(self):
197
198
199
200
        """Returns the name of this thing.

        :returns: name
        :rtype: str
201

202
203
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
204
205
206
207
        return self.__name

    @property
    def thing_id(self):
208
209
210
211
        """Returns the identifier of this thing.

        :returns: identifier
        :rtype: str
212

213
214
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
215
216
        return self.__thing_id

Jiahang Chen's avatar
Jiahang Chen committed
217
218
    @property
    def policy_id(self):
219
220
221
222
223
224
225
        """Returns the identifier of this thing's policy.

        :returns: identifier
        :rtype: str

        """

Jiahang Chen's avatar
Jiahang Chen committed
226
227
        return self.__policy_id

C. Albrecht's avatar
WIP    
C. Albrecht committed
228
    def run_forever(self):
229
        """Starts the thing in permanent mode.
230

231
        """
Jiahang Chen's avatar
Jiahang Chen committed
232
233
        __log = "[S3I]: Launch {}".format(self.name)
        APP_LOGGER.info(__log)
C. Albrecht's avatar
WIP    
C. Albrecht committed
234
235
        self.__connect_with_idp()

236
        threading.Thread(target=self.__json_syn).start()
Jiahang Chen's avatar
Jiahang Chen committed
237
238
239
240
241
242
        threading.Thread(target=self.__dir_syn).start()
        if self.__is_repo:
            threading.Thread(target=self.__repo_syn).start()

    @staticmethod
    def add_user_def(func):
Jiahang Chen's avatar
Jiahang Chen committed
243
        """Insert user-specified function in the thing object.
244

Jiahang Chen's avatar
Jiahang Chen committed
245
        :param func: external defined function to be executed.
246
247
248

        """

Jiahang Chen's avatar
Jiahang Chen committed
249
250
        threading.Thread(target=func).start()

251
    def __json_syn(self, freq=0.1):
Jiahang Chen's avatar
Jiahang Chen committed
252
253
254
255
256
257
        """
        Applies local changes to the original model in the thing object

        :param freq: Frequency of the update
        :type freq: float
        """
Jiahang Chen's avatar
Jiahang Chen committed
258
        while True:
259
            try:
260
261
262
263
264
                time.sleep(freq)
                self.to_json()
            except:
                continue

265
266
    def __dir_syn(self):
        """Applies local changes to the directory entry in the cloud only once.
267
268

        """
269
270
271
272
273
        self.to_dir_json()
        if self.dir_json is not None:
            self.dir.updateThingIDBased(
                thingID=self.thing_id, payload=self.dir_json
            )
Jiahang Chen's avatar
Jiahang Chen committed
274

275
    def __repo_syn(self, freq=0.1):
276
        """Applies local changes to the repository entry in the cloud.
277
278

        :param freq: Frequency of the update.
Jiahang Chen's avatar
Jiahang Chen committed
279
        :type freq: float
280
        """
Jiahang Chen's avatar
Jiahang Chen committed
281
        while self.__is_repo:
282
            try:
283
                time.sleep(freq)
284
285
                old_repo_json = self.repo_json
                self.to_repo_json()
286
                # TODO: Clean up this reqion
287
288
289
                if self.repo_json == old_repo_json:
                    continue
                else:
290
291
292
                    self.repo.updateThingIDBased(
                        thingID=self.thing_id, payload=self.repo_json
                    )
293
            except:
Jiahang Chen's avatar
Jiahang Chen committed
294
295
                continue

C. Albrecht's avatar
WIP    
C. Albrecht committed
296
    def __connect_with_idp(self):
Jiahang Chen's avatar
Jiahang Chen committed
297
298
        """Establishes a connection to the S³I IdentityProvider which guarantees,
        that the JSON web token needed to use s3i services.
299
        be renewed if it has expired.
300
301

        """
Jiahang Chen's avatar
Jiahang Chen committed
302
303
        __log = "[S3I][IdP]: Connect with S3I IdentityProvider"
        APP_LOGGER.info(__log)
C. Albrecht's avatar
WIP    
C. Albrecht committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
Jiahang Chen's avatar
Jiahang Chen committed
318
319
        """Updates the JSON Web Token with token and reestablishes connections
        to the s3i services .
320

Jiahang Chen's avatar
Jiahang Chen committed
321
322
        :param token: New JSON Web token
        :type token: str
323
324
325

        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
326
        self.__access_token = token
Jiahang Chen's avatar
Jiahang Chen committed
327
        self.__connect_with_dir()
Jiahang Chen's avatar
Jiahang Chen committed
328
        self.__connect_with_repo()
C. Albrecht's avatar
WIP    
C. Albrecht committed
329
330
        if self.__is_broker:
            self.__connect_with_broker()
Jiahang Chen's avatar
Jiahang Chen committed
331

Jiahang Chen's avatar
Jiahang Chen committed
332
    def __connect_with_dir(self):
333
        """Initializes the property dir with a Directory object which can be
Jiahang Chen's avatar
Jiahang Chen committed
334
        used to access the s3i Directory.
335
336
337
338
339
340

        :returns:
        :rtype:

        """

Jiahang Chen's avatar
Jiahang Chen committed
341
342
        __log = "[S3I][Dir]: Connect with S3I Directory"
        APP_LOGGER.info(__log)
343
344
345
        self.dir = Directory(
            s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
346

C. Albrecht's avatar
WIP    
C. Albrecht committed
347
    def __connect_with_repo(self):
348
        """Initializes the property repo whit a Repository object which can be
Jiahang Chen's avatar
Jiahang Chen committed
349
        used to access the s3i Repository.
350
351
352
353
354
355

        :returns:
        :rtype:

        """

Jiahang Chen's avatar
Jiahang Chen committed
356
357
        __log = "[S3I][Repo]: Connect with S3I Repository"
        APP_LOGGER.info(__log)
358
359
360
        self.repo = Repository(
            s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
361

C. Albrecht's avatar
WIP    
C. Albrecht committed
362
    def __connect_with_broker(self):
363
        """Initializes the property broker with a Broker object. Additionally
Jiahang Chen's avatar
Jiahang Chen committed
364
        a callback function is registered which handles incoming S³I-B Messages
365
366
367
368
        messages.

        """

Jiahang Chen's avatar
Jiahang Chen committed
369
370
        __log = "[S3I][Broker]: Connect with S3I Broker"
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
371
372
        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)
Jiahang Chen's avatar
Jiahang Chen committed
373

Jiahang Chen's avatar
Jiahang Chen committed
374
            def receive():
Jiahang Chen's avatar
Jiahang Chen committed
375
                self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
Jiahang Chen's avatar
Jiahang Chen committed
376
                while True:
377
378
                    try:
                        time.sleep(0.1)
Jiahang Chen's avatar
Jiahang Chen committed
379
380
381
                        msg = self.broker.receive_once(self.__endpoint)
                        msg_to_json = lambda msg: msg if isinstance(msg, dict) else json.loads(msg)
                        if msg == {}:
382
383
                            continue
                        else:
384
385
386
387
                            self.__on_broker_callback(
                                ch=None,
                                method=None,
                                properties=None,
Jiahang Chen's avatar
Jiahang Chen committed
388
                                body=msg_to_json(msg),
389
                            )
390
                    except:
Jiahang Chen's avatar
Jiahang Chen committed
391
                        continue
392

393
            threading.Thread(target=receive).start()
Jiahang Chen's avatar
Jiahang Chen committed
394
395

        else:
396
397
            if self.broker is None:
                # first time to build a broker instance
398
                self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
399
400
401
402
403
404
405
406
407
408
409
                self.broker = Broker(
                    auth_form="Username/Password",
                    username=" ",
                    password=self.__access_token,
                    host=BaseVariable.BROKER_HOST,
                )

                threading.Thread(
                    target=self.broker.receive,
                    args=(self.__endpoint, self.__on_broker_callback),
                ).start()
410
411
            else:
                self.broker.update_token(self.access_token)
C. Albrecht's avatar
WIP    
C. Albrecht committed
412
413

    def __on_broker_callback(self, ch, method, properties, body):
414
415
416
417
418
419
420
421
        """Parses body (content of a S3I-B message) and delegates the
        processing of the message to a separate method. The method is
        selected according to the message's type.

        :param body: S3I-B message

        """

Jiahang Chen's avatar
Jiahang Chen committed
422
        if isinstance(body, bytes):
Jiahang Chen's avatar
Jiahang Chen committed
423
424
425
426
427
            body_str = body.decode('utf8').replace("'", '"')
            try:
                body = json.loads(body_str)
            except ValueError:
                pass
428

Jiahang Chen's avatar
Jiahang Chen committed
429
        elif isinstance(body, int):
430
            pass
Jiahang Chen's avatar
Jiahang Chen committed
431
        elif isinstance(body, str):
432
433
            pass

Jiahang Chen's avatar
Jiahang Chen committed
434
435
        if ch is not None:
            ch.basic_ack(method.delivery_tag)
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
        try:
            message_type = body.get("messageType")
            if message_type == "userMessage":
                self.on_user_message(body)
            elif message_type == "serviceRequest":
                self.on_service_request(body)
            elif message_type == "getValueRequest":
                self.on_get_value_request(body)
            elif message_type == "getValueReply":
                self.on_get_value_reply(body)
            elif message_type == "serviceReply":
                self.on_service_reply(body)
            else:
                ### TODO send user message reply back
                pass
        except AttributeError:
Jiahang Chen's avatar
Jiahang Chen committed
452
            pass
Jiahang Chen's avatar
Jiahang Chen committed
453

454
455
456
457
458
459
    def __send_message_to_broker(self, receiver_endpoints, msg):
        try:
            res = self.broker.send(
                receiver_endpoints=receiver_endpoints,
                msg=json.dumps(msg)
            )
Jiahang Chen's avatar
Jiahang Chen committed
460
461
            __log = "[S3I][Broker]: Send a S3I-B message back to the requester"
            APP_LOGGER.info(__log)
462
463
464
            return res

        except s3i.exception.S3IBrokerAMQPError:
Jiahang Chen's avatar
Jiahang Chen committed
465
466
            __log = "[S3I]: Invalid request sender endpoint {}".format(receiver_endpoints)
            APP_LOGGER.critical(__log)
467

Jiahang Chen's avatar
Jiahang Chen committed
468
    def on_user_message(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
469
        """Handles incoming S³I-B UserMessages.
470

Jiahang Chen's avatar
Jiahang Chen committed
471
        :param msg: S³I-B UserMessages
472
473

        """
Jiahang Chen's avatar
Jiahang Chen committed
474
475
        __log = "[S3I][Broker]: You have received a S3I-B UserMessage"
        APP_LOGGER.info(__log)
C. Albrecht's avatar
WIP    
C. Albrecht committed
476
477

    def on_get_value_request(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
478
        """Handles incoming GetValueRequest message. Looks up the value specified in msg and
479
480
481
482
483
        sends a GetValueReply message back to the sender.

        :param msg: GetValueRequest

        """
Jiahang Chen's avatar
Jiahang Chen committed
484
485
        __log = "[S3I][Broker]: You have received a S3I-B GetValueRequest"
        APP_LOGGER.info(__log)
486

Jiahang Chen's avatar
Jiahang Chen committed
487
488
489
490
491
492
493
        get_value_reply = GetValueReply()
        request_sender = msg.get("sender")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = msg.get("replyToEndpoint")
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())
        try:
Jiahang Chen's avatar
Jiahang Chen committed
494
            __log = "[S3I]: Search the given attribute path: {}".format(attribute_path)
Jiahang Chen's avatar
Jiahang Chen committed
495
            APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
496
497
            value = self._uriToData(attribute_path)
        except KeyError:
Jiahang Chen's avatar
Jiahang Chen committed
498
499
500
            value = "Invalid attribute path"
            __log = "[S3I]: " + value
            APP_LOGGER.critical(__log)
Jiahang Chen's avatar
Jiahang Chen committed
501
502
503

        get_value_reply.fillGetValueReply(
            senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
504
            receiverUUIDs=[request_sender],
505
            results=value,
Jiahang Chen's avatar
Jiahang Chen committed
506
507
508
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )
Jiahang Chen's avatar
Jiahang Chen committed
509

510
        res = self.__send_message_to_broker(
Jiahang Chen's avatar
Jiahang Chen committed
511
            receiver_endpoints=[request_sender_endpoint],
512
            msg=get_value_reply.msg
Jiahang Chen's avatar
Jiahang Chen committed
513
        )
514

515
516
        if self.__is_broker_rest:
            if res.status_code == 201:
Jiahang Chen's avatar
Jiahang Chen committed
517
518
                __log = "[S3I][Broker]: Send S3I-B GetValueReply back to the requester"
                APP_LOGGER.info(__log)
519
            else:
Jiahang Chen's avatar
Jiahang Chen committed
520
521
                __log = "[S3I[Broker]: " + res.text
                APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
522
523

    def _uriToData(self, uri):
524
525
526
527
528
529
530
        """Returns a copy of the value found at uri.

        :param uri: Path to value
        :rtype: Feature

        """

Jiahang Chen's avatar
Jiahang Chen committed
531
        if uri == "":
532
            return self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
533
534
535
536
        else:
            uri_list = uri.split("/")
            if uri_list[0] == "features":
                try:
537
                    return self.dt_json[uri]
Jiahang Chen's avatar
Jiahang Chen committed
538
                except KeyError:
Jiahang Chen's avatar
Jiahang Chen committed
539
                    return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
540
541

            try:
542
                self._getValue(self.dt_json, uri_list)
Jiahang Chen's avatar
Jiahang Chen committed
543
            except:
Jiahang Chen's avatar
Jiahang Chen committed
544
                return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
545
            if self.__resGetValue.__len__() == 0:
Jiahang Chen's avatar
Jiahang Chen committed
546
                return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
547
548
549
550
551
552
553
554
            response = copy.deepcopy(self.__resGetValue)
            self.__resGetValue.clear()
            if response.__len__() == 1:
                return response[0]
            else:
                return response

    def _getValue(self, source, uri_list):
555
556
557
558
559
560
561
562
563
564
        """Searches for the value specified by uri_list in source and stores
        the result in __resGetValue.

        :param source: Object that is scanned
        :param uri_list: List containing path

        """

        # ??? What if the uri points to a Value object?
        # Shouldn't it be serialized?!
Jiahang Chen's avatar
Jiahang Chen committed
565
566
567
568
569
570
571
        value = source[uri_list[0]]
        if uri_list.__len__() == 1:
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
572
573
574
                        value = self.dt_json["features"][stringValue_split[1]][
                            "properties"
                        ][uri_list[0]]
Jiahang Chen's avatar
Jiahang Chen committed
575
576
577
578
579
                except:
                    pass
            self.__resGetValue.append(value)
            return
        if isinstance(value, dict):
580
            # ??? uri_list.pop(0) better?!
Jiahang Chen's avatar
Jiahang Chen committed
581
582
583
584
585
586
587
588
589
590
591
592
593
            del uri_list[0]
            self._getValue(value, uri_list)
        if isinstance(value, list):
            if isinstance(value[0], (str, int, float, bool, list)):
                return value
            if isinstance(value[0], dict):
                for item in value:
                    if item["class"] == "ml40::Thing":
                        for i in item["roles"]:
                            if self._findValue(i, uri_list[1]):
                                uri_list_1 = copy.deepcopy(uri_list)
                                del uri_list_1[:2]
                                self._getValue(item, uri_list_1)
Jiahang Chen's avatar
Jiahang Chen committed
594
595
596
597
598
599
                        _f = self._findValue({"identifier": item.get("identifier")}, uri_list[1]) or \
                             self._findValue({"name": item.get("name")}, uri_list[1])
                        if _f:
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            self._getValue(item, uri_list_1)
Jiahang Chen's avatar
Jiahang Chen committed
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
                    else:
                        if self._findValue(item, uri_list[1]):
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            if not uri_list_1:
                                self.__resGetValue.append(item)
                                return
                            else:
                                self._getValue(item, uri_list_1)
        if isinstance(value, (str, int, float, bool)):
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
615
                        value = self.dt_json["features"][stringValue_split[1]][
Jiahang Chen's avatar
Jiahang Chen committed
616
617
618
619
620
621
622
                            "properties"
                        ][uri_list[0]]
                except:
                    pass
            self.__resGetValue.append(value)

    def _findValue(self, json, value):
623
624
625
626
627
628
629
630
631
632
        """Returns true if value has been found in json, otherwise returns false.

        :param json: dictionary
        :param value:
        :returns:
        :rtype:

        """

        # TODO: Simplify: value in json.values()
Jiahang Chen's avatar
Jiahang Chen committed
633
634
635
636
637
        for item in json:
            if json[item] == value:
                # print("Parameter: ", json[item])
                return True
        return False
C. Albrecht's avatar
WIP    
C. Albrecht committed
638
639

    def on_service_request(self, body_json):
Jiahang Chen's avatar
Jiahang Chen committed
640
641
        """Handles S³I-B ServiceRequests. Executes the method of the
        functionality specified in serviceType and send a ServiceReply
642
643
644
645
646
        back to the sender.

        :param body_json: ServiceRequest

        """
Jiahang Chen's avatar
Jiahang Chen committed
647
        __log = "[S3I][Broker]: You have received a S3I-B ServiceRequest"
C. Albrecht's avatar
WIP    
C. Albrecht committed
648
        service_type = body_json.get("serviceType")
649
        parameters = body_json.get("parameters")
650
        service_reply = ServiceReply()
Jiahang Chen's avatar
Jiahang Chen committed
651
        service_functionality = service_type.split('/')[0]
Jiahang Chen's avatar
Jiahang Chen committed
652
        service_functionality_obj = self.features.get(service_functionality)
Jiahang Chen's avatar
Jiahang Chen committed
653
        if service_functionality_obj is None:
C. Albrecht's avatar
WIP    
C. Albrecht committed
654
            APP_LOGGER.critical(
Jiahang Chen's avatar
Jiahang Chen committed
655
                "[S3I]: Functionality %s is not one of the built-in functionalities in %s!"
656
                % (service_functionality, self.name)
C. Albrecht's avatar
WIP    
C. Albrecht committed
657
            )
GromeTT's avatar
GromeTT committed
658
659
            service_reply.fillServiceReply(
                senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
660
                receiverUUIDs=[body_json.get("sender", None)],
GromeTT's avatar
GromeTT committed
661
662
663
664
665
                serviceType=body_json.get("serviceType", None),
                results={"error": "invalid functionalities (serviceType) {}".format(service_functionality)},
                replyingToUUID=body_json.get("identifier", None),
                msgUUID="s3i:{}".format(uuid.uuid4())
            )
666
667
668
669
670
671
        else:
            # TODO: Call right functionality.
            try:
                method = getattr(service_functionality_obj, service_type.split('/')[1])
            except AttributeError:
                APP_LOGGER.critical(
Jiahang Chen's avatar
Jiahang Chen committed
672
                    "[S3I]: Method %s is not one of the built-in functionalities in %s!" % (
673
674
675
676
                        service_type.split('/')[1], self.name)
                )
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
677
                    receiverUUIDs=[body_json.get("sender", None)],
678
                    serviceType=body_json.get("serviceType", None),
679
680
681
682
683
684
                    results={"error": "invalid method {}".format(service_type.split('/')[1])},
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
            except IndexError:
                APP_LOGGER.critical(
Jiahang Chen's avatar
Jiahang Chen committed
685
                    "[S3I]: ServiceType consists of functionality and method name."
686
687
688
689
690
691
                )
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
                    receiverUUIDs=[body_json.get("sender", None)],
                    serviceType=body_json.get("serviceType", None),
                    results={"error": "method missing"},
692
693
694
695
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
            else:
Jiahang Chen's avatar
Jiahang Chen committed
696
697
698
                __log = "[S3I][Broker]: Execute the function {0} of the class {1}".format(service_type.split('/')[1],
                                                                                          service_type.split('/')[0])
                APP_LOGGER.info(__log)
699
700
701
                try:
                    result = method(**parameters)
                except TypeError:
Jiahang Chen's avatar
Jiahang Chen committed
702
                    APP_LOGGER.critical("[S3I]: Invalid function arguments")
703
704
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
705
                        receiverUUIDs=[body_json.get("sender", None)],
706
707
708
709
                        serviceType=body_json.get("serviceType", None),
                        results={"error": "invalid function arguments (parameters)"},
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
710
711
                    )
                else:
712
713
                    if isinstance(result, bool):
                        result = {"ok": result}
714
                    elif result is None:
715
                        return
716
717
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
718
                        receiverUUIDs=[body_json.get("sender", None)],
719
720
721
722
723
724
                        serviceType=body_json.get("serviceType", None),
                        results=result,
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
                    )

725
726
727
728
729
        res = self.__send_message_to_broker(
            receiver_endpoints=[body_json.get("replyToEndpoint", None)],
            msg=service_reply.msg
        )

730
731
        if self.__is_broker_rest:
            if res.status_code == 201:
Jiahang Chen's avatar
Jiahang Chen committed
732
733
                __log = "[S3I][Broker]: Send a S3I-B ServiceReply back to the requester"
                APP_LOGGER.info(__log)
734
            else:
Jiahang Chen's avatar
Jiahang Chen committed
735
                APP_LOGGER.critical(__log)
Jiahang Chen's avatar
Jiahang Chen committed
736
737

    def on_get_value_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
738
        """Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout.
739
740
741
742
743
744
745

        :param msg: GetValueReply

        """

        # ???: Behavior should be defined by the user! Maybe he want
        # to process the result!
Jiahang Chen's avatar
Jiahang Chen committed
746
747
        __log = "[S3I][Broker]: You have received a S3I-B GetValueReply"
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
748
749
750
        value = msg.get("value", None)
        if isinstance(value, dict):
            value = json.dumps(value, indent=2)
Jiahang Chen's avatar
Jiahang Chen committed
751
752
        __log = "[S3I][Broker]: The queried value is: {0}".format(value)
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
753

Jiahang Chen's avatar
Jiahang Chen committed
754
    def on_service_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
755
        """Handles incoming S³I-B ServiceReply. Prints the content of msg to stdout.
756
757
758
759

        :param msg: ServiceReply

        """
Jiahang Chen's avatar
Jiahang Chen committed
760
761
        __log = "[S3I][Broker]: You have received a S3I-B ServiceReply"
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
762
763
764
765
        results = msg.get("results", None)
        if isinstance(results, dict):
            results = json.dumps(results, indent=2)

Jiahang Chen's avatar
Jiahang Chen committed
766
767
        __log = "[S3I][Broker]: The result is: {0}".format(results)
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
768
769

    def to_dir_json(self):
770
771
772
773
774
775
776
        """Returns a dictionary representing this thing's directory entry.

        :returns: Directory representation of this object
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
777
778
779
780
781
        self.dir_json = self.dir.queryThingIDBased(self.thing_id)
        if self.thing_id is not None:
            self.dir_json["thingId"] = self.thing_id
        if self.policy_id is not None:
            self.dir_json["policyId"] = self.policy_id
Jiahang Chen's avatar
Jiahang Chen committed
782
783
784
785
786
        if self.name is not None:
            self.dir_json["attributes"]["name"] = self.name
        if self.features.get("ml40::Location") is not None:
            self.dir_json["attributes"]["location"] = {
                "longitude": self.features.get("ml40::Location").to_json()["longitude"],
GromeTT's avatar
GromeTT committed
787
                "latitude": self.features.get("ml40::Location").to_json()["latitude"]
Jiahang Chen's avatar
Jiahang Chen committed
788
789
790
791
            }
        self.dir_json["attributes"]["dataModel"] = "fml40"
        self.dir_json["attributes"]["thingStructure"] = {
            "class": "ml40::Thing",
GromeTT's avatar
GromeTT committed
792
            "links": []
Jiahang Chen's avatar
Jiahang Chen committed
793
        }
Jiahang Chen's avatar
Jiahang Chen committed
794
        for key in self.roles.keys():
GromeTT's avatar
GromeTT committed
795
796
797
798
            role_entry = {
                "association": "roles",
                "target": self.roles[key].to_json()
            }
Jiahang Chen's avatar
Jiahang Chen committed
799
            self.dir_json["attributes"]["thingStructure"]["links"].append(role_entry)
Jiahang Chen's avatar
Jiahang Chen committed
800

Jiahang Chen's avatar
Jiahang Chen committed
801
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
802
803
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
804
            }
Jiahang Chen's avatar
Jiahang Chen committed
805
806
807
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]

808
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
809
            # if the feature has targets, like ml40::Composite
810
            if hasattr(self.features[key], "targets"):
Jiahang Chen's avatar
Jiahang Chen committed
811
812
                feature_entry["target"]["links"] = list()
                for target in self.features[key].targets.keys():
813
814
815
                    target_json = (
                        self.features[key].targets[target].to_subthing_dir_json()
                    )
Jiahang Chen's avatar
Jiahang Chen committed
816
817
                    feature_entry["target"]["links"].append(target_json)
            self.dir_json["attributes"]["thingStructure"]["links"].append(feature_entry)
Jiahang Chen's avatar
Jiahang Chen committed
818
819
        return self.dir_json

820
    def to_repo_json(self):
821
822
823
824
825
826
827
        """Returns a dictionary representing this thing's repository entry.

        :returns: Repository representation of this object
        :rtype: dict

        """

828
        self.repo_json = self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
829
830
        return self.repo_json

831
    def to_json(self):
832
833
834
835
836
837
838
        """Returns a dictionary representing this thing in it's current state.

        :returns: Representation of this object
        :rtype: dict

        """

839
840
841
842
843
844
        self.dt_json = {
            "thingId": self.thing_id,
            "policyId": self.policy_id,
            "attributes": {
                "class": "ml40::Thing",
                "name": self.name,
845
            },
846
847
848
849
850
        }
        if self.roles:
            self.dt_json["attributes"]["roles"] = list()
        if self.features:
            self.dt_json["attributes"]["features"] = list()
Jiahang Chen's avatar
Jiahang Chen committed
851
        if self.ditto_features:
Jiahang Chen's avatar
Jiahang Chen committed
852
            self.dt_json["features"] = dict()
853
854
        for key in self.roles.keys():
            self.dt_json["attributes"]["roles"].append(self.roles[key].to_json())
Jiahang Chen's avatar
Jiahang Chen committed
855
        for key in self.features.keys():
856
            self.dt_json["attributes"]["features"].append(self.features[key].to_json())
Jiahang Chen's avatar
Jiahang Chen committed
857
858
        for key in self.ditto_features.keys():
            self.dt_json["features"][key] = self.ditto_features[key].to_json()
859
860
        return self.dt_json

Jiahang Chen's avatar
Jiahang Chen committed
861
    def to_subthing_json(self):
862
863
864
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i repository entries.
865

866
867
        :returns: Representation of this object as a subordinate thing
        :rtype: dict
868
869
870

        """

Jiahang Chen's avatar
Jiahang Chen committed
871
872
873
874
        json_out = {
            "class": "ml40::Thing",
            "name": self.name,
            "roles": [],
875
            "features": [],
Jiahang Chen's avatar
Jiahang Chen committed
876
        }
Jiahang Chen's avatar
Jiahang Chen committed
877
878
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
879
        else:
Jiahang Chen's avatar
Jiahang Chen committed
880
881
            if self.model["attributes"].get("identifier") is not None:
                json_out["identifier"] = self.model["attributes"]["identifier"]
Jiahang Chen's avatar
Jiahang Chen committed
882
883
884
885
886
        for key in self.roles.keys():
            json_out["roles"].append(self.roles[key].to_json())
        for key in self.features.keys():
            json_out["features"].append(self.features[key].to_json())
        return json_out
Jiahang Chen's avatar
Jiahang Chen committed
887
888

    def to_subthing_dir_json(self):
889
890
891
892
893
894
895
896
897
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i directory entries.

        :returns: Representation of this object as a subordinate thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
898
899
900
        json_out = {"class": "ml40::Thing", "links": []}
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
901
        for key in self.roles.keys():
902
            role_entry = {"association": "roles", "target": self.roles[key].to_json()}
Jiahang Chen's avatar
Jiahang Chen committed
903
904
            json_out["links"].append(role_entry)
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
905
906
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
907
            }
Jiahang Chen's avatar
Jiahang Chen committed
908
909
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]
910
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
911
912
            json_out["links"].append(feature_entry)
        return json_out