Commit 0176df76 authored by Jiahang Chen's avatar Jiahang Chen
Browse files

update ml package

parent f906a51d
Pipeline #338784 passed with stage
in 15 seconds
......@@ -8,14 +8,25 @@ def __init__(self, name, ref_managing_actor):
super(AcceptsFellingJobs, self).__init__(
name=name, ref_managing_actor=ref_managing_actor
)
self.job_list = []
def acceptJob(self, job: FellingJob) -> bool:
APP_LOGGER.info("Checking if the job can be accepted.")
proxy_executer = self.managing_actor.proxy_functionalities.get()[
self.proxy_name
]
proxy_executer.executeJob(job)
return True
APP_LOGGER.info("Checking if the felling job can be accepted.")
return self.add_to_job_list(job)
def queryJobStatus(self, identifier):
pass
def removeJob(self, identifier):
pass
def from_json(self, json_obj):
super().from_json(json_obj)
def add_to_job_list(self, job):
if job not in self.job_list:
self.job_list.append(job)
return True
else:
APP_LOGGER.info("[S³I]: This felling job has been already added in job list")
return False
\ No newline at end of file
......@@ -12,7 +12,7 @@ def __init__(self, name, ref_managing_actor):
self.__jobs = dict()
def acceptJob(self, job: Job) -> bool:
print("I am checking if i can accept job {}".format(job))
pass
def queryJobStatus(self, identifier: ID) -> JobStatus:
pass
......
......@@ -3,7 +3,6 @@
class Job(Document):
def __init__(self, name, ref_managing_actor, status: JobStatus):
def __init__(self, name, ref_managing_actor, status=JobStatus.Pending):
super(Job, self).__init__(name=name,
ref_managing_actor=ref_managing_actor)
self.status = status
from modelling_language.ml40.role.Role import Role
from ml.ml40.roles.role import Role
class HMI(Role):
......
import ast
import threading
import time
import websocket
import json
import os
import uuid
from s3i import IdentityProvider, TokenType, Broker, GetValueReply
from s3i import Directory
from s3i import IdentityProvider, TokenType, Broker, GetValueReply, Directory
from s3i.broker import BrokerREST
from s3i.messages import ServiceReply
# from pynput import keyboard
from ml.managing_actor import ManagingActor
from ml.tools import BColors
from ml.tools import send_request
from ml.tools import send_message
from ml.tools import decode_message
from ml.tools import load_config
from ml.tools import find_broker_endpoint
from ml.app_logger import APP_LOGGER
from ml.tools import create_request
class bcolors:
"""colors for the console log"""
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class BaseVariable(object):
......@@ -54,7 +37,8 @@ def __init__(
grant_type: str,
is_broker: bool,
is_repo: bool,
username=None,
is_broker_rest=True,
username=None,
password=None,
):
super(Thing, self).__init__()
......@@ -66,6 +50,7 @@ def __init__(
self.__client_secret = client_secret
self.__is_broker = is_broker
self.__is_broker_rest = is_broker_rest
self.__is_repo = is_repo
self.__access_token = ""
......@@ -97,38 +82,6 @@ def features(self):
def features(self, value):
self.__features = value
def send_message(self, receiver_ids, req):
send_message(self.__access_token, receiver_ids, req)
def send_service_message(self, receiver_ids, service_class_name, parameters):
APP_LOGGER.info(f"Sending message {service_class_name}")
send_request(
self.__access_token,
self.thing_id,
receiver_ids,
service_class_name,
parameters,
)
"""
def on_receive(self, msg, decode=True):
# TODO: Invent a different mechanism for this. Works for now!
# APP_LOGGER.info(f"Receiving message: {message_type}")
body_str = decode_message(msg, decode)
body_json = json.loads(body_str)
print(body_json)
message_type = body_json.get("messageType", "")
if message_type == "serviceRequest":
self.on_service_request(body_json)
elif message_type == "getValueRequest":
self.on_get_value_request(body_json)
elif message_type == "getValueReply":
self.on_process_get_value_reply(body_json)
elif message_type == "serviceReply":
self.on_process_service_reply(body_json)
"""
@property
def class_name(self):
return self.__class_name
......@@ -154,14 +107,11 @@ def thing_id(self):
return self.__thing_id
def run_forever(self):
print("[S³I]: Launch {}{}{}".format(bcolors.OKGREEN, self.name, bcolors.ENDC))
print("[S³I]: Launch {}{}{}".format(BColors.OKGREEN, self.name, BColors.ENDC))
self.__connect_with_idp()
def __on_key_pressed(self, key):
pass
def __connect_with_idp(self):
print(bcolors.OKBLUE + "[S³I][IdP]" + bcolors.ENDC + ": Connect with S3I-IdentityProvider")
print(BColors.OKBLUE + "[S³I][IdP]" + BColors.ENDC + ": Connect with S3I-IdentityProvider")
idp = IdentityProvider(
grant_type=self.__grant_type,
client_id=self.__thing_id,
......@@ -185,18 +135,18 @@ def __on_token(self, token):
def __connect_with_dir(self):
print(
bcolors.OKBLUE
BColors.OKBLUE
+ "[S³I][Dir]"
+ bcolors.ENDC
+ BColors.ENDC
+ ": Connect with S3I-Directory"
)
self.dir = Directory(s3i_dir_url="https://dir.s3i.vswf.dev/api/2/", token=self.__access_token)
def __connect_with_repo(self):
print(
bcolors.OKBLUE
BColors.OKBLUE
+ "[S³I][Repo]"
+ bcolors.ENDC
+ BColors.ENDC
+ ": Connect with S3I-Repository"
)
......@@ -245,47 +195,65 @@ def sync_with_repo(self, path, topic):
self._ws.send(json.dumps(msg))
def __connect_with_broker(self):
print(bcolors.OKBLUE + "[S³I][Broker]" + bcolors.ENDC + ": Connect with S3I-Broker")
self.broker = Broker(
auth_form="Username/Password",
username=" ",
password=self.__access_token,
host=BaseVariable.BROKER_HOST,
)
endpoints = self.dir.queryThingIDBased(thingID=self.thing_id)["attributes"]["allEndpoints"]
for ep in endpoints:
if "s3ib" in ep:
self.__endpoint = ep
print(BColors.OKBLUE + "[S³I][Broker]" + BColors.ENDC + ": Connect with S3I-Broker")
self.__endpoint = find_broker_endpoint(self.dir, thing_id=self.thing_id)
if self.__is_broker_rest:
self.broker = BrokerREST(token=self.access_token)
def receive():
while True:
msg_str = self.broker.receive_once(self.__endpoint)
if msg_str == "":
continue
else:
self.__on_broker_callback(ch=None, method=None, properties=None,
body=json.loads(msg_str))
threading.Thread(
target=self.broker.receive,
args=(self.__endpoint, self.__on_broker_callback),
).start()
threading.Thread(
target=receive,
).start()
else:
self.broker = Broker(
auth_form="Username/Password",
username=" ",
password=self.__access_token,
host=BaseVariable.BROKER_HOST,
)
threading.Thread(
target=self.broker.receive,
args=(self.__endpoint, self.__on_broker_callback),
).start()
def __on_broker_callback(self, ch, method, properties, body):
body = ast.literal_eval(body.decode("utf-8"))
if isinstance(body, int):
if isinstance(body, bytes):
body = ast.literal_eval(body.decode("utf-8"))
elif isinstance(body, int):
print(body)
return
elif isinstance(body, str):
print(body)
return
message_type = body.get("messageType")
if message_type == "userMessage":
self.on_user_message(body)
elif message_type == "serviceRequest":
self.on_service_request(body)
elif message_type == "getValueRequest":
self.on_get_value_request(body)
elif message_type == "getValueReply":
self.on_get_value_reply(body)
elif message_type == "serviceReply":
self.on_service_reply(body)
else:
message_type = body.get("messageType")
if message_type == "userMessage":
self.on_user_message(body)
if message_type == "serviceRequest":
self.on_service_request(body)
elif message_type == "getValueRequest":
self.on_get_value_request(body)
elif message_type == "getValueReply":
self.on_get_value_reply(body)
elif message_type == "serviceReply":
self.on_service_reply(body)
else:
### TODO send user message reply back
pass
### TODO send user message reply back
pass
def on_user_message(self, msg):
print(msg)
pass
def on_get_value_request(self, msg):
get_value_reply = GetValueReply()
......@@ -321,30 +289,39 @@ def on_service_request(self, body_json):
service_functionality_obj = self.features.get(service_functionality, None)
if service_functionality_obj is None:
APP_LOGGER.critical(
"Functionality %s is not one of the built-in functionalities in %s!" % (service_functionality, self.name)
"Functionality %s is not one of the built-in functionalities in %s!" % (
service_functionality, self.name)
)
else:
pass
# TODO: Call right functionality.
# func_proxy = func.proxy()
func_proxy = service_functionality_obj.proxy()
method = getattr(func_proxy, service_type.split('/')[1])
method("test_job")
result = {"ok": method("test_job").get()}
service_reply = ServiceReply()
service_reply.fillServiceReply(
senderUUID=self.thing_id,
receiverUUID=body_json.get("sender", None),
serviceType=body_json.get("serviceType", None),
results=result,
replyingToUUID=body_json.get("identifier", None),
msgUUID=str(uuid.uuid4())
)
send_message(self.broker, self.dir, service_reply.msg)
def on_get_value_reply(self, msg):
pass
print(msg)
def on_service_reply(self, msg):
pass
print(msg)
def add_function_impl(self, obj, feature_name):
feature = self.__features.get(feature_name, None)
if feature is None:
APP_LOGGER.critical(
"Functionality %s is not one of the build-in functionalities" %feature_name
"Functionality %s is not one of the build-in functionalities" % feature_name
)
else:
self.__features[feature_name] = obj.start("", self.actor_ref)
......@@ -4,6 +4,7 @@
import time
import uuid
import json
import os
from functools import partial
from s3i import Directory
from s3i import Broker
......@@ -177,6 +178,22 @@ def send_request(access_token, sender_id, receiver_ids, service_type, parameters
APP_LOGGER.debug(f"Message send: {msg_str}")
def send_message(broker_obj, dir_obj, msg: dict):
if isinstance(msg, dict):
receivers = msg.get("receivers", None)
for r in receivers:
broker_obj.send([find_broker_endpoint(dir_obj, thing_id=r)], json.dumps(msg))
def find_broker_endpoint(dir_obj, thing_id):
thing_json = dir_obj.queryThingIDBased(thing_id)
all_endpoints = thing_json["attributes"].get("allEndpoints", None)
if all_endpoints:
for ep in all_endpoints:
if "s3ib" in ep:
return ep
def verify_message(message, message_type, service_type):
# TODO: Exception handling
if message["messageType"] != message_type:
......@@ -218,17 +235,6 @@ def create_request(sender_id, receiver_ids, service_type, parameters={}):
return req
def send_message(access_token, receiver_ids, req):
receiver_endpoints = [("s3ib://{}".format(i)) for i in receiver_ids]
s3i_broker = Broker(
auth_form="Username/Password",
username=" ",
password=access_token,
host="rabbitmq.s3i.vswf.dev",
)
s3i_broker.send(receiver_endpoints, req.msg.__str__())
# TODO: Pass access token instead of idp
def dir_search_with_name(idp, name):
"""Queries the s3i directory for the id corresponding to name and
......@@ -290,6 +296,35 @@ def dir_name_to_default_endpoints(idp, name):
return endpoint
def make_config_file(dt_id, name, roles, features=[]):
config_file = {
"thingId": dt_id,
"policyId": dt_id,
"attributes": {
"class": "ml40::Thing",
"name": name,
"roles": [
{
"class": roles
}
],
"features": [
]
}
}
for feature in features:
config_file["attributes"]["features"].append(
{
"class": feature
}
)
cwd = os.getcwd()
path = os.path.join(cwd, "configs", "config_{}.json".format(name))
with open(path, 'wb') as file:
file.write(json.dumps(config_file).encode('utf-8'))
return "config_{}.json".format(name)
def load_config(config_filepath):
"""Creates a json object from a json formatted file found at config_filepath.
......@@ -346,3 +381,5 @@ def get_requests(config):
receiver_uuids.append(target)
requests.append((msg_type, receiver_uuids, parameters))
return requests
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment