Commit d2ba8249 authored by Lukas Weber

fix multirun sweep counting

parent 524c0635
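After this change, a run dump stores thermalization and measurement sweeps under separate keys ('/thermalization_sweeps' and '/sweeps'), and the sweep targets no longer include thermalization; in parallel tempering mode both counters are compared in units of global updates. As rough orientation, here is a minimal sketch of how the dumps of one task could be tallied under that convention, assuming h5py and the key names written by mc::_write below (the helper task_progress and its signature are illustrative only, not part of the commit):

# illustrative sketch, not part of the commit; key names follow mc::_write below
import glob

import h5py

def task_progress(jobname, task, target_sweeps, sweeps_per_global_update=1):
    """Sum the progress of all run dumps belonging to one task."""
    num_runs = sweeps = therm_sweeps = 0
    for runfile in glob.iglob('{}.data/{}/run*.dump.h5'.format(jobname, task)):
        num_runs += 1
        with h5py.File(runfile, 'r') as f:
            # dumps store raw sweep counts; convert to global-update units for PT jobs
            sweeps += f['/sweeps'][0] // sweeps_per_global_update
            therm_sweeps += f['/thermalization_sweeps'][0] // sweeps_per_global_update
    # thermalization no longer enters the restart decision
    needs_restart = sweeps < target_sweeps
    return num_runs, therm_sweeps, sweeps, needs_restart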
@@ -31,15 +31,15 @@ class JobProgress:
         for runfile in glob.iglob('{}.data/{}/run*.dump.h5'.format(self.jobfile.jobname,task)):
             tp.num_runs += 1
+            sweeps_per_global_update = 1
+            if 'parallel_tempering_parameter' in jobfile.jobconfig.keys():
+                sweeps_per_global_update = jobfile.tasks[task].get('pt_sweeps_per_global_update',1)
             with h5py.File(runfile, 'r') as f:
-                sweeps = f['/sweeps'][0]//jobfile.tasks[task].get('pt_sweeps_per_global_update',1)
-                tp.therm_sweeps += min(sweeps,tp.target_therm)
-                tp.sweeps += max(0,sweeps - tp.target_therm)
+                tp.sweeps += f['/sweeps'][0]//sweeps_per_global_update
+                tp.therm_sweeps += f['/thermalization_sweeps'][0]//sweeps_per_global_update
 
-        if tp.therm_sweeps < tp.target_therm or tp.sweeps < tp.target_sweeps:
+        if tp.sweeps < tp.target_sweeps:
             self.restart = True
 
         self.progress.append(tp)
@@ -3,7 +3,7 @@ namespace loadl {
 mc::mc(const parser &p) : param{p} {
     therm_ = p.get<int>("thermalization");
-    pt_sweeps_per_global_update_ = p.get<int>("pt_sweeps_per_global_update", -1);
+    pt_sweeps_per_global_update_ = p.get<int>("pt_sweeps_per_global_update", 1);
 }
 
 void mc::write_output(const std::string &) {}
@@ -82,7 +82,12 @@ void mc::_write(const std::string &dir) {
         checkpoint_write(g.open_group("simulation"));
         measure.checkpoint_write(g.open_group("measurements"));
-        g.write("sweeps", sweep_);
+        int therm = therm_;
+        if(pt_mode_) {
+            therm *= pt_sweeps_per_global_update_;
+        }
+        g.write("thermalization_sweeps", std::min(sweep_, therm));
+        g.write("sweeps", std::max(0, sweep_ - therm));
     }
 
     rename((dir + ".dump.h5.tmp").c_str(), (dir + ".dump.h5").c_str());
@@ -108,7 +113,10 @@ bool mc::_read(const std::string &dir) {
         measure.checkpoint_read(g.open_group("measurements"));
         checkpoint_read(g.open_group("simulation"));
-        g.read("sweeps", sweep_);
+        int sweeps, therm_sweeps;
+        g.read("thermalization_sweeps", therm_sweeps);
+        g.read("sweeps", sweeps);
+        sweep_ = sweeps + therm_sweeps;
     }
 
     clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
     measure.add("_ll_checkpoint_read_time",
@@ -85,9 +85,7 @@ void runner_master::react() {
             send_action(A_NEW_JOB, node);
             tasks_[current_task_id_].scheduled_runs++;
             int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
-                          tasks_[current_task_id_].target_sweeps +
-                              tasks_[current_task_id_].target_thermalization -
-                              tasks_[current_task_id_].sweeps};
+                          tasks_[current_task_id_].target_sweeps - tasks_[current_task_id_].sweeps};
             MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB, MPI_COMM_WORLD);
         }
     } else if(node_status == S_BUSY) {
@@ -126,11 +124,10 @@ void runner_master::read() {
         auto task = job_.jobfile["tasks"][job_.task_names[i]];
         int target_sweeps = task.get<int>("sweeps");
-        int target_thermalization = task.get<int>("thermalization");
         int sweeps = job_.read_dump_progress(i);
         int scheduled_runs = 0;
 
-        tasks_.emplace_back(target_sweeps, target_thermalization, sweeps, scheduled_runs);
+        tasks_.emplace_back(target_sweeps, sweeps, scheduled_runs);
     }
 }
@@ -163,10 +160,10 @@ void runner_slave::start() {
     while(sweeps_since_last_query_ < sweeps_before_communication_) {
         sys_->_do_update();
-        sweeps_since_last_query_++;
 
         if(sys_->is_thermalized()) {
             sys_->_do_measurement();
+            sweeps_since_last_query_++;
         }
 
         if(is_checkpoint_time() || time_is_up()) {
@@ -124,7 +124,7 @@ std::tuple<double, double> pt_chain::optimize_params() {
 }
 
 bool pt_chain::is_done() {
-    return sweeps >= target_sweeps + target_thermalization;
+    return sweeps >= target_sweeps;
 }
 
 int runner_pt_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
@@ -182,20 +182,20 @@ void runner_pt_master::construct_pt_chains() {
         chain.task_ids.at(chain_pos) = i;
 
         const char *pt_sweep_error =
-            "in parallel tempering mode, sweeps are measured in global updates and need to be the "
+            "chain {}: in parallel tempering mode, sweeps are measured in global updates and need to be the "
             "same within each chain: {} = {} != {}";
 
         int target_sweeps = task.get<int>("sweeps");
         if(chain.target_sweeps >= 0 && target_sweeps != chain.target_sweeps) {
             throw std::runtime_error{
-                fmt::format(pt_sweep_error, "target_sweeps", chain.target_sweeps, target_sweeps)};
+                fmt::format(pt_sweep_error, chain.id, "target_sweeps", chain.target_sweeps, target_sweeps)};
         }
         chain.target_sweeps = target_sweeps;
 
         int target_thermalization = task.get<int>("thermalization");
         if(chain.target_thermalization >= 0 &&
            target_thermalization != chain.target_thermalization) {
-            throw std::runtime_error{fmt::format(pt_sweep_error, "thermalization",
+            throw std::runtime_error{fmt::format(pt_sweep_error, chain.id, "thermalization",
                                                  chain.target_thermalization,
                                                  target_thermalization)};
         }
@@ -204,7 +204,7 @@ void runner_pt_master::construct_pt_chains() {
         int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
         int sweeps = job_.read_dump_progress(i) / sweeps_per_global_update;
         if(chain.sweeps >= 0 && sweeps != chain.sweeps) {
-            throw std::runtime_error{fmt::format(pt_sweep_error, "sweeps", chain.sweeps, sweeps)};
+            throw std::runtime_error{fmt::format(pt_sweep_error, chain.id, "sweeps", chain.sweeps, sweeps)};
         }
         chain.sweeps = sweeps;
     }
@@ -402,7 +402,7 @@ int runner_pt_master::assign_new_chain(int rank_section) {
         auto &chain = pt_chains_[chain_run.id];
         msg[0] = chain.task_ids[target];
         msg[1] = chain_run.run_id;
-        msg[2] = chain.target_sweeps + chain.target_thermalization - chain.sweeps;
+        msg[2] = chain.target_sweeps - chain.sweeps;
     } else {
         // this will prompt the slave to quit
         num_active_ranks_--;
@@ -593,7 +593,9 @@ void runner_pt_slave::start() {
         if(sys_->sweep() % sweeps_per_global_update_ == 0) {
             pt_global_update();
+            if(sys_->is_thermalized()) {
                 sweeps_since_last_query_++;
+            }
 
             timeout = negotiate_timeout();
             if(timeout != TR_CONTINUE) {
@@ -35,9 +35,9 @@ int runner_single::start() {
     while(!tasks_[task_id_].is_done() && !time_is_up()) {
         sys_->_do_update();
-        tasks_[task_id_].sweeps++;
 
         if(sys_->is_thermalized()) {
             sys_->_do_measurement();
+            tasks_[task_id_].sweeps++;
         }
 
         if(is_checkpoint_time()) {
@@ -79,11 +79,10 @@ void runner_single::read() {
         auto task = job_.jobfile["tasks"][job_.task_names[i]];
         int target_sweeps = task.get<int>("sweeps");
-        int target_thermalization = task.get<int>("thermalization");
         int sweeps = 0;
         sweeps = job_.read_dump_progress(i);
 
-        tasks_.emplace_back(target_sweeps, target_thermalization, sweeps, 0);
+        tasks_.emplace_back(target_sweeps, sweeps, 0);
     }
 }
@@ -3,12 +3,12 @@
 namespace loadl {
 
-runner_task::runner_task(int target_sweeps, int target_thermalization, int sweeps,
+runner_task::runner_task(int target_sweeps, int sweeps,
                          int scheduled_runs)
-    : target_sweeps{target_sweeps}, target_thermalization{target_thermalization}, sweeps{sweeps},
+    : target_sweeps{target_sweeps}, sweeps{sweeps},
       scheduled_runs{scheduled_runs} {}
 
 bool runner_task::is_done() const {
-    return sweeps >= (target_sweeps + target_thermalization);
+    return sweeps >= target_sweeps;
 }
 }
@@ -5,11 +5,10 @@ namespace loadl {
 // used by the runner
 struct runner_task {
     int target_sweeps;
-    int target_thermalization;
     int sweeps;
     int scheduled_runs;
 
     bool is_done() const;
-    runner_task(int target_sweeps, int target_thermalization, int sweeps, int scheduled_runs);
+    runner_task(int target_sweeps, int sweeps, int scheduled_runs);
 };
 }