xml_deserialization.py 48.6 KB
Newer Older
1
# Copyright 2020 PyI40AAS Contributors
2
3
4
5
6
7
8
9
10
11
12
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
"""
Module for deserializing Asset Administration Shell data from the official XML format
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

Use this module by calling read_xml_aas_file(file, failsafe).
The function returns a DictObjectStore containing all parsed elements.

Unlike the JSON deserialization, parsing is done top-down. Elements with a specific tag are searched on the level
directly below the level of the current xml element (in terms of parent and child relation) and parsed when
found. Constructor functions of these elements will then again search for mandatory and optional child elements
and construct them if available, and so on.

This module supports parsing in failsafe and non-failsafe mode.
In failsafe mode errors regarding missing attributes and elements or invalid values are caught and logged.
In non-failsafe mode any error would abort parsing.
Error handling is done only by _failsafe_construct() in this module. Nearly all constructor functions are called
by other constructor functions via _failsafe_construct(), so an error chain is constructed in the error case,
which allows printing stacktrace-like error messages like the following in the error case (in failsafe mode of course):
28

29
30
31
KeyError: aas:identification on line 252 has no attribute with name idType!
 -> Failed to convert aas:identification on line 252 to type Identifier!
 -> Failed to convert aas:conceptDescription on line 247 to type ConceptDescription!
32
"""
33
34

from ... import model
35
from lxml import etree  # type: ignore
36
import logging
37
import base64
38

39
from typing import Any, Callable, Dict, IO, Iterable, Optional, Tuple, Type, TypeVar
40
from mypy_extensions import TypedDict  # TODO: import this from typing should we require python 3.8+ at some point
41
from .xml_serialization import NS_AAS, NS_ABAC, NS_IEC
42
from .._generic import MODELING_KIND_INVERSE, ASSET_KIND_INVERSE, KEY_ELEMENTS_INVERSE, KEY_TYPES_INVERSE, \
43
44
    IDENTIFIER_TYPES_INVERSE, ENTITY_TYPES_INVERSE, IEC61360_DATA_TYPES_INVERSE, IEC61360_LEVEL_TYPES_INVERSE, \
    KEY_ELEMENTS_CLASSES_INVERSE
45

46
47
logger = logging.getLogger(__name__)

48
T = TypeVar("T")
49

50

51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def _str_to_bool(string: str) -> bool:
    """
    XML only allows "false" and "true" (case-sensitive) as valid values for a boolean.

    This function checks the string and raises a ValueError if the string is neither "true" nor "false".

    :param string: String representation of a boolean. ("true" or "false")
    :return: The respective boolean value.
    :raises ValueError: If string is neither "true" nor "false".
    """
    if string not in ("true", "false"):
        raise ValueError(f"{string} is not a valid boolean! Only true and false are allowed.")
    return string == "true"


66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def _tag_replace_namespace(tag: str, nsmap: Dict[str, str]) -> str:
    """
    Attempts to replace the namespace in front of a tag with the prefix used in the xml document.

    :param tag: The tag of an xml element.
    :param nsmap: A dict mapping prefixes to namespaces.
    :return: The modified element tag. If the namespace wasn't found in nsmap, the unmodified tag is returned.
    """
    split = tag.split("}")
    for prefix, namespace in nsmap.items():
        if namespace == split[0][1:]:
            return prefix + ":" + split[1]
    return tag


def _element_pretty_identifier(element: etree.Element) -> str:
    """
    Returns a pretty element identifier for a given XML element.

    If the prefix is known, the namespace in the element tag is replaced by the prefix.
    If additionally also the sourceline is known, is is added as a suffix to name.
    For example, instead of "{http://www.admin-shell.io/aas/2/0}assetAdministrationShell" this function would return
    "aas:assetAdministrationShell on line $line", if both, prefix and sourceline, are known.

    :param element: The xml element.
    :return: The pretty element identifier.
    """
    identifier = element.tag
    if element.prefix is not None:
        identifier = element.prefix + ":" + element.tag.split("}")[1]
    if element.sourceline is not None:
        identifier += f" on line {element.sourceline}"
    return identifier


def _constructor_name_to_typename(constructor: Callable[[etree.Element, bool], T]) -> str:
    """
    A helper function for converting the name of a constructor function to the respective type name.

    _construct_some_type -> SomeType

    :param constructor: The constructor function.
    :return: The name of the type the constructor function constructs.
    """
    return "".join([s[0].upper() + s[1:] for s in constructor.__name__.split("_")[2:]])


def _exception_to_str(exception: BaseException) -> str:
    """
    A helper function used to stringify exceptions.

    It removes the quotation marks '' that are put around str(KeyError), otherwise it's just calls str(exception).

    :param exception: The exception to stringify.
    :return: The stringified exception.
    """
    string = str(exception)
    return string[1:-1] if isinstance(exception, KeyError) else string


126
def _get_child_mandatory(parent: etree.Element, child_tag: str) -> etree.Element:
127
128
129
    """
    A helper function for getting a mandatory child element.

130
    :param parent: The parent element.
131
132
133
134
    :param child_tag: The tag of the child element to return.
    :return: The child element.
    :raises KeyError: If the parent element has no child element with the given tag.
    """
135
    child = parent.find(child_tag)
136
    if child is None:
137
138
        raise KeyError(_element_pretty_identifier(parent)
                       + f" has no child {_tag_replace_namespace(child_tag, parent.nsmap)}!")
139
    return child
140
141


142
def _get_all_children_expect_tag(parent: etree.Element, exppected_tag: str, failsafe: bool) -> Iterable[etree.Element]:
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
    """
    Iterates over all children, matching the tag.

    not failsafe: Throws an error if a child element doesn't match.
    failsafe: Logs a warning if a child element doesn't match.

    :param parent: The parent element.
    :param exppected_tag: The tag of the children.
    :return: An iterator over all child elements that match child_tag.
    :raises KeyError: If the tag of a child element doesn't match and failsafe is true.
    """
    for child in parent:
        if child.tag != exppected_tag:
            error_message = f"{_element_pretty_identifier(child)}, child of {_element_pretty_identifier(parent)}, " \
                            f"doesn't match the expected tag {_tag_replace_namespace(exppected_tag, child.nsmap)}!"
            if not failsafe:
                raise KeyError(error_message)
            logger.warning(error_message)
            continue
        yield child


165
def _get_attrib_mandatory(element: etree.Element, attrib: str) -> str:
166
167
168
169
170
171
172
173
    """
    A helper function for getting a mandatory attribute of an element.

    :param element: The xml element.
    :param attrib: The name of the attribute.
    :return: The value of the attribute.
    :raises KeyError: If the attribute does not exist.
    """
174
    if attrib not in element.attrib:
175
        raise KeyError(f"{_element_pretty_identifier(element)} has no attribute with name {attrib}!")
176
177
178
    return element.attrib[attrib]


179
def _get_attrib_mandatory_mapped(element: etree.Element, attrib: str, dct: Dict[str, T]) -> T:
180
181
182
183
184
185
186
187
188
189
190
191
192
    """
    A helper function for getting a mapped mandatory attribute of an xml element.

    It first gets the attribute value using _get_attrib_mandatory(), which raises a KeyError if the attribute
    does not exist.
    Then it returns dct[<attribute value>] and raises a ValueError, if the attribute value does not exist in the dict.

    :param element: The xml element.
    :param attrib: The name of the attribute.
    :param dct: The dictionary that is used to map the attribute value.
    :return: The mapped value of the attribute.
    :raises ValueError: If the value of the attribute does not exist in dct.
    """
193
194
    attrib_value = _get_attrib_mandatory(element, attrib)
    if attrib_value not in dct:
195
196
        raise ValueError(f"Attribute {attrib} of {_element_pretty_identifier(element)} "
                         f"has invalid value: {attrib_value}")
197
    return dct[attrib_value]
198
199


200
def _get_text_or_none(element: Optional[etree.Element]) -> Optional[str]:
201
202
203
204
205
206
207
208
209
210
211
212
    """
    A helper function for getting the text of an element, when it's not clear whether the element exists or not.

    This function is useful whenever the text of an optional child element is needed.
    Then the text can be get with: text = _get_text_or_none(element.find("childElement")
    element.find() returns either the element or None, if it doesn't exist. This is why this function accepts
    an optional element, to reduce the amount of code in the constructor functions below.

    :param element: The xml element or None.
    :return: The text of the xml element if the xml element is not None and if the xml element has a text.
             None otherwise.
    """
213
214
215
    return element.text if element is not None else None


216
217
218
219
220
221
222
223
224
225
226
227
228
229
def _get_text_mapped_or_none(element: Optional[etree.Element], dct: Dict[str, T]) -> Optional[T]:
    """
    Returns dct[element.text] or None, if the element is None, has no text or the text is not in dct.

    :param element: The xml element or None.
    :param dct: The dictionary that is used to map the text.
    :return: The mapped text or None.
    """
    text = _get_text_or_none(element)
    if text is None or text not in dct:
        return None
    return dct[text]


230
def _get_text_mandatory(element: etree.Element) -> str:
231
232
233
234
235
236
237
    """
    A helper function for getting the mandatory text of an element.

    :param element: The xml element.
    :return: The text of the xml element.
    :raises KeyError: If the xml element has no text.
    """
238
    text = element.text
239
    if text is None:
240
        raise KeyError(_element_pretty_identifier(element) + " has no text!")
241
242
243
    return text


244
def _get_text_mandatory_mapped(element: etree.Element, dct: Dict[str, T]) -> T:
245
246
247
248
249
250
251
252
253
254
255
256
    """
    A helper function for getting the mapped mandatory text of an element.

    It first gets the text of the element using _get_text_mandatory(),
    which raises a KeyError if the element has no text.
    Then it returns dct[<element text>] and raises a ValueError, if the text of the element does not exist in the dict.

    :param element: The xml element.
    :param dct: The dictionary that is used to map the text.
    :return: The mapped text of the element.
    :raises ValueError: If the text of the xml element does not exist in dct.
    """
257
258
    text = _get_text_mandatory(element)
    if text not in dct:
259
        raise ValueError(_element_pretty_identifier(element) + f" has invalid text: {text}")
260
261
262
    return dct[text]


263
def _failsafe_construct(element: Optional[etree.Element], constructor: Callable[..., T], failsafe: bool,
264
                        **kwargs: Any) -> Optional[T]:
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
    """
    A wrapper function that is used to handle exceptions raised in constructor functions.

    This is the only function of this module where exceptions are caught.
    This is why constructor functions should (in almost all cases) call other constructor functions using this function,
    so errors can be caught and logged in failsafe mode.
    The functions accepts None as a valid value for element for the same reason _get_text_or_none() does, so it can be
    called like _failsafe_construct(element.find("childElement"), ...), since element.find() can return None.
    This function will also return None in this case.

    :param element: The xml element or None.
    :param constructor: The constructor function to apply on the element.
    :param failsafe: Indicates whether errors should be caught or re-raised.
    :param kwargs: Optional keyword arguments that are passed to the constructor function.
    :return: The constructed class instance, if construction was successful.
             None if the element was None or if the construction failed.
    """
282
283
    if element is None:
        return None
284
    try:
285
        return constructor(element, failsafe, **kwargs)
286
    except (KeyError, ValueError) as e:
287
        type_name = _constructor_name_to_typename(constructor)
288
        error_message = f"Failed to create {type_name} from {_element_pretty_identifier(element)}!"
289
290
        if not failsafe:
            raise type(e)(error_message) from e
291
292
293
294
295
296
        error_type = type(e).__name__
        cause: Optional[BaseException] = e
        while cause is not None:
            error_message = _exception_to_str(cause) + "\n -> " + error_message
            cause = cause.__cause__
        logger.error(error_type + ": " + error_message)
297
        return None
298

299

300
def _failsafe_construct_mandatory(element: etree.Element, constructor: Callable[..., T], **kwargs: Any) -> T:
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
    """
    _failsafe_construct() but not failsafe and it returns T instead of Optional[T]

    :param element: The xml element.
    :param constructor: The constructor function to apply on the xml element.
    :param kwargs: Optional keyword arguments that are passed to the constructor function.
    :return: The constructed child element.
    :raises TypeError: If the result of _failsafe_construct() in non-failsafe mode was None.
                       This shouldn't be possible and if it happens, indicates a bug in _failsafe_construct().
    """
    constructed = _failsafe_construct(element, constructor, False, **kwargs)
    if constructed is None:
        raise TypeError("The result of a non-failsafe _failsafe_construct() call was None! "
                        "This is a bug in the pyAAS XML deserialization, please report it!")
    return constructed


318
def _failsafe_construct_multiple(elements: Iterable[etree.Element], constructor: Callable[..., T], failsafe: bool,
319
                                 **kwargs: Any) -> Iterable[T]:
320
321
322
323
324
325
326
327
328
    """
    A generator function that applies _failsafe_construct() to multiple elements.

    :param elements: Any iterable containing any number of xml elements.
    :param constructor: The constructor function to apply on the xml elements.
    :param failsafe: Indicates whether errors should be caught or re-raised.
    :param kwargs: Optional keyword arguments that are passed to the constructor function.
    :return: An iterator over the successfully constructed elements.
             If an error occurred while constructing an element and while in failsafe mode,
329
             the respective element will be skipped.
330
    """
331
332
333
334
335
336
    for element in elements:
        parsed = _failsafe_construct(element, constructor, failsafe, **kwargs)
        if parsed is not None:
            yield parsed


337
def _child_construct_mandatory(parent: etree.Element, child_tag: str, constructor: Callable[..., T], **kwargs: Any) \
338
        -> T:
339
    """
340
    Shorthand for _failsafe_construct_mandatory() in combination with _get_child_mandatory().
341

342
343
344
    :param parent: The xml element where the child element is searched.
    :param child_tag: The tag of the child element to construct.
    :param constructor: The constructor function for the child element.
345
346
347
    :param kwargs: Optional keyword arguments that are passed to the constructor function.
    :return: The constructed child element.
    """
348
349
350
    return _failsafe_construct_mandatory(_get_child_mandatory(parent, child_tag), constructor, **kwargs)


351
352
353
354
355
356
357
358
359
360
361
362
363
def _child_construct_multiple(parent: etree.Element, expected_tag: str, constructor: Callable[..., T], failsafe: bool,
                              **kwargs: Any) -> Iterable[T]:
    """
    Shorthand for _failsafe_construct_multiple() in combination with _get_child_multiple().

    :param parent: The xml element where child elements are searched.
    :param expected_tag: The expected tag of the child elements.
    :param constructor: The constructor function for the child element.
    :param kwargs: Optional keyword arguments that are passed to the constructor function.
    :return: An iterator over successfully constructed child elements.
             If an error occurred while constructing an element and while in failsafe mode,
             the respective element will be skipped.
    """
364
365
    return _failsafe_construct_multiple(_get_all_children_expect_tag(parent, expected_tag, failsafe), constructor,
                                        failsafe, **kwargs)
366
367


368
def _child_text_mandatory(parent: etree.Element, child_tag: str) -> str:
369
370
371
372
373
374
375
376
377
378
    """
    Shorthand for _get_text_mandatory() in combination with _get_child_mandatory().

    :param parent: The xml element where the child element is searched.
    :param child_tag: The tag of the child element to get the text from.
    :return: The text of the child element.
    """
    return _get_text_mandatory(_get_child_mandatory(parent, child_tag))


379
def _child_text_mandatory_mapped(parent: etree.Element, child_tag: str, dct: Dict[str, T]) -> T:
380
381
382
383
384
385
386
387
388
    """
    Shorthand for _get_text_mandatory_mapped() in combination with _get_child_mandatory().

    :param parent: The xml element where the child element is searched.
    :param child_tag: The tag of the child element to get the text from.
    :param dct: The dictionary that is used to map the text of the child element.
    :return: The mapped text of the child element.
    """
    return _get_text_mandatory_mapped(_get_child_mandatory(parent, child_tag), dct)
389
390


391
def _amend_abstract_attributes(obj: object, element: etree.Element, failsafe: bool) -> None:
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
    """
    A helper function that amends optional attributes to already constructed class instances, if they inherit
    from an abstract class like Referable, Identifiable, HasSemantics or Qualifiable.

    :param obj: The constructed class instance.
    :param element: The respective xml element.
    :param failsafe: Indicates whether errors should be caught or re-raised.
    :return: None
    """
    if isinstance(obj, model.Referable):
        category = _get_text_or_none(element.find(NS_AAS + "category"))
        if category is not None:
            obj.category = category
        description = _failsafe_construct(element.find(NS_AAS + "description"), _construct_lang_string_set, failsafe)
        if description is not None:
            obj.description = description
    if isinstance(obj, model.Identifiable):
        id_short = _get_text_or_none(element.find(NS_AAS + "idShort"))
        if id_short is not None:
            obj.id_short = id_short
        administration = _failsafe_construct(element.find(NS_AAS + "administration"),
                                             _construct_administrative_information, failsafe)
        if administration:
            obj.administration = administration
    if isinstance(obj, model.HasSemantics):
        semantic_id = _failsafe_construct(element.find(NS_AAS + "semanticId"), _construct_reference, failsafe)
        if semantic_id is not None:
            obj.semantic_id = semantic_id
    if isinstance(obj, model.Qualifiable):
421
422
        qualifiers = element.find(NS_AAS + "qualifiers")
        if qualifiers is not None:
423
424
            for constraint in _failsafe_construct_multiple(qualifiers, _construct_constraint, failsafe):
                obj.qualifier.add(constraint)
425
426
427
428
429
430


class ModelingKindKwArg(TypedDict, total=False):
    kind: model.ModelingKind


431
def _get_modeling_kind_kwarg(element: etree.Element) -> ModelingKindKwArg:
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
    """
    A helper function that creates a dict containing the modeling kind or nothing for a given xml element.

    Since the modeling kind can only be set in the __init__ method of a class that inherits from model.HasKind,
    the dict returned by this function can be passed directly to the classes __init__ method.
    An alternative to this function would be returning the modeling kind directly and falling back to the default
    value if no "kind" xml element is present, but in this case the default value would have to be defined here as well.
    In my opinion defining what the default value is, should be the task of the __init__ method, not the task of any
    function in the deserialization.

    :param element: The xml element.
    :return: A dict containing {"kind": <the parsed modeling kind>}, if a kind element was found.
             An empty dict if not.
    """
    kwargs: ModelingKindKwArg = ModelingKindKwArg()
447
    kind = _get_text_mapped_or_none(element.find(NS_AAS + "kind"), MODELING_KIND_INVERSE)
448
    if kind is not None:
449
        kwargs["kind"] = kind
450
    return kwargs
451
452


453
def _construct_key(element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.Key:
454
    return model.Key(
455
        _get_attrib_mandatory_mapped(element, "type", KEY_ELEMENTS_INVERSE),
456
        _str_to_bool(_get_attrib_mandatory(element, "local")),
457
        _get_text_mandatory(element),
458
        _get_attrib_mandatory_mapped(element, "idType", KEY_TYPES_INVERSE)
459
460
461
    )


462
463
464
465
def _construct_key_tuple(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any)\
        -> Tuple[model.Key, ...]:
    keys = _get_child_mandatory(element, namespace + "keys")
    return tuple(_child_construct_multiple(keys, namespace + "key", _construct_key, failsafe))
466
467


468
469
470
def _construct_reference(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any) \
        -> model.Reference:
    return model.Reference(_construct_key_tuple(element, failsafe, namespace=namespace))
471
472


473
def _construct_aas_reference(element: etree.Element, failsafe: bool, type_: Type[model.base._RT], **_kwargs: Any) \
474
475
476
477
478
479
480
481
        -> model.AASReference[model.base._RT]:
    keys = _construct_key_tuple(element, failsafe)
    if len(keys) != 0 and not issubclass(KEY_ELEMENTS_CLASSES_INVERSE.get(keys[-1].type, type(None)), type_):
        logger.warning(f"Type {keys[-1].type.name} of last key of reference to {' / '.join(str(k) for k in keys)} "
                       f"does not match reference type {type_.__name__}")
    return model.AASReference(keys, type_)


482
def _construct_submodel_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \
483
        -> model.AASReference[model.Submodel]:
484
    return _construct_aas_reference(element, failsafe, model.Submodel, **kwargs)
485
486


487
def _construct_asset_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \
488
        -> model.AASReference[model.Asset]:
489
    return _construct_aas_reference(element, failsafe, model.Asset, **kwargs)
490
491


492
def _construct_asset_administration_shell_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \
493
        -> model.AASReference[model.AssetAdministrationShell]:
494
    return _construct_aas_reference(element, failsafe, model.AssetAdministrationShell, **kwargs)
495
496


497
def _construct_referable_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \
498
        -> model.AASReference[model.Referable]:
499
    return _construct_aas_reference(element, failsafe, model.Referable, **kwargs)
500
501


502
def _construct_concept_description_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \
503
        -> model.AASReference[model.ConceptDescription]:
504
    return _construct_aas_reference(element, failsafe, model.ConceptDescription, **kwargs)
505
506


507
def _construct_data_element_reference(element: etree.Element, failsafe: bool, **kwargs: Any) \
508
509
510
511
        -> model.AASReference[model.DataElement]:
    return _construct_aas_reference(element, failsafe, model.DataElement, **kwargs)


512
def _construct_administrative_information(element: etree.Element, _failsafe: bool, **_kwargs: Any) \
513
        -> model.AdministrativeInformation:
514
    return model.AdministrativeInformation(
515
516
        _get_text_or_none(element.find(NS_AAS + "version")),
        _get_text_or_none(element.find(NS_AAS + "revision"))
517
518
519
    )


520
521
def _construct_lang_string_set(element: etree.Element, failsafe: bool, namespace: str = NS_AAS, **_kwargs: Any) \
        -> model.LangStringSet:
522
    lss: model.LangStringSet = {}
523
    for lang_string in _get_all_children_expect_tag(element, namespace + "langString", failsafe):
524
        lss[_get_attrib_mandatory(lang_string, "lang")] = _get_text_mandatory(lang_string)
525
526
527
    return lss


528
def _construct_qualifier(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Qualifier:
529
    qualifier = model.Qualifier(
530
531
        _child_text_mandatory(element, NS_AAS + "type"),
        _child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES)
532
    )
533
    value = _get_text_or_none(element.find(NS_AAS + "value"))
534
    if value is not None:
535
        qualifier.value = model.datatypes.from_xsd(value, qualifier.value_type)
536
    value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), _construct_reference, failsafe)
537
    if value_id is not None:
538
539
540
        qualifier.value_id = value_id
    _amend_abstract_attributes(qualifier, element, failsafe)
    return qualifier
541
542


543
def _construct_formula(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Formula:
544
    formula = model.Formula()
545
546
    depends_on_refs = element.find(NS_AAS + "dependsOnRefs")
    if depends_on_refs is not None:
547
548
549
550
        for ref in _failsafe_construct_multiple(depends_on_refs.findall(NS_AAS + "reference"), _construct_reference,
                                                failsafe):
            formula.depends_on.add(ref)
    return formula
551
552


553
def _construct_identifier(element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.Identifier:
554
555
    return model.Identifier(
        _get_text_mandatory(element),
556
        _get_attrib_mandatory_mapped(element, "idType", IDENTIFIER_TYPES_INVERSE)
557
558
559
    )


560
def _construct_security(_element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.Security:
561
562
563
    """
    TODO: this is just a stub implementation
    """
564
565
566
    return model.Security()


567
def _construct_view(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.View:
568
    view = model.View(_child_text_mandatory(element, NS_AAS + "idShort"))
569
570
    contained_elements = element.find(NS_AAS + "containedElements")
    if contained_elements is not None:
571
572
573
        for ref in _failsafe_construct_multiple(contained_elements.findall(NS_AAS + "containedElementRef"),
                                                _construct_referable_reference, failsafe):
            view.contained_element.add(ref)
574
575
576
577
    _amend_abstract_attributes(view, element, failsafe)
    return view


578
def _construct_concept_dictionary(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDictionary:
579
    concept_dictionary = model.ConceptDictionary(_child_text_mandatory(element, NS_AAS + "idShort"))
580
581
    concept_description = element.find(NS_AAS + "conceptDescriptionRefs")
    if concept_description is not None:
582
583
584
        for ref in _failsafe_construct_multiple(concept_description.findall(NS_AAS + "conceptDescriptionRef"),
                                                _construct_concept_description_reference, failsafe):
            concept_dictionary.concept_description.add(ref)
585
586
587
588
    _amend_abstract_attributes(concept_dictionary, element, failsafe)
    return concept_dictionary


589
def _construct_submodel_element(element: etree.Element, failsafe: bool, **kwargs: Any) -> model.SubmodelElement:
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
    submodel_elements: Dict[str, Callable[..., model.SubmodelElement]] = {NS_AAS + k: v for k, v in {
        "annotatedRelationshipElement": _construct_annotated_relationship_element,
        "basicEvent": _construct_basic_event,
        "blob": _construct_blob,
        "capability": _construct_capability,
        "entity": _construct_entity,
        "file": _construct_file,
        "multiLanguageProperty": _construct_multi_language_property,
        "operation": _construct_operation,
        "property": _construct_property,
        "range": _construct_range,
        "referenceElement": _construct_reference_element,
        "relationshipElement": _construct_relationship_element,
        "submodelElementCollection": _construct_submodel_element_collection
    }.items()}
    if element.tag not in submodel_elements:
606
        raise KeyError(_element_pretty_identifier(element) + " is not a valid submodel element!")
607
608
609
    return submodel_elements[element.tag](element, failsafe, **kwargs)


610
def _construct_constraint(element: etree.Element, failsafe: bool, **kwargs: Any) -> model.Constraint:
611
612
613
614
615
    constraints: Dict[str, Callable[..., model.Constraint]] = {NS_AAS + k: v for k, v in {
        "formula": _construct_formula,
        "qualifier": _construct_qualifier
    }.items()}
    if element.tag not in constraints:
616
        raise KeyError(_element_pretty_identifier(element) + " is not a valid constraint!")
617
618
619
    return constraints[element.tag](element, failsafe, **kwargs)


620
def _construct_operation_variable(element: etree.Element, _failsafe: bool, **_kwargs: Any) -> model.OperationVariable:
621
622
    value = _get_child_mandatory(element, NS_AAS + "value")
    if len(value) == 0:
623
        raise KeyError(f"{_element_pretty_identifier(value)} has no submodel element!")
624
    if len(value) > 1:
625
626
        logger.warning(f"{_element_pretty_identifier(value)} has more than one submodel element,"
                       "using the first one...")
627
628
629
630
631
    return model.OperationVariable(
        _failsafe_construct_mandatory(value[0], _construct_submodel_element)
    )


632
def _construct_annotated_relationship_element(element: etree.Element, failsafe: bool, **_kwargs: Any) \
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
        -> model.AnnotatedRelationshipElement:
    annotated_relationship_element = model.AnnotatedRelationshipElement(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        _child_construct_mandatory(element, NS_AAS + "first", _construct_referable_reference),
        _child_construct_mandatory(element, NS_AAS + "second", _construct_referable_reference),
        **_get_modeling_kind_kwarg(element)
    )
    annotations = _get_child_mandatory(element, NS_AAS + "annotations")
    for data_element_ref in _failsafe_construct_multiple(annotations.findall(NS_AAS + "reference"),
                                                         _construct_data_element_reference, failsafe):
        annotated_relationship_element.annotation.add(data_element_ref)
    _amend_abstract_attributes(annotated_relationship_element, element, failsafe)
    return annotated_relationship_element


648
def _construct_basic_event(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.BasicEvent:
649
650
651
652
653
654
655
656
657
    basic_event = model.BasicEvent(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        _child_construct_mandatory(element, NS_AAS + "observed", _construct_referable_reference),
        **_get_modeling_kind_kwarg(element)
    )
    _amend_abstract_attributes(basic_event, element, failsafe)
    return basic_event


658
def _construct_blob(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Blob:
659
660
661
662
663
    blob = model.Blob(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        _child_text_mandatory(element, NS_AAS + "mimeType"),
        **_get_modeling_kind_kwarg(element)
    )
664
    value = _get_text_or_none(element.find(NS_AAS + "value"))
665
    if value is not None:
666
        blob.value = base64.b64decode(value)
667
668
669
670
    _amend_abstract_attributes(blob, element, failsafe)
    return blob


671
def _construct_capability(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Capability:
672
673
674
675
676
677
678
679
    capability = model.Capability(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        **_get_modeling_kind_kwarg(element)
    )
    _amend_abstract_attributes(capability, element, failsafe)
    return capability


680
def _construct_entity(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Entity:
681
682
683
    entity = model.Entity(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        _child_text_mandatory_mapped(element, NS_AAS + "entityType", ENTITY_TYPES_INVERSE),
684
685
        # pass the asset to the constructor, because self managed entities need asset references
        asset=_failsafe_construct(element.find(NS_AAS + "assetRef"), _construct_asset_reference, failsafe),
686
687
688
689
690
691
692
693
694
        **_get_modeling_kind_kwarg(element)
    )
    for stmt in _failsafe_construct_multiple(_get_child_mandatory(element, NS_AAS + "statements"),
                                             _construct_submodel_element, failsafe):
        entity.statement.add(stmt)
    _amend_abstract_attributes(entity, element, failsafe)
    return entity


695
def _construct_file(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.File:
696
697
    file = model.File(
        _child_text_mandatory(element, NS_AAS + "idShort"),
698
        _child_text_mandatory(element, NS_AAS + "mimeType"),
699
700
        **_get_modeling_kind_kwarg(element)
    )
701
    value = _get_text_or_none(element.find(NS_AAS + "value"))
702
    if value is not None:
703
        file.value = value
704
705
706
707
    _amend_abstract_attributes(file, element, failsafe)
    return file


708
def _construct_multi_language_property(element: etree.Element, failsafe: bool, **_kwargs: Any) \
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
        -> model.MultiLanguageProperty:
    multi_language_property = model.MultiLanguageProperty(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        **_get_modeling_kind_kwarg(element)
    )
    value = _failsafe_construct(element.find(NS_AAS + "value"), _construct_lang_string_set, failsafe)
    if value is not None:
        multi_language_property.value = value
    value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), _construct_reference, failsafe)
    if value_id is not None:
        multi_language_property.value_id = value_id
    _amend_abstract_attributes(multi_language_property, element, failsafe)
    return multi_language_property


724
def _construct_operation(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Operation:
725
726
727
728
    operation = model.Operation(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        **_get_modeling_kind_kwarg(element)
    )
729
730
    in_output_variable = element.find(NS_AAS + "inoutputVariable")
    if in_output_variable is not None:
731
732
        for var in _child_construct_multiple(in_output_variable, NS_AAS + "operationVariable",
                                             _construct_operation_variable, failsafe):
733
734
735
            operation.in_output_variable.append(var)
    input_variable = element.find(NS_AAS + "inputVariable")
    if input_variable is not None:
736
737
        for var in _child_construct_multiple(input_variable, NS_AAS + "operationVariable",
                                             _construct_operation_variable, failsafe):
738
739
740
            operation.input_variable.append(var)
    output_variable = element.find(NS_AAS + "outputVariable")
    if output_variable is not None:
741
742
        for var in _child_construct_multiple(output_variable, NS_AAS + "operationVariable",
                                             _construct_operation_variable, failsafe):
743
            operation.output_variable.append(var)
744
745
746
747
    _amend_abstract_attributes(operation, element, failsafe)
    return operation


748
def _construct_property(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Property:
749
    property_ = model.Property(
750
751
752
753
754
755
        _child_text_mandatory(element, NS_AAS + "idShort"),
        value_type=_child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES),
        **_get_modeling_kind_kwarg(element)
    )
    value = _get_text_or_none(element.find(NS_AAS + "value"))
    if value is not None:
756
        property_.value = model.datatypes.from_xsd(value, property_.value_type)
757
758
    value_id = _failsafe_construct(element.find(NS_AAS + "valueId"), _construct_reference, failsafe)
    if value_id is not None:
759
760
761
        property_.value_id = value_id
    _amend_abstract_attributes(property_, element, failsafe)
    return property_
762
763


764
def _construct_range(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Range:
765
    range_ = model.Range(
766
767
768
769
        _child_text_mandatory(element, NS_AAS + "idShort"),
        value_type=_child_text_mandatory_mapped(element, NS_AAS + "valueType", model.datatypes.XSD_TYPE_CLASSES),
        **_get_modeling_kind_kwarg(element)
    )
770
771
772
773
774
775
776
777
    max_ = _get_text_or_none(element.find(NS_AAS + "max"))
    if max_ is not None:
        range_.max = model.datatypes.from_xsd(max_, range_.value_type)
    min_ = _get_text_or_none(element.find(NS_AAS + "min"))
    if min_ is not None:
        range_.min = model.datatypes.from_xsd(min_, range_.value_type)
    _amend_abstract_attributes(range_, element, failsafe)
    return range_
778
779


780
def _construct_reference_element(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ReferenceElement:
781
782
783
784
785
786
787
788
789
    reference_element = model.ReferenceElement(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        **_get_modeling_kind_kwarg(element)
    )
    value = _failsafe_construct(element.find(NS_AAS + "value"), _construct_referable_reference, failsafe)
    if value is not None:
        reference_element.value = value
    _amend_abstract_attributes(reference_element, element, failsafe)
    return reference_element
790
791


792
def _construct_relationship_element(element: etree.Element, failsafe: bool, **_kwargs: Any) \
793
        -> model.RelationshipElement:
794
795
796
797
798
799
800
801
    relationship_element = model.RelationshipElement(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        _child_construct_mandatory(element, NS_AAS + "first", _construct_referable_reference),
        _child_construct_mandatory(element, NS_AAS + "second", _construct_referable_reference),
        **_get_modeling_kind_kwarg(element)
    )
    _amend_abstract_attributes(relationship_element, element, failsafe)
    return relationship_element
802
803


804
def _construct_submodel_element_collection(element: etree.Element, failsafe: bool, **_kwargs: Any) \
805
        -> model.SubmodelElementCollection:
806
    ordered = _str_to_bool(_child_text_mandatory(element, NS_AAS + "ordered"))
807
808
809
810
811
812
813
814
815
816
    collection_type = model.SubmodelElementCollectionOrdered if ordered else model.SubmodelElementCollectionUnordered
    collection = collection_type(
        _child_text_mandatory(element, NS_AAS + "idShort"),
        **_get_modeling_kind_kwarg(element)
    )
    value = _get_child_mandatory(element, NS_AAS + "value")
    for se in _failsafe_construct_multiple(value, _construct_submodel_element, failsafe):
        collection.value.add(se)
    _amend_abstract_attributes(collection, element, failsafe)
    return collection
817
818


819
def _construct_asset_administration_shell(element: etree.Element, failsafe: bool, **_kwargs: Any) \
820
        -> model.AssetAdministrationShell:
821
    aas = model.AssetAdministrationShell(
822
823
        _child_construct_mandatory(element, NS_AAS + "assetRef", _construct_asset_reference),
        _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
824
    )
825
826
827
    security = _failsafe_construct(element.find(NS_ABAC + "security"), _construct_security, failsafe)
    if security is not None:
        aas.security = security
828
829
    submodels = element.find(NS_AAS + "submodelRefs")
    if submodels is not None:
830
831
        for ref in _child_construct_multiple(submodels, NS_AAS + "submodelRef", _construct_submodel_reference,
                                             failsafe):
832
            aas.submodel.add(ref)
833
834
    views = element.find(NS_AAS + "views")
    if views is not None:
835
        for view in _child_construct_multiple(views, NS_AAS + "view", _construct_view, failsafe):
836
837
838
            aas.view.add(view)
    concept_dictionaries = element.find(NS_AAS + "conceptDictionaries")
    if concept_dictionaries is not None:
839
840
        for cd in _child_construct_multiple(concept_dictionaries, NS_AAS + "conceptDictionary",
                                            _construct_concept_dictionary, failsafe):
841
            aas.concept_dictionary.add(cd)
842
843
    derived_from = _failsafe_construct(element.find(NS_AAS + "derivedFrom"),
                                       _construct_asset_administration_shell_reference, failsafe)
844
    if derived_from is not None:
845
        aas.derived_from = derived_from
846
847
    _amend_abstract_attributes(aas, element, failsafe)
    return aas
848
849


850
def _construct_asset(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Asset:
851
    asset = model.Asset(
852
853
        _child_text_mandatory_mapped(element, NS_AAS + "kind", ASSET_KIND_INVERSE),
        _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
854
    )
855
856
857
858
859
860
861
862
    asset_identification_model = _failsafe_construct(element.find(NS_AAS + "assetIdentificationModelRef"),
                                                     _construct_submodel_reference, failsafe)
    if asset_identification_model is not None:
        asset.asset_identification_model = asset_identification_model
    bill_of_material = _failsafe_construct(element.find(NS_AAS + "billOfMaterialRef"), _construct_submodel_reference,
                                           failsafe)
    if bill_of_material is not None:
        asset.bill_of_material = bill_of_material
863
864
    _amend_abstract_attributes(asset, element, failsafe)
    return asset
865
866


867
def _construct_submodel(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.Submodel:
868
    submodel = model.Submodel(
869
        _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier),
870
871
        **_get_modeling_kind_kwarg(element)
    )
872
873
874
875
    for submodel_element in _get_child_mandatory(element, NS_AAS + "submodelElements"):
        constructed = _failsafe_construct(submodel_element, _construct_submodel_element, failsafe)
        if constructed is not None:
            submodel.submodel_element.add(constructed)
876
877
    _amend_abstract_attributes(submodel, element, failsafe)
    return submodel
878
879


880
881
882
883
884
885
886
887
888
def _construct_value_reference_pair(element: etree.Element, _failsafe: bool,
                                    value_format: Optional[model.DataTypeDef] = None, **_kwargs: Any) \
        -> model.ValueReferencePair:
    if value_format is None:
        raise ValueError("No value format given!")
    return model.ValueReferencePair(
        value_format,
        model.datatypes.from_xsd(_child_text_mandatory(element, NS_IEC + "value"), value_format),
        _child_construct_mandatory(element, NS_IEC + "valueId", _construct_reference, namespace=NS_IEC)
889
    )
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981


def _construct_value_list(element: etree.Element, failsafe: bool,
                          value_format: Optional[model.DataTypeDef] = None, **_kwargs: Any) \
        -> model.ValueList:
    return set(
        _child_construct_multiple(element, NS_IEC + "valueReferencePair", _construct_value_reference_pair, failsafe,
                                  value_format=value_format)
    )


def _construct_iec61360_concept_description(element: etree.Element, failsafe: bool,
                                            identifier: Optional[model.Identifier] = None, **_kwargs: Any) \
        -> model.IEC61360ConceptDescription:
    if identifier is None:
        raise ValueError("No identifier given!")
    cd = model.IEC61360ConceptDescription(
        identifier,
        _child_construct_mandatory(element, NS_IEC + "preferredName", _construct_lang_string_set, namespace=NS_IEC),
        _child_text_mandatory_mapped(element, NS_IEC + "dataType", IEC61360_DATA_TYPES_INVERSE)
    )
    definition = _failsafe_construct(element.find(NS_IEC + "definition"), _construct_lang_string_set, failsafe,
                                     namespace=NS_IEC)
    if definition is not None:
        cd.definition = definition
    short_name = _failsafe_construct(element.find(NS_IEC + "shortName"), _construct_lang_string_set, failsafe,
                                     namespace=NS_IEC)
    if short_name is not None:
        cd.short_name = short_name
    unit = _get_text_or_none(element.find(NS_IEC + "unit"))
    if unit is not None:
        cd.unit = unit
    unit_id = _failsafe_construct(element.find(NS_IEC + "unitId"), _construct_reference, failsafe, namespace=NS_IEC)
    if unit_id is not None:
        cd.unit_id = unit_id
    source_of_definition = _get_text_or_none(element.find(NS_IEC + "sourceOfDefinition"))
    if source_of_definition is not None:
        cd.source_of_definition = source_of_definition
    symbol = _get_text_or_none(element.find(NS_IEC + "symbol"))
    if symbol is not None:
        cd.symbol = symbol
    value_format = _get_text_mapped_or_none(element.find(NS_IEC + "valueFormat"),
                                            model.datatypes.XSD_TYPE_CLASSES)
    if value_format is not None:
        cd.value_format = value_format
    value_list = _failsafe_construct(element.find(NS_IEC + "valueList"), _construct_value_list, failsafe,
                                     value_format=value_format)
    if value_list is not None:
        cd.value_list = value_list
    value = _get_text_or_none(element.find(NS_IEC + "value"))
    if value is not None and value_format is not None:
        cd.value = model.datatypes.from_xsd(value, value_format)
    value_id = _failsafe_construct(element.find(NS_IEC + "valueId"), _construct_reference, failsafe, namespace=NS_IEC)
    if value_id is not None:
        cd.value_id = value_id
    for level_type_element in element.findall(NS_IEC + "levelType"):
        level_type = _get_text_mapped_or_none(level_type_element, IEC61360_LEVEL_TYPES_INVERSE)
        if level_type is None:
            error_message = f"{_element_pretty_identifier(level_type_element)} has invalid value: " \
                            + str(level_type_element.text)
            if not failsafe:
                raise ValueError(error_message)
            logger.warning(error_message)
            continue
        cd.level_types.add(level_type)
    return cd


def _construct_concept_description(element: etree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDescription:
    """
    TODO: our model doesn't support more than one embeddedDataSpecification (yet)
    """
    cd: Optional[model.ConceptDescription] = None
    identifier = _child_construct_mandatory(element, NS_AAS + "identification", _construct_identifier)
    # Hack to detect IEC61360ConceptDescriptions, which are represented using dataSpecification according to DotAAS
    dspec_tag = NS_AAS + "embeddedDataSpecification"
    dspecs = element.findall(dspec_tag)
    if len(dspecs) > 1:
        logger.warning(f"{_element_pretty_identifier(element)} has more than one "
                       f"{_tag_replace_namespace(dspec_tag, element.nsmap)}. This model currently supports only one "
                       f"per {_tag_replace_namespace(element.tag, element.nsmap)}!")
    if len(dspecs) > 0:
        dspec = dspecs[0]
        dspec_content = dspec.find(NS_AAS + "dataSpecificationContent")
        if dspec_content is not None:
            dspec_ref = _failsafe_construct(dspec.find(NS_AAS + "dataSpecification"), _construct_reference, failsafe)
            if dspec_ref is not None and len(dspec_ref.key) > 0 and dspec_ref.key[0].value == \
                    "http://admin-shell.io/DataSpecificationTemplates/DataSpecificationIEC61360/2/0":
                cd = _failsafe_construct(dspec_content.find(NS_AAS + "dataSpecificationIEC61360"),
                                         _construct_iec61360_concept_description, failsafe, identifier=identifier)
    if cd is None:
        cd = model.ConceptDescription(identifier)
982
983
    for ref in _failsafe_construct_multiple(element.findall(NS_AAS + "isCaseOf"), _construct_reference, failsafe):
        cd.is_case_of.add(ref)
984
    _amend_abstract_attributes(cd, element, failsafe)
985
    return cd
986
987


988
def read_aas_xml_file(file: IO, failsafe: bool = True) -> model.DictObjectStore:
989
990
991
    """
    Read an Asset Administration Shell XML file according to 'Details of the Asset Administration Shell', chapter 5.4

992
    :param file: A filename or file-like object to read the XML-serialized data from
993
994
995
996
    :param failsafe: If True, the file is parsed in a failsafe way: Instead of raising an Exception for missing
                     attributes and wrong types, errors are logged and defective objects are skipped
    :return: A DictObjectStore containing all AAS objects from the XML file
    """
997

998
999
1000
    element_constructors = {NS_AAS + k: v for k, v in {
        "assetAdministrationShell": _construct_asset_administration_shell,
        "asset": _construct_asset,