diff --git a/Readme.md b/Readme.md
index 43c6459d355a362c02a452a1b91d90be5a6ab969..b9f38466f8704480cb423b453deece76bccf7e13 100644
--- a/Readme.md
+++ b/Readme.md
@@ -4,5 +4,25 @@ This project is divided into the two main parts the scheduler and the runner
 ## Scheduler
 The scheduler can be found in the scheduler folder.
-## Runner
-The runner can be found in the runner folder.
+
+### Periodic Tasks
+
+Analytics engines are stored as YAML files in the `configuration` directory.
+
+`scheduler read-configs` reads all existing YAML files and starts or updates analytics engines. Since an analytics engine's configuration might change over time, a crontab entry that runs `scheduler read-configs` should be registered; an example configuration file and crontab entry are shown below.
+
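+For example, a configuration file such as `configuration/engines.yml` might look like this (the file name, engine name, repository URL, and token below are placeholders):
+
+```yaml
+my-engine:
+  repo: https://github.com/example/my-engine.git
+  run: "0 * * * *"
+  analytics_token: <analytics-token>
+```
+
+The crontab entry that periodically re-reads the configurations could then be, e.g. every 15 minutes (adjust the schedule and environment to the deployment):
+
+```
+*/15 * * * * scheduler read-configs
+```
+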
 
diff --git a/scheduler/scheduler/ext/commands.py b/scheduler/scheduler/ext/commands.py
index ba0f1be73c8cd18d4e42ec7df9e32acc557fabc3..695d3147472ead3c3d443214271b3b21668f0c68 100644
--- a/scheduler/scheduler/ext/commands.py
+++ b/scheduler/scheduler/ext/commands.py
@@ -1,8 +1,4 @@
-import glob
-from pathlib import Path
-
 import click
-import yaml
 from celery_sqlalchemy_scheduler import (CrontabSchedule, IntervalSchedule,
                                          PeriodicTask, PeriodicTaskChanged,
                                          SolarSchedule)
@@ -11,6 +7,7 @@ from sqlalchemy.ext.declarative import declarative_base
 
 from scheduler.ext.database import db, engine
 from scheduler.ext.restapi.resources import (create_job, parse_crontab,
                                              update_job)
+from scheduler.ext.yml_config_reader import YmlConfigReader
 from scheduler.models import Job
 
@@ -51,43 +48,30 @@ def init_app(app):
     """
     Read all YAML files stored in the configurations directory and create
     corresponding jobs.
     """
-    directory = "configuration"
-
-    file_paths = (
-        p.resolve()
-        for p in Path(directory).glob("*")
-        if p.suffix in {".yml", ".yaml"}
-    )
-
-    configs = []
-    for p in file_paths:
-        with p.open() as f:
-            configs.append(
-                {"file_name": p.name, "config": yaml.safe_load(f.read())}
-            )
-
-    print(f"Found {len(configs)} configuration file(s)")
-    for config in configs:
-        print(
-            f"{config['file_name']} contains {len(config['config'].keys())} engines"
-        )
-        for engine_name in config["config"].keys():
-            try:
-                repo_url = config["config"][engine_name]["repo"]
-                crontab = parse_crontab(config["config"][engine_name]["run"])
-                analytics_token = parse_crontab(config["config"][engine_name]["analytics_token"])
-                print("Engine: ", engine_name)
-                print("Interval", " ".join(crontab))
-                print("Repo", repo_url)
-
-                # TODO handle engine with name already exists
-                if Job.query.filter(Job.name == engine_name).first() is not None:
-                    print("Job with provided name already exists.")
-                    update_job(engine_name, crontab)
-                else:
-                    new_job_id = create_job(engine_name, repo_url, crontab, analytics_token)
-                    print(f"New Job {new_job_id} created")
-            except ValueError:
-                print("Invalid crontab")
-                return {"message": "Invalid crontab."}, 400
+    reader = YmlConfigReader()
+    try:
+        yaml_configs = reader.read_configs()
+
+        for config in yaml_configs:
+            if (
+                Job.query.filter(Job.name == config["engine_name"]).first()
+                is not None
+            ):
+                print("Job with provided name already exists.")
+                update_job(
+                    config["engine_name"],
+                    config["repo_url"],
+                    config["crontab"],
+                    config["analytics_token"],
+                )
+            else:
+                new_job_id = create_job(
+                    config["engine_name"],
+                    config["repo_url"],
+                    config["crontab"],
+                    config["analytics_token"],
+                )
+                print(f"New Job {new_job_id} created")
+    except ValueError as e:
+        print(e)
diff --git a/scheduler/scheduler/ext/restapi/resources.py b/scheduler/scheduler/ext/restapi/resources.py
index 144d37c09836d1ba2cace880a8db448405f673a0..8655ead3250645a75b8c2725d69ab9718cf1f9d7 100644
--- a/scheduler/scheduler/ext/restapi/resources.py
+++ b/scheduler/scheduler/ext/restapi/resources.py
@@ -78,7 +78,10 @@ def get_crontab_schedule(crontab):
     return schedule
 
 
-def update_job(name, crontab):
+def update_job(name, repo_url, crontab, analytics_token):
+    """
+    Update an existing job (including its periodic task) in case repo_url, crontab, or analytics_token changed.
+    """
     old_job = Job.query.filter(Job.name == name).first()
 
     if old_job is None:
@@ -87,8 +90,6 @@ def update_job(name, crontab):
     schedule = get_crontab_schedule(crontab)
 
-    print("schedule", schedule)
-
     periodic_task = scheduler_db.query(PeriodicTask).get(old_job.task_id)
 
     if periodic_task is None:
@@ -108,11 +109,34 @@ def update_job(name, crontab):
             return
 
         periodic_task_change.last_update = datetime.now()
         scheduler_db.commit()
-    else:
-        print("Crontab didn't change, no update required.")
+
+    # Update repo_url and analytics_token
+    old_kwargs = json.loads(periodic_task.kwargs)
+
+    if old_kwargs["repo_url"] != repo_url:
+        print("Repo url changed.")
+        old_kwargs["repo_url"] = repo_url
+        old_job.repo_url = repo_url
+        old_job.analytics_token = analytics_token
+        db.session.commit()
+
+        periodic_task.kwargs = json.dumps(old_kwargs)
+        scheduler_db.commit()
+
+    if old_kwargs["analytics_token"] != analytics_token:
+        print("Analytics token changed.")
+        old_kwargs["analytics_token"] = analytics_token
+        old_job.repo_url = repo_url
+        old_job.analytics_token = analytics_token
+        db.session.commit()
+
+        periodic_task.kwargs = json.dumps(old_kwargs)
+        scheduler_db.commit()
 
 
 def create_job(name, repo_url, crontab, analytics_token):
+    """
+    Create a new job along with a periodic task.
+    """
     print(name, crontab, repo_url)
 
     result_token = secrets.token_hex(32)
diff --git a/scheduler/scheduler/ext/yml_config_reader.py b/scheduler/scheduler/ext/yml_config_reader.py
new file mode 100644
index 0000000000000000000000000000000000000000..feaad2445e944aca6d598eaac179a389d0f1f6df
--- /dev/null
+++ b/scheduler/scheduler/ext/yml_config_reader.py
@@ -0,0 +1,89 @@
+from pathlib import Path
+
+import yaml
+
+
+class YmlConfigReader:
+    """
+    Analytics engines are described as YAML files in the 'configuration'
+    directory. This class provides helpers to validate and parse these files.
+    """
+
+    directory = "configuration"
+
+    def read_configs(self):
+        """
+        Read all YAML files in the 'configuration' directory.
+        """
+        config_files = []
+        file_paths = (
+            p.resolve()
+            for p in Path(self.directory).glob("*")
+            if p.suffix in {".yml", ".yaml"}
+        )
+
+        for p in file_paths:
+            with p.open() as f:
+                config_files.append(
+                    {"file_name": p.name, "config": yaml.safe_load(f.read())}
+                )
+
+        print(f"Found {len(config_files)} configuration file(s)")
+
+        validated_configs = self.validate_config_files(config_files)
+        return validated_configs
+
+    def validate_config_files(self, config_files):
+        """
+        We expect the YAML files to follow a certain form, so each analytics
+        engine configuration is validated individually.
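+        The expected shape per engine (see validate_config below; values in
+        angle brackets are placeholders) is:
+
+            <engine name>:
+                repo: <git repository URL>
+                run: <five-field crontab string>
+                analytics_token: <token>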
+ """ + validated_configs = [] + for config_file in config_files: + for engine_name in config_file["config"].keys(): + engine_config = self.validate_config( + config_file["config"][engine_name], + config_file["file_name"], + engine_name, + ) + validated_configs.append(engine_config) + + return validated_configs + + def validate_config(self, config, file_name, engine_name): + """ + Validates a single analytics engine YAML configuration and raises a ValueError, in case a field is invalid or missing. + """ + repo_url = config.get("repo") + crontab = config.get("crontab", "").split(" ") + analytics_token = config.get("analytics_token") + + if repo_url is None: + raise ValueError( + f"Missing repo_url for engine '{engine_name}' in file '{file_name}'." + ) + + if len(crontab) != 5: + raise ValueError( + f"Invalid crontab {config.get('run', '')} for engine '{engine_name}' in file '{file_name}'." + ) + + if analytics_token is None: + raise ValueError( + f"Missing analytics token for engine '{engine_name}' in file '{file_name}'." + ) + + return { + "engine_name": engine_name, + "repo_url": repo_url, + "crontab": crontab, + "analytics_token": analytics_token, + }