Commit 296e2f1e authored by Lukas Weber

seemingly working parallel tempering

parent 253a9209
@@ -271,5 +271,4 @@ bool file_exists(const std::string &path) {
struct stat buf;
return stat(path.c_str(), &buf) == 0;
}
}
@@ -3,8 +3,8 @@
#include <fmt/format.h>
#include <hdf5.h>
#include <string>
#include <vector>
#include <sys/stat.h>
#include <vector>
namespace loadl {
@@ -114,7 +114,7 @@ public:
// TODO: once the intel compiler can do guaranteed copy elision,
// please uncomment this line! and be careful about bugs!
//iodump(iodump &) = delete;
// iodump(iodump &) = delete;
~iodump();
friend class group;
@@ -268,5 +268,4 @@ void iodump::group::read(const std::string &name, T &value) const {
// utility
bool file_exists(const std::string &path);
}
@@ -7,14 +7,6 @@ mc::mc(const parser &p) : param{p} {
void mc::write_output(const std::string &) {}
void mc::random_init() {
if(param.defined("seed")) {
rng.reset(new random_number_generator(param.get<uint64_t>("seed")));
} else {
rng.reset(new random_number_generator());
}
}
double mc::random01() {
return rng->random_double();
}
@@ -29,7 +21,17 @@ void mc::_init() {
measure.add_observable("_ll_checkpoint_write_time", 1);
measure.add_observable("_ll_measurement_time", 1000);
measure.add_observable("_ll_sweep_time", 1000);
random_init();
if(param.get<bool>("pt_statistics", false)) {
measure.add_observable("_ll_pt_rank", 1);
}
if(param.defined("seed")) {
rng.reset(new random_number_generator(param.get<uint64_t>("seed")));
} else {
rng.reset(new random_number_generator());
}
init();
}
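The seed handling formerly in random_init() is now inlined here: use the user-supplied seed when one is given, otherwise let the default constructor pick one. A minimal standard-library sketch of the same pattern (make_rng and the std::optional seed are illustrative stand-ins, not loadleveller API):

#include <cstdint>
#include <optional>
#include <random>

// Sketch only: loadleveller uses its own random_number_generator class.
std::mt19937_64 make_rng(std::optional<uint64_t> seed) {
	if(seed) {
		return std::mt19937_64{*seed};                  // fixed seed -> reproducible run
	}
	return std::mt19937_64{std::random_device{}()};         // otherwise seed from OS entropy
}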
@@ -40,8 +42,9 @@ void mc::_do_measurement() {
do_measurement();
clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
double measurement_time = (tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
double measurement_time =
(tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
measure.add("_ll_measurement_time", measurement_time);
if(measurement_time > max_meas_time_) {
max_meas_time_ = measurement_time;
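The rewrapped lines above implement a monotonic-clock stopwatch around do_measurement(). As a self-contained sketch of that pattern (CLOCK_MONOTONIC_RAW is Linux-specific; time_section is an illustrative name):

#include <time.h>

// Elapsed wall-clock seconds spent in work(), unaffected by NTP adjustments.
template<class F>
double time_section(F &&work) {
	timespec tstart{}, tend{};
	clock_gettime(CLOCK_MONOTONIC_RAW, &tstart);
	work();
	clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
	return (tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
}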
@@ -69,6 +72,12 @@ void mc::_pt_update_param(double new_param, const std::string &new_dir) {
iodump dump_file = iodump::create(new_dir + ".dump.h5.tmp");
measure.checkpoint_read(dump_file.get_root().open_group("measurements"));
}
if(param.get<bool>("pt_statistics", false)) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
measure.add("_ll_pt_rank", rank);
}
pt_update_param(new_param);
}
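The new pt_statistics branch records which MPI rank carried out each parameter move. A minimal sketch of the underlying call, detached from the measurements framework (the output line is illustrative only):

#include <mpi.h>
#include <cstdio>

int main(int argc, char **argv) {
	MPI_Init(&argc, &argv);
	int rank = 0;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank); // same call as in _pt_update_param above
	// the real code feeds rank into measure.add("_ll_pt_rank", rank) instead of printing it
	std::printf("rank %d\n", rank);
	MPI_Finalize();
	return 0;
}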
@@ -98,10 +107,10 @@ void mc::_write(const std::string &dir) {
g.write("thermalization_sweeps", std::min(therm_, sweep_)); // only for convenience
}
rename((dir + ".dump.h5.tmp").c_str(), (dir + ".dump.h5").c_str());
clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
double checkpoint_write_time = (tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
double checkpoint_write_time =
(tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
measure.add("_ll_checkpoint_write_time", checkpoint_write_time);
if(checkpoint_write_time > max_checkpoint_write_time_) {
max_checkpoint_write_time_ = checkpoint_write_time;
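_write uses the write-to-temporary-then-rename idiom: the dump is built as <dir>.dump.h5.tmp and only renamed to <dir>.dump.h5 once it is complete, so an interrupted checkpoint never leaves a truncated dump behind (rename is atomic on POSIX within one filesystem). A hedged sketch of the same idiom with a plain text file in place of HDF5:

#include <cstdio>
#include <fstream>
#include <string>

// Sketch only: write the full state to <path>.tmp, then swap it into place atomically.
void write_checkpoint(const std::string &path, const std::string &payload) {
	const std::string tmp = path + ".tmp";
	{
		std::ofstream out(tmp, std::ios::trunc);
		out << payload; // complete state goes to the temporary file first
	}                       // stream is closed (and flushed) before the rename
	std::rename(tmp.c_str(), path.c_str());
}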
@@ -110,10 +119,9 @@ void mc::_write(const std::string &dir) {
double mc::safe_exit_interval() {
// this is more or less guesswork in an attempt to make it safe for as many cases as possible
return 2*(max_checkpoint_write_time_ + max_sweep_time_ + max_meas_time_) + 2;
return 2 * (max_checkpoint_write_time_ + max_sweep_time_ + max_meas_time_) + 2;
}
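As a worked example: if the slowest checkpoint write, sweep, and measurement observed so far each took 1 s, the runner reserves 2 * (1 + 1 + 1) + 2 = 8 s of walltime before shutting itself down.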
bool mc::_read(const std::string &dir) {
if(!file_exists(dir + ".dump.h5")) {
return false;
......
@@ -12,15 +12,15 @@ namespace loadl {
class mc {
private:
void random_init();
int sweep_ = 0;
int therm_ = 0;
// The following times in seconds are used to estimate a safe exit interval before walltime is up.
// The following times in seconds are used to estimate a safe exit interval before walltime is
// up.
double max_checkpoint_write_time_{0};
double max_sweep_time_{0};
double max_meas_time_{0};
protected:
parser param;
std::unique_ptr<random_number_generator> rng;
@@ -34,6 +34,7 @@ protected:
virtual void pt_update_param(double /*new_param*/) {
throw std::runtime_error{"running parallel tempering, but pt_update_param not implemented"};
}
public:
double random01();
int sweep() const;
@@ -43,7 +44,7 @@ public:
throw std::runtime_error{"running parallel tempering, but pt_weight_ratio not implemented"};
return 1;
}
// these functions do a little more, like taking care of the
// random number generator state, then call the child class versions.
void _init();
......
@@ -33,7 +33,8 @@ private:
template<class T>
void measurements::add(const std::string name, T value) {
if(observables_.count(name) == 0) {
throw std::runtime_error{fmt::format("tried to add to observable '{}' which was not registered!", name)};
throw std::runtime_error{
fmt::format("tried to add to observable '{}' which was not registered!", name)};
}
observables_.at(name).add(value);
}
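Note that add() only accepts names that were registered beforehand with add_observable() (as mc::_init does above for _ll_pt_rank and the timing observables); adding to an unregistered name throws instead of silently creating a new observable.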
......
@@ -91,7 +91,8 @@ results merge(const std::vector<std::string> &filenames, const std::vector<evala
if(obs.total_sample_count <= min_bin_count) {
obs.rebinning_bin_count = obs.total_sample_count;
} else {
obs.rebinning_bin_count = min_bin_count + cbrt(obs.total_sample_count-min_bin_count);
obs.rebinning_bin_count =
min_bin_count + cbrt(obs.total_sample_count - min_bin_count);
}
} else {
obs.rebinning_bin_count = rebinning_bin_count;
......
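As a worked example of the new formula: with min_bin_count = 10 and total_sample_count = 1010, the rebinning bin count becomes 10 + cbrt(1000) = 20, so the number of rebinning bins grows only with the cube root of the sample count.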
@@ -17,5 +17,4 @@ void rng_internal_mersenne::backend_checkpoint_read(const iodump::group &d) {
d.read("state", rand_state);
mtrand_.load(rand_state);
}
}
@@ -77,6 +77,7 @@ public:
class rng_internal_mersenne {
private:
MTRand mtrand_;
public:
void backend_checkpoint_write(const iodump::group &dump_file);
void backend_checkpoint_read(const iodump::group &dump_file);
@@ -85,7 +86,7 @@ public:
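// MTRand::randDblExc(1) samples the open interval (0, 1): both endpoints are excluded.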
double random_double() {
return mtrand_.randDblExc(1);
}
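// MTRand::randInt(n) is inclusive of n, so passing bound - 1 yields a uniform integer in [0, bound).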
int random_integer(int bound) {
return mtrand_.randInt(bound - 1);
}
......
@@ -30,7 +30,6 @@ struct observable_result {
std::vector<double> autocorrelation_time;
};
// results holds the means and errors merged from all the runs belonging to a task
// this includes both regular observables and evalables.
struct results {
......
#include "runner.h"
#include "merger.h"
#include "runner_pt.h"
#include <dirent.h>
#include <fmt/format.h>
#include <fstream>
#include <iomanip>
#include <regex>
#include <sys/stat.h>
#include "runner_pt.h"
namespace loadl {
enum {
@@ -35,13 +35,13 @@ static int parse_duration(const std::string &str) {
if(idx == str.size()) {
return i1;
} else if(str[idx] == ':') {
std::string str1 = str.substr(idx+1);
std::string str1 = str.substr(idx + 1);
int i2 = std::stoi(str1, &idx, 10);
if(idx == str1.size()) {
return 60 * i1 + i2;
} else if(str[idx] == ':') {
std::string str2 = str1.substr(idx+1);
std::string str2 = str1.substr(idx + 1);
int i3 = std::stoi(str2, &idx, 10);
if(idx != str2.size()) {
throw std::runtime_error{"minutes"};
@@ -55,8 +55,8 @@ static int parse_duration(const std::string &str) {
throw std::runtime_error{"seconds"};
}
} catch(std::exception &e) {
throw std::runtime_error{
fmt::format("'{}' does not fit time format [[hours:]minutes:]seconds: {}", str, e.what())};
throw std::runtime_error{fmt::format(
"'{}' does not fit time format [[hours:]minutes:]seconds: {}", str, e.what())};
}
}
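For example, "90" parses to 90 s and "5:30" to 5 * 60 + 30 = 330 s; a three-field string such as "2:00:00" goes through the hours branch (not shown in this hunk, presumably 3600 * i1 + 60 * i2 + i3), and anything else is rejected with the [[hours:]minutes:]seconds format error above.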
@@ -236,9 +236,10 @@ void runner_master::react() {
send_action(A_NEW_JOB, node);
tasks_[current_task_id_].scheduled_runs++;
int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
tasks_[current_task_id_].target_sweeps+tasks_[current_task_id_].target_thermalization-tasks_[current_task_id_].sweeps};
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB,
MPI_COMM_WORLD);
tasks_[current_task_id_].target_sweeps +
tasks_[current_task_id_].target_thermalization -
tasks_[current_task_id_].sweeps};
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB, MPI_COMM_WORLD);
}
} else if(node_status == S_BUSY) {
int msg[2];
@@ -327,10 +328,11 @@ void runner_slave::start() {
if(time_is_up()) {
what_is_next(S_TIMEUP);
job_.log(fmt::format("rank {} exits: walltime up (safety interval = {} s)", rank_, sys_->safe_exit_interval()));
job_.log(fmt::format("rank {} exits: walltime up (safety interval = {} s)", rank_,
sys_->safe_exit_interval()));
break;
}
action = what_is_next(S_BUSY);
}
@@ -348,7 +350,7 @@ bool runner_slave::time_is_up() {
if(sys_ != nullptr) {
safe_interval = sys_->safe_exit_interval();
}
return MPI_Wtime() - time_start_ > job_.walltime-safe_interval;
return MPI_Wtime() - time_start_ > job_.walltime - safe_interval;
}
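Concretely: with a 3600 s walltime and an 8 s safe interval, the slave reports S_TIMEUP once more than 3592 s of MPI_Wtime() have passed since time_start_.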
int runner_slave::what_is_next(int status) {
......
@@ -52,6 +52,7 @@ private:
void react();
void send_action(int action, int destination);
public:
runner_master(jobinfo job);
void start();
......
@@ -23,34 +23,34 @@ enum {
T_NEW_JOB,
T_STATUS,
MASTER=0
MASTER = 0
};
pt_chain_run::pt_chain_run(const pt_chain& chain, int run_id)
: id{chain.id}, run_id{run_id}, params{chain.start_params} {
pt_chain_run::pt_chain_run(const pt_chain &chain, int run_id)
: id{chain.id}, run_id{run_id}, params{chain.start_params} {
node_to_pos.resize(params.size());
done.resize(params.size());
weight_ratios.resize(params.size());
weight_ratios.resize(params.size(), -1);
for(size_t i = 0; i < node_to_pos.size(); i++) {
node_to_pos[i] = i;
}
}
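Initializing weight_ratios to -1 here (and in checkpoint_read below) gives the master a sentinel for "no weight ratio reported yet"; the cleanup loop in react() later skips exactly those entries via the < 0 check.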
pt_chain_run pt_chain_run::checkpoint_read(const iodump::group& g) {
pt_chain_run pt_chain_run::checkpoint_read(const iodump::group &g) {
pt_chain_run run;
g.read("id", run.id);
g.read("run_id", run.run_id);
g.read("params", run.params);
g.read("node_to_pos", run.node_to_pos);
run.weight_ratios.resize(run.params.size());
run.weight_ratios.resize(run.params.size(), -1);
run.done.resize(run.params.size());
return run;
}
void pt_chain_run::checkpoint_write(const iodump::group& g) {
void pt_chain_run::checkpoint_write(const iodump::group &g) {
g.write("id", id);
g.write("run_id", run_id);
g.write("node_to_pos", node_to_pos);
@@ -58,7 +58,8 @@ void pt_chain_run::checkpoint_write(const iodump::group& g) {
}
bool pt_chain::is_done() {
return std::all_of(sweeps.begin(), sweeps.end(), [this](double s) { return s >= target_sweeps + target_thermalization;});
return std::all_of(sweeps.begin(), sweeps.end(),
[this](double s) { return s >= target_sweeps + target_thermalization; });
}
int runner_pt_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
@@ -80,35 +81,35 @@ int runner_pt_start(jobinfo job, const mc_factory &mccreator, int argc, char **a
return 0;
}
runner_pt_master::runner_pt_master(jobinfo job) : job_{std::move(job)} {
rng_ = std::make_unique<random_number_generator>();
}
void runner_pt_master::construct_pt_chains() {
std::string pt_param = job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");
std::string pt_param =
job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");
for(size_t i = 0; i < job_.task_names.size(); i++) {
auto task = job_.jobfile["tasks"][job_.task_names[i]];
auto [chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
auto[chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
if(chain_id < 0 || chain_pos < 0) {
throw std::runtime_error{"parallel tempering pt_chain indices must be nonnegative"};
}
if(chain_id >= static_cast<int>(pt_chains_.size())) {
pt_chains_.resize(chain_id+1);
pt_chains_.resize(chain_id + 1);
}
auto &chain = pt_chains_.at(chain_id);
chain.id = chain_id;
if(chain_pos >= static_cast<int>(chain.task_ids.size())) {
chain.task_ids.resize(chain_pos+1, -1);
chain.start_params.resize(chain_pos+1);
chain.sweeps.resize(chain_pos+1);
chain.task_ids.resize(chain_pos + 1, -1);
chain.start_params.resize(chain_pos + 1);
chain.sweeps.resize(chain_pos + 1);
}
if(chain.task_ids.at(chain_pos) != -1) {
throw std::runtime_error{"parallel tempering pt_chain map not unique"};
}
@@ -116,7 +117,8 @@ void runner_pt_master::construct_pt_chains() {
chain.start_params.at(chain_pos) = task.get<double>(pt_param);
chain.task_ids.at(chain_pos) = i;
chain.target_sweeps = std::max(chain.target_sweeps, task.get<int>("sweeps"));
chain.target_thermalization = std::max(chain.target_thermalization, task.get<int>("thermalization"));
chain.target_thermalization =
std::max(chain.target_thermalization, task.get<int>("thermalization"));
chain.sweeps[chain_pos] = job_.read_dump_progress(chain_pos);
}
@@ -126,18 +128,22 @@ void runner_pt_master::construct_pt_chains() {
if(c.id == -1) {
throw std::runtime_error{"parallel tempering pt_chain map contains gaps"};
}
if(chain_len_ != -1 && chain_len_ != static_cast<int>(c.task_ids.size())) {
throw std::runtime_error{"parallel tempering pt_chains do not have the same length"};
}
chain_len_ = c.task_ids.size();
}
if(chain_len_ == -1) {
throw std::runtime_error{"parallel tempering pt_chain mapping missing. You need to specify 'pt_chain: [chain_id, chain_position]' for every task in the job."};
throw std::runtime_error{
"parallel tempering pt_chain mapping missing. You need to specify 'pt_chain: "
"[chain_id, chain_position]' for every task in the job."};
}
if((num_active_ranks_-1) % chain_len_ != 0) {
throw std::runtime_error{"parallel tempering: number of ranks should be of the form chain_length*n+1 for best efficiency"};
if((num_active_ranks_ - 1) % chain_len_ != 0) {
throw std::runtime_error{
"parallel tempering: number of ranks should be of the form chain_length*n+1 for best "
"efficiency"};
}
}
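To illustrate the expected mapping (key names taken from the code above, concrete values invented): with parallel_tempering_parameter set to, say, T, a chain of length three is built from three tasks carrying pt_chain: [0, 0], [0, 1] and [0, 2], each with its own value of T; all chains must have the same length, and the job is then best run on chain_length * n + 1 MPI ranks, the extra rank being the master.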
@@ -145,14 +151,15 @@ void runner_pt_master::checkpoint_read() {
construct_pt_chains();
std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
if(file_exists(master_dump_name)) {
if(file_exists(master_dump_name)) {
iodump dump = iodump::open_readonly(master_dump_name);
auto g = dump.get_root();
rng_->checkpoint_read(g.open_group("random_number_generator"));
auto pt_chain_runs = g.open_group("pt_chain_runs");
for(std::string name : pt_chain_runs) {
pt_chain_runs_.emplace_back(pt_chain_run::checkpoint_read(pt_chain_runs.open_group(name)));
pt_chain_runs_.emplace_back(
pt_chain_run::checkpoint_read(pt_chain_runs.open_group(name)));
}
g.read("current_chain_id", current_chain_id_);
@@ -166,14 +173,15 @@ void runner_pt_master::checkpoint_write() {
std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
job_.log(fmt::format("master: checkpoint {}", master_dump_name));
iodump dump = iodump::create(master_dump_name);
auto g = dump.get_root();
rng_->checkpoint_write(g.open_group("random_number_generator"));
auto pt_chain_runs = g.open_group("pt_chain_runs");
for(auto &c : pt_chain_runs_) {
c.checkpoint_write(pt_chain_runs.open_group(fmt::format("chain{:04d}_run{:04d}", c.id, c.run_id)));
c.checkpoint_write(
pt_chain_runs.open_group(fmt::format("chain{:04d}_run{:04d}", c.id, c.run_id)));
}
g.write("current_chain_id", current_chain_id_);
@@ -186,12 +194,12 @@ void runner_pt_master::start() {
job_.log(fmt::format("Starting job '{}'", job_.jobname));
checkpoint_read();
for(int node_section = 0; node_section < (num_active_ranks_-1)/chain_len_; node_section++) {
for(int node_section = 0; node_section < (num_active_ranks_ - 1) / chain_len_; node_section++) {
assign_new_chain(node_section);
}
time_last_checkpoint_ = MPI_Wtime();
while(num_active_ranks_ > 1) {
react();
if(MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time) {
@@ -209,7 +217,7 @@ int runner_pt_master::schedule_chain_run() {
int new_chain_id = (old_id + i) % nchains;
auto &chain = pt_chains_[new_chain_id];
chain.scheduled_runs++;
int idx = 0;
for(auto &run : pt_chain_runs_) {
if(run.id == chain.id && run.run_id == chain.scheduled_runs) {
@@ -219,7 +227,7 @@ }
}
pt_chain_runs_.emplace_back(chain, chain.scheduled_runs);
return pt_chain_runs_.size()-1;
return pt_chain_runs_.size() - 1;
}
}
@@ -231,19 +239,20 @@ int runner_pt_master::assign_new_chain(int node_section) {
int chain_run_id = schedule_chain_run();
for(int target = 0; target < chain_len_; target++) {
int msg[3] = {-1,0,0};
int msg[3] = {-1, 0, 0};
if(chain_run_id >= 0) {
auto &chain_run = pt_chain_runs_[chain_run_id];
auto &chain = pt_chains_[chain_run.id];
msg[0] = chain.task_ids[target];
msg[1] = chain_run.run_id;
msg[2] = chain.target_sweeps+chain.target_thermalization-chain.sweeps[chain_run.node_to_pos[target]];
msg[2] = chain.target_sweeps + chain.target_thermalization -
chain.sweeps[chain_run.node_to_pos[target]];
} else {
// this will prompt the slave to quit
num_active_ranks_--;
}
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node_section*chain_len_+target+1, T_NEW_JOB,
MPI_COMM_WORLD);
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT,
node_section * chain_len_ + target + 1, T_NEW_JOB, MPI_COMM_WORLD);
}
node_to_chain_run_[node_section] = chain_run_id;
return chain_run_id;
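The destination rank node_section * chain_len_ + target + 1 reflects the rank layout: rank 0 is the master (MASTER = 0) and the remaining ranks form consecutive blocks of chain_len_ workers, so with chain_len_ = 4, node section 1 and target 2 end up on rank 7.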
@@ -253,82 +262,108 @@ void runner_pt_master::react() {
int node_status;
MPI_Status stat;
MPI_Recv(&node_status, 1, MPI_INT, MPI_ANY_SOURCE, T_STATUS, MPI_COMM_WORLD, &stat);
int node = stat.MPI_SOURCE-1;
int node = stat.MPI_SOURCE - 1;
if(node_status == S_BUSY) {
int msg[1];
MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node+1, T_STATUS, MPI_COMM_WORLD, &stat);
MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node + 1, T_STATUS, MPI_COMM_WORLD,
&stat);
int completed_sweeps = msg[0];
int chain_run_id = node_to_chain_run_[node/chain_len_];
int chain_run_id = node_to_chain_run_[node / chain_len_];
auto &chain_run = pt_chain_runs_[chain_run_id];
auto &chain = pt_chains_[chain_run.id];
chain.sweeps[chain_run.node_to_pos[node%chain_len_]] += completed_sweeps;
chain.sweeps[chain_run.node_to_pos[node % chain_len_]] += completed_sweeps;
if(chain.is_done()) {
chain_run.done[node%chain_len_] = true;
chain_run.done[node % chain_len_] = true;
if(std::all_of(chain_run.done.begin(), chain_run.done.end(), [](bool x) {return x;})) {
if(std::all_of(chain_run.done.begin(), chain_run.done.end(),
[](bool x) { return x; })) {
chain.scheduled_runs--;
int action = A_NEW_JOB;
if(chain.scheduled_runs > 0) {
job_.log(fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.", chain.id, chain.scheduled_runs));
job_.log(
fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.",
chain.id, chain.scheduled_runs));
} else {
job_.log(fmt::format("chain {} rank {} is done. Merging.", chain.id, node+1));
job_.log(fmt::format("chain {} rank {} is done. Merging.", chain.id, node + 1));
action = A_PROCESS_DATA_NEW_JOB;
checkpoint_write();
}
for(int target = 0; target < chain_len_; target++) {
send_action(action, node/chain_len_*chain_len_+target+1);
send_action(action, node / chain_len_ * chain_len_ + target + 1);
}
std::fill(chain_run.done.begin(), chain_run.done.end(), -1);
assign_new_chain(node/chain_len_);
assign_new_chain(node / chain_len_);
}
} else {
send_action(A_CONTINUE, node+1);
send_action(A_CONTINUE, node + 1);
}
} else if(node_status == S_READY_FOR_GLOBAL) {
int chain_run_id = node_to_chain_run_[node/chain_len_];
int chain_run_id = node_to_chain_run_[node / chain_len_];
auto &chain_run = pt_chain_runs_[chain_run_id];
auto &chain = pt_chains_[chain_run.id];
if(chain.is_done()) {
int exit = GA_DONE;
MPI_Send(&exit, 1, MPI_INT, node+1, T_GLOBAL, MPI_COMM_WORLD);
MPI_Send(&exit, 1, MPI_INT, node + 1, T_GLOBAL, MPI_COMM_WORLD);
// cancel all pending ranks as gracefully as possible
// TODO tidy this up
int node_section = node / chain_len_;
for(int target = 0; target < chain_len_; target++) {
if(chain_run.weight_ratios[chain_run.node_to_pos[target]] < 0) {
continue;
}
int new_task_id = chain.task_ids[chain_run.node_to_pos[target]];
double new_param = chain_run.params[chain_run.node_to_pos[target]];
MPI_Send(&new_task_id, 1, MPI_INT, node_section * chain_len_ + target + 1,
T_GLOBAL, MPI_COMM_WORLD);
MPI_Send(&new_param, 1, MPI_DOUBLE, node_section * chain_len_ + target + 1,
T_GLOBAL, MPI_COMM_WORLD);
}
std::fill(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), -1);
} else {
int pos = chain_run.node_to_pos[node%chain_len_];
int pos = chain_run.node_to_pos[node % chain_len_];
// keep consistent with pt_global_update
int partner_pos = pos + (2*(pos&1)-1)*(2*pt_swap_odd_-1);
int partner_pos = pos + (2 * (pos & 1) - 1) * (2 * pt_swap_odd_ - 1);
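// Pairing rule: (2 * (pos & 1) - 1) is -1 for even pos and +1 for odd pos, while
// (2 * pt_swap_odd_ - 1) flips the direction on alternate rounds, so the proposed swaps
// alternate between the (0,1),(2,3),... and the (1,2),(3,4),... neighbour pairs; a pos
// whose partner falls outside the chain is skipped just below.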
if(partner_pos < 0 || partner_pos >= chain_len_) {
int response = GA_SKIP;
MPI_Send(&response, 1, MPI_INT, node+1, T_GLOBAL, MPI_COMM_WORLD);
chain_run.weight_ratios[chain_run.node_to_pos[node%chain_len_]] = 1;
MPI_Send(&response, 1, MPI_INT, node + 1, T_GLOBAL, MPI_COMM_WORLD);
chain_run.weight_ratios[chain_run.node_to_pos[node % chain_len_]] = 1;
} else {
int response = GA_CALC_WEIGHT;
MPI_Send(&response, 1, MPI_INT, node+1, T_GLOBAL, MPI_COMM_WORLD);