Skip to content
Snippets Groups Projects
Commit 71b23993 authored by Benjamin Ledel's avatar Benjamin Ledel
Browse files

Update 2 files

- /src/providers/views.py
- /docs/docs/analytics_engine.md
parent 75f69514
No related branches found
No related tags found
No related merge requests found
Pipeline #1611109 passed
Pipeline: FIRST_START

#1611131

    ......@@ -4,10 +4,84 @@ The analysis engine is used to evaluate stored XAPI statements. The results gene
    ## Configuring Engines
    Analytics Engines are controlled via YAML files, which are stored in the `configuration` directory. The analytics engines scheduler needs to be notfied about configuration changes by running `scheduler read-configs`, which reads all existing YAML files and starts or updates analytics engines.
    Analytics Engines are controlled via YAML files, which are stored in the `configuration` directory. The analytics engines scheduler needs to be notified about configuration changes by running `scheduler read-configs`, which reads all existing YAML files and starts or updates analytics engines.
    If you are running the analytics engine in a Docker container, you should run the following command.
    ```bash
    $ docker compose exec -it scheduler sh -c 'scheduler read-configs'
    ```
    ## API Endpoints
    ### Retrieve Provider Statements
    **Endpoint:**
    ```
    POST /provider/data
    ```
    **Description:** Retrieves provider statements from the LRS (Learning Record Store) based on dynamic filters provided in the request. This endpoint allows analytics engines to query statements efficiently with support for pagination and filtering.
    **Request Headers:**
    - `Authorization`: `Basic <token>` (Required) - Authentication token.
    **Request Parameters (JSON Body):**
    | Parameter | Type | Description |
    | ------------------- | ------- | ------------------------------------------------------------------------------------- |
    | `last_object_id` | String | (Optional) The `_id` from the last retrieved document to paginate results. |
    | `page_size` | Integer | (Optional) Number of statements to retrieve per request (default: system-configured). |
    | `verb.id__in` | String | (Optional) Comma-separated list of verb IDs to filter statements. |
    | `actor.mbox__regex` | String | (Optional) Regular expression to filter statements based on `actor.mbox`. |
    | `actor.tan__exists` | Boolean | (Optional) `true` matches only statements that contain `actor.tan`; `false` matches only statements without it. |
    | `created_at__gt` | String | (Optional) Retrieves statements created after the given timestamp. |
    | `created_at__lt` | String | (Optional) Retrieves statements created before the given timestamp. |
    **Example Request:**
    ```json
    {
    "last_object_id": "65bfc2a...",
    "page_size": 10,
    "verb.id__in": "run,jump",
    "actor.mbox__regex": "^mailto",
    "actor.tan__exists": false,
    "created_at__gt": "2024-01-01T00:00:00Z"
    }
    ```
    **Response:**
    ```json
    {
    "verbs": ["run", "jump"],
    "statements": [
    {
    "_id": "65bfc2a...",
    "actor": {
    "mbox": "mailto:example@example.com"
    },
    "verb": {
    "id": "run"
    },
    "object": {
    "id": "example-object"
    }
    }
    ],
    "page_size": 10
    }
    ```
    **Response Codes:**
    | Status Code | Description |
    | --------------------------- | --------------------------------------------------- |
    | `200 OK` | Successful response with filtered statements. |
    | `400 Bad Request` | Invalid request parameters. |
    | `401 Unauthorized` | Authentication token is missing or invalid. |
    | `500 Internal Server Error` | Server-side error, such as missing provider schema. |
    ......@@ -390,11 +390,58 @@ class GetProviderData(APIView):
    Endpoint that allows an analytics engine to obtain provider statements from the LRS.
    """
    def build_mongo_query(self, filters, active_verbs, anon_verbs, providers):
        """Dynamically construct MongoDB query based on request filters.

        The result is a single ``{"$and": [...]}`` document. It always
        excludes statements carrying ``actor.tan`` and restricts ``verb.id``
        to ``active_verbs``; every recognised entry of ``filters`` adds one
        more clause, and a final ``$or`` clause limits which actors are
        visible.

        Args:
            filters: Mapping of filter name to value, taken from the request.
                ``last_object_id`` adds an ``_id > ObjectId(...)`` clause.
                Keys ending in ``__gt``, ``__lt``, ``__regex``, ``__in`` or
                ``__exists`` are translated to the matching MongoDB operator
                on the field named by the part before the suffix. Any other
                key becomes an equality match.
            active_verbs: Verb ids the statements must match (``verb.id $in``).
            anon_verbs: Verb ids for which statements whose ``actor.mbox``
                starts with ``settings.ANONYMIZATION_HASH_PREFIX`` are allowed.
            providers: Passed unchanged to ``get_system_statement_query``.

        Returns:
            dict: The MongoDB filter document.

        NOTE(review): the caller visible next to this method appears to pop
        ``last_object_id`` out of ``filters`` before calling; if so, the
        pagination clause below is never added -- confirm against ``post``.
        """
        query = {"$and": [{"actor.tan": {"$exists": False}}, {"verb.id": {"$in": active_verbs}}]}
        clauses = query["$and"]

        last_object_id = filters.get("last_object_id")
        if last_object_id:
            try:
                clauses.append({"_id": {"$gt": ObjectId(last_object_id)}})
            except Exception:
                # Deliberately best-effort: a malformed id is ignored and the
                # query starts from the beginning. ``Exception`` (rather than
                # a bare ``except``) keeps KeyboardInterrupt/SystemExit alive.
                pass

        # Suffixes whose value is handed to the MongoDB operator unchanged.
        passthrough_operators = {"gt": "$gt", "lt": "$lt", "regex": "$regex"}

        for key, value in filters.items():
            if key == "last_object_id":
                continue  # Already handled above.

            # Split on the LAST "__" so only a trailing lookup suffix is
            # stripped; "__gt" appearing earlier in a field name is kept.
            field, separator, lookup = key.rpartition("__")
            if not separator:
                field, lookup = key, ""

            if lookup in passthrough_operators:
                condition = {passthrough_operators[lookup]: value}
            elif lookup == "in":
                # Documented as a comma-separated string, but a JSON body may
                # just as well carry a real array.
                if isinstance(value, str):
                    candidates = value.split(",")
                elif isinstance(value, (list, tuple, set)):
                    candidates = list(value)
                else:
                    candidates = [value]
                condition = {"$in": candidates}
            elif lookup == "exists":
                # JSON bodies send a real boolean, form data sends "true"/"false".
                if isinstance(value, str):
                    flag = value.strip().lower() == "true"
                else:
                    flag = bool(value)
                condition = {"$exists": flag}
            else:
                # Unknown or missing suffix: plain equality on the full key.
                field, condition = key, value

            # SECURITY: filter names come from the client. A name starting
            # with "$" would be interpreted by MongoDB as an operator
            # (e.g. "$where"), so such entries are ignored.
            if field.startswith("$"):
                continue

            clauses.append({field: condition})

        clauses.append(
            {
                "$or": [
                    {
                        "$and": [
                            {"actor.mbox": {"$exists": True}},
                            {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}},
                            {"verb.id": {"$in": anon_verbs}},
                        ]
                    },
                    {"actor.mbox": {"$exists": False}},
                    {
                        "$and": [
                            {"actor.mbox": {"$exists": True}},
                            {"actor.mbox": {"$regex": "^mailto"}},
                        ]
                    },
                    get_system_statement_query(providers),
                ]
            }
        )
        return query
    def post(self, request):
    start_time = time.time()
    print("Starting GetProviderData.post")
    token = request.headers.get("Authorization").split("Basic ")[1]
    # Extract Authorization Token
    token = request.headers.get("Authorization", "").replace("Basic ", "").strip()
    print(f"Token extracted: {token}")
    try:
    ......@@ -402,143 +449,99 @@ class GetProviderData(APIView):
    req_serializer.is_valid(raise_exception=True)
    print("Request serializer validated")
    # Fetch Analytics Token
    analytics_token = AnalyticsToken.objects.get(key=token)
    print("Analytics token retrieved")
    if (
    analytics_token.expires is not None
    and analytics_token.expires <= timezone.now()
    ):
    if analytics_token.expires and analytics_token.expires <= timezone.now():
    print("Token has expired")
    return Response(
    {"message": "Token has expired."},
    status=status.HTTP_401_UNAUTHORIZED,
    )
    return Response({"message": "Token has expired."}, status=status.HTTP_401_UNAUTHORIZED)
    last_object_id = req_serializer.data.get("last_object_id", None)
    page_size = req_serializer.data.get("page_size", None)
    collection = lrs_db["statements"]
    # Extract filters from request
    filters = request.data.copy() # Use request.POST if it's form-data
    last_object_id = filters.pop("last_object_id", None)
    page_size = int(filters.pop("page_size", settings.DEFAULT_PAGE_SIZE)) # Default if not provided
    # Fetch Active Verbs
    db_fetch_start = time.time()
    active_verbs = list(map(lambda x: x["verb"], list(AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token).values())))
    active_verbs = list(
    AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token)
    .values_list("verb", flat=True)
    )
    print(f"Active verbs retrieved in {time.time() - db_fetch_start:.6f} seconds: {active_verbs}")
    providers = []
    for analytics_token_verb in AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token):
    if analytics_token_verb.provider not in providers:
    providers.append(analytics_token_verb.provider)
    # Fetch Providers
    providers = list(
    AnalyticsTokenVerb.objects.filter(analytics_token_id=analytics_token)
    .values_list("provider", flat=True)
    .distinct()
    )
    # Fetch Anonymous Verbs
    anon_verbs = []
    for provider in providers:
    try:
    latest_schema = ProviderSchema.objects.get(
    provider=provider, superseded_by__isnull=True
    )
    latest_schema = ProviderSchema.objects.get(provider=provider, superseded_by__isnull=True)
    except ObjectDoesNotExist:
    print(f"No consent provider schema found for provider: {provider.name}")
    return JsonResponse(
    {
    "message": "No consent provider schema found.",
    "provider": provider.name,
    },
    safe=False,
    status=status.HTTP_500_INTERNAL_SERVER_ERROR,
    )
    for verb in [verb for verblist in [group["verbs"] for group in latest_schema.groups] for verb in verblist]:
    print(f"No consent provider schema found for provider: {provider}")
    return JsonResponse({"message": "No consent provider schema found.", "provider": provider},
    safe=False, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
    for verb in (verb for group in latest_schema.groups for verb in group["verbs"]):
    if verb["id"] in active_verbs and verb.get("allowAnonymizedCollection", False):
    min_count = verb.get("allowAnonymizedCollectionMinCount", settings.ANONYMIZATION_DEFAULT_MINIMUM_COUNT)
    query_start = time.time()
    current_count = collection.distinct("actor.mbox", {
    "$and": [
    {"verb.id": {"$eq": verb["id"]}},
    {"actor.mbox": {"$exists": True}},
    {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}}
    ]
    }).__len__()
    current_count = len(
    collection.distinct("actor.mbox", {
    "$and": [
    {"verb.id": verb["id"]},
    {"actor.mbox": {"$exists": True}},
    {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}}
    ]
    })
    )
    print(f"Query for verb {verb['id']} executed in {time.time() - query_start:.6f} seconds")
    if current_count >= min_count:
    anon_verbs.append(verb)
    anon_verbs.append(verb["id"])
    print(f"Anonymous verbs determined: {anon_verbs}")
    # Construct Query with Dynamic Filtering
    query_start_time = time.time()
    query = (
    {
    "$and": [
    {"actor.tan": {"$exists": False}},
    {"_id": {"$gt": ObjectId(last_object_id)}},
    {"verb.id": {"$in": active_verbs}},
    {"$or": [
    {"$and": [
    {"actor.mbox": {"$exists": True}},
    {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}},
    {"verb.id": {"$in": anon_verbs}},
    ]},
    {"actor.mbox": {"$exists": False}},
    {"$and": [
    {"actor.mbox": {"$exists": True}},
    {"actor.mbox": {"$regex": "^mailto"}},
    ]},
    get_system_statement_query(providers)
    ]}
    ]
    }
    if last_object_id
    else {
    "$and": [
    {"actor.tan": {"$exists": False}},
    {"verb.id": {"$in": active_verbs}},
    {"$or": [
    {"$and": [
    {"actor.mbox": {"$exists": True}},
    {"actor.mbox": {"$regex": "^" + settings.ANONYMIZATION_HASH_PREFIX}},
    {"verb.id": {"$in": anon_verbs}},
    ]},
    {"actor.mbox": {"$exists": False}},
    {"$and": [
    {"actor.mbox": {"$exists": True}},
    {"actor.mbox": {"$regex": "^mailto"}},
    ]},
    get_system_statement_query(providers)
    ]}
    ]
    }
    )
    print(f"Query constructed in {time.time() - query_start_time:.6f} seconds")
    query = self.build_mongo_query(filters, active_verbs, anon_verbs, providers)
    print(f"Query constructed in {time.time() - query_start_time:.6f} seconds: {query}")
    # Execute MongoDB Query
    execution_start = time.time()
    cursor = collection.find(query).limit(page_size).batch_size(page_size)
    print(f"Query executed in {time.time() - execution_start:.6f} seconds")
    # Process Data
    data_time = time.time()
    data = {
    "verbs": list(dict.fromkeys(active_verbs)),
    "statements": [replace_provider_id(item) for item in cursor],
    statements = [replace_provider_id(item) for item in cursor]
    print(f"Data mapping executed in {time.time() - data_time:.6f} seconds")
    response_data = {
    "verbs": list(set(active_verbs)), # Remove duplicates
    "statements": statements,
    "page_size": page_size,
    }
    print(f"Data mapping executed in {time.time() - data_time:.6f} seconds")
    serializer = ProviderDataSerializer(data=data)
    # Serialize Response
    serializer = ProviderDataSerializer(data=response_data)
    if serializer.is_valid():
    print(f"Total execution time: {time.time() - start_time:.6f} seconds")
    return Response(serializer.data, status=status.HTTP_200_OK)
    print("Failed to serialize response data")
    return Response(
    {
    "message": "Failed to serialize response data.",
    "errors": serializer.errors,
    },
    {"message": "Failed to serialize response data.", "errors": serializer.errors},
    status=status.HTTP_400_BAD_REQUEST,
    )
    except ObjectDoesNotExist as c:
    print(f"Exception occurred: {c}")
    return Response(
    {"message": "Invalid analytics token " + token},
    status=status.HTTP_401_UNAUTHORIZED,
    )
    except ObjectDoesNotExist:
    print(f"Invalid analytics token: {token}")
    return Response({"message": "Invalid analytics token " + token}, status=status.HTTP_401_UNAUTHORIZED)
    class StoreAnalyticsEngineResult(APIView):
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment