Skip to content
Snippets Groups Projects
Commit f72b5e80 authored by Lennard Strohmeyer's avatar Lennard Strohmeyer :penguin:
Browse files

#106 and #99: implemented retry for unsuccessful forwards to other LRS with a...

#106 and #99: implemented retry for unsuccessful forwards to other LRS with a cutoff after approx. 2 weeks. Improved error reporting and fixed some error cases.
parent bc9ca58d
Branches
Tags
No related merge requests found
Pipeline #1560450 failed
......@@ -805,27 +805,26 @@ class RunResultsRetention(APIView):
def post(self, request):
token = request.headers.get("Authorization").split("Basic ")[1]
try:
AnalyticsToken.objects.get(key=token)
collection = lrs_db["results"]
query = {"analytics_token": token}
for context_id_distinct in collection.distinct("context_id",query):
print("Run result retention {0}".format(context_id_distinct))
cursor = collection.find({"analytics_token": token, "context_id": context_id_distinct}, sort=[("created_at", -1)]).skip(settings.ANALYTICS_RESULTS_RETENTION)
document_ids = [document["_id"] for document in cursor]
query = {"_id": {"$in": document_ids}}
cursor = collection.delete_many(query)
return Response({"deleted_documents": cursor.deleted_count}, status=status.HTTP_200_OK)
return Response({"deleted_documents": 0}, status=status.HTTP_200_OK)
except ObjectDoesNotExist:
return Response(
{"message": "Invalid analytics token"},
status=status.HTTP_401_UNAUTHORIZED,
)
collection = lrs_db["results"]
query = {"analytics_token": token}
for context_id_distinct in collection.distinct("context_id",query):
print("Run result retention {0}".format(context_id_distinct))
cursor = collection.find({"analytics_token": token, "context_id": context_id_distinct}, sort=[("created_at", -1)]).skip(settings.ANALYTICS_RESULTS_RETENTION)
document_ids = [document["_id"] for document in cursor]
if len(document_ids) > 0:
query = {"_id": {"$in": document_ids}}
cursor = collection.delete_many(query)
return Response({"deleted_documents": cursor.deleted_count}, status=status.HTTP_200_OK)
return Response({"deleted_documents": 0}, status=status.HTTP_200_OK)
class AddAnalyticsEngineAccess(APIView):
......
......@@ -147,9 +147,15 @@
"token": {
"type": "string",
"description": "token to authenticate with"
},
"token_type": {
"type": "string",
"description": "token to authenticate with",
"enum": ["Basic", "Bearer"],
"default": "Bearer"
}
},
"required": ["url", "token"]
"required": ["url", "token", "token_type"]
}
]
}
......
import requests
from django.conf import settings
from celery import shared_task
@shared_task(bind=True, autoretry_for=(RuntimeError,Exception,), retry_backoff=True, retry_jitter=False, retry_kwargs={'max_retries': 20}) # exponential backoff, 20 retries comes down to approx. 2 weeks
def retry_forward_statements(self, data, token_type, token, url):
headers = {"Authorization": token_type + " " + token}
res = requests.post(url, json=data, headers=headers)
if not res:
raise RuntimeError(f"Could not submit {len(data)} statements to {url}.")
\ No newline at end of file
......@@ -935,7 +935,8 @@ class TextxAPIAdditionalLrs(BaseTestCase):
"additionalLrs": [
{
"url": "http://localhost:5555/xapi/statements",
"token": "token_to_check"
"token": "token_to_check",
"token_type": "Bearer"
}
]
}
......
......@@ -22,6 +22,8 @@ import redis
import zeep
from pathlib import Path
from .tasks import retry_forward_statements
PROJECT_PATH = os.path.abspath(os.path.dirname(__name__))
with open(os.path.join(PROJECT_PATH, "static/xapi_statement.schema.json")) as f:
......@@ -58,54 +60,54 @@ def anonymize_statement(x_api_statement):
def shib_connector_resolver_to_pairwaise_id(email, provider):
if settings.SHIB_ID_CONNECTOR_ENABLED:
r = None
if settings.SHIB_ID_CONNECTOR_CACHE:
r = redis.from_url(settings.CELERY_BROKER_URL)
user_id = ""
lrs_type = ""
if settings.SHIB_ID_CONNECTOR_USE_FILE_MAPPING:
BASE_DIR = Path(__file__).resolve().parent.parent
mapping = json.load(open(os.path.join(BASE_DIR,'static/shibboleth_mapping.json')))
if provider.name in mapping:
lrs_type = mapping[provider.name]
user_id = email
else:
user_id, lrs_type = email.split("@")
if settings.SHIB_ID_CONNECTOR_CACHE and r.exists(email):
shib_id = str(r.get(email), encoding='utf-8')
else:
client = zeep.Client(wsdl=settings.SHIB_ID_CONNECTOR_URL)
app_secret = settings.SHIB_ID_CONNECTOR_APP_SECRET
processId = settings.SHIB_ID_CONNECTOR_PROCESS_ID
linkType = settings.SHIB_ID_CONNECTOR_LINK_TYPE
if settings.DEBUG:
print("Try to connect with app_secret={0}, processId={1}, linkType={2}, givenType={3}, user_id={4}".format(app_secret,processId, linkType, lrs_type, user_id))
additionalData = None
shib_id = client.service.GetOrGenerateIdAndConnect(app_secret, user_id, lrs_type, linkType, processId, additionalData)
if len(shib_id) != 1:
print("Multiple pairwise ids found, only use the first one!")
shib_id = shib_id[0]
if settings.DEBUG:
print("Result shib_id: {0}".format(shib_id))
if settings.SHIB_ID_CONNECTOR_CACHE:
r.set(email,shib_id)
r.expire(email,settings.SHIB_ID_CONNECTOR_CACHE_MAX_AGE*60)
if shib_id is not None:
return shib_id
r = None
if settings.SHIB_ID_CONNECTOR_CACHE:
r = redis.from_url(settings.CELERY_BROKER_URL)
user_id = ""
lrs_type = ""
if settings.SHIB_ID_CONNECTOR_USE_FILE_MAPPING:
BASE_DIR = Path(__file__).resolve().parent.parent
mapping = json.load(open(os.path.join(BASE_DIR,'static/shibboleth_mapping.json')))
if provider.name in mapping:
lrs_type = mapping[provider.name]
user_id = email
else:
user_id, lrs_type = email.split("@")
if settings.SHIB_ID_CONNECTOR_CACHE and r.exists(email):
shib_id = str(r.get(email), encoding='utf-8')
else:
client = zeep.Client(wsdl=settings.SHIB_ID_CONNECTOR_URL)
app_secret = settings.SHIB_ID_CONNECTOR_APP_SECRET
processId = settings.SHIB_ID_CONNECTOR_PROCESS_ID
linkType = settings.SHIB_ID_CONNECTOR_LINK_TYPE
if settings.DEBUG:
print("Try to connect with app_secret={0}, processId={1}, linkType={2}, givenType={3}, user_id={4}".format(app_secret,processId, linkType, lrs_type, user_id))
additionalData = None
shib_id = client.service.GetOrGenerateIdAndConnect(app_secret, user_id, lrs_type, linkType, processId, additionalData)
if len(shib_id) != 1:
print("Multiple pairwise ids found, only use the first one!")
shib_id = shib_id[0]
if settings.DEBUG:
print("Result shib_id: {0}".format(shib_id))
if settings.SHIB_ID_CONNECTOR_CACHE:
r.set(email, shib_id)
r.expire(email, settings.SHIB_ID_CONNECTOR_CACHE_MAX_AGE*60)
if shib_id is not None:
return shib_id
return ""
def shib_connector_resolver(email, provider):
if settings.SHIB_ID_CONNECTOR_ENABLED:
shib_id = shib_connector_resolver_to_pairwaise_id(email=email, provider=provider)
# Shib-ID is here the pairwaise id,so we need to resolve to the user email
try:
user = CustomUser.objects.get(shibboleth_connector_identifier=shib_id)
except ObjectDoesNotExist:
return "not_found";
email = user.email;
print("resolved to email: {0}".format(email))
shib_id = shib_connector_resolver_to_pairwaise_id(email=email, provider=provider)
# Shib-ID is the pairwise id, so we need to resolve to the user email
try:
user = CustomUser.objects.get(shibboleth_connector_identifier=shib_id)
except ObjectDoesNotExist:
return "not_found"
email = user.email
print("resolved to email: {0}".format(email))
return email
......@@ -146,7 +148,11 @@ def process_statement(x_api_statement, provider, latest_schema):
"reason": "User missing in statement",
}
email = mbox if mbox else account_email
email = shib_connector_resolver(email=email,provider=provider)
try:
email = shib_connector_resolver(email=email,provider=provider)
except Exception as e:
print(e)
raise RuntimeError("Error when resolving shib_id: " + str(e))
if settings.SHIB_ID_CONNECTOR_ENABLED:
x_api_statement["actor"]["mbox"] = "mailto:" + email
......@@ -320,21 +326,35 @@ class CreateStatement(APIView):
# forward to other LRS without validation etc., if given
if latest_schema.additional_lrs and isinstance(latest_schema.additional_lrs, list) and len(latest_schema.additional_lrs) > 0:
for additional_lrs in latest_schema.additional_lrs:
headers = {"Authorization": "Bearer " + additional_lrs["token"]}
for stmt in x_api_statements:
res = requests.post(additional_lrs["url"], json=stmt, headers=headers)
if not res and settings.DEBUG:
print("Could not forward to ", additional_lrs["url"], ":", res.reason, "({})".format(res.status_code))
elif res and settings.DEBUG:
print("Forwarded statement to ", additional_lrs["url"], ":", res.reason, "({})".format(res.status_code))
headers = {"Authorization": additional_lrs["token_type"] + " " + additional_lrs["token"]}
res = None
try:
res = requests.post(additional_lrs["url"], json=x_api_statements, headers=headers)
except Exception as e:
if settings.DEBUG:
print("Could not forward to ", additional_lrs["url"], ":", e)
if not res or res.status_code != 200:
retry_forward_statements.delay(x_api_statements, additional_lrs["token_type"], additional_lrs["token"], additional_lrs["url"])
if settings.DEBUG:
print("Could not forward to ", additional_lrs["url"], ":", res.reason if res is not None else "URL could not be reached", "({})".format(res.status_code) if res is not None else "")
elif res and settings.DEBUG:
print("Forwarded statement to ", additional_lrs["url"], ":", res.reason, "({})".format(res.status_code))
if settings.SHOW_XAPI_STATEMENTS:
print(x_api_statements)
result = [
process_statement(stmt, provider, latest_schema)
for stmt in x_api_statements
]
try:
result = [
process_statement(stmt, provider, latest_schema)
for stmt in x_api_statements
]
except Exception as e:
return JsonResponse(
{
"message": str(e),
},
status=status.HTTP_500_INTERNAL_SERVER_ERROR,
)
invalid_or_not_consented = (
len([e for e in result if e["valid"] == False or e["accepted"] == False])
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment