Skip to content
Snippets Groups Projects
Commit 59ed8cf5 authored by Sebastian Heppner's avatar Sebastian Heppner
Browse files

backend.couchdb: Integrate CouchDBObjectStore functionality into backend.couchdb

parent 96000f0d
No related branches found
No related tags found
1 merge request!50Feature/backend couchdb
Pipeline #349528 failed
......@@ -11,7 +11,7 @@
"""
Todo: Add module docstring
"""
from typing import List, Dict, Any, Optional, Iterator, Iterable
from typing import List, Dict, Any, Optional, Iterator, Iterable, Union
import re
import urllib.parse
import urllib.request
......@@ -55,6 +55,7 @@ class CouchDBBackend(backends.Backend):
raise
updated_store_object = data['data']
set_couchdb_revision(url, data["_rev"])
store_object.update_from(updated_store_object)
@classmethod
......@@ -67,30 +68,19 @@ class CouchDBBackend(backends.Backend):
"in the CouchDB")
url = CouchDBBackend._parse_source(store_object.source)
# We need to get the revision of the object, if it already exists, otherwise we cannot write to the Couchdb
request = urllib.request.Request(
url,
headers={'Content-type': 'application/json'},
method='GET'
)
couchdb_revision: Optional[str] = None
try:
revision_data = CouchDBBackend.do_request(request)
couchdb_revision = revision_data['_rev']
except CouchDBServerError as e:
if not e.code == 404:
raise
if get_couchdb_revision(url) is None:
raise CouchDBConflictError("No revision found for the given object. Try calling `update` on it.")
data_to_commit: Dict[str, Any] = {'data': store_object}
if couchdb_revision:
data_to_commit['_rev'] = couchdb_revision
data = json.dumps(data_to_commit, cls=json_serialization.AASToJsonEncoder)
data = json.dumps({'data': store_object, "_rev": get_couchdb_revision(url)},
cls=json_serialization.AASToJsonEncoder)
request = urllib.request.Request(
url,
headers={'Content-type': 'application/json'},
method='PUT',
data=data.encode())
try:
CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(request)
set_couchdb_revision(url, response["rev"])
except CouchDBServerError as e:
if e.code == 409:
raise CouchDBConflictError("Could not commit changes to id {} due to a concurrent modification in the "
......@@ -100,46 +90,6 @@ class CouchDBBackend(backends.Backend):
.format(store_object.identification, url)) from e
raise
@classmethod
def delete_object(cls, obj: "Referable"): # type: ignore
"""
Deletes the given object from the couchdb
:param obj: Object to delete
"""
if not isinstance(obj, model.Identifiable):
raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
"in the CouchDB")
url = CouchDBBackend._parse_source(obj.source)
# We need to get the revision of the object, if it already exists, otherwise we cannot write to the Couchdb
req = urllib.request.Request(
url,
headers={'Content-type': 'application/json'},
method='GET'
)
try:
data = CouchDBBackend.do_request(req)
couchdb_revision: str = data['_rev']
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("Object with id {} was not found in the CouchDB at {}"
.format(obj.identification, url)) from e
raise
req = urllib.request.Request(url+"?rev="+couchdb_revision,
headers={'Content-type': 'application/json'},
method='DELETE')
try:
CouchDBBackend.do_request(req)
except CouchDBServerError as e:
if e.code == 409:
raise CouchDBConflictError("Could not delete object with to id {} due to a concurrent modification in "
"the database.".format(obj.identification)) from e
elif e.code == 404:
raise KeyError("Object with id {} was not found in the CouchDB at {}"
.format(obj.identification, url)) from e
raise
@classmethod
def _parse_source(cls, source: str) -> str:
"""
......@@ -149,16 +99,16 @@ class CouchDBBackend(backends.Backend):
:return: URL to the document
:raises CouchDBBackendSourceError, if the source has the wrong format
"""
couchdb_s = re.match("couchdbs:", source) # Note: Works, since match only checks the beginning of the string
couchdb_s = re.match("couchdbs://", source) # Note: Works, since match only checks the beginning of the string
if couchdb_s:
url = source.replace("couchdbs:", "https://", 1)
url = source.replace("couchdbs://", "https://", 1)
else:
couchdb_wo_s = re.match("couchdb:", source)
couchdb_wo_s = re.match("couchdb://", source)
if couchdb_wo_s:
url = source.replace("couchdb:", "http://", 1)
url = source.replace("couchdb://", "http://", 1)
else:
raise CouchDBSourceError("Source has wrong format. "
"Expected to start with {couchdb, couchdbs}, got {" + source + "}")
"Expected to start with {couchdb://, couchdbs://}, got {" + source + "}")
return url
@classmethod
......@@ -209,15 +159,15 @@ class CouchDBBackend(backends.Backend):
# Global registry for credentials for CouchDB Servers
_credentials_store: urllib.request.HTTPPasswordMgrWithPriorAuth = urllib.request.HTTPPasswordMgrWithPriorAuth()
# todo: Why does this work and not HTTPPasswordMgrWithBasicAuth?
# https://stackoverflow.com/questions/29708708/http-basic-authentication-not-working-in-python-3-4
# Note: The HTTPPasswordMgr is not thread safe during writing, should be thread safe for reading only.
def register_credentials(url: str, username: str, password: str):
"""
Register the credentials of a CouchDB server to the global credentials store
Todo: make thread safe
Warning: Do not use this function, while other threads may be accessing the credentials via the CouchDBObjectStore
or update or commit functions of model.base.Referable objects!
:param url: Toplevel URL
:param username: Username to that CouchDB instance
......@@ -226,6 +176,39 @@ def register_credentials(url: str, username: str, password: str):
_credentials_store.add_password(None, url, username, password, is_authenticated=True)
# Global registry for CouchDB Revisions
_revision_store: Dict[str, str] = {}
def set_couchdb_revision(url: str, revision: str):
"""
Set the CouchDB revision of the given document in the revision store
:param url: URL to the CouchDB document
:param revision: CouchDB revision
"""
_revision_store[url] = revision
def get_couchdb_revision(url: str) -> Optional[str]:
"""
Get the CouchDB revision from the revision store for the given URL to a CouchDB Document
:param url: URL to the CouchDB document
:return: CouchDB-revision, if there is one, otherwise returns None
"""
return _revision_store.get(url)
def delete_couchdb_revision(url: str):
"""
Delete the CouchDB revision from the revision store for the given URL to a CouchDB Document
:param url: URL to the CouchDB document
"""
del _revision_store[url]
class CouchDBObjectStore(model.AbstractObjectStore):
"""
An ObjectStore implementation for Identifiable PyI40AAS objects backed by a CouchDB database server.
......@@ -253,9 +236,6 @@ class CouchDBObjectStore(model.AbstractObjectStore):
"""
self.url: str = url
self.database_name: str = database
self.ssl: bool = False
if re.match("https:", self.url):
self.ssl = True
def check_database(self, create=False):
"""
......@@ -287,10 +267,13 @@ class CouchDBObjectStore(model.AbstractObjectStore):
method='PUT')
CouchDBBackend.do_request(request)
def get_identifiable(self, identifier: model.Identifier) -> model.Identifiable:
def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable:
"""
Retrieve an AAS object from the CouchDB by its Identifier
If the identifier is a string, it is assumed that the string is a correct couchdb-ID-string (according to the
internal conversion rules, see CouchDBObjectStore._transform_id() )
:raises KeyError: If no such object is stored in the database
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
......@@ -313,8 +296,9 @@ class CouchDBObjectStore(model.AbstractObjectStore):
if not isinstance(obj, model.Identifiable):
raise CouchDBResponseError("The CouchDB document with id {} does not contain an identifiable AAS object."
.format(identifier))
obj._store = self
obj.couchdb_revision = data['_rev']
self.generate_source(obj) # Generate the source parameter of this object
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
data["_rev"])
return obj
def add(self, x: model.Identifiable) -> None:
......@@ -335,12 +319,67 @@ class CouchDBObjectStore(model.AbstractObjectStore):
method='PUT',
data=data.encode())
try:
CouchDBBackend.do_request(request)
response = CouchDBBackend.do_request(request)
set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
response["rev"])
except CouchDBServerError as e:
if e.code == 409:
raise KeyError("Identifiable with id {} already exists in CouchDB database".format(x.identification))\
from e
raise
self.generate_source(x) # Set the source of the object
def discard(self, x: model.Identifiable, safe_delete=False) -> None:
"""
Delete an Identifiable AAS object from the CouchDB database
:param x: The object to be deleted
:param safe_delete: If True, only delete the object if it has not been modified in the database in comparison to
the provided revision. This uses the CouchDB revision token and thus only works with
CouchDBIdentifiable objects retrieved from this database.
:raises KeyError: If the object does not exist in the database
:raises CouchDBConflictError: If safe_delete is true and the object has been modified or deleted in the database
:raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
"""
logger.debug("Deleting object %s from CouchDB database ...", repr(x))
# If x is not a CouchDBIdentifiable, retrieve x from the database to get the current couchdb_revision
rev = get_couchdb_revision("{}/{}/{}".format(self.url,
self.database_name,
self._transform_id(x.identification)))
if rev is not None:
logger.debug("using the object's stored revision token %s for deletion." %rev)
if rev is None:
if safe_delete:
raise CouchDBConflictError("No CouchDBRevision found for the object")
else:
try:
logger.debug("fetching the current object revision for deletion ...")
self.get_identifiable(x.identification)
except KeyError as e:
raise KeyError(
"No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
rev = get_couchdb_revision("{}/{}/{}".format(self.url,
self.database_name,
self._transform_id(x.identification)))
logger.debug("using the current object revision %s for deletion." % rev)
request = urllib.request.Request(
"{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
headers={'Content-type': 'application/json'},
method='DELETE')
try:
CouchDBBackend.do_request(request)
except CouchDBServerError as e:
if e.code == 404:
raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
elif e.code == 409:
raise CouchDBConflictError(
"Object with id {} has been modified in the database since "
"the version requested to be deleted.".format(x.identification)) from e
raise
delete_couchdb_revision("{}/{}/{}".format(self.url,
self.database_name,
self._transform_id(x.identification)))
def __contains__(self, x: object) -> bool:
"""
......@@ -357,7 +396,7 @@ class CouchDBObjectStore(model.AbstractObjectStore):
identifier = x.identification
else:
return False
logger.debug("Checking existance of object with id %s in database ...", repr(x))
logger.debug("Checking existence of object with id %s in database ...", repr(x))
request = urllib.request.Request(
"{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)),
headers={'Accept': 'application/json'},
......@@ -430,12 +469,8 @@ class CouchDBObjectStore(model.AbstractObjectStore):
:param identifiable: Identifiable object
"""
source: str = self.url
if self.ssl:
source.replace("https://", "couchdbs:")
else:
source.replace("http://", "couchdb")
source += self.database_name + "/" + self._transform_id(identifiable.identification)
source: str = self.url.replace("https://", "couchdbs://").replace("http://", "couchdb://")
source += "/" + self.database_name + "/" + self._transform_id(identifiable.identification)
identifiable.source = source
......
......@@ -10,13 +10,13 @@
# specific language governing permissions and limitations under the License.
import base64
import configparser
import copy
import os
import unittest
import urllib.request
import urllib.error
from aas.backend import backends, couchdb
from aas import model
from aas.examples.data.example_aas import *
......@@ -24,7 +24,7 @@ TEST_CONFIG = configparser.ConfigParser()
TEST_CONFIG.read((os.path.join(os.path.dirname(__file__), "..", "test_config.default.ini"),
os.path.join(os.path.dirname(__file__), "..", "test_config.ini")))
source_core: str = "couchdb:" + TEST_CONFIG["couchdb"]["url"].lstrip("http://") + "/" + \
source_core: str = "couchdb://" + TEST_CONFIG["couchdb"]["url"].lstrip("http://") + "/" + \
TEST_CONFIG["couchdb"]["database"] + "/"
......@@ -53,13 +53,13 @@ class CouchDBBackendOfflineMethodsTest(unittest.TestCase):
password="test_password")
url = couchdb.CouchDBBackend._parse_source(
"couchdbs:couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
"couchdbs://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
)
expected_url = "https://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
self.assertEqual(expected_url, url)
url = couchdb.CouchDBBackend._parse_source(
"couchdb:couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
"couchdb://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
)
expected_url = "http://couchdb.plt.rwth-aachen.de:5984/path_to_db/path_to_doc"
self.assertEqual(expected_url, url)
......@@ -76,74 +76,130 @@ class CouchDBBackendOfflineMethodsTest(unittest.TestCase):
TEST_CONFIG['couchdb']['database'],
COUCHDB_ERROR))
class CouchDBBackendTest(unittest.TestCase):
def test_authorization(self):
couchdb.register_credentials(TEST_CONFIG["couchdb"]["url"].lstrip("http://"),
def setUp(self) -> None:
self.object_store = couchdb.CouchDBObjectStore(TEST_CONFIG['couchdb']['url'],
TEST_CONFIG['couchdb']['database'])
couchdb.register_credentials(TEST_CONFIG["couchdb"]["url"],
TEST_CONFIG["couchdb"]["user"],
TEST_CONFIG["couchdb"]["password"])
req = urllib.request.Request("{}/{}".format(TEST_CONFIG["couchdb"]["url"], TEST_CONFIG["couchdb"]["database"]),
headers={'Content-type': 'application/json'})
couchdb.CouchDBBackend.do_request(req)
backends.register_backend("couchdb", couchdb.CouchDBBackend)
self.object_store.check_database()
def test_commit_object(self):
test_object = create_example_submodel()
test_object.source = source_core + "example_submodel"
couchdb.CouchDBBackend.commit_object(test_object, test_object, [])
# Cleanup CouchDB
couchdb.CouchDBBackend.delete_object(test_object)
def tearDown(self) -> None:
self.object_store.clear()
def test_commit_nested_object(self):
backends.register_backend("couchdb", couchdb.CouchDBBackend)
test_submodel = create_example_submodel()
test_submodel.source = source_core + "another_example_submodel"
test_property = test_submodel.get_referable("ExampleSubmodelCollectionOrdered").get_referable("ExampleProperty")
self.assertIsInstance(test_property, model.Property)
test_property.commit()
# Cleanup CouchDB
couchdb.CouchDBBackend.delete_object(test_submodel)
def test_update_object(self):
def test_object_store_add(self):
test_object = create_example_submodel()
test_object.source = source_core + "example_submodel"
couchdb.CouchDBBackend.commit_object(test_object, test_object, [])
couchdb.CouchDBBackend.update_object(test_object, test_object, [])
# Cleanup CouchDB
couchdb.CouchDBBackend.delete_object(test_object)
def test_update_nested_object(self):
test_submodel = create_example_submodel()
test_submodel.source = source_core + "another_example_submodel"
test_submodel.commit()
test_property = test_submodel.get_referable("ExampleSubmodelCollectionOrdered").get_referable("ExampleProperty")
self.assertIsInstance(test_property, model.Property)
test_property.value = "A new value"
test_property.update()
self.assertEqual(test_property.value, 'exampleValue')
# Cleanup CouchDB
couchdb.CouchDBBackend.delete_object(test_submodel)
def test_commit_overwrite(self):
test_submodel = create_example_submodel()
test_submodel.source = source_core + "another_example_submodel"
test_submodel.commit()
test_property = test_submodel.get_referable("ExampleSubmodelCollectionOrdered").get_referable("ExampleProperty")
self.assertIsInstance(test_property, model.Property)
test_property.value = "A new value"
test_property.commit()
test_property.value = "Something else"
test_property.update()
self.assertEqual(test_property.value, "A new value")
# Cleanup Couchdb
couchdb.CouchDBBackend.delete_object(test_submodel)
def test_delete(self):
test_submodel = create_example_submodel()
test_submodel.source = source_core + "another_example_submodel"
test_submodel.commit()
couchdb.CouchDBBackend.delete_object(test_submodel)
self.object_store.add(test_object)
self.assertEqual(test_object.source, source_core+"IRI-https%3A%2F%2Facplt.org%2FTest_Submodel")
def test_example_submodel_storing(self) -> None:
example_submodel = create_example_submodel()
# Add exmaple submodel
self.object_store.add(example_submodel)
self.assertEqual(1, len(self.object_store))
self.assertIn(example_submodel, self.object_store)
# Restore example submodel and check data
submodel_restored = self.object_store.get_identifiable(
model.Identifier(id_='https://acplt.org/Test_Submodel', id_type=model.IdentifierType.IRI))
assert (isinstance(submodel_restored, model.Submodel))
checker = AASDataChecker(raise_immediately=True)
check_example_submodel(checker, submodel_restored)
# Delete example submodel
self.object_store.discard(submodel_restored)
self.assertNotIn(example_submodel, self.object_store)
def test_iterating(self) -> None:
example_data = create_full_example()
# Add all objects
for item in example_data:
self.object_store.add(item)
self.assertEqual(6, len(self.object_store))
# Iterate objects, add them to a DictObjectStore and check them
retrieved_data_store: model.provider.DictObjectStore[model.Identifiable] = model.provider.DictObjectStore()
for item in self.object_store:
retrieved_data_store.add(item)
checker = AASDataChecker(raise_immediately=True)
check_full_example(checker, retrieved_data_store)
def test_key_errors(self) -> None:
# Double adding an object should raise a KeyError
example_submodel = create_example_submodel()
self.object_store.add(example_submodel)
with self.assertRaises(KeyError) as cm:
self.object_store.add(example_submodel)
self.assertEqual("'Identifiable with id Identifier(IRI=https://acplt.org/Test_Submodel) already exists in "
"CouchDB database'", str(cm.exception))
# Querying a deleted object should raise a KeyError
retrieved_submodel = self.object_store.get_identifiable(
model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
self.object_store.discard(example_submodel)
with self.assertRaises(KeyError) as cm:
test_submodel.update()
self.assertEqual(
"'No Identifiable found in CouchDB at http://localhost:5984/aas_test/another_example_submodel'",
self.object_store.get_identifiable(model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
self.assertEqual("'No Identifiable with id IRI-https://acplt.org/Test_Submodel found in CouchDB database'",
str(cm.exception))
# Double deleting should also raise a KeyError
with self.assertRaises(KeyError) as cm:
self.object_store.discard(retrieved_submodel)
self.assertEqual("'No AAS object with id Identifier(IRI=https://acplt.org/Test_Submodel) exists in "
"CouchDB database'", str(cm.exception))
def test_conflict_errors(self):
# Preperation: add object and retrieve it from the database
example_submodel = create_example_submodel()
self.object_store.add(example_submodel)
retrieved_submodel = self.object_store.get_identifiable(
model.Identifier('https://acplt.org/Test_Submodel', model.IdentifierType.IRI))
# Simulate a concurrent modification
remote_modified_submodel = copy.copy(retrieved_submodel)
remote_modified_submodel.id_short = "newIdShort"
remote_modified_submodel.commit()
# Todo: With the current architecture, it is not possible to change something in the CouchDB without also
# changing the revision
"""
# Committing changes to the retrieved object should now raise a conflict error
retrieved_submodel.id_short = "myOtherNewIdShort"
with self.assertRaises(couchdb.CouchDBConflictError) as cm:
retrieved_submodel.commit()
self.assertEqual("Could not commit changes to id Identifier(IRI=https://acplt.org/Test_Submodel) due to a "
"concurrent modification in the database.", str(cm.exception))
# Deleting the submodel with safe_delete should also raise a conflict error. Deletion without safe_delete should
# work
with self.assertRaises(couchdb.CouchDBConflictError) as cm:
self.object_store.discard(retrieved_submodel, True)
self.assertEqual("Object with id Identifier(IRI=https://acplt.org/Test_Submodel) has been modified in the "
"database since the version requested to be deleted.", str(cm.exception))
self.object_store.discard(retrieved_submodel, False)
self.assertEqual(0, len(self.object_store))
# Committing after deletion should also raise a conflict error
with self.assertRaises(couchdb.CouchDBConflictError) as cm:
retrieved_submodel.commit()
self.assertEqual("Could not commit changes to id Identifier(IRI=https://acplt.org/Test_Submodel) due to a "
"concurrent modification in the database.", str(cm.exception))
"""
def test_editing(self):
test_object = create_example_submodel()
self.object_store.add(test_object)
# Test if update restores changes
test_object.id_short = "SomeNewIdShort"
test_object.update()
self.assertEqual("TestSubmodel", test_object.id_short)
# Test if commit uploads changes
test_object.id_short = "SomeNewIdShort"
test_object.commit()
new_test_object = self.object_store.get_identifiable(test_object.identification)
self.assertEqual("SomeNewIdShort", new_test_object.id_short)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment