Commit 66f3e30f authored by Michael Thies's avatar Michael Thies
Browse files

Use urllib3 for CouchDB client implementation

parent 7b7d8d8b
......@@ -13,15 +13,14 @@ Todo: Add module docstring
"""
import threading
import weakref
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union
import re
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union, Tuple
import urllib.parse
import urllib.request
import urllib.error
import logging
import json
import http.client
import urllib3
from . import backends
from aas.adapter.json import json_deserialization, json_serialization
from aas import model
......@@ -30,6 +29,9 @@ from aas import model
logger = logging.getLogger(__name__)
_http_pool_manager = urllib3.PoolManager()
class CouchDBBackend(backends.Backend):
"""
This Backend stores each Identifiable object as a single JSON document in the configured CouchDB database. Each
......@@ -47,10 +49,8 @@ class CouchDBBackend(backends.Backend):
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
"in the CouchDB")
url = CouchDBBackend._parse_source(store_object.source)
request = urllib.request.Request(url,
headers={'Accept': 'application/json'})
try:
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request(url)
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No Identifiable found in CouchDB at {}".format(url)) from e
......@@ -75,13 +75,9 @@ class CouchDBBackend(backends.Backend):
data = json.dumps({'data': store_object, "_rev": get_couchdb_revision(url)},
cls=json_serialization.AASToJsonEncoder)
request = urllib.request.Request(
url,
headers={'Content-type': 'application/json'},
method='PUT',
data=data.encode())
try:
response = CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(
url, method='PUT', additional_headers={'Content-type': 'application/json'}, body=data.encode('utf-8'))
set_couchdb_revision(url, response["rev"])
except CouchDBServerError as e:
if e.code == 409:
......@@ -111,52 +107,64 @@ class CouchDBBackend(backends.Backend):
return url
@classmethod
def do_request(cls, request: urllib.request.Request) -> Dict[str, Any]:
def do_request(cls, url: str, method: str = "GET", additional_headers: Dict[str, str] = {},
body: Optional[bytes] = None) -> Dict[str, Any]:
"""
Perform an HTTP request to the CouchDBServer, parse the result and handle errors
:param request:
:return:
Perform an HTTP(S) request to the CouchDBServer, parse the result and handle errors
:param url: The HTTP or HTTPS URL to request
:param method: The HTTP method for the request
:param additional_headers: Additional headers to insert into the request. The default headers include
'connection: keep-alive', 'accept-encoding: ...', 'authorization: basic ...', 'Accept: ...'.
:param body: Request body for POST requests
:return: The parsed JSON data if the request `method` is other than 'HEAD' or the response headers for 'HEAD'
requests
"""
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
url_parts = urllib.parse.urlparse(url)
host = url_parts.scheme + url_parts.netloc
auth = _credentials_store.get(host)
headers = urllib3.make_headers(keep_alive=True, accept_encoding=True,
basic_auth="{}:{}".format(*auth) if auth else None)
headers['Accept'] = 'application/json'
headers.update(additional_headers)
try:
response = opener.open(request)
except urllib.error.HTTPError as e:
with e: # close the reponse (socket) when done
logger.debug("Request %s %s finished with HTTP status code %s.",
request.get_method(), request.full_url, e.code)
if e.headers.get('Content-type', None) != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
.format(e.headers.get('Content-type', None)))
if request.get_method() == 'HEAD':
raise CouchDBServerError(e.code, "", "", "HTTP {}") from e
try:
data = json.load(e)
except json.JSONDecodeError:
raise CouchDBResponseError("Could not parse error message of HTTP {}"
.format(e.code))
raise CouchDBServerError(e.code, data['error'], data['reason'],
"HTTP {}: {} (reason: {})".format(e.code, data['error'], data['reason']))\
from e
except urllib.error.URLError as e:
response = _http_pool_manager.request(method, url, headers=headers, body=body)
except (urllib3.exceptions.TimeoutError, urllib3.exceptions.SSLError, urllib3.exceptions.ProtocolError) as e:
raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e
except urllib3.exceptions.HTTPError as e:
raise CouchDBResponseError("Error while connecting to the CouchDB server: {}".format(e)) from e
if not (200 <= response.status < 300):
logger.debug("Request %s %s finished with HTTP status code %s.",
method, url, response.status)
if response.headers.get('Content-type', None) != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
.format(response.headers.get('Content-type', None)))
if method == 'HEAD':
raise CouchDBServerError(response.status, "", "", "HTTP {}".format(response.status))
# Check response & parse data
assert (isinstance(response, http.client.HTTPResponse))
with response: # close the reponse (socket) when done
logger.debug("Request %s %s finished successfully.", request.get_method(), request.full_url)
if request.get_method() == 'HEAD':
return {}
if response.getheader('Content-type') != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header")
try:
data = json.load(response, cls=json_deserialization.AASFromJsonDecoder)
except json.JSONDecodeError as e:
raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
return data
data = json.loads(response.data.decode('utf-8'))
except json.JSONDecodeError:
raise CouchDBResponseError("Could not parse error message of HTTP {}"
.format(response.status))
raise CouchDBServerError(response.status, data['error'], data['reason'],
"HTTP {}: {} (reason: {})".format(response.status, data['error'], data['reason']))
# Check response & parse data
logger.debug("Request %s %s finished successfully.", method, url)
if method == 'HEAD':
return response.headers
if response.getheader('Content-type') != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header")
try:
data = json.loads(response.data.decode('utf-8'), cls=json_deserialization.AASFromJsonDecoder)
except json.JSONDecodeError as e:
raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
return data
backends.register_backend("couchdb", CouchDBBackend)
......@@ -164,7 +172,7 @@ backends.register_backend("couchdbs", CouchDBBackend)
# Global registry for credentials for CouchDB Servers
_credentials_store: urllib.request.HTTPPasswordMgrWithPriorAuth = urllib.request.HTTPPasswordMgrWithPriorAuth()
_credentials_store: Dict[str, Tuple[str, str]] = {}
# Note: The HTTPPasswordMgr is not thread safe during writing, should be thread safe for reading only.
......@@ -179,7 +187,8 @@ def register_credentials(url: str, username: str, password: str):
:param username: Username to that CouchDB instance
:param password: Password to the Username
"""
_credentials_store.add_password(None, url, username, password, is_authenticated=True)
url_parts = urllib.parse.urlparse(url)
_credentials_store[url_parts.scheme + url_parts.netloc] = (username, password)
# Global registry for CouchDB Revisions
......@@ -253,12 +262,8 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:param create: If True and the database does not exist, try to create it
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'},
method='HEAD')
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'HEAD')
except CouchDBServerError as e:
# If an HTTPError is raised, re-raise it, unless it is a 404 error and we are requested to create the
# database
......@@ -270,11 +275,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# Create database
logger.info("Creating CouchDB database %s/%s ...", self.url, self.database_name)
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'},
method='PUT')
CouchDBBackend.do_request(request)
CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'PUT')
def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable:
"""
......@@ -290,11 +291,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
identifier = self._transform_id(identifier, False)
# Create and issue HTTP request (raises HTTPError on status != 200)
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
headers={'Accept': 'application/json'})
try:
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')))
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No Identifiable with id {} found in CouchDB database".format(identifier)) from e
......@@ -335,13 +334,12 @@ class CouchDBObjectStore(model.AbstractObjectStore):
data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
# Create and issue HTTP request (raises HTTPError on status != 200)
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
headers={'Content-type': 'application/json'},
method='PUT',
data=data.encode())
try:
response = CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
'PUT',
{'Content-type': 'application/json'},
data.encode('utf-8'))
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
response["rev"])
except CouchDBServerError as e:
......@@ -379,25 +377,19 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# ETag response header
try:
logger.debug("fetching the current object revision for deletion ...")
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
headers={'Accept': 'application/json'},
method='HEAD')
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
response = opener.open(request)
rev = response.getheader('ETag')[1:-1]
except urllib.error.HTTPError as e:
headers = CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), 'HEAD')
rev = headers['ETag'][1:-1]
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification))\
from e
raise
request = urllib.request.Request(
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
headers={'Content-type': 'application/json'},
method='DELETE')
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request(
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
'DELETE')
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
......@@ -429,12 +421,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
else:
return False
logger.debug("Checking existence of object with id %s in database ...", repr(x))
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)),
headers={'Accept': 'application/json'},
method='HEAD')
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)), 'HEAD')
except CouchDBServerError as e:
if e.code == 404:
return False
......@@ -449,10 +438,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
logger.debug("Fetching number of documents from database ...")
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name))
return data['doc_count']
def __iter__(self) -> Iterator[model.Identifiable]:
......@@ -477,10 +463,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# Fetch a list of all ids and construct Iterator object
logger.debug("Creating iterator over objects in database ...")
request = urllib.request.Request(
"{}/{}/_all_docs".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request("{}/{}/_all_docs".format(self.url, self.database_name))
return CouchDBIdentifiableIterator(self, (row['id'] for row in data['rows']))
@staticmethod
......
......@@ -15,7 +15,7 @@ import urllib.error
from aas.backend import couchdb
from aas.examples.data.example_aas import *
from .._helper.test_helpers import TEST_CONFIG, COUCHDB_OKAY, COUCHDB_ERROR
from test._helper.test_helpers import TEST_CONFIG, COUCHDB_OKAY, COUCHDB_ERROR
source_core: str = "couchdb://" + TEST_CONFIG["couchdb"]["url"].lstrip("http://") + "/" + \
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment