Skip to content
Snippets Groups Projects
Commit 2ce4311d authored by Sebastian Heppner's avatar Sebastian Heppner
Browse files

Merge branch 'feature/backend_couchdb' into 'master'

Feature/backend couchdb

Closes #78

See merge request !50
parents be574330 44cfc792
No related branches found
No related tags found
No related merge requests found
Pipeline #354308 passed
......@@ -33,7 +33,7 @@ test:
- pip install --cache-dir="$PIP_CACHE_DIR" unittest-xml-reporting coverage
- pip install --cache-dir="$PIP_CACHE_DIR" -r requirements.txt
# Setup test config and CouchDB database server
- echo -e "[couchdb]\nurl = http://couchdb:5984" > test/test_config.ini
- echo -e "[couchdb]\nurl = http://couchdb:5984\n" > test/test_config.ini
- python test/_helper/setup_testdb.py -u "$COUCHDB_USER" -p "$COUCHDB_PASSWORD"
# Add source directory to PYTHONPATH to allow testing our CLI scripts, which import our modules
- export PYTHONPATH=".:$PYTHONPATH"
......
......@@ -26,7 +26,7 @@ import re
from typing import List, Dict, Type, TYPE_CHECKING
if TYPE_CHECKING:
from .model import Referable
from ..model import Referable
class Backend(metaclass=abc.ABCMeta):
......
......@@ -9,121 +9,238 @@
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
"""
CouchDB backend for persistently storing AAS objects
Todo: Add module docstring
"""
import threading
import weakref
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union
import re
import urllib.parse
import urllib.request
import urllib.error
import logging
import json
import http.client
This module provides the `CouchDBObjectStore` class, that implements the AbstractObjectStore interface for storing and
retrieving Identifiable PyI40AAS objects in/from a CouchDB server. The objects are serialized to JSON using the
aas.adapter.json package and transferred to the configured CouchDB database.
from . import backends
from aas.adapter.json import json_deserialization, json_serialization
from aas import model
Typical usage:
database = CouchDBObjectStore('localhost:5984', 'aas_test')
database.login('user', 'password')
logger = logging.getLogger(__name__)
submodel = aas.model.Submodel(...)
database.add(submodel)
aas = database.get_identifiable(aas.model.Identifier('https://acplt.org/MyAAS', aas.model.IdentifierType.IRI))
aas.description['de'] = "Eine neue Beschreibung"
aas.commit_changes()
class CouchDBBackend(backends.Backend):
"""
This Backend stores each Identifiable object as a single JSON document in the configured CouchDB database. Each
document's id is built from the object's identifier using the pattern {idtype}-{idvalue}; the document's contents
comprise a single property "data", containing the JSON serialization of the PyI40AAS object. The aas.adapter.json
package is used for serialization and deserialization of objects.
"""
@classmethod
def update_object(cls,
updated_object: "Referable", # type: ignore
store_object: "Referable", # type: ignore
relative_path: List[str]) -> None:
"""
Refresh `store_object` in place from the CouchDB document its `source` attribute points to.

`updated_object` and `relative_path` are accepted but not used by this implementation.

:raises CouchDBSourceError: If `store_object` is not a model.Identifiable
:raises KeyError: If the CouchDB server answers the request with HTTP 404
"""
if not isinstance(store_object, model.Identifiable):
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
"in the CouchDB")
# Translate the couchdb:// / couchdbs:// source into the http(s) URL of the document
url = CouchDBBackend._parse_source(store_object.source)
request = urllib.request.Request(url,
headers={'Accept': 'application/json'})
try:
data = CouchDBBackend.do_request(request)
except CouchDBServerError as e:
# Only a 404 is translated into KeyError; every other server error is re-raised unchanged
if e.code == 404:
raise KeyError("No Identifiable found in CouchDB at {}".format(url)) from e
raise
# NOTE(review): the next line looks like residue of the removed module docstring (usage example) that the diff
# rendering interleaved here; it does not appear to belong to this method -- confirm.
database.logout()
updated_store_object = data['data']
# Remember the revision of the fetched document; commit_object() reads it back to fill the `_rev` field
set_couchdb_revision(url, data["_rev"])
store_object.update_from(updated_store_object)
To allow committing changes, the objects retrieved from the CouchDBObjectStore are instances of special classes
(`CouchDBAssetAdministrationShell`, etc.), inheriting from the special base class `CouchDBIdentifiable`. However, these
classes also inherit from the appropriate class in `aas.model` to be used as any other PyI40AAS object.
@classmethod
def commit_object(cls,
                  committed_object: "Referable",  # type: ignore
                  store_object: "Referable",  # type: ignore
                  relative_path: List[str]) -> None:
    """
    Write the current state of `store_object` back to its CouchDB document.

    The document URL is derived from `store_object.source`. The revision previously recorded for that URL in the
    global revision store is sent as `_rev`; on success the new revision returned by the server is recorded.
    `committed_object` and `relative_path` are accepted but not used by this implementation.

    :param committed_object: Unused
    :param store_object: The Identifiable object whose JSON serialization is written to the database
    :param relative_path: Unused
    :raises CouchDBSourceError: If `store_object` is not a model.Identifiable
    :raises CouchDBConflictError: If no revision is known for the document or the server answers with HTTP 409
    :raises KeyError: If the server answers with HTTP 404
    """
    if not isinstance(store_object, model.Identifiable):
        raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
                                 "in the CouchDB")
    url = CouchDBBackend._parse_source(store_object.source)
    # CouchDB only accepts a write to an existing document if the current revision is sent along. Read the
    # revision exactly once: a second lookup could observe a different value (or None) if another thread
    # modified the revision store in between, and would then be serialized as `"_rev": null`.
    revision = get_couchdb_revision(url)
    if revision is None:
        raise CouchDBConflictError("No revision found for the given object. Try calling `update` on it.")
    data = json.dumps({'data': store_object, "_rev": revision},
                      cls=json_serialization.AASToJsonEncoder)
    request = urllib.request.Request(
        url,
        headers={'Content-type': 'application/json'},
        method='PUT',
        data=data.encode())
    try:
        response = CouchDBBackend.do_request(request)
    except CouchDBServerError as e:
        if e.code == 409:
            raise CouchDBConflictError("Could not commit changes to id {} due to a concurrent modification in the "
                                       "database.".format(store_object.identification)) from e
        elif e.code == 404:
            raise KeyError("Object with id {} was not found in the CouchDB at {}"
                           .format(store_object.identification, url)) from e
        raise
    else:
        # Store the revision assigned by the server so that the next commit is based on it
        set_couchdb_revision(url, response["rev"])
Additionally, this module defines a custom Exception class `CouchDBError` and some subclasses. These Exceptions are used
to unambiguously report errors (connection errors, parser errors or HTTP errors from the server) when interacting with
the CouchDB server.
@classmethod
def _parse_source(cls, source: str) -> str:
"""
Parses the source parameter of a model.Referable object
import abc
import http.client
import http.cookiejar
import json
from typing import Iterator, Dict, Optional, Any, Iterable, Union
import urllib.parse
import urllib.request
import urllib.error
import threading
import logging
:param source: Source string of the model.Referable object
:return: URL to the document (a leading couchdbs:// is replaced by https://, a leading couchdb:// by http://)
:raises CouchDBSourceError: If the source starts with neither couchdbs:// nor couchdb://
"""
# NOTE(review): the `import ...` lines inside the docstring above look like residue of the removed import block
# that the diff rendering interleaved here -- confirm before cleaning them up.
couchdb_s = re.match("couchdbs://", source) # Note: Works, since match only checks the beginning of the string
if couchdb_s:
# Only the first occurrence is replaced, i.e. the scheme prefix that was just matched
url = source.replace("couchdbs://", "https://", 1)
else:
couchdb_wo_s = re.match("couchdb://", source)
if couchdb_wo_s:
url = source.replace("couchdb://", "http://", 1)
else:
raise CouchDBSourceError("Source has wrong format. "
"Expected to start with {couchdb://, couchdbs://}, got {" + source + "}")
return url
from .. import model
from .json import StrictAASFromJsonDecoder, AASToJsonEncoder
@classmethod
def do_request(cls, request: urllib.request.Request) -> Dict[str, Any]:
    """
    Perform an HTTP request to the CouchDBServer, parse the result and handle errors

    The request is sent through an opener that uses HTTP basic authentication with the credentials registered
    in the module-level credentials store.

    :param request: The prepared request to send
    :return: The parsed JSON body of the response; an empty dict for HEAD requests
    :raises CouchDBServerError: If the server answers with an HTTP error status
    :raises CouchDBResponseError: If the response (or error response) is not JSON or cannot be parsed
    :raises CouchDBConnectionError: If urllib reports a URLError while contacting the server
    """
    opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
    try:
        response = opener.open(request)
    except urllib.error.HTTPError as e:
        logger.debug("Request %s %s finished with HTTP status code %s.",
                     request.get_method(), request.full_url, e.code)
        if e.headers.get('Content-type', None) != 'application/json':
            raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
                                       .format(e.headers.get('Content-type', None)))

        if request.get_method() == 'HEAD':
            # No error body is parsed for HEAD requests, so only the status code can be reported. The message
            # template used to be passed on unformatted (literally "HTTP {}").
            raise CouchDBServerError(e.code, "", "", "HTTP {}".format(e.code)) from e

        try:
            data = json.load(e)
        except json.JSONDecodeError:
            raise CouchDBResponseError("Could not parse error message of HTTP {}"
                                       .format(e.code))
        raise CouchDBServerError(e.code, data['error'], data['reason'],
                                 "HTTP {}: {} (reason: {})".format(e.code, data['error'], data['reason'])) from e
    except urllib.error.URLError as e:
        raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e

    # Check response & parse data
    assert (isinstance(response, http.client.HTTPResponse))
    logger.debug("Request %s %s finished successfully.", request.get_method(), request.full_url)
    if request.get_method() == 'HEAD':
        return {}

    if response.getheader('Content-type') != 'application/json':
        raise CouchDBResponseError("Unexpected Content-type header")
    try:
        data = json.load(response, cls=json_deserialization.AASFromJsonDecoder)
    except json.JSONDecodeError as e:
        raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
    return data
# Global registry for credentials for CouchDB Servers
_credentials_store: urllib.request.HTTPPasswordMgrWithPriorAuth = urllib.request.HTTPPasswordMgrWithPriorAuth()
# Note: The HTTPPasswordMgr is not thread safe during writing, should be thread safe for reading only.
def register_credentials(url: str, username: str, password: str) -> None:
"""
An ObjectStore implementation for Identifiable PyI40AAS objects backed by a CouchDB database server.
Register the credentials of a CouchDB server to the global credentials store
This ObjectStore stores each Identifiable object as a single JSON document in the configured CouchDB database. Each
document's id is built from the object's identifier using the pattern {idtype}-{idvalue}; the document's contents
comprise a single property "data", containing the JSON serialization of the PyI40AAS object. The aas.adapter.json
package is used for serialization and deserialization of objects.
Warning: Do not use this function, while other threads may be accessing the credentials via the CouchDBObjectStore
or update or commit functions of model.base.Referable objects!
Objects retrieved from the CouchDBObjectStore are instances of the appropriate PyI40AAS model class. Additionally,
they inherit from the special base class `CouchDBIdentifiable`. It provides a `commit()` method to write back
changes, which have been made to the object, to the database.
:param url: Toplevel URL
:param username: Username to that CouchDB instance
:param password: Password to the Username
"""
# NOTE(review): several docstring lines above describe an ObjectStore / `CouchDBIdentifiable`; they look like
# residue of a removed class docstring interleaved by the diff rendering -- confirm before cleaning them up.
# The first argument (realm) is None. `is_authenticated=True` is forwarded to the password manager; presumably
# this makes the credentials be sent without waiting for a 401 challenge -- confirm against the urllib docs.
_credentials_store.add_password(None, url, username, password, is_authenticated=True)
All methods of the `CouchDBObjectStore` are blocking, i.e. they stop the current thread's execution until they
receive a response from the CouchDB server (or encounter a timeout). However, the `CouchDBObjectStore` objects are
thread-safe, meaning that you may run multiple method calls on the same CouchDBObjectStore in parallel in different
threads. For example, you could use a ThreadPoolExecutor to add a large number of objects to the database:
import concurrent.futures
submodels = [submodel1, submodel2, submodel3]
database = CouchDBObjectStore('localhost:5984', 'aas_test')
database.login('test', 'test')
with concurrent.futures.ThreadPoolExecutor() as pool:
pool.map(database.add, submodels)
# Global registry for CouchDB Revisions
_revision_store_lock = threading.Lock()
_revision_store: Dict[str, str] = {}
def set_couchdb_revision(url: str, revision: str) -> None:
"""
def __init__(self, url: str, database: str):
self.url = url
self.database_name = database
Set the CouchDB revision of the given document in the revision store
# Build shared cookie jar for session caching and thread-local store for OpenerDirector
self._cookie_jar = http.cookiejar.CookieJar()
self._thread_local = threading.local()
:param url: URL to the CouchDB document
:param revision: CouchDB revision
"""
# NOTE(review): the `__init__` / cookie-jar lines inside the docstring above look like residue of removed code
# interleaved by the diff rendering -- confirm before cleaning them up.
# The module-level dict is only touched while holding the module-level lock; an existing entry is overwritten.
with _revision_store_lock:
_revision_store[url] = revision
# TODO method to delete database
def login(self, user: str, password: str):
def get_couchdb_revision(url: str) -> Optional[str]:
"""
Login at the CouchDB server with the given user credentials.
Get the CouchDB revision from the revision store for the given URL to a CouchDB Document
This method uses the /_session endpoint of the CouchDB server to obtain a session cookie, which is used for
further HTTP requests. This is required to be performed before any other request to the object store, unless
the CouchDB server does not require authentication.
:param url: URL to the CouchDB document
:return: CouchDB-revision, if there is one, otherwise returns None
"""
# NOTE(review): the sentences about logging in / the /_session endpoint in the docstring above look like residue
# of a removed `login` docstring interleaved by the diff rendering; this function performs no HTTP request.
# Read under the same lock that guards writes to the module-level dict; a missing URL yields None.
with _revision_store_lock:
return _revision_store.get(url)
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
def delete_couchdb_revision(url: str):
    """
    Delete the CouchDB revision from the revision store for the given URL to a CouchDB Document

    Removing a URL for which no revision is stored is a no-op. This matters for callers that delete a document
    whose revision was never recorded locally (e.g. it was looked up directly on the server): they must not fail
    with a KeyError after the document has already been removed from the database.

    :param url: URL to the CouchDB document
    """
    with _revision_store_lock:
        # pop() with a default instead of `del`, so an unknown URL does not raise KeyError
        _revision_store.pop(url, None)
This method uses the /_session endpoint of the CouchDB server to invalidate the user session and delete the
session cookie.
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
class CouchDBObjectStore(model.AbstractObjectStore):
"""
logger.info("Logging out from CouchDB server %s ...", self.url)
request = urllib.request.Request(
"{}/_session".format(self.url),
headers={'Content-type': 'application/json'},
method='DELETE')
self._do_request(request)
An ObjectStore implementation for Identifiable PyI40AAS objects backed by a CouchDB database server.
All methods of the `CouchDBObjectStore` are blocking, i.e. they stop the current thread's execution until they
receive a response from the CouchDB server (or encounter a timeout). However, the `CouchDBObjectStore` objects are
thread-safe, as long as no CouchDB credentials are added (via `register_credentials()`) during transactions.
"""
def __init__(self, url: str, database: str):
    """
    Set up a store for one database of a CouchDB server; no request is sent here.

    :param url: URL to the CouchDB
    :param database: Name of the Database inside the CouchDB
    """
    self.url: str = url
    self.database_name: str = database

    # Weak-value cache of the local replications handed out by this store, keyed by identifier. An entry lives
    # only as long as the application holds another reference to the object, so memory is released for objects
    # nobody uses anymore, while repeated retrievals of a still-referenced object can return the very same
    # instance instead of an equal copy.
    self._object_cache: weakref.WeakValueDictionary[model.Identifier, model.Identifiable] = (
        weakref.WeakValueDictionary())
    # Guards all accesses to the cache above
    self._object_cache_lock = threading.Lock()
def check_database(self, create=False):
"""
......@@ -137,7 +254,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
headers={'Accept': 'application/json'},
method='HEAD')
try:
self._do_request(request)
CouchDBBackend.do_request(request)
except CouchDBServerError as e:
# If an HTTPError is raised, re-raise it, unless it is a 404 error and we are requested to create the
# database
......@@ -153,12 +270,15 @@ class CouchDBObjectStore(model.AbstractObjectStore):
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'},
method='PUT')
self._do_request(request)
CouchDBBackend.do_request(request)
def get_identifiable(self, identifier: Union[str, model.Identifier]) -> "CouchDBIdentifiable":
def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable:
"""
Retrieve an AAS object from the CouchDB by its Identifier
If the identifier is a string, it is assumed that the string is a correct couchdb-ID-string (according to the
internal conversion rules, see CouchDBObjectStore._transform_id() )
:raises KeyError: If no such object is stored in the database
:raises CouchDBError: If an error occurs during the request to the CouchDB server (see `_do_request()` for details)
"""
......@@ -170,7 +290,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
"{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
headers={'Accept': 'application/json'})
try:
data = self._do_request(request)
data = CouchDBBackend.do_request(request)
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No Identifiable with id {} found in CouchDB database".format(identifier)) from e
......@@ -178,11 +298,25 @@ class CouchDBObjectStore(model.AbstractObjectStore):
# Add CouchDB meta data (for later commits) to object
obj = data['data']
if not isinstance(obj, CouchDBIdentifiable):
if not isinstance(obj, model.Identifiable):
raise CouchDBResponseError("The CouchDB document with id {} does not contain an identifiable AAS object."
.format(identifier))
obj._store = self
obj.couchdb_revision = data['_rev']
self.generate_source(obj) # Generate the source parameter of this object
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
data["_rev"])
# If we still have a local replication of that object (since it is referenced from anywhere else), update that
# replication and return it.
with self._object_cache_lock:
if obj.identification in self._object_cache:
old_obj = self._object_cache[obj.identification]
# If the source does not match the correct source for this CouchDB backend, the object seems to belong
# to another backend now, so we return a fresh copy
if old_obj.source == obj.source:
old_obj.update_from(obj)
return old_obj
self._object_cache[obj.identification] = obj
return obj
def add(self, x: model.Identifiable) -> None:
......@@ -194,7 +328,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
"""
logger.debug("Adding object %s to CouchDB database ...", repr(x))
# Serialize data
data = json.dumps({'data': x}, cls=AASToJsonEncoder)
data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
# Create and issue HTTP request (raises HTTPError on status != 200)
request = urllib.request.Request(
......@@ -203,44 +337,17 @@ class CouchDBObjectStore(model.AbstractObjectStore):
method='PUT',
data=data.encode())
try:
self._do_request(request)
response = CouchDBBackend.do_request(request)
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
response["rev"])
except CouchDBServerError as e:
if e.code == 409:
raise KeyError("Identifiable with id {} already exists in CouchDB database".format(x.identification))\
from e
raise
def commit(self, x: "CouchDBIdentifiable") -> None:
    """
    Write the in-memory state of a CouchDBIdentifiable object back to its database document

    The object's stored `couchdb_revision` is sent as `_rev`; after a successful write it is replaced by the
    revision returned by the server.

    :param x: The changed object
    :raises KeyError: If the server answers with HTTP 404
    :raises CouchDBConflictError: If the server answers with HTTP 409
    :raises CouchDBError: For any other failure of the request (see `_do_request()` for details)
    """
    logger.debug("Committing changes of object %s based on revision %s to CouchDB database ...",
                 repr(x), x.couchdb_revision)

    # Serialize first, then build the request for the object's document
    payload = json.dumps({'data': x, '_rev': x.couchdb_revision}, cls=AASToJsonEncoder)
    document_url = "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification))
    request = urllib.request.Request(document_url,
                                     headers={'Content-type': 'application/json'},
                                     method='PUT',
                                     data=payload.encode())

    try:
        response_data = self._do_request(request)
    except CouchDBServerError as e:
        if e.code == 409:
            raise CouchDBConflictError("Could not commit changes to id {} due to a concurrent modification in the "
                                       "database.".format(x.identification)) from e
        if e.code == 404:
            raise KeyError("Object with id {} was not found in the database {}"
                           .format(x.identification, self.database_name)) from e
        raise

    # Base the next commit on the revision the server just assigned
    x.couchdb_revision = response_data['rev']
with self._object_cache_lock:
self._object_cache[x.identification] = x
self.generate_source(x) # Set the source of the object
def discard(self, x: model.Identifiable, safe_delete=False) -> None:
"""
......@@ -255,34 +362,52 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
logger.debug("Deleting object %s from CouchDB database ...", repr(x))
# If x is not a CouchDBIdentifiable, retrieve x from the database to get the current couchdb_revision
if hasattr(x, 'couchdb_revision') and safe_delete:
rev = x.couchdb_revision # type: ignore
logger.debug("using the object's stored revision token %s for deletion.",
x.couchdb_revision) # type: ignore
rev = get_couchdb_revision("{}/{}/{}".format(self.url,
self.database_name,
self._transform_id(x.identification)))
if rev is not None and safe_delete:
logger.debug("using the object's stored revision token %s for deletion." % rev)
elif safe_delete:
raise CouchDBConflictError("No CouchDBRevision found for the object")
else:
# If not safe_delete, fetch the current document revision from the database using a HEAD request and the
# ETag response header
try:
logger.debug("fetching the current object revision for deletion ...")
current = self.get_identifiable(x.identification)
except KeyError as e:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
rev = current.couchdb_revision
logger.debug("using the current object revision %s for deletion.")
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
headers={'Accept': 'application/json'},
method='HEAD')
opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
response = opener.open(request)
rev = response.getheader('ETag')[1:-1]
except urllib.error.HTTPError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification))\
from e
raise
request = urllib.request.Request(
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
headers={'Content-type': 'application/json'},
method='DELETE')
try:
self._do_request(request)
CouchDBBackend.do_request(request)
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
elif e.code == 409:
raise CouchDBConflictError(
"Object with id {} has been modified in the database since the version requested to be deleted."
.format(x.identification)) from e
"Object with id {} has been modified in the database since "
"the version requested to be deleted.".format(x.identification)) from e
raise
delete_couchdb_revision("{}/{}/{}".format(self.url,
self.database_name,
self._transform_id(x.identification)))
with self._object_cache_lock:
del self._object_cache[x.identification]
x.source = ""
def __contains__(self, x: object) -> bool:
"""
......@@ -299,13 +424,13 @@ class CouchDBObjectStore(model.AbstractObjectStore):
identifier = x.identification
else:
return False
logger.debug("Checking existance of object with id %s in database ...", repr(x))
logger.debug("Checking existence of object with id %s in database ...", repr(x))
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)),
headers={'Accept': 'application/json'},
method='HEAD')
try:
self._do_request(request)
CouchDBBackend.do_request(request)
except CouchDBServerError as e:
if e.code == 404:
return False
......@@ -323,7 +448,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
request = urllib.request.Request(
"{}/{}".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = self._do_request(request)
data = CouchDBBackend.do_request(request)
return data['doc_count']
def __iter__(self) -> Iterator[model.Identifiable]:
......@@ -351,66 +476,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
request = urllib.request.Request(
"{}/{}/_all_docs".format(self.url, self.database_name),
headers={'Accept': 'application/json'})
data = self._do_request(request)
data = CouchDBBackend.do_request(request)
return CouchDBIdentifiableIterator(self, (row['id'] for row in data['rows']))
def _do_request(self, request: urllib.request.Request) -> Dict[str, Any]:
    """
    Perform an HTTP request to the CouchDB server, parse the result and handle errors

    This function performs the request described by the given Request object, checks the response status code and
    either raises a CouchDBError or returns the parsed JSON response data.

    :param request: The prepared request to send
    :return: The parsed JSON body of the response; an empty dict for HEAD requests
    :raises CouchDBServerError: When the server answers with an HTTP error status
    :raises CouchDBResponseError: When the HTTP response could not be parsed
    :raises CouchDBConnectionError: On errors while connecting to the CouchDB server
    """
    # Create thread-local OpenerDirector with shared cookie jar if not existing in this thread
    if hasattr(self._thread_local, 'opener'):
        opener = self._thread_local.opener
    else:
        logger.debug("Creating new urllib OpenerDirector for current thread.")
        opener = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(self._cookie_jar))
        self._thread_local.opener = opener

    # Do request and handle HTTP Errors
    logger.debug("Sending HTTP request to CouchDB server: %s %s ...", request.get_method(), request.full_url)
    try:
        response = opener.open(request)
    except urllib.error.HTTPError as e:
        logger.debug("Request %s %s finished with HTTP status code %s.",
                     request.get_method(), request.full_url, e.code)
        if e.headers.get('Content-type', None) != 'application/json':
            raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
                                       .format(e.headers.get('Content-type', None)))

        if request.get_method() == 'HEAD':
            # No error body is parsed for HEAD requests, so only the status code can be reported. The message
            # template used to be passed on unformatted (literally "HTTP {}").
            raise CouchDBServerError(e.code, "", "", "HTTP {}".format(e.code)) from e

        try:
            data = json.load(e)
        except json.JSONDecodeError:
            raise CouchDBResponseError("Could not parse error message of HTTP {}"
                                       .format(e.code))
        raise CouchDBServerError(e.code, data['error'], data['reason'],
                                 "HTTP {}: {} (reason: {})".format(e.code, data['error'], data['reason'])) from e
    except urllib.error.URLError as e:
        raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e

    # Check response & parse data
    assert (isinstance(response, http.client.HTTPResponse))
    logger.debug("Request %s %s finished successfully.", request.get_method(), request.full_url)
    if request.get_method() == 'HEAD':
        return {}

    if response.getheader('Content-type') != 'application/json':
        raise CouchDBResponseError("Unexpected Content-type header")
    try:
        data = json.load(response, cls=CouchDBJSONDecoder)
    except json.JSONDecodeError as e:
        raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
    return data
@staticmethod
def _transform_id(identifier: model.Identifier, url_quote=True) -> str:
"""
......@@ -423,99 +491,42 @@ class CouchDBObjectStore(model.AbstractObjectStore):
result = urllib.parse.quote(result, safe='')
return result
# #################################################################################################
# Special object classes for Identifiable PyI40AAS objects retrieved from the CouchDBObjectStore
class CouchDBIdentifiable(model.Identifiable, metaclass=abc.ABCMeta):
"""
Special base class for Identifiable PyI40AAS retrieved from the CouchDBObjectStore, allowing to write back (commit)
changes to the database.
This is an abstract base class. For each Identifiable AAS object type, there is one subclass, inheriting from this
abstract base class and the appropriate aas.model class.
This base class provides the `commit_changes()` method and the `_store` and `couchdb_revision` attributes required
to perform the commit action. `_store` holds a reference to the CouchDBObjectStore instance; `couchdb_revision`
contains the CouchDB document revision token of the latest object revision in the database. It is transferred to
CouchDB when committing changes to check for editing conflicts.
def generate_source(self, identifiable: model.Identifiable):
"""
Generates the source string for an Identifiable object that is backed by the Couchdb
def __init__(self) -> None:
super().__init__()
self._store: Optional[CouchDBObjectStore] = None
self.couchdb_revision: Optional[str] = None
def commit_changes(self) -> None:
if self._store is None:
raise ValueError("CouchDBIdentifiable is not associated with a store")
self._store.commit(self)
class CouchDBAssetAdministrationShell(model.AssetAdministrationShell, CouchDBIdentifiable):
pass
class CouchDBAsset(model.Asset, CouchDBIdentifiable):
pass
class CouchDBConceptDescription(model.ConceptDescription, CouchDBIdentifiable):
pass
class CouchDBSubmodel(model.Submodel, CouchDBIdentifiable):
pass
class CouchDBJSONDecoder(StrictAASFromJsonDecoder):
"""
Special json.JSONDecoder class for deserializing AAS objects received from the CouchDB server
This class inherits from StrictAASFromJsonDecoder to deserialize AAS JSON structures into the corresponding PyI40AAS
object classes. However, it overrides the constructor methods of all Identifiable AAS objects to create instances of
the `CouchDBIdentifiable` classes, defined above, instead of the usual aas.model classes.
:param identifiable: Identifiable object
"""
@classmethod
def _construct_asset_administration_shell(
cls, dct: Dict[str, object], object_class=model.AssetAdministrationShell) -> model.AssetAdministrationShell:
return super()._construct_asset_administration_shell(dct, object_class=CouchDBAssetAdministrationShell)
@classmethod
def _construct_asset(cls, dct: Dict[str, object], object_class=model.Asset) -> model.Asset:
return super()._construct_asset(dct, object_class=CouchDBAsset)
@classmethod
def _construct_concept_description(cls, dct: Dict[str, object], object_class=model.ConceptDescription)\
-> model.ConceptDescription:
return super()._construct_concept_description(dct, object_class=CouchDBConceptDescription)
@classmethod
def _construct_submodel(cls, dct: Dict[str, object], object_class=model.Submodel) -> model.Submodel:
return super()._construct_submodel(dct, object_class=CouchDBSubmodel)
source: str = self.url.replace("https://", "couchdbs://").replace("http://", "couchdb://")
source += "/" + self.database_name + "/" + self._transform_id(identifiable.identification)
identifiable.source = source
# #################################################################################################
# Custom Exception classes for reporting errors during interaction with the CouchDB server
class CouchDBError(Exception):
    """
    Base class of all exceptions raised by the CouchDBObjectStore

    Catching this type covers every more specific CouchDB exception that derives from it.
    """
class CouchDBSourceError(CouchDBError):
    """
    Exception raised when the source has the wrong format

    Used for source strings that start with neither couchdb:// nor couchdbs://, and for objects that are not
    Identifiable and therefore cannot be located in the CouchDB.
    """
# Raised by the request helpers when urllib reports a URLError while contacting the server.
# NOTE(review): two docstring variants follow (old and new wording of the same sentence), as rendered by the diff.
class CouchDBConnectionError(CouchDBError):
"""Exception raised by the CouchDBObjectStore when the CouchDB server could not be reached"""
"""Exception raised when the CouchDB server could not be reached"""
pass
class CouchDBResponseError(CouchDBError):
"""Exception raised by the CouchDBObjectStore when an HTTP response of the CouchDB server could not be handled (e.g.
"""Exception raised when an HTTP response of the CouchDB server could not be handled (e.g.
no JSON body)"""
pass
class CouchDBServerError(CouchDBError):
"""Exception raised by the CouchDBObjectStore when the CouchDB server returns an unexpected error code"""
"""Exception raised when the CouchDB server returns an unexpected error code"""
def __init__(self, code: int, error: str, reason: str, *args):
super().__init__(*args)
self.code = code
......@@ -524,6 +535,6 @@ class CouchDBServerError(CouchDBError):
class CouchDBConflictError(CouchDBError):
"""Exception raised by the CouchDBObjectStore when an object could not be committed due to a concurrent
"""Exception raised when an object could not be committed due to a concurrent
modification in the database"""
pass
......@@ -22,7 +22,7 @@ from typing import List, Optional, Set, TypeVar, MutableSet, Generic, Iterable,
import re
from . import datatypes
from .. import backends
from ..backend import backends
if TYPE_CHECKING:
from . import provider
......@@ -541,7 +541,7 @@ class Referable(metaclass=abc.ABCMeta):
break
return None, None
def update_from(self, other: "Referable"):
def update_from(self, other: "Referable", update_source: bool = False):
"""
Internal function to update the object's attributes from another object of a similar type.
......@@ -549,9 +549,12 @@ class Referable(metaclass=abc.ABCMeta):
protocol clients, etc.) to update the object's data, after `update()` has been called.
:param other: The object to update from
:param update_source: Update the source attribute with the other's source attribute. This is not propagated
recursively
"""
for name, var in vars(other).items():
if name == "parent": # do not update the parent
# do not update the parent or source (depending on update_source parameter)
if name == "parent" or name == "source" and not update_source:
continue
if isinstance(var, NamespaceSet):
# update the elements of the NameSpaceSet
......@@ -1126,7 +1129,7 @@ class NamespaceSet(MutableSet[_RT], Generic[_RT]):
referable = self._backend[other_referable.id_short]
if type(referable) is type(other_referable):
# referable is the same as other referable
referable.update_from(other_referable)
referable.update_from(other_referable, update_source=True)
except KeyError:
# other referable is not in NamespaceSet
referables_to_add.append(other_referable)
......
from unittest import mock
import unittest
from aas import backends
from aas.backend import backends
class BackendsTest(unittest.TestCase):
def test_backend_store(self):
with mock.patch("aas.backends.Backend") as mock_backend:
with mock.patch("aas.backend.backends.Backend") as mock_backend:
backends.register_backend("mockScheme", mock_backend)
self.assertEqual(backends.get_backend("mockScheme:x-test:test_backend"), mock_backend)
......
......@@ -9,15 +9,15 @@
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
import base64
import concurrent.futures
import configparser
import copy
import os
import unittest
import unittest.mock
import urllib.request
import urllib.error
from aas.adapter import couchdb
from aas.backend import backends, couchdb
from aas.examples.data.example_aas import *
......@@ -25,8 +25,11 @@ TEST_CONFIG = configparser.ConfigParser()
TEST_CONFIG.read((os.path.join(os.path.dirname(__file__), "..", "test_config.default.ini"),
os.path.join(os.path.dirname(__file__), "..", "test_config.ini")))
# Expected prefix of the `source` attribute of objects stored in the test database. It is built by swapping the
# HTTP(S) scheme of the configured URL for the matching CouchDB scheme ("http://" -> "couchdb://",
# "https://" -> "couchdbs://").
# str.lstrip("http://") must not be used for this: it strips a *set of characters* (h, t, p, :, /) instead of a
# prefix and would therefore also remove leading characters of the host name (e.g. "http://test" -> "est").
source_core: str = (
    TEST_CONFIG["couchdb"]["url"].replace("https://", "couchdbs://").replace("http://", "couchdb://")
    + "/" + TEST_CONFIG["couchdb"]["database"] + "/"
)
# Check if CouchDB database is avalable. Otherwise, skip tests.
# Check if CouchDB database is available. Otherwise, skip tests.
try:
request = urllib.request.Request(
"{}/{}".format(TEST_CONFIG['couchdb']['url'], TEST_CONFIG['couchdb']['database']),
......@@ -44,173 +47,174 @@ except urllib.error.URLError as e:
COUCHDB_ERROR = e
class CouchDBBackendOfflineMethodsTest(unittest.TestCase):
    """
    Tests of CouchDBBackend helper methods.

    Unlike the test class that follows, this class carries no `skipUnless` decorator, so it also runs when no
    CouchDB server is reachable (presumably `_parse_source()` does not contact the server -- TODO confirm).
    """
    def test_parse_source(self):
        """Check how `CouchDBBackend._parse_source()` translates source strings into URLs."""
        couchdb.register_credentials(url="couchdb.plt.rwth-aachen.de:5984",
                                     username="test_user",
                                     password="test_password")

        # A "couchdbs://" source is expected to be translated into an "https://" URL
        url = couchdb.CouchDBBackend._parse_source(
            "couchdbs://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
        )
        expected_url = "https://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
        self.assertEqual(expected_url, url)

        # A "couchdb://" source is expected to be translated into an "http://" URL
        url = couchdb.CouchDBBackend._parse_source(
            "couchdb://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
        )
        expected_url = "http://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
        self.assertEqual(expected_url, url)

        # Any other scheme is expected to be rejected with a CouchDBSourceError
        with self.assertRaises(couchdb.CouchDBSourceError) as cm:
            couchdb.CouchDBBackend._parse_source("wrong_scheme:plt.rwth-aachen.couchdb:5984/path_to_db/path_to_doc")
            # NOTE(review): the original indentation is not recoverable from this view. If this assertion sits
            # inside the `with` block (as reconstructed here), it is never reached once `_parse_source()` raises.
            # It also compares a str with the exception object instead of `str(cm.exception)`. Confirm the actual
            # error message, then move the check out of the `with` block and compare against `str(cm.exception)`.
            self.assertEqual("Source has wrong format. "
                             "Expected to start with {couchdb, couchdbs}, got "
                             "{wrong_scheme:plt.rwth-aachen.couchdb:5984/path_to_db/path_to_doc}",
                             cm.exception)
@unittest.skipUnless(COUCHDB_OKAY, "No CouchDB is reachable at {}/{}: {}".format(TEST_CONFIG['couchdb']['url'],
TEST_CONFIG['couchdb']['database'],
COUCHDB_ERROR))
class CouchDBTest(unittest.TestCase):
class CouchDBBackendTest(unittest.TestCase):
def setUp(self) -> None:
# Create CouchDB store, login and check database
self.db = couchdb.CouchDBObjectStore(TEST_CONFIG['couchdb']['url'], TEST_CONFIG['couchdb']['database'])
self.db.login(TEST_CONFIG['couchdb']['user'], TEST_CONFIG['couchdb']['password'])
self.db.check_database()
self.object_store = couchdb.CouchDBObjectStore(TEST_CONFIG['couchdb']['url'],
TEST_CONFIG['couchdb']['database'])
couchdb.register_credentials(TEST_CONFIG["couchdb"]["url"],
TEST_CONFIG["couchdb"]["user"],
TEST_CONFIG["couchdb"]["password"])
backends.register_backend("couchdb", couchdb.CouchDBBackend)
self.object_store.check_database()
def tearDown(self) -> None:
self.db.clear()
self.db.logout()
self.object_store.clear()
def test_object_store_add(self):
    """After adding a submodel to the object store, its `source` must equal the expected CouchDB source."""
    submodel = create_example_submodel()
    self.object_store.add(submodel)

    expected_source = source_core+"IRI-https%3A%2F%2Facplt.org%2FTest_Submodel"
    self.assertEqual(submodel.source, expected_source)
def test_retrieval(self):
    """Repeated retrieval must yield the same instance until the object's `source` has been changed."""
    def fetch_submodel():
        # Build a fresh Identifier for every lookup
        return self.object_store.get_identifiable(
            model.Identifier(id_='https://acplt.org/Test_Submodel', id_type=model.IdentifierType.IRI))

    added = create_example_submodel()
    self.object_store.add(added)

    # The store must hand back the very *same* instance that was added ...
    first = fetch_submodel()
    self.assertIs(added, first)

    # ... and must keep doing so after the original local reference has been dropped
    del added
    second = fetch_submodel()
    self.assertIs(first, second)

    # A changed source should invalidate the cached object, so this lookup must return a new copy
    first.source = "couchdb://example.com/example/IRI-https%3A%2F%2Facplt.org%2FTest_Submodel"
    third = fetch_submodel()
    self.assertIsNot(first, third)
def test_example_submodel_storing(self) -> None:
example_submodel = create_example_submodel()
# Add example submodel
self.db.add(example_submodel)
self.assertEqual(1, len(self.db))
self.assertIn(example_submodel, self.db)
self.object_store.add(example_submodel)
self.assertEqual(1, len(self.object_store))
self.assertIn(example_submodel, self.object_store)
# Restore example submodel and check data
submodel_restored = self.db.get_identifiable(
submodel_restored = self.object_store.get_identifiable(
model.Identifier(id_='https://acplt.org/Test_Submodel', id_type=model.IdentifierType.IRI))
assert (isinstance(submodel_restored, model.Submodel))
checker = AASDataChecker(raise_immediately=True)
check_example_submodel(checker, submodel_restored)
# Delete example submodel
self.db.discard(submodel_restored)
self.assertNotIn(example_submodel, self.db)
self.object_store.discard(submodel_restored)
self.assertNotIn(example_submodel, self.object_store)
def test_iterating(self) -> None:
example_data = create_full_example()
# Add all objects
for item in example_data:
self.db.add(item)
self.object_store.add(item)
self.assertEqual(6, len(self.db))
self.assertEqual(6, len(self.object_store))
# Iterate objects, add them to a DictObjectStore and check them
retrieved_data_store: model.provider.DictObjectStore[model.Identifiable] = model.provider.DictObjectStore()
for item in self.db:
for item in self.object_store:
retrieved_data_store.add(item)
checker = AASDataChecker(raise_immediately=True)
check_full_example(checker, retrieved_data_store)
def test_parallel_iterating(self) -> None:
example_data = create_full_example()
ids = [item.identification for item in example_data]
# Add objects via thread pool executor
with concurrent.futures.ThreadPoolExecutor() as pool:
result = pool.map(self.db.add, example_data)
list(result) # Iterate Executor result to raise exceptions
self.assertEqual(6, len(self.db))
# Retrieve objects via thread pool executor
with concurrent.futures.ThreadPoolExecutor() as pool:
retrieved_objects = pool.map(self.db.get_identifiable, ids)
retrieved_data_store: model.provider.DictObjectStore[model.Identifiable] = model.provider.DictObjectStore()
for item in retrieved_objects:
retrieved_data_store.add(item)
self.assertEqual(6, len(retrieved_data_store))
checker = AASDataChecker(raise_immediately=True)
check_full_example(checker, retrieved_data_store)
# Delete objects via thread pool executor
with concurrent.futures.ThreadPoolExecutor() as pool:
result = pool.map(self.db.discard, example_data)
list(result) # Iterate Executor result to raise exceptions
self.assertEqual(0, len(self.db))
def test_key_errors(self) -> None:
# Double adding an object should raise a KeyError
example_submodel = create_example_submodel()
self.db.add(example_submodel)
self.object_store.add(example_submodel)
with self.assertRaises(KeyError) as cm:
self.db.add(example_submodel)
self.object_store.add(example_submodel)
self.assertEqual("'Identifiable with id Identifier(IRI=https://acplt.org/Test_Submodel) already exists in "
"CouchDB database'", str(cm.exception))
# Querying a deleted object should raise a KeyError
retrieved_submodel = self.db.get_identifiable(
retrieved_submodel = self.object_store.get_identifiable(
model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
self.db.discard(example_submodel)
self.object_store.discard(example_submodel)
with self.assertRaises(KeyError) as cm:
self.db.get_identifiable(model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
self.object_store.get_identifiable(model.Identifier('https://acplt.org/Test_Submodel',
model.IdentifierType.IRI))
self.assertEqual("'No Identifiable with id IRI-https://acplt.org/Test_Submodel found in CouchDB database'",
str(cm.exception))
# Double deleting should also raise a KeyError
with self.assertRaises(KeyError) as cm:
self.db.discard(retrieved_submodel)
self.object_store.discard(retrieved_submodel)
self.assertEqual("'No AAS object with id Identifier(IRI=https://acplt.org/Test_Submodel) exists in "
"CouchDB database'", str(cm.exception))
def test_conflict_errors(self) -> None:
def test_conflict_errors(self):
# Preparation: add object and retrieve it from the database
example_submodel = create_example_submodel()
self.db.add(example_submodel)
retrieved_submodel = self.db.get_identifiable(
self.object_store.add(example_submodel)
retrieved_submodel = self.object_store.get_identifiable(
model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
# Simulate a concurrent modification
remote_modified_submodel = copy.copy(retrieved_submodel)
remote_modified_submodel.id_short = "newIdShort"
remote_modified_submodel.commit_changes()
# Simulate a concurrent modification (Commit submodel, while preventing that the couchdb revision store is
# updated)
with unittest.mock.patch("aas.backend.couchdb.set_couchdb_revision"):
retrieved_submodel.commit()
# Committing changes to the retrieved object should now raise a conflict error
retrieved_submodel.id_short = "myOtherNewIdShort"
with self.assertRaises(couchdb.CouchDBConflictError) as cm:
retrieved_submodel.commit_changes()
retrieved_submodel.commit()
self.assertEqual("Could not commit changes to id Identifier(IRI=https://acplt.org/Test_Submodel) due to a "
"concurrent modification in the database.", str(cm.exception))
# Deleting the submodel with safe_delete should also raise a conflict error. Deletion without safe_delete should
# work
with self.assertRaises(couchdb.CouchDBConflictError) as cm:
self.db.discard(retrieved_submodel, True)
self.object_store.discard(retrieved_submodel, True)
self.assertEqual("Object with id Identifier(IRI=https://acplt.org/Test_Submodel) has been modified in the "
"database since the version requested to be deleted.", str(cm.exception))
self.db.discard(retrieved_submodel, False)
self.assertEqual(0, len(self.db))
self.object_store.discard(retrieved_submodel, False)
self.assertEqual(0, len(self.object_store))
# Committing after deletion should also raise a conflict error
with self.assertRaises(couchdb.CouchDBConflictError) as cm:
retrieved_submodel.commit_changes()
self.assertEqual("Could not commit changes to id Identifier(IRI=https://acplt.org/Test_Submodel) due to a "
"concurrent modification in the database.", str(cm.exception))
def test_editing(self) -> None:
example_submodel = create_example_submodel()
self.db.add(example_submodel)
# Committing after deletion should not raise a conflict error due to removal of the source attribute
retrieved_submodel.commit()
# Retrieve submodel from database and change ExampleCapability's semanticId
submodel = self.db.get_identifiable(
model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
assert(isinstance(submodel, couchdb.CouchDBSubmodel))
capability = submodel.submodel_element.get_referable('ExampleCapability')
capability.semantic_id = model.Reference((model.Key(type_=model.KeyElements.GLOBAL_REFERENCE,
local=False,
value='http://acplt.org/Capabilities/AnotherCapability',
id_type=model.KeyType.IRDI),))
# Commit changes
submodel.commit_changes()
def test_editing(self):
test_object = create_example_submodel()
self.object_store.add(test_object)
# Change ExampleSubmodelCollectionOrdered's description
collection = submodel.submodel_element.get_referable('ExampleSubmodelCollectionOrdered')
collection.description['de'] = "Eine sehr wichtige Sammlung von Elementen" # type: ignore
# Test if commit uploads changes
test_object.id_short = "SomeNewIdShort"
test_object.commit()
# Commit changes
submodel.commit_changes()
# Check version in database
new_submodel = self.db.get_identifiable(
model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
assert(isinstance(new_submodel, couchdb.CouchDBSubmodel))
capability = new_submodel.submodel_element.get_referable('ExampleCapability')
assert(isinstance(capability, model.Capability))
self.assertEqual('http://acplt.org/Capabilities/AnotherCapability',
capability.semantic_id.key[0].value) # type: ignore
collection = new_submodel.submodel_element.get_referable('ExampleSubmodelCollectionOrdered')
self.assertEqual("Eine sehr wichtige Sammlung von Elementen", collection.description['de']) # type: ignore
# Test if update restores changes
test_object.id_short = "AnotherIdShort"
test_object.update()
self.assertEqual("SomeNewIdShort", test_object.id_short)
......@@ -13,7 +13,8 @@ import unittest
from unittest import mock
from typing import Optional, List
from aas import model, backends
from aas import model
from aas.backend import backends
from aas.model import Identifier, Identifiable
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment