Skip to content
Snippets Groups Projects
Select Git revision
  • 59ed8cf51a8218bc0f7154d59cc9fc79611a10d6
  • master default protected
  • backport/docs_ci
  • improve/V30RC02
  • improve/V30RC01
  • feature/http_api
  • backport/http_api
  • feature/couchdb_update_commit
  • fix/tutorial_dynamic_model
  • v0.2.3 protected
  • v0.2.2 protected
  • v0.2.1 protected
  • v0.2.0 protected
  • v0.1.0 protected
14 results

couchdb.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    couchdb.py 22.80 KiB
    # Copyright 2020 PyI40AAS Contributors
    #
    # Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
    # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    # specific language governing permissions and limitations under the License.
    """
    Todo: Add module docstring
    """
    from typing import List, Dict, Any, Optional, Iterator, Iterable, Union
    import re
    import urllib.parse
    import urllib.request
    import urllib.error
    import logging
    import json
    import http.client
    
    from . import backends
    from aas.adapter.json import json_deserialization, json_serialization
    from aas import model
    
    
    logger = logging.getLogger(__name__)
    
    
    class CouchDBBackend(backends.Backend):
        """
        This Backend stores each Identifiable object as a single JSON document in the configured CouchDB database. Each
        document's id is build from the object's identifier using the pattern {idtype}-{idvalue}; the document's contents
        comprise a single property "data", containing the JSON serialization of the PyI40AAS object. The aas.adapter.json
        package is used for serialization and deserialization of objects.
        """
        @classmethod
        def update_object(cls,
                          updated_object: "Referable",  # type: ignore
                          store_object: "Referable",  # type: ignore
                          relative_path: List[str]) -> None:
    
            if not isinstance(store_object, model.Identifiable):
                raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
                                         "in the CouchDB")
            url = CouchDBBackend._parse_source(store_object.source)
            request = urllib.request.Request(url,
                                             headers={'Accept': 'application/json'})
            try:
                data = CouchDBBackend.do_request(request)
            except CouchDBServerError as e:
                if e.code == 404:
                    raise KeyError("No Identifiable found in CouchDB at {}".format(url)) from e
                raise
    
            updated_store_object = data['data']
            set_couchdb_revision(url, data["_rev"])
            store_object.update_from(updated_store_object)
    
        @classmethod
        def commit_object(cls,
                          committed_object: "Referable",  # type: ignore
                          store_object: "Referable",  # type: ignore
                          relative_path: List[str]) -> None:
            if not isinstance(store_object, model.Identifiable):
                raise CouchDBSourceError("The given store_object is not Identifiable, therefore cannot be found "
                                         "in the CouchDB")
            url = CouchDBBackend._parse_source(store_object.source)
            # We need to get the revision of the object, if it already exists, otherwise we cannot write to the Couchdb
            if get_couchdb_revision(url) is None:
                raise CouchDBConflictError("No revision found for the given object. Try calling `update` on it.")
    
            data = json.dumps({'data': store_object, "_rev": get_couchdb_revision(url)},
                              cls=json_serialization.AASToJsonEncoder)
            request = urllib.request.Request(
                url,
                headers={'Content-type': 'application/json'},
                method='PUT',
                data=data.encode())
            try:
                response = CouchDBBackend.do_request(request)
                set_couchdb_revision(url, response["rev"])
            except CouchDBServerError as e:
                if e.code == 409:
                    raise CouchDBConflictError("Could not commit changes to id {} due to a concurrent modification in the "
                                               "database.".format(store_object.identification)) from e
                elif e.code == 404:
                    raise KeyError("Object with id {} was not found in the CouchDB at {}"
                                   .format(store_object.identification, url)) from e
                raise
    
        @classmethod
        def _parse_source(cls, source: str) -> str:
            """
            Parses the source parameter of a model.Referable object
    
            :param source: Source string of the model.Referable object
            :return: URL to the document
            :raises CouchDBBackendSourceError, if the source has the wrong format
            """
            couchdb_s = re.match("couchdbs://", source)  # Note: Works, since match only checks the beginning of the string
            if couchdb_s:
                url = source.replace("couchdbs://", "https://", 1)
            else:
                couchdb_wo_s = re.match("couchdb://", source)
                if couchdb_wo_s:
                    url = source.replace("couchdb://", "http://", 1)
                else:
                    raise CouchDBSourceError("Source has wrong format. "
                                             "Expected to start with {couchdb://, couchdbs://}, got {" + source + "}")
            return url
    
        @classmethod
        def do_request(cls, request: urllib.request.Request) -> Dict[str, Any]:
            """
            Perform an HTTP request to the CouchDBServer, parse the result and handle errors
    
            :param request:
            :return:
            """
            opener = urllib.request.build_opener(urllib.request.HTTPBasicAuthHandler(_credentials_store))
            try:
                response = opener.open(request)
            except urllib.error.HTTPError as e:
                logger.debug("Request %s %s finished with HTTP status code %s.",
                             request.get_method(), request.full_url, e.code)
                if e.headers.get('Content-type', None) != 'application/json':
                    raise CouchDBResponseError("Unexpected Content-type header {} of response from CouchDB server"
                                               .format(e.headers.get('Content-type', None)))
    
                if request.get_method() == 'HEAD':
                    raise CouchDBServerError(e.code, "", "", "HTTP {}") from e
    
                try:
                    data = json.load(e)
                except json.JSONDecodeError:
                    raise CouchDBResponseError("Could not parse error message of HTTP {}"
                                               .format(e.code))
                raise CouchDBServerError(e.code, data['error'], data['reason'],
                                         "HTTP {}: {} (reason: {})".format(e.code, data['error'], data['reason'])) from e
            except urllib.error.URLError as e:
                raise CouchDBConnectionError("Error while connecting to the CouchDB server: {}".format(e)) from e
    
            # Check response & parse data
            assert (isinstance(response, http.client.HTTPResponse))
            logger.debug("Request %s %s finished successfully.", request.get_method(), request.full_url)
            if request.get_method() == 'HEAD':
                return {}
    
            if response.getheader('Content-type') != 'application/json':
                raise CouchDBResponseError("Unexpected Content-type header")
            try:
                data = json.load(response, cls=json_deserialization.AASFromJsonDecoder)
            except json.JSONDecodeError as e:
                raise CouchDBResponseError("Could not parse CouchDB server response as JSON data.") from e
            return data
    
    
    # Global registry for credentials for CouchDB Servers
    _credentials_store: urllib.request.HTTPPasswordMgrWithPriorAuth = urllib.request.HTTPPasswordMgrWithPriorAuth()
    # Note: The HTTPPasswordMgr is not thread safe during writing, should be thread safe for reading only.
    
    
    def register_credentials(url: str, username: str, password: str):
        """
        Register the credentials of a CouchDB server to the global credentials store
    
        Warning: Do not use this function, while other threads may be accessing the credentials via the CouchDBObjectStore
                 or update or commit functions of model.base.Referable objects!
    
        :param url: Toplevel URL
        :param username: Username to that CouchDB instance
        :param password: Password to the Username
        """
        _credentials_store.add_password(None, url, username, password, is_authenticated=True)
    
    
    # Global registry for CouchDB Revisions
    _revision_store: Dict[str, str] = {}
    
    
    def set_couchdb_revision(url: str, revision: str):
        """
        Set the CouchDB revision of the given document in the revision store
    
        :param url: URL to the CouchDB document
        :param revision: CouchDB revision
        """
        _revision_store[url] = revision
    
    
    def get_couchdb_revision(url: str) -> Optional[str]:
        """
        Get the CouchDB revision from the revision store for the given URL to a CouchDB Document
    
        :param url: URL to the CouchDB document
        :return: CouchDB-revision, if there is one, otherwise returns None
        """
        return _revision_store.get(url)
    
    
    def delete_couchdb_revision(url: str):
        """
        Delete the CouchDB revision from the revision store for the given URL to a CouchDB Document
    
        :param url: URL to the CouchDB document
        """
        del _revision_store[url]
    
    
    class CouchDBObjectStore(model.AbstractObjectStore):
        """
        An ObjectStore implementation for Identifiable PyI40AAS objects backed by a CouchDB database server.
    
        todo: adapt the following text
        All methods of the `CouchDBObjectStore` are blocking, i.e. they stop the current thread's execution until they
        receive a response from the CouchDB server (or encounter a timeout). However, the `CouchDBObjectStore` objects are
        thread-safe, meaning that you may run multiple method calls on the same CouchDBObjectStore in parallel in different
        threads. For example, you could use a ThreadPoolExecutor to add a large number of objects to the database:
    
            import concurrent.futures
            submodels = [submodel1, submodel2, submodel3]
            database = CouchDBObjectStore('localhost:5984', 'aas_test')
            database.login('test', 'test')
            with concurrent.futures.ThreadPoolExecutor() as pool:
                pool.map(database.add, submodels)
    
        """
        def __init__(self, url: str, database: str):
            """
            Initializer of class CouchDBObjectStore
    
            :param url: URL to the CouchDB
            :param database: Name of the Database inside the CouchDB
            """
            self.url: str = url
            self.database_name: str = database
    
        def check_database(self, create=False):
            """
            Check if the database exists and created it if not (and requested to do so)
    
            :param create: If True and the database does not exist, try to create it
            :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
            """
            request = urllib.request.Request(
                "{}/{}".format(self.url, self.database_name),
                headers={'Accept': 'application/json'},
                method='HEAD')
            try:
                CouchDBBackend.do_request(request)
            except CouchDBServerError as e:
                # If an HTTPError is raised, re-raise it, unless it is a 404 error and we are requested to create the
                # database
                if e.code != 404 or not create:
                    raise e
            else:
                # If no exception is given raised, the database is okay
                return
    
            # Create database
            logger.info("Creating CouchDB database %s/%s ...", self.url, self.database_name)
            request = urllib.request.Request(
                "{}/{}".format(self.url, self.database_name),
                headers={'Accept': 'application/json'},
                method='PUT')
            CouchDBBackend.do_request(request)
    
        def get_identifiable(self, identifier: Union[str, model.Identifier]) -> model.Identifiable:
            """
            Retrieve an AAS object from the CouchDB by its Identifier
    
            If the identifier is a string, it is assumed that the string is a correct couchdb-ID-string (according to the
            internal conversion rules, see CouchDBObjectStore._transform_id() )
    
            :raises KeyError: If no such object is stored in the database
            :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
            """
            if isinstance(identifier, model.Identifier):
                identifier = self._transform_id(identifier, False)
    
            # Create and issue HTTP request (raises HTTPError on status != 200)
            request = urllib.request.Request(
                "{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
                headers={'Accept': 'application/json'})
            try:
                data = CouchDBBackend.do_request(request)
            except CouchDBServerError as e:
                if e.code == 404:
                    raise KeyError("No Identifiable with id {} found in CouchDB database".format(identifier)) from e
                raise
    
            # Add CouchDB meta data (for later commits) to object
            obj = data['data']
            if not isinstance(obj, model.Identifiable):
                raise CouchDBResponseError("The CouchDB document with id {} does not contain an identifiable AAS object."
                                           .format(identifier))
            self.generate_source(obj)  # Generate the source parameter of this object
            set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, urllib.parse.quote(identifier, safe='')),
                                 data["_rev"])
            return obj
    
        def add(self, x: model.Identifiable) -> None:
            """
            Add an object to the store
    
            :raises KeyError: If an object with the same id exists already in the database
            :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
            """
            logger.debug("Adding object %s to CouchDB database ...", repr(x))
            # Serialize data
            data = json.dumps({'data': x}, cls=json_serialization.AASToJsonEncoder)
    
            # Create and issue HTTP request (raises HTTPError on status != 200)
            request = urllib.request.Request(
                "{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
                headers={'Content-type': 'application/json'},
                method='PUT',
                data=data.encode())
            try:
                response = CouchDBBackend.do_request(request)
                set_couchdb_revision("{}/{}/{}".format(self.url, self.database_name, self._transform_id(x.identification)),
                                     response["rev"])
            except CouchDBServerError as e:
                if e.code == 409:
                    raise KeyError("Identifiable with id {} already exists in CouchDB database".format(x.identification))\
                        from e
                raise
            self.generate_source(x)  # Set the source of the object
    
        def discard(self, x: model.Identifiable, safe_delete=False) -> None:
            """
            Delete an Identifiable AAS object from the CouchDB database
    
            :param x: The object to be deleted
            :param safe_delete: If True, only delete the object if it has not been modified in the database in comparison to
                                the provided revision. This uses the CouchDB revision token and thus only works with
                                CouchDBIdentifiable objects retrieved from this database.
            :raises KeyError: If the object does not exist in the database
            :raises CouchDBConflictError: If safe_delete is true and the object has been modified or deleted in the database
            :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
            """
            logger.debug("Deleting object %s from CouchDB database ...", repr(x))
            # If x is not a CouchDBIdentifiable, retrieve x from the database to get the current couchdb_revision
            rev = get_couchdb_revision("{}/{}/{}".format(self.url,
                                                         self.database_name,
                                                         self._transform_id(x.identification)))
            if rev is not None:
                logger.debug("using the object's stored revision token %s for deletion." %rev)
            if rev is None:
                if safe_delete:
                    raise CouchDBConflictError("No CouchDBRevision found for the object")
                else:
                    try:
                        logger.debug("fetching the current object revision for deletion ...")
                        self.get_identifiable(x.identification)
                    except KeyError as e:
                        raise KeyError(
                            "No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
                rev = get_couchdb_revision("{}/{}/{}".format(self.url,
                                                             self.database_name,
                                                             self._transform_id(x.identification)))
                logger.debug("using the current object revision %s for deletion." % rev)
    
            request = urllib.request.Request(
                "{}/{}/{}?rev={}".format(self.url, self.database_name, self._transform_id(x.identification), rev),
                headers={'Content-type': 'application/json'},
                method='DELETE')
            try:
                CouchDBBackend.do_request(request)
            except CouchDBServerError as e:
                if e.code == 404:
                    raise KeyError("No AAS object with id {} exists in CouchDB database".format(x.identification)) from e
                elif e.code == 409:
                    raise CouchDBConflictError(
                        "Object with id {} has been modified in the database since "
                        "the version requested to be deleted.".format(x.identification)) from e
                raise
            delete_couchdb_revision("{}/{}/{}".format(self.url,
                                                      self.database_name,
                                                      self._transform_id(x.identification)))
    
        def __contains__(self, x: object) -> bool:
            """
            Check if an object with the given Identifier or the same Identifier as the given object is contained in the
            CouchDB database
    
            :param x: AAS object Identifier or Identifiable AAS object
            :return: True if such an object exists in the database, False otherwise
            :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
            """
            if isinstance(x, model.Identifier):
                identifier = x
            elif isinstance(x, model.Identifiable):
                identifier = x.identification
            else:
                return False
            logger.debug("Checking existence of object with id %s in database ...", repr(x))
            request = urllib.request.Request(
                "{}/{}/{}".format(self.url, self.database_name, self._transform_id(identifier)),
                headers={'Accept': 'application/json'},
                method='HEAD')
            try:
                CouchDBBackend.do_request(request)
            except CouchDBServerError as e:
                if e.code == 404:
                    return False
                raise
            return True
    
        def __len__(self) -> int:
            """
            Retrieve the number of objects in the CouchDB database
    
            :return: The number of objects (determined from the number of documents)
            :raises CouchDBError: If error occur during the request to the CouchDB server (see `_do_request()` for details)
            """
            logger.debug("Fetching number of documents from database ...")
            request = urllib.request.Request(
                "{}/{}".format(self.url, self.database_name),
                headers={'Accept': 'application/json'})
            data = CouchDBBackend.do_request(request)
            return data['doc_count']
    
        def __iter__(self) -> Iterator[model.Identifiable]:
            """
            Iterate all Identifiables in the CouchDB database.
    
            This method returns a lazy iterator, containing only a list of all identifiers in the database and retrieving
            the identifiable objects on the fly.
    
            :raises CouchDBError: If error occur during fetching the list of objects from the CouchDB server (see
                                  `_do_request()` for details)
            """
            # Iterator class storing the list of ids and fetching Identifiable objects on the fly
            class CouchDBIdentifiableIterator(Iterator[model.Identifiable]):
                def __init__(self, store: CouchDBObjectStore, ids: Iterable[str]):
                    self._iter = iter(ids)
                    self._store = store
    
                def __next__(self):
                    next_id = next(self._iter)
                    return self._store.get_identifiable(next_id)
    
            # Fetch a list of all ids and construct Iterator object
            logger.debug("Creating iterator over objects in database ...")
            request = urllib.request.Request(
                "{}/{}/_all_docs".format(self.url, self.database_name),
                headers={'Accept': 'application/json'})
            data = CouchDBBackend.do_request(request)
            return CouchDBIdentifiableIterator(self, (row['id'] for row in data['rows']))
    
        @staticmethod
        def _transform_id(identifier: model.Identifier, url_quote=True) -> str:
            """
            Helper method to represent an ASS Identifier as a string to be used as CouchDB document id
    
            :param url_quote: If True, the result id string is url-encoded to be used in a HTTP request URL
            """
            result = "{}-{}".format(identifier.id_type.name, identifier.id)
            if url_quote:
                result = urllib.parse.quote(result, safe='')
            return result
    
        def generate_source(self, identifiable: model.Identifiable):
            """
            Generates the source string for an Identifiable object that is backed by the Couchdb
    
            :param identifiable: Identifiable object
            """
            source: str = self.url.replace("https://", "couchdbs://").replace("http://", "couchdb://")
            source += "/" + self.database_name + "/" + self._transform_id(identifiable.identification)
            identifiable.source = source
    
    
    # #################################################################################################
    # Custom Exception classes for reporting errors during interaction with the CouchDB server
    
    class CouchDBError(Exception):
        pass
    
    
    class CouchDBSourceError(CouchDBError):
        """Exception raised when the source has the wrong format"""
        pass
    
    
    class CouchDBConnectionError(CouchDBError):
        """Exception raised when the CouchDB server could not be reached"""
        pass
    
    
    class CouchDBResponseError(CouchDBError):
        """Exception raised by when an HTTP of the CouchDB server could not be handled (e.g.
        no JSON body)"""
        pass
    
    
    class CouchDBServerError(CouchDBError):
        """Exception raised when the CouchDB server returns an unexpected error code"""
        def __init__(self, code: int, error: str, reason: str, *args):
            super().__init__(*args)
            self.code = code
            self.error = error
            self.reason = reason
    
    
    class CouchDBConflictError(CouchDBError):
        """Exception raised when an object could not be committed due to an concurrent
        modification in the database"""
        pass