Skip to content
Snippets Groups Projects
Commit c413da79 authored by Leon Mauritz Möller's avatar Leon Mauritz Möller Committed by Michael Thies
Browse files

adapter.http: add json/xml result serialization

adapter.http: add request body parsing
adapter.http: implement all remaining aas routes
parent 568d1ae6
No related branches found
No related tags found
No related merge requests found
......@@ -20,7 +20,7 @@ from aas import model
from aas.adapter.json import json_serialization
from aas.adapter.xml import xml_serialization
from typing import Dict, List, Optional, Type, Union
from typing import Dict, List, Sequence, Type, Union
@enum.unique
......@@ -33,12 +33,15 @@ class MessageType(enum.Enum):
FATAL = enum.auto()
EXCEPTION = enum.auto()
def __str__(self):
return self.name.capitalize()
class Message:
def __init__(self, code: str, text: str, message_type: MessageType = MessageType.UNSPECIFIED):
self.message_type = message_type
self.code = code
self.text = text
self.message_type = message_type
class Result:
......@@ -48,16 +51,23 @@ class Result:
self.messages = messages
ResponseData = Union[Result, model.Referable, List[model.Referable]]
# not all sequence types are json serializable, but Sequence is covariant,
# which is necessary for List[Submodel] or List[AssetAdministrationShell] to be valid for List[Referable]
ResponseData = Union[Result, model.Referable, Sequence[model.Referable]]
ResponseDataInternal = Union[Result, model.Referable, List[model.Referable]]
class APIResponse(abc.ABC, Response):
def __init__(self, data: ResponseData, *args, **kwargs):
super().__init__(*args, **kwargs)
# convert possible sequence types to List (see line 54-55)
if isinstance(data, Sequence):
data = list(data)
self.data = self.serialize(data)
@abc.abstractmethod
def serialize(self, data: ResponseData) -> str:
def serialize(self, data: ResponseDataInternal) -> str:
pass
......@@ -65,16 +75,17 @@ class JsonResponse(APIResponse):
def __init__(self, *args, content_type="application/json", **kwargs):
super().__init__(*args, **kwargs, content_type=content_type)
def serialize(self, data: ResponseData) -> str:
return json.dumps(data, cls=json_serialization.AASToJsonEncoder)
def serialize(self, data: ResponseDataInternal) -> str:
return json.dumps(data, cls=ResultToJsonEncoder if isinstance(data, Result)
else json_serialization.AASToJsonEncoder)
class XmlResponse(APIResponse):
def __init__(self, *args, content_type="application/xml", **kwargs):
super().__init__(*args, **kwargs, content_type=content_type)
def serialize(self, data: ResponseData) -> str:
return ""
def serialize(self, data: ResponseDataInternal) -> str:
return xml_element_to_str(response_data_to_xml(data))
class XmlResponseAlt(XmlResponse):
......@@ -89,18 +100,101 @@ def xml_element_to_str(element: etree.Element) -> str:
return etree.tostring(element, xml_declaration=True, encoding="utf-8")
class ResultToJsonEncoder(json.JSONEncoder):
def default(self, obj: object) -> object:
if isinstance(obj, Result):
return result_to_json(obj)
if isinstance(obj, Message):
return message_to_json(obj)
if isinstance(obj, MessageType):
return str(obj)
return super().default(obj)
def result_to_json(result: Result) -> Dict[str, object]:
return {
"success": result.success,
"isException": result.is_exception,
"messages": result.messages
}
def message_to_json(message: Message) -> Dict[str, object]:
return {
"messageType": message.message_type,
"code": message.code,
"text": message.text
}
def response_data_to_xml(data: ResponseDataInternal) -> etree.Element:
if isinstance(data, Result):
return result_to_xml(data)
if isinstance(data, model.Referable):
return referable_to_xml(data)
if isinstance(data, List):
elements: List[etree.Element] = [referable_to_xml(obj) for obj in data]
wrapper = etree.Element("list")
for elem in elements:
wrapper.append(elem)
return wrapper
def referable_to_xml(data: model.Referable) -> etree.Element:
# TODO: maybe support more referables
if isinstance(data, model.AssetAdministrationShell):
return xml_serialization.asset_administration_shell_to_xml(data)
if isinstance(data, model.Submodel):
return xml_serialization.submodel_to_xml(data)
if isinstance(data, model.SubmodelElement):
return xml_serialization.submodel_element_to_xml(data)
if isinstance(data, model.ConceptDictionary):
return xml_serialization.concept_dictionary_to_xml(data)
if isinstance(data, model.ConceptDescription):
return xml_serialization.concept_description_to_xml(data)
if isinstance(data, model.View):
return xml_serialization.view_to_xml(data)
if isinstance(data, model.Asset):
return xml_serialization.asset_to_xml(data)
raise TypeError(f"Referable {data} couldn't be serialized to xml (unsupported type)!")
def result_to_xml(result: Result) -> etree.Element:
result_elem = etree.Element("result")
success_elem = etree.Element("success")
success_elem.text = xml_serialization.boolean_to_xml(result.success)
is_exception_elem = etree.Element("isException")
is_exception_elem.text = xml_serialization.boolean_to_xml(result.is_exception)
messages_elem = etree.Element("messages")
for message in result.messages:
messages_elem.append(message_to_xml(message))
result_elem.append(success_elem)
result_elem.append(is_exception_elem)
result_elem.append(messages_elem)
return result_elem
def message_to_xml(message: Message) -> etree.Element:
message_elem = etree.Element("message")
message_type_elem = etree.Element("messageType")
message_type_elem.text = str(message.message_type)
code_elem = etree.Element("code")
code_elem.text = message.code
text_elem = etree.Element("text")
text_elem.text = message.text
message_elem.append(message_type_elem)
message_elem.append(code_elem)
message_elem.append(text_elem)
return message_elem
def get_response_type(request: Request) -> Type[APIResponse]:
response_types: Dict[str, Type[APIResponse]] = {
"application/json": JsonResponse,
"application/xml": XmlResponse,
"text/xml": XmlResponseAlt
}
accept_str: Optional[str] = request.headers.get("accept")
if accept_str is None:
# default to json in case unspecified
return JsonResponse
accept = werkzeug.http.parse_accept_header(accept_str, werkzeug.datastructures.MIMEAccept)
mime_type = accept.best_match(response_types)
mime_type = request.accept_mimetypes.best_match(response_types)
if mime_type is None:
raise werkzeug.exceptions.NotAcceptable(f"This server supports the following content types: "
+ ", ".join(response_types.keys()))
......@@ -113,4 +207,5 @@ def http_exception_to_response(exception: werkzeug.exceptions.HTTPException, res
message_type = MessageType.INFORMATION if success else MessageType.ERROR
message = Message(type(exception).__name__, exception.description if exception.description is not None else "",
message_type)
return response_type(Result(success, not success, [message]))
return response_type(Result(success, not success, [message]), status=exception.code,
headers=exception.get_headers())
......@@ -10,18 +10,66 @@
# specific language governing permissions and limitations under the License.
import io
import json
from lxml import etree # type: ignore
import urllib.parse
import werkzeug
from werkzeug.exceptions import BadRequest, NotAcceptable, NotFound, NotImplemented
from werkzeug.exceptions import BadRequest, InternalServerError, NotFound, NotImplemented
from werkzeug.routing import Rule, Submount
from werkzeug.wrappers import Request
from werkzeug.wrappers import Request, Response
from aas import model
from .response import APIResponse, get_response_type, http_exception_to_response
from ..xml import xml_deserialization
from ..json import json_deserialization
from .._generic import IDENTIFIER_TYPES, IDENTIFIER_TYPES_INVERSE
from .response import get_response_type, http_exception_to_response
from typing import Dict, Optional, Type
def parse_request_body(request: Request, expect_type: Type[model.base._RT]) -> model.base._RT:
"""
TODO: werkzeug documentation recommends checking the content length before retrieving the body to prevent
running out of memory. but it doesn't state how to check the content length
also: what would be a reasonable maximum content length? the request body isn't limited by the xml/json schema
"""
xml_constructors = {
model.Submodel: xml_deserialization._construct_submodel,
model.View: xml_deserialization._construct_view,
model.ConceptDictionary: xml_deserialization._construct_concept_dictionary,
model.ConceptDescription: xml_deserialization._construct_concept_description,
model.SubmodelElement: xml_deserialization._construct_submodel_element
}
valid_content_types = ("application/json", "application/xml", "text/xml")
if request.mimetype not in valid_content_types:
raise werkzeug.exceptions.UnsupportedMediaType(f"Invalid content-type: {request.mimetype}! Supported types: "
+ ", ".join(valid_content_types))
if request.mimetype == "application/json":
json_data = request.get_data()
try:
rv = json.loads(json_data, cls=json_deserialization.AASFromJsonDecoder)
except json.decoder.JSONDecodeError as e:
raise BadRequest(str(e)) from e
else:
parser = etree.XMLParser(remove_blank_text=True, remove_comments=True)
xml_data = io.BytesIO(request.get_data())
try:
tree = etree.parse(xml_data, parser)
except etree.XMLSyntaxError as e:
raise BadRequest(str(e)) from e
# TODO: check tag of root element
root = tree.getroot()
try:
rv = xml_constructors[expect_type](root, failsafe=False)
except (KeyError, ValueError) as e:
raise BadRequest(xml_deserialization._exception_to_str(e)) from e
from typing import Dict, Iterable, Optional, Type
assert(isinstance(rv, expect_type))
return rv
def identifier_uri_encode(id_: model.Identifier) -> str:
......@@ -59,10 +107,33 @@ class WSGIApp:
self.url_map = werkzeug.routing.Map([
Submount("/api/v1.0", [
Submount("/shells/<identifier:aas_id>", [
Rule("/aas", methods=["GET"], endpoint=self.get_aas)
Rule("/aas", methods=["GET"], endpoint=self.get_aas),
Submount("/aas", [
Rule("/asset", methods=["GET"], endpoint=self.get_aas_asset),
Rule("/submodels", methods=["GET"], endpoint=self.get_aas_submodels),
Rule("/submodels", methods=["PUT"], endpoint=self.put_aas_submodels),
Rule("/views", methods=["GET"], endpoint=self.get_aas_views),
Rule("/views/<string(minlength=1):id_short>", methods=["GET"],
endpoint=self.get_aas_views_specific),
Rule("/views/<string(minlength=1):id_short>", methods=["DELETE"],
endpoint=self.delete_aas_views_specific),
Rule("/conceptDictionaries", methods=["GET"], endpoint=self.get_aas_concept_dictionaries),
Rule("/conceptDictionaries", methods=["PUT"], endpoint=self.put_aas_concept_dictionaries),
Rule("/conceptDictionaries/<string(minlength=1):id_short>", methods=["GET"],
endpoint=self.get_aas_concept_dictionaries_specific),
Rule("/conceptDictionaries/<string(minlength=1):id_short>", methods=["DELETE"],
endpoint=self.delete_aas_concept_dictionaries_specific),
Rule("/submodels/<string(minlength=1):id_short>", methods=["GET"],
endpoint=self.get_aas_submodels_specific),
Rule("/submodels/<string(minlength=1):id_short>", methods=["DELETE"],
endpoint=self.delete_aas_submodels_specific),
])
]),
Submount("/submodels/<identifier:sm_id>", [
Rule("/submodel", methods=["GET"], endpoint=self.get_sm)
Submount("/submodels/<identifier:submodel_id>", [
Rule("/submodel", methods=["GET"], endpoint=self.get_submodel),
Submount("/submodel", [
])
])
])
], converters={"identifier": IdentifierConverter})
......@@ -85,25 +156,198 @@ class WSGIApp:
raise NotFound(f"No {type_.__name__} with {identifier} found!")
return identifiable
def resolve_reference(self, reference: model.AASReference[model.base._RT]) -> model.base._RT:
try:
return reference.resolve(self.object_store)
except (KeyError, TypeError, model.base.UnexpectedTypeError) as e:
raise InternalServerError(xml_deserialization._exception_to_str(e)) from e
def handle_request(self, request: Request):
adapter = self.url_map.bind_to_environ(request.environ)
# determine response content type
try:
endpoint, values = adapter.match()
if endpoint is None:
return NotImplemented("This route is not yet implemented.")
raise NotImplemented("This route is not yet implemented.")
return endpoint(request, values)
# any raised error that leaves this function will cause a 500 internal server error
# so catch raised http exceptions and return them
except werkzeug.exceptions.NotAcceptable as e:
return e
except werkzeug.exceptions.HTTPException as e:
try:
# get_response_type() may raise a NotAcceptable error, so we have to handle that
return http_exception_to_response(e, get_response_type(request))
except werkzeug.exceptions.NotAcceptable as e:
return e
def get_aas(self, request: Request, url_args: Dict) -> APIResponse:
def get_aas(self, request: Request, url_args: Dict) -> Response:
# TODO: depth parameter
response = get_response_type(request)
return response(self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell))
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
return response_t(aas)
def get_aas_asset(self, request: Request, url_args: Dict) -> Response:
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
asset = self.resolve_reference(aas.asset)
asset.update()
return response_t(asset)
def get_aas_submodels(self, request: Request, url_args: Dict) -> Response:
# TODO: depth parameter
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
submodels = [self.resolve_reference(ref) for ref in aas.submodel]
for submodel in submodels:
submodel.update()
identification_id: Optional[str] = request.args.get("identification.id")
if identification_id is not None:
# mypy doesn't propagate type restrictions to nested functions: https://github.com/python/mypy/issues/2608
submodels = filter(lambda s: identification_id in s.identification.id, submodels) # type: ignore
semantic_id: Optional[str] = request.args.get("semanticId")
if semantic_id is not None:
# mypy doesn't propagate type restrictions to nested functions: https://github.com/python/mypy/issues/2608
submodels = filter(lambda s: s.semantic_id is not None # type: ignore
and len(s.semantic_id.key) > 0
and semantic_id in s.semantic_id.key[0].value, submodels) # type: ignore
return response_t(list(submodels))
def put_aas_submodels(self, request: Request, url_args: Dict) -> Response:
response_t = get_response_type(request)
new_submodel = parse_request_body(request, model.Submodel)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
current_submodel = None
for s in iter(self.resolve_reference(ref) for ref in aas.submodel):
if s.identification == new_submodel.identification:
current_submodel = s
break
if current_submodel is None:
aas.submodel.add(model.AASReference.from_referable(new_submodel))
aas.commit()
not_referenced_submodel = self.object_store.get(new_submodel.identification)
assert(isinstance(not_referenced_submodel, model.Submodel))
current_submodel = not_referenced_submodel
if current_submodel is not None:
self.object_store.discard(current_submodel)
self.object_store.add(new_submodel)
return response_t(new_submodel, status=201)
def get_aas_views(self, request: Request, url_args: Dict) -> Response:
# TODO: filter parameter
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
if len(aas.view) == 0:
raise NotFound("No views found!")
return response_t(list(aas.view))
def put_aas_views(self, request: Request, url_args: Dict) -> Response:
response_t = get_response_type(request)
new_view = parse_request_body(request, model.View)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
old_view = aas.view.get(new_view.id_short)
if old_view is not None:
aas.view.discard(old_view)
aas.view.add(new_view)
aas.commit()
return response_t(new_view, status=201)
def get_aas_views_specific(self, request: Request, url_args: Dict) -> Response:
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
id_short = url_args["id_short"]
view = aas.view.get(id_short)
if view is None:
raise NotFound(f"No view with idShort '{id_short}' found!")
view.update()
return response_t(view)
def delete_aas_views_specific(self, request: Request, url_args: Dict) -> Response:
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
id_short = url_args["id_short"]
view = aas.view.get(id_short)
if view is None:
raise NotFound(f"No view with idShort '{id_short}' found!")
view.update()
aas.view.remove(view.id_short)
return Response(status=204)
def get_aas_concept_dictionaries(self, request: Request, url_args: Dict) -> Response:
# TODO: depth parameter
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
if len(aas.concept_dictionary) == 0:
raise NotFound("No concept dictionaries found!")
return response_t(list(aas.concept_dictionary))
def put_aas_concept_dictionaries(self, request: Request, url_args: Dict) -> Response:
response_t = get_response_type(request)
new_concept_dictionary = parse_request_body(request, model.ConceptDictionary)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
old_concept_dictionary = aas.concept_dictionary.get(new_concept_dictionary.id_short)
if old_concept_dictionary is not None:
aas.concept_dictionary.discard(old_concept_dictionary)
aas.concept_dictionary.add(new_concept_dictionary)
aas.commit()
return response_t(new_concept_dictionary, status=201)
def get_aas_concept_dictionaries_specific(self, request: Request, url_args: Dict) -> Response:
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
id_short = url_args["id_short"]
concept_dictionary = aas.concept_dictionary.get(id_short)
if concept_dictionary is None:
raise NotFound(f"No concept dictionary with idShort '{id_short}' found!")
concept_dictionary.update()
return response_t(concept_dictionary)
def delete_aas_concept_dictionaries_specific(self, request: Request, url_args: Dict) -> Response:
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
id_short = url_args["id_short"]
concept_dictionaries = aas.concept_dictionary.get(id_short)
if concept_dictionaries is None:
raise NotFound(f"No concept dictionary with idShort '{id_short}' found!")
concept_dictionaries.update()
aas.view.remove(concept_dictionaries.id_short)
return Response(status=204)
def get_aas_submodels_specific(self, request: Request, url_args: Dict) -> Response:
response_t = get_response_type(request)
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
id_short = url_args["id_short"]
for submodel in iter(self.resolve_reference(ref) for ref in aas.submodel):
submodel.update()
if submodel.id_short == id_short:
return response_t(submodel)
raise NotFound(f"No submodel with idShort '{id_short}' found!")
def delete_aas_submodels_specific(self, request: Request, url_args: Dict) -> Response:
aas = self.get_obj_ts(url_args["aas_id"], model.AssetAdministrationShell)
aas.update()
id_short = url_args["id_short"]
for ref in aas.submodel:
submodel = self.resolve_reference(ref)
submodel.update()
if submodel.id_short == id_short:
aas.submodel.discard(ref)
self.object_store.discard(submodel)
return Response(status=204)
raise NotFound(f"No submodel with idShort '{id_short}' found!")
def get_sm(self, request: Request, url_args: Dict) -> APIResponse:
def get_submodel(self, request: Request, url_args: Dict) -> Response:
# TODO: depth parameter
response = get_response_type(request)
return response(self.get_obj_ts(url_args["sm_id"], model.Submodel))
response_t = get_response_type(request)
submodel = self.get_obj_ts(url_args["submodel_id"], model.Submodel)
submodel.update()
return response_t(submodel)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment