thing.py 36.1 KB
Newer Older
1
2
3
"""This module implements the thing class which is the core element of
this package."""

C. Albrecht's avatar
WIP    
C. Albrecht committed
4
5
6
import threading
import json
import uuid
7
8
import time
import copy
9
from s3i import IdentityProvider, TokenType, Directory
10
from s3i import Repository
11
import s3i.exception
Jiahang Chen's avatar
Jiahang Chen committed
12
from s3i.broker import Broker, BrokerREST
13
from s3i.messages import ServiceReply, GetValueReply, SetValueReply
Jiahang Chen's avatar
Jiahang Chen committed
14
from ml.tools import find_broker_endpoint
C. Albrecht's avatar
WIP    
C. Albrecht committed
15
from ml.app_logger import APP_LOGGER
Jiahang Chen's avatar
Jiahang Chen committed
16
17


18
19
20
class BaseVariable:
    """Holds the connection constants for the S3I services.

    The values are plain class attributes and are read by the Thing class
    when it creates its service clients.
    """

    # Base URL and realm passed to the IdentityProvider client.
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"
    # Host name passed to the AMQP based Broker client.
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"
    # Repository endpoints (websocket and HTTP).
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
    # Directory endpoint (HTTP).
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


Jiahang Chen's avatar
Jiahang Chen committed
29
class Thing:
30
    """The thing class represents a customizable runtime environment for
    operating Digital Twins complying with the Forest Modeling Language (fml40)."""
32

C. Albrecht's avatar
WIP    
C. Albrecht committed
33
    def __init__(
Jiahang Chen's avatar
Jiahang Chen committed
34
35
36
37
38
39
40
41
42
            self,
            model: dict,
            client_secret="",
            grant_type="password",
            is_broker=False,
            is_repo=False,
            is_broker_rest=True,
            username=None,
            password=None,
C. Albrecht's avatar
WIP    
C. Albrecht committed
43
    ):
Jiahang Chen's avatar
Jiahang Chen committed
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
        """
        Constructor

        :param model: edge-device or S³I Repository specified JSON entry, like config file for Digital Twins
        :type model: dict
        :param client_secret: OAuth 2.0 specified client secret, generated in the S³I IdentityProvider
        :type client_secret: str
        :param grant_type: OAuth 2.0 specified grant type to issue a JWT. Here the grant type can be password or client_credentials
        :type grant_type: str
        :param is_broker: whether broker interface is enabled in the ml40::thing instance
        :type is_broker: bool
        :param is_broker_rest: Whether the connection with the S³I Broker is established via HTTP REST
        :type is_broker_rest: bool
        :param is_repo: Whether the thing uses the S³I Repository to launch its Digital Twin in the cloud
        :type is_repo: bool
        :param username: OAuth 2.0 specified username, registered in the S³I IdentityProvider. If the grant_type is set as password, the username is required
        :type username: str
        :param password: OAuth 2.0 specified password, registered in the S³I IdentityProvider. If the grant_type is set as password, the password is required

        """
64

Jiahang Chen's avatar
Jiahang Chen committed
65
        self.__model = model
C. Albrecht's avatar
WIP    
C. Albrecht committed
66
67
68
69
70
71
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
Jiahang Chen's avatar
Jiahang Chen committed
72

C. Albrecht's avatar
WIP    
C. Albrecht committed
73
        self.__is_broker = is_broker
Jiahang Chen's avatar
Jiahang Chen committed
74
        self.__is_broker_rest = is_broker_rest
C. Albrecht's avatar
WIP    
C. Albrecht committed
75
        self.__is_repo = is_repo
Jiahang Chen's avatar
Jiahang Chen committed
76

C. Albrecht's avatar
WIP    
C. Albrecht committed
77
        self.__access_token = ""
Jiahang Chen's avatar
Jiahang Chen committed
78
79
        self.__endpoint = ""

C. Albrecht's avatar
WIP    
C. Albrecht committed
80
        self.__ws_connected = False
Jiahang Chen's avatar
Jiahang Chen committed
81
82
        self.broker = None
        self.ws = None
83
84
        # TODO: Change the variable name dir, because it is a builtin
        # name.
Jiahang Chen's avatar
Jiahang Chen committed
85
        self.dir = None
Jiahang Chen's avatar
Jiahang Chen committed
86
87
        self.repo = None

88
        # : Dict[<str>, <str>]
Jiahang Chen's avatar
Jiahang Chen committed
89
90
        self.repo_json = dict()
        self.dir_json = dict()
91
        self.dt_json = dict()
C. Albrecht's avatar
WIP    
C. Albrecht committed
92
93
94

        attributes = model.get("attributes", None)
        self.__name = ""
Jiahang Chen's avatar
Jiahang Chen committed
95
        self.__roles = {}
Jiahang Chen's avatar
Jiahang Chen committed
96
        self.__features = {}
Jiahang Chen's avatar
Jiahang Chen committed
97
        self.__ditto_features = {}
98
        # ??? Is this property necessary? Only used as return value in _getValue()
Jiahang Chen's avatar
Jiahang Chen committed
99
        self.__resGetValue = list()
100
        self.__source_obj = None
C. Albrecht's avatar
WIP    
C. Albrecht committed
101
102
        if attributes:
            self.__name = attributes.get("name", "")
Jiahang Chen's avatar
Jiahang Chen committed
103
104
105

    @property
    def model(self):
Jiahang Chen's avatar
Jiahang Chen committed
106
        """Returns the specification JSON from which this thing has been constructed from.
107
108
109
110
111
112

        :returns: Representation of a ml40 compliant thing.
        :rtype: dict

        """

Jiahang Chen's avatar
Jiahang Chen committed
113
        return self.__model
Jiahang Chen's avatar
Jiahang Chen committed
114

Jiahang Chen's avatar
Jiahang Chen committed
115
116
117
118
119
120
121
122
    @property
    def ditto_features(self):
        return self.__ditto_features

    @ditto_features.setter
    def ditto_features(self, value):
        self.__ditto_features = value

Jiahang Chen's avatar
Jiahang Chen committed
123
124
    @property
    def features(self):
Jiahang Chen's avatar
Jiahang Chen committed
125
        """Returns thing's features.
126

127
128
        :returns: Features
        :rtype: dict
129
130
131

        """

Jiahang Chen's avatar
Jiahang Chen committed
132
133
134
135
        return self.__features

    @features.setter
    def features(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
136
        """Replaces thing's features with value.
137
138
139
140
141

        :param value: New collection of features

        """

Jiahang Chen's avatar
Jiahang Chen committed
142
143
        self.__features = value

Jiahang Chen's avatar
Jiahang Chen committed
144
145
    @property
    def roles(self):
146
        """Returns the thing's roles.
147
148
149

        :returns: ml40 roles
        :rtype: dict
150

151
152
        """

Jiahang Chen's avatar
Jiahang Chen committed
153
154
155
156
        return self.__roles

    @roles.setter
    def roles(self, value):
Jiahang Chen's avatar
Jiahang Chen committed
157
        """Replaces thing's roles with value
158

159
        :param value: New collection of roles
160
161
        """

Jiahang Chen's avatar
Jiahang Chen committed
162
163
        self.__roles = value

C. Albrecht's avatar
WIP    
C. Albrecht committed
164
165
    @property
    def client_secret(self):
166
167
168
169
        """Returns the client secret.

        :returns: Client secret
        :rtype: str
170

171
172
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
173
174
175
176
        return self.__client_secret

    @property
    def grant_type(self):
Jiahang Chen's avatar
Jiahang Chen committed
177
        """Returns the method used to obtain JSON Web Tokens from the S³I IdentityProvider
178

Jiahang Chen's avatar
Jiahang Chen committed
179
        :returns: OAuth2 specified grant type [password, client_credentials]
180
        :rtype: str
181

182
183
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
184
185
186
187
        return self.__grant_type

    @property
    def access_token(self):
Jiahang Chen's avatar
Jiahang Chen committed
188
        """Returns the current JSON Web token.
189

Jiahang Chen's avatar
Jiahang Chen committed
190
        :returns: JSON Web token
191
        :rtype: str
192

193
        """
C. Albrecht's avatar
WIP    
C. Albrecht committed
194
195
196
197
        return self.__access_token

    @property
    def name(self):
198
199
200
201
        """Returns the name of this thing.

        :returns: name
        :rtype: str
202

203
204
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
205
206
207
208
        return self.__name

    @property
    def thing_id(self):
209
210
211
212
        """Returns the identifier of this thing.

        :returns: identifier
        :rtype: str
213

214
215
        """

C. Albrecht's avatar
WIP    
C. Albrecht committed
216
217
        return self.__thing_id

Jiahang Chen's avatar
Jiahang Chen committed
218
219
    @property
    def policy_id(self):
220
221
222
223
224
225
226
        """Returns the identifier of this thing's policy.

        :returns: identifier
        :rtype: str

        """

Jiahang Chen's avatar
Jiahang Chen committed
227
228
        return self.__policy_id

C. Albrecht's avatar
WIP    
C. Albrecht committed
229
    def run_forever(self):
        """Starts the thing in permanent mode.

        Connects to the S³I IdentityProvider, pushes the directory entry
        once and then starts the background synchronisation threads: one
        for the local JSON model and, if the repository is enabled, one for
        the repository entry.

        """
        APP_LOGGER.info("[S3I]: Launch {}".format(self.name))
        self.__connect_with_idp()
        self.__dir_syn()
        # The threads are deliberately left non-daemonic, as in the original
        # code, because this method returns right after starting them.
        threading.Thread(target=self.__json_syn).start()
        if self.__is_repo:
            threading.Thread(target=self.__repo_syn).start()

    @staticmethod
    def add_user_def(func):
Jiahang Chen's avatar
Jiahang Chen committed
243
        """Insert user-specified function in the thing object.
244

Jiahang Chen's avatar
Jiahang Chen committed
245
        :param func: external defined function to be executed.
246
247
248

        """

Jiahang Chen's avatar
Jiahang Chen committed
249
250
        threading.Thread(target=func).start()

251
    def __json_syn(self, freq=0.1):
Jiahang Chen's avatar
Jiahang Chen committed
252
253
254
255
256
257
        """
        Applies local changes to the original model in the thing object

        :param freq: Frequency of the update
        :type freq: float
        """
Jiahang Chen's avatar
Jiahang Chen committed
258
        while True:
259
            try:
260
261
262
263
264
                time.sleep(freq)
                self.to_json()
            except:
                continue

265
266
    def __dir_syn(self):
        """Applies local changes to the directory entry in the cloud only once.
267
268

        """
269
270
271
272
273
        self.to_dir_json()
        if self.dir_json is not None:
            self.dir.updateThingIDBased(
                thingID=self.thing_id, payload=self.dir_json
            )
Jiahang Chen's avatar
Jiahang Chen committed
274

275
    def __repo_syn(self, freq=0.1):
276
        """Applies local changes to the repository entry in the cloud.
277
278

        :param freq: Frequency of the update.
Jiahang Chen's avatar
Jiahang Chen committed
279
        :type freq: float
280
        """
Jiahang Chen's avatar
Jiahang Chen committed
281
        while self.__is_repo:
282
            try:
283
                time.sleep(freq)
284
285
                old_repo_json = self.repo_json
                self.to_repo_json()
286
                # TODO: Clean up this reqion
287
288
289
                if self.repo_json == old_repo_json:
                    continue
                else:
290
291
292
                    self.repo.updateThingIDBased(
                        thingID=self.thing_id, payload=self.repo_json
                    )
293
            except:
Jiahang Chen's avatar
Jiahang Chen committed
294
295
                continue

C. Albrecht's avatar
WIP    
C. Albrecht committed
296
    def __connect_with_idp(self):
        """Establishes a connection to the S³I IdentityProvider.

        An IdentityProvider client is created from the stored credentials
        and started with __on_token as its callback for new access tokens.

        """
        APP_LOGGER.info("[S3I][IdP]: Connect with S3I IdentityProvider")
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
        """Updates the JSON Web Token with token and reestablishes connections
        to the s3i services.

        The directory and repository clients are always recreated; the
        broker connection is only set up when the broker is enabled.

        :param token: New JSON Web token
        :type token: str

        """
        APP_LOGGER.info("get new token")
        self.__access_token = token
        self.__connect_with_dir()
        self.__connect_with_repo()
        if self.__is_broker:
            self.__connect_with_broker()
Jiahang Chen's avatar
Jiahang Chen committed
331

Jiahang Chen's avatar
Jiahang Chen committed
332
    def __connect_with_dir(self):
        """Initializes the property dir with a Directory object which can be
        used to access the s3i Directory.

        The object is built with the current access token.

        """
        APP_LOGGER.info("[S3I][Dir]: Connect with S3I Directory")
        self.dir = Directory(
            s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
346

C. Albrecht's avatar
WIP    
C. Albrecht committed
347
    def __connect_with_repo(self):
        """Initializes the property repo with a Repository object which can be
        used to access the s3i Repository.

        The object is built with the current access token.

        """
        APP_LOGGER.info("[S3I][Repo]: Connect with S3I Repository")
        self.repo = Repository(
            s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token
        )
Jiahang Chen's avatar
Jiahang Chen committed
361

C. Albrecht's avatar
WIP    
C. Albrecht committed
362
    def __connect_with_broker(self):
        """Initializes the property broker with a Broker object. Additionally
        a callback function is registered which handles incoming S³I-B
        messages.

        REST mode: a fresh BrokerREST object is created for every token.
        The polling thread is started on the first call only; it reads
        self.broker on every cycle and therefore uses the newest object.

        AMQP mode: the first call creates the Broker object and starts the
        receiving thread; later calls only ask the existing broker to
        reconnect with the new token.

        """
        APP_LOGGER.info("[S3I][Broker]: Connect with S3I Broker")

        if self.__is_broker_rest:
            # Starting a poller for every token would leave one more endless
            # thread behind on each token refresh.
            is_first_connection = self.broker is None
            self.broker = BrokerREST(token=self.access_token)
            if is_first_connection:
                threading.Thread(target=self.__receive_via_rest).start()
            return

        if self.broker is not None:
            self.broker.maybe_reconnect(self.access_token)
            return

        # first time to build a broker instance
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
        self.broker = Broker(
            auth_form="Username/Password",
            username=" ",
            password=self.__access_token,
            host=BaseVariable.BROKER_HOST,
        )
        threading.Thread(
            target=self.broker.receive,
            args=(self.__endpoint, self.__on_broker_callback),
        ).start()

    def __receive_via_rest(self):
        """Polls the broker endpoint via REST and dispatches every message.

        Runs endlessly. Errors while receiving or handling a message are
        logged and polling continues.

        """
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
        last_error = None
        while True:
            time.sleep(0.1)
            try:
                msg = self.broker.receive_once(self.__endpoint)
                # An empty dictionary means that nothing was received.
                if msg != {}:
                    body = msg if isinstance(msg, dict) else json.loads(msg)
                    self.__on_broker_callback(
                        ch=None,
                        method=None,
                        properties=None,
                        body=body,
                    )
            except Exception as exc:
                # Report a persisting error only once, not on every cycle.
                error = repr(exc)
                if error != last_error:
                    APP_LOGGER.critical(
                        "[S3I][Broker]: Receiving via REST failed: {}".format(error)
                    )
                    last_error = error
            else:
                last_error = None
C. Albrecht's avatar
WIP    
C. Albrecht committed
413
414

    def __on_broker_callback(self, ch, method, properties, body):
415
416
417
418
419
420
421
422
        """Parses body (content of a S3I-B message) and delegates the
        processing of the message to a separate method. The method is
        selected according to the message's type.

        :param body: S3I-B message

        """

Jiahang Chen's avatar
Jiahang Chen committed
423
        if isinstance(body, bytes):
Jiahang Chen's avatar
Jiahang Chen committed
424
            try:
425
                body = json.loads(body)
Jiahang Chen's avatar
Jiahang Chen committed
426
427
            except ValueError:
                pass
428

Jiahang Chen's avatar
Jiahang Chen committed
429
        elif isinstance(body, int):
430
            pass
Jiahang Chen's avatar
Jiahang Chen committed
431
        elif isinstance(body, str):
432
433
            pass

Jiahang Chen's avatar
Jiahang Chen committed
434
435
        if ch is not None:
            ch.basic_ack(method.delivery_tag)
436
437
438
439
440
441
442
443
444
445
446
447
        try:
            message_type = body.get("messageType")
            if message_type == "userMessage":
                self.on_user_message(body)
            elif message_type == "serviceRequest":
                self.on_service_request(body)
            elif message_type == "getValueRequest":
                self.on_get_value_request(body)
            elif message_type == "getValueReply":
                self.on_get_value_reply(body)
            elif message_type == "serviceReply":
                self.on_service_reply(body)
448
449
450
451
            elif message_type == "setValueRequest":
                self.on_set_value_request(body)
            elif message_type == "setValueReply":
                self.on_set_value_reply(body)
452
453
454
455
            else:
                ### TODO send user message reply back
                pass
        except AttributeError:
Jiahang Chen's avatar
Jiahang Chen committed
456
            pass
Jiahang Chen's avatar
Jiahang Chen committed
457

458
459
460
461
462
463
    def __send_message_to_broker(self, receiver_endpoints, msg):
        """Serializes msg as JSON and sends it via the broker.

        :param receiver_endpoints: Endpoints the message is sent to
        :param msg: S3I-B message, must be JSON serializable
        :returns: Whatever the broker's send() returns, or None if the
            broker raised S3IBrokerAMQPError (the error is logged).

        """
        try:
            res = self.broker.send(
                receiver_endpoints=receiver_endpoints,
                msg=json.dumps(msg)
            )
        except s3i.exception.S3IBrokerAMQPError as e:
            APP_LOGGER.critical("[S3I]: {}".format(e.error_msg))
            return None

        APP_LOGGER.info("[S3I][Broker]: Send a S3I-B message back to the requester")
        return res
471

Jiahang Chen's avatar
Jiahang Chen committed
472
    def on_user_message(self, msg):
        """Handles incoming S³I-B UserMessages.

        This implementation only logs that a message has arrived; msg
        itself is not evaluated.

        :param msg: S³I-B UserMessage

        """
        APP_LOGGER.info("[S3I][Broker]: You have received a S3I-B UserMessage")
C. Albrecht's avatar
WIP    
C. Albrecht committed
480
481

    def on_get_value_request(self, msg):
        """Handles incoming GetValueRequest message. Looks up the value specified in msg and
        sends a GetValueReply message back to the sender.

        If the attribute path is missing or cannot be resolved, the reply
        carries the text "Invalid attribute path" as its result.

        :param msg: GetValueRequest

        """
        APP_LOGGER.info("[S3I][Broker]: You have received a S3I-B GetValueRequest")

        request_sender = msg.get("sender")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = msg.get("replyToEndpoint")
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())

        APP_LOGGER.info(
            "[S3I]: Search the given attribute path: {}".format(attribute_path)
        )
        try:
            value = self._uriToData(attribute_path)
        except (KeyError, AttributeError):
            # AttributeError covers a request without an attributePath
            # (None has no split()).
            value = "Invalid attribute path"
            APP_LOGGER.critical("[S3I]: " + value)

        get_value_reply = GetValueReply()
        get_value_reply.fillGetValueReply(
            senderUUID=self.thing_id,
            receiverUUIDs=[request_sender],
            results=value,
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )

        res = self.__send_message_to_broker(
            receiver_endpoints=[request_sender_endpoint],
            msg=get_value_reply.msg
        )

        # __send_message_to_broker returns None after a failure it has
        # already logged; there is no response to inspect in that case.
        if res is None or not self.__is_broker_rest:
            return

        if res.status_code == 201:
            APP_LOGGER.info(
                "[S3I][Broker]: Send S3I-B GetValueReply back to the requester"
            )
        else:
            APP_LOGGER.info("[S3I][Broker]: " + res.text)
Jiahang Chen's avatar
Jiahang Chen committed
526
527

    def _uriToData(self, uri):
528
529
530
531
532
533
534
        """Returns a copy of the value found at uri.

        :param uri: Path to value
        :rtype: Feature

        """

Jiahang Chen's avatar
Jiahang Chen committed
535
        if uri == "":
536
            return self.dt_json
Jiahang Chen's avatar
Jiahang Chen committed
537
538
539
540
        else:
            uri_list = uri.split("/")
            if uri_list[0] == "features":
                try:
541
                    return self.dt_json[uri]
Jiahang Chen's avatar
Jiahang Chen committed
542
                except KeyError:
Jiahang Chen's avatar
Jiahang Chen committed
543
                    return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
544
545

            try:
546
                self._getValue(self.dt_json, uri_list)
Jiahang Chen's avatar
Jiahang Chen committed
547
            except:
Jiahang Chen's avatar
Jiahang Chen committed
548
                return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
549
            if self.__resGetValue.__len__() == 0:
Jiahang Chen's avatar
Jiahang Chen committed
550
                return "Invalid attribute path"
Jiahang Chen's avatar
Jiahang Chen committed
551
552
553
554
555
556
557
558
            response = copy.deepcopy(self.__resGetValue)
            self.__resGetValue.clear()
            if response.__len__() == 1:
                return response[0]
            else:
                return response

    def _getValue(self, source, uri_list):
559
560
561
562
563
564
565
566
567
568
        """Searches for the value specified by uri_list in source and stores
        the result in __resGetValue.

        :param source: Object that is scanned
        :param uri_list: List containing path

        """

        # ??? What if the uri points to a Value object?
        # Shouldn't it be serialized?!
Jiahang Chen's avatar
Jiahang Chen committed
569
570
571
572
573
574
575
        value = source[uri_list[0]]
        if uri_list.__len__() == 1:
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
576
577
578
                        value = self.dt_json["features"][stringValue_split[1]][
                            "properties"
                        ][uri_list[0]]
Jiahang Chen's avatar
Jiahang Chen committed
579
580
581
582
583
                except:
                    pass
            self.__resGetValue.append(value)
            return
        if isinstance(value, dict):
584
            # ??? uri_list.pop(0) better?!
Jiahang Chen's avatar
Jiahang Chen committed
585
586
587
588
589
590
591
592
593
594
595
596
597
            del uri_list[0]
            self._getValue(value, uri_list)
        if isinstance(value, list):
            if isinstance(value[0], (str, int, float, bool, list)):
                return value
            if isinstance(value[0], dict):
                for item in value:
                    if item["class"] == "ml40::Thing":
                        for i in item["roles"]:
                            if self._findValue(i, uri_list[1]):
                                uri_list_1 = copy.deepcopy(uri_list)
                                del uri_list_1[:2]
                                self._getValue(item, uri_list_1)
Jiahang Chen's avatar
Jiahang Chen committed
598
599
600
601
602
603
                        _f = self._findValue({"identifier": item.get("identifier")}, uri_list[1]) or \
                             self._findValue({"name": item.get("name")}, uri_list[1])
                        if _f:
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            self._getValue(item, uri_list_1)
Jiahang Chen's avatar
Jiahang Chen committed
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
                    else:
                        if self._findValue(item, uri_list[1]):
                            uri_list_1 = copy.deepcopy(uri_list)
                            del uri_list_1[:2]
                            if not uri_list_1:
                                self.__resGetValue.append(item)
                                return
                            else:
                                self._getValue(item, uri_list_1)
        if isinstance(value, (str, int, float, bool)):
            # if is ditto-feature
            if isinstance(value, str):
                try:
                    stringValue_split = value.split(":")
                    if stringValue_split[0] == "ditto-feature":
619
                        value = self.dt_json["features"][stringValue_split[1]][
Jiahang Chen's avatar
Jiahang Chen committed
620
621
622
623
624
625
626
                            "properties"
                        ][uri_list[0]]
                except:
                    pass
            self.__resGetValue.append(value)

    def _findValue(self, json, value):
627
628
629
630
631
632
633
634
635
636
        """Returns true if value has been found in json, otherwise returns false.

        :param json: dictionary
        :param value:
        :returns:
        :rtype:

        """

        # TODO: Simplify: value in json.values()
Jiahang Chen's avatar
Jiahang Chen committed
637
638
639
640
641
        for item in json:
            if json[item] == value:
                # print("Parameter: ", json[item])
                return True
        return False
C. Albrecht's avatar
WIP    
C. Albrecht committed
642
643

    def on_service_request(self, body_json):
        """Handles S³I-B ServiceRequests. Executes the method of the
        functionality specified in serviceType and sends a ServiceReply
        back to the sender.

        serviceType is expected to have the form
        "<functionality>/<method>". Every failure (unknown functionality,
        missing or unknown method, unusable parameters) is answered with
        a ServiceReply whose results are {"error": <reason>}. No reply is
        sent when the executed method returns None.

        :param body_json: ServiceRequest

        """
        APP_LOGGER.info("[S3I][Broker]: You have received a S3I-B ServiceRequest")

        results = self._resolve_service_results(
            body_json.get("serviceType"), body_json.get("parameters")
        )
        if results is None:
            # The executed method produced nothing to report back.
            return

        service_reply = ServiceReply()
        service_reply.fillServiceReply(
            senderUUID=self.thing_id,
            receiverUUIDs=[body_json.get("sender", None)],
            serviceType=body_json.get("serviceType", None),
            results=results,
            replyingToUUID=body_json.get("identifier", None),
            msgUUID="s3i:{}".format(uuid.uuid4())
        )
        res = self.__send_message_to_broker(
            receiver_endpoints=[body_json.get("replyToEndpoint", None)],
            msg=service_reply.msg
        )

        # Only the REST variant is inspected here, as in the other
        # request handlers of this class.
        if self.__is_broker_rest:
            if res.status_code == 201:
                APP_LOGGER.info(
                    "[S3I][Broker]: Send a S3I-B ServiceReply back to the requester"
                )
            else:
                # Report what the broker answered rather than an
                # unrelated, previously built log message.
                APP_LOGGER.critical("[S3I][Broker]: " + res.text)

    def _resolve_service_results(self, service_type, parameters):
        """Runs the requested service and builds the reply payload.

        :param service_type: value of the request's serviceType field
        :param parameters: keyword arguments for the method; None is
            treated as "no arguments"
        :returns: results for the ServiceReply, {"error": ...} on
            failure, or None if the method returned None
        """
        if not isinstance(service_type, str):
            APP_LOGGER.critical(
                "[S3I]: ServiceType consists of functionality and method name."
            )
            return {
                "error": "invalid functionalities (serviceType) {}".format(service_type)
            }

        name_parts = service_type.split('/')
        functionality_name = name_parts[0]
        functionality = self.features.get(functionality_name)
        if functionality is None:
            APP_LOGGER.critical(
                "[S3I]: Functionality %s is not one of the built-in functionalities in %s!"
                % (functionality_name, self.name)
            )
            return {
                "error": "invalid functionalities (serviceType) {}".format(
                    functionality_name
                )
            }

        if len(name_parts) < 2:
            APP_LOGGER.critical(
                "[S3I]: ServiceType consists of functionality and method name."
            )
            return {"error": "method missing"}

        method_name = name_parts[1]
        # NOTE(review): method_name is taken verbatim from the request, so
        # every attribute of the functionality object is reachable,
        # including underscore-prefixed ones. Consider restricting this.
        try:
            method = getattr(functionality, method_name)
        except AttributeError:
            APP_LOGGER.critical(
                "[S3I]: Method %s is not one of the built-in functionalities in %s!"
                % (method_name, self.name)
            )
            return {"error": "invalid method {}".format(method_name)}

        if parameters is None:
            parameters = {}

        APP_LOGGER.info(
            "[S3I][Broker]: Execute the function {0} of the class {1}".format(
                method_name, functionality_name
            )
        )
        try:
            # A non-mapping "parameters" value also raises TypeError here.
            result = method(**parameters)
        except TypeError:
            APP_LOGGER.critical("[S3I]: Invalid function arguments")
            return {"error": "invalid function arguments (parameters)"}

        if isinstance(result, bool):
            return {"ok": result}
        return result
Jiahang Chen's avatar
Jiahang Chen committed
740

741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
    def on_set_value_request(self, msg):
        """Handles incoming S³I-B SetValueRequest. Tries to write newValue
        to the attribute addressed by attributePath and sends a
        SetValueReply with the outcome back to the requester.

        Any error raised while resolving the path or writing the value is
        reported as ok=False in the reply.

        :param msg: SetValueRequest

        """
        APP_LOGGER.info("[S3I][Broker]: You have received a S3I-B SetValueRequest")

        set_value_reply = SetValueReply()
        request_sender = msg.get("sender")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = msg.get("replyToEndpoint")
        attribute_path = msg.get("attributePath")
        new_value = msg.get("newValue")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())

        try:
            APP_LOGGER.info(
                "[S3I]: Search for the given attribute path: {}".format(attribute_path)
            )
            old_value = self._uriToData(attribute_path)
            ins = self._uriToIns(attribute_path)
            APP_LOGGER.info(
                "[S3I]: Change value from {} to {}".format(old_value, new_value)
            )
            result = self._set_value_req(ins, new_value, attribute_path)
        except Exception:
            # Deliberately broad: whatever goes wrong while resolving or
            # writing is answered with ok=False. Unlike a bare except this
            # lets KeyboardInterrupt and SystemExit propagate.
            APP_LOGGER.critical("[S3I]: Invalid attribute path")
            result = False

        set_value_reply.fillSetValueReply(
            senderUUID=self.thing_id,
            receiverUUIDs=[request_sender],
            ok=result,
            replyingToUUID=request_msg_id,
            msgUUID=reply_msg_uuid
        )
        res = self.__send_message_to_broker(
            receiver_endpoints=[request_sender_endpoint],
            msg=set_value_reply.msg
        )

        if self.__is_broker_rest:
            if res.status_code == 201:
                APP_LOGGER.info(
                    "[S3I][Broker]: Send S3I-B SetValueReply back to the requester"
                )
            else:
                APP_LOGGER.critical("[S3I][Broker]: " + res.text)

791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
    def _set_value_req(self, ins, new_value, attribute_path):
        """Writes new_value to ins and reports whether it was applied.

        If new_value is a dict, each key names an attribute of ins and
        each value is assigned to it. All keys are checked first, so
        either every attribute is updated or none is. Otherwise the last
        segment of attribute_path names the attribute to overwrite; paths
        with two or fewer segments are rejected.

        :param ins: object whose attributes are to be changed
        :param new_value: plain value, or dict of attribute name -> value
        :param attribute_path: "/"-separated path from the request
        :returns: True if the value(s) were written, otherwise False
        :rtype: bool
        """
        if isinstance(new_value, dict):
            unknown_keys = [key for key in new_value if not hasattr(ins, key)]
            if unknown_keys:
                for key in unknown_keys:
                    APP_LOGGER.info("{} is not one of the attributes".format(key))
                return False
            for key, value in new_value.items():
                setattr(ins, key, value)
            return True

        path_parts = attribute_path.split("/")
        if len(path_parts) <= 2:
            APP_LOGGER.info("Not allowed to set attribute {}".format(attribute_path))
            return False

        attribute_name = path_parts[-1]
        if hasattr(ins, attribute_name):
            setattr(ins, attribute_name, new_value)
            return True
        APP_LOGGER.info("{} is not one of the attributes".format(attribute_name))
        return False

    def _uriToIns(self, uri):
        """Resolves a "/"-separated attribute path to an object.

        The first path segment is discarded and the remaining segments
        are handed to _getInstance, starting the lookup at this thing.

        :param uri: attribute path; None or "" yields None
        :returns: whatever _getInstance returns for the remaining path
        """
        if uri:
            # Drop the leading segment (e.g. "attributes").
            _, *remaining = uri.split("/")
            return self._getInstance(self, remaining)
        return None

    def _getInstance(self, source_obj, uri_list):
        """Walks uri_list down from source_obj and returns the object
        that owns the attribute named by the last segment.

        Segments are consumed from the front of uri_list in place. With
        at most one segment left, source_obj itself is returned. Returns
        None when a "targets"/"subFeatures" entry cannot be matched or
        the leading segment is not recognised.

        :param source_obj: object the remaining path is relative to
        :param uri_list: remaining path segments (modified in place)
        """
        if len(uri_list) < 2:
            # Nothing (or only the attribute name) is left to resolve.
            return source_obj

        head = uri_list[0]

        if "ml40" in head:
            # A feature name: descend into that feature.
            del uri_list[0]
            return self._getInstance(source_obj.features[head], uri_list)

        if head == "features":
            # Pure path keyword, stay on the same object.
            del uri_list[0]
            return self._getInstance(source_obj, uri_list)

        if head == "targets":
            del uri_list[0]
            for target in source_obj.targets.values():
                description = target.to_subthing_json()
                wanted = uri_list[0]
                if any(
                    description.get(field, "") == wanted
                    for field in ("name", "identifier", "class")
                ):
                    del uri_list[0]
                    return self._getInstance(target, uri_list)

        elif head == "subFeatures":
            del uri_list[0]
            for sub_feature in source_obj.subFeatures.values():
                description = sub_feature.to_json()
                wanted = uri_list[0]
                if any(
                    description.get(field, "") == wanted
                    for field in ("name", "identifier", "class")
                ):
                    del uri_list[0]
                    return self._getInstance(sub_feature, uri_list)
850

Jiahang Chen's avatar
Jiahang Chen committed
851
    def on_get_value_reply(self, msg):
        """Handles incoming S³I-B GetValueReply. Logs the value carried
        by the reply; dict and list values are rendered as indented JSON.

        :param msg: GetValueReply

        """

        # ???: Behavior should be defined by the user! Maybe he want
        # to process the result!
        APP_LOGGER.info("[S3I][Broker]: You have received a S3I-B GetValueReply")

        queried = msg.get("value", None)
        if isinstance(queried, (dict, list)):
            queried = json.dumps(queried, indent=2)

        APP_LOGGER.info("[S3I][Broker]: The queried value is: {0}".format(queried))
Jiahang Chen's avatar
Jiahang Chen committed
867

Jiahang Chen's avatar
Jiahang Chen committed
868
    def on_service_reply(self, msg):
        """Handles incoming S³I-B ServiceReply. Logs the results carried
        by the reply; dict and list results are rendered as indented JSON.

        :param msg: ServiceReply

        """
        APP_LOGGER.info("[S3I][Broker]: You have received a S3I-B ServiceReply")

        results = msg.get("results", None)
        # Lists are pretty-printed too, matching on_get_value_reply.
        if isinstance(results, (dict, list)):
            results = json.dumps(results, indent=2)

        APP_LOGGER.info("[S3I][Broker]: The result is: {0}".format(results))
Jiahang Chen's avatar
Jiahang Chen committed
882

883
884
885
886
887
888
889
890
891
    def on_set_value_reply(self, msg):
        """Handles incoming S³I-B SetValueReply. Logs the "ok" status
        carried by the reply.

        :param msg: SetValueReply

        """
        APP_LOGGER.info("[S3I][Broker]: You have received a S3I-B SetValueReply")
        status = msg.get("ok", None)
        APP_LOGGER.info(
            "[S3I][Broker]: The status of value setting: {0}".format(status)
        )

Jiahang Chen's avatar
Jiahang Chen committed
895
    def to_dir_json(self):
        """Returns a dictionary representing this thing's directory entry.

        The current entry is fetched from the directory by thing id and
        then overlaid with this object's ids, name, location (if an
        "ml40::Location" feature exists), data model and a thingStructure
        listing all roles and features as links. The result is also
        stored in self.dir_json.

        :returns: Directory representation of this object
        :rtype: dict

        """
        self.dir_json = self.dir.queryThingIDBased(self.thing_id)
        if self.thing_id is not None:
            self.dir_json["thingId"] = self.thing_id
        if self.policy_id is not None:
            self.dir_json["policyId"] = self.policy_id

        # A fetched entry without "attributes" must not abort the update.
        attributes = self.dir_json.setdefault("attributes", {})
        if self.name is not None:
            attributes["name"] = self.name

        location = self.features.get("ml40::Location")
        if location is not None:
            location_json = location.to_json()
            attributes["location"] = {
                "longitude": location_json["longitude"],
                "latitude": location_json["latitude"],
            }
        attributes["dataModel"] = "fml40"

        links = [
            {"association": "roles", "target": role.to_json()}
            for role in self.roles.values()
        ]
        for feature in self.features.values():
            # Serialize each feature once and reuse the result.
            feature_json = feature.to_json()
            feature_target = {"class": feature_json["class"]}
            identifier = feature_json.get("identifier")
            if identifier is not None:
                feature_target["identifier"] = identifier
            # if the feature has targets, like ml40::Composite
            if hasattr(feature, "targets"):
                feature_target["links"] = [
                    target.to_subthing_dir_json()
                    for target in feature.targets.values()
                ]
            links.append({"association": "features", "target": feature_target})

        attributes["thingStructure"] = {"class": "ml40::Thing", "links": links}
        return self.dir_json

946
    def to_repo_json(self):
        """Returns a dictionary representing this thing's repository entry.

        The entry is the object currently held in self.dt_json (the
        snapshot produced by to_json()); it is not rebuilt here. The same
        object is stored in self.repo_json.

        :returns: Repository representation of this object
        :rtype: dict

        """
        self.repo_json = self.dt_json
        return self.repo_json

957
    def to_json(self):
        """Returns a dictionary representing this thing in its current state.

        "roles", "features" (under attributes) and the top-level
        "features" (ditto features) are only included when the
        corresponding collection is non-empty. The result is also stored
        in self.dt_json.

        :returns: Representation of this object
        :rtype: dict

        """
        attributes = {
            "class": "ml40::Thing",
            "name": self.name,
        }
        if self.roles:
            attributes["roles"] = [role.to_json() for role in self.roles.values()]
        if self.features:
            attributes["features"] = [
                feature.to_json() for feature in self.features.values()
            ]

        dt_json = {
            "thingId": self.thing_id,
            "policyId": self.policy_id,
            "attributes": attributes,
        }
        if self.ditto_features:
            dt_json["features"] = {
                key: feature.to_json()
                for key, feature in self.ditto_features.items()
            }

        # Publish only the finished snapshot, so a failing to_json() of a
        # member never leaves a half-built self.dt_json behind.
        self.dt_json = dt_json
        return self.dt_json

Jiahang Chen's avatar
Jiahang Chen committed
987
    def to_subthing_json(self):
        """Returns a dictionary representing this thing in its current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i repository entries.

        The "identifier" is this thing's id if set; otherwise the
        identifier from the model's attributes is used when present.

        :returns: Representation of this object as a subordinate thing
        :rtype: dict

        """
        json_out = {
            "class": "ml40::Thing",
            "name": self.name,
            "roles": [role.to_json() for role in self.roles.values()],
            "features": [feature.to_json() for feature in self.features.values()],
        }
        if self.thing_id:
            json_out["identifier"] = self.thing_id
        else:
            # A model without an "attributes" section simply has no
            # fallback identifier.
            identifier = self.model.get("attributes", {}).get("identifier")
            if identifier is not None:
                json_out["identifier"] = identifier
        return json_out
Jiahang Chen's avatar
Jiahang Chen committed
1013
1014

    def to_subthing_dir_json(self):
        """Returns a dictionary representing this thing in its current state
        as a subordinate thing. This representation should be used for
        subordinate things in s3i directory entries.

        Roles and features are listed as "links"; a feature link carries
        only the feature's class and, if present, its identifier.

        :returns: Representation of this object as a subordinate thing.
        :rtype: dict

        """
        links = [
            {"association": "roles", "target": role.to_json()}
            for role in self.roles.values()
        ]
        for feature in self.features.values():
            # Serialize each feature once and reuse the result.
            feature_json = feature.to_json()
            feature_target = {"class": feature_json["class"]}
            identifier = feature_json.get("identifier")
            if identifier is not None:
                feature_target["identifier"] = identifier
            links.append({"association": "features", "target": feature_target})

        json_out = {"class": "ml40::Thing", "links": links}
        if self.thing_id:
            json_out["identifier"] = self.thing_id
        return json_out