Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
thing.py 10.54 KiB
import ast
import threading
import websocket
import json
import uuid
import requests
from s3i import IdentityProvider, TokenType, GetValueReply, Directory
from s3i.broker import Broker, BrokerREST, BrokerMetaClass
from s3i.messages import ServiceReply
from ml.managing_actor import ManagingActor
from ml.tools import BColors
from ml.tools import find_broker_endpoint
from ml.app_logger import APP_LOGGER
class BaseVariable:
    """Namespace for the fixed service addresses used by this module.

    The values are plain class attributes and are read as
    ``BaseVariable.<NAME>``; nothing in the visible code instantiates the
    class.
    """

    # Identity provider endpoint and the realm passed along with it.
    IDP_URL = "https://idp.s3i.vswf.dev/"
    IDP_REALM = "KWH"

    # Host handed to the (non-REST) broker client.
    BROKER_HOST = "rabbitmq.s3i.vswf.dev"

    # Repository addresses: the websocket URL is the one opened for the
    # repository connection; REPO_URL is not referenced in the visible code.
    REPO_WWS_URL = "wss://ditto.s3i.vswf.dev/ws/2"
    REPO_URL = "https://ditto.s3i.vswf.dev/api/2/"

    # Directory API root.
    DIR_URL = "https://dir.s3i.vswf.dev/api/2/"
def get_sensor_uuid(body_json):
    """Return the ``sensor_uuid`` stored in a message's ``parameters``.

    Args:
        body_json: Mapping holding the decoded message body.

    Returns:
        The value found under ``parameters -> sensor_uuid``, or ``""`` when
        ``parameters`` is missing, empty/``None``, or has no ``sensor_uuid``.
    """
    # ``or {}`` also covers an explicit ``"parameters": null``, which would
    # otherwise reach ``.get`` as ``None`` and raise AttributeError.
    parameters = body_json.get("parameters") or {}
    return parameters.get("sensor_uuid", "")
class Thing(ManagingActor):
    """Managing actor that links one thing to the S3I services.

    :meth:`run_forever` requests an access token from the identity provider.
    Once a token arrives, the instance creates a directory client and,
    depending on the constructor flags, a broker connection and a repository
    websocket. Broker messages are routed by their ``messageType`` field to
    the ``on_*`` hooks of this class.
    """

    def __init__(
        self,
        client_secret,
        model: dict,
        grant_type: str,
        is_broker: bool,
        is_repo: bool,
        is_broker_rest=True,
        username=None,
        password=None,
    ):
        """Store credentials and identity taken from ``model``.

        Args:
            client_secret: Secret passed to the identity provider.
            model: Thing description. The keys ``thingId``, ``policyId`` and
                ``attributes`` (with ``name``, ``class``, ``type``) are read;
                each falls back to an empty string when absent.
            grant_type: Grant type passed to the identity provider.
            is_broker: Connect to the broker once a token is available.
            is_repo: Open the repository websocket once a token is available.
            is_broker_rest: Use the REST broker client (polling) instead of
                the callback-based broker client.
            username: User name passed to the identity provider.
            password: Password passed to the identity provider.
        """
        super().__init__()
        self.__thing_id = model.get("thingId", "")
        self.__policy_id = model.get("policyId", "")
        self.__grant_type = grant_type
        self.__username = username
        self.__password = password
        self.__client_secret = client_secret
        self.__is_broker = is_broker
        self.__is_broker_rest = is_broker_rest
        self.__is_repo = is_repo
        self.__access_token = ""
        self.__endpoint = ""
        self.__ws_connected = False
        self.broker = None
        self.ws = None
        # The websocket code works on ``_ws``; define it up front so the
        # attribute exists before a repository connection is made.
        self._ws = None
        self.dir = None
        self.roles = []
        self.__features = {}

        # A missing or empty "attributes" entry leaves all three names "".
        attributes = model.get("attributes", None) or {}
        self.__name = attributes.get("name", "")
        self.__class_name = attributes.get("class", "")
        self.__type_name = attributes.get("type", "")

    @property
    def features(self):
        """Mapping of feature name to the object registered for it."""
        return self.__features

    @features.setter
    def features(self, value):
        self.__features = value

    @property
    def class_name(self):
        """Value of ``attributes.class`` from the model (``""`` if absent)."""
        return self.__class_name

    @property
    def client_secret(self):
        """Client secret given to the constructor."""
        return self.__client_secret

    @property
    def grant_type(self):
        """Grant type given to the constructor."""
        return self.__grant_type

    @property
    def access_token(self):
        """Most recently received access token (``""`` before the first)."""
        return self.__access_token

    @property
    def name(self):
        """Value of ``attributes.name`` from the model (``""`` if absent)."""
        return self.__name

    @property
    def thing_id(self):
        """Value of ``thingId`` from the model (``""`` if absent)."""
        return self.__thing_id

    @staticmethod
    def __log_step(component, text):
        """Print ``text`` behind the coloured ``[S³I][<component>]`` tag."""
        print(BColors.OKBLUE + "[S³I][{}]".format(component) + BColors.ENDC + text)

    def run_forever(self):
        """Announce the launch and start the identity-provider connection."""
        print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC))
        self.__connect_with_idp()

    def __connect_with_idp(self):
        """Create the identity-provider client and request access tokens."""
        self.__log_step("IdP", ": Connect with S3I-IdentityProvider")
        idp = IdentityProvider(
            grant_type=self.__grant_type,
            client_id=self.__thing_id,
            username=self.__username,
            password=self.__password,
            client_secret=self.__client_secret,
            realm=BaseVariable.IDP_REALM,
            identity_provider_url=BaseVariable.IDP_URL,
        )
        # This may take a while so fetch token directly.
        idp.run_forever(token_type=TokenType.ACCESS_TOKEN, on_new_token=self.__on_token)

    def __on_token(self, token):
        """Store ``token`` and open the configured service connections.

        NOTE(review): if the identity provider calls this for every token
        refresh, each call creates a new directory client, broker thread and
        websocket without stopping the previous ones - confirm intended.
        """
        self.__access_token = token
        self.__connect_with_dir()
        if self.__is_broker:
            self.__connect_with_broker()
        if self.__is_repo:
            self.__connect_with_repo()

    def __connect_with_dir(self):
        """Create the directory client with the current access token."""
        self.__log_step("Dir", ": Connect with S3I-Directory")
        # Use the shared constant rather than repeating the URL literal.
        self.dir = Directory(s3i_dir_url=BaseVariable.DIR_URL, token=self.__access_token)

    def __connect_with_repo(self):
        """Open the repository websocket and run it in its own thread."""
        self.__log_step("Repo", ": Connect with S3I-Repository")
        self._ws = websocket.WebSocketApp(
            BaseVariable.REPO_WWS_URL,
            # A list of header lines; the original passed a one-element set.
            header=["Authorization: Bearer {}".format(self.__access_token)],
            on_message=self.__on_new_websocket_message,
            on_error=self.__on_new_websocket_error,
            on_open=self.__on_websocket_connection_opened,
            on_close=self.__on_websocket_connection_closed,
        )
        # Keep the public attribute declared in __init__ pointing at the
        # same connection object.
        self.ws = self._ws
        threading.Thread(target=self._ws.run_forever).start()

    @staticmethod
    def __on_new_websocket_message(ws, msg):
        """Ignore repository messages; no handling is implemented."""

    @staticmethod
    def __on_new_websocket_error(ws, error):
        """Report a websocket error together with the error object."""
        Thing.__log_step("Repo", ": Websocket error: {}".format(error))

    def __on_websocket_connection_opened(self, *args):
        """Mark the websocket as connected and request message delivery.

        ``*args`` absorbs whatever the websocket library passes (nothing, or
        the socket object, depending on its version).
        """
        self.__ws_connected = True
        self._ws.send("START-SEND-MESSAGES")
        self.__log_step("Repo", ": Websocket connection built")

    def __on_websocket_connection_closed(self, *args):
        """Mark the websocket as disconnected.

        ``*args`` absorbs whatever the websocket library passes (nothing, or
        the socket plus close status/message, depending on its version).
        """
        self.__ws_connected = False
        self.__log_step("Repo", ": Websocket connection closed")

    def sync_with_repo(self, path, topic):
        """Send the data-model value at ``path`` over the websocket.

        Does nothing while the websocket is not connected. The value is read
        from ``self.fml40_data_model``, which is not defined in this class
        (presumably provided by the base class - TODO confirm).

        Args:
            path: Key into ``self.fml40_data_model``; also sent as ``path``.
            topic: Sent as the message's ``topic``.
        """
        if not self.__ws_connected:
            return None
        msg = {"topic": topic, "path": path, "value": self.fml40_data_model[path]}
        self._ws.send(json.dumps(msg))

    def __connect_with_broker(self):
        """Look up the own endpoint and start receiving broker messages."""
        self.__log_step("Broker", ": Connect with S3I-Broker")
        self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
        if self.__is_broker_rest:
            self.broker = BrokerREST(token=self.access_token)
            threading.Thread(target=self.__poll_broker_rest).start()
        else:
            self.broker = Broker(
                auth_form="Username/Password",
                username=" ",
                password=self.__access_token,
                host=BaseVariable.BROKER_HOST,
            )
            threading.Thread(
                target=self.broker.receive,
                args=(self.__endpoint, self.__on_broker_callback),
            ).start()

    def __poll_broker_rest(self):
        """Fetch messages from the REST broker forever and dispatch them.

        NOTE(review): there is no pause between polls; whether this spins
        depends on ``receive_once`` blocking - confirm.
        """
        while True:
            msg_str = self.broker.receive_once(self.__endpoint)
            if not msg_str:
                continue
            try:
                body = json.loads(msg_str)
            except (TypeError, ValueError):
                # One bad message must not terminate the receive thread.
                APP_LOGGER.critical(
                    "Discarding broker message that is not valid JSON: %r" % (msg_str,)
                )
                continue
            self.__on_broker_callback(ch=None, method=None, properties=None, body=body)

    @staticmethod
    def __decode_broker_payload(raw):
        """Turn a raw ``bytes`` payload into a Python object.

        JSON is tried first. Payloads written as a Python literal (as
        produced by ``str(dict)``) are handled by ``ast.literal_eval``, which
        only evaluates literals. If neither parser accepts the text, the
        decoded text itself is returned.
        """
        text = raw.decode("utf-8")
        try:
            return json.loads(text)
        except ValueError:
            pass
        try:
            return ast.literal_eval(text)
        except (ValueError, SyntaxError):
            return text

    def __on_broker_callback(self, ch, method, properties, body):
        """Dispatch one broker message to the hook for its ``messageType``.

        Args:
            ch, method, properties: Unused; kept for the broker client's
                callback signature (the REST path passes ``None``).
            body: Message as ``bytes`` (decoded here) or as an already
                decoded object. Anything that is not a dict after decoding
                is printed and dropped.
        """
        if isinstance(body, bytes):
            body = self.__decode_broker_payload(body)
        if not isinstance(body, dict):
            # Numbers, strings and other non-message payloads carry no
            # "messageType"; show them and stop.
            print(body)
            return

        message_type = body.get("messageType")
        if message_type == "userMessage":
            self.on_user_message(body)
        elif message_type == "serviceRequest":
            self.on_service_request(body)
        elif message_type == "getValueRequest":
            self.on_get_value_request(body)
        elif message_type == "getValueReply":
            self.on_get_value_reply(body)
        elif message_type == "serviceReply":
            self.on_service_reply(body)
        else:
            # TODO: send user message reply back
            pass

    def on_user_message(self, msg):
        """Hook for ``userMessage`` messages; does nothing here."""

    def on_get_value_request(self, msg):
        """Answer a ``getValueRequest`` with a ``getValueReply``.

        Only ``attributePath == "features/moisture"`` is resolved (via the
        ``ml40::Moisture`` feature); every other path, or a missing moisture
        feature, is answered with ``None`` as result.

        Args:
            msg: Request dict; ``sender``, ``identifier``,
                ``replyToEndpoint`` and ``attributePath`` are read.
        """
        request_sender = msg.get("sender", "")
        request_msg_id = msg.get("identifier")
        request_sender_endpoint = "s3ib://{}".format(msg.get("replyToEndpoint"))
        attribute_path = msg.get("attributePath")
        reply_msg_uuid = "s3i:" + str(uuid.uuid4())

        # TODO: Get correct value.
        value = None
        if attribute_path == "features/moisture":
            moisture = self.features.get("ml40::Moisture", None)
            if moisture is None:
                APP_LOGGER.critical(
                    "Feature ml40::Moisture is not available in %s!" % self.name
                )
            else:
                value = moisture.proxy().humidity.get()

        get_value_reply = GetValueReply()
        get_value_reply.fillGetValueReply(
            senderUUID=self.__thing_id,
            receiverUUID=[request_sender],
            results=value,
            msgUUID=reply_msg_uuid,
            replyingToUUID=request_msg_id,
        )
        # NOTE(review): this sends the str() form of the message while
        # on_service_request sends json.dumps(); wire format left unchanged.
        msg_txt = str(get_value_reply.msg)
        self.broker.send(
            receiver_endpoints=[request_sender_endpoint], msg=msg_txt,
        )

    def on_service_request(self, body_json):
        """Run the requested service method and send a ``serviceReply``.

        ``serviceType`` is expected as ``"<functionality>/<method>"``. The
        functionality is looked up in :attr:`features`, the method is called
        on its proxy, and the result is sent to the endpoints of the reply's
        receivers. Requests that cannot be resolved are logged and dropped.

        Args:
            body_json: Request dict; ``serviceType``, ``sender`` and
                ``identifier`` are read.
        """
        service_type = body_json.get("serviceType")
        parts = service_type.split("/") if isinstance(service_type, str) else []
        if len(parts) < 2:
            APP_LOGGER.critical(
                "Service type %r is not of the form <functionality>/<method>!"
                % (service_type,)
            )
            return
        service_functionality, method_name = parts[0], parts[1]

        service_functionality_obj = self.features.get(service_functionality, None)
        if service_functionality_obj is None:
            APP_LOGGER.critical(
                "Functionality %s is not one of the built-in functionalities in %s!" % (
                    service_functionality, self.name)
            )
            # Without the functionality there is nothing to call.
            return

        # TODO: Call right functionality.
        func_proxy = service_functionality_obj.proxy()
        method = getattr(func_proxy, method_name, None)
        if method is None:
            APP_LOGGER.critical(
                "Functionality %s has no method %s!" % (service_functionality, method_name)
            )
            return
        result = {"ok": method("test_job").get()}

        service_reply = ServiceReply()
        service_reply.fillServiceReply(
            senderUUID=self.thing_id,
            receiverUUID=body_json.get("sender", None),
            serviceType=body_json.get("serviceType", None),
            results=result,
            replyingToUUID=body_json.get("identifier", None),
            msgUUID="s3i:{}".format(uuid.uuid4())
        )

        receivers = service_reply.msg.get("receivers", None) or []
        if isinstance(receivers, str):
            # A single receiver id must not be walked character by character.
            receivers = [receivers]
        receiver_eps = [find_broker_endpoint(self.dir, r) for r in receivers]
        self.broker.send(receiver_endpoints=receiver_eps, msg=json.dumps(service_reply.msg))

    def on_get_value_reply(self, msg):
        """Hook for ``getValueReply`` messages; prints the message."""
        print(msg)

    def on_service_reply(self, msg):
        """Hook for ``serviceReply`` messages; prints the message."""
        print(msg)

    def add_function_impl(self, obj, feature_name):
        """Replace a registered feature with a started instance of ``obj``.

        Args:
            obj: Object exposing ``start``; it is called as
                ``obj.start("", self.actor_ref)`` and the result is stored.
            feature_name: Key in :attr:`features`. Unknown names are logged
                and left untouched.
        """
        if self.__features.get(feature_name, None) is None:
            APP_LOGGER.critical(
                "Functionality %s is not one of the build-in functionalities" % feature_name
            )
            return
        self.__features[feature_name] = obj.start("", self.actor_ref)