diff --git a/README.md b/README.md index b8e766ff2455aaa75e28a68d5e562e2747bbd802..2ec7465be2001f999fcbf516062d8fbb65918ced 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,10 @@ # Installation python -m pip install fml40-reference-implementation ``` -or from our self hosted [wheel](https://git.rwth-aachen.de/kwh40/fml40-reference-implementation/-/jobs/artifacts/master/raw/public/fml40_reference_implementation-0.2.1-py3-none-any.whl?job=wheel) +or from our self hosted [wheel](https://git.rwth-aachen.de/kwh40/fml40-reference-implementation/-/jobs/artifacts/master/raw/public/fml40_reference_implementation-0.2.2-py3-none-any.whl?job=wheel) ``` -pip install https://git.rwth-aachen.de/kwh40/fml40-reference-implementation/-/jobs/artifacts/master/raw/public/fml40_reference_implementation-0.2.1-py3-none-any.whl?job=wheel +pip install https://git.rwth-aachen.de/kwh40/fml40-reference-implementation/-/jobs/artifacts/master/raw/public/fml40_reference_implementation-0.2.2-py3-none-any.whl?job=wheel ``` Documentation diff --git a/ml/callback.py b/ml/callback.py index f3899579439bedd9645b245dca19bc8787f7e561..e19ce7631a85d8de73f971d979403234aa50cc1c 100644 --- a/ml/callback.py +++ b/ml/callback.py @@ -1,3 +1,5 @@ +import asyncio + class CallbackManager(object): """ @@ -5,6 +7,7 @@ class CallbackManager(object): """ CALLBACK = 'callback' ONE_SHOT = 'one_shot' + IS_ASYNC_CALLBACK = 'is_async_callback' ARGS = 'args' KWARGS = 'kwargs' @@ -14,15 +17,31 @@ def __init__(self): """ self._stack = {} - def add(self, prefix, callback, one_shot, *args, **kwargs): + def add(self, prefix, callback, one_shot, is_async, *args, **kwargs): + """ + Appends async callback function to dictionary + :param prefix: + :param callback: + :param one_shot: + :param is_async: + :param args: + :param kwargs: + :return: + """ # prepare the stack if prefix not in self._stack: self._stack[prefix] = [] + if not isinstance(one_shot, bool): + raise TypeError + if not isinstance(is_async, bool): + raise TypeError + # create callback dictionary callback_dict = self.create_callback_dict( callback, one_shot, + is_async, *args, **kwargs ) @@ -48,46 +67,39 @@ def clear(self): if self._stack: self._stack = {} - def process(self, prefix): + def process(self, prefix, loop=asyncio.get_event_loop()): if prefix not in self._stack: return False - for callback_dict in self._stack[prefix]: - method = callback_dict[self.CALLBACK] - args = callback_dict[self.ARGS] - kwargs = callback_dict[self.KWARGS] - method(*args, **kwargs) - for callback_dict in self._stack[prefix]: - if callback_dict[self.ONE_SHOT]: - self.remove(prefix, callback_dict) - return True - - def process_async_task(self, prefix, loop): - if prefix not in self._stack: - return False for callback_dict in self._stack[prefix]: method = callback_dict[self.CALLBACK] args = callback_dict[self.ARGS] kwargs = callback_dict[self.KWARGS] - loop.create_task(method(*args, **kwargs)) + if callback_dict[self.IS_ASYNC_CALLBACK]: + loop.create_task(method(*args, **kwargs)) + else: + method(*args, **kwargs) for callback_dict in self._stack[prefix]: if callback_dict[self.ONE_SHOT]: self.remove(prefix, callback_dict) + return True - def create_callback_dict(self, callback, one_shot, *args, **kwargs): + def create_callback_dict(self, callback, one_shot, is_async, *args, **kwargs): """ Create and return callback dictionary :param method callback: :param bool one_shot: + :param is_async: :return: """ return { self.CALLBACK: callback, self.ONE_SHOT: one_shot, + self.IS_ASYNC_CALLBACK: is_async, self.ARGS: args, self.KWARGS: kwargs } diff --git a/ml/s3i_tools.py b/ml/s3i_tools.py index 67f67da1142ad59f47339ff4fd6655171b855080..1710b2f0b2cbd74fe5fbbf73fe89757b75cde208 100644 --- a/ml/s3i_tools.py +++ b/ml/s3i_tools.py @@ -110,7 +110,7 @@ def on_connection_open(self, _unused_connection): self.__channel = _unused_connection.channel( on_open_callback=self.on_channel_open ) - self.__callbacks.process_async_task( + self.__callbacks.process( self._ON_CONNECTION_OPEN, self.__loop ) @@ -131,7 +131,7 @@ def on_channel_open(self, _unused_channel): prefetch_count=1 ) self.start_consuming() - self.__callbacks.process_async_task( + self.__callbacks.process( self._ON_CHANNEL_OPEN, self.__loop ) diff --git a/ml/thing.py b/ml/thing.py index 1942f6e96688e0e251ed2a35af8f96aebdbf6658..622890565a66e64e07e3f6d50f3e448472b7db66 100644 --- a/ml/thing.py +++ b/ml/thing.py @@ -1,7 +1,4 @@ import asyncio -import threading -import time - from s3i.identity_provider import IdentityProvider, TokenType from s3i.directory import Directory from s3i.repository import Repository @@ -76,43 +73,47 @@ def run_forever(self): APP_LOGGER.info("[S3I]: Disconnect from S3I") self.loop.close() - def add_on_thing_start_ok_callback(self, callback_func, one_shot, *args, **kwargs): + def add_on_thing_start_ok_callback(self, callback_func, one_shot, is_async, *args, **kwargs): self.callbacks.add( self._ON_THING_START_OK, callback_func, one_shot, + is_async, *args, **kwargs ) - def add_on_idp_start_ok_callback(self, callback_func, one_shot, *args, **kwargs): + def add_on_idp_start_ok_callback(self, callback_func, one_shot, is_async, *args, **kwargs): self.callbacks.add( self._ON_IDP_START_OK, callback_func, one_shot, + is_async, *args, **kwargs ) - def add_on_directory_start_ok_callback(self, callback_func, one_shot, *args, **kwargs): + def add_on_directory_start_ok_callback(self, callback_func, one_shot, is_async, *args, **kwargs): self.callbacks.add( self._ON_DIRECTORY_START_OK, callback_func, one_shot, + is_async, *args, **kwargs ) - def add_on_repository_start_ok_callback(self, callback_func, one_shot, *args, **kwargs): + def add_on_repository_start_ok_callback(self, callback_func, one_shot, is_async, *args, **kwargs): self.callbacks.add( self._ON_REPOSITORY_START_OK, callback_func, one_shot, + is_async, *args, **kwargs ) - def add_on_broker_start_ok_callback(self, callback_func, one_shot, *args, **kwargs): + def add_on_broker_start_ok_callback(self, callback_func, one_shot, is_async, *args, **kwargs): def _add_on_channel_open_callback(_thing, _callback, _one_shot, *_args, **_kwargs): _thing.broker.add_on_channel_open_callback( callback_func, @@ -124,6 +125,7 @@ def _add_on_channel_open_callback(_thing, _callback, _one_shot, *_args, **_kwarg self._ON_BROKER_START_OK, _add_on_channel_open_callback, one_shot, + is_async, # the following parameters are *args self, callback_func, one_shot, @@ -132,7 +134,7 @@ def _add_on_channel_open_callback(_thing, _callback, _one_shot, *_args, **_kwarg ) def add_ml40_implementation(self, implementation_object, ml40_feature, *args, **kwargs): - async def _add_ml40_implementation(_thing, _implementation_object, _ml40_feature, *_args, **_kwargs): + def _add_ml40_implementation(_thing, _implementation_object, _ml40_feature, *_args, **_kwargs): feature = _thing.entry.features.get(_ml40_feature, None) if feature is None: APP_LOGGER.critical( @@ -147,6 +149,7 @@ async def _add_ml40_implementation(_thing, _implementation_object, _ml40_feature self.add_on_thing_start_ok_callback( _add_ml40_implementation, True, + False, self, implementation_object, ml40_feature, @@ -183,7 +186,7 @@ def __setup_identity_provider(self): APP_LOGGER.info("[S3I]: Access Token granted") self.loop.call_later(self.__get_remaining_time_to_refresh(), self.__refresh_token_recur) - self.callbacks.process_async_task( + self.callbacks.process( self._ON_IDP_START_OK, self.loop ) @@ -217,7 +220,7 @@ def __get_remaining_time_to_refresh(self): def __setup_thing_json_sync(self): APP_LOGGER.info("[S3I]: Start the thing") self.__recursively_update_dt_json(frequency=self.parameters.thing_sync_freq) - self.callbacks.process_async_task( + self.callbacks.process( prefix=self._ON_THING_START_OK, loop=self.loop ) @@ -229,7 +232,7 @@ def __setup_directory(self): token=self.__token ) self.__recursively_update_directory(frequency=self.parameters.dir_sync_freq) - self.callbacks.process_async_task( + self.callbacks.process( self._ON_DIRECTORY_START_OK, self.loop ) @@ -252,6 +255,7 @@ def __setup__broker(self): self.broker.connect() self.callbacks.process( self._ON_BROKER_START_OK, + self.loop ) def __setup_repository(self): @@ -261,7 +265,7 @@ def __setup_repository(self): token=self.__token ) self.__recursively_update_repository(frequency=self.parameters.repo_sync_freq) - self.callbacks.process_async_task( + self.callbacks.process( self._ON_REPOSITORY_START_OK, self.loop ) diff --git a/setup.py b/setup.py index 1ad1aeec153c157164973dfe6969a0ae8aacc660..61be06f55e9f4e0b94ec4a09e16006a1f9b96f90 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="fml40-reference-implementation", - version="0.2.1", + version="0.2.2", author="Kompetenzzentrum Wald und Holz 4.0", author_email="s3i@kwh40.de", description="fml40 reference implementation basic functions",