Skip to content
Snippets Groups Projects
Commit fdfe7577 authored by Max Lou's avatar Max Lou
Browse files

Reading YAML configurations with analytics tokens

parent 1c315972
No related branches found
No related tags found
No related merge requests found
......@@ -4,5 +4,10 @@ This project is divided into the two main parts the scheduler and the runner
## Scheduler
The scheduler can be found in the scheduler folder.
## Runner
The runner can be found in the runner folder.
### Periodic Tasks
Analytics engines are stored as YAML files in the `configuration` directory.
`scheduler read-configs` reads all existing YAML files and starts or updates analytics engines. Since analytics engine configuration might change over time, a crontab entry that runs `scheduler read-configs` should be registered.
import glob
from pathlib import Path
import click
import yaml
from celery_sqlalchemy_scheduler import (CrontabSchedule, IntervalSchedule,
PeriodicTask, PeriodicTaskChanged,
SolarSchedule)
......@@ -11,6 +7,7 @@ from sqlalchemy.ext.declarative import declarative_base
from scheduler.ext.database import db, engine
from scheduler.ext.restapi.resources import (create_job, parse_crontab,
update_job)
from scheduler.ext.yml_config_reader import YmlConfigReader
from scheduler.models import Job
......@@ -51,43 +48,30 @@ def init_app(app):
"""
Read all YAML files stored in the configurations directory and create corresponding jobs.
"""
directory = "configuration"
file_paths = (
p.resolve()
for p in Path(directory).glob("*")
if p.suffix in {".yml", ".yaml"}
)
configs = []
for p in file_paths:
with p.open() as f:
configs.append(
{"file_name": p.name, "config": yaml.safe_load(f.read())}
)
print(f"Found {len(configs)} configuration file(s)")
for config in configs:
print(
f"{config['file_name']} contains {len(config['config'].keys())} engines"
)
for engine_name in config["config"].keys():
reader = YmlConfigReader()
try:
repo_url = config["config"][engine_name]["repo"]
crontab = parse_crontab(config["config"][engine_name]["run"])
analytics_token = parse_crontab(config["config"][engine_name]["analytics_token"])
print("Engine: ", engine_name)
print("Interval", " ".join(crontab))
print("Repo", repo_url)
yaml_configs = reader.read_configs()
# TODO handle engine with name already exists
if Job.query.filter(Job.name == engine_name).first() is not None:
for config in yaml_configs:
if (
Job.query.filter(Job.name == config["engine_name"]).first()
is not None
):
print("Job with provided name already exists.")
update_job(engine_name, crontab)
update_job(
config["engine_name"],
config["repo_url"],
config["crontab"],
config["analytics_token"],
)
else:
new_job_id = create_job(engine_name, repo_url, crontab, analytics_token)
new_job_id = create_job(
config["engine_name"],
config["repo_url"],
config["crontab"],
config["analytics_token"],
)
print(f"New Job {new_job_id} created")
except ValueError:
print("Invalid crontab")
return {"message": "Invalid crontab."}, 400
except ValueError as e:
print(e)
......@@ -78,7 +78,10 @@ def get_crontab_schedule(crontab):
return schedule
def update_job(name, crontab):
def update_job(name, repo_url, crontab, analytics_token):
"""
    Update existing job (including periodic task) in case repo_url, crontab or analytics_token changed.
"""
old_job = Job.query.filter(Job.name == name).first()
if old_job is None:
......@@ -87,8 +90,6 @@ def update_job(name, crontab):
schedule = get_crontab_schedule(crontab)
print("schedule", schedule)
periodic_task = scheduler_db.query(PeriodicTask).get(old_job.task_id)
if periodic_task is None:
......@@ -108,11 +109,34 @@ def update_job(name, crontab):
return
periodic_task_change.last_update = datetime.now()
scheduler_db.commit()
else:
print("Crontab didn't change, no update required.")
# Update repo_url and analytics_token
old_kwargs = json.loads(periodic_task.kwargs)
if old_kwargs["repo_url"] != repo_url:
print("Repo url changed.")
old_kwargs["repo_url"] = repo_url
old_job.repo_url = repo_url
old_job.analtyics_token = analytics_token
db.session.commit()
periodic_task.kwargs = json.dumps(old_kwargs)
scheduler_db.commit()
if old_kwargs["analytics_token"] != analytics_token:
print("Analytics token changed.")
old_kwargs["analytics_token"] = analytics_token
old_job.repo_url = repo_url
old_job.analtyics_token = analytics_token
db.session.commit()
periodic_task.kwargs = json.dumps(old_kwargs)
scheduler_db.commit()
def create_job(name, repo_url, crontab, analytics_token):
"""
Create a new job along with a periodic task.
"""
print(name, crontab, repo_url)
result_token = secrets.token_hex(32)
......
import glob
from pathlib import Path
import yaml
class YmlConfigReader:
    """
    Analytics engines are described as YAML files in 'configuration' directory.
    This class provides helpers to validate and parse these files.
    """

    # Directory (relative to the working directory) scanned for YAML files.
    directory = "configuration"

    def read_configs(self):
        """
        Read all YAML files in the 'configuration' directory.

        Returns:
            list[dict]: one validated engine config per engine entry,
            each with keys 'engine_name', 'repo_url', 'crontab',
            'analytics_token' (see validate_config).

        Raises:
            ValueError: if any engine entry is invalid or incomplete.
        """
        config_files = []
        file_paths = (
            p.resolve()
            for p in Path(self.directory).glob("*")
            if p.suffix in {".yml", ".yaml"}
        )
        for p in file_paths:
            with p.open() as f:
                # safe_load: never construct arbitrary Python objects from
                # configuration files.
                config_files.append(
                    {"file_name": p.name, "config": yaml.safe_load(f)}
                )
        print(f"Found {len(config_files)} configuration file(s)")
        validated_configs = self.validate_config_files(config_files)
        return validated_configs

    def validate_config_files(self, config_files):
        """
        We expect the YAML files to follow a certain form, which requires validating each analytics engine YAML configuration.

        An empty YAML file parses to None; treat it as containing no engines
        instead of crashing on .keys().
        """
        validated_configs = []
        for config_file in config_files:
            for engine_name in (config_file["config"] or {}).keys():
                engine_config = self.validate_config(
                    config_file["config"][engine_name],
                    config_file["file_name"],
                    engine_name,
                )
                validated_configs.append(engine_config)
        return validated_configs

    def validate_config(self, config, file_name, engine_name):
        """
        Validates a single analytics engine YAML configuration and raises a ValueError, in case a field is invalid or missing.
        """
        repo_url = config.get("repo")
        # BUG FIX: the schedule lives under the YAML key 'run' (the CLI and
        # the error message below both use 'run'); reading 'crontab' here
        # rejected every valid configuration file.
        crontab = config.get("run", "").split(" ")
        analytics_token = config.get("analytics_token")
        if repo_url is None:
            raise ValueError(
                f"Missing repo_url for engine '{engine_name}' in file '{file_name}'."
            )
        if len(crontab) != 5:
            raise ValueError(
                f"Invalid crontab {config.get('run', '')} for engine '{engine_name}' in file '{file_name}'."
            )
        if analytics_token is None:
            raise ValueError(
                f"Missing analytics token for engine '{engine_name}' in file '{file_name}'."
            )
        return {
            "engine_name": engine_name,
            "repo_url": repo_url,
            "crontab": crontab,
            "analytics_token": analytics_token,
        }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment