Commit 55921b95 authored by Lukas Weber's avatar Lukas Weber

sort tasks for reliable order, prevent writing of partial measurements

parent 404ee781
......@@ -58,11 +58,11 @@ def run():
pass
if args_run.single:
cmd = [job.jobconfig['mc_binary'], 'single', job.jobname]
cmd = [job.jobconfig['mc_binary'], 'single', jobfile_name]
print('$ '+' '.join(cmd))
subprocess.run(cmd)
else:
clusterutils.run(job.jobname, job.jobconfig, [job.jobconfig['mc_binary'], job.jobname])
clusterutils.run(job.jobname, job.jobconfig, [job.jobconfig['mc_binary'], jobfile_name])
def delete():
import shutil
......@@ -77,7 +77,7 @@ def delete():
os.unlink(results_file)
def merge():
cmd = [job.jobconfig['mc_binary'], 'merge', job.jobname]
cmd = [job.jobconfig['mc_binary'], 'merge', jobfile_name]
print('$ '+' '.join(cmd))
subprocess.run(cmd)
......@@ -87,9 +87,6 @@ def status():
sys.exit(rc)
if args.command == 'delete' or args.command == 'd':
delete()
elif args.command == 'merge' or args.command == 'm':
......
......@@ -9,7 +9,7 @@ batchscript_claix18 = '''#!/usr/bin/env zsh
#SBATCH --job-name={jobname}
#SBATCH --time={walltime}
#SBATCH --mem-per-cpu={mem_per_cpu}
##SBATCH -P {project} #TODO figure out how projects work
##SBATCH --account={project} #TODO
#SBATCH --ntasks={num_cores}
#SBATCH --export=NONE
#SBATCH --output=output.%x
......@@ -71,7 +71,9 @@ def run(jobname, jobconfig, cmd):
os.system(mpicmd)
else:
with tempfile.NamedTemporaryFile(mode='w',delete=False) as f:
f.write(generate_batchscript(batchscript_templates[sysinfo], cmd, jobname, jobconfig))
batchscript = generate_batchscript(batchscript_templates[sysinfo], cmd, jobname, jobconfig)
print(batchscript)
f.write(batchscript)
bscriptname = f.name
mpicmd = batch_commands[sysinfo].format(batchscript=bscriptname)
print('$ '+mpicmd)
......
......@@ -15,7 +15,8 @@ class TaskProgress:
class JobProgress:
def __init__(self, jobfile):
self.jobfile = jobfile
self.tasks = jobfile.tasks.keys()
self.tasks = list(jobfile.tasks.keys())
self.tasks.sort()
self.restart = False
self.progress = []
......@@ -34,11 +35,11 @@ class JobProgress:
tp.num_runs += 1
with h5py.File(runfile, 'r') as f:
tp.sweeps += f['/sweeps'][0]
tp.therm_sweeps += f['/thermalization_sweeps'][0]
tp.sweeps += f['/sweeps'][0]-f['/thermalization_sweeps'][0]
if tp.sweeps < tp.target_sweeps + tp.target_therm:
if tp.sweeps < tp.target_sweeps:
self.restart = True
self.progress.append(tp)
......@@ -95,7 +96,7 @@ def print_status(jobfile, args):
for task, tp in zip(job_prog.tasks, job_prog.progress):
therm_per_run = tp.therm_sweeps/tp.num_runs if tp.num_runs > 0 else 0
print('{t}: {tp.num_runs} runs, {tp.sweeps}/{tp.target_sweeps} sweeps, {therm_per_run}/{tp.target_therm} thermalization'.format(t=task,tp=tp,therm_per_run=int(round(therm_per_run))))
print('{t}: {tp.num_runs} runs, {tp.sweeps:8d}/{tp.target_sweeps} sweeps, {therm_per_run:8d}/{tp.target_therm} thermalization'.format(t=task,tp=tp,therm_per_run=int(round(therm_per_run))))
except FileNotFoundError as e:
print("Error: jobfile '{}' not found.".format(args.jobfile))
......@@ -30,7 +30,7 @@ void evalable::jackknife(const results &res, observable_result &obs_res) const {
if(res.observables.count(obs_name) <= 0) {
std::cerr << fmt::format(
"Warning: evalable '{}': used observable '{}' not found in Monte Carlo results. "
"Skipping...",
"Skipping...\n",
name_, obs_name);
return;
}
......
......@@ -13,14 +13,13 @@ bool measurements::observable_name_is_legal(const std::string &obs_name) {
return true;
}
void measurements::add_observable(const std::string &name, size_t bin_size, size_t vector_length,
size_t initial_length) {
void measurements::add_observable(const std::string &name, size_t bin_size, size_t vector_length) {
if(!observable_name_is_legal(name)) {
throw std::runtime_error(
fmt::format("Illegal observable name '{}': names must not contain / or .", name));
}
observables_.emplace(name, observable{name, bin_size, vector_length, initial_length});
observables_.emplace(name, observable{name, bin_size, vector_length});
}
void measurements::checkpoint_write(const iodump::group &dump_file) {
......
......@@ -13,8 +13,7 @@ class measurements {
public:
static bool observable_name_is_legal(const std::string &name);
void add_observable(const std::string &name, size_t bin_size = 1, size_t vector_length = 1,
size_t initial_length = 1000);
void add_observable(const std::string &name, size_t bin_size = 1, size_t vector_length = 1);
// use this to add a measurement sample to an observable.
template<class T>
......
......@@ -3,11 +3,9 @@
namespace loadl {
observable::observable(std::string name, size_t bin_length, size_t vector_length,
size_t initial_length)
: name_{std::move(name)}, bin_length_{bin_length}, vector_length_{vector_length},
initial_length_{initial_length}, current_bin_{0}, current_bin_filling_{0} {
samples_.reserve(vector_length_ * initial_length_);
observable::observable(std::string name, size_t bin_length, size_t vector_length)
: name_{std::move(name)}, bin_length_{bin_length}, vector_length_{vector_length}, current_bin_{0}, current_bin_filling_{0} {
samples_.reserve(vector_length_ * initial_bin_length);
}
const std::string &observable::name() const {
......@@ -21,6 +19,9 @@ void observable::checkpoint_write(const iodump::group &dump_file) const {
// So if current_bin_ is not 0 here, we have made a mistake.
assert(current_bin_ == 0);
// Another sanity check: if there is a partial bin, the samples_ array should contain it.
assert(current_bin_filling_ > 0 && samples_.size() == 1);
dump_file.write("name", name_);
dump_file.write("vector_length", vector_length_);
dump_file.write("bin_length", bin_length_);
......@@ -32,8 +33,8 @@ void observable::checkpoint_write(const iodump::group &dump_file) const {
void observable::measurement_write(const iodump::group &meas_file) {
std::vector<double> current_bin_value;
// if there is not even one bin…
if(samples_.size() > vector_length_) {
// if there is at least one bin…
if(samples_.size() >= vector_length_) {
current_bin_value.assign(samples_.end() - vector_length_, samples_.end());
samples_.resize(current_bin_ * vector_length_);
}
......
......@@ -10,8 +10,7 @@ namespace loadl {
class observable {
public:
observable(std::string name, size_t bin_length = 1, size_t vector_length = 1,
size_t initial_length = 1000);
observable(std::string name, size_t bin_length = 1, size_t vector_length = 1);
const std::string &name() const;
......@@ -29,6 +28,8 @@ public:
void checkpoint_read(const iodump::group &dump_file);
private:
static const size_t initial_bin_length = 1000;
std::string name_;
size_t bin_length_;
size_t vector_length_;
......
......@@ -72,6 +72,7 @@ jobinfo::jobinfo(const std::string &jobfile_name) : jobfile{jobfile_name} {
std::string task_name = node.first;
task_names.push_back(task_name);
}
std::sort(task_names.begin(), task_names.end());
jobname = jobfile.get<std::string>("jobname");
......@@ -297,7 +298,8 @@ void runner_slave::start() {
int action = what_is_next(S_IDLE);
while(action != A_EXIT) {
if(action == A_NEW_JOB) {
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
sys_ =
std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
sys_->_init();
// checkpointing();
......@@ -324,7 +326,11 @@ void runner_slave::start() {
checkpointing();
action = what_is_next(S_BUSY);
}
job_.log(fmt::format("rank {} out of work", rank_));
if(time_is_up()) {
job_.log(fmt::format("rank {} exits: time limit reached", rank_));
} else {
job_.log(fmt::format("rank {} exits: out of work", rank_));
}
}
bool runner_slave::is_checkpoint_time() {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment