xml_deserialization.py 15.7 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
# Copyright 2019 PyI40AAS Contributors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
"""
Module for deserializing Asset Administration Shell data from the official XML format
"""
14

15
# TODO: add constructor for submodel + all classes required by submodel
16
17
18

from ... import model
import xml.etree.ElementTree as ElTree
19
import logging
20

21
22
23
from typing import Any, Callable, Dict, IO, Iterable, Optional, Set, Tuple, Type, TypeVar
from .xml_serialization import NS_AAS, NS_AAS_COMMON, NS_ABAC, NS_IEC, NS_XSI
from .._generic import MODELING_KIND, ASSET_KIND, KEY_ELEMENTS,\
24
25
    KEY_TYPES, IDENTIFIER_TYPES, ENTITY_TYPES, IEC61360_DATA_TYPES, IEC61360_LEVEL_TYPES

26
27
# Module-level logger; the functions below use it to report conversion errors and skipped XML elements
logger = logging.getLogger(__name__)

28
29
30
31
32
33
34
35
36
37
38
# Inverted lookup tables (value -> key) of the mappings imported from .._generic and of model.KEY_ELEMENTS_CLASSES.
# The keys() and values() views of an unmodified dict iterate in the same order, so zipping them swaps each pair.
MODELING_KIND_INVERSE: Dict[str, model.ModelingKind] = dict(zip(MODELING_KIND.values(), MODELING_KIND.keys()))
ASSET_KIND_INVERSE: Dict[str, model.AssetKind] = dict(zip(ASSET_KIND.values(), ASSET_KIND.keys()))
KEY_ELEMENTS_INVERSE: Dict[str, model.KeyElements] = dict(zip(KEY_ELEMENTS.values(), KEY_ELEMENTS.keys()))
KEY_TYPES_INVERSE: Dict[str, model.KeyType] = dict(zip(KEY_TYPES.values(), KEY_TYPES.keys()))
IDENTIFIER_TYPES_INVERSE: Dict[str, model.IdentifierType] = \
    dict(zip(IDENTIFIER_TYPES.values(), IDENTIFIER_TYPES.keys()))
ENTITY_TYPES_INVERSE: Dict[str, model.EntityType] = dict(zip(ENTITY_TYPES.values(), ENTITY_TYPES.keys()))
KEY_ELEMENTS_CLASSES_INVERSE: Dict[model.KeyElements, type] = \
    dict(zip(model.KEY_ELEMENTS_CLASSES.values(), model.KEY_ELEMENTS_CLASSES.keys()))
IEC61360_DATA_TYPES_INVERSE: Dict[str, model.concept.IEC61360DataType] = \
    dict(zip(IEC61360_DATA_TYPES.values(), IEC61360_DATA_TYPES.keys()))
IEC61360_LEVEL_TYPES_INVERSE: Dict[str, model.concept.IEC61360LevelType] = \
    dict(zip(IEC61360_LEVEL_TYPES.values(), IEC61360_LEVEL_TYPES.keys()))

39
# Generic type variable: stands for the type of object a constructor function passed to the helpers below returns
T = TypeVar('T')
40

41

42
def _unwrap(monad: Optional[T]) -> T:
    """
    Return the given value unchanged, unless it is None

    :param monad: The optional value to unwrap
    :return: The value itself, if it is not None
    :raises TypeError: If the value is None
    """
    if monad is None:
        raise TypeError(f"Unwrap failed for value {monad}!")
    return monad


def _constructor_name_to_typename(constructor: Callable[[ElTree.Element, bool], T]) -> str:
    """
    Derive a CamelCase type name from the name of a constructor function

    The function name is split at underscores, the first two parts are dropped and the first letter of each remaining
    part is capitalized, e.g. ``_construct_asset_administration_shell`` becomes ``AssetAdministrationShell``.

    :param constructor: The constructor function whose ``__name__`` is evaluated
    :return: The derived type name
    """
    name_parts = constructor.__name__.split("_")[2:]
    return "".join(part[0].upper() + part[1:] for part in name_parts)
50
51


52
53
54
55
56
def _get_text_or_none(element: Optional[ElTree.Element]) -> Optional[str]:
    """
    Return the text of an XML element, tolerating a missing element

    :param element: The XML element to read, or None
    :return: The element's text attribute, or None if no element was given
    """
    if element is None:
        return None
    return element.text


def _get_text_mandatory(element: Optional[ElTree.Element]) -> str:
    """
    Return the text of an XML element that must exist and must contain text

    :param element: The XML element to read, or None if it could not be found
    :return: The element's text
    :raises TypeError: If the element is None or if it has no text
    """
    # Unwrap first, so that a missing element already fails with a TypeError
    present = _unwrap(element)
    content = _get_text_or_none(present)
    if content is not None:
        return content
    raise TypeError(f"XML element {present.tag} has no text!")


65
66
def _objects_from_xml_elements(elements: Iterable[ElTree.Element], constructor: Callable[..., T],
                               failsafe: bool, **kwargs: Any) -> Iterable[T]:
    """
    Lazily convert several XML elements to objects using the same constructor function

    Each element is converted with _object_from_xml_element; elements for which that returns None are left out.

    :param elements: The XML elements to convert
    :param constructor: The constructor function applied to every element
    :param failsafe: Passed on to _object_from_xml_element
    :param kwargs: Additional keyword arguments passed on to the constructor function
    :return: A generator yielding the successfully constructed objects
    """
    for xml_element in elements:
        constructed = _object_from_xml_element(xml_element, constructor, failsafe, **kwargs)
        if constructed is None:
            continue
        yield constructed
71
72


73
74
def _object_from_xml_element(element: Optional[ElTree.Element], constructor: Callable[..., T],
                             failsafe: bool, **kwargs: Any) -> Optional[T]:
    """
    Convert a single, possibly missing XML element to an object using the given constructor function

    :param element: The XML element to convert, or None
    :param constructor: The constructor function; it is called with the element, the failsafe flag and kwargs
    :param failsafe: If True, a KeyError or TypeError raised by the constructor is logged and None is returned.
                     If False, an exception of the same type with an extended message is raised instead.
    :param kwargs: Additional keyword arguments passed on to the constructor function
    :return: The constructed object, or None if no element was given or the conversion failed in failsafe mode
    """
    if element is not None:
        try:
            return constructor(element, failsafe, **kwargs)
        except (KeyError, TypeError) as err:
            exception_name = type(err).__name__
            type_name = _constructor_name_to_typename(constructor)
            error_message = f"{exception_name} while converting XML element with tag {element.tag} to " \
                            f"type {type_name}: {err}"
            if failsafe:
                logger.error(error_message)
            else:
                # Re-raise with the same exception type, but with context about the element and target type
                raise type(err)(error_message) from err
    return None
86

87
88

def _object_from_xml_element_mandatory(parent: ElTree.Element, tag: str,
                                       constructor: Callable[..., T], **kwargs: Any) -> T:
    """
    Find a mandatory child element and convert it to an object using the given constructor function

    The conversion is always done with failsafe set to False, so conversion errors are raised rather than logged.

    :param parent: The XML element to search in
    :param tag: The tag of the child element to look for
    :param constructor: The constructor function used to convert the child element
    :param kwargs: Additional keyword arguments passed on to the constructor function
    :return: The constructed object
    :raises KeyError: If the parent has no child element with the given tag
    """
    child = parent.find(tag)
    if child is not None:
        return _unwrap(_object_from_xml_element(child, constructor, False, **kwargs))
    raise KeyError(f"No such element {tag} found in {parent.tag}!")
94
95


96
def _construct_key(element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.Key:
    """
    Construct a model.Key from an XML key element

    The key element type, the local flag and the id type are read from the attributes "type", "local" and "idType",
    the key value is the text of the element.

    :param element: The XML key element
    :param _failsafe: Unused
    :return: The constructed key
    :raises KeyError: If an attribute is missing or "type" / "idType" hold an unknown value
    :raises TypeError: If the element has no text
    """
    key_element = KEY_ELEMENTS_INVERSE[element.attrib["type"]]
    # XML Schema booleans are written as "true"/"false" or "1"/"0". The Python-style literal "True" is matched too,
    # as it was the only spelling recognized before. Every other value is interpreted as False.
    local = element.attrib["local"].strip().lower() in ("true", "1")
    value = _get_text_mandatory(element)
    id_type = KEY_TYPES_INVERSE[element.attrib["idType"]]
    return model.Key(key_element, local, value, id_type)


105
def _construct_key_tuple(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> Tuple[model.Key, ...]:
    """
    Construct a tuple of keys from the "keys" child of an XML element

    :param element: The XML element containing a "keys" child element
    :param failsafe: Passed on to the conversion of the individual "key" elements
    :return: A tuple of the successfully constructed keys
    :raises TypeError: If the element has no "keys" child element
    """
    keys_element = _unwrap(element.find(NS_AAS + "keys"))
    key_elements = keys_element.findall(NS_AAS + "key")
    return tuple(_objects_from_xml_elements(key_elements, _construct_key, failsafe))


110
def _construct_reference(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Reference:
    """
    Construct a model.Reference from the keys contained in an XML element

    :param element: The XML element containing the keys of the reference
    :param failsafe: Passed on to the construction of the keys
    :return: The constructed reference
    """
    keys = _construct_key_tuple(element, failsafe)
    return model.Reference(keys)


114
115
def _construct_submodel_reference(element: ElTree.Element, failsafe: bool, **_kwargs: Any)\
        -> model.AASReference[model.Submodel]:
    """
    Construct a model.AASReference with target type model.Submodel from the keys contained in an XML element

    :param element: The XML element containing the keys of the reference
    :param failsafe: Passed on to the construction of the keys
    :return: The constructed reference
    """
    keys = _construct_key_tuple(element, failsafe)
    return model.AASReference(keys, model.Submodel)


119
120
def _construct_asset_reference(element: ElTree.Element, failsafe: bool, **_kwargs: Any)\
        -> model.AASReference[model.Asset]:
    """
    Construct a model.AASReference with target type model.Asset from the keys contained in an XML element

    :param element: The XML element containing the keys of the reference
    :param failsafe: Passed on to the construction of the keys
    :return: The constructed reference
    """
    keys = _construct_key_tuple(element, failsafe)
    return model.AASReference(keys, model.Asset)


124
def _construct_asset_administration_shell_reference(element: ElTree.Element, failsafe: bool, **_kwargs: Any)\
        -> model.AASReference[model.AssetAdministrationShell]:
    """
    Construct a model.AASReference with target type model.AssetAdministrationShell from the keys contained in an
    XML element

    :param element: The XML element containing the keys of the reference
    :param failsafe: Passed on to the construction of the keys
    :return: The constructed reference
    """
    keys = _construct_key_tuple(element, failsafe)
    return model.AASReference(keys, model.AssetAdministrationShell)


129
130
def _construct_referable_reference(element: ElTree.Element, failsafe: bool, **_kwargs: Any)\
        -> model.AASReference[model.Referable]:
    """
    Construct a model.AASReference with target type model.Referable from the keys contained in an XML element

    :param element: The XML element containing the keys of the reference
    :param failsafe: Passed on to the construction of the keys
    :return: The constructed reference
    """
    keys = _construct_key_tuple(element, failsafe)
    return model.AASReference(keys, model.Referable)


134
def _construct_concept_description_reference(element: ElTree.Element, failsafe: bool, **_kwargs: Any)\
        -> model.AASReference[model.ConceptDescription]:
    """
    Construct a model.AASReference with target type model.ConceptDescription from the keys contained in an XML
    element

    :param element: The XML element containing the keys of the reference
    :param failsafe: Passed on to the construction of the keys
    :return: The constructed reference
    """
    keys = _construct_key_tuple(element, failsafe)
    return model.AASReference(keys, model.ConceptDescription)
137
138


139
140
def _construct_administrative_information(element: ElTree.Element, _failsafe: bool, **_kwargs: Any)\
        -> model.AdministrativeInformation:
    """
    Construct a model.AdministrativeInformation from the optional "version" and "revision" child elements

    :param element: The XML element containing the administrative information
    :param _failsafe: Unused
    :return: The constructed object; a missing child element is passed to the model class as None
    """
    version = _get_text_or_none(element.find(NS_AAS + "version"))
    revision = _get_text_or_none(element.find(NS_AAS + "revision"))
    return model.AdministrativeInformation(version, revision)


147
def _construct_lang_string_set(element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.LangStringSet:
    """
    Construct a LangStringSet from the "langString" child elements of an XML element

    :param element: The XML element containing the "langString" elements
    :param _failsafe: Unused
    :return: A dict mapping the "lang" attribute of each "langString" element to its text
    :raises TypeError: If a "langString" element has no text
    :raises KeyError: If a "lang" attribute is missing
    """
    lang_strings: model.LangStringSet = {}
    for entry in element.findall(NS_IEC + "langString"):
        # Read the text before the attribute, so a missing text is reported first
        text = _get_text_mandatory(entry)
        lang_strings[entry.attrib["lang"]] = text
    return lang_strings


154
def _construct_qualifier(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Qualifier:
    """
    Construct a model.Qualifier from an XML qualifier element

    "type" and "valueType" are mandatory child elements, "value" and "valueId" are optional.

    :param element: The XML qualifier element
    :param failsafe: Passed on to the construction of the value id and of the abstract attributes
    :return: The constructed qualifier
    """
    qualifier_type = _get_text_mandatory(element.find(NS_AAS + "type"))
    value_type = model.datatypes.XSD_TYPE_CLASSES[_get_text_mandatory(element.find(NS_AAS + "valueType"))]
    value = _get_text_or_none(element.find(NS_AAS + "value"))
    qualifier = model.Qualifier(qualifier_type, value_type, value)

    # value_id is only overwritten if a reference could actually be constructed
    value_id = _object_from_xml_element(element.find(NS_AAS + "valueId"), _construct_reference, failsafe)
    if value_id is not None:
        qualifier.value_id = value_id
    _amend_abstract_attributes(qualifier, element, failsafe)
    return qualifier


167
def _construct_formula(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Formula:
    """
    Construct a model.Formula from an XML formula element

    Every child element is converted to a reference; children that yield no (truthy) reference are skipped with a
    warning.

    :param element: The XML formula element
    :param failsafe: Passed on to the construction of the references
    :return: The constructed formula
    """
    references: Set[model.Reference] = set()
    for child in element:
        reference = _object_from_xml_element(child, _construct_reference, failsafe)
        if reference:
            references.add(reference)
        else:
            logger.warning(f"Skipping invalid XML element with tag {child.tag}")
    return model.Formula(references)
176
177


178
def _construct_constraint(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Constraint:
    """
    Construct a constraint by dispatching on the tag of the XML element

    :param element: The XML element, tagged either "qualifier" or "formula" in the AAS namespace
    :param failsafe: Passed on to the selected constructor function
    :return: The constructed qualifier or formula
    :raises KeyError: If the tag of the element is neither of the two
    """
    constructors_by_tag = {
        NS_AAS + "qualifier": _construct_qualifier,
        NS_AAS + "formula": _construct_formula
    }
    constructor = constructors_by_tag[element.tag]
    return constructor(element, failsafe)


185
def _construct_identifier(element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.Identifier:
    """
    Construct a model.Identifier from the text and the "idType" attribute of an XML element

    :param element: The XML identification element
    :param _failsafe: Unused
    :return: The constructed identifier
    :raises TypeError: If the element has no text
    :raises KeyError: If the "idType" attribute is missing or holds an unknown value
    """
    identifier = _get_text_mandatory(element)
    id_type = IDENTIFIER_TYPES_INVERSE[element.attrib["idType"]]
    return model.Identifier(identifier, id_type)


192
def _construct_security(_element: ElTree.Element, _failsafe: bool, **_kwargs: Any) -> model.Security:
    """
    Construct an empty model.Security object; the content of the XML element is not evaluated

    :param _element: Unused
    :param _failsafe: Unused
    :return: A new model.Security instance created without arguments
    """
    security = model.Security()
    return security


196
def _construct_view(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.View:
    """
    Construct a model.View from an XML view element

    :param element: The XML view element; the "idShort" child element is mandatory
    :param failsafe: Passed on to the construction of the contained element references and abstract attributes
    :return: The constructed view
    """
    id_short = _get_text_mandatory(element.find(NS_AAS + "idShort"))
    view = model.View(id_short)

    container = element.find(NS_AAS + "containedElements")
    if container is not None:
        reference_elements = container.findall(NS_AAS + "containedElementRef")
        view.contained_element = set(
            _objects_from_xml_elements(reference_elements, _construct_referable_reference, failsafe)
        )

    _amend_abstract_attributes(view, element, failsafe)
    return view


208
def _construct_concept_dictionary(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDictionary:
    """
    Construct a model.ConceptDictionary from an XML concept dictionary element

    :param element: The XML concept dictionary element; the "idShort" child element is mandatory
    :param failsafe: Passed on to the construction of the concept description references and abstract attributes
    :return: The constructed concept dictionary
    """
    id_short = _get_text_mandatory(element.find(NS_AAS + "idShort"))
    concept_dictionary = model.ConceptDictionary(id_short)

    references_container = element.find(NS_AAS + "conceptDescriptionRefs")
    if references_container is not None:
        reference_elements = references_container.findall(NS_AAS + "conceptDescriptionRef")
        concept_dictionary.concept_description = set(
            _objects_from_xml_elements(reference_elements, _construct_concept_description_reference, failsafe)
        )

    _amend_abstract_attributes(concept_dictionary, element, failsafe)
    return concept_dictionary


221
222
def _construct_asset_administration_shell(element: ElTree.Element, failsafe: bool, **_kwargs: Any)\
        -> model.AssetAdministrationShell:
    """
    Construct a model.AssetAdministrationShell from an XML asset administration shell element

    The "assetRef" and "identification" child elements are mandatory. Security, submodel references, views, concept
    dictionaries and the "derivedFrom" reference are read if present.

    :param element: The XML asset administration shell element
    :param failsafe: Passed on to the construction of all optional parts. The two mandatory child elements are
                     always converted in non-failsafe mode.
    :return: The constructed asset administration shell
    :raises KeyError: If a mandatory child element is missing
    """
    aas = model.AssetAdministrationShell(
        _object_from_xml_element_mandatory(element, NS_AAS + "assetRef", _construct_asset_reference),
        _object_from_xml_element_mandatory(element, NS_AAS + "identification", _construct_identifier)
    )
    aas.security = _object_from_xml_element(element.find(NS_ABAC + "security"), _construct_security, failsafe)
    submodels = element.find(NS_AAS + "submodelRefs")
    if submodels is not None:
        aas.submodel = set(_objects_from_xml_elements(submodels.findall(NS_AAS + "submodelRef"),
                                                      _construct_submodel_reference, failsafe))
    views = element.find(NS_AAS + "views")
    if views is not None:
        for view in _objects_from_xml_elements(views.findall(NS_AAS + "view"), _construct_view, failsafe):
            aas.view.add(view)
    concept_dictionaries = element.find(NS_AAS + "conceptDictionaries")
    if concept_dictionaries is not None:
        for cd in _objects_from_xml_elements(concept_dictionaries.findall(NS_AAS + "conceptDictionary"),
                                             _construct_concept_dictionary, failsafe):
            aas.concept_dictionary.add(cd)
    derived_from = element.find(NS_AAS + "derivedFrom")
    if derived_from is not None:
        # The keys of the reference are children of the "derivedFrom" element, so that element (and not the shell
        # element itself) has to be handed to the reference constructor.
        aas.derived_from = _object_from_xml_element(derived_from, _construct_asset_administration_shell_reference,
                                                    failsafe)
    _amend_abstract_attributes(aas, element, failsafe)
    return aas
246
247


248
def _construct_asset(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Asset:
    """
    Construct a model.Asset from an XML asset element

    The "kind" and "identification" child elements are mandatory; the "assetIdentificationModelRef" and
    "billOfMaterialRef" references are optional.

    :param element: The XML asset element
    :param failsafe: Passed on to the construction of the optional references and abstract attributes
    :return: The constructed asset
    """
    kind = ASSET_KIND_INVERSE[_get_text_mandatory(element.find(NS_AAS + "kind"))]
    identification = _object_from_xml_element_mandatory(element, NS_AAS + "identification", _construct_identifier)
    asset = model.Asset(kind, identification)

    # Both attributes are assigned unconditionally; they become None if the respective element is missing
    identification_model_element = element.find(NS_AAS + "assetIdentificationModelRef")
    asset.asset_identification_model = _object_from_xml_element(identification_model_element,
                                                                _construct_submodel_reference, failsafe)
    bill_of_material_element = element.find(NS_AAS + "billOfMaterialRef")
    asset.bill_of_material = _object_from_xml_element(bill_of_material_element,
                                                      _construct_submodel_reference, failsafe)

    _amend_abstract_attributes(asset, element, failsafe)
    return asset
259
260


261
def _construct_submodel(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.Submodel:
    """
    Placeholder for the submodel constructor: no object is built yet and None is returned

    :param element: The XML submodel element (currently unused)
    :param failsafe: Currently unused
    :return: Always None, despite the annotated return type
    """
    # TODO: construct the submodel from the XML element
    return None


265
def _construct_concept_description(element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> model.ConceptDescription:
    """
    Construct a model.ConceptDescription from an XML concept description element

    :param element: The XML concept description element; the "identification" child element is mandatory
    :param failsafe: Passed on to the construction of the "isCaseOf" references and abstract attributes
    :return: The constructed concept description
    """
    identification = _object_from_xml_element_mandatory(element, NS_AAS + "identification", _construct_identifier)
    concept_description = model.ConceptDescription(identification)

    case_of_elements = element.findall(NS_AAS + "isCaseOf")
    references = set(_objects_from_xml_elements(case_of_elements, _construct_reference, failsafe))
    # Leave the attribute untouched if no reference could be constructed
    if references:
        concept_description.is_case_of = references

    _amend_abstract_attributes(concept_description, element, failsafe)
    return concept_description
274
275


276
def _amend_abstract_attributes(obj: object, element: ElTree.Element, failsafe: bool, **_kwargs: Any) -> None:
    """
    Set the attributes inherited from abstract base classes on an already constructed object

    Depending on which of model.Referable, model.Identifiable, model.HasSemantics and model.Qualifiable the object is
    an instance of, the matching child elements of the XML element are read and stored in the object.

    :param obj: The object to amend
    :param element: The XML element the object was constructed from
    :param failsafe: Passed on to _object_from_xml_element for all child elements that are converted to objects
    """
    # Found child elements are compared against None explicitly: the truth value of an ElementTree element depends
    # on whether it has children, so a text-only element such as "category" would otherwise be treated as missing.
    if isinstance(obj, model.Referable):
        category = element.find(NS_AAS + "category")
        if category is not None:
            obj.category = _get_text_or_none(category)
        description = element.find(NS_AAS + "description")
        if description is not None:
            obj.description = _object_from_xml_element(description, _construct_lang_string_set, failsafe)
    if isinstance(obj, model.Identifiable):
        id_short = element.find(NS_AAS + "idShort")
        if id_short is not None:
            obj.id_short = _get_text_or_none(id_short)
        administration = element.find(NS_AAS + "administration")
        if administration is not None:
            obj.administration = _object_from_xml_element(administration, _construct_administrative_information,
                                                          failsafe)
    if isinstance(obj, model.HasSemantics):
        semantic_id = element.find(NS_AAS + "semanticId")
        if semantic_id is not None:
            obj.semantic_id = _object_from_xml_element(semantic_id, _construct_reference, failsafe)
    if isinstance(obj, model.Qualifiable):
        # NOTE(review): every direct child that is not tagged "qualifiers" triggers a warning here, and the
        # "qualifiers" element itself is handed to _construct_constraint, which only dispatches on the tags
        # "qualifier" and "formula". This looks like it can never succeed; the intended nesting of the constraint
        # elements should be confirmed against the XML schema before changing it.
        for constraint in element:
            if constraint.tag != NS_AAS + "qualifiers":
                logger.warning(f"Skipping XML element with invalid tag {constraint.tag}")
                continue
            constraint_obj = _object_from_xml_element(constraint, _construct_constraint, failsafe)
            # _object_from_xml_element signals a failed conversion with None
            if constraint_obj is None:
                logger.warning(f"Skipping invalid XML element with tag {constraint.tag}")
                continue
            obj.qualifier.add(constraint_obj)
306
307
308
309
310
311
312
313
314
315
316


def read_xml_aas_file(file: IO, failsafe: bool = True) -> model.DictObjectStore:
    """
    Read an Asset Administration Shell XML file according to 'Details of the Asset Administration Shell', chapter 5.4

    :param file: A file-like object to read the XML-serialized data from
    :param failsafe: If True, the file is parsed in a failsafe way: Instead of raising an Exception for missing
                     attributes and wrong types, errors are logged and defective objects are skipped. Unexpected
                     top-level lists are logged and skipped as well.
    :return: A DictObjectStore containing all AAS objects from the XML file
    :raises TypeError: If failsafe is False and the root element contains an unexpected top-level list
    """

    element_constructors = {
        NS_AAS + "assetAdministrationShell": _construct_asset_administration_shell,
        NS_AAS + "asset": _construct_asset,
        NS_AAS + "submodel": _construct_submodel,
        NS_AAS + "conceptDescription": _construct_concept_description
    }

    tree = ElTree.parse(file)
    root = tree.getroot()

    # Add AAS objects to ObjectStore
    ret: model.DictObjectStore[model.Identifiable] = model.DictObjectStore()
    for list_ in root:
        # The tag of a top-level list is expected to be the tag of its elements with a plural "s" appended
        element_tag = list_.tag[:-1]
        if not list_.tag.endswith("s") or element_tag not in element_constructors:
            error_message = f"Unexpected list {list_.tag}"
            if not failsafe:
                raise TypeError(error_message)
            # In failsafe mode one unknown list must not abort parsing of the remaining lists
            logger.error(error_message)
            continue
        constructor = element_constructors[element_tag]
        for element in _objects_from_xml_elements(list_.findall(element_tag), constructor, failsafe):
            # element is always Identifiable, because the tag is checked earlier
            # this is just to satisfy the type checker
            if isinstance(element, model.Identifiable):
                ret.add(element)
    return ret