diff --git a/scheduler/scheduler/ext/commands.py b/scheduler/scheduler/ext/commands.py index 695d3147472ead3c3d443214271b3b21668f0c68..273860e0f6911773adc349a610bfe7874a116ada 100644 --- a/scheduler/scheduler/ext/commands.py +++ b/scheduler/scheduler/ext/commands.py @@ -36,10 +36,8 @@ def populate_db(): data = [] db.session.bulk_save_objects(data) db.session.commit() - - def init_app(app): - # add multiple commands in a bulk + # Add multiple commands in a bulk for command in [create_db, drop_db, populate_db]: app.cli.add_command(app.cli.command()(command)) @@ -47,18 +45,28 @@ def init_app(app): def read_configs(): """ Read all YAML files stored in the configurations directory and create corresponding jobs. + Jobs not found in the configurations will be deleted. """ - reader = YmlConfigReader() try: yaml_configs = reader.read_configs() + + # Create a set of job names from the YAML configurations + config_job_names = {config["engine_name"] for config in yaml_configs} + + # Check for jobs in the database that are not in the configurations + all_jobs = Job.query.all() + for job in all_jobs: + if job.name not in config_job_names: + # Delete jobs not present in the configurations + print(f"Deleting job {job.name} as it is no longer in the configurations.") + delete_job(job.name) # Assuming a `delete_job` function exists + # Create or update jobs based on the YAML configurations for config in yaml_configs: - if ( - Job.query.filter(Job.name == config["engine_name"]).first() - is not None - ): - print("Job with provided name already exists.") + existing_job = Job.query.filter(Job.name == config["engine_name"]).first() + if existing_job: + print(f"Updating job {config['engine_name']}.") update_job( config["engine_name"], config["repo_url"], @@ -72,6 +80,6 @@ def init_app(app): config["crontab"], config["analytics_token"], ) - print(f"New Job {new_job_id} created") + print(f"New Job {new_job_id} created.") except ValueError as e: - print(e) + print(e) \ No newline at end of file diff --git a/scheduler/scheduler/ext/restapi/resources.py b/scheduler/scheduler/ext/restapi/resources.py index 8655ead3250645a75b8c2725d69ab9718cf1f9d7..bc3a122bcf4533d292b56e140d95f454ebae41fb 100644 --- a/scheduler/scheduler/ext/restapi/resources.py +++ b/scheduler/scheduler/ext/restapi/resources.py @@ -174,13 +174,57 @@ def create_job(name, repo_url, crontab, analytics_token): return new_job.id +def delete_job(job_name): + """ + Deletes a job and its associated periodic task by job name. + + Args: + job_name (str): The name of the job to be deleted. + + Returns: + str: Confirmation message of the deletion or an error message if the job/task is not found. + """ + # Find the job by name + job = Job.query.filter_by(name=job_name).first() + + if not job: + print(f"Job with name '{job_name}' not found.") + return f"Job with name '{job_name}' not found." + + # Find the associated periodic task + periodic_task = scheduler_db.query(PeriodicTask).filter_by(id=job.task_id).first() + + if periodic_task: + # Delete the periodic task from the scheduler database + scheduler_db.delete(periodic_task) + scheduler_db.commit() + print(f"Periodic task for job '{job_name}' deleted.") + + # Delete the job from the main database + db.session.delete(job) + db.session.commit() + print(f"Job '{job_name}' deleted.") + + return f"Job '{job_name}' and associated periodic task deleted." + class JobResource(MethodResource, Resource): + """ + RESTful API Resource for managing jobs. + + Provides methods to list all jobs and create a new job. + """ + @doc(description="Returns list of all registered jobs.", tags=["Job List"]) @marshal_with(JobsSchema, code=200) def get(self): - jobs = Job.query.all() + """ + GET endpoint to retrieve all jobs. + Returns: + dict: A dictionary containing a list of all jobs with their details. + """ + jobs = Job.query.all() return {"jobs": [get_job_with_task(job.id) for job in jobs]} @doc(description="Creates a new job.", tags=["Create Job"]) @@ -188,14 +232,26 @@ class JobResource(MethodResource, Resource): @marshal_with(JobSchema, code=200) @marshal_with(MessageSchema, code=400) def post(self, **kwargs): + """ + POST endpoint to create a new job. + + Args: + kwargs: Job details including name, repo_url, crontab, and analytics_token. + + Returns: + dict: Created job details or an error message. + """ try: + # Parse and validate the crontab string crontab = parse_crontab(kwargs["crontab"]) except ValueError: return {"message": "Invalid crontab."}, 400 + # Check if a job with the same name already exists if Job.query.filter(Job.name == kwargs["name"]).first() is not None: return {"message": "Job with provided name already exists."}, 400 + # Create the job new_job_id = create_job( kwargs["name"], kwargs["repo_url"], crontab, kwargs["analytics_token"] )