# Copyright 2019 PyI40AAS Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
"""
Module for deserializing Asset Administration Shell data from the official XML format
"""
# TODO: add a constructor for submodels and for all classes required by a submodel

from ... import model
import xml.etree.ElementTree as ElTree
19
import logging
20

21
from typing import Any, Callable, Dict, IO, Iterable, List, Optional, Set, Tuple, Type, TypeVar
22
from .xml_serialization import NS_AAS, NS_AAS_COMMON, NS_ABAC, NS_IEC, NS_XSI
23
24
25
from .._generic import MODELING_KIND_INVERSE, ASSET_KIND_INVERSE, KEY_ELEMENTS_INVERSE, KEY_TYPES_INVERSE,\
    IDENTIFIER_TYPES_INVERSE, ENTITY_TYPES_INVERSE, IEC61360_DATA_TYPES_INVERSE, IEC61360_LEVEL_TYPES_INVERSE,\
    KEY_ELEMENTS_CLASSES_INVERSE
26

27
28
logger = logging.getLogger(__name__)

29
T = TypeVar('T')
30

31

32
33
34
35
36
def _get_child_mandatory(element: ElTree.Element, child_tag: str) -> ElTree.Element:
    """
    Look up a required child of an XML element.

    :param element: The XML element to search in
    :param child_tag: The tag of the child, as passed to ``element.find()``
    :return: The first child found for the given tag
    :raises KeyError: If ``element.find()`` returns no child for the tag
    """
    found = element.find(child_tag)
    if found is not None:
        return found
    raise KeyError(f"XML element {element.tag} has no child {child_tag}!")
37
38


39
40
41
42
43
44
45
46
47
48
49
def _get_attrib_mandatory(element: ElTree.Element, attrib: str) -> str:
    """
    Read a required attribute of an XML element.

    :param element: The XML element whose attribute is read
    :param attrib: The name of the attribute
    :return: The value of the attribute
    :raises KeyError: If the element has no attribute with this name
    """
    attributes = element.attrib
    if attrib in attributes:
        return attributes[attrib]
    raise KeyError(f"XML element {element.tag} has no attribute with name {attrib}!")


def _get_attrib_mandatory_mapped(element: ElTree.Element, attrib: str, dct: Dict[str, T]) -> T:
    """
    Read a required attribute of an XML element and translate its value using a dict.

    :param element: The XML element whose attribute is read
    :param attrib: The name of the attribute
    :param dct: The dict used to translate the attribute value
    :return: The dict entry stored under the attribute value
    :raises KeyError: If the attribute is missing (raised by _get_attrib_mandatory())
    :raises ValueError: If the attribute value is not a key of the dict
    """
    raw_value = _get_attrib_mandatory(element, attrib)
    if raw_value in dct:
        return dct[raw_value]
    raise ValueError(f"Attribute {attrib} of XML element {element.tag} has invalid value: {raw_value}")
50
51


52
53
54
55
def _get_text_or_none(element: Optional[ElTree.Element]) -> Optional[str]:
    """
    Return the text of an XML element, tolerating a missing element.

    :param element: The XML element, or None
    :return: ``element.text`` (which may itself be None), or None if no element was given
    """
    if element is None:
        return None
    return element.text


56
57
def _get_text_mandatory(element: ElTree.Element) -> str:
    """
    Return the text of an XML element that is required to have text.

    :param element: The XML element whose text is read
    :return: The text of the element
    :raises KeyError: If ``element.text`` is None
    """
    text = element.text
    if text is None:
        raise KeyError(f"XML element {element.tag} has no text!")
    return text


63
64
65
66
67
68
69
70
71
def _get_text_mandatory_mapped(element: ElTree.Element, dct: Dict[str, T]) -> T:
    """
    Read the required text of an XML element and translate it using a dict.

    :param element: The XML element whose text is read
    :param dct: The dict used to translate the text
    :return: The dict entry stored under the element's text
    :raises KeyError: If the element has no text (raised by _get_text_mandatory())
    :raises ValueError: If the text is not a key of the dict
    """
    raw_text = _get_text_mandatory(element)
    if raw_text in dct:
        return dct[raw_text]
    raise ValueError(f"Text of XML element {element.tag} is invalid: {raw_text}")


def _constructor_name_to_typename(constructor: Callable[[ElTree.Element, bool], T]) -> str:
    """
    Derive a CamelCase type name from the name of a constructor function.

    The function name is split at underscores, the first two parts are dropped and the remaining parts are
    joined with their first character upper-cased, e.g. ``_construct_key_tuple`` -> ``KeyTuple``.

    :param constructor: The constructor function whose ``__name__`` is converted
    :return: The derived type name
    """
    parts = constructor.__name__.split("_")[2:]
    # Slicing with [:1] instead of indexing with [0] keeps empty parts (caused by consecutive or trailing
    # underscores) from raising an IndexError. This function is used while reporting errors, so it must not
    # fail itself.
    return "".join(part[:1].upper() + part[1:] for part in parts)
72
73


74
75
76
77
78
79
80
def _exception_to_str(exception: BaseException) -> str:
    """
    Convert an exception to a string suitable for log messages.

    ``str()`` of a KeyError with a single argument is the ``repr()`` of that argument, i.e. a string message
    is wrapped in quotes and escaped. For such a KeyError the argument itself is converted instead, so the
    message is returned unquoted and unescaped, and non-string arguments are not truncated.

    :param exception: The exception to convert
    :return: The message of the exception
    """
    if isinstance(exception, KeyError) and len(exception.args) == 1:
        return str(exception.args[0])
    return str(exception)


def _failsafe_construct(element: Optional[ElTree.Element], constructor: Callable[..., T], failsafe: bool,
                        **kwargs: Any) -> Optional[T]:
    """
    Call a constructor function on an XML element and handle the KeyErrors and ValueErrors it raises.

    :param element: The XML element to pass to the constructor. If None, nothing is constructed.
    :param constructor: The constructor function, called as ``constructor(element, failsafe, **kwargs)``
    :param failsafe: If True, a KeyError or ValueError is logged and None is returned. If False, a new exception
                     of the same type is raised, with the original exception as its cause.
    :param kwargs: Additional keyword arguments that are passed on to the constructor
    :return: The constructed object, or None if the element is None or construction failed in failsafe mode
    :raises KeyError: In non-failsafe mode, if the constructor raised a KeyError
    :raises ValueError: In non-failsafe mode, if the constructor raised a ValueError
    """
    if element is None:
        return None
    try:
        return constructor(element, failsafe, **kwargs)
    except (KeyError, ValueError) as e:
        type_name = _constructor_name_to_typename(constructor)
        error_message = f"while converting XML element with tag {element.tag} to type {type_name}"
        if not failsafe:
            raise type(e)(error_message) from e
        error_type = type(e).__name__
        # Walk the chain of causes and prepend each message, so the innermost cause is logged first
        cause: Optional[BaseException] = e
        while cause is not None:
            error_message = _exception_to_str(cause) + "\n -> " + error_message
            cause = cause.__cause__
        logger.error(error_type + ": " + error_message)
        logger.error(f"Failed to construct {type_name}!")
        return None
98

99

100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def _failsafe_construct_multiple(elements: Iterable[ElTree.Element], constructor: Callable[..., T], failsafe: bool,
                                 **kwargs: Any) -> Iterable[T]:
    """
    Lazily construct one object per XML element, leaving out elements for which nothing was constructed.

    :param elements: The XML elements to pass to the constructor, one at a time
    :param constructor: The constructor function, forwarded to _failsafe_construct()
    :param failsafe: Forwarded to _failsafe_construct()
    :param kwargs: Additional keyword arguments forwarded to _failsafe_construct()
    :return: A generator yielding every result of _failsafe_construct() that is not None
    """
    for candidate in elements:
        result = _failsafe_construct(candidate, constructor, failsafe, **kwargs)
        if result is None:
            continue
        yield result


def _find_and_construct_mandatory(element: ElTree.Element, child_tag: str, constructor: Callable[..., T],
                                  **kwargs: Any) -> T:
    """
    Find a required child element and construct an object from it in non-failsafe mode.

    :param element: The XML element to search in
    :param child_tag: The tag of the required child
    :param constructor: The constructor function to call on the child
    :param kwargs: Additional keyword arguments forwarded to _failsafe_construct()
    :return: The constructed object
    :raises KeyError: If the child is missing (raised by _get_child_mandatory())
    :raises TypeError: If the non-failsafe construction unexpectedly returned None
    """
    child = _get_child_mandatory(element, child_tag)
    result = _failsafe_construct(child, constructor, False, **kwargs)
    if result is not None:
        return result
    raise TypeError("The result of a non-failsafe _failsafe_construct() call was None! "
                    "This is a bug in the pyAAS XML deserialization, please report it!")
115
116


117
def _construct_key(element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.Key:
    """
    Construct a model.Key from an XML element.

    The ``type`` and ``idType`` attributes are translated via KEY_ELEMENTS_INVERSE and KEY_TYPES_INVERSE.
    The ``local`` attribute counts as True only if it equals "true" (compared case-insensitively).
    The text of the element is passed to model.Key unchanged.

    :param element: The XML element to read the key from
    :param _failsafe: Unused
    :return: The constructed key
    :raises KeyError: If a required attribute or the element's text is missing
    :raises ValueError: If the ``type`` or ``idType`` attribute has an unknown value
    """
    return model.Key(
        _get_attrib_mandatory_mapped(element, "type", KEY_ELEMENTS_INVERSE),
        _get_attrib_mandatory(element, "local").lower() == "true",
        _get_text_mandatory(element),
        _get_attrib_mandatory_mapped(element, "idType", KEY_TYPES_INVERSE)
    )


126
def _construct_key_tuple(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> Tuple[model.Key, ...]:
    """
    Construct a tuple of keys from the ``key`` children of the ``keys`` child of an XML element.

    :param element: The XML element containing the ``keys`` child
    :param failsafe: Forwarded to _failsafe_construct_multiple() for constructing the individual keys
    :return: A tuple of the keys that were constructed
    :raises KeyError: If the element has no ``keys`` child
    """
    keys = _get_child_mandatory(element, NS_AAS + "keys")
    return tuple(_failsafe_construct_multiple(keys.findall(NS_AAS + "key"), _construct_key, failsafe))
129
130


131
def _construct_reference(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Reference:
    """
    Construct a model.Reference from the keys found in an XML element.

    :param element: The XML element containing the keys (see _construct_key_tuple())
    :param failsafe: Forwarded to _construct_key_tuple()
    :return: The constructed reference
    """
    return model.Reference(_construct_key_tuple(element, failsafe))


135
136
137
138
139
140
141
142
143
144
def _construct_aas_reference(element: ElTree.Element, failsafe: bool, type_: Type[model.base._RT], **_kwargs: Any)\
        -> model.AASReference[model.base._RT]:
    """
    Construct a model.AASReference of a given type from the keys found in an XML element.

    If there is at least one key and the class that KEY_ELEMENTS_CLASSES_INVERSE maps the last key's type to
    is not a subclass of ``type_``, a warning is logged. The reference is constructed either way.

    :param element: The XML element containing the keys (see _construct_key_tuple())
    :param failsafe: Forwarded to _construct_key_tuple()
    :param type_: The type the reference points to
    :return: The constructed reference
    """
    keys = _construct_key_tuple(element, failsafe)
    if keys:
        # Key types without an entry are mapped to NoneType, which is not a subclass of any model type
        last_key_class = KEY_ELEMENTS_CLASSES_INVERSE.get(keys[-1].type, type(None))
        if not issubclass(last_key_class, type_):
            logger.warning(f"Type {keys[-1].type.name} of last key of reference to {' / '.join(str(k) for k in keys)} "
                           f"does not match reference type {type_.__name__}")
    return model.AASReference(keys, type_)


def _construct_submodel_reference(element: ElTree.Element, failsafe: bool, **kwargs: Any)\
        -> model.AASReference[model.Submodel]:
    """
    Construct an AASReference to a model.Submodel from an XML element.

    :param element: The XML element containing the keys of the reference
    :param failsafe: Forwarded to _construct_aas_reference()
    :param kwargs: Additional keyword arguments forwarded to _construct_aas_reference()
    :return: The constructed reference
    """
    return _construct_aas_reference(element, failsafe, model.Submodel, **kwargs)
147
148


149
def _construct_asset_reference(element: ElTree.Element, failsafe: bool, **kwargs: Any)\
        -> model.AASReference[model.Asset]:
    """
    Construct an AASReference to a model.Asset from an XML element.

    :param element: The XML element containing the keys of the reference
    :param failsafe: Forwarded to _construct_aas_reference()
    :param kwargs: Additional keyword arguments forwarded to _construct_aas_reference()
    :return: The constructed reference
    """
    return _construct_aas_reference(element, failsafe, model.Asset, **kwargs)
152
153


154
def _construct_asset_administration_shell_reference(element: ElTree.Element, failsafe: bool, **kwargs: Any)\
        -> model.AASReference[model.AssetAdministrationShell]:
    """
    Construct an AASReference to a model.AssetAdministrationShell from an XML element.

    :param element: The XML element containing the keys of the reference
    :param failsafe: Forwarded to _construct_aas_reference()
    :param kwargs: Additional keyword arguments forwarded to _construct_aas_reference()
    :return: The constructed reference
    """
    return _construct_aas_reference(element, failsafe, model.AssetAdministrationShell, **kwargs)
157
158


159
def _construct_referable_reference(element: ElTree.Element, failsafe: bool, **kwargs: Any)\
        -> model.AASReference[model.Referable]:
    """
    Construct an AASReference to a model.Referable from an XML element.

    :param element: The XML element containing the keys of the reference
    :param failsafe: Forwarded to _construct_aas_reference()
    :param kwargs: Additional keyword arguments forwarded to _construct_aas_reference()
    :return: The constructed reference
    """
    return _construct_aas_reference(element, failsafe, model.Referable, **kwargs)
162
163


164
def _construct_concept_description_reference(element: ElTree.Element, failsafe: bool, **kwargs: Any)\
        -> model.AASReference[model.ConceptDescription]:
    """
    Construct an AASReference to a model.ConceptDescription from an XML element.

    :param element: The XML element containing the keys of the reference
    :param failsafe: Forwarded to _construct_aas_reference()
    :param kwargs: Additional keyword arguments forwarded to _construct_aas_reference()
    :return: The constructed reference
    """
    return _construct_aas_reference(element, failsafe, model.ConceptDescription, **kwargs)
167
168


169
170
def _construct_administrative_information(element: ElTree.Element, _failsafe: bool, **_kwargs: Any)\
        -> model.AdministrativeInformation:
    """
    Construct a model.AdministrativeInformation from the ``version`` and ``revision`` children of an XML element.

    Both children are optional; None is passed for a child that is missing or has no text.

    :param element: The XML element to read from
    :param _failsafe: Unused
    :return: The constructed administrative information
    """
    return model.AdministrativeInformation(
        _get_text_or_none(element.find(NS_AAS + "version")),
        _get_text_or_none(element.find(NS_AAS + "revision"))
    )


177
def _construct_lang_string_set(element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.LangStringSet:
    """
    Construct a LangStringSet from the ``langString`` children of an XML element.

    Each child's ``lang`` attribute becomes a key and its text the corresponding value. If the same ``lang``
    occurs more than once, the last occurrence wins.

    :param element: The XML element containing the ``langString`` children
    :param _failsafe: Unused
    :return: The constructed LangStringSet
    :raises KeyError: If a ``langString`` child has no ``lang`` attribute or no text
    """
    lss: model.LangStringSet = {}
    for lang_string in element.findall(NS_IEC + "langString"):
        lss[_get_attrib_mandatory(lang_string, "lang")] = _get_text_mandatory(lang_string)
    return lss


184
def _construct_qualifier(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Qualifier:
    """
    Construct a model.Qualifier from an XML element.

    The ``type`` and ``valueType`` children are required; ``value`` and ``valueId`` are optional.

    :param element: The XML element to read the qualifier from
    :param failsafe: Forwarded to the construction of ``valueId`` and of the abstract attributes
    :return: The constructed qualifier
    :raises KeyError: If a required child or its text is missing
    :raises ValueError: If the text of ``valueType`` is not a key of model.datatypes.XSD_TYPE_CLASSES
    """
    q = model.Qualifier(
        _get_text_mandatory(_get_child_mandatory(element, NS_AAS + "type")),
        _get_text_mandatory_mapped(_get_child_mandatory(element, NS_AAS + "valueType"),
                                   model.datatypes.XSD_TYPE_CLASSES)
    )
    value = element.find(NS_AAS + "value")
    if value is not None:
        # The text is assigned as-is; it may be None if the value element is empty
        q.value = value.text
    value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), _construct_reference, failsafe)
    if value_id is not None:
        q.value_id = value_id
    _amend_abstract_attributes(q, element, failsafe)
    return q


200
def _construct_formula(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Formula:
    """
    Construct a model.Formula from an XML element.

    The references are read from the ``reference`` children of the optional ``dependsOnRefs`` child. If that
    child is missing, the formula is constructed with an empty set.

    :param element: The XML element to read the formula from
    :param failsafe: Forwarded to the construction of the individual references
    :return: The constructed formula
    """
    ref_set: Set[model.Reference] = set()
    depends_on_refs = element.find(NS_AAS + "dependsOnRefs")
    if depends_on_refs is not None:
        ref_set = set(_failsafe_construct_multiple(depends_on_refs.findall(NS_AAS + "reference"), _construct_reference,
                                                   failsafe))
    return model.Formula(ref_set)
207
208


209
def _construct_constraint(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Constraint:
    """
    Construct a constraint from an XML element, choosing the constructor by the element's tag.

    ``qualifier`` elements are passed to _construct_qualifier(), ``formula`` elements to _construct_formula().

    :param element: The XML element to read the constraint from
    :param failsafe: Forwarded to the chosen constructor
    :return: The constructed constraint
    :raises KeyError: If the tag of the element is neither ``qualifier`` nor ``formula``
    """
    constructors = {
        NS_AAS + "qualifier": _construct_qualifier,
        NS_AAS + "formula": _construct_formula
    }
    if element.tag not in constructors:
        # Still a KeyError, so _failsafe_construct() handles it, but with a message that explains the problem
        raise KeyError(f"XML element {element.tag} is not a valid constraint!")
    return constructors[element.tag](element, failsafe)


216
def _construct_identifier(element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.Identifier:
    """
    Construct a model.Identifier from the text and the ``idType`` attribute of an XML element.

    :param element: The XML element to read the identifier from
    :param _failsafe: Unused
    :return: The constructed identifier
    :raises KeyError: If the element has no text or no ``idType`` attribute
    :raises ValueError: If the ``idType`` attribute is not a key of IDENTIFIER_TYPES_INVERSE
    """
    return model.Identifier(
        _get_text_mandatory(element),
        _get_attrib_mandatory_mapped(element, "idType", IDENTIFIER_TYPES_INVERSE)
    )


223
def _construct_security(_element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.Security:
    """
    Construct a model.Security object.

    The XML element is not evaluated; a model.Security created without arguments is returned.

    :param _element: Unused
    :param _failsafe: Unused
    :return: A new model.Security object
    """
    return model.Security()


227
def _construct_view(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.View:
    """
    Construct a model.View from an XML element.

    The ``idShort`` child is required. The contained elements are read from the ``containedElementRef``
    children of the optional ``containedElements`` child.

    :param element: The XML element to read the view from
    :param failsafe: Forwarded to the construction of the references and of the abstract attributes
    :return: The constructed view
    :raises KeyError: If the ``idShort`` child or its text is missing
    """
    view = model.View(_get_text_mandatory(_get_child_mandatory(element, NS_AAS + "idShort")))
    contained_elements = element.find(NS_AAS + "containedElements")
    if contained_elements is not None:
        view.contained_element = set(
            _failsafe_construct_multiple(contained_elements.findall(NS_AAS + "containedElementRef"),
                                         _construct_referable_reference, failsafe)
        )
    _amend_abstract_attributes(view, element, failsafe)
    return view


239
def _construct_concept_dictionary(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDictionary:
    """
    Construct a model.ConceptDictionary from an XML element.

    The ``idShort`` child is required. The concept descriptions are read from the ``conceptDescriptionRef``
    children of the optional ``conceptDescriptionRefs`` child.

    :param element: The XML element to read the concept dictionary from
    :param failsafe: Forwarded to the construction of the references and of the abstract attributes
    :return: The constructed concept dictionary
    :raises KeyError: If the ``idShort`` child or its text is missing
    """
    cd = model.ConceptDictionary(_get_text_mandatory(_get_child_mandatory(element, NS_AAS + "idShort")))
    concept_description = element.find(NS_AAS + "conceptDescriptionRefs")
    if concept_description is not None:
        cd.concept_description = set(_failsafe_construct_multiple(
            concept_description.findall(NS_AAS + "conceptDescriptionRef"),
            _construct_concept_description_reference,
            failsafe
        ))
    _amend_abstract_attributes(cd, element, failsafe)
    return cd


252
253
def _construct_asset_administration_shell(element: ElTree.Element, failsafe: bool, **_kwargs: Any)\
        -> model.AssetAdministrationShell:
    """
    Construct a model.AssetAdministrationShell from an XML element.

    The ``assetRef`` and ``identification`` children are required and always constructed in non-failsafe mode.
    ``security``, ``submodelRefs``, ``views``, ``conceptDictionaries`` and ``derivedFrom`` are optional.

    :param element: The XML element to read the asset administration shell from
    :param failsafe: Forwarded to the construction of the optional parts and of the abstract attributes
    :return: The constructed asset administration shell
    :raises KeyError: If a required child is missing or cannot be constructed due to missing data
    :raises ValueError: If a required child cannot be constructed due to invalid data
    """
    aas = model.AssetAdministrationShell(
        _find_and_construct_mandatory(element, NS_AAS + "assetRef", _construct_asset_reference),
        _find_and_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
    )
    security = _failsafe_construct(element.find(NS_ABAC + "security"), _construct_security, failsafe)
    if security is not None:
        aas.security = security
    submodels = element.find(NS_AAS + "submodelRefs")
    if submodels is not None:
        aas.submodel = set(_failsafe_construct_multiple(submodels.findall(NS_AAS + "submodelRef"),
                                                        _construct_submodel_reference, failsafe))
    views = element.find(NS_AAS + "views")
    if views is not None:
        for view in _failsafe_construct_multiple(views.findall(NS_AAS + "view"), _construct_view, failsafe):
            aas.view.add(view)
    concept_dictionaries = element.find(NS_AAS + "conceptDictionaries")
    if concept_dictionaries is not None:
        for cd in _failsafe_construct_multiple(concept_dictionaries.findall(NS_AAS + "conceptDictionary"),
                                               _construct_concept_dictionary, failsafe):
            aas.concept_dictionary.add(cd)
    derived_from = _failsafe_construct(element.find(NS_AAS + "derivedFrom"),
                                       _construct_asset_administration_shell_reference, failsafe)
    if derived_from is not None:
        aas.derived_from = derived_from
    _amend_abstract_attributes(aas, element, failsafe)
    return aas
280
281


282
def _construct_asset(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Asset:
    """
    Construct a model.Asset from an XML element.

    The ``kind`` and ``identification`` children are required. ``assetIdentificationModelRef`` and
    ``billOfMaterialRef`` are optional; the corresponding attribute is set to None if the child is missing
    or could not be constructed in failsafe mode.

    :param element: The XML element to read the asset from
    :param failsafe: Forwarded to the construction of the optional references and of the abstract attributes
    :return: The constructed asset
    :raises KeyError: If a required child or its data is missing
    :raises ValueError: If the text of ``kind`` is not a key of ASSET_KIND_INVERSE
    """
    asset = model.Asset(
        _get_text_mandatory_mapped(_get_child_mandatory(element, NS_AAS + "kind"), ASSET_KIND_INVERSE),
        _find_and_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
    )
    asset.asset_identification_model = _failsafe_construct(element.find(NS_AAS + "assetIdentificationModelRef"),
                                                           _construct_submodel_reference, failsafe)
    asset.bill_of_material = _failsafe_construct(element.find(NS_AAS + "billOfMaterialRef"),
                                                 _construct_submodel_reference, failsafe)
    _amend_abstract_attributes(asset, element, failsafe)
    return asset
293
294


295
def _construct_submodel(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Submodel:
    """
    Placeholder for the construction of a model.Submodel from an XML element.

    Not implemented yet: the function does nothing and implicitly returns None, despite its return annotation.
    _failsafe_construct_multiple() drops None results, so submodel elements are currently skipped without
    any log message.

    :param element: The XML element of the submodel (currently ignored)
    :param failsafe: Currently ignored
    """
    # TODO: implement the submodel constructor and the constructors of all classes required by a submodel
    pass


299
def _construct_concept_description(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDescription:
    """
    Construct a model.ConceptDescription from an XML element.

    The ``identification`` child is required. ``is_case_of`` is only assigned if at least one reference could
    be constructed from the ``isCaseOf`` children.

    :param element: The XML element to read the concept description from
    :param failsafe: Forwarded to the construction of the references and of the abstract attributes
    :return: The constructed concept description
    :raises KeyError: If the ``identification`` child or its data is missing
    :raises ValueError: If the ``identification`` child contains invalid data
    """
    cd = model.ConceptDescription(
        _find_and_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
    )
    is_case_of = set(_failsafe_construct_multiple(element.findall(NS_AAS + "isCaseOf"), _construct_reference, failsafe))
    if is_case_of:
        cd.is_case_of = is_case_of
    _amend_abstract_attributes(cd, element, failsafe)
    return cd
308
309


310
def _amend_abstract_attributes(obj: object, element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> None:
    """
    Set the attributes inherited from abstract model classes on an already constructed object.

    Depending on which of model.Referable, model.Identifiable, model.HasSemantics and model.Qualifiable the
    object is an instance of, the matching optional children of the XML element are read and assigned.
    Attributes whose child is missing are left untouched.

    :param obj: The object to amend
    :param element: The XML element the object was constructed from
    :param failsafe: Forwarded to the construction of the nested objects
    """
    if isinstance(obj, model.Referable):
        category = _get_text_or_none(element.find(NS_AAS + "category"))
        if category is not None:
            obj.category = category
        description = _failsafe_construct(element.find(NS_AAS + "description"), _construct_lang_string_set, failsafe)
        if description is not None:
            obj.description = description
    if isinstance(obj, model.Identifiable):
        id_short = _get_text_or_none(element.find(NS_AAS + "idShort"))
        if id_short is not None:
            obj.id_short = id_short
        administration = _failsafe_construct(element.find(NS_AAS + "administration"),
                                             _construct_administrative_information, failsafe)
        if administration is not None:
            obj.administration = administration
    if isinstance(obj, model.HasSemantics):
        semantic_id = _failsafe_construct(element.find(NS_AAS + "semanticId"), _construct_reference, failsafe)
        if semantic_id is not None:
            obj.semantic_id = semantic_id
    if isinstance(obj, model.Qualifiable):
        # _construct_constraint() only accepts "qualifier" and "formula" elements, so the children of each
        # "qualifiers" element are passed to it, not the "qualifiers" element itself
        for qualifiers in element.findall(NS_AAS + "qualifiers"):
            for constraint in _failsafe_construct_multiple(qualifiers, _construct_constraint, failsafe):
                obj.qualifier.add(constraint)
334
335
336
337
338
339
340
341
342
343
344


def read_xml_aas_file(file: IO, failsafe: bool = True) -> model.DictObjectStore:
    """
    Read an Asset Administration Shell XML file according to 'Details of the Asset Administration Shell', chapter 5.4

    Every child of the XML root element is expected to be a list whose tag is the tag of its elements with an
    appended "s", e.g. a list of ``asset`` elements with the tag ``assets``.

    :param file: A file-like object to read the XML-serialized data from
    :param failsafe: If True, the file is parsed in a failsafe way: Instead of raising an Exception for missing
                     attributes and wrong types, errors are logged and defective objects are skipped
    :return: A DictObjectStore containing all AAS objects from the XML file
    :raises TypeError: In non-failsafe mode, if the root element contains an unexpected list
    """
    element_constructors = {
        NS_AAS + "assetAdministrationShell": _construct_asset_administration_shell,
        NS_AAS + "asset": _construct_asset,
        NS_AAS + "submodel": _construct_submodel,
        NS_AAS + "conceptDescription": _construct_concept_description
    }

    tree = ElTree.parse(file)
    root = tree.getroot()

    # Add AAS objects to ObjectStore
    ret: model.DictObjectStore[model.Identifiable] = model.DictObjectStore()
    for list_ in root:
        # The tag of the contained elements is the tag of the list without the trailing "s"
        element_tag = list_.tag[:-1]
        if not list_.tag.endswith("s") or element_tag not in element_constructors:
            error_message = f"Unexpected list {list_.tag}"
            if not failsafe:
                raise TypeError(error_message)
            # In failsafe mode an unknown list is skipped like any other defective object
            logger.warning(error_message)
            continue
        constructor = element_constructors[element_tag]
        for element in _failsafe_construct_multiple(list_.findall(element_tag), constructor, failsafe):
            # element is always Identifiable, because the tag is checked earlier
            # this is just to satisfy the type checker
            if isinstance(element, model.Identifiable):
                ret.add(element)
    return ret