Skip to content
Snippets Groups Projects
Select Git revision
  • a0a03a389512026fbffd4fb62ad4a7e82bdc8ca1
  • master default protected
  • condition_monitoring
  • add_smart_forestry_features
  • add_cc4w_features
  • add_SiteStruct_features
  • s3i_v3
  • RIF-waldboden40
  • message_callbacks
  • response_handling_with_async_events
  • fix_enum
  • Einarbeitung_Peng
  • 16-add-Classes-from-DigiKomForst-project
  • digiKomForstServices
  • update_doc
  • 12-emitting-named-events-crashes-application
  • DZ-Wald
  • 1.0.0
  • 0.2.12
  • 0.2.11
  • 0.2.10
  • 0.2.9
  • 0.2.8
  • 0.2.7
  • 0.2.6
  • 0.2.5.3
  • 0.2.5.2
  • 0.2.5.1
  • 0.2.5
  • v0.2.0
30 results

thing.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    thing.py 10.54 KiB
    import ast
    import threading
    import websocket
    import json
    import uuid
    import requests
    from s3i import IdentityProvider, TokenType, GetValueReply, Directory
    from s3i.broker import Broker, BrokerREST, BrokerMetaClass
    from s3i.messages import ServiceReply
    
    from ml.managing_actor import ManagingActor
    from ml.tools import BColors
    from ml.tools import find_broker_endpoint
    from ml.app_logger import APP_LOGGER
    
    
    class BaseVariable:
        """Fixed endpoints and realm of the S3I services used by this module."""

        # Identity provider: base URL and realm handed to ``IdentityProvider``.
        IDP_REALM = "KWH"
        IDP_URL = "https://idp.s3i.vswf.dev/"

        # Host handed to the ``Broker`` client.
        BROKER_HOST = "rabbitmq.s3i.vswf.dev"

        # Repository: REST base URL and websocket URL.
        REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
        REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"

        # Directory REST base URL.
        DIR_URL = "https://dir.s3i.vswf.dev/api/2/"
    
    
    def get_sensor_uuid(body_json):
        """Return the ``sensor_uuid`` found under the message's ``parameters``.

        An empty string is returned when *body_json* has no ``parameters``
        entry or the parameters carry no ``sensor_uuid``.
        """
        return body_json.get("parameters", {}).get("sensor_uuid", "")
    
    
    class Thing(ManagingActor):
        def __init__(
                self,
                client_secret,
                model: dict,
                grant_type: str,
                is_broker: bool,
                is_repo: bool,
                is_broker_rest=True,
                username=None,
                password=None,
        ):
            """Store identity, credentials and connection flags of the thing.

            No connection is opened here; ``run_forever`` starts that.

            :param client_secret: secret handed to the identity provider.
            :param model: thing description; ``thingId``, ``policyId`` and the
                ``attributes`` entries ``name``, ``class`` and ``type`` are
                read, each defaulting to an empty string.
            :param grant_type: grant type handed to the identity provider.
            :param is_broker: connect to the broker once a token arrives.
            :param is_repo: connect to the repository once a token arrives.
            :param is_broker_rest: use the REST broker client instead of the
                host-based ``Broker`` client.
            :param username: user name handed to the identity provider.
            :param password: password handed to the identity provider.
            """
            super(Thing, self).__init__()

            # Identity and credentials.
            self.__thing_id = model.get("thingId", "")
            self.__policy_id = model.get("policyId", "")
            self.__grant_type = grant_type
            self.__username = username
            self.__password = password
            self.__client_secret = client_secret

            # Which services to connect to after a token has been obtained.
            self.__is_broker = is_broker
            self.__is_broker_rest = is_broker_rest
            self.__is_repo = is_repo

            # Connection state, filled in by the connect methods.
            self.__access_token = ""
            self.__endpoint = ""
            self.__ws_connected = False
            self.broker = None
            self.ws = None
            # The repository websocket is stored and used as ``_ws``;
            # initialise it so the attribute exists before the first connect.
            self._ws = None
            self.dir = None

            # Descriptive attributes of the model; all optional.
            attributes = model.get("attributes", None) or {}
            self.__name = attributes.get("name", "")
            self.__class_name = attributes.get("class", "")
            self.__type_name = attributes.get("type", "")
            self.roles = []
            self.__features = {}
    
        @property
        def features(self):
            return self.__features
    
        @features.setter
        def features(self, value):
            self.__features = value
    
        @property
        def class_name(self):
            return self.__class_name
    
        @property
        def client_secret(self):
            return self.__client_secret
    
        @property
        def grant_type(self):
            return self.__grant_type
    
        @property
        def access_token(self):
            return self.__access_token
    
        @property
        def name(self):
            return self.__name
    
        @property
        def thing_id(self):
            return self.__thing_id
    
        def run_forever(self):
            """Print the launch banner and start the identity-provider connection."""
            banner = f"[S³I]: Launch {BColors.OKGREEN}{self.name}{BColors.ENDC}"
            print(banner)
            self.__connect_with_idp()
    
        def __connect_with_idp(self):
            """Create the identity-provider client and start requesting tokens.

            Every access token the provider delivers is passed to
            ``__on_token``.
            """
            print(f"{BColors.OKBLUE}[S³I][IdP]{BColors.ENDC}: Connect with S3I-IdentityProvider")

            credentials = {
                "grant_type": self.__grant_type,
                "client_id": self.__thing_id,
                "username": self.__username,
                "password": self.__password,
                "client_secret": self.__client_secret,
            }
            idp = IdentityProvider(
                realm=BaseVariable.IDP_REALM,
                identity_provider_url=BaseVariable.IDP_URL,
                **credentials,
            )

            # This may take a while so fetch token directly.
            idp.run_forever(on_new_token=self.__on_token, token_type=TokenType.ACCESS_TOKEN)
    
        def __on_token(self, token):
            self.__access_token = token
            self.__connect_with_dir()
            if self.__is_broker:
                self.__connect_with_broker()
            if self.__is_repo:
                self.__connect_with_repo()
    
        def __connect_with_dir(self):
            """Create the directory client for the current access token.

            The client is stored in ``self.dir``.
            """
            print(
                BColors.OKBLUE
                + "[S³I][Dir]"
                + BColors.ENDC
                + ": Connect with S3I-Directory"
            )
            # Read the URL from the shared constants, like every other endpoint,
            # instead of repeating the literal here.
            self.dir = Directory(s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token)
    
        def __connect_with_repo(self):
            """Open the repository websocket and run it on a background thread.

            The websocket app is stored in ``self._ws`` and authenticates with
            the current access token as a bearer token.
            """
            print(
                BColors.OKBLUE
                + "[S³I][Repo]"
                + BColors.ENDC
                + ": Connect with S3I-Repository"
            )

            self._ws = websocket.WebSocketApp(
                BaseVariable.REPO_WWS_URL,
                # websocket-client documents ``header`` as a list or a dict;
                # the previous set literal only worked by accident.
                header=["Authorization: Bearer {}".format(self.__access_token)],
                on_message=self.__on_new_websocket_message,
                on_error=self.__on_new_websocket_error,
                on_open=self.__on_websocket_connection_opened,
                on_close=self.__on_websocket_connection_closed,
            )

            threading.Thread(target=self._ws.run_forever).start()
    
        @staticmethod
        def __on_new_websocket_message(ws, msg):
            pass
    
        @staticmethod
        def __on_new_websocket_error(ws, error):
            """Websocket ``on_error`` callback: report *error* on stdout.

            :param ws: websocket app that raised the error (unused).
            :param error: the error reported by the websocket client.
            """
            # Include the error itself; without it the message says nothing
            # about what went wrong.
            print(
                BColors.OKBLUE
                + "[S³I][Repo]"
                + BColors.ENDC
                + ": Websocket error: {}".format(error)
            )
    
        def __on_websocket_connection_opened(self, *_callback_args):
            """Websocket ``on_open`` callback: mark the socket connected.

            After flagging the connection, ``START-SEND-MESSAGES`` is sent over
            the socket.

            :param _callback_args: ignored. Depending on the websocket-client
                release, the callback is invoked with or without the websocket
                app instance; accepting any positional arguments keeps both
                calling conventions working.
            """
            self.__ws_connected = True
            self._ws.send("START-SEND-MESSAGES")
            print(
                BColors.OKBLUE
                + "[S³I][Repo]"
                + BColors.ENDC
                + ": Websocket connection built"
            )
    
        def __on_websocket_connection_closed(self, *_callback_args):
            """Websocket ``on_close`` callback: mark the socket disconnected.

            :param _callback_args: ignored. Depending on the websocket-client
                release, the callback is invoked without arguments or with the
                websocket app instance plus close status code and message;
                accepting any positional arguments keeps both working.
            """
            self.__ws_connected = False
            print(
                BColors.OKBLUE
                + "[S³I][Repo]"
                + BColors.ENDC
                + ": Websocket connection closed"
            )
    
        def sync_with_repo(self, path, topic):
            if not self.__ws_connected:
                return None
            msg = {"topic": topic, "path": path, "value": self.fml40_data_model[path]}
            self._ws.send(json.dumps(msg))
    
        def __connect_with_broker(self):
            """Look up this thing's broker endpoint and start receiving on it.

            With ``is_broker_rest`` set, a ``BrokerREST`` client is polled in a
            background thread and each received message is JSON-decoded before
            it is dispatched. Otherwise a ``Broker`` client is created and its
            ``receive`` loop runs in a background thread with
            ``__on_broker_callback`` as the message callback.
            """
            print(BColors.OKBLUE + "[S³I][Broker]" + BColors.ENDC + ": Connect with S3I-Broker")
            self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)

            if self.__is_broker_rest:
                self.broker = BrokerREST(token=self.access_token)

                def receive():
                    # Poll without end; empty strings are skipped (presumably
                    # they mean nothing was queued -- confirm against BrokerREST).
                    while True:
                        msg_str = self.broker.receive_once(self.__endpoint)
                        if msg_str == "":
                            continue
                        try:
                            body = json.loads(msg_str)
                        except (TypeError, ValueError):
                            # One malformed message must not end the polling
                            # thread, which would silently stop all reception.
                            APP_LOGGER.critical(
                                "Discarding broker message that is not valid JSON: %r" % (msg_str,)
                            )
                            continue
                        self.__on_broker_callback(
                            ch=None, method=None, properties=None, body=body
                        )

                threading.Thread(target=receive).start()
            else:
                self.broker = Broker(
                    auth_form="Username/Password",
                    username=" ",
                    password=self.__access_token,
                    host=BaseVariable.BROKER_HOST,
                )

                threading.Thread(
                    target=self.broker.receive,
                    args=(self.__endpoint, self.__on_broker_callback),
                ).start()
    
        def __on_broker_callback(self, ch, method, properties, body):
            if isinstance(body, bytes):
                body = ast.literal_eval(body.decode("utf-8"))
            elif isinstance(body, int):
                print(body)
                return
            elif isinstance(body, str):
                print(body)
                return
    
            message_type = body.get("messageType")
            if message_type == "userMessage":
                self.on_user_message(body)
            elif message_type == "serviceRequest":
                self.on_service_request(body)
            elif message_type == "getValueRequest":
                self.on_get_value_request(body)
            elif message_type == "getValueReply":
                self.on_get_value_reply(body)
            elif message_type == "serviceReply":
                self.on_service_reply(body)
            else:
                ### TODO send user message reply back
                pass
    
        def on_user_message(self, msg):
            pass
    
        def on_get_value_request(self, msg):
            """Answer a ``getValueRequest`` with a ``getValueReply`` via the broker.

            Only the attribute path ``features/moisture`` is resolved (to the
            humidity of the ``ml40::Moisture`` feature); every other path, and
            a missing moisture feature, is answered with ``None``.

            :param msg: decoded request; ``sender``, ``identifier``,
                ``replyToEndpoint`` and ``attributePath`` are read.
            """
            request_sender = msg.get("sender", "")
            request_msg_id = msg.get("identifier")
            request_sender_endpoint = "s3ib://{}".format(msg.get("replyToEndpoint"))
            attribute_path = msg.get("attributePath")
            reply_msg_uuid = "s3i:" + str(uuid.uuid4())

            # TODO: Get correct value.
            value = None
            if attribute_path == "features/moisture":
                moisture = self.features.get("ml40::Moisture", None)
                if moisture is None:
                    # Without this guard a thing lacking the feature raised
                    # AttributeError and never replied.
                    APP_LOGGER.critical(
                        "Functionality %s is not one of the built-in functionalities in %s!" % (
                            "ml40::Moisture", self.name)
                    )
                else:
                    value = moisture.proxy().humidity.get()

            get_value_reply = GetValueReply()
            get_value_reply.fillGetValueReply(
                senderUUID=self.__thing_id,
                receiverUUID=[request_sender],
                results=value,
                msgUUID=reply_msg_uuid,
                replyingToUUID=request_msg_id,
            )
            # NOTE(review): this sends the Python repr of the message, while
            # on_service_request sends JSON -- confirm which format receivers
            # expect before unifying.
            msg_txt = str(get_value_reply.msg)
            self.broker.send(
                receiver_endpoints=[request_sender_endpoint], msg=msg_txt,
            )
    
        def on_service_request(self, body_json):
            """Run the requested service and send a ``serviceReply`` to the sender.

            ``serviceType`` is expected as ``<functionality>/<method>``: the
            functionality is looked up in ``self.features`` and the method is
            called on its proxy. Requests with a malformed service type or an
            unknown functionality are logged and dropped.

            :param body_json: decoded request; ``serviceType``, ``sender`` and
                ``identifier`` are read.
            """
            service_type = body_json.get("serviceType")
            parts = service_type.split('/') if isinstance(service_type, str) else []
            if len(parts) < 2:
                # Previously a missing or slash-less service type raised
                # AttributeError/IndexError in the receiving thread.
                APP_LOGGER.critical(
                    "Service type %s is not of the form <functionality>/<method>!" % (service_type,)
                )
                return

            service_functionality, method_name = parts[0], parts[1]
            service_functionality_obj = self.features.get(service_functionality, None)
            if service_functionality_obj is None:
                APP_LOGGER.critical(
                    "Functionality %s is not one of the built-in functionalities in %s!" % (
                        service_functionality, self.name)
                )
                return

            # TODO: Call right functionality.
            func_proxy = service_functionality_obj.proxy()
            method = getattr(func_proxy, method_name)
            result = {"ok": method("test_job").get()}

            service_reply = ServiceReply()
            service_reply.fillServiceReply(
                senderUUID=self.thing_id,
                # Receivers are a list (as in on_get_value_request); a bare
                # string would be iterated character by character below.
                receiverUUID=[body_json.get("sender", None)],
                serviceType=service_type,
                results=result,
                replyingToUUID=body_json.get("identifier", None),
                msgUUID="s3i:{}".format(uuid.uuid4())
            )
            receiver_eps = [
                find_broker_endpoint(self.dir, receiver)
                for receiver in service_reply.msg.get("receivers", None) or []
            ]

            self.broker.send(receiver_endpoints=receiver_eps, msg=json.dumps(service_reply.msg))
    
        def on_get_value_reply(self, msg):
            print(msg)
    
        def on_service_reply(self, msg):
            print(msg)
    
        def add_function_impl(self, obj, feature_name):
            feature = self.__features.get(feature_name, None)
    
            if feature is None:
                APP_LOGGER.critical(
                    "Functionality %s is not one of the build-in functionalities" % feature_name
                )
            else:
                self.__features[feature_name] = obj.start("", self.actor_ref)