Skip to content
Snippets Groups Projects

Fix some CouchDB and update/commit things

Merged Michael Thies requested to merge fix/couchdb_update_commit_things into master
All threads resolved!
Files
10
+ 97
114
@@ -13,14 +13,12 @@ Todo: Add module docstring
"""
import threading
import weakref
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union
import re
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union, Tuple
import urllib.parse
import urllib.request
import urllib.error
import logging
import json
import http.client
import urllib3 # type: ignore
from . import backends
from aas.adapter.json import json_deserialization, json_serialization
@@ -30,6 +28,9 @@ from aas import model
logger = logging.getLogger(__name__)
_http_pool_manager = urllib3.PoolManager()
class CouchDBBackend(backends.Backend):
"""
This Backend stores each Identifiable object as a single JSON document in the configured CouchDB database. Each
@@ -39,18 +40,16 @@ class CouchDBBackend(backends.Backend):
"""
@classmethod
def update_object(cls,
updated_object: "Referable", # type: ignore
store_object: "Referable", # type: ignore
updated_object: model.Referable,
store_object: model.Referable,
relative_path: List[str]) -> None:
if not isinstance(store_object, model.Identifiable):
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
"in the CouchDB")
url = CouchDBBackend._parse_source(store_object.source)
request = urllib.request.Request(url,
headers={'Accept': 'application/json'})
try:
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request(url)
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No Identifiable found in CouchDB at {}".format(url)) from e
@@ -62,8 +61,8 @@ class CouchDBBackend(backends.Backend):
@classmethod
def commit_object(cls,
committed_object: "Referable", # type: ignore
store_object: "Referable", # type: ignore
committed_object: model.Referable,
store_object: model.Referable,
relative_path: List[str]) -> None:
if not isinstance(store_object, model.Identifiable):
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
@@ -75,13 +74,9 @@ class CouchDBBackend(backends.Backend):
data = json.dumps({'data': store_object, "_rev": get_couchdb_revision(url)},
cls=json_serialization.AASToJsonEncoder)
request = urllib.request.Request(
url,
headers={'Content-type': 'application/json'},
method='PUT',
data=data.encode())
try:
response = CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(
url, method='PUT', additional_headers={'Content-type': 'application/json'}, body=data.encode('utf-8'))
set_couchdb_revision(url, response["rev"])
except CouchDBServerError as e:
if e.code == 409:
@@ -101,69 +96,82 @@ class CouchDBBackend(backends.Backend):
:return: URL to the document
:raises CouchDBBackendSourceError, if the source has the wrong format
"""
couchdb_s = re.match("couchdbs://", source) # Note: Works, since match only checks the beginning of the string
if couchdb_s:
if source.startswith("couchdbs://"):
url = source.replace("couchdbs://", "https://", 1)
elif source.startswith("couchdb://"):
url = source.replace("couchdb://", "http://", 1)
else:
couchdb_wo_s = re.match("couchdb://", source)
if couchdb_wo_s:
url = source.replace("couchdb://", "http://", 1)
else:
raise CouchDBSourceError("Source has wrong format. "
"Expected to start with {couchdb://, couchdbs://}, got {" + source + "}")
raise CouchDBSourceError("Source has wrong format. "
"Expected to start with {couchdb://, couchdbs://}, got {" + source + "}")
return url
@classmethod
def do_request(cls, request: urllib.request.Request) -> Dict[str, Any]:
def do_request(cls, url: str, method: str = "GET", additional_headers: Dict[str, str] = {},
body: Optional[bytes] = None) -> Dict[str, Any]:
"""
Perform an HTTP request to the CouchDBServer, parse the result and handle errors
:param request:
:return:
Perform an HTTP(S) request to the CouchDBServer, parse the result and handle errors
:param url: The HTTP or HTTPS URL to request
:param method: The HTTP method for the request
:param additional_headers: Additional headers to insert into the request. The default headers include
'connection: keep-alive', 'accept-encoding: ...', 'authorization: basic ...', 'Accept: ...'.
:param body: Request body for POST, PUT, and PATCH requests
:return: The parsed JSON data if the request `method` is other than 'HEAD' or the response headers for 'HEAD'
requests
"""
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
url_parts = urllib.parse.urlparse(url)
host = url_parts.scheme + url_parts.netloc
auth = _credentials_store.get(host)
headers = urllib3.make_headers(keep_alive=True, accept_encoding=True,
basic_auth="{}:{}".format(*auth) if auth else None)
headers['Accept'] = 'application/json'
headers.update(additional_headers)
try:
response = opener.open(request)
except urllib.error.HTTPError as e:
with e: # close the reponse (socket) when done
logger.debug("Request %s %s finished with HTTP status code %s.",
request.get_method(), request.full_url, e.code)
if e.headers.get('Content-type', None) != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
.format(e.headers.get('Content-type', None)))
if request.get_method() == 'HEAD':
raise CouchDBServerError(e.code, "", "", "HTTP {}") from e
try:
data = json.load(e)
except json.JSONDecodeError:
raise CouchDBResponseError("Could not parse error message of HTTP {}"
.format(e.code))
raise CouchDBServerError(e.code, data['error'], data['reason'],
"HTTP {}: {} (reason: {})".format(e.code, data['error'], data['reason']))\
from e
except urllib.error.URLError as e:
response = _http_pool_manager.request(method, url, headers=headers, body=body)
except (urllib3.exceptions.TimeoutError, urllib3.exceptions.SSLError, urllib3.exceptions.ProtocolError) as e:
raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e
except urllib3.exceptions.HTTPError as e:
raise CouchDBResponseError("Error while connecting to the CouchDB server: {}".format(e)) from e
if not (200 <= response.status < 300):
logger.debug("Request %s %s finished with HTTP status code %s.",
method, url, response.status)
if response.headers.get('Content-type', None) != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
.format(response.headers.get('Content-type', None)))
if method == 'HEAD':
raise CouchDBServerError(response.status, "", "", "HTTP {}".format(response.status))
# Check response & parse data
assert (isinstance(response, http.client.HTTPResponse))
with response: # close the reponse (socket) when done
logger.debug("Request %s %s finished successfully.", request.get_method(), request.full_url)
if request.get_method() == 'HEAD':
return {}
if response.getheader('Content-type') != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header")
try:
data = json.load(response, cls=json_deserialization.AASFromJsonDecoder)
except json.JSONDecodeError as e:
raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
return data
data = json.loads(response.data.decode('utf-8'))
except json.JSONDecodeError:
raise CouchDBResponseError("Could not parse error message of HTTP {}"
.format(response.status))
raise CouchDBServerError(response.status, data['error'], data['reason'],
"HTTP {}: {} (reason: {})".format(response.status, data['error'], data['reason']))
# Check response & parse data
logger.debug("Request %s %s finished successfully.", method, url)
if method == 'HEAD':
return response.headers
if response.getheader('Content-type') != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header")
try:
data = json.loads(response.data.decode('utf-8'), cls=json_deserialization.AASFromJsonDecoder)
except json.JSONDecodeError as e:
raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
return data
backends.register_backend("couchdb", CouchDBBackend)
backends.register_backend("couchdbs", CouchDBBackend)
# Global registry for credentials for CouchDB Servers
_credentials_store: urllib.request.HTTPPasswordMgrWithPriorAuth = urllib.request.HTTPPasswordMgrWithPriorAuth()
_credentials_store: Dict[str, Tuple[str, str]] = {}
# Note: The HTTPPasswordMgr is not thread safe during writing, should be thread safe for reading only.
@@ -178,7 +186,8 @@ def register_credentials(url: str, username: str, password: str):
:param username: Username to that CouchDB instance
:param password: Password to the Username
"""
_credentials_store.add_password(None, url, username, password, is_authenticated=True)
url_parts = urllib.parse.urlparse(url)
_credentials_store[url_parts.scheme + url_parts.netloc] = (username, password)
# Global registry for CouchDB Revisions
@@ -252,12 +261,8 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:param create: If True and the database does not exist, try to create it
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'},
method='HEAD')
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'HEAD')
except CouchDBServerError as e:
# If an HTTPError is raised, re-raise it, unless it is a 404 error and we are requested to create the
# database
@@ -269,11 +274,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# Create database
logger.info("Creating CouchDB database %s/%s ...", self.url, self.database_name)
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'},
method='PUT')
CouchDBBackend.do_request(request)
CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'PUT')
def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable:
"""
@@ -289,11 +290,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
identifier = self._transform_id(identifier, False)
# Create and issue HTTP request (raises HTTPError on status != 200)
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
headers={'Accept': 'application/json'})
try:
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')))
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No Identifiable with id {} found in CouchDB database".format(identifier)) from e
@@ -334,13 +333,12 @@ class CouchDBObjectStore(model.AbstractObjectStore):
data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
# Create and issue HTTP request (raises HTTPError on status != 200)
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
headers={'Content-type': 'application/json'},
method='PUT',
data=data.encode())
try:
response = CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
'PUT',
{'Content-type': 'application/json'},
data.encode('utf-8'))
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
response["rev"])
except CouchDBServerError as e:
@@ -378,25 +376,19 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# ETag response header
try:
logger.debug("fetching the current object revision for deletion ...")
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
headers={'Accept': 'application/json'},
method='HEAD')
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
response = opener.open(request)
rev = response.getheader('ETag')[1:-1]
except urllib.error.HTTPError as e:
headers = CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), 'HEAD')
rev = headers['ETag'][1:-1]
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification))\
from e
raise
request = urllib.request.Request(
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
headers={'Content-type': 'application/json'},
method='DELETE')
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request(
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
'DELETE')
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
@@ -428,12 +420,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
else:
return False
logger.debug("Checking existence of object with id %s in database ...", repr(x))
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)),
headers={'Accept': 'application/json'},
method='HEAD')
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)), 'HEAD')
except CouchDBServerError as e:
if e.code == 404:
return False
@@ -448,10 +437,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
logger.debug("Fetching number of documents from database ...")
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name))
return data['doc_count']
def __iter__(self) -> Iterator[model.Identifiable]:
@@ -476,10 +462,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# Fetch a list of all ids and construct Iterator object
logger.debug("Creating iterator over objects in database ...")
request = urllib.request.Request(
"{}/{}/_all_docs".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request("{}/{}/_all_docs".format(self.url, self.database_name))
return CouchDBIdentifiableIterator(self, (row['id'] for row in data['rows']))
@staticmethod
Loading