diff --git a/README.md b/README.md index 47b62fd95a3bb0c98694c2dbfc4b7d84c64c9594..7cadda7dd38d90e0e0d4f49fe835c811d21124fc 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,5 @@ + + # POLARIS - Rights Engine / Consent Engine This is the repository contains the source code of the rights engine of the Polaris project. The Consent Engine is a software solution designed to provide a secure and efficient way for obtaining and managing user consent. The project is divided into two main components: frontend and backend. The frontend component is responsible for providing a user-friendly interface for obtaining consent, while the backend component is responsible for storing and managing user consent data. diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/consents/tests/tests_provider_endpoints.py b/src/consents/tests/tests_provider_endpoints.py new file mode 100644 index 0000000000000000000000000000000000000000..adf6a9f5de081357a2252feee9027a217b1b0944 --- /dev/null +++ b/src/consents/tests/tests_provider_endpoints.py @@ -0,0 +1,190 @@ +import json +import os +from io import StringIO + +from rest_framework.test import APIClient +from rolepermissions.roles import assign_role +from django.core.management import call_command + +from consents.tests.tests_consent_operations import BaseTestCase +from providers.models import ProviderAuthorization, Provider, ProviderVerbGroup, ProviderSchema +from users.models import CustomUser + + +class TestVerbGroups(BaseTestCase): + verb_groups = [ + { + "provider_id": 1, + "id": "default_group", + "label": "Default group", + "description": "default", + "showVerbDetails": True, + "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, + "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"} + ] + }, + ] + + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user( + self.test_user_email, self.test_user_password + ) + provider_user = CustomUser.objects.create_user( + self.test_provider_email, self.test_provider_password + ) + + assign_role(normal_user, "user") + assign_role(provider_user, "provider_manager") + + response = self.client.post( + "/api/v1/auth/token", + {"email": self.test_user_email, "password": self.test_user_password}, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + user_token = response.data["access"] + + response = self.client.post( + "/api/v1/auth/token", + { + "email": self.test_provider_email, + "password": self.test_provider_password, + }, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + provider_token = response.data["access"] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION="Bearer " + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION="Bearer " + provider_token) + + # Create provider schema + with StringIO(json.dumps(self.h5p_provider_schema)) as fp: + response = self.provider_client.put( + "/api/v1/consents/provider/create", + {"provider-schema": fp}, + format="multipart", + ) + self.assertEqual(response.status_code, 201) + + # Get application token for created provider + self.application_token = ProviderAuthorization.objects.get( + provider__name="H5P" + ).key + self.third_party_client = APIClient() + self.third_party_client.credentials( + HTTP_AUTHORIZATION="Basic " + self.application_token + ) + + # create some verb groups + for group in self.verb_groups: + response = self.provider_client.post( + "/api/v1/consents/provider/" + str(Provider.objects.latest('id').id) + "/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + + def test_get_verb_groups(self): + """ + Ensure correct verb groups are returned for a given provider. + """ + provider_id = Provider.objects.latest('id').id + # create a verb group + group = {"id": "default_group", "label": "Group 2", + "description": "Lorem ipsum", "showVerbDetails": True, "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"}, + ]} + response = self.provider_client.post( + f"/api/v1/consents/provider/{provider_id}/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + # test permissions + response = self.user_client.get( + f"/api/v1/consents/provider/{provider_id}/verb-groups" + ) + self.assertEqual(response.status_code, 403) + + # check if group exists + response = self.provider_client.get( + f"/api/v1/consents/provider/{provider_id}/verb-groups" + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue(isinstance(data, list)) + self.assertEqual(len(data), 2) # another group is created during setup + self.assertEqual(data[-1]["purposeOfCollection"], "Lorem Ipsum") + self.assertTrue(isinstance(data[-1]["verbs"], list)) + self.assertEqual(data[-1]["verbs"][0]["id"], "http://h5p.example.com/expapi/verbs/experienced") + + def test_create_invalid_verb_group(self): + provider_id = Provider.objects.latest('id').id + group = {"id": "default_group", "label": "Group 2", + "description": "Lorem ipsum", "showVerbDetails": True, "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/atztempted"}, # mistyped on purpose + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"}, + ]} + response = self.provider_client.post( + f"/api/v1/consents/provider/{provider_id}/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 400) + + + def test_update_existing_verb_group(self): + provider_id = Provider.objects.latest('id').id + group = {"id": "default_group", "label": "Group 2", + "description": "Lorem ipsum", "showVerbDetails": True, "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"}, + ]} + response = self.provider_client.post( + f"/api/v1/consents/provider/{provider_id}/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + group_id = ProviderVerbGroup.objects.latest('group_id').group_id # generated on server, has to be supplied for update + + group = {"id": group_id, "label": "Group 2", # id has to be the same and is generated when creating the group + "description": "Lorem ipsum!", "showVerbDetails": True, "purposeOfCollection": "Lorem Ipsum!", # changed + "requiresConsent": True, "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, # added + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"}, + ]} + response = self.provider_client.post( + f"/api/v1/consents/provider/{provider_id}/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + groups = ProviderVerbGroup.objects.filter(provider_id=provider_id).all() + + self.assertEqual(len(groups), 2) \ No newline at end of file diff --git a/src/consents/tests/tests_third_party.py b/src/consents/tests/tests_third_party.py index a1aae6a90770af88a329196e4a36cda57fd7aaa1..547841e4bf294e8a587f9f4ef630e5f7eaaf604d 100644 --- a/src/consents/tests/tests_third_party.py +++ b/src/consents/tests/tests_third_party.py @@ -303,3 +303,272 @@ class TestThirdPartyUserConsentUpdate(BaseTestCase): "/api/v1/consents/user/save/third-party", data=payload, format="json" ) self.assertEqual(response.status_code, 400) + + + + +class TestThirdPartyGetProviderStatus(BaseTestCase): + verb_groups = [ + { + "provider_id": 1, + "id": "default_group", + "label": "Default group", + "description": "default", + "showVerbDetails": True, + "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, + "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"} + ] + }, + ] + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user( + self.test_user_email, self.test_user_password + ) + provider_user = CustomUser.objects.create_user( + self.test_provider_email, self.test_provider_password + ) + + assign_role(normal_user, "user") + assign_role(provider_user, "provider_manager") + + response = self.client.post( + "/api/v1/auth/token", + {"email": self.test_user_email, "password": self.test_user_password}, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + user_token = response.data["access"] + + response = self.client.post( + "/api/v1/auth/token", + { + "email": self.test_provider_email, + "password": self.test_provider_password, + }, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + provider_token = response.data["access"] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION="Bearer " + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION="Bearer " + provider_token) + + # Create provider schema + with StringIO(json.dumps(self.h5p_provider_schema)) as fp: + response = self.provider_client.put( + "/api/v1/consents/provider/create", + {"provider-schema": fp}, + format="multipart", + ) + self.assertEqual(response.status_code, 201) + + # Get application token for created provider + self.application_token = ProviderAuthorization.objects.get( + provider__name="H5P" + ).key + self.third_party_client = APIClient() + self.third_party_client.credentials( + HTTP_AUTHORIZATION="Basic " + self.application_token + ) + + # create some verb groups + for group in self.verb_groups: + response = self.provider_client.post( + "/api/v1/consents/provider/" + str(Provider.objects.latest('id').id) + "/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + def test_provider_status_401(self): + """ + Ensure requests including an invalid application token are rejected. + """ + response = self.user_client.get( + "/api/v1/consents/provider-status/third-party" + ) + self.assertEqual(response.status_code, 401) + + def test_provider_status(self): + response = self.third_party_client.get( + "/api/v1/consents/provider-status/third-party" + ) + self.assertEqual(response.status_code, 200) + + data = json.loads(response.content.decode()) + self.assertTrue(all(key in data.keys() for key in ["id", "name", "groups", "versions"])) + self.assertEqual(data["name"], "H5P") + +class TestThirdPartyUserManagement(BaseTestCase): + verb_groups = [ + { + "provider_id": 1, + "id": "default_group", + "label": "Default group", + "description": "default", + "showVerbDetails": True, + "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, + "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"} + ] + }, + ] + + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user( + self.test_user_email, self.test_user_password + ) + provider_user = CustomUser.objects.create_user( + self.test_provider_email, self.test_provider_password + ) + + assign_role(normal_user, "user") + assign_role(provider_user, "provider_manager") + + response = self.client.post( + "/api/v1/auth/token", + {"email": self.test_user_email, "password": self.test_user_password}, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + user_token = response.data["access"] + + response = self.client.post( + "/api/v1/auth/token", + { + "email": self.test_provider_email, + "password": self.test_provider_password, + }, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + provider_token = response.data["access"] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION="Bearer " + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION="Bearer " + provider_token) + + # Create provider schema + with StringIO(json.dumps(self.h5p_provider_schema)) as fp: + response = self.provider_client.put( + "/api/v1/consents/provider/create", + {"provider-schema": fp}, + format="multipart", + ) + self.assertEqual(response.status_code, 201) + + # Get application token for created provider + self.application_token = ProviderAuthorization.objects.get( + provider__name="H5P" + ).key + self.third_party_client = APIClient() + self.third_party_client.credentials( + HTTP_AUTHORIZATION="Basic " + self.application_token + ) + + # create some verb groups + for group in self.verb_groups: + response = self.provider_client.post( + "/api/v1/consents/provider/" + str(Provider.objects.latest('id').id) + "/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + + def test_create_user(self): + # test permission check + response = self.user_client.post( + "/api/v1/consents/user/create", + {} + ) + self.assertEqual(response.status_code, 401) + + # check validation + response = self.third_party_client.post( + "/api/v1/consents/user/create", + {} + ) + self.assertEqual(response.status_code, 400) + + response = self.third_party_client.post( + "/api/v1/consents/user/create", + { + "email": "test@polaris.com", + "first_name": "Max", + "last_name": "Mustermann" + } + ) + self.assertEqual(response.status_code, 200) + + count_before = CustomUser.objects.count() + 0 + + # try to create with same email again + response = self.third_party_client.post( + "/api/v1/consents/user/create", + { + "email": "test@polaris.com", + "first_name": "Max", + "last_name": "Mustermann" + } + ) + self.assertEqual(response.status_code, 200) + # no new user should have been created + self.assertEqual(CustomUser.objects.count(), count_before) + + def test_create_user_via_connect_service(self): + # test permission check + response = self.user_client.post( + "/api/v1/consents/user/create-via-connect-service", + {} + ) + self.assertEqual(response.status_code, 401) + + # check validation + response = self.third_party_client.post( + "/api/v1/consents/user/create-via-connect-service", + {} + ) + self.assertEqual(response.status_code, 400) + + # create user + response = self.third_party_client.post( + "/api/v1/consents/user/create-via-connect-service", + { + "email": "test@polaris.com", + } + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(data["message"], "user has been created") + + count_before = CustomUser.objects.count() + 0 + + # try to create with same email again + response = self.third_party_client.post( + "/api/v1/consents/user/create-via-connect-service", + { + "email": "test@polaris.com", + } + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(data["message"], "user exists") + # no new user should have been created + self.assertEqual(CustomUser.objects.count(), count_before) \ No newline at end of file diff --git a/src/consents/tests/tests_user_endpoints.py b/src/consents/tests/tests_user_endpoints.py new file mode 100644 index 0000000000000000000000000000000000000000..6c9f17584ede554104b1e5dc26aa11190f8d5521 --- /dev/null +++ b/src/consents/tests/tests_user_endpoints.py @@ -0,0 +1,327 @@ +import json +import os +from io import StringIO + +from rest_framework.test import APIClient +from rolepermissions.roles import assign_role +from django.core.management import call_command + +from consents.tests.tests_consent_operations import BaseTestCase +from providers.models import ProviderAuthorization, Provider, ProviderVerbGroup, ProviderSchema +from users.models import CustomUser + +from consents.models import UserConsents + + +class TestUserEndpoints(BaseTestCase): + verb_groups = [ + { + "provider_id": 1, + "id": "default_group", + "label": "Default group", + "description": "default", + "showVerbDetails": True, + "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, + "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"} + ] + }, + ] + + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user( + self.test_user_email, self.test_user_password + ) + provider_user = CustomUser.objects.create_user( + self.test_provider_email, self.test_provider_password + ) + + assign_role(normal_user, "user") + assign_role(provider_user, "provider_manager") + assign_role(provider_user, 'analyst') # needed to create analytics tokens later on + + response = self.client.post( + "/api/v1/auth/token", + {"email": self.test_user_email, "password": self.test_user_password}, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + user_token = response.data["access"] + + response = self.client.post( + "/api/v1/auth/token", + { + "email": self.test_provider_email, + "password": self.test_provider_password, + }, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + provider_token = response.data["access"] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION="Bearer " + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION="Bearer " + provider_token) + + # Create provider schema + with StringIO(json.dumps(self.h5p_provider_schema)) as fp: + response = self.provider_client.put( + "/api/v1/consents/provider/create", + {"provider-schema": fp}, + format="multipart", + ) + self.assertEqual(response.status_code, 201) + + # Get application token for created provider + self.application_token = ProviderAuthorization.objects.get( + provider__name="H5P" + ).key + self.third_party_client = APIClient() + self.third_party_client.credentials( + HTTP_AUTHORIZATION="Basic " + self.application_token + ) + + # create some verb groups + for group in self.verb_groups: + response = self.provider_client.post( + "/api/v1/consents/provider/" + str(Provider.objects.latest('id').id) + "/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + def test_get_consent_for_provider(self): + # Note: this route is currently not being used in the frontend since the consent history replaces most of the functionality + provider_id = Provider.objects.latest('id').id + + response = self.user_client.get( + f"/api/v1/consents/user/{provider_id}", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + + # right now we haven't declared consent for any groups + self.assertEqual(data["consent"], None) + + # create consent + group_id = ProviderVerbGroup.objects.latest('id').id # maybe todo: rework frontend to use the "group_id" attribute instead of primary keys + response = self.user_client.post( + "/api/v1/consents/user/save", {"groups": [group_id]}, format="json" + ) + self.assertEqual(response.status_code, 200) + + # now we would expect the consent to be present in the response + response = self.user_client.get( + f"/api/v1/consents/user/{provider_id}", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + + self.assertTrue(all(key in data.keys() for key in ["consent", "provider_schema", "paused_data_recording"])) + self.assertFalse(data["consent"] is None) + + def test_get_consent_history(self): + provider_id = Provider.objects.latest('id').id + + # consent history should be empty + response = self.user_client.get( + "/api/v1/consents/user/history", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue("groups" in data.keys()) + self.assertEqual(len(data["groups"]), 0) + + # create consent + group_id = ProviderVerbGroup.objects.latest('id').id + response = self.user_client.post( + "/api/v1/consents/user/save", {"groups": [group_id]}, format="json" + ) + self.assertEqual(response.status_code, 200) + + response = self.user_client.get( + "/api/v1/consents/user/history", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue("groups" in data.keys()) + self.assertEqual(len(data["groups"]), 1) + + # create another group + group = {"id": "default_group", "label": "Group 2", + "description": "Lorem ipsum", "showVerbDetails": True, "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"}, + ]} + response = self.provider_client.post( + f"/api/v1/consents/provider/{provider_id}/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + # create consent for second group + group_id = ProviderVerbGroup.objects.latest('id').id + response = self.user_client.post( + "/api/v1/consents/user/save", {"groups": [group_id]}, format="json" + ) + self.assertEqual(response.status_code, 200) + + # check that both consents are present in history + response = self.user_client.get( + "/api/v1/consents/user/history", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue("groups" in data.keys()) + self.assertEqual(len(data["groups"]), 2) + + def test_update_consent_group_active(self): + provider_id = Provider.objects.latest('id').id + + response = self.user_client.get( + f"/api/v1/consents/user/{provider_id}", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + + # right now we haven't declared consent for any groups + self.assertEqual(data["consent"], None) + + # create consent + group_id = ProviderVerbGroup.objects.latest('id').id + response = self.user_client.post( + "/api/v1/consents/user/save", {"groups": [group_id]}, format="json" + ) + self.assertEqual(response.status_code, 200) + + # check if incorrect request is rejected + response = self.user_client.post( + "/api/v1/consents/user/update-consent-group-active", {}, format="json" + ) + self.assertEqual(response.status_code, 400) + + # update group consent + response = self.user_client.post( + "/api/v1/consents/user/update-consent-group-active", {"id": group_id, "active": False}, format="json" + ) + self.assertEqual(response.status_code, 200) + # check if status correctly stored in DB + self.assertFalse(UserConsents.objects.last().active) + + def test_revoke_consent_group(self): + provider_id = Provider.objects.latest('id').id + + response = self.user_client.get( + f"/api/v1/consents/user/{provider_id}", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + + # right now we haven't declared consent for any groups + self.assertEqual(data["consent"], None) + + # create consent + group_id = ProviderVerbGroup.objects.latest('id').id + response = self.user_client.post( + "/api/v1/consents/user/save", {"groups": [group_id]}, format="json" + ) + self.assertEqual(response.status_code, 200) + + # check if incorrect request is rejected + response = self.user_client.post( + "/api/v1/consents/user/revoke-consent-group", {}, format="json" + ) + self.assertEqual(response.status_code, 400) + + # update group consent + response = self.user_client.post( + "/api/v1/consents/user/revoke-consent-group", {"id": group_id}, format="json" + ) + self.assertEqual(response.status_code, 200) + # check if consent correctly updated in DB + self.assertFalse(UserConsents.objects.last().consented) + + def test_get_analytics_tokens(self): + response = self.provider_client.get( + f"/api/v1/consents/user/analytics-tokens", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 0) + + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/create", + { + "name": "Test analytics token", + "description": "Test analytics token", + "expires": "12/31/2099", + "can_access": [] + } + ) + self.assertEqual(response.status_code, 201) + + response = self.provider_client.get( + f"/api/v1/consents/user/analytics-tokens", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) + + def test_get_analytics_token_consent(self): + response = self.provider_client.get( + f"/api/v1/consents/user/analytics-tokens", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 0) + + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/create", + { + "name": "Test analytics token", + "description": "Test analytics token", + "expires": "12/31/2099", + "can_access": [] + } + ) + self.assertEqual(response.status_code, 201) + data = json.loads(response.content.decode()) + analyticTokenId = data["id"] + + response = self.user_client.post( + f"/api/v1/consents/user/analytics-tokens/consent", + { + "analyticTokenId": analyticTokenId + } + ) + self.assertEqual(response.status_code, 200) + # maybe todo: check if consents created correctly (?) + + def test_get_providers(self): + response = self.user_client.get( + f"/api/v1/consents/user/providers", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) + + + def test_get_status(self): + response = self.user_client.get( + f"/api/v1/consents/user/status", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue("result" in data.keys()) + self.assertTrue("1" in data["result"].keys()) + self.assertEqual(data["result"]["1"]["status"], "none") # since consent is coupled with groups, consent for a whole provider schema can not exist \ No newline at end of file diff --git a/src/consents/urls.py b/src/consents/urls.py index a1e4f71d98a566c174f130c13187c3c8dc5e86d1..fc0ea1c0ea78a2ea882a34271b2b5304165fb3a9 100644 --- a/src/consents/urls.py +++ b/src/consents/urls.py @@ -6,8 +6,8 @@ urlpatterns = [ path('provider', views.GetProviderSchemasView.as_view()), path('provider-status/third-party', views.GetProviderStatusThirdPartyView.as_view()), path('provider/create', views.CreateProviderConsentView.as_view()), - path('provider/<provider_id>/create-verb-group', views.CreateProviderVerbGroupView.as_view()), # TODO - path('provider/<provider_id>/verb-groups', views.GetProviderVerbGroupsView.as_view()), # TODO + path('provider/<provider_id>/create-verb-group', views.CreateProviderVerbGroupView.as_view()), + path('provider/<provider_id>/verb-groups', views.GetProviderVerbGroupsView.as_view()), path('user/save', views.SaveUserConsentView.as_view()), path('user/create', views.CreateUserConsentView.as_view()), path('user/create-via-connect-service', views.CreateUserConsentViaConnectServiceView.as_view()), diff --git a/src/consents/views.py b/src/consents/views.py index 6e140bcbf60fef60cb64c7406be5380ce33cf0b8..c4731ba2bfbac8b60ee099120704b37ca39b70f4 100644 --- a/src/consents/views.py +++ b/src/consents/views.py @@ -199,14 +199,16 @@ class GetProviderVerbGroupsView(generics.ListAPIView): """ Lists all verb groups for the current provider. """ - def get(self, request): - application_token = request.headers.get("Authorization", "").split("Basic ")[-1] - provider = ProviderAuthorization.objects.filter(key=application_token).first() + permission_classes = [IsAuthenticated, IsProvider] + + def get(self, request, provider_id): + + provider = ProviderAuthorization.objects.filter(pk=provider_id).first() if provider is None: return JsonResponse( - {"message": "invalid access token"}, + {"message": "invalid provider supplied"}, safe=False, - status=status.HTTP_401_UNAUTHORIZED, + status=status.HTTP_404_NOT_FOUND, ) verb_groups = ProviderVerbGroup.objects.filter(provider=provider.provider).all() serializer = ProviderVerbGroupSerializer(verb_groups, many=True) @@ -487,7 +489,7 @@ class GetUserConsentHistoryView(APIView): if not user_consents.first(): return JsonResponse( { - "consents": [], + "groups": [], }, safe=False, status=status.HTTP_200_OK, diff --git a/src/data_disclosure/tests.py b/src/data_disclosure/tests.py deleted file mode 100644 index 7ce503c2dd97ba78597f6ff6e4393132753573f6..0000000000000000000000000000000000000000 --- a/src/data_disclosure/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/src/data_disclosure/tests/__init__.py b/src/data_disclosure/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/data_disclosure/tests/tests_data_disclosure.py b/src/data_disclosure/tests/tests_data_disclosure.py new file mode 100644 index 0000000000000000000000000000000000000000..d338e4fc58b524256cd3560558c21427125cbdfe --- /dev/null +++ b/src/data_disclosure/tests/tests_data_disclosure.py @@ -0,0 +1,251 @@ +import json +import os +from io import StringIO + +from django.conf import settings +from django.core.files.base import ContentFile +from django.core.files.storage import default_storage +from rest_framework.test import APIClient +from rolepermissions.roles import assign_role +from django.core.management import call_command + +from consents.tests.tests_consent_operations import BaseTestCase +from providers.models import ProviderAuthorization, Provider, ProviderVerbGroup, ProviderSchema +from users.models import CustomUser + +from data_disclosure.models import DataDisclosure + + +class TestsDataDisclosure(BaseTestCase): + verb_groups = [ + { + "provider_id": 1, + "id": "default_group", + "label": "Default group", + "description": "default", + "showVerbDetails": True, + "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, + "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"} + ] + }, + ] + + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user( + self.test_user_email, self.test_user_password + ) + provider_user = CustomUser.objects.create_user( + self.test_provider_email, self.test_provider_password + ) + + assign_role(normal_user, "user") + assign_role(provider_user, "provider_manager") + + response = self.client.post( + "/api/v1/auth/token", + {"email": self.test_user_email, "password": self.test_user_password}, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + user_token = response.data["access"] + + response = self.client.post( + "/api/v1/auth/token", + { + "email": self.test_provider_email, + "password": self.test_provider_password, + }, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + provider_token = response.data["access"] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION="Bearer " + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION="Bearer " + provider_token) + + # Create provider schema + with StringIO(json.dumps(self.h5p_provider_schema)) as fp: + response = self.provider_client.put( + "/api/v1/consents/provider/create", + {"provider-schema": fp}, + format="multipart", + ) + self.assertEqual(response.status_code, 201) + + # Get application token for created provider + self.application_token = ProviderAuthorization.objects.get( + provider__name="H5P" + ).key + self.third_party_client = APIClient() + self.third_party_client.credentials( + HTTP_AUTHORIZATION="Basic " + self.application_token + ) + + # create some verb groups + for group in self.verb_groups: + response = self.provider_client.post( + "/api/v1/consents/provider/" + str(Provider.objects.latest('id').id) + "/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + + def test_create_job(self): + self.assertEqual(DataDisclosure.objects.count(), 0) + + response = self.user_client.post("/api/v1/data-disclosure/create") + self.assertEqual(response.status_code, 200) + + self.assertEqual(DataDisclosure.objects.count(), 1) + + def test_list_jobs(self): + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 0) + + response = self.user_client.post("/api/v1/data-disclosure/create") + self.assertEqual(response.status_code, 200) + + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) + + def test_get_secret(self): + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 0) + + response = self.user_client.post("/api/v1/data-disclosure/create") + self.assertEqual(response.status_code, 200) + + # create second job from another account + response = self.provider_client.post("/api/v1/data-disclosure/create") + self.assertEqual(response.status_code, 200) + + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) + + # check if we can get the key + response = self.user_client.get(f"/api/v1/data-disclosure/file_secret/{data[0]['id']}") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue("secret" in data.keys()) + + # get ID for other account to test permission check + response = self.provider_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) + + response = self.user_client.get(f"/api/v1/data-disclosure/file_secret/{data[0]['id']}") # note the "user_client" + self.assertEqual(response.status_code, 403) + + + def test_get_file(self): + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 0) + + response = self.user_client.post("/api/v1/data-disclosure/create") + self.assertEqual(response.status_code, 200) + + + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) + + file_id = data[0]["id"] + + # get the key + response = self.user_client.get(f"/api/v1/data-disclosure/file_secret/{file_id}") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue("secret" in data.keys()) + + secret = data["secret"] + + # create zip file to fulfil requirements + empty_zip_data = ContentFile(b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + file_name = default_storage.save( + f"{settings.DATA_DISCLOSURE_LOCATION}/{file_id}.zip", empty_zip_data + ) + + disc = DataDisclosure.objects.get(id=file_id) + disc.filename = file_name + disc.save() + + # try with wrong secret + response = self.user_client.get(f"/api/v1/data-disclosure/files/{file_id}/somerandomstring") + self.assertEqual(response.status_code, 403) + + + response = self.user_client.get(f"/api/v1/data-disclosure/files/{file_id}/{secret}") + self.assertEqual(response.status_code, 200) + + def test_delete_file(self): + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 0) + + response = self.user_client.post("/api/v1/data-disclosure/create") + self.assertEqual(response.status_code, 200) + + + response = self.user_client.get("/api/v1/data-disclosure/list") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) + + file_id = data[0]["id"] + + # get the key + response = self.user_client.get(f"/api/v1/data-disclosure/file_secret/{file_id}") + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue("secret" in data.keys()) + + secret = data["secret"] + + # try with wrong secret + response = self.user_client.get(f"/api/v1/data-disclosure/files/{file_id}/somerandomstring") + self.assertEqual(response.status_code, 403) + + # create zip file to fulfil requirements + empty_zip_data = ContentFile(b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00') + file_name = default_storage.save( + f"{settings.DATA_DISCLOSURE_LOCATION}/{file_id}.zip", empty_zip_data + ) + + disc = DataDisclosure.objects.get(id=file_id) + disc.filename = file_name + disc.save() + + response = self.user_client.get(f"/api/v1/data-disclosure/files/{file_id}/{secret}") + self.assertEqual(response.status_code, 200) + + # try wrong account for deletion + response = self.provider_client.delete(f"/api/v1/data-disclosure/{file_id}/delete") + self.assertEqual(response.status_code, 403) + + response = self.user_client.delete(f"/api/v1/data-disclosure/{file_id}/delete") + self.assertEqual(response.status_code, 200) + + self.assertFalse(default_storage.exists(file_name)) \ No newline at end of file diff --git a/src/data_removal/tests.py b/src/data_removal/tests.py deleted file mode 100644 index 7ce503c2dd97ba78597f6ff6e4393132753573f6..0000000000000000000000000000000000000000 --- a/src/data_removal/tests.py +++ /dev/null @@ -1,3 +0,0 @@ -from django.test import TestCase - -# Create your tests here. diff --git a/src/data_removal/tests/__init__.py b/src/data_removal/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/data_removal/tests/tests_data_removal.py b/src/data_removal/tests/tests_data_removal.py new file mode 100644 index 0000000000000000000000000000000000000000..1774e7e6ce1a09a55382eec16047a56b4371ca80 --- /dev/null +++ b/src/data_removal/tests/tests_data_removal.py @@ -0,0 +1,130 @@ +import json +import os +from io import StringIO + +from django.conf import settings +from django.core.files.base import ContentFile +from django.core.files.storage import default_storage +from rest_framework.test import APIClient +from rolepermissions.roles import assign_role +from django.core.management import call_command + +from consents.tests.tests_consent_operations import BaseTestCase +from providers.models import ProviderAuthorization, Provider, ProviderVerbGroup, ProviderSchema +from users.models import CustomUser + +from data_removal.models import DataRemovalJob + + +class TestsDataRemoval(BaseTestCase): + verb_groups = [ + { + "provider_id": 1, + "id": "default_group", + "label": "Default group", + "description": "default", + "showVerbDetails": True, + "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, + "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"} + ] + }, + ] + + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user( + self.test_user_email, self.test_user_password + ) + provider_user = CustomUser.objects.create_user( + self.test_provider_email, self.test_provider_password + ) + + assign_role(normal_user, "user") + assign_role(provider_user, "provider_manager") + + response = self.client.post( + "/api/v1/auth/token", + {"email": self.test_user_email, "password": self.test_user_password}, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + user_token = response.data["access"] + + response = self.client.post( + "/api/v1/auth/token", + { + "email": self.test_provider_email, + "password": self.test_provider_password, + }, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + provider_token = response.data["access"] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION="Bearer " + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION="Bearer " + provider_token) + + # Create provider schema + with StringIO(json.dumps(self.h5p_provider_schema)) as fp: + response = self.provider_client.put( + "/api/v1/consents/provider/create", + {"provider-schema": fp}, + format="multipart", + ) + self.assertEqual(response.status_code, 201) + + # Get application token for created provider + self.application_token = ProviderAuthorization.objects.get( + provider__name="H5P" + ).key + self.third_party_client = APIClient() + self.third_party_client.credentials( + HTTP_AUTHORIZATION="Basic " + self.application_token + ) + + # create some verb groups + for group in self.verb_groups: + response = self.provider_client.post( + "/api/v1/consents/provider/" + str(Provider.objects.latest('id').id) + "/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + + def test_create_job(self): + self.assertEqual(DataRemovalJob.objects.count(), 0) + + response = self.user_client.post("/api/v1/data-removal/create", { + "scope": {}, + "immediately": True, + }, format="json") + self.assertEqual(response.status_code, 200) + + self.assertEqual(DataRemovalJob.objects.count(), 1) + + + def test_list_jobs(self): + response = self.user_client.get("/api/v1/data-removal/list", ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 0) + + response = self.user_client.post("/api/v1/data-removal/create", { + "scope": {}, + "immediately": True, + }, format="json") + self.assertEqual(response.status_code, 200) + + response = self.user_client.get("/api/v1/data-removal/list",) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(len(data), 1) \ No newline at end of file diff --git a/src/data_removal/views.py b/src/data_removal/views.py index dd3d7f0c6518b91d3425f9b4eda9ee44b222d6b1..a84e922d7cacb577817536ef90ff383dd4cb6ceb 100644 --- a/src/data_removal/views.py +++ b/src/data_removal/views.py @@ -61,7 +61,7 @@ class CreateDataRemovalJob(APIView): [statement_filter, description] = get_scope_filter( request.user.id, request.data["scope"] ) - if request.data["immedialty"]: + if request.data["immediately"]: DataRemovalJob.objects.create( user=request.user, filter=statement_filter, diff --git a/src/frontend/src/app/data-removal.service.ts b/src/frontend/src/app/data-removal.service.ts index cd518c62b42f0011e461afd8d653809ddf1228d7..650f6b7e53cdda82bbcf4b97290bafd6532ca137 100644 --- a/src/frontend/src/app/data-removal.service.ts +++ b/src/frontend/src/app/data-removal.service.ts @@ -35,7 +35,7 @@ export class DataRemovalService { scope: DeleteAllUserStatements | DeleteVerbStatements | DeleteObjectStatements }): Observable<void> { return this.http.post<void>(`${environment.apiUrl}/api/v1/data-removal/create`, { - immedialty: data.immediately, + immediately: data.immediately, scope: data.scope }) } diff --git a/src/providers/models.py b/src/providers/models.py index ab80e24e68d4313b44985a30038c9ec1695373ba..c7f7b126b995c87ba76b30b25a914d8ced79ff87 100644 --- a/src/providers/models.py +++ b/src/providers/models.py @@ -71,8 +71,10 @@ class ProviderVerbGroup(models.Model): def __str__(self): return (f"Provider Verb Group {self.id}:\n" + f"- label {self.label}\n" f"- contains {len(self.verbs.all())} verbs\n" f"- group ID: {self.group_id}\n" + f"- provider ID: {self.provider_id}\n" f"- provider Schema ID: {self.provider_schema_id}\n") diff --git a/src/providers/tests/__init__.py b/src/providers/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/providers/tests/tests_analytics_tokens.py b/src/providers/tests/tests_analytics_tokens.py new file mode 100644 index 0000000000000000000000000000000000000000..3c011d702c14b8792c000a84148aa848ad97cbb6 --- /dev/null +++ b/src/providers/tests/tests_analytics_tokens.py @@ -0,0 +1,241 @@ +import json +import os + +from django.core.files.uploadedfile import SimpleUploadedFile +from django.test import TestCase +from rest_framework.test import APIClient +from rolepermissions.roles import assign_role +from django.core.management import call_command + +from users.models import CustomUser + +from providers.models import Provider, ProviderAuthorization, AnalyticsToken + +PROJECT_PATH = os.path.abspath(os.path.dirname(__name__)) + +class ProviderTestCase(TestCase): + + test_user_email = "test@mail.com" + test_user_password = "test123" + + test_provider_email = "test2@mail.com" + test_provider_password = "test123" + + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user(self.test_user_email, self.test_user_password) + provider_user = CustomUser.objects.create_user(self.test_provider_email, self.test_provider_password) + + assign_role(normal_user, 'user') + assign_role(provider_user, 'provider_manager') + assign_role(provider_user, 'analyst') # needed to create analytics tokens + + response=self.client.post('/api/v1/auth/token', {"email": self.test_user_email, "password": self.test_user_password}) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue('access' in response.data) + user_token = response.data['access'] + + response=self.client.post('/api/v1/auth/token', {"email": self.test_provider_email, "password": self.test_provider_password}) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue('access' in response.data) + provider_token = response.data['access'] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION='Bearer ' + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION='Bearer ' + provider_token) + + # we need provider schemas for many of these tests + with open(os.path.join(PROJECT_PATH, "static/provider_schema_h5p_v1.example.json")) as fp: + response=self.provider_client.put('/api/v1/consents/provider/create', { + "provider-schema": fp + }, format='multipart') + self.assertEqual(response.status_code, 201) + self.assertTrue('message' in response.json()) + + def test_create_token(self): + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/create", + { + "name": "Test analytics token", + "description": "Test analytics token", + "expires": "12/31/2099", + "can_access": [] + }, format="json" + ) + self.assertEqual(response.status_code, 201) + + + def test_token_name_available(self): + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/create", + { + "name": "Test analytics token", + "description": "Test analytics token", + "expires": "12/31/2099", + "can_access": [] + }, format="json" + ) + self.assertEqual(response.status_code, 201) + + # test unavailable + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/name-available", + { + "name": "Test analytics token" + }, format="json" + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertFalse(data["is_available"]) + + # test available + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/name-available", + { + "name": "Test analytics token other" + }, format="json" + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertTrue(data["is_available"]) + + + # test failure case + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/name-available", + { + "name": None + }, format="json" + ) + self.assertEqual(response.status_code, 400) + + def test_delete_token(self): + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/create", + { + "name": "Test analytics token", + "description": "Test analytics token", + "expires": "12/31/2099", + "can_access": [] + }, format="json" + ) + self.assertEqual(response.status_code, 201) + self.assertEqual(AnalyticsToken.objects.count(), 1) + + token_id = AnalyticsToken.objects.latest('id').id + + response = self.provider_client.delete(f"/api/v1/provider/analytics-tokens/{token_id}/delete") + self.assertEqual(response.status_code, 204) + self.assertEqual(AnalyticsToken.objects.count(), 0) + + def test_update_token_verbs(self): + provider_id = Provider.objects.latest('id').id + + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/create", + { + "name": "Test analytics token", + "description": "Test analytics token", + "expires": "12/31/2099", + "can_access": [] + }, format="json" + ) + self.assertEqual(response.status_code, 201) + + token_id = AnalyticsToken.objects.latest('id').id + + # test wrong token ID + response = self.provider_client.post(f"/api/v1/provider/analytics-tokens/1000000/update-verbs", + { + "active_verbs": [ + {"verb": "http://h5p.example.com/expapi/verbs/experienced", + "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/attempted", + "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/interacted", + "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/answered", + "provider": provider_id}, + ] + }, format="json") + self.assertEqual(response.status_code, 404) + + response = self.provider_client.post(f"/api/v1/provider/analytics-tokens/{token_id}/update-verbs", + { + "active_verbs": [ + {"verb": "http://h5p.example.com/expapi/verbs/experienced", "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/attempted", "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/interacted", "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/answered", "provider": provider_id}, + ] + }, format="json") + self.assertEqual(response.status_code, 200) + + token = AnalyticsToken.objects.filter(pk=token_id).get() + self.assertEqual(token.analyticstokenverb_set.count(), 4) + + def test_update_and_delete_token_image(self): + provider_id = Provider.objects.latest('id').id + + response = self.provider_client.post( + f"/api/v1/provider/analytics-tokens/create", + { + "name": "Test analytics token", + "description": "Test analytics token", + "expires": "12/31/2099", + "can_access": [] + }, format="json" + ) + self.assertEqual(response.status_code, 201) + + token_id = AnalyticsToken.objects.latest('id').id + + response = self.provider_client.post(f"/api/v1/provider/analytics-tokens/{token_id}/update-verbs", + { + "active_verbs": [ + {"verb": "http://h5p.example.com/expapi/verbs/experienced", "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/attempted", "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/interacted", "provider": provider_id}, + {"verb": "http://h5p.example.com/expapi/verbs/answered", "provider": provider_id}, + ] + }, format="json") + self.assertEqual(response.status_code, 200) + + token = AnalyticsToken.objects.filter(pk=token_id).get() + self.assertEqual(token.analyticstokenverb_set.count(), 4) + + # test upload invalid image + upload_image = SimpleUploadedFile("image.mp4", b"aslkdjnwekjgfnwkgbwekrjgbwgbkwjebg", content_type="video/mp4") + response = self.provider_client.post(f"/api/v1/provider/analytics-tokens/{token_id}/image", + {"image": upload_image}) + self.assertEqual(response.status_code, 400) + + # test upload "valid" image + upload_image = SimpleUploadedFile("image.png", b"aslkdjnwekjgfnwkgbwekrjgbwgbkwjebg", content_type="image/png") + response = self.provider_client.post(f"/api/v1/provider/analytics-tokens/{token_id}/image", + {"image": upload_image}) + self.assertEqual(response.status_code, 200) + + token = AnalyticsToken.objects.filter(pk=token_id).get() + + # test get image + response = self.provider_client.get(f"/api/v1/provider/analytics-tokens/{token_id}/image/{token.image_path}") + self.assertEqual(response.status_code, 200) + + # test get wrong image + response = self.provider_client.get(f"/api/v1/provider/analytics-tokens/{token_id}/image/nonexistent.png") + self.assertEqual(response.status_code, 404) + + # test deletion failure for nonexistent token + response = self.provider_client.delete(f"/api/v1/provider/analytics-tokens/1000000/image") + self.assertEqual(response.status_code, 404) + + # test deletion + response = self.provider_client.delete(f"/api/v1/provider/analytics-tokens/{token_id}/image") + self.assertEqual(response.status_code, 200) + + # test get image + response = self.provider_client.get(f"/api/v1/provider/analytics-tokens/{token_id}/image/{token.image_path}") + self.assertEqual(response.status_code, 404) \ No newline at end of file diff --git a/src/providers/tests.py b/src/providers/tests/tests_provider_management.py similarity index 98% rename from src/providers/tests.py rename to src/providers/tests/tests_provider_management.py index c7d75f04935f8a88de00c5b28e51108322a8d158..f17447f46ffa612ec351f34d3ec0fd227e089099 100644 --- a/src/providers/tests.py +++ b/src/providers/tests/tests_provider_management.py @@ -8,7 +8,7 @@ from django.core.management import call_command from users.models import CustomUser -from .models import Provider, ProviderAuthorization +from src.providers.models import Provider, ProviderAuthorization PROJECT_PATH = os.path.abspath(os.path.dirname(__name__)) diff --git a/src/providers/views.py b/src/providers/views.py index 9f6a42c3d4393f7032489ee5cc8a378dfb29601c..a210f30b9dc9e35657954eadfe4afade095c0767 100644 --- a/src/providers/views.py +++ b/src/providers/views.py @@ -238,8 +238,8 @@ class UpdateAnalyticsTokensVerbs(APIView): status=status.HTTP_401_UNAUTHORIZED, ) - # delete all currently assignes verbs - AnalyticsTokenVerb.objects.filter(analytics_token=analytics_token.id).delete(); + # delete all currently assigned verbs + AnalyticsTokenVerb.objects.filter(analytics_token=analytics_token.id).delete() for activeverb in request.data["active_verbs"]: v = AnalyticsTokenVerb() v.verb = activeverb["verb"] @@ -248,7 +248,7 @@ class UpdateAnalyticsTokensVerbs(APIView): v.save() - return Response({"message": "sucess"}, status=status.HTTP_200_OK) + return Response({"message": "success"}, status=status.HTTP_200_OK) except ObjectDoesNotExist: return Response( {"message": "token doesn't exist"}, status=status.HTTP_404_NOT_FOUND diff --git a/src/settings/tests/__init__.py b/src/settings/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/src/settings/tests/tests_privacy_policy.py b/src/settings/tests/tests_privacy_policy.py new file mode 100644 index 0000000000000000000000000000000000000000..9a67194bcbaad5c02129b9a5632742ec371e1370 --- /dev/null +++ b/src/settings/tests/tests_privacy_policy.py @@ -0,0 +1,136 @@ +import json +import os +from io import StringIO + +from rest_framework.test import APIClient +from rolepermissions.roles import assign_role +from django.core.management import call_command + +from consents.tests.tests_consent_operations import BaseTestCase +from providers.models import ProviderAuthorization, Provider, ProviderVerbGroup, ProviderSchema +from users.models import CustomUser + +from consents.models import UserConsents + + +class TestUserEndpoints(BaseTestCase): + verb_groups = [ + { + "provider_id": 1, + "id": "default_group", + "label": "Default group", + "description": "default", + "showVerbDetails": True, + "purposeOfCollection": "Lorem Ipsum", + "requiresConsent": True, + "verbs": [ + {"id": "http://h5p.example.com/expapi/verbs/experienced"}, + {"id": "http://h5p.example.com/expapi/verbs/attempted"}, + {"id": "http://h5p.example.com/expapi/verbs/interacted"}, + {"id": "http://h5p.example.com/expapi/verbs/answered"} + ] + }, + ] + + def setUp(self): + call_command('check_and_apply_migrations') + normal_user = CustomUser.objects.create_user( + self.test_user_email, self.test_user_password + ) + provider_user = CustomUser.objects.create_superuser( # super user, since we need extended permissions + self.test_provider_email, self.test_provider_password + ) + + assign_role(normal_user, "user") + assign_role(provider_user, "provider_manager") + assign_role(provider_user, 'polaris_administrator') + + response = self.client.post( + "/api/v1/auth/token", + {"email": self.test_user_email, "password": self.test_user_password}, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + user_token = response.data["access"] + + response = self.client.post( + "/api/v1/auth/token", + { + "email": self.test_provider_email, + "password": self.test_provider_password, + }, + ) # obtain token + self.assertEqual(response.status_code, 200) + self.assertTrue("access" in response.data) + provider_token = response.data["access"] + + self.user_client = APIClient() + self.user_client.credentials(HTTP_AUTHORIZATION="Bearer " + user_token) + + self.provider_client = APIClient() + self.provider_client.credentials(HTTP_AUTHORIZATION="Bearer " + provider_token) + + # Create provider schema + with StringIO(json.dumps(self.h5p_provider_schema)) as fp: + response = self.provider_client.put( + "/api/v1/consents/provider/create", + {"provider-schema": fp}, + format="multipart", + ) + self.assertEqual(response.status_code, 201) + + # Get application token for created provider + self.application_token = ProviderAuthorization.objects.get( + provider__name="H5P" + ).key + self.third_party_client = APIClient() + self.third_party_client.credentials( + HTTP_AUTHORIZATION="Basic " + self.application_token + ) + + # create some verb groups + for group in self.verb_groups: + response = self.provider_client.post( + "/api/v1/consents/provider/" + str(Provider.objects.latest('id').id) + "/create-verb-group", + group, + format="json", + ) + self.assertEqual(response.status_code, 200) + + def test_set_privacy_policy(self): + # test unauthorized user + response = self.user_client.post( + f"/api/v1/settings/privacy-policy", + {"content": "test"}, + format="json" + ) + self.assertEqual(response.status_code, 403) + + # test authorized user + response = self.provider_client.post( + f"/api/v1/settings/privacy-policy", + {"content": "test"}, + format="json" + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(data["content"], "test") + + def test_get_privacy_policy(self): + # test authorized user + response = self.provider_client.post( + f"/api/v1/settings/privacy-policy", + {"content": "test"}, + format="json" + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(data["content"], "test") + + # test unauthorized user + response = self.user_client.get( + f"/api/v1/settings/privacy-policy", + ) + self.assertEqual(response.status_code, 200) + data = json.loads(response.content.decode()) + self.assertEqual(data["content"], "test") \ No newline at end of file