import ast import threading import websocket import json import uuid import requests from s3i import IdentityProvider, TokenType, GetValueReply, Directory from s3i.broker import Broker, BrokerREST, BrokerMetaClass from s3i.messages import ServiceReply from ml.tools import remove_namespace from ml.managing_actor import ManagingActor from ml.tools import BColors from ml.tools import find_broker_endpoint from ml.app_logger import APP_LOGGER class BaseVariable(object): IDP_URL = "https://idp.s3i.vswf.dev/" IDP_REALM = "KWH" BROKER_HOST = "rabbitmq.s3i.vswf.dev" REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2" REPO_URL = "https://ditto.s3i.vswf.dev/api/2/" DIR_URL = "https://dir.s3i.vswf.dev/api/2/" class Thing(ManagingActor): def __init__( self, model: dict, client_secret="", grant_type="password", is_broker=False, is_repo=False, is_broker_rest=True, username=None, password=None, ): super(Thing, self).__init__() self.__model = model self.__thing_id = model.get("thingId", "") self.__policy_id = model.get("policyId", "") self.__grant_type = grant_type self.__username = username self.__password = password self.__client_secret = client_secret self.__is_broker = is_broker self.__is_broker_rest = is_broker_rest self.__is_repo = is_repo self.__access_token = "" self.__endpoint = "" self.__ws_connected = False self.broker = None self.ws = None self.dir = None attributes = model.get("attributes", None) self.__name = "" self.__roles = {} self.__features = {} if attributes: self.__name = attributes.get("name", "") @property def model(self): return self.__model @property def features(self): return self.__features @features.setter def features(self, value): self.__features = value @property def roles(self): return self.__roles @roles.setter def roles(self, value): self.__roles = value @property def client_secret(self): return self.__client_secret @property def grant_type(self): return self.__grant_type @property def access_token(self): return self.__access_token @property def name(self): return self.__name @property def thing_id(self): return self.__thing_id @property def policy_id(self): return self.__policy_id def run_forever(self): print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC)) self.__connect_with_idp() def __connect_with_idp(self): print(BColors.OKBLUE + "[S³I][IdP]" + BColors.ENDC + ": Connect with S3I-IdentityProvider") idp = IdentityProvider( grant_type=self.__grant_type, client_id=self.__thing_id, username=self.__username, password=self.__password, client_secret=self.__client_secret, realm=BaseVariable.IDP_REALM, identity_provider_url=BaseVariable.IDP_URL, ) # This may take a while so fetch token directly. idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token) def __on_token(self, token): self.__access_token = token self.__connect_with_dir() if self.__is_broker: self.__connect_with_broker() if self.__is_repo: self.__connect_with_repo() def __connect_with_dir(self): print( BColors.OKBLUE + "[S³I][Dir]" + BColors.ENDC + ": Connect with S3I-Directory" ) self.dir = Directory(s3i_dir_url="https://dir.s3i.vswf.dev/api/2/", token=self.__access_token) def __connect_with_repo(self): print( BColors.OKBLUE + "[S³I][Repo]" + BColors.ENDC + ": Connect with S3I-Repository" ) self._ws = websocket.WebSocketApp( BaseVariable.REPO_WWS_URL, header={"Authorization: Bearer {}".format(self.__access_token)}, on_message=self.__on_new_websocket_message, on_error=self.__on_new_websocket_error, on_open=self.__on_websocket_connection_opened, on_close=self.__on_websocket_connection_closed, ) threading.Thread(target=self._ws.run_forever).start() @staticmethod def __on_new_websocket_message(ws, msg): pass @staticmethod def __on_new_websocket_error(ws, error): print(BColors.OKBLUE + "[S³I][Repo]" + BColors.ENDC + " : Websocekt error") def __on_websocket_connection_opened(self): self.__ws_connected = True self._ws.send("START-SEND-MESSAGES") print( BColors.OKBLUE + "[S³I][Repo]" + BColors.ENDC + ": Websocket connection built" ) def __on_websocket_connection_closed(self): self.__ws_connected = False print( BColors.OKBLUE + "[S³I][Repo]" + BColors.ENDC + ": Websocket connection closed" ) def sync_with_repo(self, path, topic): if not self.__ws_connected: return None msg = {"topic": topic, "path": path, "value": self.fml40_data_model[path]} self._ws.send(json.dumps(msg)) def __connect_with_broker(self): print(BColors.OKBLUE + "[S³I][Broker]" + BColors.ENDC + ": Connect with S3I-Broker") self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id) if self.__is_broker_rest: self.broker = BrokerREST(token=self.access_token) def receive(): while True: msg_str = self.broker.receive_once(self.__endpoint) if msg_str == "": continue else: self.__on_broker_callback(ch=None, method=None, properties=None, body=json.loads(msg_str)) threading.Thread( target=receive, ).start() else: self.broker = Broker( auth_form="Username/Password", username=" ", password=self.__access_token, host=BaseVariable.BROKER_HOST, ) threading.Thread( target=self.broker.receive, args=(self.__endpoint, self.__on_broker_callback), ).start() def __on_broker_callback(self, ch, method, properties, body): if isinstance(body, bytes): body = ast.literal_eval(body.decode("utf-8")) elif isinstance(body, int): print(body) return elif isinstance(body, str): print(body) return message_type = body.get("messageType") if message_type == "userMessage": self.on_user_message(body) elif message_type == "serviceRequest": self.on_service_request(body) elif message_type == "getValueRequest": self.on_get_value_request(body) elif message_type == "getValueReply": self.on_get_value_reply(body) elif message_type == "serviceReply": self.on_service_reply(body) else: ### TODO send user message reply back pass def on_user_message(self, msg): pass def on_get_value_request(self, msg): # TODO implement get value request pass def on_service_request(self, body_json): service_type = body_json.get("serviceType") parameters = body_json.get("parameters") service_functionality = service_type.split('/')[0] service_functionality_obj = self.features.get(service_functionality, None) if service_functionality_obj is None: APP_LOGGER.critical( "Functionality %s is not one of the built-in functionalities in %s!" % ( service_functionality, self.name) ) else: pass # TODO: Call right functionality. func_proxy = service_functionality_obj.proxy() method = getattr(func_proxy, service_type.split('/')[1]) result = method(**parameters).get() if isinstance(result, bool): results = {"ok": result} else: results = result service_reply = ServiceReply() service_reply.fillServiceReply( senderUUID=self.thing_id, receiverUUID=body_json.get("sender", None), serviceType=body_json.get("serviceType", None), results=results, replyingToUUID=body_json.get("identifier", None), msgUUID="s3i:{}".format(uuid.uuid4()) ) receiver_eps = list() for r in service_reply.msg.get("receivers", None): receiver_ep = find_broker_endpoint(self.dir, r) receiver_eps.append(receiver_ep) self.broker.send(receiver_endpoints=receiver_eps, msg=json.dumps(service_reply.msg)) def on_get_value_reply(self, msg): print(msg) def on_service_reply(self, msg): print(msg) def add_function_impl(self, impl_actor, feature_name): feature = self.__features.get(feature_name, None) if feature is None: APP_LOGGER.critical( "Functionality %s is not one of the build-in functionalities" % feature_name ) else: self.__features[feature_name] = impl_actor.start(feature_name, self.actor_ref)