Commit da2655b0 authored by Lukas Weber

embed jobconfig into input file, add jobname field

parent e930ae33
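For orientation, a hedged sketch (not part of the commit) of the jobfile layout TaskMaker emits after this change, with the jobconfig embedded and a top-level jobname; the file name 'myjob', the binary path, and all parameter values are illustrative, and the walltime string format is assumed.

# Hypothetical example of the embedded-jobconfig layout; only keys that appear in this
# diff (jobname, jobconfig, mc_binary, mc_walltime, mc_checkpoint_time, tasks, sweeps,
# thermalization) are used. Cluster launch options also live in jobconfig, but their
# names are not shown in this diff.
import yaml

example_jobfile = {
    'jobname': 'myjob',
    'jobconfig': {
        'mc_binary': '/home/user/mc/build/mc',  # expanded to an absolute path by TaskMaker
        'mc_walltime': '24:00:00',              # duration format assumed
        'mc_checkpoint_time': '00:15:00',
    },
    'tasks': {
        'task0001': {'sweeps': 10000, 'thermalization': 1000},
        'task0002': {'sweeps': 10000, 'thermalization': 1000},
    },
}
print(yaml.dump(example_jobfile))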
#!/usr/bin/env python3
import sys
import argparse
import subprocess
from loadleveller import jobfile
import os
parser = argparse.ArgumentParser(description='Helper script for running and managing loadleveller Monte Carlo jobs.', usage='''loadl <command> <jobfile> [<args>]
<jobfile> is a script file containing the parameters for the job of interest.
Possible commands and their shorthands are
delete, d delete all data related to a job
merge, m merge the results of an unfinished job into an output file
run, r run the job
status, s print job completion information''')
parser.add_argument('command')
parser.add_argument('jobfile')
args = parser.parse_args(sys.argv[1:3])
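# The command and jobfile are parsed here; each subcommand parses its own flags from
# sys.argv[3:] below. Illustrative invocations (the jobfile name 'myjob' is hypothetical):
#   loadl run myjob              run or submit the job described by 'myjob'
#   loadl run myjob --single     run locally in the single core scheduler mode
#   loadl status myjob           print job completion information
#   loadl merge myjob            merge results of an unfinished job
#   loadl delete myjob           remove myjob.data and myjob.results.yml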
jobdir = os.path.dirname(args.jobfile)
jobfile_name = os.path.basename(args.jobfile)
if jobdir != '':
os.chdir(jobdir)
job = jobfile.JobFile(jobfile_name)
def run():
import glob
from loadleveller import clusterutils
parser = argparse.ArgumentParser(description='run a loadleveller job on a cluster or locally')
parser.add_argument('-s','--single', action='store_true', help='Run in the single core scheduler mode')
parser.add_argument('-f', '--force', action='store_true', help='Ignore warnings about possible job corruption')
parser.add_argument('-r', '--restart', action='store_true', help='Delete all existing job data before starting.')
args_run = parser.parse_args(sys.argv[3:])
if args_run.restart:
delete()
else:
# check age of the different files
binary_modtime = os.stat(job.jobconfig['mc_binary']).st_mtime
try:
f = next(glob.iglob('{}.data/*/*.h5'.format(job.jobname))) # only check one of the output files for speed
data_modtime = os.stat(f).st_mtime
label = 'Warning' if args_run.force else 'Error'
if binary_modtime > data_modtime:
print('{}: binary \'{}\' is newer than the checkpoint files.'.format(label, job.jobconfig['mc_binary']))
if not args_run.force:
print('Use \'--restart\' to start from a blank run or use \'--force\' to proceed if you are sure\nthe changes you made are compatible.')
sys.exit(1)
except StopIteration:
pass
if args_run.single:
cmd = [job.jobconfig['mc_binary'], 'single', job.jobname]
print('$ '+' '.join(cmd))
subprocess.run(cmd)
else:
clusterutils.run(job.jobname, job.jobconfig, [job.jobconfig['mc_binary'], job.jobname])
def delete():
import shutil
print('$ rm -r {}.data'.format(job.jobname))
shutil.rmtree('{}.data'.format(job.jobname))
print('$ rm {}.results.yml'.format(job.jobname))
os.remove('{}.results.yml'.format(job.jobname))
def merge():
cmd = [job.jobconfig['mc_binary'], 'merge', job.jobname]
print('$ '+' '.join(cmd))
subprocess.run(cmd)
def status():
from loadleveller import jobstatus
rc = jobstatus.print_status(job, sys.argv[3:])
sys.exit(rc)
if args.command == 'delete' or args.command == 'd':
delete()
elif args.command == 'merge' or args.command == 'm':
merge()
elif args.command == 'run' or args.command == 'r':
run()
elif args.command == 'status' or args.command == 's':
status()
else:
print('Unknown command \'{}\'.'.format(args.command))
parser.print_help()
sys.exit(1)
@@ -7,18 +7,5 @@ class JobFile:
def __init__(self, filename):
with open(filename, 'r') as f:
jobfile = yaml.safe_load(f)
self.tasks = jobfile['tasks']
cwd = os.getcwd()
try:
jobpath = os.path.dirname(filename)
if jobpath != '':
os.chdir(os.path.dirname(filename))
jobconfig_path = os.path.expandvars(os.path.expanduser(jobfile['jobconfig']))
with open(jobconfig_path) as f:
self.jobconfig = yaml.safe_load(f)
self.mc_binary = os.path.abspath(os.path.expandvars(os.path.expanduser(self.jobconfig['mc_binary'])))
finally:
os.chdir(cwd)
self.__dict__ = jobfile
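# With the jobconfig embedded in the jobfile, JobFile now simply exposes the top-level
# YAML keys as attributes. A hedged usage sketch (the file name 'myjob' is illustrative):
#   job = JobFile('myjob')
#   job.jobname                      # e.g. 'myjob'
#   job.jobconfig['mc_binary']       # absolute binary path written by TaskMaker
#   job.tasks['task0001']['sweeps']  # per-task parameters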
@@ -15,25 +15,22 @@ class TaskProgress:
class JobProgress:
def __init__(self, jobfile):
self.jobfile = jobfile
with open(jobfile, 'r') as f:
jobfile = yaml.safe_load(f)
self.tasks = jobfile['tasks'].keys()
self.tasks = jobfile.tasks.keys()
self.restart = False
self.progress = []
for task in self.tasks:
tp = TaskProgress()
tp.target_sweeps = jobfile['tasks'][task]['sweeps']
tp.target_therm = jobfile['tasks'][task]['thermalization']
tp.target_sweeps = jobfile.tasks[task]['sweeps']
tp.target_therm = jobfile.tasks[task]['thermalization']
tp.sweeps = 0
tp.therm_sweeps = 0
tp.num_runs = 0
for runfile in glob.iglob('{}.data/{}/run*.dump.h5'.format(self.jobfile,task)):
for runfile in glob.iglob('{}.data/{}/run*.dump.h5'.format(self.jobfile.jobname,task)):
tp.num_runs += 1
with h5py.File(runfile, 'r') as f:
@@ -52,32 +49,31 @@ class JobProgress:
def needs_merge(self):
result_mtime = 0
try:
result_mtime = os.path.getmtime(self.jobfile+'.results.yml')
result_mtime = os.path.getmtime(self.jobfile.jobname+'.results.yml')
except FileNotFoundError:
return True
for task in self.tasks:
for measfile in glob.iglob('{}.data/{}/run*.meas.h5'.format(self.jobfile,task)):
for measfile in glob.iglob('{}.data/{}/run*.meas.h5'.format(self.jobfile.jobname, task)):
if os.path.getmtime(measfile) > result_mtime:
return True
return False
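# Hedged sketch (not from this commit) of how a caller might combine the pieces above:
# re-merge only when the raw measurement data is newer than <jobname>.results.yml.
# Here 'job' is assumed to be a loadleveller.jobfile.JobFile instance.
#   prog = JobProgress(job)
#   if prog.needs_merge():
#       subprocess.run([job.jobconfig['mc_binary'], 'merge', job.jobname])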
def ystatus():
""" This function is exported as the ystatus command """
def print_status(jobfile, args):
""" This function is exported as the loadl status command """
import argparse
import sys
parser = argparse.ArgumentParser(description='Prints the status and progress of a loadleveller Monte Carlo job.')
parser.add_argument('--need-restart', action='store_true', help='Return 1 if the job is not completed yet and 0 otherwise.')
parser.add_argument('--need-merge', action='store_true', help='Return 1 if the merged results are older than the raw data and 0 otherwise.')
args = parser.parse_args(args)
try:
job_prog = JobProgress(args.jobfile)
job_prog = JobProgress(jobfile)
if args.need_restart and args.need_merge:
print("Error: only one option of '--need-restart' and '--need-merge' can appear at once", file=sys.stderr)
@@ -3,17 +3,26 @@ import os
import yaml
import numpy
def _expand_path(path):
return os.path.abspath(os.path.expandvars(os.path.expanduser(path)))
class TaskMaker:
def __init__(self, name, jobconfig):
self._tm_name = os.path.splitext(name)[0]
self._tm_tasks = []
self._tm_jobconfig = jobconfig
with open(_expand_path(jobconfig), 'r') as f:
self._tm_jobconfig = yaml.safe_load(f)
self._tm_jobconfig['mc_binary'] = _expand_path(self._tm_jobconfig['mc_binary'])
def task(self, **additional_parameters):
self._tm_tasks.append({**self.__dict__, **additional_parameters})
def write(self):
filenames = []
jobfile_dict = { 'jobconfig': self._tm_jobconfig, 'tasks': {}}
jobfile_dict = { 'jobname': self._tm_name, 'jobconfig': self._tm_jobconfig, 'tasks': {}}
for i, t in enumerate(self._tm_tasks):
task_name = f'task{i+1:04d}'
@@ -28,5 +37,4 @@ class TaskMaker:
task_dict[k] = v
jobfile_dict['tasks'][task_name] = task_dict
with open(self._tm_name, 'w') as f:
f.write(yaml.dump(jobfile_dict))
print(yaml.dump(jobfile_dict))
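# A minimal, hypothetical job script using TaskMaker after this change. It assumes a
# jobconfig.yml next to the script containing at least mc_binary, mc_walltime and
# mc_checkpoint_time; the import path, file names and the 'temperature' parameter are
# illustrative. write() now prints the YAML (with the embedded jobconfig and jobname)
# to stdout, so presumably its output is redirected into the jobfile,
# e.g. python myjob.py > myjob.
from loadleveller.taskmaker import TaskMaker  # module path assumed from this diff

tm = TaskMaker('myjob.py', 'jobconfig.yml')   # jobname becomes 'myjob'
tm.sweeps = 10000
tm.thermalization = 1000
for T in [0.5, 1.0, 1.5]:
    tm.task(temperature=T)                    # one task per temperature
tm.write()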
@@ -9,11 +9,6 @@ setuptools.setup(
url="https://git.rwth-aachen.de/lukas.weber2/load_leveller",
packages=setuptools.find_packages(),
license="MIT",
scripts=["ydelete","yrun"],
scripts=["loadl"],
install_requires=["pyyaml","h5py","numpy"],
entry_points={
'console_scripts': [
'ystatus = loadleveller.jobstatus:ystatus',
],
}
)
#!/usr/bin/env python3
import yaml
import shutil
import argparse
parser = argparse.ArgumentParser(description='This script deletes all the simulation result directories of the job specified by JOBFILE so that you can restart it from scratch. Use with caution.')
parser.add_argument('jobfile', metavar='JOBFILE', help='Configuration file containing all the job information. May be generated using ytaskmaker')
args = parser.parse_args()
with open(args.jobfile, 'r') as f:
jobfile = yaml.safe_load(f)
try:
shutil.rmtree('{}.data'.format(args.jobfile)) # adjust this if you adjust the task dir format in load-leveller
shutil.rmtree('{}.results.yml'.format(args.jobfile))
except:
pass
#!/usr/bin/env python3
import yaml
import argparse
import subprocess
from loadleveller import clusterutils, jobfile
import glob
import sys
import os
parser = argparse.ArgumentParser(description='This helper program runs a loadleveller Monte Carlo program using a provided YAML-formatted jobfile. The jobfile contains information on how to run the job (what mc binary, mpi-parameters, ...) and a number of tasks with different simulation parameters each. When running on a cluster batch system, the batch script is generated using ygeneratebatchscript.')
parser.add_argument('jobfile', metavar='JOBFILE', help='Configuration file containing all the job information. May be generated using ytaskmaker')
parser.add_argument('-s','--single', action='store_true', help='Run in the single core scheduler mode')
parser.add_argument('-m','--merge', action='store_true', help='Merge the measurement data of all tasks')
parser.add_argument('--force', action='store_true', help='Ignore warnings about possible job corruption')
args = parser.parse_args()
jobdir = os.path.dirname(args.jobfile)
jobfile_name = os.path.basename(args.jobfile)
if jobdir != '':
os.chdir(jobdir)
if args.single and args.merge:
print('Error: cannot merge and run in single mode at the same time.')
sys.exit(1)
job = jobfile.JobFile(jobfile_name)
# check age of the different files
binary_modtime = os.stat(job.mc_binary).st_mtime
jobfile_modtime = os.stat(jobfile_name).st_mtime
try:
f = next(glob.iglob('{}.data/*/*.h5'.format(jobfile_name))) # only check one of the output files for speed
data_modtime = os.stat(f).st_mtime
error = False
label = 'Warning' if args.force else 'Error'
if binary_modtime > data_modtime:
print('{}: binary \'{}\' is newer than the checkpoint files.'.format(label, job.mc_binary))
error = True
if jobfile_modtime > data_modtime:
print('{}: jobfile \'{}\' is newer than the checkpoint files.'.format(label, jobfile_name))
error = True
if not args.force and error:
print('Use ydelete to start from a blank run or use \'--force\' to proceed if you are sure\nthe changes you made are compatible.')
sys.exit(1)
except StopIteration:
pass
if args.single:
cmd = [job.mc_binary, 'single', jobfile_name]
print('$ '+' '.join(cmd))
subprocess.run(cmd)
elif args.merge:
subprocess.run([job.mc_binary, 'merge', jobfile_name])
else:
clusterutils.run(jobfile_name, job.jobconfig, [job.mc_binary, jobfile_name])
@@ -3,9 +3,9 @@
#include <dirent.h>
#include <fmt/format.h>
#include <fstream>
#include <iomanip>
#include <regex>
#include <sys/stat.h>
#include <iomanip>
namespace loadl {
@@ -60,21 +60,22 @@ static double parse_duration(std::string str) {
}
std::string jobinfo::taskdir(int task_id) const {
return fmt::format("{}.data/{}", jobfile_name, task_names.at(task_id));
return fmt::format("{}.data/{}", jobname, task_names.at(task_id));
}
std::string jobinfo::rundir(int task_id, int run_id) const {
return fmt::format("{}/run{:04d}", taskdir(task_id), run_id);
}
jobinfo::jobinfo(const std::string &jobfile_name)
: jobfile_name{jobfile_name}, jobfile{jobfile_name} {
jobinfo::jobinfo(const std::string &jobfile_name) : jobfile{jobfile_name} {
for(auto node : jobfile["tasks"]) {
std::string task_name = node.first;
task_names.push_back(task_name);
}
std::string datadir = fmt::format("{}.data", jobfile_name);
jobname = jobfile.get<std::string>("jobname");
std::string datadir = fmt::format("{}.data", jobname);
int rc = mkdir(datadir.c_str(), 0755);
if(rc != 0 && errno != EEXIST) {
throw std::runtime_error{
@@ -91,10 +92,7 @@ jobinfo::jobinfo(const std::string &jobfile_name)
}
}
// The jobconfig file contains information about the launch options, walltime, number of cores
// etc... not sure if this is really the best way to solve the issue.
auto jobconfig_path = jobfile.get<std::string>("jobconfig");
parser jobconfig{jobconfig_path};
parser jobconfig{jobfile["jobconfig"]};
walltime = parse_duration(jobconfig.get<std::string>("mc_walltime"));
checkpoint_time = parse_duration(jobconfig.get<std::string>("mc_checkpoint_time"));
@@ -125,7 +123,7 @@ std::vector<std::string> jobinfo::list_run_files(const std::string &taskdir,
}
void jobinfo::concatenate_results() {
std::ofstream cat_results{fmt::format("{}.results.yml", jobfile_name)};
std::ofstream cat_results{fmt::format("{}.results.yml", jobname)};
for(size_t i = 0; i < task_names.size(); i++) {
std::ifstream res_file{taskdir(i) + "/results.yml"};
res_file.seekg(0, res_file.end);
@@ -177,7 +175,7 @@ void runner_master::start() {
time_start_ = MPI_Wtime();
MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);
job_.log(fmt::format("Starting job '{}'", job_.jobfile_name));
job_.log(fmt::format("Starting job '{}'", job_.jobname));
read();
while(num_active_ranks_ > 1) {
@@ -233,14 +231,14 @@ void runner_master::react() {
tasks_[task_id].sweeps += completed_sweeps;
if(tasks_[task_id].is_done()) {
tasks_[task_id].scheduled_runs--;
if(tasks_[task_id].scheduled_runs > 0) {
job_.log(fmt::format("{} has enough sweeps. Waiting for {} busy ranks.",
job_.task_names[task_id], tasks_[task_id].scheduled_runs));
job_.task_names[task_id], tasks_[task_id].scheduled_runs));
send_action(A_NEW_JOB, node);
} else {
job_.log(fmt::format("{} is done. Merging.", job_.task_names[task_id]));
send_action(A_PROCESS_DATA_NEW_JOB, node);
}
} else if(time_is_up()) {
@@ -299,7 +297,7 @@ void runner_slave::start() {
int action = what_is_next(S_IDLE);
while(action != A_EXIT) {
if(action == A_NEW_JOB) {
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile_name, job_.task_names[task_id_])};
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobname, job_.task_names[task_id_])};
if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
sys_->_init();
// checkpointing();
@@ -393,5 +391,4 @@ void runner_slave::merge_measurements() {
sys_->register_evalables(evalables);
job_.merge_task(task_id_, evalables);
}
}
@@ -14,8 +14,8 @@
namespace loadl {
struct jobinfo {
std::string jobfile_name;
parser jobfile;
std::string jobname;
std::vector<std::string> task_names;
@@ -25,7 +25,7 @@ int runner_single::start() {
read();
task_id_ = get_new_task_id(task_id_);
while(task_id_ != -1 && !time_is_up()) {
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile_name, job_.task_names.at(task_id_))};
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobname, job_.task_names.at(task_id_))};
if(!sys_->_read(job_.rundir(task_id_, 1))) {
sys_->_init();
}