Skip to content
Snippets Groups Projects
Select Git revision
  • aef42a773666470cca577671d24a5eb09ffb2248
  • master default protected
  • dev protected
  • Issue/3189-onboardingUniBonn
  • Issue/3130-onboardingUzK
  • Issue/3109-onboarding
  • Issue/2915-migrateSql2Linked
  • test_ci
  • Issue/xxxx-fixDevcontainer
  • Issue/xxxx-generateLatestTag
  • Issue/2980-fixContainerBuild
  • Issue/2967-fixGD
  • Issue/2944-gdShenanigans
  • Issue/2906-containerCron
  • Issue/2880-gd
  • petar.hristov-master-patch-9e49
  • Issue/2668-graphDeployer
  • gitkeep
  • Hotfix/xxxx-fastDeployment
  • Hotfix/2615-graphDeployerLag
  • Issue/2568-betterLogging
  • v2.2.0
  • v2.1.11
  • v2.1.10
  • v2.1.9
  • v2.1.8
  • v2.1.7
  • v2.1.6
  • v2.1.5
  • v2.1.4
  • v2.1.3
  • v2.1.2
  • v2.1.1
  • v2.1.0
  • v2.0.1
  • v2.0.0
  • v1.2.11
  • v1.2.10
  • v1.2.9
  • v1.2.8
  • v1.2.7
41 results

GraphDeployer.sln

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    utilsJupyter.py 1.39 KiB
    import json
    
    import requests
    
    # Number of entries requested per call; sent as "page_size" in the JSON
    # body built by get_statements_page.
    PAGE_SIZE = 100
    
    # In this module, get_statements() loads statements from a local JSON file and
    # save_results() writes results to a local JSON file, instead of accessing the stupla x API.
    
    def get_statements_page(api_url, analytics_token, last_object_id=None, timeout=30):
        """Fetch one page of statements from the provider data endpoint.

        Sends a POST request to ``{api_url}/api/v1/provider/data`` (the path is
        appended to ``api_url`` as-is) with an ``Authorization: Basic <token>``
        header and a JSON body asking for ``PAGE_SIZE`` entries.

        Args:
            api_url: Base URL of the API.
            analytics_token: Token placed verbatim after ``Basic`` in the
                Authorization header.
            last_object_id: Optional value forwarded as ``"last_object_id"``;
                it is only included in the request when truthy. Presumably the
                pagination cursor from the previous page -- confirm against
                the API.
            timeout: Seconds handed to ``requests`` as the request timeout, so
                an unresponsive server cannot block the caller forever. Pass
                ``None`` to wait indefinitely (the previous behaviour).

        Returns:
            The value stored under ``"statements"`` in the JSON response, or an
            empty list when that key is absent.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If the server does not answer within ``timeout``.
        """
        print(api_url)

        request_body = {"page_size": PAGE_SIZE}
        if last_object_id:
            request_body["last_object_id"] = last_object_id

        response = requests.post(
            f"{api_url}/api/v1/provider/data",
            headers={"Authorization": f"Basic {analytics_token}"},
            json=request_body,
            timeout=timeout,
        )
        response.raise_for_status()

        # Keep the decoded response separate from the request body for clarity.
        response_body = response.json()
        return response_body.get("statements", [])
    
    
    def get_statements(api_url, analytics_token):
        """Load statements from a local JSON file instead of calling the API.

        Reads ``data/stupla_x_api_statements.json`` relative to the current
        working directory.

        Args:
            api_url: Unused; kept so the signature stays unchanged for callers.
            analytics_token: Unused; kept so the signature stays unchanged for
                callers.

        Returns:
            A new list holding the items of the JSON document in the file.

        Raises:
            FileNotFoundError: If the JSON file does not exist.
            json.JSONDecodeError: If the file does not contain valid JSON.
        """
        # Open explicitly as UTF-8: relying on the platform default encoding
        # misreads non-ASCII statements on systems whose locale is not UTF-8.
        with open('data/stupla_x_api_statements.json', encoding='utf-8') as json_file:
            data = json.load(json_file)
        return list(data)
    
    def get_existing_results(api_url, analytics_token, timeout=30):
        """Fetch the existing results from the provider results endpoint.

        Sends a GET request to ``{api_url}/api/v1/provider/results`` (the path
        is appended to ``api_url`` as-is) with an ``Authorization: Basic
        <token>`` header.

        Args:
            api_url: Base URL of the API.
            analytics_token: Token placed verbatim after ``Basic`` in the
                Authorization header.
            timeout: Seconds handed to ``requests`` as the request timeout, so
                an unresponsive server cannot block the caller forever. Pass
                ``None`` to wait indefinitely (the previous behaviour).

        Returns:
            The decoded JSON body of the response, unmodified.

        Raises:
            requests.HTTPError: If the server answers with an error status.
            requests.Timeout: If the server does not answer within ``timeout``.
        """
        response = requests.get(
            f"{api_url}/api/v1/provider/results",
            headers={"Authorization": f"Basic {analytics_token}"},
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    
    def save_results(api_url, analytics_token, results):
        """Write results to a local JSON file instead of sending them to the API.

        The output goes to ``results/results.json`` relative to the current
        working directory; an existing file is overwritten.

        Args:
            api_url: Unused; kept so the signature stays unchanged for callers.
            analytics_token: Unused; kept so the signature stays unchanged for
                callers.
            results: JSON-serializable object to store.

        Raises:
            TypeError: If ``results`` contains values ``json`` cannot serialize.
        """
        import os  # local import: the module header does not import os

        # Create the target directory on first use; without this the open()
        # below fails with FileNotFoundError on a fresh checkout.
        os.makedirs('results', exist_ok=True)
        with open('results/results.json', 'w', encoding='utf-8') as outfile:
            json.dump(results, outfile)