Commit 39e2beee authored by Jiahang Chen's avatar Jiahang Chen
Browse files

add asyncio event loop to thing

parent ad98ab49
from ml.dt_factory import create_thing, add_function_impl_obj, build, build_feature, build_role, build_sub_features, build_sub_thing
from ml.tools import make_thing_config, make_sub_thing, load_config, make_feature_config, find_broker_endpoint
from ml.app_logger import setup_logger, APP_LOGGER
from ml.entry import Entry
"""This module provides functionalities for setting up a command line
parser and some functions to extract data from parsed command line arguments.
"""
import argparse
import json
from s3i import IdentityProvider, TokenType
GRANT_TYPES = ("client_credentials", "password")
def configure_client_parser(parser):
"""Adds subparsers to parser allowing the specification of a
client via file or terminal input.
:param parser: Parser to be modified
:returns: Modified parser
:rtype: argparse.ArgumentParser
"""
# Setup a parser for file input
subparsers = parser.add_subparsers(required=True, dest="client")
client_file_parser = subparsers.add_parser(
"cfi", help="Specify client and seceret via file input."
)
client_file_parser.add_argument("cfp", help="Filepath")
# Setup a parser for user input
client_ui_parser = subparsers.add_parser(
"cui", help="Specify client and secret via terminal."
)
client_ui_parser.add_argument(
"-c", default="admin-cli", help="Client name: admin-cli is default"
)
client_ui_parser.add_argument(
"-cs", default="", help="Client secret: empty string is default"
)
return client_file_parser, client_ui_parser
def configure_user_parser(parser):
"""Adds subparsers to parser allowing the specification of a
username and a password via file or terminal input.
:param parser: Parser to be modified
:returns: Modified parser
:rtype: argparse.ArgumentParser
"""
# Setup a parser for file input
subparsers = parser.add_subparsers(required=True, dest="user")
user_file_parser = subparsers.add_parser(
"ufi", help="Specifiy username and password via file input."
)
user_file_parser.add_argument("ufp", help="Filepath to credentials.")
# Setup a parser for user input
user_ui_parser = subparsers.add_parser(
"uui", help="Specify username and password via terminal."
)
user_ui_parser.add_argument("-u", required=True, help="Username")
user_ui_parser.add_argument("-up", required=True, help="User password")
return user_file_parser, user_ui_parser
def configure_credentials_parser(parser):
"""Adds arguments to parser allowing to specify the grant type
used to retrieve a java web token. Afterwards
configure_client_parser and configure_user_parser are called.
:param parser: Parser to be modified
:returns: Modified parser
:rtype: argparse.ArgumentParser
"""
parser.add_argument(
"--g",
choices=GRANT_TYPES,
default=GRANT_TYPES[0],
help=f"Specify grant type ({GRANT_TYPES[0]} is default)",
)
parser.add_argument(
"--scope", default="email", help="Specifiy the scope (email is default)"
)
cfi_parser, cui_parser = configure_client_parser(parser)
__cfi_ufi_parser, __cfi_uui_parser = configure_user_parser(cfi_parser)
__cui_ufi_parser, __cui_uui_parser = configure_user_parser(cui_parser)
return parser
def get_token_from_args(args):
"""Extracts the credentials from args and returns a java web
token.
:param args: argparse.Namespace object
:returns: A java webo token or an error message.
:rtype: str
"""
grant_type = args.g
scope = args.scope
client = None
secret = None
username = None
password = None
if args.client == "cfi":
filepath = args.cfp
client, secret = parse_client_id_and_secret(filepath)
else:
client = args.c
secret = args.cs
if args.user == "ufi":
filepath = args.ufp
username, password = parse_username_and_password(filepath)
else:
username = args.u
password = args.up
token = get_access_token(grant_type,
client,
secret,
username,
password,
scope)
return str(token)
def parse_username_and_password(filepath):
"""Reads the file specified by filepath and returns username and
password.
:param filepath: Path to a json file
:returns: Username and password
:rtype: (str, str)
"""
username = None
password = None
if filepath is not None:
with open(filepath, "r") as json_f:
js_object = json.load(json_f)
username = js_object["name"]
password = js_object["password"]
return username, password
def parse_client_id_and_secret(filepath):
"""Reads the file specified by filepath and returns the client id
and the client secret.
:param filepath: Path to a json file
:returns: Client id and client secret
:rtype: (str, str)
"""
client_id = None
client_secret = None
if filepath is not None:
with open(filepath, "r") as json_f:
js_object = json.load(json_f)
client_id = js_object.get("thingId", None)
if not client_id:
client_id = js_object.get("identifier")
client_secret = js_object.get("client_secret")
if not client_secret:
client_secret = js_object.get("secret")
return client_id, client_secret
def get_access_token(grant_type, client_id, client_secret, username, password, scope):
"""Returns a java web token.
:param grant_type: method used to retrieve the token
:param client_id: s3i specific client identifier
:param client_secret: secret of the client
:param username: username
:param password: password
:param scope:
:returns: java web token
:rtype: str
"""
idp = IdentityProvider(
grant_type=grant_type,
client_id=client_id,
username=username,
password=password,
client_secret=client_secret,
realm="KWH",
identity_provider_url="https://idp.s3i.vswf.dev/",
)
return idp.get_token(TokenType.ACCESS_TOKEN, scope=scope)
......@@ -3,6 +3,9 @@
from ml.app_logger import APP_LOGGER
from ml.tools import remove_namespace, check_var_conflict
from ml.ditto_feature import ditto_feature
import sys
import inspect
from ml.entry import Entry
from ml.thing import Thing
from ml.ml40.roles.servives.service import Service
from ml.ml40.roles.hmis.app import App
......@@ -186,8 +189,6 @@
# TODO: automatically get all classes in module
DT_FACTORY = {}
import sys, inspect
clsmembers = inspect.getmembers(sys.modules[__name__], inspect.isclass)
for member in clsmembers:
DT_FACTORY[member[0]] = member[1]
......@@ -239,7 +240,7 @@ def build_sub_thing(feature_ins, feature):
"""
json_sub_things = feature.get("targets", [])
for json_sub_thing in json_sub_things:
sub_thing_ref = create_thing(model={"attributes": json_sub_thing})
sub_thing_ref = create_thing(model_json={"attributes": json_sub_thing})
sub_thing_name = json_sub_thing.get("name", None)
feature_ins.targets[sub_thing_name] = sub_thing_ref
......@@ -370,7 +371,38 @@ def add_function_impl_obj(thing, impl_obj, feature_name, **kwargs):
thing.features[feature_name] = impl_ins
def create_thing(model, grant_type="password",
def create_thing(
model_json,
oauth2_secret="",
grant_type="client_credentials",
username=None,
password=None,
is_repository=False,
is_broker=False,
is_broker_rest=False,
):
attributes = model_json.get("attributes")
if attributes is None:
APP_LOGGER.error("Attributes are empty")
identifier = model_json.get("thingId", "")
name = attributes.get("name", "")
APP_LOGGER.info("Build digital twin {} with id {}".format(name, identifier))
entry_ref = Entry(identifier, name)
build(entry_ref, model_json)
return Thing(
entry=entry_ref,
oauth2_secret=oauth2_secret,
grant_type=grant_type,
username=username,
password=password,
is_repository=is_repository,
is_broker=is_broker,
is_broker_rest=is_broker_rest
)
def _create_thing(model, grant_type="password",
secret="", username=None, password=None,
is_broker_rest=False, is_broker=False, is_repo=False, is_stanford2010=False,
is_stanford2010_sync=False,
......
class Entry:
def __init__(self, identifier, name):
self.__name = name
self.__identifier = identifier
self.__features = {}
self.__roles = {}
self.__ditto_features = {}
self.__repo_json = {}
self.__dir_json = {
"thingId": "",
"policyId": "",
"attributes": {
}
}
@property
def features(self):
return self.__features
@features.setter
def features(self, value):
self.__features = value
@property
def ditto_features(self):
return self.__ditto_features
@ditto_features.setter
def ditto_features(self, value):
self.__ditto_features = value
@property
def roles(self):
return self.__roles
@roles.setter
def roles(self, value):
self.__roles = value
@property
def name(self):
return self.__name
@property
def identifier(self):
return self.__identifier
@property
def repo_json(self):
return self.__repo_json
@property
def dir_json(self):
return self.__dir_json
def refresh_directory_entry(self):
if self.__identifier is not None:
self.__dir_json["thingId"] = self.__identifier
self.__dir_json["policyId"] = self.__identifier
if self.__name is not None:
self.__dir_json["attributes"]["name"] = self.__name
self.__dir_json["attributes"]["dataModel"] = "fml40"
self.__dir_json["attributes"]["thingStructure"] = {
"class": "ml40::Thing",
"links": []
}
for key in self.__roles.keys():
role_entry = {
"association": "roles",
"target": self.__roles[key].to_json()
}
self.__dir_json["attributes"]["thingStructure"]["links"].append(role_entry)
for key in self.features.keys():
feature_target = {
"class": self.__features[key].to_json()["class"],
}
if self.__features[key].to_json().get("identifier") is not None:
feature_target["identifier"] = self.__features[key].to_json()["identifier"]
feature_entry = {"association": "features", "target": feature_target}
# if the feature has targets, like ml40::Composite
if hasattr(self.__features[key], "targets"):
feature_entry["target"]["links"] = list()
for target in self.__features[key].targets.keys():
target_json = (
self.__features[key].targets[target].entry.refresh_sub_thing_dir_entry()
)
feature_entry["target"]["links"].append(target_json)
self.__dir_json["attributes"]["thingStructure"]["links"].append(feature_entry)
return self.__dir_json
def refresh_repository_entry(self):
self.__repo_json = {
"thingId": self.__identifier,
"policyId": self.__identifier,
"attributes": {
"class": "ml40::Thing",
"name": self.__name,
},
}
if self.roles:
self.__repo_json["attributes"]["roles"] = []
if self.features:
self.__repo_json["attributes"]["features"] = []
if self.ditto_features:
self.__repo_json["features"] = {}
for key in self.__roles.keys():
self.__repo_json["attributes"]["roles"].append(self.__roles[key].to_json())
for key in self.__features.keys():
self.__repo_json["attributes"]["features"].append(self.__features[key].to_json())
for key in self.__ditto_features.keys():
self.__repo_json["features"][key] = self.__ditto_features[key].to_json()
return self.__repo_json
def refresh_sub_thing_repo_entry(self):
"""Returns a dictionary representing this thing in it's current state
as a subordinate thing. This representation should be used for
subordinate things in s3i repository entries.
:returns: Representation of this object as a subordinate thing
:rtype: dict
"""
json_out = {
"class": "ml40::Thing",
"name": self.name,
"roles": [],
"features": [],
}
if self.__identifier:
json_out["identifier"] = self.__identifier
for key in self.__roles.keys():
json_out["roles"].append(self.__roles[key].to_json())
for key in self.__features.keys():
json_out["features"].append(self.__features[key].to_json())
return json_out
def refresh_sub_thing_dir_entry(self):
"""Returns a dictionary representing this thing in it's current state
as a subordinate thing. This representation should be used for
subordinate things in s3i directory entries.
:returns: Representation of this object as a subordinate thing.
:rtype: dict
"""
json_out = {"class": "ml40::Thing", "links": []}
if self.__identifier:
json_out["identifier"] = self.__identifier
for key in self.__roles.keys():
role_entry = {"association": "roles", "target": self.__roles[key].to_json()}
json_out["links"].append(role_entry)
for key in self.__features.keys():
feature_target = {
"class": self.__features[key].to_json()["class"],
}
if self.__features[key].to_json().get("identifier") is not None:
feature_target["identifier"] = self.__features[key].to_json()["identifier"]
feature_entry = {"association": "features", "target": feature_target}
json_out["links"].append(feature_entry)
return json_out
......@@ -22,6 +22,6 @@ def to_json(self):
if self.targets:
self.__json_out["targets"] = []
for key in self.targets.keys():
self.__json_out["targets"].append(self.targets[key].to_subthing_json())
self.__json_out["targets"].append(self.targets[key].entry.refresh_sub_thing_repo_entry())
return self.__json_out
......@@ -23,5 +23,5 @@ def to_json(self):
if self.targets:
self.__json_out["targets"] = []
for key in self.targets.keys():
self.__json_out["targets"].append(self.targets[key].to_subthing_json())
self.__json_out["targets"].append(self.targets[key].entry.refresh_sub_thing_repo_entry())
return self.__json_out
This diff is collapsed.
import pika
import requests
from s3i.exception import raise_error_from_response, S3IBrokerRESTError
from pika.adapters.asyncio_connection import AsyncioConnection
from pika.connection import ConnectionParameters
from pika import PlainCredentials
from pika.exceptions import UnroutableError
from ml.app_logger import APP_LOGGER
MSG_EXCHANGE = "demo.direct"
EVENT_EXCHANGE = "eventExchange"
BROKER_HOST = "rabbitmq.s3i.vswf.dev"
BROKER_VIRTUAL_HOST = "s3i"
BROKER_API_URL = "https://broker.s3i.vswf.dev/"
class BrokerREST:
def __init__(self, token, url=BROKER_API_URL):
self.__token = token
self.__url = url
self.__headers = {
'Content-Type': 'application/json',
"Authorization": 'Bearer {}'.format(self.__token)
}
@property
def token(self):
return self.__token
@token.setter
def token(self, value):
self.__token = value
def send(self, endpoint, msg):
response = requests.post(url=self.__url + endpoint, headers=self.__headers, data=msg)
raise_error_from_response(response, S3IBrokerRESTError, 201)
def receive_once(self, endpoint):
response = requests.get(url=self.__url + endpoint,
headers=self.__headers)
return raise_error_from_response(response, S3IBrokerRESTError, 200)
class Broker:
def __init__(self, token, endpoint, callback, loop):
self.__token = token
self.__endpoint = endpoint #TODO Event Queue, more queue?
self.__loop = loop
self.__callback = callback
self.__credentials = PlainCredentials(
username=" ",
password=token
)
self.__connection_parameters = ConnectionParameters(
host=BROKER_HOST,
virtual_host=BROKER_VIRTUAL_HOST,
credentials=self.__credentials,
heartbeat=0
)
self.__connection = None
self.__channel = None
@property
def token(self):
return self.__token
@token.setter
def token(self, value):
self.__token = value
@property
def connection(self):
return self.__connection
@property
def channel(self):
return self.__channel
def connect(self):
self.__connection = AsyncioConnection(
parameters=self.__connection_parameters,
on_open_callback=self.on_connection_open,
on_open_error_callback=self.on_connection_open_error,
on_close_callback=self.on_connection_closed,
custom_ioloop=self.__loop
)
def on_connection_open(self, _unused_connection):
self.__channel = _unused_connection.channel(
on_open_callback=self.on_channel_open
)
@staticmethod
def on_connection_open_error(_unused_connection, err):
APP_LOGGER.error("[S3I]: Connection to broker failed: {}".format(err))
@staticmethod
def on_connection_closed(_unused_connection, reason):
APP_LOGGER.info("[S3I]: Connection closed: {}".format(reason))
def on_channel_open(self, _unused_channel):
APP_LOGGER.info("[S3I]: Channel {} open".format(_unused_channel))
_unused_channel.add_on_close_callback(self.on_channel_closed)
_unused_channel.basic_qos(
prefetch_count=1
)
_unused_channel.basic_consume(
auto_ack=True,
queue=self.__endpoint,
on_message_callback=self.__callback
)
@staticmethod
def on_channel_closed(channel, reason):
APP_LOGGER.info("[S3I]: Channel {} is closed: {}".format(channel, reason))
def send(self, endpoint, msg):
if self.__channel.is_open:
try:
self.__channel.basic_publish(
exchange=MSG_EXCHANGE,
properties=pika.BasicProperties(
content_type="application/json",
delivery_mode=2
),
body=msg,
routing_key=endpoint
)
except UnroutableError as err:
APP_LOGGER.error("[S3I]: Sending message failed: {}".format(err))
def publish_event(self, msg, topic):
if self.__channel.is_open:
self.__channel.basic_publish(
exchange=MSG_EXCHANGE,
properties=pika.BasicProperties(
content_type="application/json",
),
body=msg,
routing_key=topic
)
This diff is collapsed.
......@@ -17,7 +17,7 @@
"requests",
"jsonschema",
#"s3i==0.4.1",
"s3i@https://git.rwth-aachen.de/kwh40/s3i/-/jobs/artifacts/master/raw/public/s3i-0.4.1-py3-none-any.whl?job=wheel"
"s3i@https://git.rwth-aachen.de/kwh40/s3i/-/jobs/artifacts/master/raw/public/s3i-0.5.2-py3-none-any.whl?job=wheel"
],
classifiers=[
"Programming Language :: Python :: 3",
......
from ml.dt_factory import create_thing
from ml.app_logger import setup_logger
setup_logger("test")
model_json = {
"thingId": "s3i:b6d1cc6d-896c-40fe-9403-b5b7682b1d03",
"policyId": "s3i:b6d1cc6d-896c-40fe-9403-b5b7682b1d03",