-
Jiahang Chen authoredJiahang Chen authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
thing.py 30.78 KiB
"""This module implements the thing class which is the core element of
this package."""
import ast
import threading
import json
import uuid
import time
import copy
from s3i import IdentityProvider, TokenType, GetValueReply, Directory
from s3i import Repository
from s3i.broker import Broker, BrokerREST
from s3i.messages import ServiceReply
from ml.identifier import ID
from ml.tools import BColors
from ml.tools import find_broker_endpoint
from ml.app_logger import APP_LOGGER
class BaseVariable:
"""The BaseVariable class holds various urls to s3i services."""
IDP_URL = "https://idp.s3i.vswf.dev/"
IDP_REALM = "KWH"
BROKER_HOST = "rabbitmq.s3i.vswf.dev"
REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"
DIR_URL = "https://dir.s3i.vswf.dev/api/2/"
class Thing:
"""The thing class represents a customizable runtime environment for
operating Digital Twins complying the Forest Modeling Language (fml40)."""
def __init__(
self,
model: dict,
client_secret="",
grant_type="password",
is_broker=False,
is_repo=False,
is_broker_rest=True,
username=None,
password=None,
):
"""
Constructor
:param model: edge-device or S³I Repository specified JSON entry, like config file for Digital Twins
:type model: dict
:param client_secret: OAuth 2.0 specified client secret, generated in the S³I IdentityProvider
:type client_secret: str
:param grant_type: OAuth 2.0 specified grant type to issue a JWT. Here the grant type can be password or client_credentials
:type grant_type: str
:param is_broker: whether broker interface is enabled in the ml40::thing instance
:type is_broker: bool
:param is_broker_rest: Whether the connection with the S³I Broker is established via HTTP REST
:type is_broker_rest: bool
:param is_repo: Whether the thing uses the S³I Repository to launch its Digital Twin in the cloud
:type is_repo: bool
:param username: OAuth 2.0 specified username, registered in the S³I IdentityProvider. If the grant_type is set as password, the username is required
:type username: str
:param password: OAuth 2.0 specified password, registered in the S³I IdentityProvider. If the grant_type is set as password, the password is required
"""
self.__model = model
self.__thing_id = model.get("thingId", "")
self.__policy_id = model.get("policyId", "")
self.__grant_type = grant_type
self.__username = username
self.__password = password
self.__client_secret = client_secret
self.__is_broker = is_broker
self.__is_broker_rest = is_broker_rest
self.__is_repo = is_repo
self.__access_token = ""
self.__endpoint = ""
self.__ws_connected = False
self.broker = None
self.ws = None
# TODO: Change the variable name dir, because it is a builtin
# name.
self.dir = None
self.repo = None
# : Dict[<str>, <str>]
self.repo_json = dict()
self.dir_json = dict()
self.dt_json = dict()
attributes = model.get("attributes", None)
self.__name = ""
self.__roles = {}
self.__features = {}
self.__ditto_features = {}
# ??? Is this property necessary? Only used as return value in _getValue()
self.__resGetValue = list()
if attributes:
self.__name = attributes.get("name", "")
@property
def model(self):
"""Returns the specification JSON from which this thing has been constructed from.
:returns: Representation of a ml40 compliant thing.
:rtype: dict
"""
return self.__model
@property
def ditto_features(self):
return self.__ditto_features
@ditto_features.setter
def ditto_features(self, value):
self.__ditto_features = value
@property
def features(self):
"""Returns thing's features.
:returns: Features
:rtype: dict
"""
return self.__features
@features.setter
def features(self, value):
"""Replaces thing's features with value.
:param value: New collection of features
"""
self.__features = value
@property
def roles(self):
"""Returns the thing's roles.
:returns: ml40 roles
:rtype: dict
"""
return self.__roles
@roles.setter
def roles(self, value):
"""Replaces thing's roles with value
:param value: New collection of roles
"""
self.__roles = value
@property
def client_secret(self):
"""Returns the client secret.
:returns: Client secret
:rtype: str
"""
return self.__client_secret
@property
def grant_type(self):
"""Returns the method used to obtain JSON Web Tokens from the S³I IdentityProvider
:returns: OAuth2 specified grant type [password, client_credentials]
:rtype: str
"""
return self.__grant_type
@property
def access_token(self):
"""Returns the current JSON Web token.
:returns: JSON Web token
:rtype: str
"""
return self.__access_token
@property
def name(self):
"""Returns the name of this thing.
:returns: name
:rtype: str
"""
return self.__name
@property
def thing_id(self):
"""Returns the identifier of this thing.
:returns: identifier
:rtype: str
"""
return self.__thing_id
@property
def policy_id(self):
"""Returns the identifier of this thing's policy.
:returns: identifier
:rtype: str
"""
return self.__policy_id
def run_forever(self):
"""Starts the thing in permanent mode.
"""
# TODO: Use logger instead!
print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC))
self.__connect_with_idp()
threading.Thread(target=self.__json_syn).start()
threading.Thread(target=self.__dir_syn).start()
if self.__is_repo:
threading.Thread(target=self.__repo_syn).start()
@staticmethod
def add_user_def(func):
"""Insert user-specified function in the thing object.
:param func: external defined function to be executed.
"""
threading.Thread(target=func).start()
def __json_syn(self, freq=0.1):
"""
Applies local changes to the original model in the thing object
:param freq: Frequency of the update
:type freq: float
"""
while True:
try:
time.sleep(freq)
self.to_json()
except:
continue
def __dir_syn(self, freq=0.1):
"""Applies local changes to the directory entry in the cloud.
:param freq: Frequency of the update.
:type freq: float
"""
while True:
# try:
time.sleep(freq)
old_dir_json = self.dir_json
self.to_dir_json()
if self.dir_json == old_dir_json:
continue
else:
self.dir.updateThingIDBased(
thingID=self.thing_id, payload=self.dir_json
)
# except:
# continue
def __repo_syn(self, freq=0.1):
"""Applies local changes to the repository entry in the cloud.
:param freq: Frequency of the update.
:type freq: float
"""
while self.__is_repo:
try:
time.sleep(freq)
old_repo_json = self.repo_json
self.to_repo_json()
# TODO: Clean up this reqion
if self.repo_json == old_repo_json:
continue
else:
self.repo.updateThingIDBased(
thingID=self.thing_id, payload=self.repo_json
)
except:
continue
def __connect_with_idp(self):
"""Establishes a connection to the S³I IdentityProvider which guarantees,
that the JSON web token needed to use s3i services.
be renewed if it has expired.
"""
#TODO: Use logger!
print(
BColors.OKBLUE
+ "[S³I][IdP]"
+ BColors.ENDC
+ ": Connect with S3I IdentityProvider"
)
idp = IdentityProvider(
grant_type=self.__grant_type,
client_id=self.__thing_id,
username=self.__username,
password=self.__password,
client_secret=self.__client_secret,
realm=BaseVariable.IDP_REALM,
identity_provider_url=BaseVariable.IDP_URL,
)
# This may take a while so fetch token directly.
idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)
def __on_token(self, token):
"""Updates the JSON Web Token with token and reestablishes connections
to the s3i services .
:param token: New JSON Web token
:type token: str
"""
self.__access_token = token
self.__connect_with_dir()
self.__connect_with_repo()
if self.__is_broker:
self.__connect_with_broker()
def __connect_with_dir(self):
"""Initializes the property dir with a Directory object which can be
used to access the s3i Directory.
:returns:
:rtype:
"""
# TODO: Use logger
print(
BColors.OKBLUE
+ "[S³I][Dir]"
+ BColors.ENDC
+ ": Connect with S3I Directory"
)
self.dir = Directory(
s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token
)
def __connect_with_repo(self):
"""Initializes the property repo whit a Repository object which can be
used to access the s3i Repository.
:returns:
:rtype:
"""
# TODO: Use logger
print(
BColors.OKBLUE
+ "[S³I][Repo]"
+ BColors.ENDC
+ ": Connect with S3I Repository"
)
self.repo = Repository(
s3i_repo_url=BaseVariable.REPO_URL, token=self.__access_token
)
def __connect_with_broker(self):
"""Initializes the property broker with a Broker object. Additionally
a callback function is registered which handles incoming S³I-B Messages
messages.
"""
# TODO: Use logger
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": Connect with S3I Broker"
)
self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
if self.__is_broker_rest:
self.broker = BrokerREST(token=self.access_token)
def receive():
while True:
try:
time.sleep(0.1)
msg_str = self.broker.receive_once(self.__endpoint)
if msg_str == "":
continue
else:
self.__on_broker_callback(
ch=None,
method=None,
properties=None,
body=json.loads(msg_str),
)
except:
continue
threading.Thread(target=receive).start()
else:
self.broker = Broker(
auth_form="Username/Password",
username=" ",
password=self.__access_token,
host=BaseVariable.BROKER_HOST,
)
threading.Thread(
target=self.broker.receive,
args=(self.__endpoint, self.__on_broker_callback),
).start()
def __on_broker_callback(self, ch, method, properties, body):
"""Parses body (content of a S3I-B message) and delegates the
processing of the message to a separate method. The method is
selected according to the message's type.
:param body: S3I-B message
"""
if isinstance(body, bytes):
body_str = body.decode('utf8').replace("'", '"')
try:
body = json.loads(body_str)
except ValueError:
pass
elif isinstance(body, int):
return
elif isinstance(body, str):
return
ch.basic_ack(method.delivery_tag)
message_type = body.get("messageType")
if message_type == "userMessage":
self.on_user_message(body)
elif message_type == "serviceRequest":
self.on_service_request(body)
elif message_type == "getValueRequest":
self.on_get_value_request(body)
elif message_type == "getValueReply":
self.on_get_value_reply(body)
elif message_type == "serviceReply":
self.on_service_reply(body)
else:
### TODO send user message reply back
pass
def on_user_message(self, msg):
"""Handles incoming S³I-B UserMessages.
:param msg: S³I-B UserMessages
"""
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": You have received a S³I-B UserMessage"
+ json.dumps(msg, indent=2)
)
def on_get_value_request(self, msg):
"""Handles incoming GetValueRequest message. Looks up the value specified in msg and
sends a GetValueReply message back to the sender.
:param msg: GetValueRequest
"""
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": You have received a S³I-B GetValueRequest"
+ json.dumps(msg, indent=2)
)
get_value_reply = GetValueReply()
request_sender = msg.get("sender")
request_msg_id = msg.get("identifier")
request_sender_endpoint = msg.get("replyToEndpoint")
attribute_path = msg.get("attributePath")
reply_msg_uuid = "s3i:" + str(uuid.uuid4())
try:
print(
BColors.OKBLUE
+ "[S³I]"
+ BColors.ENDC
+ ": Search the attribute with path: "
+ attribute_path
)
value = self._uriToData(attribute_path)
except KeyError:
value = "invalid attribute path"
APP_LOGGER.critical(value)
get_value_reply.fillGetValueReply(
senderUUID=self.thing_id,
receiverUUIDs=[request_sender],
results=value,
msgUUID=reply_msg_uuid,
replyingToUUID=request_msg_id,
)
res = self.broker.send(
receiver_endpoints=[request_sender_endpoint],
msg=json.dumps(get_value_reply.msg),
)
if self.__is_broker_rest:
if res.status_code == 201:
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": Send a S³I-B GetValueReply back to the requester "
)
else:
print(BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ res.text)
def _uriToData(self, uri):
"""Returns a copy of the value found at uri.
:param uri: Path to value
:rtype: Feature
"""
if uri == "":
return self.dt_json
else:
uri_list = uri.split("/")
if uri_list[0] == "features":
try:
return self.dt_json[uri]
except KeyError:
return "invalid attribute path"
try:
self._getValue(self.dt_json, uri_list)
except:
return "invalid attribute path"
if self.__resGetValue.__len__() == 0:
return "invalid attribute path"
response = copy.deepcopy(self.__resGetValue)
self.__resGetValue.clear()
if response.__len__() == 1:
return response[0]
else:
return response
def _getValue(self, source, uri_list):
"""Searches for the value specified by uri_list in source and stores
the result in __resGetValue.
:param source: Object that is scanned
:param uri_list: List containing path
"""
# ??? What if the uri points to a Value object?
# Shouldn't it be serialized?!
value = source[uri_list[0]]
if uri_list.__len__() == 1:
# if is ditto-feature
if isinstance(value, str):
try:
stringValue_split = value.split(":")
if stringValue_split[0] == "ditto-feature":
value = self.dt_json["features"][stringValue_split[1]][
"properties"
][uri_list[0]]
except:
pass
self.__resGetValue.append(value)
return
if isinstance(value, dict):
# ??? uri_list.pop(0) better?!
del uri_list[0]
self._getValue(value, uri_list)
if isinstance(value, list):
if isinstance(value[0], (str, int, float, bool, list)):
return value
if isinstance(value[0], dict):
for item in value:
if item["class"] == "ml40::Thing":
for i in item["roles"]:
if self._findValue(i, uri_list[1]):
uri_list_1 = copy.deepcopy(uri_list)
del uri_list_1[:2]
self._getValue(item, uri_list_1)
else:
if self._findValue(item, uri_list[1]):
uri_list_1 = copy.deepcopy(uri_list)
del uri_list_1[:2]
if not uri_list_1:
self.__resGetValue.append(item)
return
else:
self._getValue(item, uri_list_1)
if isinstance(value, (str, int, float, bool)):
# if is ditto-feature
if isinstance(value, str):
try:
stringValue_split = value.split(":")
if stringValue_split[0] == "ditto-feature":
value = self.dt_json["features"][stringValue_split[1]][
"properties"
][uri_list[0]]
except:
pass
self.__resGetValue.append(value)
def _findValue(self, json, value):
"""Returns true if value has been found in json, otherwise returns false.
:param json: dictionary
:param value:
:returns:
:rtype:
"""
# TODO: Simplify: value in json.values()
for item in json:
if json[item] == value:
# print("Parameter: ", json[item])
return True
return False
def on_service_request(self, body_json):
"""Handles S³I-B ServiceRequests. Executes the method of the
functionality specified in serviceType and send a ServiceReply
back to the sender.
:param body_json: ServiceRequest
"""
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": You have received a S³I-B ServiceRequest "
+ json.dumps(body_json, indent=2)
)
service_type = body_json.get("serviceType")
parameters = body_json.get("parameters")
service_reply = ServiceReply()
service_functionality = service_type.split('/')[0]
service_functionality_obj = self.features.get(service_functionality)
if service_functionality_obj is None:
APP_LOGGER.critical(
"Functionality %s is not one of the built-in functionalities in %s!"
% (service_functionality, self.name)
)
service_reply.fillServiceReply(
senderUUID=self.thing_id,
receiverUUIDs=[body_json.get("sender", None)],
serviceType=body_json.get("serviceType", None),
results={"error": "invalid functionalities (serviceType) {}".format(service_functionality)},
replyingToUUID=body_json.get("identifier", None),
msgUUID="s3i:{}".format(uuid.uuid4())
)
else:
# TODO: Call right functionality.
try:
method = getattr(service_functionality_obj, service_type.split('/')[1])
except AttributeError:
APP_LOGGER.critical(
"Functionality %s is not one of the built-in functionalities in %s!" % (
service_type.split('/')[1], self.name)
)
service_reply.fillServiceReply(
senderUUID=self.thing_id,
receiverUUIDs=[body_json.get("sender", None)],
serviceType=body_json.get("serviceType", None),
results={"error": "invalid functionalities (serviceType) {}".format(service_type.split('/')[1])},
replyingToUUID=body_json.get("identifier", None),
msgUUID="s3i:{}".format(uuid.uuid4())
)
else:
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": Execute the function {0} of the class {1}.".format(service_type.split('/')[1],
service_type.split('/')[0])
)
try:
result = method(**parameters)
except TypeError:
APP_LOGGER.critical("Invalid function arguments")
service_reply.fillServiceReply(
senderUUID=self.thing_id,
receiverUUIDs=[body_json.get("sender", None)],
serviceType=body_json.get("serviceType", None),
results={"error": "invalid function arguments (parameters)"},
replyingToUUID=body_json.get("identifier", None),
msgUUID="s3i:{}".format(uuid.uuid4())
)
else:
if isinstance(result, bool):
result = {"ok": result}
service_reply.fillServiceReply(
senderUUID=self.thing_id,
receiverUUIDs=[body_json.get("sender", None)],
serviceType=body_json.get("serviceType", None),
results=result,
replyingToUUID=body_json.get("identifier", None),
msgUUID="s3i:{}".format(uuid.uuid4())
)
res = self.broker.send(receiver_endpoints=[body_json.get("replyToEndpoint", None)],
msg=json.dumps(service_reply.msg))
if self.__is_broker_rest:
if res.status_code == 201:
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": Send a S³I-B ServiceReply back to the requester "
)
else:
print( BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ res.text)
def on_get_value_reply(self, msg):
"""Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout.
:param msg: GetValueReply
"""
# ???: Behavior should be defined by the user! Maybe he want
# to process the result!
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": You have received a S³I-B GetValueReply"
+ json.dumps(msg, indent=2)
)
value = msg.get("value", None)
if isinstance(value, dict):
value = json.dumps(value, indent=2)
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": The queried value is: {0}{1}{2}".format(
BColors.OKGREEN, value, BColors.ENDC
)
)
def on_service_reply(self, msg):
"""Handles incoming S³I-B ServiceReply. Prints the content of msg to stdout.
:param msg: ServiceReply
"""
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": You have received a S³I-B ServiceReply"
+ json.dumps(msg, indent=2)
)
results = msg.get("results", None)
if isinstance(results, dict):
results = json.dumps(results, indent=2)
print(
BColors.OKBLUE
+ "[S³I][Broker]"
+ BColors.ENDC
+ ": The result is: {0}{1}{2}".format(
BColors.OKGREEN, results, BColors.ENDC
)
)
def to_dir_json(self):
"""Returns a dictionary representing this thing's directory entry.
:returns: Directory representation of this object
:rtype: dict
"""
self.dir_json = self.dir.queryThingIDBased(self.thing_id)
if self.thing_id is not None:
self.dir_json["thingId"] = self.thing_id
if self.policy_id is not None:
self.dir_json["policyId"] = self.policy_id
if self.name is not None:
self.dir_json["attributes"]["name"] = self.name
if self.features.get("ml40::Location") is not None:
self.dir_json["attributes"]["location"] = {
"longitude": self.features.get("ml40::Location").to_json()["longitude"],
"latitude": self.features.get("ml40::Location").to_json()["latitude"]
}
self.dir_json["attributes"]["dataModel"] = "fml40"
self.dir_json["attributes"]["thingStructure"] = {
"class": "ml40::Thing",
"links": []
}
for key in self.roles.keys():
role_entry = {
"association": "roles",
"target": self.roles[key].to_json()
}
self.dir_json["attributes"]["thingStructure"]["links"].append(role_entry)
for key in self.features.keys():
feature_target = {
"class": self.features[key].to_json()["class"],
}
if self.features[key].to_json().get("identifier") is not None:
feature_target["identifier"] = self.features[key].to_json()["identifier"]
feature_entry = {"association": "features", "target": feature_target}
# if the feature has targets, like ml40::Composite
if hasattr(self.features[key], "targets"):
feature_entry["target"]["links"] = list()
for target in self.features[key].targets.keys():
target_json = (
self.features[key].targets[target].to_subthing_dir_json()
)
feature_entry["target"]["links"].append(target_json)
self.dir_json["attributes"]["thingStructure"]["links"].append(feature_entry)
return self.dir_json
def to_repo_json(self):
"""Returns a dictionary representing this thing's repository entry.
:returns: Repository representation of this object
:rtype: dict
"""
self.repo_json = self.dt_json
return self.repo_json
def to_json(self):
"""Returns a dictionary representing this thing in it's current state.
:returns: Representation of this object
:rtype: dict
"""
self.dt_json = {
"thingId": self.thing_id,
"policyId": self.policy_id,
"attributes": {
"class": "ml40::Thing",
"name": self.name,
},
}
if self.roles:
self.dt_json["attributes"]["roles"] = list()
if self.features:
self.dt_json["attributes"]["features"] = list()
if self.ditto_features:
self.dt_json["features"] = dict()
for key in self.roles.keys():
self.dt_json["attributes"]["roles"].append(self.roles[key].to_json())
for key in self.features.keys():
self.dt_json["attributes"]["features"].append(self.features[key].to_json())
for key in self.ditto_features.keys():
self.dt_json["features"][key] = self.ditto_features[key].to_json()
return self.dt_json
def to_subthing_json(self):
"""Returns a dictionary representing this thing in it's current state
as a subordinate thing. This representation should be used for
subordinate things in s3i repository entries.
:returns: Representation of this object as a subordinate thing
:rtype: dict
"""
json_out = {
"class": "ml40::Thing",
"name": self.name,
"roles": [],
"features": [],
}
if self.thing_id:
json_out["identifier"] = self.thing_id
for key in self.roles.keys():
json_out["roles"].append(self.roles[key].to_json())
for key in self.features.keys():
json_out["features"].append(self.features[key].to_json())
return json_out
def to_subthing_dir_json(self):
"""Returns a dictionary representing this thing in it's current state
as a subordinate thing. This representation should be used for
subordinate things in s3i directory entries.
:returns: Representation of this object as a subordinate thing.
:rtype: dict
"""
json_out = {"class": "ml40::Thing", "links": []}
if self.thing_id:
json_out["identifier"] = self.thing_id
for key in self.roles.keys():
role_entry = {"association": "roles", "target": self.roles[key].to_json()}
json_out["links"].append(role_entry)
for key in self.features.keys():
feature_target = {
"class": self.features[key].to_json()["class"],
}
if self.features[key].to_json().get("identifier") is not None:
feature_target["identifier"] = self.features[key].to_json()["identifier"]
feature_entry = {"association": "features", "target": feature_target}
json_out["links"].append(feature_entry)
return json_out