Skip to content
Snippets Groups Projects
Commit 71609a1c authored by Matthias Stefan Bodenbenner's avatar Matthias Stefan Bodenbenner
Browse files

changed initalization procedure, made it much more easy

parent c3cb486c
Branches
Tags 8.0.0
No related merge requests found
[![Build](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/badges/master/pipeline.svg)](https://git-ce.rwth-aachen.de/wzl-mq-ms/forschung-lehre/lava/unified-device-interface/python/commits/master)
# Python Unified Device Interface
Current stable version: 7.1.0
Current stable version: 8.0.0
Stable legacy version: 5.2.7 **SHOULD NOT BE USED ANYMORE!**
## Installation
......@@ -65,6 +65,15 @@ Funded by the Deutsche Forschungsgemeinschaft (DFG, German Research Foundation)
## Recent changes
**8.0.0** - 2023-03-11
- changed initialization routine
- the mapping via a dictionary of encapsulated sensor logic to HTTP-Endpoints is not needed anymore
- the mapping is now derived automatically because the names of the attributes from the sensor implementation are assumed to be generated from a SOIL-Model
- code clean-up
- removed a lot of deprecated source code
- bug fixes
- fixed in error of fixed jobs
**7.1.0** - 2023-02-27
- added legacy flag as server parameter (default: false)
......
from setuptools import setup, find_packages
setup(name='wzl-udi',
version='7.1.0',
version='8.0.0',
url='https://git-ce.rwth-aachen.de/wzl-mq-public/soil/python',
author='Matthias Bodenbenner',
author_email='m.bodenbenner@wzl.mq.rwth-aachen.de',
......
# -*- coding: utf-8 -*-
import asyncio
import traceback
import functools
from typing import Dict, Any
import traceback
from typing import Dict
from aiohttp import web
from aiohttp.web import middleware
......@@ -11,17 +10,17 @@ from aiohttp.web_request import Request
from wzl.utilities import root_logger
from .error import ServerException
from ..soil.component import Component
from ..soil.element import Element
from ..soil.error import InvokationException, ReadOnlyException, ChildNotFoundException
from ..soil.measurement import Measurement
from ..utils.error import DeviceException, UserException
from ..soil.function import Function
from ..soil.figure import Figure
from ..soil.object import Object
from ..soil.component import Component
from ..soil.function import Function
from ..soil.measurement import Measurement
from ..soil.parameter import Parameter
from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET, HTTP_OPTIONS
from ..soil.stream import StreamScheduler
from ..utils import serialize
from ..utils.constants import BASE_UUID_PATTERN, HTTP_GET, HTTP_OPTIONS
from ..utils.error import DeviceException, UserException
logger = root_logger.get(__name__)
......@@ -57,7 +56,7 @@ class HTTPServer(object):
"""
def __init__(self, loop: asyncio.ProactorEventLoop, host: str, port: int, model: Component,
dataformat: str = 'json', legacy_mode=False):
dataformat: str = 'json', legacy_mode=False, scheduler: StreamScheduler = None):
"""Constructor
Args:
......@@ -67,6 +66,7 @@ class HTTPServer(object):
model: The root component of the SOIL model, should be initialized via Component.load(...)
dataformat: String specifying the dataformat of the responses of the server, either 'json' (default) or 'xml'.
legacy_mode: If true, the datatypes are serialized to "bool" and "float" (instead of "boolean" and "float").
scheduler: Stream handler. Is required to hand over jobs of streams and events for dynamic components, if created at runtime.
"""
if dataformat not in ['json', 'xml']:
raise ValueError('Dataformat must be one of "json" or "xml".')
......@@ -77,6 +77,7 @@ class HTTPServer(object):
self.root = model
self._dataformat = dataformat
self._legacy_mode = legacy_mode
self._scheduler = scheduler
self.app = web.Application(loop=self.loop, middlewares=[cors])
......@@ -204,13 +205,17 @@ class HTTPServer(object):
logger.error('Response: {}'.format(response))
return self.prepare_response(response, None, status=404)
if not isinstance(item, Object) and not isinstance(item, Component):
if not isinstance(item, Component):
return self.prepare_response({}, None, status=405)
try:
response = await self.loop.run_in_executor(None, functools.partial(item.remove, uuids[-1], *data['args'],
component_name = await self.loop.run_in_executor(None,
functools.partial(item.remove, uuids[-1], *data['args'],
**data['kwargs']))
if self._scheduler is not None:
self._scheduler.remove_jobs('/'.join(uuids))
status = 200
logger.info('Response: {}'.format(response))
# logger.info('Response: {}'.format(response))
except ChildNotFoundException as e:
logger.error(traceback.format_exc())
response = {'error': str(e)}
......@@ -296,15 +301,18 @@ class HTTPServer(object):
logger.error('Response: {}'.format(response))
return self.prepare_response(response, None, status=404)
if not isinstance(item, Object) and not isinstance(item, Component):
if not isinstance(item, Component):
return self.prepare_response({}, None, status=405)
try:
response = await self.loop.run_in_executor(None,
implementation = await self.loop.run_in_executor(None,
functools.partial(item.add, uuids[-1], data['class_name'],
data['json_file'], *data['args'],
**data['kwargs']))
if self._scheduler is not None:
self._scheduler.add_jobs(implementation.streams('/'.join(uuids)))
status = 200
logger.info('Response: {}'.format(response))
response = {}
# logger.info('Response: {}'.format(response))
except (DeviceException, ServerException, UserException) as e:
logger.error(traceback.format_exc())
response = {'error': str(e)}
......
......@@ -11,9 +11,9 @@ import json
import os
import sys
from typing import List, Any, Union, Dict
from wzl.utilities import root_logger
from . import docstring_parser
from .element import Element
from .error import ChildNotFoundException
from .function import Function
......@@ -27,8 +27,9 @@ logger = root_logger.get(__name__)
class Component(Element):
def __init__(self, uuid: str, name: str, description: str, functions: List[Function], measurements: List[Measurement],
parameters: List[Parameter], components: List['Component'], mapping: Dict, ontology: str = None):
def __init__(self, uuid: str, name: str, description: str, functions: List[Function],
measurements: List[Measurement],
parameters: List[Parameter], components: List['Component'], implementation: Any, ontology: str = None):
"""
Args:
......@@ -40,21 +41,7 @@ class Component(Element):
measurements: List of all children measurements.
parameters: List of all children parameters.
components: List of all children components. Might contain dynamic-components.
mapping: Dictionary containing a mapping of the underlying device implementation to the HTTP-endpoints.
The mapping of a component looks as follows :
{
'getter': com_implementation.get,
'setter': com_implementation.set,
'MEA-Temperature': com_implementation.get_mea_temperature,
'PAR-State': {...},
'FUN-Reset: {...},
'COM-Part': {...},
}
If the component does not have dynamic children components, 'getter' and 'setter' are set to None.
For all children there is a key-value pair where the UUID of the child is the key and the mapping of the child is the value.
For the structure of the childrens' mappings please refer to the respective documentation.
implementation: The class of the sensor layer implementing this component.
ontology: Optional field containing the reference to a semantic definition of the components name or purpose.
Raises:
......@@ -91,8 +78,7 @@ class Component(Element):
self._measurements = measurements
self._components = components
self._parameters = parameters
self._implementation_add = mapping['add'] if 'add' in mapping else None
self._implementation_remove = mapping['remove'] if 'remove' in mapping else None
self._implementation = implementation
def __getitem__(self, item: Union[str, List[str]], method: int = HTTP_GET) -> Any:
"""Returns the value of the specified item.
......@@ -142,7 +128,8 @@ class Component(Element):
return child
else:
return child.__getitem__(item[1:], method)
raise ChildNotFoundException(f'{self.uuid}: Given uuid {item} is not the id of a child of the current component!')
raise ChildNotFoundException(
f'{self.uuid}: Given uuid {item} is not the id of a child of the current component!')
return super().__getitem__(item, method)
def __setitem__(self, key: str, value: Any):
......@@ -211,102 +198,50 @@ class Component(Element):
return dictionary
@staticmethod
def merge_dictionaries(parent_dict: Dict[str, Any], component_dict: Dict[str, Any]):
def merge_measurements(parent_list: List[Dict], component_list: List[Dict]):
for measurement in parent_list:
if 'uuid' not in measurement:
raise Exception('UUID {} not given for measurement to be overwritten.'.format(measurement['uuid']))
idx = [i for i, v in enumerate(component_list) if v['uuid'] == measurement['uuid']]
if len(idx) != 1:
raise Exception('Mismatching UUID: {}.'.format(measurement['uuid']))
idx = idx[0]
component_list[idx].update(measurement)
return component_list
def merge_functions(parent_list, component_list):
for function in parent_list:
if 'uuid' not in function:
raise Exception('UUID {} not given for function to be overwritten.'.format(function['uuid']))
idx = [i for i, v in enumerate(component_list) if v['uuid'] == function['uuid']]
if len(idx) != 1:
raise Exception('Mismatching UUID: {}.'.format(function['uuid']))
idx = idx[0]
if 'name' in function:
component_list[idx]['name'] = function['name']
if 'description' in function:
component_list[idx]['description'] = function['description']
if 'arguments' in function:
component_list[idx]['arguments'] = merge_measurements(function['arguments'], component_list[idx]['arguments'])
if 'returns' in function:
component_list[idx]['returns'] = merge_measurements(function['returns'], component_list[idx]['returns'])
return component_list
# merge components, i.e. overwrite fields of 'static' children dictionary with the 'dynamic' fields of the parents dictionary
uuid = parent_dict['uuid']
component_dict['uuid'] = uuid
if 'name' in parent_dict:
component_dict['name'] = parent_dict['name']
if 'description' in parent_dict:
component_dict['description'] = parent_dict['description']
if 'measurements' in parent_dict:
component_dict['measurements'] = merge_measurements(parent_dict['measurements'], component_dict['measurements'])
if 'parameters' in parent_dict:
component_dict['paramters'] = merge_measurements(parent_dict['parameters'], component_dict['parameters'])
if 'functions' in parent_dict:
component_dict['functions'] = merge_functions(parent_dict['functions'], component_dict['functions'])
if 'components' in parent_dict:
for obj in parent_dict['components']:
index = [i for i, o in enumerate(component_dict['components']) if o['uuid'] == obj['uuid']]
if len(index) != 1:
raise Exception('Mismatching UUID: {}.'.format(obj['uuid']))
index = index[0]
component_dict['components'][index] = Component.merge_dictionaries(obj, component_dict['components'][index])
return component_dict
@staticmethod
def deserialize(dictionary, mapping=None, filepath=''):
def deserialize(dictionary, implementation=None):
if 'uuid' not in dictionary:
raise SerialisationException('The component can not be deserialized. UUID is missing!')
uuid = dictionary['uuid']
if uuid[:3] != 'COM':
raise SerialisationException(
'The component can not be deserialized. The UUID must start with COM, but actually starts with {}!'.format(uuid[:3]))
if 'file' in dictionary:
try:
with open(os.path.normpath(os.path.join(filepath, dictionary['file']))) as file:
component_dict = json.load(file)
dictionary = Component.merge_dictionaries(dictionary, component_dict)
except Exception as e:
raise SerialisationException('{}: The component can not be deserialized. Provided JSON-file can not be parsed! {}'.format(uuid, e))
'The component can not be deserialized. The UUID must start with COM, but actually starts with {}!'.format(
uuid[:3]))
if 'name' not in dictionary:
raise SerialisationException('{}: The component can not be deserialized. Name is missing!'.format(uuid))
if 'description' not in dictionary:
raise SerialisationException('{}: The component can not be deserialized. Description is missing!'.format(uuid))
raise SerialisationException(
'{}: The component can not be deserialized. Description is missing!'.format(uuid))
if 'measurements' not in dictionary:
raise SerialisationException('{}: The component can not be deserialized. List of measurements is missing!'.format(uuid))
raise SerialisationException(
'{}: The component can not be deserialized. List of measurements is missing!'.format(uuid))
if 'parameters' not in dictionary:
raise SerialisationException('{}: The component can not be deserialized. List of parameters is missing!'.format(uuid))
raise SerialisationException(
'{}: The component can not be deserialized. List of parameters is missing!'.format(uuid))
if 'functions' not in dictionary:
raise SerialisationException('{}: The component can not be deserialized. List of functions is missing!'.format(uuid))
raise SerialisationException(
'{}: The component can not be deserialized. List of functions is missing!'.format(uuid))
if 'components' not in dictionary:
raise SerialisationException('{}: The component can not be deserialized. List of components is missing!'.format(uuid))
raise SerialisationException(
'{}: The component can not be deserialized. List of components is missing!'.format(uuid))
try:
measurements = []
for var in dictionary['measurements']:
if mapping is not None:
submapping = mapping[var['uuid']] if var['uuid'] in mapping else None
measurements += [Measurement.deserialize(var, submapping)]
if implementation is not None:
getter = getattr(implementation, f'get_mea_{var["uuid"][4:].lower()}')
measurements += [Measurement.deserialize(var, getter)]
else:
measurements += [Measurement.deserialize(var)]
except Exception as e:
raise SerialisationException('{}: A measurement of the component can not be deserialized. {}'.format(uuid, e))
raise SerialisationException(
'{}: A measurement of the component can not be deserialized. {}'.format(uuid, e))
try:
parameters = []
for par in dictionary['parameters']:
if mapping is not None:
submapping = mapping[par['uuid']] if par['uuid'] in mapping else None
parameters += [Parameter.deserialize(par, submapping)]
if implementation is not None:
getter = getattr(implementation, f'get_par_{par["uuid"][4:].lower()}')
setter = None if par['constant'] else getattr(implementation, f'set_par_{par["uuid"][4:].lower()}')
parameters += [Parameter.deserialize(par, {'getter': getter, 'setter': setter})]
else:
parameters += [Parameter.deserialize(par)]
except Exception as e:
......@@ -314,9 +249,9 @@ class Component(Element):
try:
functions = []
for func in dictionary['functions']:
if mapping is not None:
submapping = mapping[func['uuid']] if func['uuid'] in mapping else None
functions += [Function.deserialize(func, submapping)]
if implementation is not None:
method = getattr(implementation, f'fun_{func["uuid"][4:].lower()}')
functions += [Function.deserialize(func, method)]
else:
functions += [Function.deserialize(func)]
except Exception as e:
......@@ -324,16 +259,27 @@ class Component(Element):
try:
components = []
for obj in dictionary['components']:
if mapping is not None:
submapping = mapping[obj['uuid']] if obj['uuid'] in mapping else None
components += [Component.deserialize(obj, submapping, filepath)]
if implementation is not None:
child_implementation = None
attributes = list(filter(lambda attr: attr[:5] == '_com_', dir(implementation)))
if f'_com_{obj["uuid"][4:].lower()}' in attributes:
child_implementation = getattr(implementation, f'_com_{obj["uuid"][4:].lower()}')
else:
for attr in attributes:
attribute = getattr(implementation, attr)
if isinstance(attribute, dict) and obj['uuid'] in attribute:
child_implementation = attribute[obj['uuid']]
break
components += [Component.deserialize(obj, child_implementation)]
else:
components += [Component.deserialize(obj, filepath=filepath)]
components += [Component.deserialize(obj)]
except Exception as e:
raise SerialisationException('{}: An component of the component can not be deserialized. {}'.format(uuid, e))
raise SerialisationException(
'{}: An component of the component can not be deserialized. {}'.format(uuid, e))
try:
ontology = dictionary['ontology'] if 'ontology' in dictionary else None
return Component(dictionary['uuid'], dictionary['name'], dictionary['description'], functions, measurements, parameters, components, mapping,
return Component(dictionary['uuid'], dictionary['name'], dictionary['description'], functions, measurements,
parameters, components, implementation,
ontology)
except Exception as e:
raise SerialisationException('{}: The component can not be deserialized. {}'.format(uuid, e))
......@@ -367,33 +313,44 @@ class Component(Element):
else:
raise Exception('Wrong type updating element on existing model!')
def add(self, uuid: str, class_name: str, json_file: str, *args, **kwargs):
def add(self, uuid: str, class_name: str, data: Dict, *args, **kwargs):
if uuid[:3] == 'COM':
if uuid not in [o.uuid for o in self._components]:
if uuid == data['uuid']:
try:
module_name = f'{class_name[:3].lower()}_{class_name[3:].lower()}'
try:
__import__(class_name)
implementation = getattr(sys.modules[class_name], class_name)(*args, **kwargs)
mapping = docstring_parser.parse_docstrings_for_soil(implementation)
self._components += [Component.load(json_file, mapping)]
if self._implementation_add is not None:
self._implementation_add(implementation)
__import__(module_name)
implementation = getattr(sys.modules[module_name], class_name)(self._implementation._device,
*args, **kwargs)
except AttributeError:
module_name = f'hwc.{module_name}'
__import__(module_name)
implementation = getattr(sys.modules[module_name], class_name)(self._implementation._device,
*args, **kwargs)
self._components += [Component.load(data, implementation)]
getattr(self._implementation, 'add')(uuid, implementation)
return implementation
except Exception as e:
raise DeviceException('Can not add component with UUID {}. {}'.format(uuid, e), predecessor=e)
else:
raise UserException(
'The UUID of the component given in the model file ({}) does not match UUID in the requested URL ({}).'.format(
data['uuid'], uuid))
else:
raise UserException('Component has already a child with UUID {}.'.format(uuid))
else:
raise UserException('UUID {} is not of the UUID of an component.'.format(uuid))
def remove(self, uuid: str, *args, **kwargs):
def remove(self, uuid: str, *args, **kwargs) -> str:
for o in self._components:
if o.uuid == uuid:
if self._implementation_remove is not None:
try:
self._implementation_remove(*args, **kwargs)
getattr(self._implementation, 'remove')(*args, **kwargs)
except Exception as e:
raise DeviceException(str(e), predecessor=e)
self._components.remove(o)
return
return o.__class__.__name__
raise ChildNotFoundException('{}: Child {} not found!'.format(self.uuid, uuid))
@staticmethod
......@@ -405,7 +362,7 @@ class Component(Element):
raise Exception('{} is not a json file!'.format(file))
with open(file, 'r') as f:
model_dict = json.load(f)
return Component.deserialize(model_dict, implementation, os.path.dirname(file))
return Component.deserialize(model_dict, implementation)
elif isinstance(file, dict):
return Component.deserialize(file, implementation)
else:
......
from deprecated import deprecated
import docstring_parser
import types
from typing import Any, Callable
from wzl.utilities import root_logger
from .stream import ConfigurableJob, FixedJob
from ..utils.error import SerialisationException
logger = root_logger.get(__name__)
@deprecated(version='6.0.0', reason='Building service model from docstrings is too error-prone.')
def parse_children(parse_docstrings: Callable, parent_url: str, implementation: Any):
export_dict = {}
children_attribute_list, uuid_attribute_list = [], []
# parse docstring parameters
doc = docstring_parser.parse(implementation.__doc__)
try:
uuid = doc.short_description.split('.')[0]
if uuid[:3] != 'OBJ':
raise SerialisationException('Short description of class {} misses UUID of the Object.'.format(implementation.__class__.__name__))
except:
raise SerialisationException('Short description of the class {} is missing.'.format(implementation.__class__.__name__))
if len(doc.params) > 0:
for element in doc.params:
if isinstance(element, docstring_parser.common.DocstringParam) and element.args[1] == 'uuids':
uuid_attribute_list = element.description.replace('\t', '').replace(' ', '').split(',')
elif isinstance(element, docstring_parser.common.DocstringParam) and element.args[1] == 'children':
children_attribute_list = element.description.replace('\t', '').replace(' ', '').split(',')
# check if the uuid and children list parameters are matching
if len(uuid_attribute_list) != len(children_attribute_list):
raise SerialisationException(
'The number of attributes of uuids and and corresponding children differ!\nlen(uuids) != len(children): {} != {}'.format(
len(uuid_attribute_list), len(children_attribute_list)))
# try to retrieve attribute from implementation and parse the children
for uuid_attribute, children_attribute in zip(uuid_attribute_list, children_attribute_list):
try:
uuid_list = implementation.__getattribute__(uuid_attribute)
children_list = implementation.__getattribute__(children_attribute)
for child_uuid, child in zip(uuid_list, children_list):
child_url = '/'.join([uuid, child_uuid]) if parent_url == '' else '/'.join([parent_url, uuid, child_uuid])
export_dict[child_uuid] = parse_docstrings(child, child_url)
except AttributeError as e:
raise SerialisationException('AttributeError: {}'.format(str(e)))
return export_dict
@deprecated(version='6.0.0', reason='Building service model from docstrings is too error-prone.')
def parse_docstrings_for_mqtt(implementation, parent_url="", *args, **kwargs):
if implementation is None:
return None
schedule = []
child_schedule = parse_children(parse_docstrings_for_mqtt, parent_url, implementation)
for key in child_schedule:
schedule += [*child_schedule[key]]
# parse functions and setters/getters of variables and parameters
object_dict = implementation.__class__.__dict__
funcs = [object_dict[x] for x in object_dict if isinstance(object_dict[x], types.FunctionType)]
for func in funcs:
doc = docstring_parser.parse(func.__doc__)
if func.__name__ == '__init__' or doc.short_description == 'PUT.' or doc.short_description == 'DELETE.':
continue
short = doc.short_description
if short is not None and short[:3] != 'FUN' and len(doc.params) == 0:
returns = doc.returns.description.split('.')[0].split(',')
assert (len(returns) == 1)
if returns[0][:3] == 'VAR':
lines = doc.long_description.split('\n')
for line in lines:
if line[:4] == 'MQTT':
interval = line.replace(' ', '').split(':')[1]
if len(interval) > 4 and interval[:4] == 'self':
interval = implementation.__getattribute__(interval.split('.')[1])
schedule += [
ConfigurableJob('{}/{}'.format(parent_url, returns[0]), interval, implementation.__getattribute__(func.__name__))]
else:
schedule += [
FixedJob('{}/{}'.format(parent_url, returns[0]), eval(interval), implementation.__getattribute__(func.__name__))]
return schedule
@deprecated(version='6.0.0', reason='Building service model from docstrings is too error-prone.')
def parse_docstrings_for_soil(implementation: Any, parent_url="", *args, **kwargs):
if implementation is None:
return None
export_dict = parse_children(parse_docstrings_for_soil, parent_url, implementation)
# parse functions and setters/getters of variables and parameters
object_dict = implementation.__class__.__dict__
funcs = [object_dict[x] for x in object_dict if isinstance(object_dict[x], types.FunctionType)]
for func in funcs:
try:
if func.__name__[0] == '_' or func.__doc__ is None:
continue
doc = docstring_parser.parse(func.__doc__)
short = doc.short_description
if short is not None and short.split('-')[0] == 'FUN':
d = {'method': implementation.__getattribute__(func.__name__), 'signature': {'arguments': {}, 'returns': []}}
for arg in doc.params:
desc = arg.description.split('.')
d['signature']['arguments'][desc[0]] = arg.arg_name
if doc.returns is not None:
returns_string = doc.returns.description.split('.')[0]
if ', ' in returns_string:
d['signature']['returns'] = returns_string.split(', ')
if ',\n' in returns_string:
d['signature']['returns'] = returns_string.split(',\n')
else:
d['signature']['returns'] = returns_string.split(',')
export_dict[short.split('.')[0]] = d
elif short is not None and short.split('.')[0] == 'PUT':
export_dict['add'] = implementation.__getattribute__(func.__name__)
elif short is not None and short.split('.')[0] == 'DELETE':
export_dict['remove'] = implementation.__getattribute__(func.__name__)
elif (short is None or short[:3] != 'FUN') and len(doc.params) == 0 and doc.returns is not None:
returns = doc.returns.description.split('.')[0].split(',')
assert (len(returns) == 1)
if returns[0][:3] == 'VAR':
export_dict[returns[0]] = implementation.__getattribute__(func.__name__)
else:
if returns[0] not in export_dict:
export_dict[returns[0]] = {'setter': None, 'getter': None}
export_dict[returns[0]]['getter'] = implementation.__getattribute__(func.__name__)
elif len(list(doc.params)) == 1:
# assert (doc.returns.description.split('.')[0] == 'None')
arg = doc.params[0].description.split('.')[0]
assert (arg[:3] == 'PAR')
if arg not in export_dict:
export_dict[arg] = {'setter': None, 'getter': None}
export_dict[arg]['setter'] = implementation.__getattribute__(func.__name__)
except SerialisationException as e:
logger.error("{}: Error in docstring parsing: {}".format(func.__name__, e))
return export_dict
import functools
import inspect
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Union, Callable
from wzl.utilities import root_logger
from .element import Element
......@@ -16,7 +16,7 @@ logger = root_logger.get(__name__)
class Function(Element):
def __init__(self, uuid: str, name: str, description: str, arguments: List[Figure], returns: List[Figure],
implementation: Dict, ontology: str = None):
implementation: Callable, ontology: str = None):
Element.__init__(self, uuid, name, description, ontology)
if uuid[:3] != 'FUN':
raise Exception('{}: The UUID must start with FUN!'.format(uuid))
......@@ -33,9 +33,12 @@ class Function(Element):
self._arguments = arguments
self._returns = returns
self._implementation = implementation['method']
self._signature = implementation['signature']
self._mqtt_callback = implementation['mqtt_callback'] if 'mqtt_callback' in implementation.keys() else None
self._implementation = implementation
signature_arguments = {}
for arg in self._arguments:
signature_arguments[arg['uuid']] = f'arg_{arg["uuid"][4:].lower()}'
self._signature = {'arguments': signature_arguments, 'returns': [ret['uuid'] for ret in self._returns]}
# self._mqtt_callback = implementation['mqtt_callback'] if 'mqtt_callback' in implementation.keys() else None
def __getitem__(self, item: Union[str, List[str]], method: int = HTTP_GET) -> Any:
if item == "arguments":
......@@ -91,14 +94,14 @@ class Function(Element):
# set up servers
try:
if inspect.iscoroutinefunction(self._implementation):
if self._mqtt_callback is not None:
result = await self._implementation(functools.partial(self._mqtt_callback, topic), **args)
else:
# if self._mqtt_callback is not None:
# result = await self._implementation(functools.partial(self._mqtt_callback, topic), **args)
# else:
result = await self._implementation(**args)
else:
if self._mqtt_callback is not None:
result = self._implementation(functools.partial(self._mqtt_callback, topic), **args)
else:
# if self._mqtt_callback is not None:
# result = self._implementation(functools.partial(self._mqtt_callback, topic), **args)
# else:
result = self._implementation(**args)
except Exception as e:
raise DeviceException(str(e), predecessor=e)
......
# from __future__ import annotations
import json
import os
import sys
from deprecated import deprecated
from typing import List, Any, Union, Dict
from wzl.utilities import root_logger
from . import docstring_parser
from .element import Element
from .error import ChildNotFoundException
from ..utils.error import SerialisationException, DeviceException, UserException
from .function import Function
from .variable import Variable
from .parameter import Parameter
from ..utils.constants import HTTP_GET
logger = root_logger.get(__name__)
@deprecated(version='6.0.0', reason='"Object" has been renamed to "Component" to avoid ambiguity.')
class Object(Element):
def __init__(self, uuid: str, name: str, description: str, functions: List[Function], variables: List[Variable], parameters: List[Parameter],
objects: List['Object'], implementation: Dict, ontology: str = None):
Element.__init__(self, uuid, name, description, ontology)
if not isinstance(functions, list):
raise Exception('{}: Given functions are not a list!'.format(uuid))
for f in functions:
if not isinstance(f, Function):
raise Exception('{}: Given function is not of type Function!'.format(uuid))
if not isinstance(variables, list):
raise Exception('{}: Given variables are not a list!'.format(uuid))
for v in variables:
if not isinstance(v, Variable):
raise Exception('{}: Given variable is not of type Variables!'.format(uuid))
if not isinstance(parameters, list):
raise Exception('{}: Given variables are not a list!'.format(uuid))
for p in parameters:
if not isinstance(p, Parameter):
raise Exception('{}: Given variable is not of type Variables!'.format(uuid))
if not isinstance(objects, list):
raise Exception('{}: Given objects are not a list!'.format(uuid))
for o in objects:
if not isinstance(o, Object):
raise Exception('{}: Given object is not of type Objects!'.format(uuid))
self._functions = functions
self._variables = variables
self._objects = objects
self._parameters = parameters
self._implementation_add = implementation['add'] if 'add' in implementation else None
self._implementation_remove = implementation['remove'] if 'remove' in implementation else None
def __getitem__(self, item: Union[str, List[str]], method: int = HTTP_GET) -> Any:
attribute = False
if isinstance(item, str):
attribute = hasattr(self, item)
if item == "functions":
return self._functions
if item == "variables":
return self._variables
if item == "parameters":
return self._variables
if item == "objects":
return self._objects
if item == "children":
ret = []
everything = self._objects + self._variables + self._parameters + self._functions
for o in everything:
ret += [o.uuid]
return ret
# if the item is a list, the list contains the uuid of the descendants
if isinstance(item, list):
if len(item) > 0 and super().__getitem__('uuid', method) == item[0]:
item = item[1:]
if len(item) == 0:
return self
everything = self._objects + self._variables + self._parameters + self._functions
for o in everything:
if o.uuid == item[0]:
if len(item) == 1:
return o
else:
return o.__getitem__(item[1:], method)
raise Exception("{}: Given uuid {} is not the id of a child of the current object!".format(self.uuid, item))
return super().__getitem__(item, method)
def __setitem__(self, key: str, value: Any):
if key == "functions":
if not isinstance(value, list):
raise Exception('{}: Given functions are not a list!'.format(self.uuid))
for f in value:
if not isinstance(f, Function):
raise Exception('{}: Given function is not of type Function!'.format(self.uuid))
self._functions = value
elif key == "variables":
if not isinstance(value, list):
raise Exception('{}: Given variables are not a list!'.format(self.uuid))
for v in value:
if not isinstance(v, Variable):
raise Exception('{}: Given variable is not of type Variable!'.format(self.uuid))
self._variables = value
elif key == "parameters":
if not isinstance(value, list):
raise Exception('{}: Given parameters are not a list!'.format(self.uuid))
for v in value:
if not isinstance(v, Parameter):
raise Exception('{}: Given parameter is not of type Parameter!'.format(self.uuid))
self._variables = value
elif key == "objects":
if not isinstance(value, list):
raise Exception('{}: Given objects are not a list!'.format(self.uuid))
for o in value:
if not isinstance(o, Object):
raise Exception('{}: Given object is not of type Object!'.format(self.uuid))
self._objects = value
else:
super().__setitem__(key, value)
def serialize(self, keys: List[Any], method: int = HTTP_GET) -> Dict[str, Any]:
if not keys: # list is empty
keys = ['uuid', 'name', 'description', 'children', 'ontology']
if 'all' in keys: # serialize complete tree recursively (overrides all other keys)
dictionary = super().serialize([])
dictionary['variables'] = list(map(lambda x: x.serialize([]), self._variables))
dictionary['functions'] = list(map(lambda x: x.serialize(['all']), self._functions))
dictionary['objects'] = list(map(lambda x: x.serialize(['all']), self._objects))
dictionary['parameters'] = list(map(lambda x: x.serialize(['all']), self._parameters))
return dictionary
dictionary = super().serialize(keys, method)
if 'children' in keys:
everything = self._objects + self._variables + self._parameters + self._functions
dictionary['children'] = list(map(lambda x: x.serialize(['name', 'uuid']), everything))
return dictionary
@staticmethod
def merge_dictionaries(parent_dict, object_dict):
def merge_variables(parent_list, object_list):
for variable in parent_list:
if 'uuid' not in variable:
raise Exception('UUID {} not given for variable to be overwritten.'.format(variable['uuid']))
idx = [i for i, v in enumerate(object_list) if v['uuid'] == variable['uuid']]
if len(idx) != 1:
raise Exception('Mismatching UUID: {}.'.format(variable['uuid']))
idx = idx[0]
object_list[idx].update(variable)
return object_list
def merge_functions(parent_list, object_list):
for function in parent_list:
if 'uuid' not in function:
raise Exception('UUID {} not given for function to be overwritten.'.format(function['uuid']))
idx = [i for i, v in enumerate(object_list) if v['uuid'] == function['uuid']]
if len(idx) != 1:
raise Exception('Mismatching UUID: {}.'.format(function['uuid']))
idx = idx[0]
if 'name' in function:
object_list[idx]['name'] = function['name']
if 'description' in function:
object_list[idx]['description'] = function['description']
if 'arguments' in function:
object_list[idx]['arguments'] = merge_variables(function['arguments'], object_list[idx]['arguments'])
if 'returns' in function:
object_list[idx]['returns'] = merge_variables(function['returns'], object_list[idx]['returns'])
return object_list
# merge objects, i.e. overwrite fields of "static" children dictionary with the "dynamic" fields of the parents dictionary
uuid = parent_dict['uuid']
object_dict['uuid'] = uuid
if 'name' in parent_dict:
object_dict['name'] = parent_dict['name']
if 'description' in parent_dict:
object_dict['description'] = parent_dict['description']
if 'variables' in parent_dict:
object_dict['variables'] = merge_variables(parent_dict['variables'], object_dict['variables'])
if 'parameters' in parent_dict:
object_dict['paramters'] = merge_variables(parent_dict['parameters'], object_dict['parameters'])
if 'functions' in parent_dict:
object_dict['functions'] = merge_functions(parent_dict['functions'], object_dict['functions'])
if 'objects' in parent_dict:
for obj in parent_dict['objects']:
index = [i for i, o in enumerate(object_dict['objects']) if o['uuid'] == obj['uuid']]
if len(index) != 1:
raise Exception('Mismatching UUID: {}.'.format(obj['uuid']))
index = index[0]
object_dict['objects'][index] = Object.merge_dictionaries(obj, object_dict['objects'][index])
return object_dict
@staticmethod
def deserialize(dictionary, mapping=None, filepath=''):
if 'uuid' not in dictionary:
raise SerialisationException('The object can not be deserialized. UUID is missing!')
uuid = dictionary['uuid']
if 'file' in dictionary:
try:
with open(os.path.normpath(os.path.join(filepath, dictionary['file']))) as file:
object_dict = json.load(file)
dictionary = Object.merge_dictionaries(dictionary, object_dict)
except Exception as e:
raise SerialisationException('{}: The object can not be deserialized. Provided JSON-file can not be parsed! {}'.format(uuid, e))
if 'name' not in dictionary:
raise SerialisationException('{}: The object can not be deserialized. Name is missing!'.format(uuid))
if 'description' not in dictionary:
raise SerialisationException('{}: The object can not be deserialized. Description is missing!'.format(uuid))
if 'variables' not in dictionary:
raise SerialisationException('{}: The object can not be deserialized. List of variables is missing!'.format(uuid))
if 'parameters' not in dictionary:
raise SerialisationException('{}: The object can not be deserialized. List of parameters is missing!'.format(uuid))
if 'functions' not in dictionary:
raise SerialisationException('{}: The object can not be deserialized. List of functions is missing!'.format(uuid))
if 'objects' not in dictionary:
raise SerialisationException('{}: The object can not be deserialized. List of objects is missing!'.format(uuid))
try:
variables = []
for var in dictionary['variables']:
if mapping is not None:
submapping = mapping[var["uuid"]] if var['uuid'] in mapping else None
variables += [Variable.deserialize(var,submapping)]
else:
variables += [Variable.deserialize(var)]
except Exception as e:
raise SerialisationException('{}: A variable of the object can not be deserialized. {}'.format(uuid, e))
try:
parameters = []
for par in dictionary['parameters']:
if mapping is not None:
submapping = mapping[par["uuid"]] if par['uuid'] in mapping else None
parameters += [Parameter.deserialize(par, submapping)]
else:
parameters += [Parameter.deserialize(par)]
except Exception as e:
raise SerialisationException('{}: A parameter of the object can not be deserialized. {}'.format(uuid, e))
try:
functions = []
for func in dictionary['functions']:
if mapping is not None:
submapping = mapping[func["uuid"]] if func['uuid'] in mapping else None
functions += [Function.deserialize(func, submapping)]
else:
functions += [Function.deserialize(func)]
except Exception as e:
raise SerialisationException('{}: A function of the object can not be deserialized. {}'.format(uuid, e))
try:
objects = []
for obj in dictionary['objects']:
if mapping is not None:
submapping = mapping[obj["uuid"]] if obj['uuid'] in mapping else None
objects += [Object.deserialize(obj, submapping, filepath)]
else:
objects += [Object.deserialize(obj, filepath=filepath)]
except Exception as e:
raise SerialisationException('{}: An object of the object can not be deserialized. {}'.format(uuid, e))
try:
ontology = dictionary['ontology'] if 'ontology' in dictionary else None
return Object(dictionary['uuid'], dictionary['name'], dictionary['description'], functions, variables, parameters, objects, mapping,
ontology)
except Exception as e:
raise SerialisationException('{}: The object can not be deserialized. {}'.format(uuid, e))
def write(self, filename: str):
if filename[-5:] != ".json":
raise Exception('{} is not a json file!'.format(filename))
model_dict = self.serialize(['all'])
f = open(filename, "w")
f.write(json.dumps(model_dict))
f.close()
def update(self, element: Union['Object', Function, Variable, Parameter]):
if isinstance(element, Object):
for i, o in enumerate(self._objects):
if o.uuid == element.uuid:
self._objects[i] = element
return
# self._objects.append(element)
else:
raise Exception("Wrong type updating element on existing model!")
def add(self, uuid: str, class_name: str, json_file: str, *args, **kwargs):
if uuid[:3] == 'OBJ':
if uuid not in [o.uuid for o in self._objects]:
try:
__import__(class_name)
implementation = getattr(sys.modules[class_name], class_name)(*args, **kwargs)
mapping = docstring_parser.parse_docstrings_for_soil(implementation)
self._objects += [Object.load(json_file, mapping)]
if self._implementation_add is not None:
self._implementation_add(implementation)
except Exception as e:
raise DeviceException('Can not add object with UUID {}. {}'.format(uuid, e), predecessor=e)
else:
raise UserException('Object has already a child with UUID {}.'.format(uuid))
else:
raise UserException('UUID {} is not of the UUID of an object.'.format(uuid))
def remove(self, uuid: str, *args, **kwargs):
for o in self._objects:
if o.uuid == uuid:
if self._implementation_remove is not None:
try:
self._implementation_remove(*args, **kwargs)
except Exception as e:
raise DeviceException(str(e), predecessor=e)
self._objects.remove(o)
return
raise ChildNotFoundException('{}: Child {} not found!'.format(self.uuid, uuid))
@staticmethod
def load(file: Union[str, dict], implementation: Any) -> 'Object':
if isinstance(file, str):
if not os.path.isfile(file):
raise Exception('There is no file named {}!'.format(file))
if file[-5:] != ".json":
raise Exception('{} is not a json file!'.format(file))
with open(file, 'r') as f:
model_dict = json.load(f)
return Object.deserialize(model_dict, implementation, os.path.dirname(file))
elif isinstance(file, dict):
return Object.deserialize(file, implementation)
else:
raise Exception('Given file is not a name of a json-file nor a json-like dictionary.')
\ No newline at end of file
import datetime
import json
import traceback
from abc import ABC, abstractmethod
import datetime
from typing import List, Callable, Any, Union, Dict
from wzl.utilities import root_logger
from wzl.mqtt.client import MQTTPublisher
from wzl.utilities import root_logger
# from .component import Component
from .event import Event
from ..utils import serialize
......@@ -103,7 +105,6 @@ class FixedJob(Job):
def interval(self) -> float:
return self._interval
@property
def _is_triggered(self) -> bool:
return True
......@@ -210,6 +211,17 @@ class StreamScheduler(object):
"""
self._running = False
def add_jobs(self, schedule: List[Job]):
self._schedule += schedule
def remove_jobs(self, fqid: str):
jobs_to_remove = []
for job in self._schedule:
if fqid in job.topic:
jobs_to_remove += [jobs_to_remove]
for job in jobs_to_remove:
self._schedule.remove(job)
def add_publisher(self, publisher: MQTTPublisher) -> None:
"""Adds the given publisher to the list of publishers.
......@@ -260,6 +272,7 @@ class StreamScheduler(object):
job.schedule()
next = job.determine_next(next)
except JobError:
logger.error(traceback.format_exc())
job.stop()
if next is None:
......
from deprecated import deprecated
from typing import Dict
from wzl.utilities import root_logger
from .datatype import Datatype
from ..utils.constants import HTTP_GET
from ..utils.error import SerialisationException
from .figure import Figure
logger = root_logger.get(__name__)
@deprecated(version='6.0.0', reason='"Variable" has been renamed to "Measurement" to be consistent with VIM and published articles.')
class Variable(Figure):
def __init__(self, uuid, name, description, datatype, dimension, range, getter, unit, nonce=None, ontology: str = None):
Figure.__init__(self, uuid, name, description, datatype, dimension, range, None, getter, ontology)
self._unit = unit
self._covariance = None
self._uncertainty = None
self._timestamp = None
self._nonce = nonce
@property
def unit(self):
return self._unit
@property
def covariance(self):
return self._covariance
@property
def uncertainty(self):
return self._uncertainty
@property
def timestamp(self):
return self._timestamp
@property
def nonce(self):
return self._nonce
def __getitem__(self, item: str, method=HTTP_GET):
"""
Getter-Method.
According to the given key the method returns the value of the corresponding attribute.
:param item: name of the attribute. Provided as string without leading underscores.
:param method: ???
:return: the value of the attribute indicated by 'item'.
"""
if item == "unit":
return self._unit
if item == 'nonce':
return self._nonce
if item == 'covariance':
return self._covariance
if item == 'uncertainty':
return self._uncertainty
if item == 'timestamp':
return self._timestamp
if item == []:
return self
return super().__getitem__(item, method)
def __setitem__(self, key: str, value):
"""
Setter - Method
If key is "value" datatype, dimension and range is checked for correctness.
:param key: sets the value of the attribute with name 'item' to the provided value.
:param value: value to be set
"""
if key in ['value', 'timestamp', 'covariance', 'uncertainty']:
raise KeyError('The {} attribute of a measurement can not be set manually!'.format(key))
elif key == "nonce":
self._nonce = self._nonce
elif key == 'unit':
self._unit = self._unit
else:
super().__setitem__(key, value)
def serialize(self, keys: [str], legacy_mode: bool, method=HTTP_GET):
"""
Serializes an object of type Variable into a JSON-like dictionary.
:param keys: All attributes given in the "keys" array are serialized.
:param method: ???
:return: a dictionary having all "keys" as keys and the values of the corresponding attributes as value.
"""
# list is empty provide all attributes of the default-serialization
if not keys:
keys = ['uuid', 'name', 'description', 'datatype', 'value', 'dimension', 'range', 'timestamp', 'nonce', 'covariance', 'uncertainty',
'unit', 'ontology']
if 'value' in keys and 'timestamp' not in keys:
keys += ['timestamp']
dictionary = {}
# get all attribute values
for key in keys:
value = self.__getitem__(key, method)
# in case of timestamp convert into RFC3339 string
if key == 'timestamp' or (key == 'value' and self._datatype == 'time'):
value = value.isoformat() + 'Z' if value is not None else ""
if key == "datatype":
dictionary[key] = value.to_string(legacy_mode)
else:
dictionary[key] = value
return dictionary
@staticmethod
def deserialize(dictionary: Dict, implementation=None):
"""
Takes a JSON-like dictionary, parses it, performs a complete correctness check and returns an object of type Figure with the
values provided in the dictionary, if dictionary is a valid serialization of a Figure.
:param dictionary: serialized variable
:param implementation: implementation wrapper object,
:return: an object of type Figure
"""
# check if all required attributes are present
if 'uuid' not in dictionary:
raise SerialisationException('The variable can not be deserialized. UUID is missing!')
uuid = dictionary['uuid']
if 'name' not in dictionary:
raise SerialisationException('{}: The variable can not be deserialized. Name is missing!'.format(uuid))
if 'description' not in dictionary:
raise SerialisationException('{}: The variable can not be deserialized. Description is missing!'.format(uuid))
if 'datatype' not in dictionary:
raise SerialisationException('{}: The variable can not be deserialized. Datatype is missing!'.format(uuid))
if 'dimension' not in dictionary:
raise SerialisationException('{}: The variable can not be deserialized. Dimension is missing!'.format(uuid))
if 'value' not in dictionary:
raise SerialisationException('{}: The variable can not be deserialized. Value is missing!'.format(uuid))
if 'range' not in dictionary:
raise SerialisationException('{}: The variable can not be deserialized. Range is missing!'.format(uuid))
if 'unit' not in dictionary:
raise SerialisationException('{}: The variable can not be deserialized. Unit is missing!'.format(uuid))
try:
ontology = dictionary['ontology'] if 'ontology' in dictionary else None
return Variable(dictionary['uuid'], dictionary['name'], dictionary['description'], Datatype.from_string(dictionary['datatype']), dictionary['dimension'],
dictionary['range'], implementation, dictionary['unit'], ontology)
except Exception as e:
raise SerialisationException('{}: The variable can not be deserialized. {}'.format(uuid, e))
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment