Skip to content
Snippets Groups Projects
Select Git revision
  • d96ffe7499a2e3b4ce492b67b9ae45772ecc21e2
  • master default protected
  • condition_monitoring
  • add_smart_forestry_features
  • add_cc4w_features
  • add_SiteStruct_features
  • s3i_v3
  • RIF-waldboden40
  • message_callbacks
  • response_handling_with_async_events
  • fix_enum
  • Einarbeitung_Peng
  • 16-add-Classes-from-DigiKomForst-project
  • digiKomForstServices
  • update_doc
  • 12-emitting-named-events-crashes-application
  • DZ-Wald
  • 1.0.0
  • 0.2.12
  • 0.2.11
  • 0.2.10
  • 0.2.9
  • 0.2.8
  • 0.2.7
  • 0.2.6
  • 0.2.5.3
  • 0.2.5.2
  • 0.2.5.1
  • 0.2.5
  • v0.2.0
30 results

thing.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    thing.py 17.53 KiB
    import ast
    import threading
    import json
    import uuid
    from s3i import IdentityProvider, TokenType, GetValueReply, Directory, Repository
    from s3i.broker import Broker, BrokerREST
    from s3i.messages import ServiceReply
    from ml.tools import BColors
    from ml.tools import find_broker_endpoint
    from ml.app_logger import APP_LOGGER
    import time
    import copy
    
    
    class BaseVariable(object):
        IDP_URL = "https://idp.s3i.vswf.dev/"
        IDP_REALM = "KWH"
        BROKER_HOST = "rabbitmq.s3i.vswf.dev"
        REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
        REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
        DIR_URL = "https://dir.s3i.vswf.dev/api/2/"
    
    
    class Thing:
        def __init__(
                self,
                model: dict,
                client_secret="",
                grant_type="password",
                is_broker=False,
                is_repo=False,
                is_broker_rest=True,
                username=None,
                password=None,
        ):
            self.__model = model
            self.__thing_id = model.get("thingId", "")
            self.__policy_id = model.get("policyId", "")
            self.__grant_type = grant_type
            self.__username = username
            self.__password = password
            self.__client_secret = client_secret
    
            self.__is_broker = is_broker
            self.__is_broker_rest = is_broker_rest
            self.__is_repo = is_repo
    
            self.__access_token = ""
    
            self.__endpoint = ""
    
            self.__ws_connected = False
            self.broker = None
            self.ws = None
            self.dir = None
            self.repo = None
    
            self.repo_json = dict()
            self.dir_json = dict()
    
            attributes = model.get("attributes", None)
            self.__name = ""
            self.__roles = {}
            self.__features = {}
            self.__resGetValue = list()
            if attributes:
                self.__name = attributes.get("name", "")
    
        @property
        def model(self):
            return self.__model
    
        @property
        def features(self):
            return self.__features
    
        @features.setter
        def features(self, value):
            self.__features = value
    
        @property
        def roles(self):
            return self.__roles
    
        @roles.setter
        def roles(self, value):
            self.__roles = value
    
        @property
        def client_secret(self):
            return self.__client_secret
    
        @property
        def grant_type(self):
            return self.__grant_type
    
        @property
        def access_token(self):
            return self.__access_token
    
        @property
        def name(self):
            return self.__name
    
        @property
        def thing_id(self):
            return self.__thing_id
    
        @property
        def policy_id(self):
            return self.__policy_id
    
        def run_forever(self):
            print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC))
            self.__connect_with_idp()
    
            threading.Thread(target=self.__dir_syn).start()
            if self.__is_repo:
                threading.Thread(target=self.__repo_syn).start()
    
        @staticmethod
        def add_user_def(func):
            threading.Thread(target=func).start()
    
        def __dir_syn(self):
            while True:
                try:
                    time.sleep(0.1)
                    old_dir_json = self.dir_json
                    self.to_dir_json()
                    if self.dir_json == old_dir_json:
                        continue
                    else:
                        self.dir.updateThingIDBased(thingID=self.thing_id, payload=self.dir_json)
                except:
                    continue
    
        def __repo_syn(self):
            while self.__is_repo:
                try:
                    time.sleep(0.1)
                    old_repo_json = self.repo_json
                    self.to_repo_json()
                    if self.repo_json == old_repo_json:
                        continue
                    else:
                        self.repo.updateThingIDBased(thingID=self.thing_id, payload=self.repo_json)
                except:
                    continue
    
        def __connect_with_idp(self):
            print(BColors.OKBLUE + "[S³I][IdP]" + BColors.ENDC + ": Connect with S3I IdentityProvider")
            idp = IdentityProvider(
                grant_type=self.__grant_type,
                client_id=self.__thing_id,
                username=self.__username,
                password=self.__password,
                client_secret=self.__client_secret,
                realm=BaseVariable.IDP_REALM,
                identity_provider_url=BaseVariable.IDP_URL,
            )
    
            # This may take a while so fetch token directly.
            idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)
    
        def __on_token(self, token):
            self.__access_token = token
            self.__connect_with_dir()
            self.__connect_with_repo()
            if self.__is_broker:
                self.__connect_with_broker()
    
        def __connect_with_dir(self):
            print(
                BColors.OKBLUE
                + "[S³I][Dir]"
                + BColors.ENDC
                + ": Connect with S3I Directory"
            )
            self.dir = Directory(s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token)
    
        def __connect_with_repo(self):
            print(
                BColors.OKBLUE
                + "[S³I][Repo]"
                + BColors.ENDC
                + ": Connect with S3I Repository"
            )
            self.repo = Repository(s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token)
    
        def __connect_with_broker(self):
            print(BColors.OKBLUE + "[S³I][Broker]" + BColors.ENDC + ": Connect with S3I Broker")
            self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
            if self.__is_broker_rest:
                self.broker = BrokerREST(token=self.access_token)
    
                def receive():
                    while True:
                        try:
                            time.sleep(0.1)
                            msg_str = self.broker.receive_once(self.__endpoint)
                            if msg_str == "":
                                continue
                            else:
                                self.__on_broker_callback(ch=None, method=None, properties=None,
                                                          body=json.loads(msg_str))
                        except:
                            continue
    
                threading.Thread(
                    target=receive
                ).start()
    
            else:
                self.broker = Broker(
                    auth_form="Username/Password",
                    username=" ",
                    password=self.__access_token,
                    host=BaseVariable.BROKER_HOST,
                )
    
                threading.Thread(
                    target=self.broker.receive,
                    args=(self.__endpoint, self.__on_broker_callback),
                ).start()
    
        def __on_broker_callback(self, ch, method, properties, body):
            if isinstance(body, bytes):
                body = ast.literal_eval(body.decode("utf-8"))
            elif isinstance(body, int):
                return
            elif isinstance(body, str):
                return
    
            message_type = body.get("messageType")
            if message_type == "userMessage":
                self.on_user_message(body)
            elif message_type == "serviceRequest":
                self.on_service_request(body)
            elif message_type == "getValueRequest":
                self.on_get_value_request(body)
            elif message_type == "getValueReply":
                self.on_get_value_reply(body)
            elif message_type == "serviceReply":
                self.on_service_reply(body)
            else:
                ### TODO send user message reply back
                pass
    
        def on_user_message(self, msg):
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": You have received a S³I-B UserMessage"
                + json.dumps(msg, indent=2)
            )
    
        def on_get_value_request(self, msg):
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": You have received a S³I-B GetValueRequest"
                + json.dumps(msg, indent=2)
            )
            get_value_reply = GetValueReply()
            request_sender = msg.get("sender")
            request_msg_id = msg.get("identifier")
            request_sender_endpoint = msg.get("replyToEndpoint")
            attribute_path = msg.get("attributePath")
            reply_msg_uuid = "s3i:" + str(uuid.uuid4())
            try:
                print(
                    BColors.OKBLUE
                    + "[S³I]"
                    + BColors.ENDC
                    + ": Search the attribute with path: "
                    + attribute_path
                )
                value = self._uriToData(attribute_path)
            except KeyError:
                value = "invalid attribute path"
    
            get_value_reply.fillGetValueReply(
                senderUUID=self.thing_id,
                receiverUUID=[request_sender],
                results=value,
                msgUUID=reply_msg_uuid,
                replyingToUUID=request_msg_id,
            )
    
            res = self.broker.send(
                receiver_endpoints=[request_sender_endpoint],
                msg=json.dumps(get_value_reply.msg),
            )
            if res.status_code == 201:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Send a S³I-B GetValueReply back to the requester."
                )
    
        def _uriToData(self, uri):
            if uri == "":
                return self.repo_json
            else:
                uri_list = uri.split("/")
                if uri_list[0] == "features":
                    try:
                        return self.repo_json[uri]
                    except KeyError:
                        return "invalid attribute path"
    
                try:
                    self._getValue(self.repo_json, uri_list)
                except:
                    return "invalid attribute path"
                if self.__resGetValue.__len__() == 0:
                    return "invalid attribute path"
                response = copy.deepcopy(self.__resGetValue)
                self.__resGetValue.clear()
                if response.__len__() == 1:
                    return response[0]
                else:
                    return response
    
        def _getValue(self, source, uri_list):
            value = source[uri_list[0]]
            if uri_list.__len__() == 1:
                # if is ditto-feature
                if isinstance(value, str):
                    try:
                        stringValue_split = value.split(":")
                        if stringValue_split[0] == "ditto-feature":
                            value = self.repo_json["features"][stringValue_split[1]]["properties"][uri_list[0]]
                    except:
                        pass
                self.__resGetValue.append(value)
                return
            if isinstance(value, dict):
                del uri_list[0]
                self._getValue(value, uri_list)
            if isinstance(value, list):
                if isinstance(value[0], (str, int, float, bool, list)):
                    return value
                if isinstance(value[0], dict):
                    for item in value:
                        if item["class"] == "ml40::Thing":
                            for i in item["roles"]:
                                if self._findValue(i, uri_list[1]):
                                    uri_list_1 = copy.deepcopy(uri_list)
                                    del uri_list_1[:2]
                                    self._getValue(item, uri_list_1)
                        else:
                            if self._findValue(item, uri_list[1]):
                                uri_list_1 = copy.deepcopy(uri_list)
                                del uri_list_1[:2]
                                if not uri_list_1:
                                    self.__resGetValue.append(item)
                                    return
                                else:
                                    self._getValue(item, uri_list_1)
            if isinstance(value, (str, int, float, bool)):
                # if is ditto-feature
                if isinstance(value, str):
                    try:
                        stringValue_split = value.split(":")
                        if stringValue_split[0] == "ditto-feature":
                            value = self.repo_json["features"][stringValue_split[1]][
                                "properties"
                            ][uri_list[0]]
                    except:
                        pass
                self.__resGetValue.append(value)
    
        def _findValue(self, json, value):
            for item in json:
                if json[item] == value:
                    # print("Parameter: ", json[item])
                    return True
            return False
    
        def on_service_request(self, body_json):
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": You have received a S³I-B ServiceRequest "
                + json.dumps(body_json, indent=2)
            )
            service_type = body_json.get("serviceType")
            parameters = body_json.get("parameters")
            service_functionality = service_type.split('/')[0]
            service_functionality_obj = self.features.get(service_functionality)
            if service_functionality_obj is None:
                APP_LOGGER.critical(
                    "Functionality %s is not one of the built-in functionalities in %s!" % (
                        service_functionality, self.name)
                )
            else:
                # TODO: Call right functionality.
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Execute the function {0} of the class {1}.".format(service_type.split('/')[1],
                                                                           service_type.split('/')[0])
                )
                method = getattr(service_functionality_obj, service_type.split('/')[1])
                result = method(**parameters)
    
                if isinstance(result, bool):
                    result = {"ok": result}
    
                service_reply = ServiceReply()
                service_reply.fillServiceReply(
                    senderUUID=self.thing_id,
                    receiverUUID=body_json.get("sender", None),
                    serviceType=body_json.get("serviceType", None),
                    results=result,
                    replyingToUUID=body_json.get("identifier", None),
                    msgUUID="s3i:{}".format(uuid.uuid4())
                )
    
                res = self.broker.send(receiver_endpoints=[body_json.get("replyToEndpoint", None)],
                                       msg=json.dumps(service_reply.msg))
                if res.status_code == 201:
                    print(
                        BColors.OKBLUE
                        + "[S³I][Broker]"
                        + BColors.ENDC
                        + ": Send a S³I-B ServiceReply back to the requester  "
                    )
    
        def on_get_value_reply(self, msg):
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": You have received a S³I-B GetValueReply"
                + json.dumps(msg, indent=2)
            )
            value = msg.get("value", None)
            if isinstance(value, dict):
                value = json.dumps(value, indent=2)
    
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": The queried value is: {0}{1}{2}".format(BColors.OKGREEN, value,
                                                             BColors.ENDC)
            )
    
    
        def on_service_reply(self, msg):
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": You have received a S³I-B ServiceReply"
                + json.dumps(msg, indent=2)
            )
            results = msg.get("results", None)
            if isinstance(results, dict):
                results = json.dumps(results, indent=2)
    
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": The result is: {0}{1}{2}".format(BColors.OKGREEN, results,
                                                      BColors.ENDC)
            )
    
        def to_dir_json(self):
            self.dir_json = self.dir.queryThingIDBased(self.thing_id)
            if self.thing_id is not None:
                self.dir_json["thingId"] = self.thing_id
            if self.policy_id is not None:
                self.dir_json["policyId"] = self.policy_id
            self.dir_json["attributes"]["class"] = "ml40::Thing"
    
            self.dir_json["attributes"]["name"] = self.name
            if self.roles:
                self.dir_json["attributes"]["roles"] = list()
            if self.features:
                self.dir_json["attributes"]["features"] = list()
    
            for key in self.roles.keys():
                self.dir_json["attributes"]["roles"].append(self.roles[key].to_json())
            for key in self.features.keys():
                self.dir_json["attributes"]["features"].append(self.features[key].to_json())
    
            return self.dir_json
    
        def to_repo_json(self, path=None, value=None):
            if path is None and value is None:
                self.repo_json = {
                    "thingId": self.thing_id,
                    "policyId": self.policy_id,
                    "attributes": {
                        "class": "ml40::Thing",
                        "name": self.name,
                    }
                }
                if self.roles:
                    self.repo_json["attributes"]["roles"] = list()
                if self.features:
                    self.repo_json["attributes"]["features"] = list()
                for key in self.roles.keys():
                    self.repo_json["attributes"]["roles"].append(self.roles[key].to_json())
                for key in self.features:
                    self.repo_json["attributes"]["features"].append(self.features[key].to_json())
            return self.repo_json
    
        def to_subthing_json(self):
            json_out = {
                "class": "ml40::Thing",
                "name": self.name,
                "roles": [],
                "features": []
            }
            for key in self.roles.keys():
                json_out["roles"].append(self.roles[key].to_json())
            for key in self.features.keys():
                json_out["features"].append(self.features[key].to_json())
            return json_out