diff --git a/docs/docs/analytics_engine.md b/docs/docs/analytics_engine.md
index 76c084bd4a091f16caf794f941dfa8ed1f4d7f15..b1b08b4610cb2b1624ce4fc0505c739104b8df6d 100644
--- a/docs/docs/analytics_engine.md
+++ b/docs/docs/analytics_engine.md
@@ -4,10 +4,84 @@ The analysis engine is used to evaluate stored XAPI statements. The results gene
 
 ## Configuring Engines
 
-Analytics Engines are controlled via YAML files, which are stored in the `configuration` directory. The analytics engines scheduler needs to be notfied about configuration changes by running `scheduler read-configs`, which reads all existing YAML files and starts or updates analytics engines.
+Analytics Engines are controlled via YAML files, which are stored in the `configuration` directory. The analytics engines scheduler needs to be notified about configuration changes by running `scheduler read-configs`, which reads all existing YAML files and starts or updates analytics engines.
 
 If you are running the analytics engine in a Docker container, you should run the following command.
 
 ```bash
 $ docker compose exec -it scheduler sh -c 'scheduler read-configs'
 ```
+
+## API Endpoints
+
+### Retrieve Provider Statements
+
+**Endpoint:**
+
+```
+POST /provider/data
+```
+
+**Description:** Retrieves provider statements from the LRS (Learning Record Store) based on dynamic filters provided in the request. This endpoint allows analytics engines to query statements efficiently with support for pagination and filtering.
+
+**Request Headers:**
+
+- `Authorization`: `Basic <token>` (Required) - Authentication token.
+
+**Request Parameters (JSON Body):**
+
+| Parameter           | Type    | Description                                                                            |
+| ------------------- | ------- | -------------------------------------------------------------------------------------- |
+| `last_object_id`    | String  | (Optional) The `_id` from the last retrieved document to paginate results.              |
+| `page_size`         | Integer | (Optional) Number of statements to retrieve per request (default: system-configured).   |
+| `verb.id__in`       | String  | (Optional) Comma-separated list of verb IDs to filter statements.                       |
+| `actor.mbox__regex` | String  | (Optional) Regular expression to filter statements based on `actor.mbox`.               |
+| `actor.tan__exists` | Boolean | (Optional) Whether to include/exclude statements with `actor.tan`.                      |
+| `created_at__gt`    | String  | (Optional) Retrieves statements created after the given timestamp.                      |
+| `created_at__lt`    | String  | (Optional) Retrieves statements created before the given timestamp.                     |
+
+**Example Request:**
+
+```json
+{
+  "last_object_id": "65bfc2a...",
+  "page_size": 10,
+  "verb.id__in": "run,jump",
+  "actor.mbox__regex": "^mailto",
+  "actor.tan__exists": false,
+  "created_at__gt": "2024-01-01T00:00:00Z"
+}
+```
+
+**Response:**
+
+```json
+{
+  "verbs": ["run", "jump"],
+  "statements": [
+    {
+      "_id": "65bfc2a...",
+      "actor": {
+        "mbox": "mailto:example@example.com"
+      },
+      "verb": {
+        "id": "run"
+      },
+      "object": {
+        "id": "example-object"
+      }
+    }
+  ],
+  "page_size": 10
+}
+```
+
+
+**Response Codes:**
+
+| Status Code                 | Description                                          |
+| --------------------------- | ---------------------------------------------------- |
+| `200 OK`                    | Successful response with filtered statements.        |
+| `400 Bad Request`           | Invalid request parameters.                          |
+| `401 Unauthorized`          | Authentication token is missing or invalid.          |
+| `500 Internal Server Error` | Server-side error, such as missing provider schema.  |
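The documentation above describes `last_object_id` pagination but does not show a client. The sketch below is illustrative only and not part of the diff: the base URL, the token value, and the use of the `requests` library are assumptions, while the path, header, and body fields follow the tables above.

```python
# Illustrative client sketch for POST /provider/data (assumed host and token).
import requests

BASE_URL = "https://lrs.example.org"   # assumption: deployment-specific host
TOKEN = "<analytics-token>"            # assumption: issued analytics token

def fetch_statements(last_object_id=None, page_size=10):
    body = {
        "page_size": page_size,
        "verb.id__in": "run,jump",
        "actor.mbox__regex": "^mailto",
        "created_at__gt": "2024-01-01T00:00:00Z",
    }
    if last_object_id:
        body["last_object_id"] = last_object_id
    response = requests.post(
        f"{BASE_URL}/provider/data",
        json=body,
        headers={"Authorization": f"Basic {TOKEN}"},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()

# Paginate by passing the last statement's _id back as last_object_id.
page = fetch_statements()
while page["statements"]:
    last_id = page["statements"][-1]["_id"]
    page = fetch_statements(last_object_id=last_id)
```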
diff --git a/src/providers/views.py b/src/providers/views.py
index ae6554cf5c178e5d6f4c4623ce2b6488f08516b8..1accfb740427a635f11c4d07196b3634486f4155 100644
--- a/src/providers/views.py
+++ b/src/providers/views.py
@@ -390,11 +390,58 @@ class GetProviderData(APIView):
     Endpoint that allows an analytics engine to obtain provider statements
     from the LRS.
     """
 
+    def build_mongo_query(self, filters, active_verbs, anon_verbs, providers):
+        """Dynamically construct a MongoDB query based on request filters."""
+        query = {"$and": [{"actor.tan": {"$exists": False}}, {"verb.id": {"$in": active_verbs}}]}
+
+        if filters.get("last_object_id"):
+            try:
+                query["$and"].append({"_id": {"$gt": ObjectId(filters["last_object_id"])}})
+            except (TypeError, ValueError):
+                pass  # Ignore invalid ObjectId values; fall back to an unpaginated query
+
+        # Add generic filters dynamically based on their suffix
+        for key, value in filters.items():
+            if key == "last_object_id":
+                continue  # Already handled above
+            elif key.endswith("__gt"):
+                query["$and"].append({key.replace("__gt", ""): {"$gt": value}})
+            elif key.endswith("__lt"):
+                query["$and"].append({key.replace("__lt", ""): {"$lt": value}})
+            elif key.endswith("__regex"):
+                query["$and"].append({key.replace("__regex", ""): {"$regex": value}})
+            elif key.endswith("__in"):
+                query["$and"].append({key.replace("__in", ""): {"$in": value.split(",")}})
+            elif key.endswith("__exists"):
+                # Accept both JSON booleans and "true"/"false" strings
+                query["$and"].append({key.replace("__exists", ""): {"$exists": str(value).lower() == "true"}})
+            else:
+                query["$and"].append({key: value})
+
+        query["$and"].append(
+            {
+                "$or": [
+                    {"$and": [
+                        {"actor.mbox": {"$exists": True}},
+                        {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}},
+                        {"verb.id": {"$in": anon_verbs}},
+                    ]},
+                    {"actor.mbox": {"$exists": False}},
+                    {"$and": [
+                        {"actor.mbox": {"$exists": True}},
+                        {"actor.mbox": {"$regex": "^mailto"}},
+                    ]},
+                    get_system_statement_query(providers),
+                ]
+            }
+        )
+
+        return query
+
     def post(self, request):
         start_time = time.time()
-        print("Starting GetProviderData.post")
-        token = request.headers.get("Authorization").split("Basic ")[1]
+        # Extract Authorization Token
+        token = request.headers.get("Authorization", "").replace("Basic ", "").strip()
         print(f"Token extracted: {token}")
 
         try:
@@ -402,143 +449,99 @@ class GetProviderData(APIView):
             req_serializer.is_valid(raise_exception=True)
             print("Request serializer validated")
 
+            # Fetch Analytics Token
             analytics_token = AnalyticsToken.objects.get(key=token)
             print("Analytics token retrieved")
 
-            if (
-                analytics_token.expires is not None
-                and analytics_token.expires <= timezone.now()
-            ):
+            if analytics_token.expires and analytics_token.expires <= timezone.now():
                 print("Token has expired")
-                return Response(
-                    {"message": "Token has expired."},
-                    status=status.HTTP_401_UNAUTHORIZED,
-                )
+                return Response({"message": "Token has expired."}, status=status.HTTP_401_UNAUTHORIZED)
 
-            last_object_id = req_serializer.data.get("last_object_id", None)
-            page_size = req_serializer.data.get("page_size", None)
+            # Extract filters from request; last_object_id stays in filters and is
+            # consumed by build_mongo_query for pagination
+            filters = request.data.copy()  # Use request.POST if it's form-data
+            page_size = int(filters.pop("page_size", settings.DEFAULT_PAGE_SIZE))  # Default if not provided
 
             collection = lrs_db["statements"]
 
+            # Fetch Active Verbs
             db_fetch_start = time.time()
-            active_verbs = list(map(lambda x: x["verb"], list(AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token).values())))
+            active_verbs = list(
+                AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token)
.values_list("verb", flat=True) + ) print(f"Active verbs retrieved in {time.time() - db_fetch_start:.6f} seconds: {active_verbs}") - providers = [] - for analytics_token_verb in AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token): - if analytics_token_verb.provider not in providers: - providers.append(analytics_token_verb.provider) + # Fetch Providers + providers = list( + AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token) + .values_list("provider", flat=True) + .distinct() + ) + # Fetch Anonymous Verbs anon_verbs = [] for provider in providers: try: - latest_schema = ProviderSchema.objects.get( - provider=provider, superseded_by__isnull=True - ) + latest_schema = ProviderSchema.objects.get(provider=provider, superseded_by__isnull=True) except ObjectDoesNotExist: - print(f"No consent provider schema found for provider: {provider.name}") - return JsonResponse( - { - "message": "No consent provider schema found.", - "provider": provider.name, - }, - safe=False, - status=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - - for verb in [verb for verblist in [group["verbs"] for group in latest_schema.groups] for verb in verblist]: + print(f"No consent provider schema found for provider: {provider}") + return JsonResponse({"message": "No consent provider schema found.", "provider": provider}, + safe=False, status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + for verb in (verb for group in latest_schema.groups for verb in group["verbs"]): if verb["id"] in active_verbs and verb.get("allowAnonymizedCollection", False): min_count = verb.get("allowAnonymizedCollectionMinCount", settings.ANONYMIZATION_DEFAULT_MINIMUM_COUNT) query_start = time.time() - current_count = collection.distinct("actor.mbox", { - "$and": [ - {"verb.id": {"$eq": verb["id"]}}, - {"actor.mbox": {"$exists": True}}, - {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}} - ] - }).__len__() + current_count = len( + collection.distinct("actor.mbox", { + "$and": [ + {"verb.id": verb["id"]}, + {"actor.mbox": {"$exists": True}}, + {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}} + ] + }) + ) print(f"Query for verb {verb['id']} executed in {time.time() - query_start:.6f} seconds") if current_count >= min_count: - anon_verbs.append(verb) + anon_verbs.append(verb["id"]) print(f"Anonymous verbs determined: {anon_verbs}") + # Construct Query with Dynamic Filtering query_start_time = time.time() - query = ( - { - "$and": [ - {"actor.tan": {"$exists": False}}, - {"_id": {"$gt": ObjectId(last_object_id)}}, - {"verb.id": {"$in": active_verbs}}, - {"$or": [ - {"$and": [ - {"actor.mbox": {"$exists": True}}, - {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}}, - {"verb.id": {"$in": anon_verbs}}, - ]}, - {"actor.mbox": {"$exists": False}}, - {"$and": [ - {"actor.mbox": {"$exists": True}}, - {"actor.mbox": {"$regex": "^mailto"}}, - ]}, - get_system_statement_query(providers) - ]} - ] - } - if last_object_id - else { - "$and": [ - {"actor.tan": {"$exists": False}}, - {"verb.id": {"$in": active_verbs}}, - {"$or": [ - {"$and": [ - {"actor.mbox": {"$exists": True}}, - {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}}, - {"verb.id": {"$in": anon_verbs}}, - ]}, - {"actor.mbox": {"$exists": False}}, - {"$and": [ - {"actor.mbox": {"$exists": True}}, - {"actor.mbox": {"$regex": "^mailto"}}, - ]}, - get_system_statement_query(providers) - ]} - ] - } - ) - print(f"Query constructed in {time.time() - query_start_time:.6f} seconds") + query = 
+            query = self.build_mongo_query(filters, active_verbs, anon_verbs, providers)
+            print(f"Query constructed in {time.time() - query_start_time:.6f} seconds: {query}")
 
+            # Execute MongoDB Query
             execution_start = time.time()
             cursor = collection.find(query).limit(page_size).batch_size(page_size)
             print(f"Query executed in {time.time() - execution_start:.6f} seconds")
+
+            # Process Data
             data_time = time.time()
-            data = {
-                "verbs": list(dict.fromkeys(active_verbs)),
-                "statements": [replace_provider_id(item) for item in cursor],
+            statements = [replace_provider_id(item) for item in cursor]
+            print(f"Data mapping executed in {time.time() - data_time:.6f} seconds")
+
+            response_data = {
+                "verbs": list(set(active_verbs)),  # Remove duplicates
+                "statements": statements,
                 "page_size": page_size,
             }
-            print(f"Data mapping executed in {time.time() - data_time:.6f} seconds")
 
-            serializer = ProviderDataSerializer(data=data)
-
+            # Serialize Response
+            serializer = ProviderDataSerializer(data=response_data)
             if serializer.is_valid():
                 print(f"Total execution time: {time.time() - start_time:.6f} seconds")
                 return Response(serializer.data, status=status.HTTP_200_OK)
 
             print("Failed to serialize response data")
             return Response(
-                {
-                    "message": "Failed to serialize response data.",
-                    "errors": serializer.errors,
-                },
+                {"message": "Failed to serialize response data.", "errors": serializer.errors},
                 status=status.HTTP_400_BAD_REQUEST,
             )
 
-        except ObjectDoesNotExist as c:
-            print(f"Exception occurred: {c}")
-            return Response(
-                {"message": "Invalid analytics token " + token},
-                status=status.HTTP_401_UNAUTHORIZED,
-            )
+        except ObjectDoesNotExist:
+            print(f"Invalid analytics token: {token}")
+            return Response({"message": "Invalid analytics token " + token}, status=status.HTTP_401_UNAUTHORIZED)
+
 
 class StoreAnalyticsEngineResult(APIView):
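Below is a minimal standalone sketch of the suffix-based filter translation that `build_mongo_query` performs, applied to the example request body from the documentation. The helper name `translate_filters` is hypothetical, and the ObjectId pagination, anonymization, and consent clauses are deliberately omitted so the snippet runs without Django or pymongo installed.

```python
# Standalone sketch of the generic filter-suffix handling (not the view code itself).
from pprint import pprint

def translate_filters(filters, active_verbs):
    query = {"$and": [{"actor.tan": {"$exists": False}},
                      {"verb.id": {"$in": active_verbs}}]}
    for key, value in filters.items():
        if key in ("last_object_id", "page_size"):
            continue  # handled separately (pagination / result limit)
        if key.endswith("__gt"):
            query["$and"].append({key[:-4]: {"$gt": value}})
        elif key.endswith("__lt"):
            query["$and"].append({key[:-4]: {"$lt": value}})
        elif key.endswith("__regex"):
            query["$and"].append({key[:-7]: {"$regex": value}})
        elif key.endswith("__in"):
            query["$and"].append({key[:-4]: {"$in": value.split(",")}})
        elif key.endswith("__exists"):
            query["$and"].append({key[:-8]: {"$exists": str(value).lower() == "true"}})
        else:
            query["$and"].append({key: value})
    return query

# The example request body from the documentation, minus pagination fields.
example = {
    "verb.id__in": "run,jump",
    "actor.mbox__regex": "^mailto",
    "actor.tan__exists": False,
    "created_at__gt": "2024-01-01T00:00:00Z",
}
pprint(translate_filters(example, active_verbs=["run", "jump"]))
```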