diff --git a/aas/backend/couchdb.py b/aas/backend/couchdb.py index c7168e634c26df5b68db5ecd8d8680ff7b36ed29..0b8c6f9111a965982a68523b85c681527e7346ff 100644 --- a/aas/backend/couchdb.py +++ b/aas/backend/couchdb.py @@ -13,15 +13,14 @@ Todo: Add module docstring """ import threading import weakref -from typing import List, Dict, Any, Optional, Iterator, Iterable, Union -import re +from typing import List, Dict, Any, Optional, Iterator, Iterable, Union, Tuple import urllib.parse -import urllib.request -import urllib.error import logging import json import http.client +import urllib3 + from . import backends from aas.adapter.json import json_deserialization, json_serialization from aas import model @@ -30,6 +29,9 @@ from aas import model logger = logging.getLogger(__name__) +_http_pool_manager = urllib3.PoolManager() + + class CouchDBBackend(backends.Backend): """ This Backend stores each Identifiable object as a single JSON document in the configured CouchDB database. Each @@ -47,10 +49,8 @@ class CouchDBBackend(backends.Backend): raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found " "in the CouchDB") url = CouchDBBackend._parse_source(store_object.source) - request = urllib.request.Request(url, - headers={'Accept': 'application/json'}) try: - data = CouchDBBackend.do_request(request) + data = CouchDBBackend.do_request(url) except CouchDBServerError as e: if e.code == 404: raise KeyError("No Identifiable found in CouchDB at {}".format(url)) from e @@ -75,13 +75,9 @@ class CouchDBBackend(backends.Backend): data = json.dumps({'data': store_object, "_rev": get_couchdb_revision(url)}, cls=json_serialization.AASToJsonEncoder) - request = urllib.request.Request( - url, - headers={'Content-type': 'application/json'}, - method='PUT', - data=data.encode()) try: - response = CouchDBBackend.do_request(request) + response = CouchDBBackend.do_request( + url, method='PUT', additional_headers={'Content-type': 'application/json'}, 
@classmethod
def do_request(cls, url: str, method: str = "GET", additional_headers: Optional[Dict[str, str]] = None,
               body: Optional[bytes] = None) -> Dict[str, Any]:
    """
    Perform an HTTP(S) request to the CouchDB server, parse the result and handle errors

    :param url: The HTTP or HTTPS URL to request
    :param method: The HTTP method for the request
    :param additional_headers: Additional headers to insert into the request. They are applied last, so they
                               override the default headers. The default headers include
                               'connection: keep-alive', 'accept-encoding: ...', 'authorization: basic ...' (only
                               if credentials have been registered for the URL's host) and
                               'Accept: application/json'. `None` (the default) adds no headers.
    :param body: Request body (e.g. the JSON document of a PUT request), or None to send no body
    :return: The parsed JSON data if the request `method` is other than 'HEAD' or the response headers for 'HEAD'
             requests
    :raises CouchDBConnectionError: If a timeout, SSL or protocol error occurs while performing the request
    :raises CouchDBResponseError: If any other urllib3 error occurs, if the response has an unexpected
                                  Content-type or if the response body cannot be decoded as JSON
    :raises CouchDBServerError: If the server answers with an HTTP status code outside of the 2xx range
    """
    # The lookup key must be built exactly like the key used for storing the credentials in
    # `register_credentials()` (scheme directly followed by netloc).
    url_parts = urllib.parse.urlparse(url)
    auth = _credentials_store.get(url_parts.scheme + url_parts.netloc)
    headers = urllib3.make_headers(keep_alive=True, accept_encoding=True,
                                   basic_auth="{}:{}".format(*auth) if auth else None)
    headers['Accept'] = 'application/json'
    if additional_headers:
        headers.update(additional_headers)

    try:
        response = _http_pool_manager.request(method, url, headers=headers, body=body)
    except (urllib3.exceptions.TimeoutError, urllib3.exceptions.SSLError, urllib3.exceptions.ProtocolError) as e:
        raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e
    except urllib3.exceptions.HTTPError as e:
        raise CouchDBResponseError("Error while connecting to the CouchDB server: {}".format(e)) from e

    # Use the `headers` mapping for both the error and the success path.
    # NOTE(review): `HTTPResponse.getheader()` appears to be deprecated in urllib3 2.x -- confirm against the
    # urllib3 version pinned by this project.
    content_type = response.headers.get('Content-type', None)

    if not (200 <= response.status < 300):
        logger.debug("Request %s %s finished with HTTP status code %s.",
                     method, url, response.status)
        if content_type != 'application/json':
            raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
                                       .format(content_type))

        # Responses to HEAD requests carry no body, so there is no error description to parse
        if method == 'HEAD':
            raise CouchDBServerError(response.status, "", "", "HTTP {}".format(response.status))

        try:
            data = json.loads(response.data.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError) as e:
            raise CouchDBResponseError("Could not parse error message of HTTP {}"
                                       .format(response.status)) from e
        # Do not let a KeyError escape if the error document lacks the expected members: callers of this method
        # use KeyError to signal "object not found", so it would be misinterpreted.
        if not isinstance(data, dict):
            data = {}
        error = data.get('error', "")
        reason = data.get('reason', "")
        raise CouchDBServerError(response.status, error, reason,
                                 "HTTP {}: {} (reason: {})".format(response.status, error, reason))

    # Check response & parse data
    logger.debug("Request %s %s finished successfully.", method, url)
    if method == 'HEAD':
        # No body to parse; callers read e.g. the 'ETag' header to obtain the document revision
        return response.headers

    if content_type != 'application/json':
        raise CouchDBResponseError("Unexpected Content-type header")
    try:
        data = json.loads(response.data.decode('utf-8'), cls=json_deserialization.AASFromJsonDecoder)
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
    return data
@@ -179,7 +187,8 @@ def register_credentials(url: str, username: str, password: str): :param username: Username to that CouchDB instance :param password: Password to the Username """ - _credentials_store.add_password(None, url, username, password, is_authenticated=True) + url_parts = urllib.parse.urlparse(url) + _credentials_store[url_parts.scheme + url_parts.netloc] = (username, password) # Global registry for CouchDB Revisions @@ -253,12 +262,8 @@ class CouchDBObjectStore(model.AbstractObjectStore): :param create: If True and the database does not exist, try to create it :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details) """ - request = urllib.request.Request( - "{}/{}".format(self.url, self.database_name), - headers={'Accept': 'application/json'}, - method='HEAD') try: - CouchDBBackend.do_request(request) + CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'HEAD') except CouchDBServerError as e: # If an HTTPError is raised, re-raise it, unless it is a 404 error and we are requested to create the # database @@ -270,11 +275,7 @@ class CouchDBObjectStore(model.AbstractObjectStore): # Create database logger.info("Creating CouchDB database %s/%s ...", self.url, self.database_name) - request = urllib.request.Request( - "{}/{}".format(self.url, self.database_name), - headers={'Accept': 'application/json'}, - method='PUT') - CouchDBBackend.do_request(request) + CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'PUT') def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable: """ @@ -290,11 +291,9 @@ class CouchDBObjectStore(model.AbstractObjectStore): identifier = self._transform_id(identifier, False) # Create and issue HTTP request (raises HTTPError on status != 200) - request = urllib.request.Request( - "{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')), - headers={'Accept': 'application/json'}) 
try: - data = CouchDBBackend.do_request(request) + data = CouchDBBackend.do_request( + "{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe=''))) except CouchDBServerError as e: if e.code == 404: raise KeyError("No Identifiable with id {} found in CouchDB database".format(identifier)) from e @@ -335,13 +334,12 @@ class CouchDBObjectStore(model.AbstractObjectStore): data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder) # Create and issue HTTP request (raises HTTPError on status != 200) - request = urllib.request.Request( - "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), - headers={'Content-type': 'application/json'}, - method='PUT', - data=data.encode()) try: - response = CouchDBBackend.do_request(request) + response = CouchDBBackend.do_request( + "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), + 'PUT', + {'Content-type': 'application/json'}, + data.encode('utf-8')) set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), response["rev"]) except CouchDBServerError as e: @@ -379,25 +377,19 @@ class CouchDBObjectStore(model.AbstractObjectStore): # ETag response header try: logger.debug("fetching the current object revision for deletion ...") - request = urllib.request.Request( - "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), - headers={'Accept': 'application/json'}, - method='HEAD') - opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store)) - response = opener.open(request) - rev = response.getheader('ETag')[1:-1] - except urllib.error.HTTPError as e: + headers = CouchDBBackend.do_request( + "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), 'HEAD') + rev = headers['ETag'][1:-1] + except CouchDBServerError as e: if e.code == 404: raise KeyError("No AAS object with id {} exists in 
CouchDB database".format(x.identification))\ from e raise - request = urllib.request.Request( - "{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev), - headers={'Content-type': 'application/json'}, - method='DELETE') try: - CouchDBBackend.do_request(request) + CouchDBBackend.do_request( + "{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev), + 'DELETE') except CouchDBServerError as e: if e.code == 404: raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e @@ -429,12 +421,9 @@ class CouchDBObjectStore(model.AbstractObjectStore): else: return False logger.debug("Checking existence of object with id %s in database ...", repr(x)) - request = urllib.request.Request( - "{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)), - headers={'Accept': 'application/json'}, - method='HEAD') try: - CouchDBBackend.do_request(request) + CouchDBBackend.do_request( + "{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)), 'HEAD') except CouchDBServerError as e: if e.code == 404: return False @@ -449,10 +438,7 @@ class CouchDBObjectStore(model.AbstractObjectStore): :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details) """ logger.debug("Fetching number of documents from database ...") - request = urllib.request.Request( - "{}/{}".format(self.url, self.database_name), - headers={'Accept': 'application/json'}) - data = CouchDBBackend.do_request(request) + data = CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name)) return data['doc_count'] def __iter__(self) -> Iterator[model.Identifiable]: @@ -477,10 +463,7 @@ class CouchDBObjectStore(model.AbstractObjectStore): # Fetch a list of all ids and construct Iterator object logger.debug("Creating iterator over objects in database ...") - request = urllib.request.Request( - 
"{}/{}/_all_docs".format(self.url, self.database_name), - headers={'Accept': 'application/json'}) - data = CouchDBBackend.do_request(request) + data = CouchDBBackend.do_request("{}/{}/_all_docs".format(self.url, self.database_name)) return CouchDBIdentifiableIterator(self, (row['id'] for row in data['rows'])) @staticmethod diff --git a/test/backend/test_couchdb.py b/test/backend/test_couchdb.py index 475b1602960b8c5173d327ecafa2f3c48a0dc2b4..6fb40fd1fb86fc9fc55e0b54352964a5804ff8cf 100644 --- a/test/backend/test_couchdb.py +++ b/test/backend/test_couchdb.py @@ -15,7 +15,7 @@ import urllib.error from aas.backend import couchdb from aas.examples.data.example_aas import * -from .._helper.test_helpers import TEST_CONFIG, COUCHDB_OKAY, COUCHDB_ERROR +from test._helper.test_helpers import TEST_CONFIG, COUCHDB_OKAY, COUCHDB_ERROR source_core: str = "couchdb://" + TEST_CONFIG["couchdb"]["url"].lstrip("http://") + "/" + \