thing.py 31 KB
Newer Older
1
2
3
"""This module implements the thing class which is the core element of
this package."""

C. Albrecht's avatar
WIP    
C. Albrecht committed
4
5
6
import threading
import json
import uuid
7
8
9
10
import time
import copy
from s3i import IdentityProvider, TokenType, GetValueReply, Directory
from s3i import Repository
11
import s3i.exception
Jiahang Chen's avatar
Jiahang Chen committed
12
from s3i.broker import Broker, BrokerREST
Jiahang Chen's avatar
Jiahang Chen committed
13
14
from s3i.messages import ServiceReply
from ml.tools import find_broker_endpoint
C. Albrecht's avatar
WIP    
C. Albrecht committed
15
from ml.app_logger import APP_LOGGER
Jiahang Chen's avatar
Jiahang Chen committed
16
17


18
19
20
class BaseVariable:
    """The BaseVariable class holds various urls to s3i services."""

C. Albrecht's avatar
WIP    
C. Albrecht committed
21
22
23
24
25
26
27
28
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


Jiahang Chen's avatar
Jiahang Chen committed
29
class Thing:
30
    """The thing class represents a customizable runtime environment for
Jiahang Chen's avatar
Jiahang Chen committed
31
    operating Digital Twins complying the Forest Modeling Language (fml40)."""
32

C. Albrecht's avatar
WIP    
C. Albrecht committed
33
    def __init__(
Jiahang Chen's avatar
Jiahang Chen committed
34
35
36
37
38
39
40
41
42
            self,
            model: dict,
            client_secret="",
            grant_type="password",
            is_broker=False,
            is_repo=False,
            is_broker_rest=True,
            username=None,
            password=None,
C. Albrecht's avatar
WIP    
C. Albrecht committed
43
    ):
Jiahang Chen's avatar
Jiahang Chen committed
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
        """
        Constructor

        :param model: edge-device or S³I Repository specified JSON entry, like config file for Digital Twins
        :type model: dict
        :param client_secret: OAuth 2.0 specified client secret, generated in the S³I IdentityProvider
        :type client_secret: str
        :param grant_type: OAuth 2.0 specified grant type to issue a JWT. Here the grant type can be password or client_credentials
        :type grant_type: str
        :param is_broker: whether broker interface is enabled in the ml40::thing instance
        :type is_broker: bool
        :param is_broker_rest: Whether the connection with the S³I Broker is established via HTTP REST
        :type is_broker_rest: bool
        :param is_repo: Whether the thing uses the S³I Repository to launch its Digital Twin in the cloud
        :type is_repo: bool
        :param username: OAuth 2.0 specified username, registered in the S³I IdentityProvider. If the grant_type is set as password, the username is required
        :type username: str
        :param password: OAuth 2.0 specified password, registered in the S³I IdentityProvider. If the grant_type is set as password, the password is required

        """
64

Jiahang Chen's avatar
Jiahang Chen committed
65
        self.__model = model
C. Albrecht's avatar
WIP    
C. Albrecht committed
66
67
68
69
70
71
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
Jiahang Chen's avatar
Jiahang Chen committed
72

C. Albrecht's avatar
WIP    
C. Albrecht committed
73
        self.__is_broker = is_broker
Jiahang Chen's avatar
Jiahang Chen committed
74
        self.__is_broker_rest = is_broker_rest
C. Albrecht's avatar
WIP    
C. Albrecht committed
75
        self.__is_repo = is_repo
Jiahang Chen's avatar
Jiahang Chen committed
76

C. Albrecht's avatar
WIP    
C. Albrecht committed
77
        self.__access_token = ""
Jiahang Chen's avatar
Jiahang Chen committed
78
79
        self.__endpoint = ""

C. Albrecht's avatar
WIP    
C. Albrecht committed
80
        self.__ws_connected = False
Jiahang Chen's avatar
Jiahang Chen committed
81
82
        self.broker = None
        self.ws = None
83
84
        # TODO: Change the variable name dir, because it is a builtin
        # name.
Jiahang Chen's avatar
Jiahang Chen committed
85
        self.dir = None
Jiahang Chen's avatar
Jiahang Chen committed
86
87
        self.repo = None

88
        # : Dict[<str>, <str>]
Jiahang Chen's avatar
Jiahang Chen committed
89
90
        self.repo_json = dict()
        self.dir_json = dict()
91
        self.dt_json = dict()
C. Albrecht's avatar
WIP    
C. Albrecht committed
92
93
94

        attributes = model.get("attributes", None)
        self.__name = ""
Jiahang Chen's avatar
Jiahang Chen committed
95
        self.__roles = {}
Jiahang Chen's avatar
Jiahang Chen committed
96
        self.__features = {}
Jiahang Chen's avatar
Jiahang Chen committed
97
        self.__ditto_features = {}
98
        # ??? Is this property necessary? Only used as return value in _getValue()
Jiahang Chen's avatar
Jiahang Chen committed
99
        self.__resGetValue = list()
C. Albrecht's avatar
WIP    
C. Albrecht committed
100
101
        if attributes:
            self.__name = attributes.get("name", "")
Jiahang Chen's avatar
Jiahang Chen committed
102
103
104

    @property
    def model(self):
Jiahang Chen's avatar
Jiahang Chen committed
105
        """Returns the specification JSON from which this thing has been constructed from.
106
107
108
109
110
111

        :returns: Representation of a ml40 compliant thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
112
        return self.__model
Jiahang Chen's avatar
Jiahang Chen committed
113

Jiahang Chen's avatar
Jiahang Chen committed
114
115
116
117
118
119
120
121
    @property
    def ditto_features(self):
        return self.__ditto_features

    @ditto_features.setter
    def ditto_features(self, value):
        self.__ditto_features = value

Jiahang Chen's avatar
Jiahang Chen committed
122
123
    @property
    def features(self):
Jiahang Chen's avatar
Jiahang Chen committed
124
        """Returns thing's features.
125

126
127
        :returns: Features
        :rtype: dict
128
129
130

        """

Jiahang Chen's avatar
Jiahang Chen committed
131
132
133
134
        return self.__features

    @features.setter
    def features(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
135
        """Replaces thing's features with value.
136
137
138
139
140

        :param value: New collection of features

        """

Jiahang Chen's avatar
Jiahang Chen committed
141
142
        self.__features = value

Jiahang Chen's avatar
Jiahang Chen committed
143
144
    @property
    def roles(self):
145
        """Returns the thing's roles.
146
147
148

        :returns: ml40 roles
        :rtype: dict
149

150
151
        """

Jiahang Chen's avatar
Jiahang Chen committed
152
153
154
155
        return self.__roles

    @roles.setter
    def roles(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
156
        """Replaces thing's roles with value
157

158
        :param value: New collection of roles
159
160
        """

Jiahang Chen's avatar
Jiahang Chen committed
161
162
        self.__roles = value

C. Albrecht's avatar
WIP    
C. Albrecht committed
163
164
    @property
    def client_secret(self):
165
166
167
168
        """Returns the client secret.

        :returns: Client secret
        :rtype: str
169

170
171
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
172
173
174
175
        return self.__client_secret

    @property
    def grant_type(self):
Jiahang Chen's avatar
Jiahang Chen committed
176
        """Returns the method used to obtain JSON Web Tokens from the S³I IdentityProvider
177

Jiahang Chen's avatar
Jiahang Chen committed
178
        :returns: OAuth2 specified grant type [password, client_credentials]
179
        :rtype: str
180

181
182
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
183
184
185
186
        return self.__grant_type

    @property
    def access_token(self):
Jiahang Chen's avatar
Jiahang Chen committed
187
        """Returns the current JSON Web token.
188

Jiahang Chen's avatar
Jiahang Chen committed
189
        :returns: JSON Web token
190
        :rtype: str
191

192
        """
C. Albrecht's avatar
WIP    
C. Albrecht committed
193
194
195
196
        return self.__access_token

    @property
    def name(self):
197
198
199
200
        """Returns the name of this thing.

        :returns: name
        :rtype: str
201

202
203
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
204
205
206
207
        return self.__name

    @property
    def thing_id(self):
208
209
210
211
        """Returns the identifier of this thing.

        :returns: identifier
        :rtype: str
212

213
214
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
215
216
        return self.__thing_id

Jiahang Chen's avatar
Jiahang Chen committed
217
218
    @property
    def policy_id(self):
219
220
221
222
223
224
225
        """Returns the identifier of this thing's policy.

        :returns: identifier
        :rtype: str

        """

Jiahang Chen's avatar
Jiahang Chen committed
226
227
        return self.__policy_id

C. Albrecht's avatar
WIP    
C. Albrecht committed
228
    def run_forever(self):
229
        """Starts the thing in permanent mode.
230

231
        """
Jiahang Chen's avatar
Jiahang Chen committed
232
233
        __log = "[S3I]: Launch {}".format(self.name)
        APP_LOGGER.info(__log)
C. Albrecht's avatar
WIP    
C. Albrecht committed
234
        self.__connect_with_idp()
Jiahang Chen's avatar
Jiahang Chen committed
235
        self.__dir_syn()
236
        threading.Thread(target=self.__json_syn).start()
Jiahang Chen's avatar
Jiahang Chen committed
237
238
239
240
241
        if self.__is_repo:
            threading.Thread(target=self.__repo_syn).start()

    @staticmethod
    def add_user_def(func):
Jiahang Chen's avatar
Jiahang Chen committed
242
        """Insert user-specified function in the thing object.
243

Jiahang Chen's avatar
Jiahang Chen committed
244
        :param func: external defined function to be executed.
245
246
247

        """

Jiahang Chen's avatar
Jiahang Chen committed
248
249
        threading.Thread(target=func).start()

250
    def __json_syn(self, freq=0.1):
Jiahang Chen's avatar
Jiahang Chen committed
251
252
253
254
255
256
        """
        Applies local changes to the original model in the thing object

        :param freq: Frequency of the update
        :type freq: float
        """
Jiahang Chen's avatar
Jiahang Chen committed
257
        while True:
258
            try:
259
260
261
262
263
                time.sleep(freq)
                self.to_json()
            except:
                continue

264
265
    def __dir_syn(self):
        """Applies local changes to the directory entry in the cloud only once.
266
267

        """
268
269
270
271
272
        self.to_dir_json()
        if self.dir_json is not None:
            self.dir.updateThingIDBased(
                thingID=self.thing_id, payload=self.dir_json
            )
Jiahang Chen's avatar
Jiahang Chen committed
273

274
    def __repo_syn(self, freq=0.1):
275
        """Applies local changes to the repository entry in the cloud.
276
277

        :param freq: Frequency of the update.
Jiahang Chen's avatar
Jiahang Chen committed
278
        :type freq: float
279
        """
Jiahang Chen's avatar
Jiahang Chen committed
280
        while self.__is_repo:
281
            try:
282
                time.sleep(freq)
283
284
                old_repo_json = self.repo_json
                self.to_repo_json()
285
                # TODO: Clean up this reqion
286
287
288
                if self.repo_json == old_repo_json:
                    continue
                else:
289
290
291
                    self.repo.updateThingIDBased(
                        thingID=self.thing_id, payload=self.repo_json
                    )
292
            except:
Jiahang Chen's avatar
Jiahang Chen committed
293
294
                continue

C. Albrecht's avatar
WIP    
C. Albrecht committed
295
    def __connect_with_idp(self):
Jiahang Chen's avatar
Jiahang Chen committed
296
297
        """Establishes a connection to the S³I IdentityProvider which guarantees,
        that the JSON web token needed to use s3i services.
298
        be renewed if it has expired.
299
300

        """
Jiahang Chen's avatar
Jiahang Chen committed
301
302
        __log = "[S3I][IdP]: Connect with S3I IdentityProvider"
        APP_LOGGER.info(__log)
C. Albrecht's avatar
WIP    
C. Albrecht committed
303
304
305
306
307
308
309
310
311
312
313
314
315
316
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
Jiahang Chen's avatar
Jiahang Chen committed
317
318
        """Updates the JSON Web Token with token and reestablishes connections
        to the s3i services .
319

Jiahang Chen's avatar
Jiahang Chen committed
320
321
        :param token: New JSON Web token
        :type token: str
322
323

        """
C. Albrecht's avatar
WIP    
C. Albrecht committed
324
        self.__access_token = token
Jiahang Chen's avatar
Jiahang Chen committed
325
        self.__connect_with_dir()
Jiahang Chen's avatar
Jiahang Chen committed
326
        self.__connect_with_repo()
C. Albrecht's avatar
WIP    
C. Albrecht committed
327
328
        if self.__is_broker:
            self.__connect_with_broker()
Jiahang Chen's avatar
Jiahang Chen committed
329

Jiahang Chen's avatar
Jiahang Chen committed
330
    def __connect_with_dir(self):
331
        """Initializes the property dir with a Directory object which can be
Jiahang Chen's avatar
Jiahang Chen committed
332
        used to access the s3i Directory.
333
334
335
336
337
338

        :returns:
        :rtype:

        """

Jiahang Chen's avatar
Jiahang Chen committed
339
340
        __log = "[S3I][Dir]: Connect with S3I Directory"
        APP_LOGGER.info(__log)
341
342
343
        self.dir = Directory(
            s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
344

C. Albrecht's avatar
WIP    
C. Albrecht committed
345
    def __connect_with_repo(self):
346
        """Initializes the property repo whit a Repository object which can be
Jiahang Chen's avatar
Jiahang Chen committed
347
        used to access the s3i Repository.
348
349
350
351
352
353

        :returns:
        :rtype:

        """

Jiahang Chen's avatar
Jiahang Chen committed
354
355
        __log = "[S3I][Repo]: Connect with S3I Repository"
        APP_LOGGER.info(__log)
356
357
358
        self.repo = Repository(
            s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
359

C. Albrecht's avatar
WIP    
C. Albrecht committed
360
    def __connect_with_broker(self):
361
        """Initializes the property broker with a Broker object. Additionally
Jiahang Chen's avatar
Jiahang Chen committed
362
        a callback function is registered which handles incoming S³I-B Messages
363
364
365
366
        messages.

        """

Jiahang Chen's avatar
Jiahang Chen committed
367
368
        __log = "[S3I][Broker]: Connect with S3I Broker"
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
369
370
        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)
Jiahang Chen's avatar
Jiahang Chen committed
371

Jiahang Chen's avatar
Jiahang Chen committed
372
            def receive():
Jiahang Chen's avatar
Jiahang Chen committed
373
                self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
Jiahang Chen's avatar
Jiahang Chen committed
374
                while True:
375
376
                    try:
                        time.sleep(0.1)
Jiahang Chen's avatar
Jiahang Chen committed
377
378
379
                        msg = self.broker.receive_once(self.__endpoint)
                        msg_to_json = lambda msg: msg if isinstance(msg, dict) else json.loads(msg)
                        if msg == {}:
380
381
                            continue
                        else:
382
383
384
385
                            self.__on_broker_callback(
                                ch=None,
                                method=None,
                                properties=None,
Jiahang Chen's avatar
Jiahang Chen committed
386
                                body=msg_to_json(msg),
387
                            )
388
                    except:
Jiahang Chen's avatar
Jiahang Chen committed
389
                        continue
390

391
            threading.Thread(target=receive).start()
Jiahang Chen's avatar
Jiahang Chen committed
392
393

        else:
394
395
            if self.broker is None:
                # first time to build a broker instance
396
                self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
397
398
399
400
401
402
403
404
405
406
407
                self.broker = Broker(
                    auth_form="Username/Password",
                    username=" ",
                    password=self.__access_token,
                    host=BaseVariable.BROKER_HOST,
                )

                threading.Thread(
                    target=self.broker.receive,
                    args=(self.__endpoint, self.__on_broker_callback),
                ).start()
Jiahang Chen's avatar
Jiahang Chen committed
408

409
            else:
Jiahang Chen's avatar
Jiahang Chen committed
410
                self.broker.maybe_reconnect(self.access_token)
C. Albrecht's avatar
WIP    
C. Albrecht committed
411
412

    def __on_broker_callback(self, ch, method, properties, body):
413
414
415
416
417
418
419
420
        """Parses body (content of a S3I-B message) and delegates the
        processing of the message to a separate method. The method is
        selected according to the message's type.

        :param body: S3I-B message

        """

Jiahang Chen's avatar
Jiahang Chen committed
421
        if isinstance(body, bytes):
Jiahang Chen's avatar
Jiahang Chen committed
422
423
424
425
426
            body_str = body.decode('utf8').replace("'", '"')
            try:
                body = json.loads(body_str)
            except ValueError:
                pass
427

Jiahang Chen's avatar
Jiahang Chen committed
428
        elif isinstance(body, int):
429
            pass
Jiahang Chen's avatar
Jiahang Chen committed
430
        elif isinstance(body, str):
431
432
            pass

Jiahang Chen's avatar
Jiahang Chen committed
433
434
        if ch is not None:
            ch.basic_ack(method.delivery_tag)
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
        try:
            message_type = body.get("messageType")
            if message_type == "userMessage":
                self.on_user_message(body)
            elif message_type == "serviceRequest":
                self.on_service_request(body)
            elif message_type == "getValueRequest":
                self.on_get_value_request(body)
            elif message_type == "getValueReply":
                self.on_get_value_reply(body)
            elif message_type == "serviceReply":
                self.on_service_reply(body)
            else:
                ### TODO send user message reply back
                pass
        except AttributeError:
Jiahang Chen's avatar
Jiahang Chen committed
451
            pass
Jiahang Chen's avatar
Jiahang Chen committed
452

453
454
455
456
457
458
    def __send_message_to_broker(self, receiver_endpoints, msg):
        try:
            res = self.broker.send(
                receiver_endpoints=receiver_endpoints,
                msg=json.dumps(msg)
            )
Jiahang Chen's avatar
Jiahang Chen committed
459
460
            __log = "[S3I][Broker]: Send a S3I-B message back to the requester"
            APP_LOGGER.info(__log)
461
462
463
            return res

        except s3i.exception.S3IBrokerAMQPError:
Jiahang Chen's avatar
Jiahang Chen committed
464
465
            __log = "[S3I]: Invalid request sender endpoint {}".format(receiver_endpoints)
            APP_LOGGER.critical(__log)
466

Jiahang Chen's avatar
Jiahang Chen committed
467
    def on_user_message(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
468
        """Handles incoming S³I-B UserMessages.
469

Jiahang Chen's avatar
Jiahang Chen committed
470
        :param msg: S³I-B UserMessages
471
472

        """
Jiahang Chen's avatar
Jiahang Chen committed
473
474
        __log = "[S3I][Broker]: You have received a S3I-B UserMessage"
        APP_LOGGER.info(__log)
C. Albrecht's avatar
WIP    
C. Albrecht committed
475
476

    def on_get_value_request(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
477
        """Handles incoming GetValueRequest message. Looks up the value specified in msg and
478
479
480
481
482
        sends a GetValueReply message back to the sender.

        :param msg: GetValueRequest

        """
Jiahang Chen's avatar
Jiahang Chen committed
483
484
        __log = "[S3I][Broker]: You have received a S3I-B GetValueRequest"
        APP_LOGGER.info(__log)
485

Jiahang Chen's avatar
Jiahang Chen committed
486
487
488
489
490
491
492
        get_value_reply = GetValueReply()
        request_sender = msg.get("sender")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = msg.get("replyToEndpoint")
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())
        try:
Jiahang Chen's avatar
Jiahang Chen committed
493
            __log = "[S3I]: Search the given attribute path: {}".format(attribute_path)
Jiahang Chen's avatar
Jiahang Chen committed
494
            APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
495
496
            value = self._uriToData(attribute_path)
        except KeyError:
Jiahang Chen's avatar
Jiahang Chen committed
497
498
499
            value = "Invalid attribute path"
            __log = "[S3I]: " + value
            APP_LOGGER.critical(__log)
Jiahang Chen's avatar
Jiahang Chen committed
500
501
502

        get_value_reply.fillGetValueReply(
            senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
503
            receiverUUIDs=[request_sender],
504
            results=value,
Jiahang Chen's avatar
Jiahang Chen committed
505
506
507
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )
Jiahang Chen's avatar
Jiahang Chen committed
508

509
        res = self.__send_message_to_broker(
Jiahang Chen's avatar
Jiahang Chen committed
510
            receiver_endpoints=[request_sender_endpoint],
511
            msg=get_value_reply.msg
Jiahang Chen's avatar
Jiahang Chen committed
512
        )
513

514
515
        if self.__is_broker_rest:
            if res.status_code == 201:
Jiahang Chen's avatar
Jiahang Chen committed
516
517
                __log = "[S3I][Broker]: Send S3I-B GetValueReply back to the requester"
                APP_LOGGER.info(__log)
518
            else:
Jiahang Chen's avatar
Jiahang Chen committed
519
520
                __log = "[S3I[Broker]: " + res.text
                APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
521
522

    def _uriToData(self, uri):
523
524
525
526
527
528
529
        """Returns a copy of the value found at uri.

        :param uri: Path to value
        :rtype: Feature

        """

Jiahang Chen's avatar
Jiahang Chen committed
530
        if uri == "":
531
            return self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
532
533
534
535
        else:
            uri_list = uri.split("/")
            if uri_list[0] == "features":
                try:
536
                    return self.dt_json[uri]
Jiahang Chen's avatar
Jiahang Chen committed
537
                except KeyError:
Jiahang Chen's avatar
Jiahang Chen committed
538
                    return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
539
540

            try:
541
                self._getValue(self.dt_json, uri_list)
Jiahang Chen's avatar
Jiahang Chen committed
542
            except:
Jiahang Chen's avatar
Jiahang Chen committed
543
                return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
544
            if self.__resGetValue.__len__() == 0:
Jiahang Chen's avatar
Jiahang Chen committed
545
                return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
546
547
548
549
550
551
552
553
            response = copy.deepcopy(self.__resGetValue)
            self.__resGetValue.clear()
            if response.__len__() == 1:
                return response[0]
            else:
                return response

    def _getValue(self, source, uri_list):
554
555
556
557
558
559
560
561
562
563
        """Searches for the value specified by uri_list in source and stores
        the result in __resGetValue.

        :param source: Object that is scanned
        :param uri_list: List containing path

        """

        # ??? What if the uri points to a Value object?
        # Shouldn't it be serialized?!
Jiahang Chen's avatar
Jiahang Chen committed
564
565
566
567
568
569
570
        value = source[uri_list[0]]
        if uri_list.__len__() == 1:
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
571
572
573
                        value = self.dt_json["features"][stringValue_split[1]][
                            "properties"
                        ][uri_list[0]]
Jiahang Chen's avatar
Jiahang Chen committed
574
575
576
577
578
                except:
                    pass
            self.__resGetValue.append(value)
            return
        if isinstance(value, dict):
579
            # ??? uri_list.pop(0) better?!
Jiahang Chen's avatar
Jiahang Chen committed
580
581
582
583
584
585
586
587
588
589
590
591
592
            del uri_list[0]
            self._getValue(value, uri_list)
        if isinstance(value, list):
            if isinstance(value[0], (str, int, float, bool, list)):
                return value
            if isinstance(value[0], dict):
                for item in value:
                    if item["class"] == "ml40::Thing":
                        for i in item["roles"]:
                            if self._findValue(i, uri_list[1]):
                                uri_list_1 = copy.deepcopy(uri_list)
                                del uri_list_1[:2]
                                self._getValue(item, uri_list_1)
Jiahang Chen's avatar
Jiahang Chen committed
593
594
595
596
597
598
                        _f = self._findValue({"identifier": item.get("identifier")}, uri_list[1]) or \
                             self._findValue({"name": item.get("name")}, uri_list[1])
                        if _f:
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            self._getValue(item, uri_list_1)
Jiahang Chen's avatar
Jiahang Chen committed
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
                    else:
                        if self._findValue(item, uri_list[1]):
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            if not uri_list_1:
                                self.__resGetValue.append(item)
                                return
                            else:
                                self._getValue(item, uri_list_1)
        if isinstance(value, (str, int, float, bool)):
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
614
                        value = self.dt_json["features"][stringValue_split[1]][
Jiahang Chen's avatar
Jiahang Chen committed
615
616
617
618
619
620
621
                            "properties"
                        ][uri_list[0]]
                except:
                    pass
            self.__resGetValue.append(value)

    def _findValue(self, json, value):
622
623
624
625
626
627
628
629
630
631
        """Returns true if value has been found in json, otherwise returns false.

        :param json: dictionary
        :param value:
        :returns:
        :rtype:

        """

        # TODO: Simplify: value in json.values()
Jiahang Chen's avatar
Jiahang Chen committed
632
633
634
635
636
        for item in json:
            if json[item] == value:
                # print("Parameter: ", json[item])
                return True
        return False
C. Albrecht's avatar
WIP    
C. Albrecht committed
637
638

    def on_service_request(self, body_json):
Jiahang Chen's avatar
Jiahang Chen committed
639
640
        """Handles S³I-B ServiceRequests. Executes the method of the
        functionality specified in serviceType and send a ServiceReply
641
642
643
644
645
        back to the sender.

        :param body_json: ServiceRequest

        """
Jiahang Chen's avatar
Jiahang Chen committed
646
        __log = "[S3I][Broker]: You have received a S3I-B ServiceRequest"
C. Albrecht's avatar
WIP    
C. Albrecht committed
647
        service_type = body_json.get("serviceType")
648
        parameters = body_json.get("parameters")
649
        service_reply = ServiceReply()
Jiahang Chen's avatar
Jiahang Chen committed
650
        service_functionality = service_type.split('/')[0]
Jiahang Chen's avatar
Jiahang Chen committed
651
        service_functionality_obj = self.features.get(service_functionality)
Jiahang Chen's avatar
Jiahang Chen committed
652
        if service_functionality_obj is None:
C. Albrecht's avatar
WIP    
C. Albrecht committed
653
            APP_LOGGER.critical(
Jiahang Chen's avatar
Jiahang Chen committed
654
                "[S3I]: Functionality %s is not one of the built-in functionalities in %s!"
655
                % (service_functionality, self.name)
C. Albrecht's avatar
WIP    
C. Albrecht committed
656
            )
GromeTT's avatar
GromeTT committed
657
658
            service_reply.fillServiceReply(
                senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
659
                receiverUUIDs=[body_json.get("sender", None)],
GromeTT's avatar
GromeTT committed
660
661
662
663
664
                serviceType=body_json.get("serviceType", None),
                results={"error": "invalid functionalities (serviceType) {}".format(service_functionality)},
                replyingToUUID=body_json.get("identifier", None),
                msgUUID="s3i:{}".format(uuid.uuid4())
            )
665
666
667
668
669
670
        else:
            # TODO: Call right functionality.
            try:
                method = getattr(service_functionality_obj, service_type.split('/')[1])
            except AttributeError:
                APP_LOGGER.critical(
Jiahang Chen's avatar
Jiahang Chen committed
671
                    "[S3I]: Method %s is not one of the built-in functionalities in %s!" % (
672
673
674
675
                        service_type.split('/')[1], self.name)
                )
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
676
                    receiverUUIDs=[body_json.get("sender", None)],
677
                    serviceType=body_json.get("serviceType", None),
678
679
680
681
682
683
                    results={"error": "invalid method {}".format(service_type.split('/')[1])},
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
            except IndexError:
                APP_LOGGER.critical(
Jiahang Chen's avatar
Jiahang Chen committed
684
                    "[S3I]: ServiceType consists of functionality and method name."
685
686
687
688
689
690
                )
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
                    receiverUUIDs=[body_json.get("sender", None)],
                    serviceType=body_json.get("serviceType", None),
                    results={"error": "method missing"},
691
692
693
694
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
            else:
Jiahang Chen's avatar
Jiahang Chen committed
695
696
697
                __log = "[S3I][Broker]: Execute the function {0} of the class {1}".format(service_type.split('/')[1],
                                                                                          service_type.split('/')[0])
                APP_LOGGER.info(__log)
698
699
700
                try:
                    result = method(**parameters)
                except TypeError:
Jiahang Chen's avatar
Jiahang Chen committed
701
                    APP_LOGGER.critical("[S3I]: Invalid function arguments")
702
703
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
704
                        receiverUUIDs=[body_json.get("sender", None)],
705
706
707
708
                        serviceType=body_json.get("serviceType", None),
                        results={"error": "invalid function arguments (parameters)"},
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
709
710
                    )
                else:
711
712
                    if isinstance(result, bool):
                        result = {"ok": result}
713
                    elif result is None:
714
                        return
715
716
                    service_reply.fillServiceReply(
                        senderUUID=self.thing_id,
Jiahang Chen's avatar
Jiahang Chen committed
717
                        receiverUUIDs=[body_json.get("sender", None)],
718
719
720
721
722
723
                        serviceType=body_json.get("serviceType", None),
                        results=result,
                        replyingToUUID=body_json.get("identifier", None),
                        msgUUID="s3i:{}".format(uuid.uuid4())
                    )

724
725
726
727
728
        res = self.__send_message_to_broker(
            receiver_endpoints=[body_json.get("replyToEndpoint", None)],
            msg=service_reply.msg
        )

729
730
        if self.__is_broker_rest:
            if res.status_code == 201:
Jiahang Chen's avatar
Jiahang Chen committed
731
732
                __log = "[S3I][Broker]: Send a S3I-B ServiceReply back to the requester"
                APP_LOGGER.info(__log)
733
            else:
Jiahang Chen's avatar
Jiahang Chen committed
734
                APP_LOGGER.critical(__log)
Jiahang Chen's avatar
Jiahang Chen committed
735
736

    def on_get_value_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
737
        """Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout.
738
739
740
741
742
743
744

        :param msg: GetValueReply

        """

        # ???: Behavior should be defined by the user! Maybe he want
        # to process the result!
Jiahang Chen's avatar
Jiahang Chen committed
745
746
        __log = "[S3I][Broker]: You have received a S3I-B GetValueReply"
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
747
748
749
        value = msg.get("value", None)
        if isinstance(value, dict):
            value = json.dumps(value, indent=2)
Jiahang Chen's avatar
Jiahang Chen committed
750
751
        __log = "[S3I][Broker]: The queried value is: {0}".format(value)
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
752

Jiahang Chen's avatar
Jiahang Chen committed
753
    def on_service_reply(self, msg):
Jiahang Chen's avatar
Jiahang Chen committed
754
        """Handles incoming S³I-B ServiceReply. Prints the content of msg to stdout.
755
756
757
758

        :param msg: ServiceReply

        """
Jiahang Chen's avatar
Jiahang Chen committed
759
760
        __log = "[S3I][Broker]: You have received a S3I-B ServiceReply"
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
761
762
763
764
        results = msg.get("results", None)
        if isinstance(results, dict):
            results = json.dumps(results, indent=2)

Jiahang Chen's avatar
Jiahang Chen committed
765
766
        __log = "[S3I][Broker]: The result is: {0}".format(results)
        APP_LOGGER.info(__log)
Jiahang Chen's avatar
Jiahang Chen committed
767
768

    def to_dir_json(self):
769
770
771
772
773
774
775
        """Returns a dictionary representing this thing's directory entry.

        :returns: Directory representation of this object
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
776
777
778
779
780
        self.dir_json = self.dir.queryThingIDBased(self.thing_id)
        if self.thing_id is not None:
            self.dir_json["thingId"] = self.thing_id
        if self.policy_id is not None:
            self.dir_json["policyId"] = self.policy_id
Jiahang Chen's avatar
Jiahang Chen committed
781
782
783
784
785
        if self.name is not None:
            self.dir_json["attributes"]["name"] = self.name
        if self.features.get("ml40::Location") is not None:
            self.dir_json["attributes"]["location"] = {
                "longitude": self.features.get("ml40::Location").to_json()["longitude"],
GromeTT's avatar
GromeTT committed
786
                "latitude": self.features.get("ml40::Location").to_json()["latitude"]
Jiahang Chen's avatar
Jiahang Chen committed
787
788
789
790
            }
        self.dir_json["attributes"]["dataModel"] = "fml40"
        self.dir_json["attributes"]["thingStructure"] = {
            "class": "ml40::Thing",
GromeTT's avatar
GromeTT committed
791
            "links": []
Jiahang Chen's avatar
Jiahang Chen committed
792
        }
Jiahang Chen's avatar
Jiahang Chen committed
793
        for key in self.roles.keys():
GromeTT's avatar
GromeTT committed
794
795
796
797
            role_entry = {
                "association": "roles",
                "target": self.roles[key].to_json()
            }
Jiahang Chen's avatar
Jiahang Chen committed
798
            self.dir_json["attributes"]["thingStructure"]["links"].append(role_entry)
Jiahang Chen's avatar
Jiahang Chen committed
799

Jiahang Chen's avatar
Jiahang Chen committed
800
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
801
802
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
803
            }
Jiahang Chen's avatar
Jiahang Chen committed
804
805
806
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]

807
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
808
            # if the feature has targets, like ml40::Composite
809
            if hasattr(self.features[key], "targets"):
Jiahang Chen's avatar
Jiahang Chen committed
810
811
                feature_entry["target"]["links"] = list()
                for target in self.features[key].targets.keys():
812
813
814
                    target_json = (
                        self.features[key].targets[target].to_subthing_dir_json()
                    )
Jiahang Chen's avatar
Jiahang Chen committed
815
816
                    feature_entry["target"]["links"].append(target_json)
            self.dir_json["attributes"]["thingStructure"]["links"].append(feature_entry)
Jiahang Chen's avatar
Jiahang Chen committed
817
818
        return self.dir_json

819
    def to_repo_json(self):
820
821
822
823
824
825
826
        """Returns a dictionary representing this thing's repository entry.

        :returns: Repository representation of this object
        :rtype: dict

        """

827
        self.repo_json = self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
828
829
        return self.repo_json

830
    def to_json(self):
831
832
833
834
835
836
837
        """Returns a dictionary representing this thing in it's current state.

        :returns: Representation of this object
        :rtype: dict

        """

838
839
840
841
842
843
        self.dt_json = {
            "thingId": self.thing_id,
            "policyId": self.policy_id,
            "attributes": {
                "class": "ml40::Thing",
                "name": self.name,
844
            },
845
846
847
848
849
        }
        if self.roles:
            self.dt_json["attributes"]["roles"] = list()
        if self.features:
            self.dt_json["attributes"]["features"] = list()
Jiahang Chen's avatar
Jiahang Chen committed
850
        if self.ditto_features:
Jiahang Chen's avatar
Jiahang Chen committed
851
            self.dt_json["features"] = dict()
852
853
        for key in self.roles.keys():
            self.dt_json["attributes"]["roles"].append(self.roles[key].to_json())
Jiahang Chen's avatar
Jiahang Chen committed
854
        for key in self.features.keys():
855
            self.dt_json["attributes"]["features"].append(self.features[key].to_json())
Jiahang Chen's avatar
Jiahang Chen committed
856
857
        for key in self.ditto_features.keys():
            self.dt_json["features"][key] = self.ditto_features[key].to_json()
858
859
        return self.dt_json

Jiahang Chen's avatar
Jiahang Chen committed
860
    def to_subthing_json(self):
861
862
863
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i repository entries.
864

865
866
        :returns: Representation of this object as a subordinate thing
        :rtype: dict
867
868
869

        """

Jiahang Chen's avatar
Jiahang Chen committed
870
871
872
873
        json_out = {
            "class": "ml40::Thing",
            "name": self.name,
            "roles": [],
874
            "features": [],
Jiahang Chen's avatar
Jiahang Chen committed
875
        }
Jiahang Chen's avatar
Jiahang Chen committed
876
877
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
878
        else:
Jiahang Chen's avatar
Jiahang Chen committed
879
880
            if self.model["attributes"].get("identifier") is not None:
                json_out["identifier"] = self.model["attributes"]["identifier"]
Jiahang Chen's avatar
Jiahang Chen committed
881
882
883
884
885
        for key in self.roles.keys():
            json_out["roles"].append(self.roles[key].to_json())
        for key in self.features.keys():
            json_out["features"].append(self.features[key].to_json())
        return json_out
Jiahang Chen's avatar
Jiahang Chen committed
886
887

    def to_subthing_dir_json(self):
888
889
890
891
892
893
894
895
896
        """Returns a dictionary representing this thing in it's current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i directory entries.

        :returns: Representation of this object as a subordinate thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
897
898
899
        json_out = {"class": "ml40::Thing", "links": []}
        if self.thing_id:
            json_out["identifier"] = self.thing_id
Jiahang Chen's avatar
Jiahang Chen committed
900
        for key in self.roles.keys():
901
            role_entry = {"association": "roles", "target": self.roles[key].to_json()}
Jiahang Chen's avatar
Jiahang Chen committed
902
903
            json_out["links"].append(role_entry)
        for key in self.features.keys():
Jiahang Chen's avatar
Jiahang Chen committed
904
905
            feature_target = {
                "class": self.features[key].to_json()["class"],
Jiahang Chen's avatar
Jiahang Chen committed
906
            }
Jiahang Chen's avatar
Jiahang Chen committed
907
908
            if self.features[key].to_json().get("identifier") is not None:
                feature_target["identifier"] = self.features[key].to_json()["identifier"]
909
            feature_entry = {"association": "features", "target": feature_target}
Jiahang Chen's avatar
Jiahang Chen committed
910
911
            json_out["links"].append(feature_entry)
        return json_out