Commit 253a9209 authored by Lukas Weber's avatar Lukas Weber

first implementation of parallel tempering

parent f6f3e95d
......@@ -266,4 +266,10 @@ iodump::h5_handle::~h5_handle() {
// Dereference operator: exposes the raw HDF5 hid_t wrapped by this RAII handle.
hid_t iodump::h5_handle::operator*() {
return handle_;
}
// Check whether a file (or directory) exists at the given path.
// Any stat(2) failure is treated as "does not exist".
bool file_exists(const std::string &path) {
	struct stat statbuf;
	int rc = stat(path.c_str(), &statbuf);
	return rc == 0;
}
}
......@@ -4,6 +4,7 @@
#include <hdf5.h>
#include <string>
#include <vector>
#include <sys/stat.h>
namespace loadl {
......@@ -264,4 +265,8 @@ void iodump::group::read(const std::string &name, T &value) const {
assert(buf.size() == 1);
value = buf.at(0);
}
// utility
bool file_exists(const std::string &path);
}
#include "mc.h"
#include <sys/stat.h>
namespace loadl {
mc::mc(const parser &p) : param{p} {
......@@ -65,6 +63,15 @@ void mc::_do_update() {
}
}
// Switch this Monte Carlo run to a new parameter value during parallel
// tempering: first adopt the measurement bins of the new target directory,
// then invoke the implementation-specific pt_update_param() hook.
void mc::_pt_update_param(double new_param, const std::string &new_dir) {
// take over the bins of the new target dir
// The inner scope makes sure dump_file is closed before the hook runs.
{
// NOTE(review): iodump::create is used although the file is immediately read
// from — presumably create() opens the ".dump.h5.tmp" written by the
// previous owner of this task; verify it does not truncate existing contents.
iodump dump_file = iodump::create(new_dir + ".dump.h5.tmp");
measure.checkpoint_read(dump_file.get_root().open_group("measurements"));
}
pt_update_param(new_param);
}
void mc::_write(const std::string &dir) {
struct timespec tstart, tend;
clock_gettime(CLOCK_MONOTONIC_RAW, &tstart);
......@@ -106,10 +113,6 @@ double mc::safe_exit_interval() {
return 2*(max_checkpoint_write_time_ + max_sweep_time_ + max_meas_time_) + 2;
}
// File-local helper: true if stat(2) succeeds on the given path.
static bool file_exists(const std::string &path) {
	struct stat statbuf;
	return stat(path.c_str(), &statbuf) == 0;
}
bool mc::_read(const std::string &dir) {
if(!file_exists(dir + ".dump.h5")) {
......
......@@ -31,13 +31,19 @@ protected:
virtual void write_output(const std::string &filename);
virtual void do_update() = 0;
virtual void do_measurement() = 0;
// Parallel tempering hook: implementations override this to switch the
// simulation to a new value of the tempering parameter. The base version
// throws so that jobs using parallel tempering without implementing the
// hook fail loudly instead of silently doing nothing.
virtual void pt_update_param(double /*new_param*/) {
throw std::runtime_error{"running parallel tempering, but pt_update_param not implemented"};
}
public:
double random01();
int sweep() const;
virtual void register_evalables(std::vector<evalable> &evalables) = 0;
// Parallel tempering hook: implementations override this to return the
// weight ratio entering the replica-exchange acceptance probability for
// switching to new_param. The base version throws so that jobs using
// parallel tempering without implementing the hook fail loudly.
virtual double pt_weight_ratio(double /*new_param*/) {
	// The throw terminates every path, so no return statement is needed
	// (the previous unreachable `return 1;` was removed).
	throw std::runtime_error{"running parallel tempering, but pt_weight_ratio not implemented"};
}
// these functions do a little more, like taking care of the
// random number generator state, then call the child class versions.
void _init();
......@@ -48,6 +54,7 @@ public:
void _do_update();
void _do_measurement();
void _pt_update_param(double new_param, const std::string &new_dir);
double safe_exit_interval();
......
......@@ -26,6 +26,7 @@ loadleveller_sources = files([
'runner.cpp',
'runner_single.cpp',
'runner_task.cpp',
'runner_pt.cpp',
])
loadleveller_headers = files([
......@@ -43,6 +44,7 @@ loadleveller_headers = files([
'runner.h',
'runner_single.h',
'runner_task.h',
'runner_pt.h',
])
libloadleveller = library('loadleveller',
......
......@@ -18,12 +18,4 @@ void rng_internal_mersenne::backend_checkpoint_read(const iodump::group &d) {
mtrand_.load(rand_state);
}
// Uniform double in the open interval (0, 1), as provided by
// MTRand::randDblExc (endpoint-excluding variant).
double rng_internal_mersenne::random_double() {
return mtrand_.randDblExc(1);
}
// Uniform integer in [0, bound). MTRand::randInt(n) yields values up to and
// including n, hence the -1.
int rng_internal_mersenne::random_integer(int bound) {
return mtrand_.randInt(bound - 1);
}
}
......@@ -75,16 +75,20 @@ public:
// based on a dinosaur code in the MersenneTwister.h header
// Random-number backend wrapping the classic MTRand Mersenne Twister from
// MersenneTwister.h. The checkpoint functions persist/restore the generator
// state through iodump. (The diff residue that left a duplicate mtrand_
// member and redeclared random_double/random_integer has been resolved to
// the post-commit form with the inline definitions.)
class rng_internal_mersenne {
public:
	void backend_checkpoint_write(const iodump::group &dump_file);
	void backend_checkpoint_read(const iodump::group &dump_file);
	void set_seed(uint64_t seed);

	// Uniform double in (0, 1) via MTRand::randDblExc.
	double random_double() {
		return mtrand_.randDblExc(1);
	}
	// Uniform integer in [0, bound); MTRand::randInt(n) is inclusive of n.
	int random_integer(int bound) {
		return mtrand_.randInt(bound - 1);
	}

private:
	MTRand mtrand_;
};
// based on the c++ stl implementation
......
......@@ -6,7 +6,7 @@
#include <iomanip>
#include <regex>
#include <sys/stat.h>
#include "runner_pt.h"
namespace loadl {
enum {
......@@ -60,8 +60,12 @@ static int parse_duration(const std::string &str) {
}
}
// Directory holding all data of this job: "<jobname>.data".
std::string jobinfo::jobdir() const {
	std::string dir = jobname;
	dir += ".data";
	return dir;
}
// Directory holding the data of a single task: "<jobdir>/<task_name>".
// The stale duplicate return ("{}.data/{}" built directly from jobname) that
// shadowed this one was removed; jobdir() is the single source of truth.
std::string jobinfo::taskdir(int task_id) const {
	return fmt::format("{}/{}", jobdir(), task_names.at(task_id));
}
std::string jobinfo::rundir(int task_id, int run_id) const {
......@@ -77,7 +81,7 @@ jobinfo::jobinfo(const std::string &jobfile_name) : jobfile{jobfile_name} {
jobname = jobfile.get<std::string>("jobname");
std::string datadir = fmt::format("{}.data", jobname);
std::string datadir = jobdir();
int rc = mkdir(datadir.c_str(), 0755);
if(rc != 0 && errno != EEXIST) {
throw std::runtime_error{
......@@ -124,6 +128,22 @@ std::vector<std::string> jobinfo::list_run_files(const std::string &taskdir,
return results;
}
int jobinfo::read_dump_progress(int task_id) const {
int sweeps = 0;
try {
for(auto &dump_name : list_run_files(taskdir(task_id), "dump\\.h5")) {
int dump_sweeps = 0;
iodump d = iodump::open_readonly(dump_name);
d.get_root().read("sweeps", dump_sweeps);
sweeps += dump_sweeps;
}
} catch(std::ios_base::failure &e) {
// might happen if the taskdir does not exist
}
return sweeps;
}
void jobinfo::concatenate_results() {
std::ofstream cat_results{fmt::format("{}.results.yml", jobname)};
for(size_t i = 0; i < task_names.size(); i++) {
......@@ -153,6 +173,11 @@ void jobinfo::log(const std::string &message) {
}
int runner_mpi_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
if(job.jobfile["jobconfig"].defined("parallel_tempering_parameter")) {
runner_pt_start(std::move(job), mccreator, argc, argv);
return 0;
}
MPI_Init(&argc, &argv);
int rank;
......@@ -211,7 +236,7 @@ void runner_master::react() {
send_action(A_NEW_JOB, node);
tasks_[current_task_id_].scheduled_runs++;
int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
tasks_[current_task_id_].target_sweeps};
tasks_[current_task_id_].target_sweeps+tasks_[current_task_id_].target_thermalization-tasks_[current_task_id_].sweeps};
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB,
MPI_COMM_WORLD);
}
......@@ -246,31 +271,13 @@ void runner_master::send_action(int action, int destination) {
MPI_Send(&action, 1, MPI_INT, destination, T_ACTION, MPI_COMM_WORLD);
}
// Sum the "sweeps" counters stored in every run dump file of the task.
// Missing task directories or unreadable dumps simply contribute nothing.
int runner_master::read_dump_progress(int task_id) {
	int total_sweeps = 0;
	try {
		for(const auto &dump_file : jobinfo::list_run_files(job_.taskdir(task_id), "dump\\.h5")) {
			int run_sweeps = 0;
			iodump d = iodump::open_readonly(dump_file);
			d.get_root().read("sweeps", run_sweeps);
			total_sweeps += run_sweeps;
		}
	} catch(iodump_exception &e) {
		// okay
	} catch(std::ios_base::failure &e) {
		// might happen if the taskdir does not exist
	}
	return total_sweeps;
}
void runner_master::read() {
for(size_t i = 0; i < job_.task_names.size(); i++) {
auto task = job_.jobfile["tasks"][job_.task_names[i]];
int target_sweeps = task.get<int>("sweeps");
int target_thermalization = task.get<int>("thermalization");
int sweeps = read_dump_progress(i);
int sweeps = job_.read_dump_progress(i);
int scheduled_runs = 0;
tasks_.emplace_back(target_sweeps, target_thermalization, sweeps, scheduled_runs);
......@@ -293,7 +300,7 @@ void runner_slave::start() {
if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
sys_->_init();
job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
checkpointing();
write_checkpoint();
} else {
job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
}
......@@ -316,7 +323,7 @@ void runner_slave::start() {
break;
}
}
checkpointing();
write_checkpoint();
if(time_is_up()) {
what_is_next(S_TIMEUP);
......@@ -388,7 +395,7 @@ int runner_slave::recv_action() {
return new_action;
}
void runner_slave::checkpointing() {
void runner_slave::write_checkpoint() {
time_last_checkpoint_ = MPI_Wtime();
sys_->_write(job_.rundir(task_id_, run_id_));
job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
......
......@@ -24,11 +24,13 @@ struct jobinfo {
jobinfo(const std::string &jobfile_name);
std::string jobdir() const;
std::string rundir(int task_id, int run_id) const;
std::string taskdir(int task_id) const;
static std::vector<std::string> list_run_files(const std::string &taskdir,
const std::string &file_ending);
int read_dump_progress(int task_id) const;
void merge_task(int task_id, const std::vector<evalable> &evalables);
void concatenate_results();
void log(const std::string &message);
......@@ -46,7 +48,6 @@ private:
int current_task_id_{-1};
void read();
int read_dump_progress(int task_id);
int get_new_task_id(int old_id);
void react();
......@@ -77,7 +78,7 @@ private:
void end_of_run();
int recv_action();
int what_is_next(int);
void checkpointing();
void write_checkpoint();
void merge_measurements();
public:
......
#include "runner_pt.h"
namespace loadl {
// Protocol constants for master/slave communication in the PT runner.
enum {
// slave status codes reported to the master
S_IDLE,
S_BUSY,
S_READY_FOR_GLOBAL,
S_TIMEUP,
// actions the master sends back to a slave
A_CONTINUE,
A_EXIT,
A_NEW_JOB,
A_PROCESS_DATA_NEW_JOB,
// global update status response
GA_DONE,
GA_SKIP,
GA_CALC_WEIGHT,
// MPI message tags
T_ACTION,
T_GLOBAL,
T_NEW_JOB,
T_STATUS,
// rank of the master process; note it shares the value 0 with S_IDLE
MASTER=0
};
// Create a fresh run of the given chain. All per-position vectors are sized
// to the number of parameters and the node -> position map starts out as the
// identity permutation.
pt_chain_run::pt_chain_run(const pt_chain& chain, int run_id)
    : id{chain.id}, run_id{run_id}, params{chain.start_params} {
	size_t count = params.size();
	node_to_pos.resize(count);
	done.resize(count);
	weight_ratios.resize(count);

	size_t idx = 0;
	for(auto &pos : node_to_pos) {
		pos = idx;
		idx++;
	}
}
// Restore a chain run from its checkpoint group. weight_ratios and done are
// transient per-update state: they are not stored, only resized here so the
// next global update starts from a clean slate.
pt_chain_run pt_chain_run::checkpoint_read(const iodump::group& g) {
pt_chain_run run;
g.read("id", run.id);
g.read("run_id", run.run_id);
g.read("params", run.params);
g.read("node_to_pos", run.node_to_pos);
run.weight_ratios.resize(run.params.size());
run.done.resize(run.params.size());
return run;
}
// Persist the chain run. Only permanent state is written; weight_ratios and
// done are recomputed after a restart (see checkpoint_read).
void pt_chain_run::checkpoint_write(const iodump::group& g) {
g.write("id", id);
g.write("run_id", run_id);
g.write("node_to_pos", node_to_pos);
g.write("params", params);
}
// A chain is finished once every position has accumulated its target number
// of thermalization plus measurement sweeps.
bool pt_chain::is_done() {
	for(double s : sweeps) {
		if(s < target_sweeps + target_thermalization) {
			return false;
		}
	}
	return true;
}
// Entry point of the parallel tempering runner: rank 0 becomes the master
// that schedules chains and coordinates swaps; every other rank runs
// simulations as a slave. Returns 0 after MPI has been finalized.
int runner_pt_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
MPI_Init(&argc, &argv);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if(rank == 0) {
runner_pt_master r{std::move(job)};
r.start();
} else {
runner_pt_slave r{std::move(job), mccreator};
r.start();
}
MPI_Finalize();
return 0;
}
// The master owns its own random number generator; its state is saved and
// restored together with the master checkpoint (see checkpoint_read/write).
runner_pt_master::runner_pt_master(jobinfo job) : job_{std::move(job)} {
rng_ = std::make_unique<random_number_generator>();
}
// Build the parallel tempering chains from the job file. Every task must
// declare 'pt_chain: [chain_id, chain_position]'; this validates that the
// mapping is nonnegative, unique, gapless and that all chains have equal
// length, and it restores each task's sweep progress from its dump files.
void runner_pt_master::construct_pt_chains() {
	std::string pt_param = job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");

	for(size_t i = 0; i < job_.task_names.size(); i++) {
		auto task = job_.jobfile["tasks"][job_.task_names[i]];

		auto [chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
		if(chain_id < 0 || chain_pos < 0) {
			throw std::runtime_error{"parallel tempering pt_chain indices must be nonnegative"};
		}

		if(chain_id >= static_cast<int>(pt_chains_.size())) {
			pt_chains_.resize(chain_id+1);
		}

		auto &chain = pt_chains_.at(chain_id);
		chain.id = chain_id;

		if(chain_pos >= static_cast<int>(chain.task_ids.size())) {
			chain.task_ids.resize(chain_pos+1, -1);
			chain.start_params.resize(chain_pos+1);
			chain.sweeps.resize(chain_pos+1);
		}

		if(chain.task_ids.at(chain_pos) != -1) {
			throw std::runtime_error{"parallel tempering pt_chain map not unique"};
		}

		chain.start_params.at(chain_pos) = task.get<double>(pt_param);
		chain.task_ids.at(chain_pos) = i;

		// every position of a chain shares the largest targets found
		chain.target_sweeps = std::max(chain.target_sweeps, task.get<int>("sweeps"));
		chain.target_thermalization = std::max(chain.target_thermalization, task.get<int>("thermalization"));
		// bugfix: progress must be read for the task id i, not for the
		// position of the task inside the chain
		chain.sweeps[chain_pos] = job_.read_dump_progress(i);
	}

	chain_len_ = -1;
	for(auto &c : pt_chains_) {
		if(c.id == -1) {
			throw std::runtime_error{"parallel tempering pt_chain map contains gaps"};
		}
		if(chain_len_ != -1 && chain_len_ != static_cast<int>(c.task_ids.size())) {
			throw std::runtime_error{"parallel tempering pt_chains do not have the same length"};
		}
		chain_len_ = c.task_ids.size();
	}

	if(chain_len_ == -1) {
		throw std::runtime_error{"parallel tempering pt_chain mapping missing. You need to specify 'pt_chain: [chain_id, chain_position]' for every task in the job."};
	}
	if((num_active_ranks_-1) % chain_len_ != 0) {
		throw std::runtime_error{"parallel tempering: number of ranks should be of the form chain_length*n+1 for best efficiency"};
	}
}
// Restore the master's state. The chains are always rebuilt from the job
// file; the RNG state, chain runs and swap bookkeeping are additionally
// loaded from the master dump if one exists (i.e. on restart).
void runner_pt_master::checkpoint_read() {
construct_pt_chains();
std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
if(file_exists(master_dump_name)) {
iodump dump = iodump::open_readonly(master_dump_name);
auto g = dump.get_root();
rng_->checkpoint_read(g.open_group("random_number_generator"));
auto pt_chain_runs = g.open_group("pt_chain_runs");
for(std::string name : pt_chain_runs) {
pt_chain_runs_.emplace_back(pt_chain_run::checkpoint_read(pt_chain_runs.open_group(name)));
}
g.read("current_chain_id", current_chain_id_);
// the bool flag is round-tripped through uint8_t (see checkpoint_write)
uint8_t pt_swap_odd;
g.read("pt_swap_odd", pt_swap_odd);
pt_swap_odd_ = pt_swap_odd;
}
}
// Write the master's state (RNG, chain runs, scheduling position, swap
// parity) to <jobdir>/pt_master.dump.h5.
void runner_pt_master::checkpoint_write() {
std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
job_.log(fmt::format("master: checkpoint {}", master_dump_name));
iodump dump = iodump::create(master_dump_name);
auto g = dump.get_root();
rng_->checkpoint_write(g.open_group("random_number_generator"));
auto pt_chain_runs = g.open_group("pt_chain_runs");
for(auto &c : pt_chain_runs_) {
c.checkpoint_write(pt_chain_runs.open_group(fmt::format("chain{:04d}_run{:04d}", c.id, c.run_id)));
}
g.write("current_chain_id", current_chain_id_);
// bool is written as uint8_t; checkpoint_read converts back
g.write("pt_swap_odd", static_cast<uint8_t>(pt_swap_odd_));
}
// Master main loop: read the checkpoint, hand an initial chain to every
// section of chain_len_ consecutive slave ranks, then react to slave
// messages until all slaves have been told to exit. A checkpoint is written
// whenever checkpoint_time has elapsed.
void runner_pt_master::start() {
MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);
job_.log(fmt::format("Starting job '{}'", job_.jobname));
checkpoint_read();
for(int node_section = 0; node_section < (num_active_ranks_-1)/chain_len_; node_section++) {
assign_new_chain(node_section);
}
time_last_checkpoint_ = MPI_Wtime();
while(num_active_ranks_ > 1) {
react();
if(MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time) {
time_last_checkpoint_ = MPI_Wtime();
checkpoint_write();
}
}
}
// Pick the next chain that still needs sweeps, scanning round-robin from the
// chain after current_chain_id_. Returns an index into pt_chain_runs_ — an
// existing run restored from the checkpoint is reused when its run_id
// matches the chain's scheduled-run count — or -1 when every chain is done.
// NOTE(review): current_chain_id_ is read but never advanced here; if it is
// not updated elsewhere the scan always starts from the same chain — verify.
int runner_pt_master::schedule_chain_run() {
int old_id = current_chain_id_;
int nchains = pt_chains_.size();
for(int i = 1; i <= nchains; i++) {
if(!pt_chains_[(old_id + i) % nchains].is_done()) {
int new_chain_id = (old_id + i) % nchains;
auto &chain = pt_chains_[new_chain_id];
chain.scheduled_runs++;
// reuse a matching run that was restored from the checkpoint, if any
int idx = 0;
for(auto &run : pt_chain_runs_) {
if(run.id == chain.id && run.run_id == chain.scheduled_runs) {
return idx;
}
idx++;
}
pt_chain_runs_.emplace_back(chain, chain.scheduled_runs);
return pt_chain_runs_.size()-1;
}
}
// everything done!
return -1;
}
// Send a new assignment to the chain_len_ ranks of the given node section.
// msg = {task_id, run_id, remaining sweeps}; task_id == -1 tells a slave to
// quit, in which case num_active_ranks_ is decremented once per rank.
// Returns the scheduled chain-run index (or -1 if everything is done).
int runner_pt_master::assign_new_chain(int node_section) {
int chain_run_id = schedule_chain_run();
for(int target = 0; target < chain_len_; target++) {
int msg[3] = {-1,0,0};
if(chain_run_id >= 0) {
auto &chain_run = pt_chain_runs_[chain_run_id];
auto &chain = pt_chains_[chain_run.id];
// NOTE(review): msg[0] uses task_ids[target] while msg[2] indexes sweeps
// through node_to_pos[target]; for a restored run with a non-identity
// permutation these disagree (react() uses task_ids[node_to_pos[...]]) —
// confirm which indexing is intended.
msg[0] = chain.task_ids[target];
msg[1] = chain_run.run_id;
msg[2] = chain.target_sweeps+chain.target_thermalization-chain.sweeps[chain_run.node_to_pos[target]];
} else {
// this will prompt the slave to quit
num_active_ranks_--;
}
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node_section*chain_len_+target+1, T_NEW_JOB,
MPI_COMM_WORLD);
}
node_to_chain_run_[node_section] = chain_run_id;
return chain_run_id;
}
void runner_pt_master::react() {
int node_status;
MPI_Status stat;
MPI_Recv(&node_status, 1, MPI_INT, MPI_ANY_SOURCE, T_STATUS, MPI_COMM_WORLD, &stat);
int node = stat.MPI_SOURCE-1;
if(node_status == S_BUSY) {
int msg[1];
MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node+1, T_STATUS, MPI_COMM_WORLD, &stat);
int completed_sweeps = msg[0];
int chain_run_id = node_to_chain_run_[node/chain_len_];
auto &chain_run = pt_chain_runs_[chain_run_id];
auto &chain = pt_chains_[chain_run.id];
chain.sweeps[chain_run.node_to_pos[node%chain_len_]] += completed_sweeps;
if(chain.is_done()) {
chain_run.done[node%chain_len_] = true;
if(std::all_of(chain_run.done.begin(), chain_run.done.end(), [](bool x) {return x;})) {
chain.scheduled_runs--;
int action = A_NEW_JOB;
if(chain.scheduled_runs > 0) {
job_.log(fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.", chain.id, chain.scheduled_runs));
} else {
job_.log(fmt::format("chain {} rank {} is done. Merging.", chain.id, node+1));
action = A_PROCESS_DATA_NEW_JOB;
checkpoint_write();
}
for(int target = 0; target < chain_len_; target++) {
send_action(action, node/chain_len_*chain_len_+target+1);
}
std::fill(chain_run.done.begin(), chain_run.done.end(), -1);
assign_new_chain(node/chain_len_);
}
} else {
send_action(A_CONTINUE, node+1);
}
} else if(node_status == S_READY_FOR_GLOBAL) {
int chain_run_id = node_to_chain_run_[node/chain_len_];
auto &chain_run = pt_chain_runs_[chain_run_id];
auto &chain = pt_chains_[chain_run.id];
if(chain.is_done()) {
int exit = GA_DONE;
MPI_Send(&exit, 1, MPI_INT, node+1, T_GLOBAL, MPI_COMM_WORLD);
} else {
int pos = chain_run.node_to_pos[node%chain_len_];
// keep consistent with pt_global_update
int partner_pos = pos + (2*(pos&1)-1)*(2*pt_swap_odd_-1);
if(partner_pos < 0 || partner_pos >= chain_len_) {
int response = GA_SKIP;
MPI_Send(&response, 1, MPI_INT, node+1, T_GLOBAL, MPI_COMM_WORLD);
chain_run.weight_ratios[chain_run.node_to_pos[node%chain_len_]] = 1;
} else {
int response = GA_CALC_WEIGHT;
MPI_Send(&response, 1, MPI_INT, node+1, T_GLOBAL, MPI_COMM_WORLD);
double partner_param = chain_run.params[partner_pos];
MPI_Send(&partner_param, 1, MPI_DOUBLE, node+1, T_GLOBAL, MPI_COMM_WORLD);
double weight;
MPI_Recv(&weight, 1, MPI_DOUBLE, node+1, T_GLOBAL, MPI_COMM_WORLD, &stat);
chain_run.weight_ratios[chain_run.node_to_pos[node%chain_len_]] = weight;
}
bool all_ready = std::all_of(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), [](double w) { return w > 0; });
if(all_ready) {
pt_global_update(chain, chain_run);
std::fill(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), -1);
int node_section = node/chain_len_;
for(int target = 0; target < chain_len_; target++) {
int new_task_id = chain.task_ids[chain_run.node_to_pos[target]];
double new_param = chain_run.params[chain_run.node_to_pos[target]];
MPI_Send(&new_task_id, 1, MPI_INT, node_section*chain_len_+target+1, T_GLOBAL, MPI_COMM_WORLD);