import ast
import copy
import json
import threading
import time
import uuid

from s3i import IdentityProvider, TokenType, GetValueReply, Directory, Repository
from s3i.broker import Broker, BrokerREST
from s3i.messages import ServiceReply
from ml.tools import BColors
from ml.tools import find_broker_endpoint
from ml.app_logger import APP_LOGGER


class BaseVariable(object):
    """Service endpoints shared by every :class:`Thing`."""

    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"


class Thing:
    """A thing that connects to the S3I services and answers broker messages.

    After :meth:`run_forever` the instance authenticates against the identity
    provider, connects to directory/repository (and optionally the broker) once
    a token arrives, and keeps its JSON representations refreshed from
    background threads.
    """

    def __init__(
        self,
        model: dict,
        client_secret="",
        grant_type="password",
        is_broker=False,
        is_repo=False,
        is_broker_rest=True,
        username=None,
        password=None,
    ):
        """Store configuration; no network activity happens here.

        :param model: thing description; ``thingId``, ``policyId`` and
            ``attributes.name`` are read from it.
        :param client_secret: forwarded to the identity provider client.
        :param grant_type: forwarded to the identity provider client.
        :param is_broker: connect to the broker once a token is available.
        :param is_repo: run the repository synchronisation thread.
        :param is_broker_rest: poll via ``BrokerREST`` instead of using
            ``Broker`` with a receive callback.
        :param username: forwarded to the identity provider client.
        :param password: forwarded to the identity provider client.
        """
        self.__model = model
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
        self.__is_broker = is_broker
        self.__is_broker_rest = is_broker_rest
        self.__is_repo = is_repo

        self.__access_token = ""
        self.__endpoint = ""
        self.__ws_connected = False

        self.broker = None
        self.ws = None
        self.dir = None
        self.repo = None

        self.repo_json = dict()
        self.dir_json = dict()
        self.dt_json = dict()

        attributes = model.get("attributes", None)
        self.__name = ""
        self.__roles = {}
        self.__features = {}
        # Accumulator filled by _getValue while it walks an attribute path.
        self.__resGetValue = list()
        if attributes:
            self.__name = attributes.get("name", "")

    @property
    def model(self):
        return self.__model

    @property
    def features(self):
        return self.__features

    @features.setter
    def features(self, value):
        self.__features = value

    @property
    def roles(self):
        return self.__roles

    @roles.setter
    def roles(self, value):
        self.__roles = value

    @property
    def client_secret(self):
        return self.__client_secret

    @property
    def grant_type(self):
        return self.__grant_type

    @property
    def access_token(self):
        return self.__access_token

    @property
    def name(self):
        return self.__name

    @property
    def thing_id(self):
        return self.__thing_id

    @property
    def policy_id(self):
        return self.__policy_id

    def run_forever(self):
        """Connect to the identity provider and start the sync threads."""
        print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC))
        self.__connect_with_idp()
        threading.Thread(target=self.__json_syn).start()
        threading.Thread(target=self.__dir_syn).start()
        if self.__is_repo:
            threading.Thread(target=self.__repo_syn).start()

    @staticmethod
    def add_user_def(func):
        """Run the user supplied callable ``func`` in its own thread."""
        threading.Thread(target=func).start()

    def __json_syn(self, freq=0.1):
        """Rebuild ``dt_json`` every ``freq`` seconds, forever."""
        while True:
            try:
                time.sleep(freq)
                self.to_json()
            except Exception:
                # Best effort: a failing refresh must not end the loop, but
                # SystemExit/KeyboardInterrupt are allowed to propagate.
                continue

    def __dir_syn(self, freq=0.1):
        """Push the directory entry whenever it differs from the last one."""
        while True:
            try:
                time.sleep(freq)
                old_dir_json = self.dir_json
                self.to_dir_json()
                if self.dir_json == old_dir_json:
                    continue
                self.dir.updateThingIDBased(thingID=self.thing_id, payload=self.dir_json)
            except Exception:
                # Best effort, e.g. self.dir is still None until a token arrives.
                continue

    def __repo_syn(self, freq=0.1):
        """Push the repository entry whenever it differs from the last one."""
        while self.__is_repo:
            try:
                time.sleep(freq)
                old_repo_json = self.repo_json
                self.to_repo_json()
                if self.repo_json == old_repo_json:
                    continue
                self.repo.updateThingIDBased(thingID=self.thing_id, payload=self.repo_json)
            except Exception:
                # Best effort, e.g. self.repo is still None until a token arrives.
                continue

    def __connect_with_idp(self):
        """Create the identity provider client and register the token callback."""
        print(BColors.OKBLUE + "[S³I][IdP]" + BColors.ENDC + ": Connect with S3I IdentityProvider")
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )

        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
        """Store ``token`` and (re)create the service clients with it."""
        self.__access_token = token
        self.__connect_with_dir()
        self.__connect_with_repo()
        if self.__is_broker:
            self.__connect_with_broker()

    def __connect_with_dir(self):
        print(
            BColors.OKBLUE
            + "[S³I][Dir]"
            + BColors.ENDC
            + ": Connect with S3I Directory"
        )
        self.dir = Directory(s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token)

    def __connect_with_repo(self):
        print(
            BColors.OKBLUE
            + "[S³I][Repo]"
            + BColors.ENDC
            + ": Connect with S3I Repository"
        )
        self.repo = Repository(s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token)

    def __connect_with_broker(self):
        """Resolve the own endpoint and start a receiver thread for it.

        NOTE(review): this runs on every token callback and starts a new
        receiver thread each time; if the identity provider client invokes the
        callback more than once, threads accumulate - confirm and guard.
        """
        print(BColors.OKBLUE + "[S³I][Broker]" + BColors.ENDC + ": Connect with S3I Broker")
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)

            def receive():
                # Poll the endpoint; an empty string means nothing was received.
                while True:
                    try:
                        time.sleep(0.1)
                        msg_str = self.broker.receive_once(self.__endpoint)
                        if msg_str == "":
                            continue
                        self.__on_broker_callback(
                            ch=None, method=None, properties=None, body=json.loads(msg_str)
                        )
                    except Exception:
                        # Best effort: one bad poll or message must not stop polling.
                        continue

            threading.Thread(
                target=receive
            ).start()

        else:
            self.broker = Broker(
                auth_form="Username/Password",
                username=" ",
                password=self.__access_token,
                host=BaseVariable.BROKER_HOST,
            )

            threading.Thread(
                target=self.broker.receive,
                args=(self.__endpoint, self.__on_broker_callback),
            ).start()

    @staticmethod
    def _decode_body(raw):
        """Decode a byte payload into a Python object.

        JSON is tried first because JSON literals such as ``true``/``null`` are
        not valid Python literals; ``ast.literal_eval`` is kept as a fallback
        for payloads that are a Python literal instead.

        :param raw: message payload as ``bytes``.
        :return: the decoded object, or ``None`` when it cannot be decoded.
        """
        try:
            text = raw.decode("utf-8")
        except UnicodeDecodeError:
            return None
        try:
            return json.loads(text)
        except ValueError:
            pass
        try:
            return ast.literal_eval(text)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            return None

    def __on_broker_callback(self, ch, method, properties, body):
        """Dispatch a received broker message by its ``messageType``.

        :param ch: unused; present for the broker callback signature.
        :param method: unused; present for the broker callback signature.
        :param properties: unused; present for the broker callback signature.
        :param body: message as ``bytes`` (decoded here) or as ``dict``.
        """
        if isinstance(body, bytes):
            body = self._decode_body(body)
            if body is None:
                print(
                    BColors.OKBLUE
                    + "[S³I][Broker]"
                    + BColors.ENDC
                    + ": Discard a message that could not be decoded"
                )
                return
        if not isinstance(body, dict):
            # Anything that is not a message object (int, str, list, ...) is ignored.
            return

        message_type = body.get("messageType")
        if message_type == "userMessage":
            self.on_user_message(body)
        elif message_type == "serviceRequest":
            self.on_service_request(body)
        elif message_type == "getValueRequest":
            self.on_get_value_request(body)
        elif message_type == "getValueReply":
            self.on_get_value_reply(body)
        elif message_type == "serviceReply":
            self.on_service_reply(body)
        else:
            ### TODO send user message reply back
            pass

    def on_user_message(self, msg):
        """Print a received user message."""
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B UserMessage"
            + json.dumps(msg, indent=2)
        )

    def on_get_value_request(self, msg):
        """Resolve the requested attribute path and send a GetValueReply.

        A missing or unresolvable ``attributePath`` is answered with the string
        ``"invalid attribute path"`` as result.
        """
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B GetValueRequest"
            + json.dumps(msg, indent=2)
        )
        get_value_reply = GetValueReply()
        request_sender = msg.get("sender")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = msg.get("replyToEndpoint")
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())
        try:
            # format() instead of "+" so that a missing path does not raise here.
            print(
                BColors.OKBLUE
                + "[S³I]"
                + BColors.ENDC
                + ": Search the attribute with path: {}".format(attribute_path)
            )
            value = self._uriToData(attribute_path)
        except KeyError:
            value = "invalid attribute path"

        get_value_reply.fillGetValueReply(
            senderUUID=self.thing_id,
            receiverUUID=[request_sender],
            results=value,
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )

        res = self.broker.send(
            receiver_endpoints=[request_sender_endpoint],
            msg=json.dumps(get_value_reply.msg),
        )

        if res.status_code == 201:
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": Send a S³I-B GetValueReply back to the requester."
            )

    def _uriToData(self, uri):
        """Look up the value addressed by a ``/``-separated path in ``dt_json``.

        :param uri: attribute path; the empty string addresses the whole
            document.
        :return: the single match, a list of matches when there are several,
            or ``"invalid attribute path"`` when nothing can be resolved.
        """
        invalid = "invalid attribute path"
        if not isinstance(uri, str):
            return invalid
        if uri == "":
            return self.dt_json

        uri_list = uri.split("/")
        if uri_list[0] == "features":
            try:
                return self.dt_json[uri]
            except KeyError:
                return invalid

        # Start from an empty accumulator and empty it again on failure:
        # otherwise matches collected before an error would be returned as part
        # of the answer to the next request.
        self.__resGetValue.clear()
        try:
            self._getValue(self.dt_json, uri_list)
        except Exception:
            self.__resGetValue.clear()
            return invalid

        if not self.__resGetValue:
            return invalid
        response = copy.deepcopy(self.__resGetValue)
        self.__resGetValue.clear()
        if len(response) == 1:
            return response[0]
        return response

    def _resolve_ditto_feature(self, value, key):
        """Replace a ``ditto-feature:<id>`` reference by the referenced property.

        :param value: the value found at the end of a path.
        :param key: last path segment, used as the property name.
        :return: ``dt_json["features"][<id>]["properties"][key]`` when ``value``
            is such a reference and the lookup succeeds, otherwise ``value``.
        """
        if not isinstance(value, str):
            return value
        parts = value.split(":")
        if parts[0] != "ditto-feature":
            return value
        try:
            return self.dt_json["features"][parts[1]]["properties"][key]
        except (KeyError, IndexError, TypeError):
            return value

    def _getValue(self, source, uri_list):
        """Walk ``uri_list`` through ``source`` and collect every match.

        Matches are appended to the internal result accumulator. Lookup errors
        (missing key, empty list, too short path) propagate to the caller.

        :param source: mapping to descend into.
        :param uri_list: remaining path segments; the first one is consumed in
            place when the walk descends into a nested mapping.
        """
        value = source[uri_list[0]]
        if len(uri_list) == 1:
            self.__resGetValue.append(self._resolve_ditto_feature(value, uri_list[0]))
            return

        if isinstance(value, dict):
            del uri_list[0]
            self._getValue(value, uri_list)
        elif isinstance(value, list):
            if isinstance(value[0], (str, int, float, bool, list)):
                # A list of plain values cannot be descended into any further.
                return value
            if isinstance(value[0], dict):
                for item in value:
                    if item["class"] == "ml40::Thing":
                        # Sub-things are selected through one of their roles.
                        for role in item["roles"]:
                            if self._findValue(role, uri_list[1]):
                                self._getValue(item, uri_list[2:])
                    elif self._findValue(item, uri_list[1]):
                        remaining = uri_list[2:]
                        if not remaining:
                            self.__resGetValue.append(item)
                            return
                        self._getValue(item, remaining)
        elif isinstance(value, (str, int, float, bool)):
            self.__resGetValue.append(self._resolve_ditto_feature(value, uri_list[0]))

    def _findValue(self, json, value):
        """Return True when any entry of the mapping ``json`` equals ``value``."""
        return any(json[item] == value for item in json)

    def on_service_request(self, body_json):
        """Execute the requested feature method and send a ServiceReply.

        ``serviceType`` is expected as ``<functionality>/<method>``. Requests
        that cannot be mapped to a callable are logged and not answered.
        """
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B ServiceRequest "
            + json.dumps(body_json, indent=2)
        )
        service_type = body_json.get("serviceType")
        # Missing parameters mean "call without arguments" instead of a TypeError.
        parameters = body_json.get("parameters") or {}

        if not isinstance(service_type, str):
            APP_LOGGER.critical(
                "Service type {} is not a string!".format(service_type)
            )
            return

        type_parts = service_type.split('/')
        service_functionality = type_parts[0]
        service_functionality_obj = self.features.get(service_functionality)
        if service_functionality_obj is None:
            APP_LOGGER.critical(
                "Functionality %s is not one of the built-in functionalities in %s!" % (
                    service_functionality, self.name)
            )
            return

        if len(type_parts) < 2:
            APP_LOGGER.critical(
                "Service type {} does not name a method!".format(service_type)
            )
            return
        method_name = type_parts[1]

        # TODO: Call right functionality.
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": Execute the function {0} of the class {1}.".format(method_name, service_functionality)
        )
        method = getattr(service_functionality_obj, method_name, None)
        if not callable(method):
            APP_LOGGER.critical(
                "Method {} is not available in functionality {}!".format(
                    method_name, service_functionality)
            )
            return

        result = method(**parameters)
        if isinstance(result, bool):
            result = {"ok": result}

        service_reply = ServiceReply()
        # NOTE(review): the GetValueReply above passes the receiver as a list,
        # this one passes it as a scalar - confirm which form is expected.
        service_reply.fillServiceReply(
            senderUUID=self.thing_id,
            receiverUUID=body_json.get("sender", None),
            serviceType=body_json.get("serviceType", None),
            results=result,
            replyingToUUID=body_json.get("identifier", None),
            msgUUID="s3i:{}".format(uuid.uuid4())
        )
        res = self.broker.send(receiver_endpoints=[body_json.get("replyToEndpoint", None)],
                               msg=json.dumps(service_reply.msg))
        if res.status_code == 201:
            print(
                BColors.OKBLUE
                + "[S³I][Broker]"
                + BColors.ENDC
                + ": Send a S³I-B ServiceReply back to the requester "
            )

    def on_get_value_reply(self, msg):
        """Print a received GetValueReply and its ``value`` field."""
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B GetValueReply"
            + json.dumps(msg, indent=2)
        )
        value = msg.get("value", None)
        if isinstance(value, dict):
            value = json.dumps(value, indent=2)
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": The queried value is: {0}{1}{2}".format(BColors.OKGREEN, value, BColors.ENDC)
        )

    def on_service_reply(self, msg):
        """Print a received ServiceReply and its ``results`` field."""
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": You have received a S³I-B ServiceReply"
            + json.dumps(msg, indent=2)
        )
        results = msg.get("results", None)
        if isinstance(results, dict):
            results = json.dumps(results, indent=2)
        print(
            BColors.OKBLUE
            + "[S³I][Broker]"
            + BColors.ENDC
            + ": The result is: {0}{1}{2}".format(BColors.OKGREEN, results, BColors.ENDC)
        )

    def to_dir_json(self):
        """Query the directory entry, overlay the local state and return it."""
        self.dir_json = self.dir.queryThingIDBased(self.thing_id)
        if self.thing_id is not None:
            self.dir_json["thingId"] = self.thing_id
        if self.policy_id is not None:
            self.dir_json["policyId"] = self.policy_id
        attributes = self.dir_json["attributes"]
        attributes["class"] = "ml40::Thing"
        attributes["name"] = self.name
        # The keys are only written when there is something to list.
        if self.roles:
            attributes["roles"] = [role.to_json() for role in self.roles.values()]
        if self.features:
            attributes["features"] = [
                feature.to_json() for feature in self.features.values()
            ]
        return self.dir_json

    def to_repo_json(self):
        """Use the current ``dt_json`` as repository entry and return it."""
        self.repo_json = self.dt_json
        return self.repo_json

    def to_json(self):
        """Rebuild ``dt_json`` from the current roles and features.

        The document is assembled locally and published with one assignment so
        that the broker thread never reads a half-populated ``dt_json``.
        """
        attributes = {
            "class": "ml40::Thing",
            "name": self.name,
        }
        # The keys are only written when there is something to list.
        if self.roles:
            attributes["roles"] = [role.to_json() for role in self.roles.values()]
        if self.features:
            attributes["features"] = [
                feature.to_json() for feature in self.features.values()
            ]
        self.dt_json = {
            "thingId": self.thing_id,
            "policyId": self.policy_id,
            "attributes": attributes,
        }
        return self.dt_json

    def to_subthing_json(self):
        """Return the representation used when this thing is nested in another."""
        return {
            "class": "ml40::Thing",
            "name": self.name,
            "roles": [role.to_json() for role in self.roles.values()],
            "features": [feature.to_json() for feature in self.features.values()],
        }