From 71b239938d4d49063f275dbcc1fcc1c2b484777b Mon Sep 17 00:00:00 2001
From: Benjamin Ledel <benjamin.ledel@digitallearning.gmbh>
Date: Mon, 10 Feb 2025 17:00:13 +0100
Subject: [PATCH] Update 2 files

- /src/providers/views.py
- /docs/docs/analytics_engine.md
---
 docs/docs/analytics_engine.md |  76 ++++++++++++-
 src/providers/views.py        | 203 +++++++++++++++++-----------------
 2 files changed, 178 insertions(+), 101 deletions(-)

diff --git a/docs/docs/analytics_engine.md b/docs/docs/analytics_engine.md
index 76c084b..b1b08b4 100644
--- a/docs/docs/analytics_engine.md
+++ b/docs/docs/analytics_engine.md
@@ -4,10 +4,84 @@ The analysis engine is used to evaluate stored XAPI statements. The results gene
 
 ## Configuring Engines
 
-Analytics Engines are controlled via YAML files, which are stored in the `configuration` directory. The analytics engines scheduler needs to be notfied about configuration changes by running `scheduler read-configs`, which reads all existing YAML files and starts or updates analytics engines.
+Analytics Engines are controlled via YAML files, which are stored in the `configuration` directory. The analytics engines scheduler needs to be notified about configuration changes by running `scheduler read-configs`, which reads all existing YAML files and starts or updates analytics engines.
 
 If you are running the analytics engine in a Docker container, you should run the following command.
 
 ```bash
 $ docker compose exec -it scheduler sh -c 'scheduler read-configs'
 ```
+
+## API Endpoints
+
+### Retrieve Provider Statements
+
+**Endpoint:**
+
+```
+POST /provider/data
+```
+
+**Description:** Retrieves provider statements from the LRS (Learning Record Store) based on dynamic filters provided in the request. This endpoint allows analytics engines to query statements efficiently with support for pagination and filtering.
+
+**Request Headers:**
+
+- `Authorization`: `Basic <token>` (Required) - Authentication token.
+
+**Request Parameters (JSON Body):**
+
+| Parameter           | Type    | Description                                                                           |
+| ------------------- | ------- | ------------------------------------------------------------------------------------- |
+| `last_object_id`    | String  | (Optional) The `_id` from the last retrieved document to paginate results.            |
+| `page_size`         | Integer | (Optional) Number of statements to retrieve per request (default: system-configured). |
+| `verb.id__in`       | String  | (Optional) Comma-separated list of verb IDs to filter statements.                     |
+| `actor.mbox__regex` | String  | (Optional) Regular expression to filter statements based on `actor.mbox`.             |
+| `actor.tan__exists` | Boolean | (Optional) Whether to include/exclude statements with `actor.tan`.                    |
+| `created_at__gt`    | String  | (Optional) Retrieves statements created after the given timestamp.                    |
+| `created_at__lt`    | String  | (Optional) Retrieves statements created before the given timestamp.                   |
+
+**Example Request:**
+
+```json
+{
+    "last_object_id": "65bfc2a...",
+    "page_size": 10,
+    "verb.id__in": "run,jump",
+    "actor.mbox__regex": "^mailto",
+    "actor.tan__exists": false,
+    "created_at__gt": "2024-01-01T00:00:00Z"
+}
+```
+
+**Response:**
+
+```json
+{
+    "verbs": ["run", "jump"],
+    "statements": [
+        {
+            "_id": "65bfc2a...",
+            "actor": {
+                "mbox": "mailto:example@example.com"
+            },
+            "verb": {
+                "id": "run"
+            },
+            "object": {
+                "id": "example-object"
+            }
+        }
+    ],
+    "page_size": 10
+}
+```
+
+
+**Response Codes:**
+
+| Status Code                 | Description                                         |
+| --------------------------- | --------------------------------------------------- |
+| `200 OK`                    | Successful response with filtered statements.       |
+| `400 Bad Request`           | Invalid request parameters.                         |
+| `401 Unauthorized`          | Authentication token is missing or invalid.         |
+| `500 Internal Server Error` | Server-side error, such as missing provider schema. |
diff --git a/src/providers/views.py b/src/providers/views.py
index ae6554c..1accfb7 100644
--- a/src/providers/views.py
+++ b/src/providers/views.py
@@ -390,11 +390,58 @@ class GetProviderData(APIView):
     Endpoint that allows an analytics engine to obtain provider statements from the LRS.
     """
 
+    def build_mongo_query(self, filters, active_verbs, anon_verbs, providers):
+        """Dynamically construct MongoDB query based on request filters."""
+        query = {"$and": [{"actor.tan": {"$exists": False}}, {"verb.id": {"$in": active_verbs}}]}
+
+        if filters.get("last_object_id"):
+            try:
+                query["$and"].append({"_id": {"$gt": ObjectId(filters["last_object_id"])}})
+            except:
+                pass  # Ignore invalid ObjectId
+
+        # Adding generic filters dynamically
+        for key, value in filters.items():
+            if key == "last_object_id":
+                continue  # Already handled
+            elif key.endswith("__gt"):
+                query["$and"].append({key.replace("__gt", ""): {"$gt": value}})
+            elif key.endswith("__lt"):
+                query["$and"].append({key.replace("__lt", ""): {"$lt": value}})
+            elif key.endswith("__regex"):
+                query["$and"].append({key.replace("__regex", ""): {"$regex": value}})
+            elif key.endswith("__in"):
+                query["$and"].append({key.replace("__in", ""): {"$in": value.split(",")}})
+            elif key.endswith("__exists"):
+                query["$and"].append({key.replace("__exists", ""): {"$exists": value.lower() == "true"}})
+            else:
+                query["$and"].append({key: value})
+
+        query["$and"].append(
+            {
+                "$or": [
+                    {"$and": [
+                        {"actor.mbox": {"$exists": True}},
+                        {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}},
+                        {"verb.id": {"$in": anon_verbs}},
+                    ]},
+                    {"actor.mbox": {"$exists": False}},
+                    {"$and": [
+                        {"actor.mbox": {"$exists": True}},
+                        {"actor.mbox": {"$regex": "^mailto"}},
+                    ]},
+                    get_system_statement_query(providers),
+                ]
+            }
+        )
+
+        return query
+
     def post(self, request):
         start_time = time.time()
-        print("Starting GetProviderData.post")
 
-        token = request.headers.get("Authorization").split("Basic ")[1]
+        # Extract Authorization Token
+        token = request.headers.get("Authorization", "").replace("Basic ", "").strip()
         print(f"Token extracted: {token}")
 
         try:
@@ -402,143 +449,99 @@ class GetProviderData(APIView):
             req_serializer.is_valid(raise_exception=True)
             print("Request serializer validated")
 
+            # Fetch Analytics Token
             analytics_token = AnalyticsToken.objects.get(key=token)
             print("Analytics token retrieved")
 
-            if (
-                analytics_token.expires is not None
-                and analytics_token.expires <= timezone.now()
-            ):
+            if analytics_token.expires and analytics_token.expires <= timezone.now():
                 print("Token has expired")
-                return Response(
-                    {"message": "Token has expired."},
-                    status=status.HTTP_401_UNAUTHORIZED,
-                )
+                return Response({"message": "Token has expired."}, status=status.HTTP_401_UNAUTHORIZED)
 
-            last_object_id = req_serializer.data.get("last_object_id", None)
-            page_size = req_serializer.data.get("page_size", None)
-
-            collection = lrs_db["statements"]
+            # Extract filters from request
+            filters = request.data.copy()  # Use request.POST if it's form-data
+            last_object_id = filters.pop("last_object_id", None)
+            page_size = int(filters.pop("page_size", settings.DEFAULT_PAGE_SIZE))  # Default if not provided
 
+            # Fetch Active Verbs
             db_fetch_start = time.time()
-            active_verbs = list(map(lambda x: x["verb"], list(AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token).values())))
+            active_verbs = list(
+                AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token)
+                .values_list("verb", flat=True)
+            )
             print(f"Active verbs retrieved in {time.time() - db_fetch_start:.6f} seconds: {active_verbs}")
 
-            providers = []
-            for analytics_token_verb in AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token):
-                if analytics_token_verb.provider not in providers:
-                    providers.append(analytics_token_verb.provider)
+            # Fetch Providers
+            providers = list(
+                AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token)
+                .values_list("provider", flat=True)
+                .distinct()
+            )
 
+            # Fetch Anonymous Verbs
             anon_verbs = []
             for provider in providers:
                 try:
-                    latest_schema = ProviderSchema.objects.get(
-                        provider=provider, superseded_by__isnull=True
-                    )
+                    latest_schema = ProviderSchema.objects.get(provider=provider, superseded_by__isnull=True)
                 except ObjectDoesNotExist:
-                    print(f"No consent provider schema found for provider: {provider.name}")
-                    return JsonResponse(
-                        {
-                            "message": "No consent provider schema found.",
-                            "provider": provider.name,
-                        },
-                        safe=False,
-                        status=status.HTTP_500_INTERNAL_SERVER_ERROR,
-                    )
-                
-                for verb in [verb for verblist in [group["verbs"] for group in latest_schema.groups] for verb in verblist]:
+                    print(f"No consent provider schema found for provider: {provider}")
+                    return JsonResponse({"message": "No consent provider schema found.", "provider": provider},
+                                        safe=False, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+                for verb in (verb for group in latest_schema.groups for verb in group["verbs"]):
                     if verb["id"] in active_verbs and verb.get("allowAnonymizedCollection", False):
                         min_count = verb.get("allowAnonymizedCollectionMinCount", settings.ANONYMIZATION_DEFAULT_MINIMUM_COUNT)
                         query_start = time.time()
-                        current_count = collection.distinct("actor.mbox", {
-                            "$and": [
-                                {"verb.id": {"$eq": verb["id"]}},
-                                {"actor.mbox": {"$exists": True}},
-                                {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}}
-                            ]
-                        }).__len__()
+                        current_count = len(
+                            collection.distinct("actor.mbox", {
+                                "$and": [
+                                    {"verb.id": verb["id"]},
+                                    {"actor.mbox": {"$exists": True}},
+                                    {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}}
+                                ]
+                            })
+                        )
                         print(f"Query for verb {verb['id']} executed in {time.time() - query_start:.6f} seconds")
                         if current_count >= min_count:
-                            anon_verbs.append(verb)
+                            anon_verbs.append(verb["id"])
             print(f"Anonymous verbs determined: {anon_verbs}")
 
+            # Construct Query with Dynamic Filtering
             query_start_time = time.time()
-            query = (
-                {
-                    "$and": [
-                        {"actor.tan": {"$exists": False}},
-                        {"_id": {"$gt": ObjectId(last_object_id)}},
-                        {"verb.id": {"$in": active_verbs}},
-                        {"$or": [
-                            {"$and": [
-                                {"actor.mbox": {"$exists": True}},
-                                {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}},
-                                {"verb.id": {"$in": anon_verbs}},
-                            ]},
-                            {"actor.mbox": {"$exists": False}},
-                            {"$and": [
-                                {"actor.mbox": {"$exists": True}},
-                                {"actor.mbox": {"$regex": "^mailto"}},
-                            ]},
-                            get_system_statement_query(providers)
-                        ]}
-                    ]
-                }
-                if last_object_id
-                else {
-                    "$and": [
-                        {"actor.tan": {"$exists": False}},
-                        {"verb.id": {"$in": active_verbs}},
-                        {"$or": [
-                            {"$and": [
-                                {"actor.mbox": {"$exists": True}},
-                                {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}},
-                                {"verb.id": {"$in": anon_verbs}},
-                            ]},
-                            {"actor.mbox": {"$exists": False}},
-                            {"$and": [
-                                {"actor.mbox": {"$exists": True}},
-                                {"actor.mbox": {"$regex": "^mailto"}},
-                            ]},
-                            get_system_statement_query(providers)
-                        ]}
-                    ]
-                }
-            )
-            print(f"Query constructed in {time.time() - query_start_time:.6f} seconds")
+            query = self.build_mongo_query(filters, active_verbs, anon_verbs, providers)
+            print(f"Query constructed in {time.time() - query_start_time:.6f} seconds: {query}")
 
+            # Execute MongoDB Query
             execution_start = time.time()
             cursor = collection.find(query).limit(page_size).batch_size(page_size)
             print(f"Query executed in {time.time() - execution_start:.6f} seconds")
+
+            # Process Data
             data_time = time.time()
-            data = {
-                "verbs": list(dict.fromkeys(active_verbs)),
-                "statements": [replace_provider_id(item) for item in cursor],
+            statements = [replace_provider_id(item) for item in cursor]
+            print(f"Data mapping executed in {time.time() - data_time:.6f} seconds")
+
+            response_data = {
+                "verbs": list(set(active_verbs)),  # Remove duplicates
+                "statements": statements,
                 "page_size": page_size,
             }
 
-            print(f"Data mapping executed in {time.time() - data_time:.6f} seconds")
-            serializer = ProviderDataSerializer(data=data)
-
+            # Serialize Response
+            serializer = ProviderDataSerializer(data=response_data)
             if serializer.is_valid():
                 print(f"Total execution time: {time.time() - start_time:.6f} seconds")
                 return Response(serializer.data, status=status.HTTP_200_OK)
 
             print("Failed to serialize response data")
             return Response(
-                {
-                    "message": "Failed to serialize response data.",
-                    "errors": serializer.errors,
-                },
+                {"message": "Failed to serialize response data.", "errors": serializer.errors},
                 status=status.HTTP_400_BAD_REQUEST,
             )
 
-        except ObjectDoesNotExist as c:
-            print(f"Exception occurred: {c}")
-            return Response(
-                {"message": "Invalid analytics token " + token},
-                status=status.HTTP_401_UNAUTHORIZED,
-            )
+        except ObjectDoesNotExist:
+            print(f"Invalid analytics token: {token}")
+            return Response({"message": "Invalid analytics token " + token}, status=status.HTTP_401_UNAUTHORIZED)
+
 
 
 class StoreAnalyticsEngineResult(APIView):
-- 
GitLab