Commit dd70e6e1 authored by Jiahang Chen's avatar Jiahang Chen
Browse files

first integration of s3i event system

parent 1218bc59
Pipeline #507247 passed with stages
in 1 minute and 58 seconds
......@@ -13,6 +13,7 @@
import copy
from s3i import IdentityProvider, TokenType, Directory
from s3i import Repository
from s3i import EventSubscriptionReply, EventUnsubscriptionReply, EventManager, EventFilterType
import s3i.exception
from s3i.broker import Broker, BrokerREST
from s3i.messages import ServiceReply, GetValueReply, SetValueReply
......@@ -48,7 +49,6 @@ def __init__(
is_stanford2010_sync=False,
stanford2010_sync_freq=None,
is_database=None,
database_conf=None,
database_file=None,
username=None,
password=None,
......@@ -95,7 +95,6 @@ def __init__(
self.__stanford2010_sync_freq = stanford2010_sync_freq
self.__is_db = is_database
self.__db_conf = database_conf
self.__db_file = database_file
self.__db = None
......@@ -126,6 +125,8 @@ def __init__(
if attributes:
self.__name = attributes.get("name", "")
self.__stanford2010 = None
self.__event_manager = None
self.user_func_list = list()
@property
def model(self):
......@@ -282,15 +283,23 @@ def run_forever(self):
if self.__is_db:
self.__config_database()
@staticmethod
def add_user_def(func):
self.__event_manager = EventManager(
event_filter_type=EventFilterType.RQL_FILTER,
ml40_model=self.model
)
threading.Thread(target=self.__event_manager.emit_event,
args=(self.broker, self.thing_id)).start()
for func in self.user_func_list:
threading.Thread(target=func).start()
def add_user_def(self, func):
"""Insert user-specified function in the thing object.
:param func: external defined function to be executed.
"""
threading.Thread(target=func).start()
self.user_func_list.append(func)
def __json_syn(self, freq=0.1):
"""
......@@ -341,13 +350,14 @@ def __stanford2010_syn(self, path, is_period=False, freq=10):
"""
This function finds the .hpr data according to the entered path and read out the content
"""
def get_hpr_files(path):
# get all .hpr files under the specified folder
files = [f for f in listdir(path) if isfile(join(path, f))]
hpr_files = []
for file in files:
filename, file_extension = splitext(file)
if file_extension == ".hpr": # in the folder, there is always one hpr data
if file_extension == ".hpr": # in the folder, there is always one hpr data
hpr_files.append(file)
return hpr_files
......@@ -402,8 +412,7 @@ def get_last_hpr(hpr_files, is_remove=False):
self.__stanford2010 = hpr_temp
def __config_database(self):
self.__db = DataBase(db=self.__db_file,
conf=self.__db_conf)
self.__db = DataBase(db=self.__db_file)
self.__db.connect()
def __connect_with_idp(self):
......@@ -543,7 +552,6 @@ def __on_broker_callback(self, ch, method, properties, body):
elif isinstance(body, str):
pass
if ch is not None:
ch.basic_ack(method.delivery_tag)
try:
......@@ -562,6 +570,16 @@ def __on_broker_callback(self, ch, method, properties, body):
self.on_set_value_request(body)
elif message_type == "setValueReply":
self.on_set_value_reply(body)
elif message_type == "eventSubscriptionRequest":
self.on_event_subscription_request(body)
elif message_type == "eventSubscriptionReply":
self.on_event_subscription_reply(body)
elif message_type == "eventUnsubscriptionRequest":
self.on_event_unsubscription_request(body)
elif message_type == "eventUnsubscriptionReply":
self.on_event_unsubscription_reply(body)
elif message_type == "eventMessage":
self.on_event_message(body)
else:
### TODO send user message reply back
pass
......@@ -956,11 +974,83 @@ def _getInstance(self, source_obj, uri_list):
uri_list.pop(0)
for key in source_obj.subFeatures.keys():
subfeature_dict = source_obj.subFeatures[key].to_json()
if subfeature_dict.get("name", "") == uri_list[0] or subfeature_dict.get("identifier", "")==uri_list[0] \
or subfeature_dict.get("class", "") == uri_list[0]:
if subfeature_dict.get("name", "") == uri_list[0] or subfeature_dict.get("identifier", "") == uri_list[
0] \
or subfeature_dict.get("class", "") == uri_list[0]:
uri_list.pop(0)
return self._getInstance(source_obj.subFeatures[key], uri_list)
def on_event_subscription_request(self, msg):
__log = "[S3I][Broker]: You have received a S3I-B EventSubscriptionRequest"
APP_LOGGER.info(__log)
rql_expression = msg.get("RQL")
subscription_status, sub_id = self.__event_manager.add_event(
filter_expression=rql_expression, subscriber=msg.get("sender"),
subscriber_endpoint=msg.get("replyToEndpoint")
)
__log = "[S3I][Broker]: Validation of RQL syntax: {}".format(subscription_status)
APP_LOGGER.info((__log))
event_sub_reply = EventSubscriptionReply()
event_sub_reply.fillEventSubscriptionReply(
sender=self.thing_id,
receivers=[msg.get("sender")],
subscription_id=sub_id,
replying_to_message=msg.get("identifier"),
msg_id = "s3i:" + str(uuid.uuid4()),
status=subscription_status
)
res = self.__send_message_to_broker(
receiver_endpoints=[msg.get("replyToEndpoint")],
msg=event_sub_reply.msg
)
if self.__is_broker_rest:
if res.status_code == 201:
__log = "[S3I][Broker]: Send S3I-B EventSubscriptionReply back to the requester"
APP_LOGGER.info(__log)
else:
__log = "[S3I[Broker]: " + res.text
APP_LOGGER.info(__log)
def on_event_unsubscription_request(self, msg):
__log = "[S3I][Broker]: You have received a S3I-B EventUnsubscriptionRequest"
APP_LOGGER.info(__log)
sub_id = msg.get("subscription-id")
unsubscription_status = self.__event_manager.delete_event(sub_id)
__log = "[S3I][Broker]: Status of unsubscription event: {}".format(unsubscription_status)
APP_LOGGER.info(__log)
event_ubsub_reply = EventUnsubscriptionReply()
event_ubsub_reply.fillEventUnsubscriptionReply(
sender=self.thing_id,
receivers=[msg.get("sender")],
msg_id = "s3i:" + str(uuid.uuid4()),
replying_to_message=msg.get("identifier"),
sub_id=sub_id,
status=unsubscription_status
)
res = self.__send_message_to_broker(
receiver_endpoints=[msg.get("replyToEndpoint")],
msg=event_ubsub_reply.msg
)
if self.__is_broker_rest:
if res.status_code == 201:
__log = "[S3I][Broker]: Send S3I-B EventSubscriptionReply back to the requester"
APP_LOGGER.info(__log)
else:
__log = "[S3I[Broker]: " + res.text
APP_LOGGER.info(__log)
def on_event_message(self, msg):
__log = "[S3I][Broker]: You have received a S3I-B EventMessage"
APP_LOGGER.info(__log)
content = msg.get("content")
sub_id = msg.get("subscription-id")
time = msg.get("time")
__log = "[S3I][Broker]: The event (sub-id: {0} has been trigged at the time {1}, content: {2})".format(
sub_id, time, content)
APP_LOGGER.info(__log)
def on_get_value_reply(self, msg):
"""Handles incoming S³I-B GetValueReply. Prints the content of msg to stdout.
......@@ -1005,6 +1095,33 @@ def on_set_value_reply(self, msg):
__log = "[S3I][Broker]: The status of value setting: {0}".format(result)
APP_LOGGER.info(__log)
def on_event_subscription_reply(self, msg):
"""
Handles incoming S3I-B EventSubscriptionReply. Prints the content of msg to stdout
:param msg: EventSubscriptionReply
:type msg: dict
"""
__log = "[S3I][Broker]: You have received a S3I-B EventSubscriptionReply"
APP_LOGGER.info(__log)
result = msg.get("ok", None)
sub_id = msg.get("subscription-id")
__log = "[S3I][Broker]: The status of event subscription (subcription-id: {0}) is {1}".format(sub_id, result)
APP_LOGGER.info(__log)
def on_event_unsubscription_reply(self, msg):
"""
Handles incoming S3I-B EventUnsubscriptionReply. Prints the content of msg to stdout
:param msg: EventUnsubscriptionReply
:type msg: dict
"""
__log = "[S3I][Broker]: You habe received a S3I-B EventUnsubscriptionReply"
APP_LOGGER.info(__log)
result = msg.get("ok", None)
sub_id = msg.get("subscription-id")
__log = "[S3I][Broker]: The status of event unsubscription (subscription-id: {0}): {1}".format(sub_id, result)
APP_LOGGER.info(__log)
def to_dir_json(self):
"""Returns a dictionary representing this thing's directory entry.
......@@ -1095,6 +1212,7 @@ def to_json(self):
self.dt_json["attributes"]["features"].append(self.features[key].to_json())
for key in self.ditto_features.keys():
self.dt_json["features"][key] = self.ditto_features[key].to_json()
self.__event_manager.subject_updated(cur_ml40=self.dt_json)
return self.dt_json
def to_subthing_json(self):
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment