Skip to content
Snippets Groups Projects

Fix some CouchDB and update/commit things

Merged Michael Thies requested to merge fix/couchdb_update_commit_things into master
All threads resolved!

Files

+ 97
114
@@ -13,14 +13,12 @@ Todo: Add module docstring
@@ -13,14 +13,12 @@ Todo: Add module docstring
"""
"""
import threading
import threading
import weakref
import weakref
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union, Tuple
import re
import urllib.parse
import urllib.parse
import urllib.request
import urllib.error
import logging
import logging
import json
import json
import http.client
 
import urllib3 # type: ignore
from . import backends
from . import backends
from aas.adapter.json import json_deserialization, json_serialization
from aas.adapter.json import json_deserialization, json_serialization
@@ -30,6 +28,9 @@ from aas import model
@@ -30,6 +28,9 @@ from aas import model
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
 
_http_pool_manager = urllib3.PoolManager()
 
 
class CouchDBBackend(backends.Backend):
class CouchDBBackend(backends.Backend):
"""
"""
This Backend stores each Identifiable object as a single JSON document in the configured CouchDB database. Each
This Backend stores each Identifiable object as a single JSON document in the configured CouchDB database. Each
@@ -39,18 +40,16 @@ class CouchDBBackend(backends.Backend):
@@ -39,18 +40,16 @@ class CouchDBBackend(backends.Backend):
"""
"""
@classmethod
@classmethod
def update_object(cls,
def update_object(cls,
updated_object: "Referable", # type: ignore
updated_object: model.Referable,
store_object: "Referable", # type: ignore
store_object: model.Referable,
relative_path: List[str]) -> None:
relative_path: List[str]) -> None:
if not isinstance(store_object, model.Identifiable):
if not isinstance(store_object, model.Identifiable):
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
"in the CouchDB")
"in the CouchDB")
url = CouchDBBackend._parse_source(store_object.source)
url = CouchDBBackend._parse_source(store_object.source)
request = urllib.request.Request(url,
headers={'Accept': 'application/json'})
try:
try:
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request(url)
except CouchDBServerError as e:
except CouchDBServerError as e:
if e.code == 404:
if e.code == 404:
raise KeyError("No Identifiable found in CouchDB at {}".format(url)) from e
raise KeyError("No Identifiable found in CouchDB at {}".format(url)) from e
@@ -62,8 +61,8 @@ class CouchDBBackend(backends.Backend):
@@ -62,8 +61,8 @@ class CouchDBBackend(backends.Backend):
@classmethod
@classmethod
def commit_object(cls,
def commit_object(cls,
committed_object: "Referable", # type: ignore
committed_object: model.Referable,
store_object: "Referable", # type: ignore
store_object: model.Referable,
relative_path: List[str]) -> None:
relative_path: List[str]) -> None:
if not isinstance(store_object, model.Identifiable):
if not isinstance(store_object, model.Identifiable):
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
@@ -75,13 +74,9 @@ class CouchDBBackend(backends.Backend):
@@ -75,13 +74,9 @@ class CouchDBBackend(backends.Backend):
data = json.dumps({'data': store_object, "_rev": get_couchdb_revision(url)},
data = json.dumps({'data': store_object, "_rev": get_couchdb_revision(url)},
cls=json_serialization.AASToJsonEncoder)
cls=json_serialization.AASToJsonEncoder)
request = urllib.request.Request(
url,
headers={'Content-type': 'application/json'},
method='PUT',
data=data.encode())
try:
try:
response = CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(
 
url, method='PUT', additional_headers={'Content-type': 'application/json'}, body=data.encode('utf-8'))
set_couchdb_revision(url, response["rev"])
set_couchdb_revision(url, response["rev"])
except CouchDBServerError as e:
except CouchDBServerError as e:
if e.code == 409:
if e.code == 409:
@@ -101,69 +96,82 @@ class CouchDBBackend(backends.Backend):
@@ -101,69 +96,82 @@ class CouchDBBackend(backends.Backend):
:return: URL to the document
:return: URL to the document
:raises CouchDBBackendSourceError, if the source has the wrong format
:raises CouchDBBackendSourceError, if the source has the wrong format
"""
"""
couchdb_s = re.match("couchdbs://", source) # Note: Works, since match only checks the beginning of the string
if source.startswith("couchdbs://"):
if couchdb_s:
url = source.replace("couchdbs://", "https://", 1)
url = source.replace("couchdbs://", "https://", 1)
 
elif source.startswith("couchdb://"):
 
url = source.replace("couchdb://", "http://", 1)
else:
else:
couchdb_wo_s = re.match("couchdb://", source)
raise CouchDBSourceError("Source has wrong format. "
if couchdb_wo_s:
"Expected to start with {couchdb://, couchdbs://}, got {" + source + "}")
url = source.replace("couchdb://", "http://", 1)
else:
raise CouchDBSourceError("Source has wrong format. "
"Expected to start with {couchdb://, couchdbs://}, got {" + source + "}")
return url
return url
@classmethod
@classmethod
def do_request(cls, request: urllib.request.Request) -> Dict[str, Any]:
def do_request(cls, url: str, method: str = "GET", additional_headers: Dict[str, str] = {},
 
body: Optional[bytes] = None) -> Dict[str, Any]:
"""
"""
Perform an HTTP request to the CouchDBServer, parse the result and handle errors
Perform an HTTP(S) request to the CouchDBServer, parse the result and handle errors
:param request:
:param url: The HTTP or HTTPS URL to request
:return:
:param method: The HTTP method for the request
 
:param additional_headers: Additional headers to insert into the request. The default headers include
 
'connection: keep-alive', 'accept-encoding: ...', 'authorization: basic ...', 'Accept: ...'.
 
:param body: Request body for POST, PUT, and PATCH requests
 
:return: The parsed JSON data if the request `method` is other than 'HEAD' or the response headers for 'HEAD'
 
requests
"""
"""
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
url_parts = urllib.parse.urlparse(url)
 
host = url_parts.scheme + url_parts.netloc
 
auth = _credentials_store.get(host)
 
headers = urllib3.make_headers(keep_alive=True, accept_encoding=True,
 
basic_auth="{}:{}".format(*auth) if auth else None)
 
headers['Accept'] = 'application/json'
 
headers.update(additional_headers)
 
try:
try:
response = opener.open(request)
response = _http_pool_manager.request(method, url, headers=headers, body=body)
except urllib.error.HTTPError as e:
except (urllib3.exceptions.TimeoutError, urllib3.exceptions.SSLError, urllib3.exceptions.ProtocolError) as e:
with e: # close the reponse (socket) when done
logger.debug("Request %s %s finished with HTTP status code %s.",
request.get_method(), request.full_url, e.code)
if e.headers.get('Content-type', None) != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
.format(e.headers.get('Content-type', None)))
if request.get_method() == 'HEAD':
raise CouchDBServerError(e.code, "", "", "HTTP {}") from e
try:
data = json.load(e)
except json.JSONDecodeError:
raise CouchDBResponseError("Could not parse error message of HTTP {}"
.format(e.code))
raise CouchDBServerError(e.code, data['error'], data['reason'],
"HTTP {}: {} (reason: {})".format(e.code, data['error'], data['reason']))\
from e
except urllib.error.URLError as e:
raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e
raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e
 
except urllib3.exceptions.HTTPError as e:
 
raise CouchDBResponseError("Error while connecting to the CouchDB server: {}".format(e)) from e
 
 
if not (200 <= response.status < 300):
 
logger.debug("Request %s %s finished with HTTP status code %s.",
 
method, url, response.status)
 
if response.headers.get('Content-type', None) != 'application/json':
 
raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
 
.format(response.headers.get('Content-type', None)))
 
 
if method == 'HEAD':
 
raise CouchDBServerError(response.status, "", "", "HTTP {}".format(response.status))
# Check response & parse data
assert (isinstance(response, http.client.HTTPResponse))
with response: # close the reponse (socket) when done
logger.debug("Request %s %s finished successfully.", request.get_method(), request.full_url)
if request.get_method() == 'HEAD':
return {}
if response.getheader('Content-type') != 'application/json':
raise CouchDBResponseError("Unexpected Content-type header")
try:
try:
data = json.load(response, cls=json_deserialization.AASFromJsonDecoder)
data = json.loads(response.data.decode('utf-8'))
except json.JSONDecodeError as e:
except json.JSONDecodeError:
raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
raise CouchDBResponseError("Could not parse error message of HTTP {}"
return data
.format(response.status))
 
raise CouchDBServerError(response.status, data['error'], data['reason'],
 
"HTTP {}: {} (reason: {})".format(response.status, data['error'], data['reason']))
 
 
# Check response & parse data
 
logger.debug("Request %s %s finished successfully.", method, url)
 
if method == 'HEAD':
 
return response.headers
 
 
if response.getheader('Content-type') != 'application/json':
 
raise CouchDBResponseError("Unexpected Content-type header")
 
try:
 
data = json.loads(response.data.decode('utf-8'), cls=json_deserialization.AASFromJsonDecoder)
 
except json.JSONDecodeError as e:
 
raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
 
return data
 
 
 
backends.register_backend("couchdb", CouchDBBackend)
 
backends.register_backend("couchdbs", CouchDBBackend)
# Global registry for credentials for CouchDB Servers
# Global registry for credentials for CouchDB Servers
_credentials_store: urllib.request.HTTPPasswordMgrWithPriorAuth = urllib.request.HTTPPasswordMgrWithPriorAuth()
_credentials_store: Dict[str, Tuple[str, str]] = {}
# Note: The HTTPPasswordMgr is not thread safe during writing, should be thread safe for reading only.
# Note: The HTTPPasswordMgr is not thread safe during writing, should be thread safe for reading only.
@@ -178,7 +186,8 @@ def register_credentials(url: str, username: str, password: str):
@@ -178,7 +186,8 @@ def register_credentials(url: str, username: str, password: str):
:param username: Username to that CouchDB instance
:param username: Username to that CouchDB instance
:param password: Password to the Username
:param password: Password to the Username
"""
"""
_credentials_store.add_password(None, url, username, password, is_authenticated=True)
url_parts = urllib.parse.urlparse(url)
 
_credentials_store[url_parts.scheme + url_parts.netloc] = (username, password)
# Global registry for CouchDB Revisions
# Global registry for CouchDB Revisions
@@ -252,12 +261,8 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -252,12 +261,8 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:param create: If True and the database does not exist, try to create it
:param create: If True and the database does not exist, try to create it
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
"""
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'},
method='HEAD')
try:
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'HEAD')
except CouchDBServerError as e:
except CouchDBServerError as e:
# If an HTTPError is raised, re-raise it, unless it is a 404 error and we are requested to create the
# If an HTTPError is raised, re-raise it, unless it is a 404 error and we are requested to create the
# database
# database
@@ -269,11 +274,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -269,11 +274,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# Create database
# Create database
logger.info("Creating CouchDB database %s/%s ...", self.url, self.database_name)
logger.info("Creating CouchDB database %s/%s ...", self.url, self.database_name)
request = urllib.request.Request(
CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name), 'PUT')
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'},
method='PUT')
CouchDBBackend.do_request(request)
def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable:
def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable:
"""
"""
@@ -289,11 +290,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -289,11 +290,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
identifier = self._transform_id(identifier, False)
identifier = self._transform_id(identifier, False)
# Create and issue HTTP request (raises HTTPError on status != 200)
# Create and issue HTTP request (raises HTTPError on status != 200)
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
headers={'Accept': 'application/json'})
try:
try:
data = CouchDBBackend.do_request(request)
data = CouchDBBackend.do_request(
 
"{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')))
except CouchDBServerError as e:
except CouchDBServerError as e:
if e.code == 404:
if e.code == 404:
raise KeyError("No Identifiable with id {} found in CouchDB database".format(identifier)) from e
raise KeyError("No Identifiable with id {} found in CouchDB database".format(identifier)) from e
@@ -334,13 +333,12 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -334,13 +333,12 @@ class CouchDBObjectStore(model.AbstractObjectStore):
data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
# Create and issue HTTP request (raises HTTPError on status != 200)
# Create and issue HTTP request (raises HTTPError on status != 200)
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
headers={'Content-type': 'application/json'},
method='PUT',
data=data.encode())
try:
try:
response = CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(
 
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
 
'PUT',
 
{'Content-type': 'application/json'},
 
data.encode('utf-8'))
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
response["rev"])
response["rev"])
except CouchDBServerError as e:
except CouchDBServerError as e:
@@ -378,25 +376,19 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -378,25 +376,19 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# ETag response header
# ETag response header
try:
try:
logger.debug("fetching the current object revision for deletion ...")
logger.debug("fetching the current object revision for deletion ...")
request = urllib.request.Request(
headers = CouchDBBackend.do_request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)), 'HEAD')
headers={'Accept': 'application/json'},
rev = headers['ETag'][1:-1]
method='HEAD')
except CouchDBServerError as e:
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
response = opener.open(request)
rev = response.getheader('ETag')[1:-1]
except urllib.error.HTTPError as e:
if e.code == 404:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification))\
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification))\
from e
from e
raise
raise
request = urllib.request.Request(
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
headers={'Content-type': 'application/json'},
method='DELETE')
try:
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request(
 
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
 
'DELETE')
except CouchDBServerError as e:
except CouchDBServerError as e:
if e.code == 404:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
@@ -428,12 +420,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -428,12 +420,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
else:
else:
return False
return False
logger.debug("Checking existence of object with id %s in database ...", repr(x))
logger.debug("Checking existence of object with id %s in database ...", repr(x))
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)),
headers={'Accept': 'application/json'},
method='HEAD')
try:
try:
CouchDBBackend.do_request(request)
CouchDBBackend.do_request(
 
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)), 'HEAD')
except CouchDBServerError as e:
except CouchDBServerError as e:
if e.code == 404:
if e.code == 404:
return False
return False
@@ -448,10 +437,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -448,10 +437,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
"""
logger.debug("Fetching number of documents from database ...")
logger.debug("Fetching number of documents from database ...")
request = urllib.request.Request(
data = CouchDBBackend.do_request("{}/{}".format(self.url, self.database_name))
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = CouchDBBackend.do_request(request)
return data['doc_count']
return data['doc_count']
def __iter__(self) -> Iterator[model.Identifiable]:
def __iter__(self) -> Iterator[model.Identifiable]:
@@ -476,10 +462,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
@@ -476,10 +462,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# Fetch a list of all ids and construct Iterator object
# Fetch a list of all ids and construct Iterator object
logger.debug("Creating iterator over objects in database ...")
logger.debug("Creating iterator over objects in database ...")
request = urllib.request.Request(
data = CouchDBBackend.do_request("{}/{}/_all_docs".format(self.url, self.database_name))
"{}/{}/_all_docs".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = CouchDBBackend.do_request(request)
return CouchDBIdentifiableIterator(self, (row['id'] for row in data['rows']))
return CouchDBIdentifiableIterator(self, (row['id'] for row in data['rows']))
@staticmethod
@staticmethod
Loading