Verified Commit 90b8e11e authored by Leon Mauritz Möller's avatar Leon Mauritz Möller
Browse files

adapter.xml: add support for IEC61360 Concept Descriptions

add _get_text_mapped_or_none()
rename _get_child_multiple() -> _get_all_children_expect_tag()
allow passing the namespace to _construct_key_tuple(),
  _construct_reference() and _construct_lang_string_set()
minor fixes in _construct_entity() and _construct_file()
update copyright year to 2020
parent 8c95bfb0
# Copyright 2019 PyI40AAS Contributors
# Copyright 2020 PyI40AAS Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
......@@ -40,7 +40,8 @@ from typing import Any, Callable, Dict, IO, Iterable, Optional, Tuple, Type, Typ
from mypy_extensions import TypedDict # TODO: import this from typing should we require python 3.8+ at some point
from .xml_serialization import NS_AAS, NS_ABAC, NS_IEC
from .._generic import MODELING_KIND_INVERSE, ASSET_KIND_INVERSE, KEY_ELEMENTS_INVERSE, KEY_TYPES_INVERSE, \
IDENTIFIER_TYPES_INVERSE, ENTITY_TYPES_INVERSE, KEY_ELEMENTS_CLASSES_INVERSE
IDENTIFIER_TYPES_INVERSE, ENTITY_TYPES_INVERSE, IEC61360_DATA_TYPES_INVERSE, IEC61360_LEVEL_TYPES_INVERSE, \
KEY_ELEMENTS_CLASSES_INVERSE
logger = logging.getLogger(__name__)
......@@ -138,7 +139,7 @@ def _get_child_mandatory(parent: etree.Element, child_tag: str) -> etree.Element
return child
def _get_child_multiple(parent: etree.Element, exppected_tag: str, failsafe: bool) -> Iterable[etree.Element]:
def _get_all_children_expect_tag(parent: etree.Element, exppected_tag: str, failsafe: bool) -> Iterable[etree.Element]:
"""
Iterates over all children, matching the tag.
......@@ -212,6 +213,20 @@ def _get_text_or_none(element: Optional[etree.Element]) -> Optional[str]:
return element.text if element is not None else None
def _get_text_mapped_or_none(element: Optional[etree.Element], dct: Dict[str, T]) -> Optional[T]:
"""
Returns dct[element.text] or None, if the element is None, has no text or the text is not in dct.
:param element: The xml element or None.
:param dct: The dictionary that is used to map the text.
:return: The mapped text or None.
"""
text = _get_text_or_none(element)
if text is None or text not in dct:
return None
return dct[text]
def _get_text_mandatory(element: etree.Element) -> str:
"""
A helper function for getting the mandatory text of an element.
......@@ -346,8 +361,8 @@ def _child_construct_multiple(parent: etree.Element, expected_tag: str, construc
If an error occurred while constructing an element and while in failsafe mode,
the respective element will be skipped.
"""
return _failsafe_construct_multiple(_get_child_multiple(parent, expected_tag, failsafe), constructor, failsafe,
**kwargs)
return _failsafe_construct_multiple(_get_all_children_expect_tag(parent, expected_tag, failsafe), constructor,
failsafe, **kwargs)
def _child_text_mandatory(parent: etree.Element, child_tag: str) -> str:
......@@ -429,9 +444,9 @@ def _get_modeling_kind_kwarg(element: etree.Element) -> ModelingKindKwArg:
An empty dict if not.
"""
kwargs: ModelingKindKwArg = ModelingKindKwArg()
kind = element.find(NS_AAS + "kind")
kind = _get_text_mapped_or_none(element.find(NS_AAS + "kind"), MODELING_KIND_INVERSE)
if kind is not None:
kwargs["kind"] = _get_text_mandatory_mapped(kind, MODELING_KIND_INVERSE)
kwargs["kind"] = kind
return kwargs
......@@ -444,13 +459,15 @@ def _construct_key(element: etree.Element, _failsafe: bool, **_kwargs: Any) -> m
)
def _construct_key_tuple(element: etree.Element, failsafe: bool, **_kwargs: Any) -> Tuple[model.Key, ...]:
keys = _get_child_mandatory(element, NS_AAS + "keys")
return tuple(_child_construct_multiple(keys, NS_AAS + "key", _construct_key, failsafe))
def _construct_key_tuple(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any)\
-> Tuple[model.Key, ...]:
keys = _get_child_mandatory(element, namespace + "keys")
return tuple(_child_construct_multiple(keys, namespace + "key", _construct_key, failsafe))
def _construct_reference(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Reference:
return model.Reference(_construct_key_tuple(element, failsafe))
def _construct_reference(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any) \
-> model.Reference:
return model.Reference(_construct_key_tuple(element, failsafe, namespace=namespace))
def _construct_aas_reference(element: etree.Element, failsafe: bool, type_: Type[model.base._RT], **_kwargs: Any) \
......@@ -500,9 +517,10 @@ def _construct_administrative_information(element: etree.Element, _failsafe: boo
)
def _construct_lang_string_set(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.LangStringSet:
def _construct_lang_string_set(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any) \
-> model.LangStringSet:
lss: model.LangStringSet = {}
for lang_string in _get_child_multiple(element, NS_AAS + "langString", failsafe):
for lang_string in _get_all_children_expect_tag(element, namespace + "langString", failsafe):
lss[_get_attrib_mandatory(lang_string, "lang")] = _get_text_mandatory(lang_string)
return lss
......@@ -643,9 +661,9 @@ def _construct_blob(element: etree.Element, failsafe: bool, **_kwargs: Any) -> m
_child_text_mandatory(element, NS_AAS + "mimeType"),
**_get_modeling_kind_kwarg(element)
)
value = element.find(NS_AAS + "value")
value = _get_text_or_none(element.find(NS_AAS + "value"))
if value is not None:
blob.value = base64.b64decode(_get_text_mandatory(value))
blob.value = base64.b64decode(value)
_amend_abstract_attributes(blob, element, failsafe)
return blob
......@@ -663,11 +681,10 @@ def _construct_entity(element: etree.Element, failsafe: bool, **_kwargs: Any) ->
entity = model.Entity(
_child_text_mandatory(element, NS_AAS + "idShort"),
_child_text_mandatory_mapped(element, NS_AAS + "entityType", ENTITY_TYPES_INVERSE),
# pass the asset to the constructor, because self managed entities need asset references
asset=_failsafe_construct(element.find(NS_AAS + "assetRef"), _construct_asset_reference, failsafe),
**_get_modeling_kind_kwarg(element)
)
asset_ref = _failsafe_construct(element.find(NS_AAS + "assetRef"), _construct_asset_reference, failsafe)
if asset_ref is not None:
entity.asset = asset_ref
for stmt in _failsafe_construct_multiple(_get_child_mandatory(element, NS_AAS + "statements"),
_construct_submodel_element, failsafe):
entity.statement.add(stmt)
......@@ -678,12 +695,12 @@ def _construct_entity(element: etree.Element, failsafe: bool, **_kwargs: Any) ->
def _construct_file(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.File:
file = model.File(
_child_text_mandatory(element, NS_AAS + "idShort"),
_child_text_mandatory(element, NS_AAS + "idShort"),
_child_text_mandatory(element, NS_AAS + "mimeType"),
**_get_modeling_kind_kwarg(element)
)
value = element.find(NS_AAS + "value")
value = _get_text_or_none(element.find(NS_AAS + "value"))
if value is not None:
file.value = _get_text_mandatory(value)
file.value = value
_amend_abstract_attributes(file, element, failsafe)
return file
......@@ -860,10 +877,108 @@ def _construct_submodel(element: etree.Element, failsafe: bool, **_kwargs: Any)
return submodel
def _construct_concept_description(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDescription:
cd = model.ConceptDescription(
_child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
def _construct_value_reference_pair(element: etree.Element, _failsafe: bool,
value_format: Optional[model.DataTypeDef] = None, **_kwargs: Any) \
-> model.ValueReferencePair:
if value_format is None:
raise ValueError("No value format given!")
return model.ValueReferencePair(
value_format,
model.datatypes.from_xsd(_child_text_mandatory(element, NS_IEC + "value"), value_format),
_child_construct_mandatory(element, NS_IEC + "valueId", _construct_reference, namespace=NS_IEC)
)
def _construct_value_list(element: etree.Element, failsafe: bool,
value_format: Optional[model.DataTypeDef] = None, **_kwargs: Any) \
-> model.ValueList:
return set(
_child_construct_multiple(element, NS_IEC + "valueReferencePair", _construct_value_reference_pair, failsafe,
value_format=value_format)
)
def _construct_iec61360_concept_description(element: etree.Element, failsafe: bool,
identifier: Optional[model.Identifier] = None, **_kwargs: Any) \
-> model.IEC61360ConceptDescription:
if identifier is None:
raise ValueError("No identifier given!")
cd = model.IEC61360ConceptDescription(
identifier,
_child_construct_mandatory(element, NS_IEC + "preferredName", _construct_lang_string_set, namespace=NS_IEC),
_child_text_mandatory_mapped(element, NS_IEC + "dataType", IEC61360_DATA_TYPES_INVERSE)
)
definition = _failsafe_construct(element.find(NS_IEC + "definition"), _construct_lang_string_set, failsafe,
namespace=NS_IEC)
if definition is not None:
cd.definition = definition
short_name = _failsafe_construct(element.find(NS_IEC + "shortName"), _construct_lang_string_set, failsafe,
namespace=NS_IEC)
if short_name is not None:
cd.short_name = short_name
unit = _get_text_or_none(element.find(NS_IEC + "unit"))
if unit is not None:
cd.unit = unit
unit_id = _failsafe_construct(element.find(NS_IEC + "unitId"), _construct_reference, failsafe, namespace=NS_IEC)
if unit_id is not None:
cd.unit_id = unit_id
source_of_definition = _get_text_or_none(element.find(NS_IEC + "sourceOfDefinition"))
if source_of_definition is not None:
cd.source_of_definition = source_of_definition
symbol = _get_text_or_none(element.find(NS_IEC + "symbol"))
if symbol is not None:
cd.symbol = symbol
value_format = _get_text_mapped_or_none(element.find(NS_IEC + "valueFormat"),
model.datatypes.XSD_TYPE_CLASSES)
if value_format is not None:
cd.value_format = value_format
value_list = _failsafe_construct(element.find(NS_IEC + "valueList"), _construct_value_list, failsafe,
value_format=value_format)
if value_list is not None:
cd.value_list = value_list
value = _get_text_or_none(element.find(NS_IEC + "value"))
if value is not None and value_format is not None:
cd.value = model.datatypes.from_xsd(value, value_format)
value_id = _failsafe_construct(element.find(NS_IEC + "valueId"), _construct_reference, failsafe, namespace=NS_IEC)
if value_id is not None:
cd.value_id = value_id
for level_type_element in element.findall(NS_IEC + "levelType"):
level_type = _get_text_mapped_or_none(level_type_element, IEC61360_LEVEL_TYPES_INVERSE)
if level_type is None:
error_message = f"{_element_pretty_identifier(level_type_element)} has invalid value: " \
+ str(level_type_element.text)
if not failsafe:
raise ValueError(error_message)
logger.warning(error_message)
continue
cd.level_types.add(level_type)
return cd
def _construct_concept_description(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDescription:
"""
TODO: our model doesn't support more than one embeddedDataSpecification (yet)
"""
cd: Optional[model.ConceptDescription] = None
identifier = _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
# Hack to detect IEC61360ConceptDescriptions, which are represented using dataSpecification according to DotAAS
dspec_tag = NS_AAS + "embeddedDataSpecification"
dspecs = element.findall(dspec_tag)
if len(dspecs) > 1:
logger.warning(f"{_element_pretty_identifier(element)} has more than one "
f"{_tag_replace_namespace(dspec_tag, element.nsmap)}. This model currently supports only one "
f"per {_tag_replace_namespace(element.tag, element.nsmap)}!")
if len(dspecs) > 0:
dspec = dspecs[0]
dspec_content = dspec.find(NS_AAS + "dataSpecificationContent")
if dspec_content is not None:
dspec_ref = _failsafe_construct(dspec.find(NS_AAS + "dataSpecification"), _construct_reference, failsafe)
if dspec_ref is not None and len(dspec_ref.key) > 0 and dspec_ref.key[0].value == \
"http://admin-shell.io/DataSpecificationTemplates/DataSpecificationIEC61360/2/0":
cd = _failsafe_construct(dspec_content.find(NS_AAS + "dataSpecificationIEC61360"),
_construct_iec61360_concept_description, failsafe, identifier=identifier)
if cd is None:
cd = model.ConceptDescription(identifier)
for ref in _failsafe_construct_multiple(element.findall(NS_AAS + "isCaseOf"), _construct_reference, failsafe):
cd.is_case_of.add(ref)
_amend_abstract_attributes(cd, element, failsafe)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment