diff --git a/aas/adapter/_generic.py b/aas/adapter/_generic.py index a5d85e69adbac626366ea44e45870f92965eb1ba..aee37a689731f684e24e79cb21c5cdf532172083 100644 --- a/aas/adapter/_generic.py +++ b/aas/adapter/_generic.py @@ -12,7 +12,7 @@ The dicts defined in this module are used in the json and xml modules to translate enum members of our implementation to the respective string and vice versa. """ -from typing import Dict +from typing import Dict, Type from aas import model @@ -98,4 +98,5 @@ IEC61360_DATA_TYPES_INVERSE: Dict[str, model.concept.IEC61360DataType] = {v: k f IEC61360_LEVEL_TYPES_INVERSE: Dict[str, model.concept.IEC61360LevelType] = \ {v: k for k, v in IEC61360_LEVEL_TYPES.items()} -KEY_ELEMENTS_CLASSES_INVERSE: Dict[model.KeyElements, type] = {v: k for k, v in model.KEY_ELEMENTS_CLASSES.items()} +KEY_ELEMENTS_CLASSES_INVERSE: Dict[model.KeyElements, Type[model.Referable]] = \ + {v: k for k, v in model.KEY_ELEMENTS_CLASSES.items()} diff --git a/aas/adapter/xml/__init__.py b/aas/adapter/xml/__init__.py index d35c70358c8ce6d35e2740b283d537ca5792b3db..dc8e7cf9b82e06330522fae91c0fe870c4c56f88 100644 --- a/aas/adapter/xml/__init__.py +++ b/aas/adapter/xml/__init__.py @@ -10,6 +10,7 @@ xml_deserialization.py import os.path from .xml_serialization import write_aas_xml_file -from .xml_deserialization import read_aas_xml_file +from .xml_deserialization import AASFromXmlDecoder, StrictAASFromXmlDecoder, StrippedAASFromXmlDecoder, \ + StrictStrippedAASFromXmlDecoder, XMLConstructables, read_aas_xml_file, read_aas_xml_file_into, read_aas_xml_element XML_SCHEMA_FILE = os.path.join(os.path.dirname(__file__), 'AAS.xsd') diff --git a/aas/adapter/xml/xml_deserialization.py b/aas/adapter/xml/xml_deserialization.py index 068c15b653385c59cb1848775c9c269acce73516..abeae959f60bc59c5afdf592df2e4f39b0c60530 100644 --- a/aas/adapter/xml/xml_deserialization.py +++ b/aas/adapter/xml/xml_deserialization.py @@ -11,15 +11,15 @@ """ Module for deserializing Asset Administration Shell data from the official XML format -Use this module by calling read_xml_aas_file(file, failsafe). -The function returns a DictObjectStore containing all parsed elements. +This module provides the following functions for parsing XML documents: +- read_aas_xml_element() constructs a single object from an XML document containing a single element +- read_aas_xml_file_into() constructs all elements of an XML document and stores them in a given object store +- read_aas_xml_file() constructs all elements of an XML document and returns them in a DictObjectStore -Unlike the JSON deserialization, parsing is done top-down. Elements with a specific tag are searched on the level -directly below the level of the current xml element (in terms of parent and child relation) and parsed when -found. Constructor functions of these elements will then again search for mandatory and optional child elements -and construct them if available, and so on. +These functions take a decoder class as keyword argument, which allows parsing in failsafe (default) or non-failsafe +mode. Parsing stripped elements - used in the HTTP adapter - is also possible. It is also possible to subclass the +default decoder class and provide an own decoder. -This module supports parsing in failsafe and non-failsafe mode. In failsafe mode errors regarding missing attributes and elements or invalid values are caught and logged. In non-failsafe mode any error would abort parsing. Error handling is done only by _failsafe_construct() in this module. Nearly all constructor functions are called @@ -27,16 +27,23 @@ by other constructor functions via _failsafe_construct(), so an error chain is c which allows printing stacktrace-like error messages like the following in the error case (in failsafe mode of course): KeyError: aas:identification on line 252 has no attribute with name idType! - -> Failed to convert aas:identification on line 252 to type Identifier! - -> Failed to convert aas:conceptDescription on line 247 to type ConceptDescription! + -> Failed to construct aas:identification on line 252 using construct_identifier! + -> Failed to construct aas:conceptDescription on line 247 using construct_concept_description! + + +Unlike the JSON deserialization, parsing is done top-down. Elements with a specific tag are searched on the level +directly below the level of the current xml element (in terms of parent and child relation) and parsed when +found. Constructor functions of these elements will then again search for mandatory and optional child elements +and construct them if available, and so on. """ from ... import model from lxml import etree # type: ignore import logging import base64 +import enum -from typing import Any, Callable, Dict, IO, Iterable, Optional, Tuple, Type, TypeVar +from typing import Any, Callable, Dict, IO, Iterable, Optional, Set, Tuple, Type, TypeVar from .xml_serialization import NS_AAS, NS_ABAC, NS_IEC from .._generic import MODELING_KIND_INVERSE, ASSET_KIND_INVERSE, KEY_ELEMENTS_INVERSE, KEY_TYPES_INVERSE, \ IDENTIFIER_TYPES_INVERSE, ENTITY_TYPES_INVERSE, IEC61360_DATA_TYPES_INVERSE, IEC61360_LEVEL_TYPES_INVERSE, \ @@ -45,6 +52,7 @@ from .._generic import MODELING_KIND_INVERSE, ASSET_KIND_INVERSE, KEY_ELEMENTS_I logger = logging.getLogger(__name__) T = TypeVar("T") +RE = TypeVar("RE", bound=model.RelationshipElement) def _str_to_bool(string: str) -> bool: @@ -97,18 +105,6 @@ def _element_pretty_identifier(element: etree.Element) -> str: return identifier -def _constructor_name_to_typename(constructor: Callable[[etree.Element, bool], T]) -> str: - """ - A helper function for converting the name of a constructor function to the respective type name. - - _construct_some_type -> SomeType - - :param constructor: The constructor function. - :return: The name of the type the constructor function constructs. - """ - return "".join([s[0].upper() + s[1:] for s in constructor.__name__.split("_")[2:]]) - - def _exception_to_str(exception: BaseException) -> str: """ A helper function used to stringify exceptions. @@ -138,7 +134,7 @@ def _get_child_mandatory(parent: etree.Element, child_tag: str) -> etree.Element return child -def _get_all_children_expect_tag(parent: etree.Element, exppected_tag: str, failsafe: bool) -> Iterable[etree.Element]: +def _get_all_children_expect_tag(parent: etree.Element, expected_tag: str, failsafe: bool) -> Iterable[etree.Element]: """ Iterates over all children, matching the tag. @@ -146,14 +142,14 @@ def _get_all_children_expect_tag(parent: etree.Element, exppected_tag: str, fail failsafe: Logs a warning if a child element doesn't match. :param parent: The parent element. - :param exppected_tag: The tag of the children. + :param expected_tag: The tag of the children. :return: An iterator over all child elements that match child_tag. :raises KeyError: If the tag of a child element doesn't match and failsafe is true. """ for child in parent: - if child.tag != exppected_tag: + if child.tag != expected_tag: error_message = f"{_element_pretty_identifier(child)}, child of {_element_pretty_identifier(parent)}, " \ - f"doesn't match the expected tag {_tag_replace_namespace(exppected_tag, child.nsmap)}!" + f"doesn't match the expected tag {_tag_replace_namespace(expected_tag, child.nsmap)}!" if not failsafe: raise KeyError(error_message) logger.warning(error_message) @@ -281,10 +277,9 @@ def _failsafe_construct(element: Optional[etree.Element], constructor: Callable[ if element is None: return None try: - return constructor(element, failsafe, **kwargs) + return constructor(element, **kwargs) except (KeyError, ValueError) as e: - type_name = _constructor_name_to_typename(constructor) - error_message = f"Failed to create {type_name} from {_element_pretty_identifier(element)}!" + error_message = f"Failed to construct {_element_pretty_identifier(element)} using {constructor.__name__}!" if not failsafe: raise type(e)(error_message) from e error_type = type(e).__name__ @@ -310,7 +305,7 @@ def _failsafe_construct_mandatory(element: etree.Element, constructor: Callable[ constructed = _failsafe_construct(element, constructor, False, **kwargs) if constructed is None: raise TypeError("The result of a non-failsafe _failsafe_construct() call was None! " - "This is a bug in the pyAAS XML deserialization, please report it!") + "This is a bug in the PyI40AAS XML deserialization, please report it!") return constructed @@ -347,8 +342,8 @@ def _child_construct_mandatory(parent: etree.Element, child_tag: str, constructo return _failsafe_construct_mandatory(_get_child_mandatory(parent, child_tag), constructor, **kwargs) -def _child_construct_multiple(parent: etree.Element, expected_tag: str, constructor: Callable[..., T], failsafe: bool, - **kwargs: Any) -> Iterable[T]: +def _child_construct_multiple(parent: etree.Element, expected_tag: str, constructor: Callable[..., T], + failsafe: bool, **kwargs: Any) -> Iterable[T]: """ Shorthand for _failsafe_construct_multiple() in combination with _get_child_multiple(). @@ -387,49 +382,6 @@ def _child_text_mandatory_mapped(parent: etree.Element, child_tag: str, dct: Dic return _get_text_mandatory_mapped(_get_child_mandatory(parent, child_tag), dct) -def _amend_abstract_attributes(obj: object, element: etree.Element, failsafe: bool) -> None: - """ - A helper function that amends optional attributes to already constructed class instances, if they inherit - from an abstract class like Referable, Identifiable, HasSemantics or Qualifiable. - - :param obj: The constructed class instance. - :param element: The respective xml element. - :param failsafe: Indicates whether errors should be caught or re-raised. - :return: None - """ - if isinstance(obj, model.Referable): - category = _get_text_or_none(element.find(NS_AAS + "category")) - if category is not None: - obj.category = category - description = _failsafe_construct(element.find(NS_AAS + "description"), _construct_lang_string_set, failsafe) - if description is not None: - obj.description = description - if isinstance(obj, model.Identifiable): - id_short = _get_text_or_none(element.find(NS_AAS + "idShort")) - if id_short is not None: - obj.id_short = id_short - administration = _failsafe_construct(element.find(NS_AAS + "administration"), - _construct_administrative_information, failsafe) - if administration: - obj.administration = administration - if isinstance(obj, model.HasSemantics): - semantic_id = _failsafe_construct(element.find(NS_AAS + "semanticId"), _construct_reference, failsafe) - if semantic_id is not None: - obj.semantic_id = semantic_id - if isinstance(obj, model.Qualifiable): - # TODO: simplify this should our suggestion regarding the XML schema get accepted - # https://git.rwth-aachen.de/acplt/pyaas/-/issues/56 - for constraint in element.findall(NS_AAS + "qualifier"): - if len(constraint) == 0: - raise KeyError(f"{_element_pretty_identifier(constraint)} has no constraint!") - if len(constraint) > 1: - logger.warning(f"{_element_pretty_identifier(constraint)} has more than one constraint, " - "using the first one...") - constructed = _failsafe_construct(constraint[0], _construct_constraint, failsafe) - if constructed is not None: - obj.qualifier.add(constructed) - - def _get_modeling_kind(element: etree.Element) -> model.ModelingKind: """ Returns the modeling kind of an element with the default value INSTANCE, if none specified. @@ -441,615 +393,1003 @@ def _get_modeling_kind(element: etree.Element) -> model.ModelingKind: return modeling_kind if modeling_kind is not None else model.ModelingKind.INSTANCE -def _construct_key(element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.Key: - return model.Key( - _get_attrib_mandatory_mapped(element, "type", KEY_ELEMENTS_INVERSE), - _str_to_bool(_get_attrib_mandatory(element, "local")), - _get_text_mandatory(element), - _get_attrib_mandatory_mapped(element, "idType", KEY_TYPES_INVERSE) - ) - - -def _construct_key_tuple(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any)\ - -> Tuple[model.Key, ...]: - keys = _get_child_mandatory(element, namespace + "keys") - return tuple(_child_construct_multiple(keys, namespace + "key", _construct_key, failsafe)) - - -def _construct_reference(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any) \ - -> model.Reference: - return model.Reference(_construct_key_tuple(element, failsafe, namespace=namespace)) - - -def _construct_aas_reference(element: etree.Element, failsafe: bool, type_: Type[model.base._RT], **_kwargs: Any) \ - -> model.AASReference[model.base._RT]: - keys = _construct_key_tuple(element, failsafe) - if len(keys) != 0 and not issubclass(KEY_ELEMENTS_CLASSES_INVERSE.get(keys[-1].type, type(None)), type_): - logger.warning(f"Type {keys[-1].type.name} of last key of reference to {' / '.join(str(k) for k in keys)} " - f"does not match reference type {type_.__name__}") - return model.AASReference(keys, type_) - - -def _construct_submodel_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \ - -> model.AASReference[model.Submodel]: - return _construct_aas_reference(element, failsafe, model.Submodel, **kwargs) - - -def _construct_asset_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \ - -> model.AASReference[model.Asset]: - return _construct_aas_reference(element, failsafe, model.Asset, **kwargs) - - -def _construct_asset_administration_shell_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \ - -> model.AASReference[model.AssetAdministrationShell]: - return _construct_aas_reference(element, failsafe, model.AssetAdministrationShell, **kwargs) - - -def _construct_referable_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \ - -> model.AASReference[model.Referable]: - return _construct_aas_reference(element, failsafe, model.Referable, **kwargs) - - -def _construct_concept_description_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \ - -> model.AASReference[model.ConceptDescription]: - return _construct_aas_reference(element, failsafe, model.ConceptDescription, **kwargs) +class AASFromXmlDecoder: + """ + The default XML decoder class. + It parses XML documents in a failsafe manner, meaning any errors encountered will be logged and invalid XML elements + will be skipped. + Most member functions support the object_class parameter. It was introduced so they can be overwritten + in subclasses, which allows constructing instances of subtypes. + """ + failsafe = True + stripped = False + + @classmethod + def _amend_abstract_attributes(cls, obj: object, element: etree.Element) -> None: + """ + A helper function that amends optional attributes to already constructed class instances, if they inherit + from an abstract class like Referable, Identifiable, HasSemantics or Qualifiable. + + :param obj: The constructed class instance. + :param element: The respective xml element. + :return: None + """ + if isinstance(obj, model.Referable): + category = _get_text_or_none(element.find(NS_AAS + "category")) + if category is not None: + obj.category = category + description = _failsafe_construct(element.find(NS_AAS + "description"), cls.construct_lang_string_set, + cls.failsafe) + if description is not None: + obj.description = description + if isinstance(obj, model.Identifiable): + id_short = _get_text_or_none(element.find(NS_AAS + "idShort")) + if id_short is not None: + obj.id_short = id_short + administration = _failsafe_construct(element.find(NS_AAS + "administration"), + cls.construct_administrative_information, cls.failsafe) + if administration: + obj.administration = administration + if isinstance(obj, model.HasSemantics): + semantic_id = _failsafe_construct(element.find(NS_AAS + "semanticId"), cls.construct_reference, + cls.failsafe) + if semantic_id is not None: + obj.semantic_id = semantic_id + if isinstance(obj, model.Qualifiable) and not cls.stripped: + # TODO: simplify this should our suggestion regarding the XML schema get accepted + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 + for constraint in element.findall(NS_AAS + "qualifier"): + if len(constraint) > 1: + logger.warning(f"{_element_pretty_identifier(constraint)} has more than one constraint, " + "using the first one...") + constructed = _failsafe_construct(constraint[0], cls.construct_constraint, cls.failsafe) + if constructed is not None: + obj.qualifier.add(constructed) + + @classmethod + def _construct_relationship_element_internal(cls, element: etree.Element, object_class: Type[RE], **_kwargs: Any) \ + -> RE: + """ + Helper function used by construct_relationship_element() and construct_annotated_relationship_element() + to reduce duplicate code + """ + relationship_element = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + _child_construct_mandatory(element, NS_AAS + "first", cls._construct_referable_reference), + _child_construct_mandatory(element, NS_AAS + "second", cls._construct_referable_reference), + kind=_get_modeling_kind(element) + ) + cls._amend_abstract_attributes(relationship_element, element) + return relationship_element + + @classmethod + def _construct_key_tuple(cls, element: etree.Element, namespace: str = NS_AAS, **_kwargs: Any) \ + -> Tuple[model.Key, ...]: + """ + Helper function used by construct_reference() and construct_aas_reference() to reduce duplicate code + """ + keys = _get_child_mandatory(element, namespace + "keys") + return tuple(_child_construct_multiple(keys, namespace + "key", cls.construct_key, cls.failsafe)) + + @classmethod + def _construct_submodel_reference(cls, element: etree.Element, **kwargs: Any) -> model.AASReference[model.Submodel]: + """ + Helper function. Doesn't support the object_class parameter. Overwrite construct_aas_reference instead. + """ + return cls.construct_aas_reference_expect_type(element, model.Submodel, **kwargs) + + @classmethod + def _construct_asset_reference(cls, element: etree.Element, **kwargs: Any) \ + -> model.AASReference[model.Asset]: + """ + Helper function. Doesn't support the object_class parameter. Overwrite construct_aas_reference instead. + """ + return cls.construct_aas_reference_expect_type(element, model.Asset, **kwargs) + + @classmethod + def _construct_asset_administration_shell_reference(cls, element: etree.Element, **kwargs: Any) \ + -> model.AASReference[model.AssetAdministrationShell]: + """ + Helper function. Doesn't support the object_class parameter. Overwrite construct_aas_reference instead. + """ + return cls.construct_aas_reference_expect_type(element, model.AssetAdministrationShell, **kwargs) + + @classmethod + def _construct_referable_reference(cls, element: etree.Element, **kwargs: Any) \ + -> model.AASReference[model.Referable]: + """ + Helper function. Doesn't support the object_class parameter. Overwrite construct_aas_reference instead. + """ + return cls.construct_aas_reference_expect_type(element, model.Referable, **kwargs) + + @classmethod + def _construct_concept_description_reference(cls, element: etree.Element, **kwargs: Any) \ + -> model.AASReference[model.ConceptDescription]: + """ + Helper function. Doesn't support the object_class parameter. Overwrite construct_aas_reference instead. + """ + return cls.construct_aas_reference_expect_type(element, model.ConceptDescription, **kwargs) + + @classmethod + def construct_key(cls, element: etree.Element, object_class=model.Key, **_kwargs: Any) \ + -> model.Key: + return object_class( + _get_attrib_mandatory_mapped(element, "type", KEY_ELEMENTS_INVERSE), + _str_to_bool(_get_attrib_mandatory(element, "local")), + _get_text_mandatory(element), + _get_attrib_mandatory_mapped(element, "idType", KEY_TYPES_INVERSE) + ) + + @classmethod + def construct_reference(cls, element: etree.Element, namespace: str = NS_AAS, object_class=model.Reference, + **_kwargs: Any) -> model.Reference: + return object_class(cls._construct_key_tuple(element, namespace=namespace)) + + @classmethod + def construct_aas_reference(cls, element: etree.Element, object_class=model.AASReference, **_kwargs: Any) \ + -> model.AASReference: + """ + This constructor for AASReference determines the type of the AASReference by its keys. If no keys are present, + it will default to the type Referable. This behaviour is wanted in read_aas_xml_element(). + """ + keys = cls._construct_key_tuple(element) + type_: Type[model.Referable] = model.Referable + if len(keys) > 0: + type_ = KEY_ELEMENTS_CLASSES_INVERSE.get(keys[-1].type, model.Referable) + return object_class(keys, type_) + + @classmethod + def construct_aas_reference_expect_type(cls, element: etree.Element, type_: Type[model.base._RT], + object_class=model.AASReference, **_kwargs: Any) \ + -> model.AASReference[model.base._RT]: + """ + This constructor for AASReference allows passing an expected type, which is checked against the type of the last + key of the reference. This constructor function is used by other constructor functions, since all expect a + specific target type. + """ + keys = cls._construct_key_tuple(element) + if keys and not issubclass(KEY_ELEMENTS_CLASSES_INVERSE.get(keys[-1].type, type(None)), type_): + logger.warning("type %s of last key of reference to %s does not match reference type %s", + keys[-1].type.name, " / ".join(str(k) for k in keys), type_.__name__) + return object_class(keys, type_) + + @classmethod + def construct_administrative_information(cls, element: etree.Element, object_class=model.AdministrativeInformation, + **_kwargs: Any) -> model.AdministrativeInformation: + return object_class( + _get_text_or_none(element.find(NS_AAS + "version")), + _get_text_or_none(element.find(NS_AAS + "revision")) + ) + + @classmethod + def construct_lang_string_set(cls, element: etree.Element, namespace: str = NS_AAS, **_kwargs: Any) \ + -> model.LangStringSet: + """ + This function doesn't support the object_class parameter, because LangStringSet is just a generic type alias. + """ + lss: model.LangStringSet = {} + for lang_string in _get_all_children_expect_tag(element, namespace + "langString", cls.failsafe): + lss[_get_attrib_mandatory(lang_string, "lang")] = _get_text_mandatory(lang_string) + return lss + + @classmethod + def construct_qualifier(cls, element: etree.Element, object_class=model.Qualifier, **_kwargs: Any) \ + -> model.Qualifier: + qualifier = object_class( + _child_text_mandatory(element, NS_AAS + "type"), + _child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES) + ) + value = _get_text_or_none(element.find(NS_AAS + "value")) + if value is not None: + qualifier.value = model.datatypes.from_xsd(value, qualifier.value_type) + value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), cls.construct_reference, cls.failsafe) + if value_id is not None: + qualifier.value_id = value_id + cls._amend_abstract_attributes(qualifier, element) + return qualifier + + @classmethod + def construct_formula(cls, element: etree.Element, object_class=model.Formula, **_kwargs: Any) -> model.Formula: + formula = object_class() + depends_on_refs = element.find(NS_AAS + "dependsOnRefs") + if depends_on_refs is not None: + for ref in _failsafe_construct_multiple(depends_on_refs.findall(NS_AAS + "reference"), + cls.construct_reference, cls.failsafe): + formula.depends_on.add(ref) + return formula + + @classmethod + def construct_identifier(cls, element: etree.Element, object_class=model.Identifier, **_kwargs: Any) \ + -> model.Identifier: + return object_class( + _get_text_mandatory(element), + _get_attrib_mandatory_mapped(element, "idType", IDENTIFIER_TYPES_INVERSE) + ) + + @classmethod + def construct_security(cls, _element: etree.Element, object_class=model.Security, **_kwargs: Any) -> model.Security: + """ + TODO: this is just a stub implementation + """ + return object_class() + + @classmethod + def construct_view(cls, element: etree.Element, object_class=model.View, **_kwargs: Any) -> model.View: + view = object_class(_child_text_mandatory(element, NS_AAS + "idShort")) + contained_elements = element.find(NS_AAS + "containedElements") + if contained_elements is not None: + for ref in _failsafe_construct_multiple(contained_elements.findall(NS_AAS + "containedElementRef"), + cls._construct_referable_reference, cls.failsafe): + view.contained_element.add(ref) + cls._amend_abstract_attributes(view, element) + return view + + @classmethod + def construct_concept_dictionary(cls, element: etree.Element, object_class=model.ConceptDictionary, + **_kwargs: Any) -> model.ConceptDictionary: + concept_dictionary = object_class(_child_text_mandatory(element, NS_AAS + "idShort")) + concept_description = element.find(NS_AAS + "conceptDescriptionRefs") + if concept_description is not None: + for ref in _failsafe_construct_multiple(concept_description.findall(NS_AAS + "conceptDescriptionRef"), + cls._construct_concept_description_reference, cls.failsafe): + concept_dictionary.concept_description.add(ref) + cls._amend_abstract_attributes(concept_dictionary, element) + return concept_dictionary + + @classmethod + def construct_submodel_element(cls, element: etree.Element, **kwargs: Any) -> model.SubmodelElement: + """ + This function doesn't support the object_class parameter. + Overwrite each individual SubmodelElement/DataElement constructor function instead. + """ + # unlike in construct_data_elements, we have to declare a submodel_elements dict without namespace here first + # because mypy doesn't automatically infer Callable[..., model.SubmodelElement] for the functions, because + # construct_submodel_element_collection doesn't have the object_class parameter, but object_class_ordered and + # object_class_unordered + submodel_elements: Dict[str, Callable[..., model.SubmodelElement]] = { + "annotatedRelationshipElement": cls.construct_annotated_relationship_element, + "basicEvent": cls.construct_basic_event, + "capability": cls.construct_capability, + "entity": cls.construct_entity, + "operation": cls.construct_operation, + "relationshipElement": cls.construct_relationship_element, + "submodelElementCollection": cls.construct_submodel_element_collection + } + submodel_elements = {NS_AAS + k: v for k, v in submodel_elements.items()} + if element.tag not in submodel_elements: + return cls.construct_data_element(element, abstract_class_name="SubmodelElement", **kwargs) + return submodel_elements[element.tag](element, **kwargs) + + @classmethod + def construct_data_element(cls, element: etree.Element, abstract_class_name: str = "DataElement", **kwargs: Any) \ + -> model.DataElement: + """ + This function does not support the object_class parameter. + Overwrite each individual DataElement constructor function instead. + """ + data_elements: Dict[str, Callable[..., model.DataElement]] = {NS_AAS + k: v for k, v in { + "blob": cls.construct_blob, + "file": cls.construct_file, + "multiLanguageProperty": cls.construct_multi_language_property, + "property": cls.construct_property, + "range": cls.construct_range, + "referenceElement": cls.construct_reference_element, + }.items()} + if element.tag not in data_elements: + raise KeyError(_element_pretty_identifier(element) + f" is not a valid {abstract_class_name}!") + return data_elements[element.tag](element, **kwargs) + + @classmethod + def construct_constraint(cls, element: etree.Element, **kwargs: Any) -> model.Constraint: + """ + This function does not support the object_class parameter. + Overwrite construct_formula or construct_qualifier instead. + """ + constraints: Dict[str, Callable[..., model.Constraint]] = {NS_AAS + k: v for k, v in { + "formula": cls.construct_formula, + "qualifier": cls.construct_qualifier + }.items()} + if element.tag not in constraints: + raise KeyError(_element_pretty_identifier(element) + " is not a valid Constraint!") + return constraints[element.tag](element, **kwargs) + + @classmethod + def construct_operation_variable(cls, element: etree.Element, object_class=model.OperationVariable, + **_kwargs: Any) -> model.OperationVariable: + value = _get_child_mandatory(element, NS_AAS + "value") + if len(value) == 0: + raise KeyError(f"{_element_pretty_identifier(value)} has no submodel element!") + if len(value) > 1: + logger.warning(f"{_element_pretty_identifier(value)} has more than one submodel element, " + "using the first one...") + return object_class( + _failsafe_construct_mandatory(value[0], cls.construct_submodel_element) + ) + + @classmethod + def construct_annotated_relationship_element(cls, element: etree.Element, + object_class=model.AnnotatedRelationshipElement, **_kwargs: Any) \ + -> model.AnnotatedRelationshipElement: + annotated_relationship_element = cls._construct_relationship_element_internal(element, object_class) + if not cls.stripped: + for data_element in _get_child_mandatory(element, NS_AAS + "annotations"): + if len(data_element) == 0: + raise KeyError(f"{_element_pretty_identifier(data_element)} has no data element!") + if len(data_element) > 1: + logger.warning(f"{_element_pretty_identifier(data_element)} has more than one data element, " + "using the first one...") + constructed = _failsafe_construct(data_element[0], cls.construct_data_element, cls.failsafe) + if constructed is not None: + annotated_relationship_element.annotation.add(constructed) + return annotated_relationship_element + + @classmethod + def construct_basic_event(cls, element: etree.Element, object_class=model.BasicEvent, **_kwargs: Any) \ + -> model.BasicEvent: + basic_event = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + _child_construct_mandatory(element, NS_AAS + "observed", cls._construct_referable_reference), + kind=_get_modeling_kind(element) + ) + cls._amend_abstract_attributes(basic_event, element) + return basic_event + + @classmethod + def construct_blob(cls, element: etree.Element, object_class=model.Blob, **_kwargs: Any) -> model.Blob: + blob = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + _child_text_mandatory(element, NS_AAS + "mimeType"), + kind=_get_modeling_kind(element) + ) + value = _get_text_or_none(element.find(NS_AAS + "value")) + if value is not None: + blob.value = base64.b64decode(value) + cls._amend_abstract_attributes(blob, element) + return blob + + @classmethod + def construct_capability(cls, element: etree.Element, object_class=model.Capability, **_kwargs: Any) \ + -> model.Capability: + capability = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + kind=_get_modeling_kind(element) + ) + cls._amend_abstract_attributes(capability, element) + return capability + + @classmethod + def construct_entity(cls, element: etree.Element, object_class=model.Entity, **_kwargs: Any) -> model.Entity: + entity = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + _child_text_mandatory_mapped(element, NS_AAS + "entityType", ENTITY_TYPES_INVERSE), + # pass the asset to the constructor, because self managed entities need asset references + asset=_failsafe_construct(element.find(NS_AAS + "assetRef"), cls._construct_asset_reference, cls.failsafe), + kind=_get_modeling_kind(element) + ) + if not cls.stripped: + # TODO: remove wrapping submodelElement, in accordance to future schemas + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 + statements = _get_child_mandatory(element, NS_AAS + "statements") + for submodel_element in _get_all_children_expect_tag(statements, NS_AAS + "submodelElement", cls.failsafe): + if len(submodel_element) == 0: + raise KeyError(f"{_element_pretty_identifier(submodel_element)} has no submodel element!") + if len(submodel_element) > 1: + logger.warning(f"{_element_pretty_identifier(submodel_element)} has more than one submodel element," + " using the first one...") + constructed = _failsafe_construct(submodel_element[0], cls.construct_submodel_element, cls.failsafe) + if constructed is not None: + entity.statement.add(constructed) + cls._amend_abstract_attributes(entity, element) + return entity + + @classmethod + def construct_file(cls, element: etree.Element, object_class=model.File, **_kwargs: Any) -> model.File: + file = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + _child_text_mandatory(element, NS_AAS + "mimeType"), + kind=_get_modeling_kind(element) + ) + value = _get_text_or_none(element.find(NS_AAS + "value")) + if value is not None: + file.value = value + cls._amend_abstract_attributes(file, element) + return file + + @classmethod + def construct_multi_language_property(cls, element: etree.Element, object_class=model.MultiLanguageProperty, + **_kwargs: Any) -> model.MultiLanguageProperty: + multi_language_property = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + kind=_get_modeling_kind(element) + ) + value = _failsafe_construct(element.find(NS_AAS + "value"), cls.construct_lang_string_set, cls.failsafe) + if value is not None: + multi_language_property.value = value + value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), cls.construct_reference, cls.failsafe) + if value_id is not None: + multi_language_property.value_id = value_id + cls._amend_abstract_attributes(multi_language_property, element) + return multi_language_property + + @classmethod + def construct_operation(cls, element: etree.Element, object_class=model.Operation, **_kwargs: Any) \ + -> model.Operation: + operation = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + kind=_get_modeling_kind(element) + ) + for input_variable in _failsafe_construct_multiple(element.findall(NS_AAS + "inputVariable"), + cls.construct_operation_variable, cls.failsafe): + operation.input_variable.append(input_variable) + for output_variable in _failsafe_construct_multiple(element.findall(NS_AAS + "outputVariable"), + cls.construct_operation_variable, cls.failsafe): + operation.output_variable.append(output_variable) + for in_output_variable in _failsafe_construct_multiple(element.findall(NS_AAS + "inoutputVariable"), + cls.construct_operation_variable, cls.failsafe): + operation.in_output_variable.append(in_output_variable) + cls._amend_abstract_attributes(operation, element) + return operation + + @classmethod + def construct_property(cls, element: etree.Element, object_class=model.Property, **_kwargs: Any) -> model.Property: + property_ = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + value_type=_child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES), + kind=_get_modeling_kind(element) + ) + value = _get_text_or_none(element.find(NS_AAS + "value")) + if value is not None: + property_.value = model.datatypes.from_xsd(value, property_.value_type) + value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), cls.construct_reference, cls.failsafe) + if value_id is not None: + property_.value_id = value_id + cls._amend_abstract_attributes(property_, element) + return property_ + + @classmethod + def construct_range(cls, element: etree.Element, object_class=model.Range, **_kwargs: Any) -> model.Range: + range_ = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + value_type=_child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES), + kind=_get_modeling_kind(element) + ) + max_ = _get_text_or_none(element.find(NS_AAS + "max")) + if max_ is not None: + range_.max = model.datatypes.from_xsd(max_, range_.value_type) + min_ = _get_text_or_none(element.find(NS_AAS + "min")) + if min_ is not None: + range_.min = model.datatypes.from_xsd(min_, range_.value_type) + cls._amend_abstract_attributes(range_, element) + return range_ + + @classmethod + def construct_reference_element(cls, element: etree.Element, object_class=model.ReferenceElement, **_kwargs: Any) \ + -> model.ReferenceElement: + reference_element = object_class( + _child_text_mandatory(element, NS_AAS + "idShort"), + kind=_get_modeling_kind(element) + ) + value = _failsafe_construct(element.find(NS_AAS + "value"), cls._construct_referable_reference, cls.failsafe) + if value is not None: + reference_element.value = value + cls._amend_abstract_attributes(reference_element, element) + return reference_element + + @classmethod + def construct_relationship_element(cls, element: etree.Element, object_class=model.RelationshipElement, + **_kwargs: Any) -> model.RelationshipElement: + return cls._construct_relationship_element_internal(element, object_class=object_class, **_kwargs) + + @classmethod + def construct_submodel_element_collection(cls, element: etree.Element, + object_class_ordered=model.SubmodelElementCollectionOrdered, + object_class_unordered=model.SubmodelElementCollectionUnordered, + **_kwargs: Any) -> model.SubmodelElementCollection: + ordered = _str_to_bool(_child_text_mandatory(element, NS_AAS + "ordered")) + collection_type = object_class_ordered if ordered else object_class_unordered + collection = collection_type( + _child_text_mandatory(element, NS_AAS + "idShort"), + kind=_get_modeling_kind(element) + ) + if not cls.stripped: + value = _get_child_mandatory(element, NS_AAS + "value") + # TODO: simplify this should our suggestion regarding the XML schema get accepted + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 + for submodel_element in _get_all_children_expect_tag(value, NS_AAS + "submodelElement", cls.failsafe): + if len(submodel_element) == 0: + raise KeyError(f"{_element_pretty_identifier(submodel_element)} has no submodel element!") + if len(submodel_element) > 1: + logger.warning(f"{_element_pretty_identifier(submodel_element)} has more than one submodel element," + " using the first one...") + constructed = _failsafe_construct(submodel_element[0], cls.construct_submodel_element, cls.failsafe) + if constructed is not None: + collection.value.add(constructed) + cls._amend_abstract_attributes(collection, element) + return collection + + @classmethod + def construct_asset_administration_shell(cls, element: etree.Element, object_class=model.AssetAdministrationShell, + **_kwargs: Any) -> model.AssetAdministrationShell: + aas = object_class( + _child_construct_mandatory(element, NS_AAS + "assetRef", cls._construct_asset_reference), + _child_construct_mandatory(element, NS_AAS + "identification", cls.construct_identifier) + ) + security = _failsafe_construct(element.find(NS_ABAC + "security"), cls.construct_security, cls.failsafe) + if security is not None: + aas.security = security + if not cls.stripped: + submodels = element.find(NS_AAS + "submodelRefs") + if submodels is not None: + for ref in _child_construct_multiple(submodels, NS_AAS + "submodelRef", + cls._construct_submodel_reference, cls.failsafe): + aas.submodel.add(ref) + views = element.find(NS_AAS + "views") + if views is not None: + for view in _child_construct_multiple(views, NS_AAS + "view", cls.construct_view, cls.failsafe): + aas.view.add(view) + concept_dictionaries = element.find(NS_AAS + "conceptDictionaries") + if concept_dictionaries is not None: + for cd in _child_construct_multiple(concept_dictionaries, NS_AAS + "conceptDictionary", + cls.construct_concept_dictionary, cls.failsafe): + aas.concept_dictionary.add(cd) + derived_from = _failsafe_construct(element.find(NS_AAS + "derivedFrom"), + cls._construct_asset_administration_shell_reference, cls.failsafe) + if derived_from is not None: + aas.derived_from = derived_from + cls._amend_abstract_attributes(aas, element) + return aas + + @classmethod + def construct_asset(cls, element: etree.Element, object_class=model.Asset, **_kwargs: Any) -> model.Asset: + asset = object_class( + _child_text_mandatory_mapped(element, NS_AAS + "kind", ASSET_KIND_INVERSE), + _child_construct_mandatory(element, NS_AAS + "identification", cls.construct_identifier) + ) + asset_identification_model = _failsafe_construct(element.find(NS_AAS + "assetIdentificationModelRef"), + cls._construct_submodel_reference, cls.failsafe) + if asset_identification_model is not None: + asset.asset_identification_model = asset_identification_model + bill_of_material = _failsafe_construct(element.find(NS_AAS + "billOfMaterialRef"), + cls._construct_submodel_reference, cls.failsafe) + if bill_of_material is not None: + asset.bill_of_material = bill_of_material + cls._amend_abstract_attributes(asset, element) + return asset + + @classmethod + def construct_submodel(cls, element: etree.Element, object_class=model.Submodel, **_kwargs: Any) \ + -> model.Submodel: + submodel = object_class( + _child_construct_mandatory(element, NS_AAS + "identification", cls.construct_identifier), + kind=_get_modeling_kind(element) + ) + if not cls.stripped: + # TODO: simplify this should our suggestion regarding the XML schema get accepted + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 + for submodel_element in _get_all_children_expect_tag( + _get_child_mandatory(element, NS_AAS + "submodelElements"), NS_AAS + "submodelElement", + cls.failsafe): + if len(submodel_element) == 0: + raise KeyError(f"{_element_pretty_identifier(submodel_element)} has no submodel element!") + if len(submodel_element) > 1: + logger.warning(f"{_element_pretty_identifier(submodel_element)} has more than one submodel element," + " using the first one...") + constructed = _failsafe_construct(submodel_element[0], cls.construct_submodel_element, cls.failsafe) + if constructed is not None: + submodel.submodel_element.add(constructed) + cls._amend_abstract_attributes(submodel, element) + return submodel + + @classmethod + def construct_value_reference_pair(cls, element: etree.Element, value_format: Optional[model.DataTypeDef] = None, + object_class=model.ValueReferencePair, **_kwargs: Any) \ + -> model.ValueReferencePair: + if value_format is None: + raise ValueError("No value format given!") + return object_class( + value_format, + model.datatypes.from_xsd(_child_text_mandatory(element, NS_IEC + "value"), value_format), + _child_construct_mandatory(element, NS_IEC + "valueId", cls.construct_reference, namespace=NS_IEC) + ) + + @classmethod + def construct_value_list(cls, element: etree.Element, value_format: Optional[model.DataTypeDef] = None, + **_kwargs: Any) -> model.ValueList: + """ + This function doesn't support the object_class parameter, because ValueList is just a generic type alias. + """ + return set( + _child_construct_multiple(element, NS_IEC + "valueReferencePair", cls.construct_value_reference_pair, + cls.failsafe, value_format=value_format) + ) + + @classmethod + def construct_iec61360_concept_description(cls, element: etree.Element, + identifier: Optional[model.Identifier] = None, + object_class=model.IEC61360ConceptDescription, **_kwargs: Any) \ + -> model.IEC61360ConceptDescription: + if identifier is None: + raise ValueError("No identifier given!") + cd = object_class( + identifier, + _child_construct_mandatory(element, NS_IEC + "preferredName", cls.construct_lang_string_set, + namespace=NS_IEC) + ) + data_type = _get_text_mapped_or_none(element.find(NS_IEC + "dataType"), IEC61360_DATA_TYPES_INVERSE) + if data_type is not None: + cd.data_type = data_type + definition = _failsafe_construct(element.find(NS_IEC + "definition"), cls.construct_lang_string_set, + cls.failsafe, namespace=NS_IEC) + if definition is not None: + cd.definition = definition + short_name = _failsafe_construct(element.find(NS_IEC + "shortName"), cls.construct_lang_string_set, + cls.failsafe, namespace=NS_IEC) + if short_name is not None: + cd.short_name = short_name + unit = _get_text_or_none(element.find(NS_IEC + "unit")) + if unit is not None: + cd.unit = unit + unit_id = _failsafe_construct(element.find(NS_IEC + "unitId"), cls.construct_reference, cls.failsafe, + namespace=NS_IEC) + if unit_id is not None: + cd.unit_id = unit_id + source_of_definition = _get_text_or_none(element.find(NS_IEC + "sourceOfDefinition")) + if source_of_definition is not None: + cd.source_of_definition = source_of_definition + symbol = _get_text_or_none(element.find(NS_IEC + "symbol")) + if symbol is not None: + cd.symbol = symbol + value_format = _get_text_mapped_or_none(element.find(NS_IEC + "valueFormat"), + model.datatypes.XSD_TYPE_CLASSES) + if value_format is not None: + cd.value_format = value_format + value_list = _failsafe_construct(element.find(NS_IEC + "valueList"), cls.construct_value_list, cls.failsafe, + value_format=value_format) + if value_list is not None: + cd.value_list = value_list + value = _get_text_or_none(element.find(NS_IEC + "value")) + if value is not None and value_format is not None: + cd.value = model.datatypes.from_xsd(value, value_format) + value_id = _failsafe_construct(element.find(NS_IEC + "valueId"), cls.construct_reference, cls.failsafe, + namespace=NS_IEC) + if value_id is not None: + cd.value_id = value_id + for level_type_element in element.findall(NS_IEC + "levelType"): + level_type = _get_text_mapped_or_none(level_type_element, IEC61360_LEVEL_TYPES_INVERSE) + if level_type is None: + error_message = f"{_element_pretty_identifier(level_type_element)} has invalid value: " \ + + str(level_type_element.text) + if not cls.failsafe: + raise ValueError(error_message) + logger.warning(error_message) + continue + cd.level_types.add(level_type) + return cd + + @classmethod + def construct_concept_description(cls, element: etree.Element, object_class=model.ConceptDescription, + **_kwargs: Any) -> model.ConceptDescription: + cd: Optional[model.ConceptDescription] = None + identifier = _child_construct_mandatory(element, NS_AAS + "identification", cls.construct_identifier) + # Hack to detect IEC61360ConceptDescriptions, which are represented using dataSpecification according to DotAAS + dspec_tag = NS_AAS + "embeddedDataSpecification" + dspecs = element.findall(dspec_tag) + if len(dspecs) > 1: + logger.warning(f"{_element_pretty_identifier(element)} has more than one " + f"{_tag_replace_namespace(dspec_tag, element.nsmap)}. This model currently supports only one" + f" per {_tag_replace_namespace(element.tag, element.nsmap)}!") + if len(dspecs) > 0: + dspec = dspecs[0] + dspec_content = dspec.find(NS_AAS + "dataSpecificationContent") + if dspec_content is not None: + dspec_ref = _failsafe_construct(dspec.find(NS_AAS + "dataSpecification"), cls.construct_reference, + cls.failsafe) + if dspec_ref is not None and len(dspec_ref.key) > 0 and dspec_ref.key[0].value == \ + "http://admin-shell.io/DataSpecificationTemplates/DataSpecificationIEC61360/2/0": + cd = _failsafe_construct(dspec_content.find(NS_AAS + "dataSpecificationIEC61360"), + cls.construct_iec61360_concept_description, cls.failsafe, + identifier=identifier) + if cd is None: + cd = object_class(identifier) + for ref in _failsafe_construct_multiple(element.findall(NS_AAS + "isCaseOf"), cls.construct_reference, + cls.failsafe): + cd.is_case_of.add(ref) + cls._amend_abstract_attributes(cd, element) + return cd + + +class StrictAASFromXmlDecoder(AASFromXmlDecoder): + """ + Non-failsafe XML decoder. Encountered errors won't be caught and abort parsing. + """ + failsafe = False -def _construct_administrative_information(element: etree.Element, _failsafe: bool, **_kwargs: Any) \ - -> model.AdministrativeInformation: - return model.AdministrativeInformation( - _get_text_or_none(element.find(NS_AAS + "version")), - _get_text_or_none(element.find(NS_AAS + "revision")) - ) +class StrippedAASFromXmlDecoder(AASFromXmlDecoder): + """ + Decoder for stripped XML elements. Used in the HTTP adapter. + """ + stripped = True -def _construct_lang_string_set(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any) \ - -> model.LangStringSet: - lss: model.LangStringSet = {} - for lang_string in _get_all_children_expect_tag(element, namespace + "langString", failsafe): - lss[_get_attrib_mandatory(lang_string, "lang")] = _get_text_mandatory(lang_string) - return lss +class StrictStrippedAASFromXmlDecoder(StrictAASFromXmlDecoder, StrippedAASFromXmlDecoder): + """ + Non-failsafe decoder for stripped XML elements. + """ + pass -def _construct_qualifier(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Qualifier: - qualifier = model.Qualifier( - _child_text_mandatory(element, NS_AAS + "type"), - _child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES) - ) - value = _get_text_or_none(element.find(NS_AAS + "value")) - if value is not None: - qualifier.value = model.datatypes.from_xsd(value, qualifier.value_type) - value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), _construct_reference, failsafe) - if value_id is not None: - qualifier.value_id = value_id - _amend_abstract_attributes(qualifier, element, failsafe) - return qualifier +def _parse_xml_document(file: IO, failsafe: bool = True, **parser_kwargs: Any) -> Optional[etree.Element]: + """ + Parse an XML document into an element tree -def _construct_formula(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Formula: - formula = model.Formula() - depends_on_refs = element.find(NS_AAS + "dependsOnRefs") - if depends_on_refs is not None: - for ref in _failsafe_construct_multiple(depends_on_refs.findall(NS_AAS + "reference"), _construct_reference, - failsafe): - formula.depends_on.add(ref) - return formula + :param file: A filename or file-like object to read the XML-serialized data from + :param failsafe: If True, the file is parsed in a failsafe way: Instead of raising an Exception if the document + is malformed, parsing is aborted, an error is logged and None is returned + :param parser_kwargs: Keyword arguments passed to the XMLParser constructor + :return: The root element of the element tree + """ + parser = etree.XMLParser(remove_blank_text=True, remove_comments=True, **parser_kwargs) -def _construct_identifier(element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.Identifier: - return model.Identifier( - _get_text_mandatory(element), - _get_attrib_mandatory_mapped(element, "idType", IDENTIFIER_TYPES_INVERSE) - ) + try: + return etree.parse(file, parser).getroot() + except etree.XMLSyntaxError as e: + if failsafe: + logger.error(e) + return None + raise e -def _construct_security(_element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.Security: +def _select_decoder(failsafe: bool, stripped: bool, decoder: Optional[Type[AASFromXmlDecoder]]) \ + -> Type[AASFromXmlDecoder]: """ - TODO: this is just a stub implementation + Returns the correct decoder based on the parameters failsafe and stripped. If a decoder class is given, failsafe + and stripped are ignored. + + :param failsafe: If true, a failsafe decoder is selected. Ignored if a decoder class is specified. + :param stripped: If true, a deocder for parsing stripped XML elements is selected. Ignored if a decoder class is + specified. + :param decoder: Is returned, if specified. + :return: A AASFromXmlDecoder (sub)class. """ - return model.Security() - - -def _construct_view(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.View: - view = model.View(_child_text_mandatory(element, NS_AAS + "idShort")) - contained_elements = element.find(NS_AAS + "containedElements") - if contained_elements is not None: - for ref in _failsafe_construct_multiple(contained_elements.findall(NS_AAS + "containedElementRef"), - _construct_referable_reference, failsafe): - view.contained_element.add(ref) - _amend_abstract_attributes(view, element, failsafe) - return view - - -def _construct_concept_dictionary(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDictionary: - concept_dictionary = model.ConceptDictionary(_child_text_mandatory(element, NS_AAS + "idShort")) - concept_description = element.find(NS_AAS + "conceptDescriptionRefs") - if concept_description is not None: - for ref in _failsafe_construct_multiple(concept_description.findall(NS_AAS + "conceptDescriptionRef"), - _construct_concept_description_reference, failsafe): - concept_dictionary.concept_description.add(ref) - _amend_abstract_attributes(concept_dictionary, element, failsafe) - return concept_dictionary - - -def _construct_submodel_element(element: etree.Element, failsafe: bool, **kwargs: Any) -> model.SubmodelElement: - submodel_elements: Dict[str, Callable[..., model.SubmodelElement]] = {NS_AAS + k: v for k, v in { - "annotatedRelationshipElement": _construct_annotated_relationship_element, - "basicEvent": _construct_basic_event, - "capability": _construct_capability, - "entity": _construct_entity, - "operation": _construct_operation, - "relationshipElement": _construct_relationship_element, - "submodelElementCollection": _construct_submodel_element_collection - }.items()} - if element.tag not in submodel_elements: - return _construct_data_element(element, failsafe, abstract_class_name="submodel element", **kwargs) - return submodel_elements[element.tag](element, failsafe, **kwargs) - - -def _construct_data_element(element: etree.Element, failsafe: bool, abstract_class_name: str = "data element", - **kwargs: Any) -> model.DataElement: - data_elements: Dict[str, Callable[..., model.DataElement]] = {NS_AAS + k: v for k, v in { - "blob": _construct_blob, - "file": _construct_file, - "multiLanguageProperty": _construct_multi_language_property, - "property": _construct_property, - "range": _construct_range, - "referenceElement": _construct_reference_element, - }.items()} - if element.tag not in data_elements: - raise KeyError(_element_pretty_identifier(element) + f" is not a valid {abstract_class_name}!") - return data_elements[element.tag](element, failsafe, **kwargs) - - -def _construct_constraint(element: etree.Element, failsafe: bool, **kwargs: Any) -> model.Constraint: - constraints: Dict[str, Callable[..., model.Constraint]] = {NS_AAS + k: v for k, v in { - "formula": _construct_formula, - "qualifier": _construct_qualifier - }.items()} - if element.tag not in constraints: - raise KeyError(_element_pretty_identifier(element) + " is not a valid constraint!") - return constraints[element.tag](element, failsafe, **kwargs) - - -def _construct_operation_variable(element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.OperationVariable: - value = _get_child_mandatory(element, NS_AAS + "value") - if len(value) == 0: - raise KeyError(f"{_element_pretty_identifier(value)} has no submodel element!") - if len(value) > 1: - logger.warning(f"{_element_pretty_identifier(value)} has more than one submodel element, " - "using the first one...") - return model.OperationVariable( - _failsafe_construct_mandatory(value[0], _construct_submodel_element) - ) - - -def _construct_annotated_relationship_element(element: etree.Element, failsafe: bool, **_kwargs: Any) \ - -> model.AnnotatedRelationshipElement: - annotated_relationship_element = _construct_relationship_element_internal( - element, failsafe, object_class=model.AnnotatedRelationshipElement - ) - for data_element in _get_child_mandatory(element, NS_AAS + "annotations"): - if len(data_element) == 0: - raise KeyError(f"{_element_pretty_identifier(data_element)} has no data element!") - if len(data_element) > 1: - logger.warning(f"{_element_pretty_identifier(data_element)} has more than one data element, " - "using the first one...") - constructed = _failsafe_construct(data_element[0], _construct_data_element, failsafe) - if constructed is not None: - annotated_relationship_element.annotation.add(constructed) - return annotated_relationship_element - - -def _construct_basic_event(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.BasicEvent: - basic_event = model.BasicEvent( - _child_text_mandatory(element, NS_AAS + "idShort"), - _child_construct_mandatory(element, NS_AAS + "observed", _construct_referable_reference), - kind=_get_modeling_kind(element) - ) - _amend_abstract_attributes(basic_event, element, failsafe) - return basic_event - - -def _construct_blob(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Blob: - blob = model.Blob( - _child_text_mandatory(element, NS_AAS + "idShort"), - _child_text_mandatory(element, NS_AAS + "mimeType"), - kind=_get_modeling_kind(element) - ) - value = _get_text_or_none(element.find(NS_AAS + "value")) - if value is not None: - blob.value = base64.b64decode(value) - _amend_abstract_attributes(blob, element, failsafe) - return blob - - -def _construct_capability(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Capability: - capability = model.Capability( - _child_text_mandatory(element, NS_AAS + "idShort"), - kind=_get_modeling_kind(element) - ) - _amend_abstract_attributes(capability, element, failsafe) - return capability - - -def _construct_entity(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Entity: - entity = model.Entity( - _child_text_mandatory(element, NS_AAS + "idShort"), - _child_text_mandatory_mapped(element, NS_AAS + "entityType", ENTITY_TYPES_INVERSE), - # pass the asset to the constructor, because self managed entities need asset references - asset=_failsafe_construct(element.find(NS_AAS + "assetRef"), _construct_asset_reference, failsafe), - kind=_get_modeling_kind(element) - ) - # TODO: remove wrapping submodelElement, in accordance to future schemas - statements = _get_child_mandatory(element, NS_AAS + "statements") - for submodel_element in _get_all_children_expect_tag(statements, NS_AAS + "submodelElement", failsafe): - if len(submodel_element) == 0: - raise KeyError(f"{_element_pretty_identifier(submodel_element)} has no submodel element!") - if len(submodel_element) > 1: - logger.warning(f"{_element_pretty_identifier(submodel_element)} has more than one submodel element, " - "using the first one...") - constructed = _failsafe_construct(submodel_element[0], _construct_submodel_element, failsafe) - if constructed is not None: - entity.statement.add(constructed) - _amend_abstract_attributes(entity, element, failsafe) - return entity - - -def _construct_file(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.File: - file = model.File( - _child_text_mandatory(element, NS_AAS + "idShort"), - _child_text_mandatory(element, NS_AAS + "mimeType"), - kind=_get_modeling_kind(element) - ) - value = _get_text_or_none(element.find(NS_AAS + "value")) - if value is not None: - file.value = value - _amend_abstract_attributes(file, element, failsafe) - return file - - -def _construct_multi_language_property(element: etree.Element, failsafe: bool, **_kwargs: Any) \ - -> model.MultiLanguageProperty: - multi_language_property = model.MultiLanguageProperty( - _child_text_mandatory(element, NS_AAS + "idShort"), - kind=_get_modeling_kind(element) - ) - value = _failsafe_construct(element.find(NS_AAS + "value"), _construct_lang_string_set, failsafe) - if value is not None: - multi_language_property.value = value - value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), _construct_reference, failsafe) - if value_id is not None: - multi_language_property.value_id = value_id - _amend_abstract_attributes(multi_language_property, element, failsafe) - return multi_language_property - - -def _construct_operation(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Operation: - operation = model.Operation( - _child_text_mandatory(element, NS_AAS + "idShort"), - kind=_get_modeling_kind(element) - ) - for input_variable in _failsafe_construct_multiple(element.findall(NS_AAS + "inputVariable"), - _construct_operation_variable, failsafe): - operation.input_variable.append(input_variable) - for output_variable in _failsafe_construct_multiple(element.findall(NS_AAS + "outputVariable"), - _construct_operation_variable, failsafe): - operation.output_variable.append(output_variable) - for in_output_variable in _failsafe_construct_multiple(element.findall(NS_AAS + "inoutputVariable"), - _construct_operation_variable, failsafe): - operation.in_output_variable.append(in_output_variable) - _amend_abstract_attributes(operation, element, failsafe) - return operation - - -def _construct_property(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Property: - property_ = model.Property( - _child_text_mandatory(element, NS_AAS + "idShort"), - value_type=_child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES), - kind=_get_modeling_kind(element) - ) - value = _get_text_or_none(element.find(NS_AAS + "value")) - if value is not None: - property_.value = model.datatypes.from_xsd(value, property_.value_type) - value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), _construct_reference, failsafe) - if value_id is not None: - property_.value_id = value_id - _amend_abstract_attributes(property_, element, failsafe) - return property_ - - -def _construct_range(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Range: - range_ = model.Range( - _child_text_mandatory(element, NS_AAS + "idShort"), - value_type=_child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES), - kind=_get_modeling_kind(element) - ) - max_ = _get_text_or_none(element.find(NS_AAS + "max")) - if max_ is not None: - range_.max = model.datatypes.from_xsd(max_, range_.value_type) - min_ = _get_text_or_none(element.find(NS_AAS + "min")) - if min_ is not None: - range_.min = model.datatypes.from_xsd(min_, range_.value_type) - _amend_abstract_attributes(range_, element, failsafe) - return range_ - - -def _construct_reference_element(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ReferenceElement: - reference_element = model.ReferenceElement( - _child_text_mandatory(element, NS_AAS + "idShort"), - kind=_get_modeling_kind(element) - ) - value = _failsafe_construct(element.find(NS_AAS + "value"), _construct_referable_reference, failsafe) - if value is not None: - reference_element.value = value - _amend_abstract_attributes(reference_element, element, failsafe) - return reference_element - - -def _construct_relationship_element(element: etree.Element, failsafe: bool, **_kwargs: Any) \ - -> model.RelationshipElement: - return _construct_relationship_element_internal(element, failsafe, model.RelationshipElement, **_kwargs) - - -RE = TypeVar("RE", bound=model.RelationshipElement) - + if decoder is not None: + return decoder + if failsafe: + if stripped: + return StrippedAASFromXmlDecoder + return AASFromXmlDecoder + else: + if stripped: + return StrictStrippedAASFromXmlDecoder + return StrictAASFromXmlDecoder + + +@enum.unique +class XMLConstructables(enum.Enum): + """ + This enum is used to specify which type to construct in read_aas_xml_element(). + """ + KEY = enum.auto() + REFERENCE = enum.auto() + AAS_REFERENCE = enum.auto() + ADMINISTRATIVE_INFORMATION = enum.auto() + QUALIFIER = enum.auto() + FORMULA = enum.auto() + IDENTIFIER = enum.auto() + SECURITY = enum.auto() + VIEW = enum.auto() + CONCEPT_DICTIONARY = enum.auto() + OPERATION_VARIABLE = enum.auto() + ANNOTATED_RELATIONSHIP_ELEMENT = enum.auto() + BASIC_EVENT = enum.auto() + BLOB = enum.auto() + CAPABILITY = enum.auto() + ENTITY = enum.auto() + FILE = enum.auto() + MULTI_LANGUAGE_PROPERTY = enum.auto() + OPERATION = enum.auto() + PROPERTY = enum.auto() + RANGE = enum.auto() + REFERENCE_ELEMENT = enum.auto() + RELATIONSHIP_ELEMENT = enum.auto() + SUBMODEL_ELEMENT_COLLECTION = enum.auto() + ASSET_ADMINISTRATION_SHELL = enum.auto() + ASSET = enum.auto() + SUBMODEL = enum.auto() + VALUE_REFERENCE_PAIR = enum.auto() + IEC61360_CONCEPT_DESCRIPTION = enum.auto() + CONCEPT_DESCRIPTION = enum.auto() + CONSTRAINT = enum.auto() + DATA_ELEMENT = enum.auto() + SUBMODEL_ELEMENT = enum.auto() + VALUE_LIST = enum.auto() + LANG_STRING_SET = enum.auto() + + +def read_aas_xml_element(file: IO, construct: XMLConstructables, failsafe: bool = True, stripped: bool = False, + decoder: Optional[Type[AASFromXmlDecoder]] = None, **constructor_kwargs) -> Optional[object]: + """ + Construct a single object from an XML string. The namespaces have to be declared on the object itself, since there + is no surrounding aasenv element. -def _construct_relationship_element_internal(element: etree.Element, failsafe: bool, - object_class: Type[RE], **_kwargs: Any) -> RE: - relationship_element = object_class( - _child_text_mandatory(element, NS_AAS + "idShort"), - _child_construct_mandatory(element, NS_AAS + "first", _construct_referable_reference), - _child_construct_mandatory(element, NS_AAS + "second", _construct_referable_reference), - kind=_get_modeling_kind(element) - ) - _amend_abstract_attributes(relationship_element, element, failsafe) - return relationship_element - - -def _construct_submodel_element_collection(element: etree.Element, failsafe: bool, **_kwargs: Any) \ - -> model.SubmodelElementCollection: - ordered = _str_to_bool(_child_text_mandatory(element, NS_AAS + "ordered")) - collection_type = model.SubmodelElementCollectionOrdered if ordered else model.SubmodelElementCollectionUnordered - collection = collection_type( - _child_text_mandatory(element, NS_AAS + "idShort"), - kind=_get_modeling_kind(element) - ) - value = _get_child_mandatory(element, NS_AAS + "value") - # TODO: simplify this should our suggestion regarding the XML schema get accepted - # https://git.rwth-aachen.de/acplt/pyaas/-/issues/57 - for submodel_element in _get_all_children_expect_tag(value, NS_AAS + "submodelElement", failsafe): - if len(submodel_element) == 0: - raise KeyError(f"{_element_pretty_identifier(submodel_element)} has no submodel element!") - if len(submodel_element) > 1: - logger.warning(f"{_element_pretty_identifier(submodel_element)} has more than one submodel element, " - "using the first one...") - constructed = _failsafe_construct(submodel_element[0], _construct_submodel_element, failsafe) - if constructed is not None: - collection.value.add(constructed) - _amend_abstract_attributes(collection, element, failsafe) - return collection - - -def _construct_asset_administration_shell(element: etree.Element, failsafe: bool, **_kwargs: Any) \ - -> model.AssetAdministrationShell: - aas = model.AssetAdministrationShell( - _child_construct_mandatory(element, NS_AAS + "assetRef", _construct_asset_reference), - _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier) - ) - security = _failsafe_construct(element.find(NS_ABAC + "security"), _construct_security, failsafe) - if security is not None: - aas.security = security - submodels = element.find(NS_AAS + "submodelRefs") - if submodels is not None: - for ref in _child_construct_multiple(submodels, NS_AAS + "submodelRef", _construct_submodel_reference, - failsafe): - aas.submodel.add(ref) - views = element.find(NS_AAS + "views") - if views is not None: - for view in _child_construct_multiple(views, NS_AAS + "view", _construct_view, failsafe): - aas.view.add(view) - concept_dictionaries = element.find(NS_AAS + "conceptDictionaries") - if concept_dictionaries is not None: - for cd in _child_construct_multiple(concept_dictionaries, NS_AAS + "conceptDictionary", - _construct_concept_dictionary, failsafe): - aas.concept_dictionary.add(cd) - derived_from = _failsafe_construct(element.find(NS_AAS + "derivedFrom"), - _construct_asset_administration_shell_reference, failsafe) - if derived_from is not None: - aas.derived_from = derived_from - _amend_abstract_attributes(aas, element, failsafe) - return aas - - -def _construct_asset(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Asset: - asset = model.Asset( - _child_text_mandatory_mapped(element, NS_AAS + "kind", ASSET_KIND_INVERSE), - _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier) - ) - asset_identification_model = _failsafe_construct(element.find(NS_AAS + "assetIdentificationModelRef"), - _construct_submodel_reference, failsafe) - if asset_identification_model is not None: - asset.asset_identification_model = asset_identification_model - bill_of_material = _failsafe_construct(element.find(NS_AAS + "billOfMaterialRef"), _construct_submodel_reference, - failsafe) - if bill_of_material is not None: - asset.bill_of_material = bill_of_material - _amend_abstract_attributes(asset, element, failsafe) - return asset - - -def _construct_submodel(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Submodel: - submodel = model.Submodel( - _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier), - kind=_get_modeling_kind(element) - ) - # TODO: simplify this should our suggestion regarding the XML schema get accepted - # https://git.rwth-aachen.de/acplt/pyaas/-/issues/57 - for submodel_element in _get_all_children_expect_tag( - _get_child_mandatory(element, NS_AAS + "submodelElements"), NS_AAS + "submodelElement", failsafe): - if len(submodel_element) == 0: - raise KeyError(f"{_element_pretty_identifier(submodel_element)} has no submodel element!") - if len(submodel_element) > 1: - logger.warning(f"{_element_pretty_identifier(submodel_element)} has more than one submodel element, " - "using the first one...") - constructed = _failsafe_construct(submodel_element[0], _construct_submodel_element, failsafe) - if constructed is not None: - submodel.submodel_element.add(constructed) - _amend_abstract_attributes(submodel, element, failsafe) - return submodel - - -def _construct_value_reference_pair(element: etree.Element, _failsafe: bool, - value_format: Optional[model.DataTypeDef] = None, **_kwargs: Any) \ - -> model.ValueReferencePair: - if value_format is None: - raise ValueError("No value format given!") - return model.ValueReferencePair( - value_format, - model.datatypes.from_xsd(_child_text_mandatory(element, NS_IEC + "value"), value_format), - _child_construct_mandatory(element, NS_IEC + "valueId", _construct_reference, namespace=NS_IEC) - ) - - -def _construct_value_list(element: etree.Element, failsafe: bool, - value_format: Optional[model.DataTypeDef] = None, **_kwargs: Any) \ - -> model.ValueList: - return set( - _child_construct_multiple(element, NS_IEC + "valueReferencePair", _construct_value_reference_pair, failsafe, - value_format=value_format) - ) - - -def _construct_iec61360_concept_description(element: etree.Element, failsafe: bool, - identifier: Optional[model.Identifier] = None, **_kwargs: Any) \ - -> model.IEC61360ConceptDescription: - if identifier is None: - raise ValueError("No identifier given!") - cd = model.IEC61360ConceptDescription( - identifier, - _child_construct_mandatory(element, NS_IEC + "preferredName", _construct_lang_string_set, namespace=NS_IEC), - ) - data_type = _get_text_mapped_or_none(element.find(NS_IEC + "dataType"), IEC61360_DATA_TYPES_INVERSE) - if data_type is not None: - cd.data_type = data_type - definition = _failsafe_construct(element.find(NS_IEC + "definition"), _construct_lang_string_set, failsafe, - namespace=NS_IEC) - if definition is not None: - cd.definition = definition - short_name = _failsafe_construct(element.find(NS_IEC + "shortName"), _construct_lang_string_set, failsafe, - namespace=NS_IEC) - if short_name is not None: - cd.short_name = short_name - unit = _get_text_or_none(element.find(NS_IEC + "unit")) - if unit is not None: - cd.unit = unit - unit_id = _failsafe_construct(element.find(NS_IEC + "unitId"), _construct_reference, failsafe, namespace=NS_IEC) - if unit_id is not None: - cd.unit_id = unit_id - source_of_definition = _get_text_or_none(element.find(NS_IEC + "sourceOfDefinition")) - if source_of_definition is not None: - cd.source_of_definition = source_of_definition - symbol = _get_text_or_none(element.find(NS_IEC + "symbol")) - if symbol is not None: - cd.symbol = symbol - value_format = _get_text_mapped_or_none(element.find(NS_IEC + "valueFormat"), - model.datatypes.XSD_TYPE_CLASSES) - if value_format is not None: - cd.value_format = value_format - value_list = _failsafe_construct(element.find(NS_IEC + "valueList"), _construct_value_list, failsafe, - value_format=value_format) - if value_list is not None: - cd.value_list = value_list - value = _get_text_or_none(element.find(NS_IEC + "value")) - if value is not None and value_format is not None: - cd.value = model.datatypes.from_xsd(value, value_format) - value_id = _failsafe_construct(element.find(NS_IEC + "valueId"), _construct_reference, failsafe, namespace=NS_IEC) - if value_id is not None: - cd.value_id = value_id - for level_type_element in element.findall(NS_IEC + "levelType"): - level_type = _get_text_mapped_or_none(level_type_element, IEC61360_LEVEL_TYPES_INVERSE) - if level_type is None: - error_message = f"{_element_pretty_identifier(level_type_element)} has invalid value: " \ - + str(level_type_element.text) - if not failsafe: - raise ValueError(error_message) - logger.warning(error_message) - continue - cd.level_types.add(level_type) - return cd - - -def _construct_concept_description(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDescription: - cd: Optional[model.ConceptDescription] = None - identifier = _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier) - # Hack to detect IEC61360ConceptDescriptions, which are represented using dataSpecification according to DotAAS - dspec_tag = NS_AAS + "embeddedDataSpecification" - dspecs = element.findall(dspec_tag) - if len(dspecs) > 1: - logger.warning(f"{_element_pretty_identifier(element)} has more than one " - f"{_tag_replace_namespace(dspec_tag, element.nsmap)}. This model currently supports only one " - f"per {_tag_replace_namespace(element.tag, element.nsmap)}!") - if len(dspecs) > 0: - dspec = dspecs[0] - dspec_content = dspec.find(NS_AAS + "dataSpecificationContent") - if dspec_content is not None: - dspec_ref = _failsafe_construct(dspec.find(NS_AAS + "dataSpecification"), _construct_reference, failsafe) - if dspec_ref is not None and len(dspec_ref.key) > 0 and dspec_ref.key[0].value == \ - "http://admin-shell.io/DataSpecificationTemplates/DataSpecificationIEC61360/2/0": - cd = _failsafe_construct(dspec_content.find(NS_AAS + "dataSpecificationIEC61360"), - _construct_iec61360_concept_description, failsafe, identifier=identifier) - if cd is None: - cd = model.ConceptDescription(identifier) - for ref in _failsafe_construct_multiple(element.findall(NS_AAS + "isCaseOf"), _construct_reference, failsafe): - cd.is_case_of.add(ref) - _amend_abstract_attributes(cd, element, failsafe) - return cd - - -def read_aas_xml_file(file: IO, failsafe: bool = True) -> model.DictObjectStore[model.Identifiable]: + :param file: A filename or file-like object to read the XML-serialized data from + :param construct: A member of the enum XML_CONSTRUCTABLES, specifying which type to construct. + :param failsafe: If true, the document is parsed in a failsafe way: missing attributes and elements are logged + instead of causing exceptions. Defect objects are skipped. + This parameter is ignored if a decoder class is specified. + :param stripped: If true, stripped XML elements are parsed. + See https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/91 + This parameter is ignored if a decoder class is specified. + :param decoder: The decoder class used to decode the XML elements + :param constructor_kwargs: Keyword arguments passed to the constructor function + :return: The constructed object or None, if an error occurred in failsafe mode. + """ + decoder_ = _select_decoder(failsafe, stripped, decoder) + constructor: Callable[..., object] + + if construct == XMLConstructables.KEY: + constructor = decoder_.construct_key + elif construct == XMLConstructables.REFERENCE: + constructor = decoder_.construct_reference + elif construct == XMLConstructables.AAS_REFERENCE: + constructor = decoder_.construct_aas_reference + elif construct == XMLConstructables.ADMINISTRATIVE_INFORMATION: + constructor = decoder_.construct_administrative_information + elif construct == XMLConstructables.QUALIFIER: + constructor = decoder_.construct_qualifier + elif construct == XMLConstructables.FORMULA: + constructor = decoder_.construct_formula + elif construct == XMLConstructables.IDENTIFIER: + constructor = decoder_.construct_identifier + elif construct == XMLConstructables.SECURITY: + constructor = decoder_.construct_security + elif construct == XMLConstructables.VIEW: + constructor = decoder_.construct_view + elif construct == XMLConstructables.CONCEPT_DICTIONARY: + constructor = decoder_.construct_concept_dictionary + elif construct == XMLConstructables.OPERATION_VARIABLE: + constructor = decoder_.construct_operation_variable + elif construct == XMLConstructables.ANNOTATED_RELATIONSHIP_ELEMENT: + constructor = decoder_.construct_annotated_relationship_element + elif construct == XMLConstructables.BASIC_EVENT: + constructor = decoder_.construct_basic_event + elif construct == XMLConstructables.BLOB: + constructor = decoder_.construct_blob + elif construct == XMLConstructables.CAPABILITY: + constructor = decoder_.construct_capability + elif construct == XMLConstructables.ENTITY: + constructor = decoder_.construct_entity + elif construct == XMLConstructables.FILE: + constructor = decoder_.construct_file + elif construct == XMLConstructables.MULTI_LANGUAGE_PROPERTY: + constructor = decoder_.construct_multi_language_property + elif construct == XMLConstructables.OPERATION: + constructor = decoder_.construct_operation + elif construct == XMLConstructables.PROPERTY: + constructor = decoder_.construct_property + elif construct == XMLConstructables.RANGE: + constructor = decoder_.construct_range + elif construct == XMLConstructables.REFERENCE_ELEMENT: + constructor = decoder_.construct_reference_element + elif construct == XMLConstructables.RELATIONSHIP_ELEMENT: + constructor = decoder_.construct_relationship_element + elif construct == XMLConstructables.SUBMODEL_ELEMENT_COLLECTION: + constructor = decoder_.construct_submodel_element_collection + elif construct == XMLConstructables.ASSET_ADMINISTRATION_SHELL: + constructor = decoder_.construct_asset_administration_shell + elif construct == XMLConstructables.ASSET: + constructor = decoder_.construct_asset + elif construct == XMLConstructables.SUBMODEL: + constructor = decoder_.construct_submodel + elif construct == XMLConstructables.VALUE_REFERENCE_PAIR: + constructor = decoder_.construct_value_reference_pair + elif construct == XMLConstructables.IEC61360_CONCEPT_DESCRIPTION: + constructor = decoder_.construct_iec61360_concept_description + elif construct == XMLConstructables.CONCEPT_DESCRIPTION: + constructor = decoder_.construct_concept_description + # the following constructors decide which constructor to call based on the elements tag + elif construct == XMLConstructables.CONSTRAINT: + constructor = decoder_.construct_constraint + elif construct == XMLConstructables.DATA_ELEMENT: + constructor = decoder_.construct_data_element + elif construct == XMLConstructables.SUBMODEL_ELEMENT: + constructor = decoder_.construct_submodel_element + # type aliases + elif construct == XMLConstructables.VALUE_LIST: + constructor = decoder_.construct_value_list + elif construct == XMLConstructables.LANG_STRING_SET: + constructor = decoder_.construct_lang_string_set + else: + raise ValueError(f"{construct.name} cannot be constructed!") + + element = _parse_xml_document(file, failsafe=decoder_.failsafe) + return _failsafe_construct(element, constructor, decoder_.failsafe, **constructor_kwargs) + + +def read_aas_xml_file_into(object_store: model.AbstractObjectStore[model.Identifiable], file: IO, + replace_existing: bool = False, ignore_existing: bool = False, failsafe: bool = True, + stripped: bool = False, decoder: Optional[Type[AASFromXmlDecoder]] = None, + **parser_kwargs: Any) -> Set[model.Identifier]: """ Read an Asset Administration Shell XML file according to 'Details of the Asset Administration Shell', chapter 5.4 + into a given object store. + :param object_store: The object store in which the identifiable objects should be stored :param file: A filename or file-like object to read the XML-serialized data from - :param failsafe: If True, the file is parsed in a failsafe way: Instead of raising an Exception for missing - attributes and wrong types, errors are logged and defective objects are skipped - :return: A DictObjectStore containing all AAS objects from the XML file + :param replace_existing: Whether to replace existing objects with the same identifier in the object store or not + :param ignore_existing: Whether to ignore existing objects (e.g. log a message) or raise an error. + This parameter is ignored if replace_existing is True. + :param failsafe: If true, the document is parsed in a failsafe way: missing attributes and elements are logged + instead of causing exceptions. Defect objects are skipped. + This parameter is ignored if a decoder class is specified. + :param stripped: If true, stripped XML elements are parsed. + See https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/91 + This parameter is ignored if a decoder class is specified. + :param decoder: The decoder class used to decode the XML elements + :param parser_kwargs: Keyword arguments passed to the XMLParser constructor + :return: A set of identifiers that were added to object_store """ + ret: Set[model.Identifier] = set() - element_constructors = {NS_AAS + k: v for k, v in { - "assetAdministrationShell": _construct_asset_administration_shell, - "asset": _construct_asset, - "submodel": _construct_submodel, - "conceptDescription": _construct_concept_description - }.items()} + decoder_ = _select_decoder(failsafe, stripped, decoder) - ret: model.DictObjectStore[model.Identifiable] = model.DictObjectStore() - parser = etree.XMLParser(remove_blank_text=True, remove_comments=True) + element_constructors: Dict[str, Callable[..., model.Identifiable]] = { + "assetAdministrationShell": decoder_.construct_asset_administration_shell, + "asset": decoder_.construct_asset, + "submodel": decoder_.construct_submodel, + "conceptDescription": decoder_.construct_concept_description + } - try: - tree = etree.parse(file, parser) - except etree.XMLSyntaxError as e: - if failsafe: - logger.error(e) - return ret - raise e + element_constructors = {NS_AAS + k: v for k, v in element_constructors.items()} + + root = _parse_xml_document(file, failsafe=decoder_.failsafe, **parser_kwargs) - root = tree.getroot() + if root is None: + return ret # Add AAS objects to ObjectStore for list_ in root: element_tag = list_.tag[:-1] if list_.tag[-1] != "s" or element_tag not in element_constructors: error_message = f"Unexpected top-level list {_element_pretty_identifier(list_)}!" - if not failsafe: + if not decoder_.failsafe: raise TypeError(error_message) logger.warning(error_message) continue constructor = element_constructors[element_tag] - for element in _child_construct_multiple(list_, element_tag, constructor, failsafe): - # element is always Identifiable, because the tag is checked earlier - # this is just to satisfy the type checker - if isinstance(element, model.Identifiable): - ret.add(element) + for element in _child_construct_multiple(list_, element_tag, constructor, decoder_.failsafe): + if element.identification in ret: + error_message = f"{element} has a duplicate identifier already parsed in the document!" + if not decoder_.failsafe: + raise KeyError(error_message) + logger.error(error_message + " skipping it...") + continue + existing_element = object_store.get(element.identification) + if existing_element is not None: + if not replace_existing: + error_message = f"object with identifier {element.identification} already exists " \ + f"in the object store: {existing_element}!" + if not ignore_existing: + raise KeyError(error_message + f" failed to insert {element}!") + logger.info(error_message + f" skipping insertion of {element}...") + continue + object_store.discard(existing_element) + object_store.add(element) + ret.add(element.identification) return ret + + +def read_aas_xml_file(file: IO, **kwargs: Any) -> model.DictObjectStore[model.Identifiable]: + """ + A wrapper of read_aas_xml_file_into(), that reads all objects in an empty DictObjectStore. This function supports + the same keyword arguments as read_aas_xml_file_into(). + + :param file: A filename or file-like object to read the XML-serialized data from + :param kwargs: Keyword arguments passed to read_aas_xml_file_into() + :return: A DictObjectStore containing all AAS objects from the XML file + """ + object_store: model.DictObjectStore[model.Identifiable] = model.DictObjectStore() + read_aas_xml_file_into(object_store, file, **kwargs) + return object_store diff --git a/aas/compliance_tool/compliance_check_xml.py b/aas/compliance_tool/compliance_check_xml.py index 3ae7a7dfd0075bb316c0ec2f12e6f0572c015a4c..3526124499991e11761fefe2c2f2aa5c6376957d 100644 --- a/aas/compliance_tool/compliance_check_xml.py +++ b/aas/compliance_tool/compliance_check_xml.py @@ -142,7 +142,7 @@ def check_deserialization(file_path: str, state_manager: ComplianceToolStateMana state_manager.add_step('Read file {} and check if it is deserializable'.format(file_info)) else: state_manager.add_step('Read file and check if it is deserializable') - obj_store = xml_deserialization.read_aas_xml_file(file_to_be_checked, True) + obj_store = xml_deserialization.read_aas_xml_file(file_to_be_checked, failsafe=True) state_manager.set_step_status_from_log() diff --git a/aas/model/__init__.py b/aas/model/__init__.py index 2b4efeab62f6359e9675a7a95a6be1dc95ee750c..8c1c6910b381815c081cfe63b41f3b7d8d92e0b6 100644 --- a/aas/model/__init__.py +++ b/aas/model/__init__.py @@ -36,7 +36,7 @@ from . import datatypes # A mapping of PyI40AAS implementation classes to the corresponding `KeyElements` enum members for all classes that are # covered by this enum. -KEY_ELEMENTS_CLASSES: Dict[type, KeyElements] = { +KEY_ELEMENTS_CLASSES: Dict[Type[Referable], KeyElements] = { Asset: KeyElements.ASSET, AssetAdministrationShell: KeyElements.ASSET_ADMINISTRATION_SHELL, ConceptDescription: KeyElements.CONCEPT_DESCRIPTION, diff --git a/aas/model/base.py b/aas/model/base.py index 08c02edf3a443a4361880604ab54f44aad95a99b..c728fc38712ed264d16346e40d05cf36a4f5df7b 100644 --- a/aas/model/base.py +++ b/aas/model/base.py @@ -415,7 +415,7 @@ class Referable(metaclass=abc.ABCMeta): def __init__(self): super().__init__() - self.id_short: Optional[str] = "" + self._id_short: Optional[str] = "" self.category: Optional[str] = "" self.description: Optional[LangStringSet] = set() # We use a Python reference to the parent Namespace instead of a Reference Object, as specified. This allows @@ -450,8 +450,12 @@ class Referable(metaclass=abc.ABCMeta): Constraint AASd-002: idShort shall only feature letters, digits, underscore ('_'); starting mandatory with a letter + Additionally check that the idShort is not already present in the same parent Namespace (if this object is + already contained in a parent Namespace). + :param id_short: Identifying string of the element within its name space - :raises: Exception if the constraint is not fulfilled + :raises ValueError: if the constraint is not fulfilled + :raises KeyError: if the new idShort causes a name collision in the parent Namespace """ if id_short is None and not hasattr(self, 'identification'): @@ -461,6 +465,13 @@ class Referable(metaclass=abc.ABCMeta): raise ValueError("The id_short must contain only letters, digits and underscore") if not re.match("^([a-zA-Z].*|)$", test_id_short): raise ValueError("The id_short must start with a letter") + + if self.parent is not None and id_short != self.id_short: + for set_ in self.parent.namespace_element_sets: + if id_short in set_: + raise KeyError("Referable with id_short '{}' is already present in the parent Namespace" + .format(id_short)) + self._id_short = id_short def update(self, timeout: float = 0) -> None: diff --git a/test/adapter/xml/test_xml_deserialization.py b/test/adapter/xml/test_xml_deserialization.py index 27c523b879b1b9ab76eaaafad8b6ee7a45d8d787..bba30f0a9bf0a5a0f40ba600d258b26c0361adc9 100644 --- a/test/adapter/xml/test_xml_deserialization.py +++ b/test/adapter/xml/test_xml_deserialization.py @@ -14,7 +14,7 @@ import logging import unittest from aas import model -from aas.adapter.xml import read_aas_xml_file +from aas.adapter.xml import XMLConstructables, read_aas_xml_file, read_aas_xml_file_into, read_aas_xml_element from lxml import etree # type: ignore from typing import Iterable, Type, Union @@ -52,13 +52,12 @@ class XMLDeserializationTest(unittest.TestCase): strings = [strings] bytes_io = io.BytesIO(xml.encode("utf-8")) with self.assertLogs(logging.getLogger(), level=log_level) as log_ctx: - read_aas_xml_file(bytes_io, True) - for s in strings: - self.assertIn(s, log_ctx.output[0]) + read_aas_xml_file(bytes_io, failsafe=True) with self.assertRaises(error_type) as err_ctx: - read_aas_xml_file(bytes_io, False) + read_aas_xml_file(bytes_io, failsafe=False) cause = _root_cause(err_ctx.exception) for s in strings: + self.assertIn(s, log_ctx.output[0]) self.assertIn(s, str(cause)) def test_malformed_xml(self) -> None: @@ -70,9 +69,9 @@ class XMLDeserializationTest(unittest.TestCase): for s in xml: bytes_io = io.BytesIO(s.encode("utf-8")) with self.assertRaises(etree.XMLSyntaxError): - read_aas_xml_file(bytes_io, False) + read_aas_xml_file(bytes_io, failsafe=False) with self.assertLogs(logging.getLogger(), level=logging.ERROR): - read_aas_xml_file(bytes_io, True) + read_aas_xml_file(bytes_io, failsafe=True) def test_invalid_list_name(self) -> None: xml = _xml_wrap("<aas:invalidList></aas:invalidList>") @@ -162,11 +161,11 @@ class XMLDeserializationTest(unittest.TestCase): </aas:submodels> """) # should get parsed successfully - object_store = read_aas_xml_file(io.BytesIO(xml.encode("utf-8")), False) + object_store = read_aas_xml_file(io.BytesIO(xml.encode("utf-8")), failsafe=False) # modeling kind should default to INSTANCE submodel = object_store.pop() self.assertIsInstance(submodel, model.Submodel) - assert(isinstance(submodel, model.Submodel)) # to make mypy happy + assert isinstance(submodel, model.Submodel) # to make mypy happy self.assertEqual(submodel.kind, model.ModelingKind.INSTANCE) def test_reference_kind_mismatch(self) -> None: @@ -183,14 +182,13 @@ class XMLDeserializationTest(unittest.TestCase): </aas:assetAdministrationShells> """) with self.assertLogs(logging.getLogger(), level=logging.WARNING) as context: - read_aas_xml_file(io.BytesIO(xml.encode("utf-8")), False) - self.assertIn("GLOBAL_REFERENCE", context.output[0]) - self.assertIn("IRI=http://acplt.org/test_ref", context.output[0]) - self.assertIn("Asset", context.output[0]) + read_aas_xml_file(io.BytesIO(xml.encode("utf-8")), failsafe=False) + for s in ("GLOBAL_REFERENCE", "IRI=http://acplt.org/test_ref", "Asset"): + self.assertIn(s, context.output[0]) def test_invalid_submodel_element(self) -> None: # TODO: simplify this should our suggestion regarding the XML schema get accepted - # https://git.rwth-aachen.de/acplt/pyaas/-/issues/57 + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 xml = _xml_wrap(""" <aas:submodels> <aas:submodel> @@ -207,7 +205,7 @@ class XMLDeserializationTest(unittest.TestCase): def test_invalid_constraint(self) -> None: # TODO: simplify this should our suggestion regarding the XML schema get accepted - # https://git.rwth-aachen.de/acplt/pyaas/-/issues/56 + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 xml = _xml_wrap(""" <aas:submodels> <aas:submodel> @@ -223,7 +221,7 @@ class XMLDeserializationTest(unittest.TestCase): def test_operation_variable_no_submodel_element(self) -> None: # TODO: simplify this should our suggestion regarding the XML schema get accepted - # https://git.rwth-aachen.de/acplt/pyaas/-/issues/57 + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 xml = _xml_wrap(""" <aas:submodels> <aas:submodel> @@ -245,7 +243,7 @@ class XMLDeserializationTest(unittest.TestCase): def test_operation_variable_too_many_submodel_elements(self) -> None: # TODO: simplify this should our suggestion regarding the XML schema get accepted - # https://git.rwth-aachen.de/acplt/pyaas/-/issues/57 + # https://git.rwth-aachen.de/acplt/pyi40aas/-/issues/57 xml = _xml_wrap(""" <aas:submodels> <aas:submodel> @@ -273,5 +271,220 @@ class XMLDeserializationTest(unittest.TestCase): </aas:submodels> """) with self.assertLogs(logging.getLogger(), level=logging.WARNING) as context: - read_aas_xml_file(io.BytesIO(xml.encode("utf-8")), False) + read_aas_xml_file(io.BytesIO(xml.encode("utf-8")), failsafe=False) self.assertIn("aas:value", context.output[0]) + + def test_duplicate_identifier(self) -> None: + xml = _xml_wrap(""" + <aas:assetAdministrationShells> + <aas:assetAdministrationShell> + <aas:identification idType="IRI">http://acplt.org/test_aas</aas:identification> + <aas:assetRef> + <aas:keys> + <aas:key idType="IRI" local="false" type="Asset">http://acplt.org/asset_ref</aas:key> + </aas:keys> + </aas:assetRef> + </aas:assetAdministrationShell> + </aas:assetAdministrationShells> + <aas:submodels> + <aas:submodel> + <aas:identification idType="IRI">http://acplt.org/test_aas</aas:identification> + <aas:submodelElements/> + </aas:submodel> + </aas:submodels> + """) + self._assertInExceptionAndLog(xml, "duplicate identifier", KeyError, logging.ERROR) + + def test_duplicate_identifier_object_store(self) -> None: + sm_id = model.Identifier("http://acplt.org/test_submodel", model.IdentifierType.IRI) + + def get_clean_store() -> model.DictObjectStore: + store: model.DictObjectStore = model.DictObjectStore() + submodel_ = model.Submodel(sm_id, id_short="test123") + store.add(submodel_) + return store + + xml = _xml_wrap(""" + <aas:submodels> + <aas:submodel> + <aas:identification idType="IRI">http://acplt.org/test_submodel</aas:identification> + <aas:idShort>test456</aas:idShort> + <aas:submodelElements/> + </aas:submodel> + </aas:submodels> + """) + bytes_io = io.BytesIO(xml.encode("utf-8")) + + object_store = get_clean_store() + identifiers = read_aas_xml_file_into(object_store, bytes_io, replace_existing=True, ignore_existing=False) + self.assertEqual(identifiers.pop(), sm_id) + submodel = object_store.pop() + self.assertIsInstance(submodel, model.Submodel) + self.assertEqual(submodel.id_short, "test456") + + object_store = get_clean_store() + with self.assertLogs(logging.getLogger(), level=logging.INFO) as log_ctx: + identifiers = read_aas_xml_file_into(object_store, bytes_io, replace_existing=False, ignore_existing=True) + self.assertEqual(len(identifiers), 0) + self.assertIn("already exists in the object store", log_ctx.output[0]) + submodel = object_store.pop() + self.assertIsInstance(submodel, model.Submodel) + self.assertEqual(submodel.id_short, "test123") + + object_store = get_clean_store() + with self.assertRaises(KeyError) as err_ctx: + identifiers = read_aas_xml_file_into(object_store, bytes_io, replace_existing=False, ignore_existing=False) + self.assertEqual(len(identifiers), 0) + cause = _root_cause(err_ctx.exception) + self.assertIn("already exists in the object store", str(cause)) + submodel = object_store.pop() + self.assertIsInstance(submodel, model.Submodel) + self.assertEqual(submodel.id_short, "test123") + + def test_read_aas_xml_element(self) -> None: + xml = """ + <aas:submodel xmlns:aas="http://www.admin-shell.io/aas/2/0"> + <aas:identification idType="IRI">http://acplt.org/test_submodel</aas:identification> + <aas:submodelElements/> + </aas:submodel> + """ + bytes_io = io.BytesIO(xml.encode("utf-8")) + + submodel = read_aas_xml_element(bytes_io, XMLConstructables.SUBMODEL) + self.assertIsInstance(submodel, model.Submodel) + + def test_stripped_qualifiable(self) -> None: + xml = """ + <aas:submodel xmlns:aas="http://www.admin-shell.io/aas/2/0"> + <aas:identification idType="IRI">http://acplt.org/test_stripped_submodel</aas:identification> + <aas:submodelElements> + <aas:submodelElement> + <aas:operation> + <aas:idShort>test_operation</aas:idShort> + <aas:qualifier> + <aas:qualifier> + <aas:type>test_qualifier</aas:type> + <aas:valueType>string</aas:valueType> + </aas:qualifier> + </aas:qualifier> + </aas:operation> + </aas:submodelElement> + </aas:submodelElements> + <aas:qualifier> + <aas:qualifier> + <aas:type>test_qualifier</aas:type> + <aas:valueType>string</aas:valueType> + </aas:qualifier> + </aas:qualifier> + </aas:submodel> + """ + bytes_io = io.BytesIO(xml.encode("utf-8")) + + # check if XML with constraints can be parsed successfully + submodel = read_aas_xml_element(bytes_io, XMLConstructables.SUBMODEL, failsafe=False) + self.assertIsInstance(submodel, model.Submodel) + assert isinstance(submodel, model.Submodel) + self.assertEqual(len(submodel.qualifier), 1) + + # check if constraints are ignored in stripped mode + submodel = read_aas_xml_element(bytes_io, XMLConstructables.SUBMODEL, failsafe=False, stripped=True) + self.assertIsInstance(submodel, model.Submodel) + assert isinstance(submodel, model.Submodel) + self.assertEqual(len(submodel.qualifier), 0) + + def test_stripped_annotated_relationship_element(self) -> None: + xml = """ + <aas:annotatedRelationshipElement xmlns:aas="http://www.admin-shell.io/aas/2/0"> + <aas:idShort>test_annotated_relationship_element</aas:idShort> + <aas:first> + <aas:keys> + <aas:key idType="IdShort" local="true" type="AnnotatedRelationshipElement">test_ref</aas:key> + </aas:keys> + </aas:first> + <aas:second> + <aas:keys> + <aas:key idType="IdShort" local="true" type="AnnotatedRelationshipElement">test_ref</aas:key> + </aas:keys> + </aas:second> + </aas:annotatedRelationshipElement> + """ + bytes_io = io.BytesIO(xml.encode("utf-8")) + + # XML schema requires annotations to be present, so parsing should fail + with self.assertRaises(KeyError): + read_aas_xml_element(bytes_io, XMLConstructables.ANNOTATED_RELATIONSHIP_ELEMENT, failsafe=False) + + # check if it can be parsed in stripped mode + read_aas_xml_element(bytes_io, XMLConstructables.ANNOTATED_RELATIONSHIP_ELEMENT, failsafe=False, stripped=True) + + def test_stripped_entity(self) -> None: + xml = """ + <aas:entity xmlns:aas="http://www.admin-shell.io/aas/2/0"> + <aas:idShort>test_entity</aas:idShort> + <aas:entityType>CoManagedEntity</aas:entityType> + </aas:entity> + """ + bytes_io = io.BytesIO(xml.encode("utf-8")) + + # XML schema requires statements to be present, so parsing should fail + with self.assertRaises(KeyError): + read_aas_xml_element(bytes_io, XMLConstructables.ENTITY, failsafe=False) + + # check if it can be parsed in stripped mode + read_aas_xml_element(bytes_io, XMLConstructables.ENTITY, failsafe=False, stripped=True) + + def test_stripped_submodel_element_collection(self) -> None: + xml = """ + <aas:submodelElementCollection xmlns:aas="http://www.admin-shell.io/aas/2/0"> + <aas:idShort>test_collection</aas:idShort> + <aas:ordered>false</aas:ordered> + </aas:submodelElementCollection> + """ + bytes_io = io.BytesIO(xml.encode("utf-8")) + + # XML schema requires statements to be present, so parsing should fail + with self.assertRaises(KeyError): + read_aas_xml_element(bytes_io, XMLConstructables.SUBMODEL_ELEMENT_COLLECTION, failsafe=False) + + # check if it can be parsed in stripped mode + read_aas_xml_element(bytes_io, XMLConstructables.SUBMODEL_ELEMENT_COLLECTION, failsafe=False, stripped=True) + + def test_stripped_asset_administration_shell(self) -> None: + xml = """ + <aas:assetAdministrationShell xmlns:aas="http://www.admin-shell.io/aas/2/0"> + <aas:identification idType="IRI">http://acplt.org/test_aas</aas:identification> + <aas:assetRef> + <aas:keys> + <aas:key idType="IRI" local="false" type="Asset">http://acplt.org/test_ref</aas:key> + </aas:keys> + </aas:assetRef> + <aas:submodelRefs> + <aas:submodelRef> + <aas:keys> + <aas:key idType="IRI" local="false" type="Submodel">http://acplt.org/test_ref</aas:key> + </aas:keys> + </aas:submodelRef> + </aas:submodelRefs> + <aas:views> + <aas:view> + <aas:idShort>test_view</aas:idShort> + </aas:view> + </aas:views> + </aas:assetAdministrationShell> + """ + bytes_io = io.BytesIO(xml.encode("utf-8")) + + # check if XML with constraints can be parsed successfully + aas = read_aas_xml_element(bytes_io, XMLConstructables.ASSET_ADMINISTRATION_SHELL, failsafe=False) + self.assertIsInstance(aas, model.AssetAdministrationShell) + assert isinstance(aas, model.AssetAdministrationShell) + self.assertEqual(len(aas.submodel), 1) + self.assertEqual(len(aas.view), 1) + + # check if constraints are ignored in stripped mode + aas = read_aas_xml_element(bytes_io, XMLConstructables.ASSET_ADMINISTRATION_SHELL, failsafe=False, + stripped=True) + self.assertIsInstance(aas, model.AssetAdministrationShell) + assert isinstance(aas, model.AssetAdministrationShell) + self.assertEqual(len(aas.submodel), 0) + self.assertEqual(len(aas.view), 0) diff --git a/test/model/test_base.py b/test/model/test_base.py index 3f9002157029f03298196bdac2628b074e0efb88..7680f438fd72eaa9513ce037de75773e2dceb34b 100644 --- a/test/model/test_base.py +++ b/test/model/test_base.py @@ -184,6 +184,17 @@ class ModelNamespaceTest(unittest.TestCase): namespace.get_referable("Prop3") self.assertEqual("'Referable with id_short Prop3 not found in this namespace'", str(cm.exception)) + def test_renaming(self) -> None: + self.namespace.set1.add(self.prop1) + self.namespace.set1.add(self.prop2) + + self.prop1.id_short = "Prop3" + self.assertEqual("Prop3", self.prop1.id_short) + + with self.assertRaises(KeyError) as cm: + self.prop1.id_short = "Prop2" + self.assertIn("already present", str(cm.exception)) + def test_Namespaceset_update_from(self) -> None: # Prop1 is getting its value updated by namespace2.set1 # Prop2 is getting deleted since it does not exist in namespace2.set1