
Commit 4f73681a authored by Lukas Weber

remove binsize limitations and fix bugs

parent fc8d34fd
......@@ -20,8 +20,8 @@ void mc::_init() {
// simple profiling support: measure the time spent for sweeps/measurements etc
measure.add_observable("_ll_checkpoint_read_time", 1);
measure.add_observable("_ll_checkpoint_write_time", 1);
measure.add_observable("_ll_measurement_time", pt_mode_ ? pt_sweeps_per_global_update_ : 1000);
measure.add_observable("_ll_sweep_time", pt_mode_ ? pt_sweeps_per_global_update_ : 1000);
measure.add_observable("_ll_measurement_time", 1000);
measure.add_observable("_ll_sweep_time", 1000);
if(pt_mode_) {
if(param.get<bool>("pt_statistics", false)) {
......@@ -69,23 +69,8 @@ void mc::_do_update() {
}
}
void mc::_pt_update_param(const std::string& param_name, double new_param, const std::string &new_dir) {
// take over the bins of the new target dir
{
iodump dump_file = iodump::open_readonly(new_dir + ".dump.h5");
measure.checkpoint_read(dump_file.get_root().open_group("measurements"));
}
auto unclean = measure.is_unclean();
if(unclean) {
throw std::runtime_error(
fmt::format("Unclean observable: {}\nIn parallel tempering mode you have to choose the "
"binsize for all observables so that it is commensurate with "
"pt_sweeps_per_global_update (so that all bins are empty once it happens). "
"If you don’t like this limitation, implement it properly.",
*unclean));
}
void mc::_pt_update_param(int target_rank, const std::string& param_name, double new_param) {
measure.mpi_sendrecv(target_rank);
pt_update_param(param_name, new_param);
}
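The new `_pt_update_param` overload exchanges the measurement buffers with the partner rank over MPI instead of reading bins back from the partner's dump file, then forwards to the user hook `pt_update_param`. A hedged sketch of such a hook; the class `my_mc`, the member `T_`, and the parameter name `"T"` are illustrative assumptions, not part of this repository:

```cpp
#include <string>

// Hypothetical user code implementing the pt_update_param hook called above.
struct my_mc {
	double T_ = 1.0;
	void pt_update_param(const std::string &param_name, double new_param) {
		if(param_name == "T") {
			T_ = new_param; // adopt the parameter of the new chain position
		}
	}
};
```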
......@@ -102,20 +87,16 @@ double mc::_pt_weight_ratio(const std::string& param_name, double new_param) {
return wr;
}
void mc::measurements_write(const std::string &dir) {
void mc::_write(const std::string &dir) {
struct timespec tstart, tend;
clock_gettime(CLOCK_MONOTONIC_RAW, &tstart);
// blocks limit scopes of the dump file handles to ensure they are closed at the right time.
{
iodump meas_file = iodump::open_readwrite(dir + ".meas.h5");
auto g = meas_file.get_root();
measure.samples_write(g);
}
}
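The brace blocks in `measurements_write` are plain RAII scoping: the `iodump` handle's destructor runs at the closing brace, so the file is guaranteed to be closed before anything else touches it. A minimal standalone illustration of the same pattern using standard streams:

```cpp
#include <fstream>
#include <string>

void write_then_reopen(const std::string &path) {
	{
		std::ofstream out(path); // handle lives only inside this block
		out << "data\n";
	}                            // destructor flushes and closes the file here
	std::ifstream in(path);      // safe: the writer is guaranteed closed
}
```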
void mc::_write(const std::string &dir) {
struct timespec tstart, tend;
clock_gettime(CLOCK_MONOTONIC_RAW, &tstart);
measurements_write(dir);
{
iodump dump_file = iodump::create(dir + ".dump.h5.tmp");
......
......@@ -55,13 +55,11 @@ public:
void _write(const std::string &dir);
bool _read(const std::string &dir);
void measurements_write(const std::string &dir);
void _write_output(const std::string &filename);
void _do_update();
void _do_measurement();
void _pt_update_param(const std::string& param_name, double new_param, const std::string &new_dir);
void _pt_update_param(int target_rank, const std::string& param_name, double new_param);
double _pt_weight_ratio(const std::string& param_name, double new_param);
void pt_measure_statistics();
......
#include "measurements.h"
#include <fmt/format.h>
#include <mpi.h>
namespace loadl {
bool measurements::observable_name_is_legal(const std::string &obs_name) {
......@@ -31,7 +31,7 @@ void measurements::checkpoint_write(const iodump::group &dump_file) {
void measurements::checkpoint_read(const iodump::group &dump_file) {
for(const auto &obs_name : dump_file) {
add_observable(obs_name);
observables_.at(obs_name).checkpoint_read(dump_file.open_group(obs_name));
observables_.at(obs_name).checkpoint_read(obs_name, dump_file.open_group(obs_name));
}
}
......@@ -42,12 +42,49 @@ void measurements::samples_write(const iodump::group &meas_file) {
}
}
std::optional<std::string> measurements::is_unclean() const {
for(const auto &obs : observables_) {
if(!obs.second.is_clean()) {
return obs.first;
void measurements::mpi_sendrecv(int target_rank) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
if(rank == target_rank) {
return;
}
if(mpi_checked_targets_.count(target_rank) == 0) {
if(rank < target_rank) {
unsigned long obscount = observables_.size();
MPI_Send(&obscount, 1, MPI_UNSIGNED_LONG, target_rank, 0, MPI_COMM_WORLD);
for(auto& [name, obs] : observables_) {
(void)obs;
int size = name.size()+1;
MPI_Send(&size, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
MPI_Send(name.c_str(), size, MPI_CHAR, target_rank, 0, MPI_COMM_WORLD);
}
} else {
unsigned long obscount;
MPI_Recv(&obscount, 1, MPI_UNSIGNED_LONG, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if(obscount != observables_.size()) {
throw std::runtime_error{fmt::format("ranks {}&{} have to contain identical sets of registered observables. But they contain different amounts of observables! {} != {}.", target_rank, rank, obscount, observables_.size())};
}
for(auto& [name, obs] : observables_) {
(void)obs;
int size;
MPI_Recv(&size, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::vector<char> buf(size);
MPI_Recv(buf.data(), size, MPI_CHAR, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if(std::string{buf.data()} != name) {
throw std::runtime_error{fmt::format("ranks {}&{} have to contain identical sets of registered observables. Found '{}' != '{}'.", target_rank, rank, name, std::string{buf.data()})};
}
}
}
mpi_checked_targets_.insert(target_rank);
}
for(auto& [name, obs] : observables_) {
(void)name;
obs.mpi_sendrecv(target_rank);
}
return std::nullopt;
}
}
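`measurements::mpi_sendrecv` first runs a one-time handshake per partner (cached in `mpi_checked_targets_`) that compares observable counts and names, and only then swaps every observable's buffers. The name comparison uses the usual size-then-payload two-step exchange. A standalone sketch of that two-step (run with, e.g., `mpirun -n 2 ./a.out`):

```cpp
#include <cstdio>
#include <mpi.h>
#include <string>
#include <vector>

int main(int argc, char **argv) {
	MPI_Init(&argc, &argv);
	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	if(rank == 0) {
		std::string name = "Energy";
		int size = name.size() + 1; // include the terminating '\0'
		MPI_Send(&size, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
		MPI_Send(name.c_str(), size, MPI_CHAR, 1, 0, MPI_COMM_WORLD);
	} else if(rank == 1) {
		int size;
		MPI_Recv(&size, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
		std::vector<char> buf(size); // sized from the first message
		MPI_Recv(buf.data(), size, MPI_CHAR, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
		std::printf("received '%s'\n", buf.data());
	}
	MPI_Finalize();
	return 0;
}
```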
......@@ -6,6 +6,7 @@
#include <string>
#include <valarray>
#include <vector>
#include <set>
namespace loadl {
......@@ -26,11 +27,11 @@ public:
// should be opened in read/write mode.
void samples_write(const iodump::group &meas_file);
// returns nullopt if all observables are clean,
// otherwise the name of a non-empty observable
std::optional<std::string> is_unclean() const;
// swaps the contents of the measurement buffers with those on target_rank.
// both ranks must have the same set of observables!
void mpi_sendrecv(int target_rank);
private:
std::set<int> mpi_checked_targets_;
std::map<std::string, observable> observables_;
};
......
#include "observable.h"
#include <fmt/format.h>
#include <iostream>
#include <mpi.h>
namespace loadl {
observable::observable(std::string name, size_t bin_length, size_t vector_length)
......@@ -23,7 +22,6 @@ void observable::checkpoint_write(const iodump::group &dump_file) const {
// Another sanity check: the samples_ array should contain one partial bin.
assert(samples_.size() == vector_length_);
dump_file.write("name", name_);
dump_file.write("vector_length", vector_length_);
dump_file.write("bin_length", bin_length_);
dump_file.write("current_bin_filling", current_bin_filling_);
......@@ -48,8 +46,8 @@ void observable::measurement_write(const iodump::group &meas_file) {
current_bin_ = 0;
}
void observable::checkpoint_read(const iodump::group &d) {
d.read("name", name_);
void observable::checkpoint_read(const std::string& name, const iodump::group &d) {
name_ = name;
d.read("vector_length", vector_length_);
d.read("bin_length", bin_length_);
d.read("current_bin_filling", current_bin_filling_);
......@@ -57,11 +55,22 @@ void observable::checkpoint_read(const iodump::group &d) {
current_bin_ = 0;
}
bool observable::is_clean() const {
if(current_bin_filling_ != 0) {
std::cout << current_bin_filling_ << "\n";
}
return current_bin_ == 0 && current_bin_filling_ == 0;
void observable::mpi_sendrecv(int target_rank) {
const int msg_size = 4;
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
unsigned long msg[msg_size] = {current_bin_, vector_length_, bin_length_, current_bin_filling_};
MPI_Sendrecv_replace(msg, msg_size, MPI_UNSIGNED_LONG, target_rank, 0, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
current_bin_ = msg[0];
vector_length_ = msg[1];
bin_length_ = msg[2];
current_bin_filling_ = msg[3];
std::vector<double> recvbuf((current_bin_+1)*vector_length_);
MPI_Sendrecv(samples_.data(), samples_.size(), MPI_DOUBLE, target_rank, 0, recvbuf.data(), recvbuf.size(), MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
samples_ = recvbuf;
}
}
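`observable::mpi_sendrecv` swaps the four metadata fields in place with `MPI_Sendrecv_replace` and only afterwards exchanges the sample buffers, so each side can size its receive buffer from the partner's already-swapped metadata before the payload arrives. A standalone demo of the same two-step pattern (run with `mpirun -n 2 ./a.out`):

```cpp
#include <cstdio>
#include <mpi.h>
#include <vector>

int main(int argc, char **argv) {
	MPI_Init(&argc, &argv);
	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	int partner = 1 - rank;
	unsigned long len = (rank == 0) ? 3 : 5; // ranks hold different-sized data
	// swap the lengths in place first, so each side knows how much to receive
	MPI_Sendrecv_replace(&len, 1, MPI_UNSIGNED_LONG, partner, 0, partner, 0,
	                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	std::vector<double> mine((rank == 0) ? 3 : 5, rank);
	std::vector<double> theirs(len); // sized from the partner's metadata
	MPI_Sendrecv(mine.data(), mine.size(), MPI_DOUBLE, partner, 0,
	             theirs.data(), theirs.size(), MPI_DOUBLE, partner, 0,
	             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	std::printf("rank %d now holds %zu values\n", rank, theirs.size());
	MPI_Finalize();
	return 0;
}
```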
......@@ -25,11 +25,11 @@ public:
// This will empty the cache of already completed bins
void measurement_write(const iodump::group &meas_file);
void checkpoint_read(const iodump::group &dump_file);
// true if there are no samples in the bin
bool is_clean() const;
void checkpoint_read(const std::string& name, const iodump::group &dump_file);
// swaps this observable's state with the copy on the target rank.
// useful for parallel tempering mode
void mpi_sendrecv(int target_rank);
private:
static const size_t initial_bin_length = 1000;
......
......@@ -22,16 +22,21 @@ enum {
MASTER = 0
};
enum {
TR_CONTINUE,
TR_CHECKPOINT,
TR_TIMEUP,
};
pt_chain_run::pt_chain_run(const pt_chain &chain, int run_id) : id{chain.id}, run_id{run_id} {
rank_to_pos.resize(chain.params.size());
weight_ratios.resize(chain.params.size(), -1);
switch_partners.resize(chain.params.size());
last_visited.resize(chain.params.size());
for(size_t i = 0; i < rank_to_pos.size(); i++) {
rank_to_pos[i] = i;
last_visited[i] = 0;
}
}
......@@ -40,8 +45,13 @@ pt_chain_run pt_chain_run::checkpoint_read(const iodump::group &g) {
g.read("id", run.id);
g.read("run_id", run.run_id);
g.read("rank_to_pos", run.rank_to_pos);
g.read("last_visited", run.last_visited);
uint8_t swap_odd;
g.read("swap_odd", swap_odd);
run.swap_odd = swap_odd;
run.weight_ratios.resize(run.rank_to_pos.size(), -1);
run.switch_partners.resize(run.rank_to_pos.size());
return run;
}
......@@ -50,6 +60,8 @@ void pt_chain_run::checkpoint_write(const iodump::group &g) {
g.write("id", id);
g.write("run_id", run_id);
g.write("rank_to_pos", rank_to_pos);
g.write("last_visited", last_visited);
g.write("swap_odd", static_cast<uint8_t>(swap_odd));
}
void pt_chain::checkpoint_read(const iodump::group &g) {
......@@ -107,7 +119,7 @@ std::tuple<double, double> pt_chain::optimize_params(int linreg_len) {
double fnonlinearity = 0;
// in the worst case, f=0 or f=1 for [1,N-2]
int n = params.size();
double fnonlinearity_worst = sqrt((2*n+1./n-3)/6.)/n;
double fnonlinearity_worst = sqrt((2*n+1./n-3)/6.);
for(size_t i = 0; i < params.size(); i++) {
if(nup_histogram[i] + ndown_histogram[i] == 0) {
f[i] = 0;
......@@ -118,7 +130,7 @@ std::tuple<double, double> pt_chain::optimize_params(int linreg_len) {
double ideal_f = 1-i/static_cast<double>(params.size());
fnonlinearity += (f[i]-ideal_f)*(f[i]-ideal_f);
}
fnonlinearity = sqrt(fnonlinearity)/params.size()/fnonlinearity_worst;
fnonlinearity = sqrt(fnonlinearity)/fnonlinearity_worst;
double norm = 0;
for(size_t i = 0; i < params.size() - 1; i++) {
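For context on the two normalization fixes above: `fnonlinearity_worst` is itself the worst-case L2 deviation of the flow fraction f from the ideal linear profile f_i = 1 - i/n, so dividing by it alone already yields a dimensionless measure of order [0, 1]; the extra factors of n removed here made the measure shrink artificially with chain length. A plausible derivation of the worst-case constant, assuming the adversarial case f_i = 0 at every interior point (so the deviation at point i equals the ideal value):

```latex
\sum_{i=1}^{n-1}\left(1-\frac{i}{n}\right)^{2}
  = \frac{1}{n^{2}}\sum_{j=1}^{n-1} j^{2}
  = \frac{(n-1)n(2n-1)}{6n^{2}}
  = \frac{2n + 1/n - 3}{6},
\qquad
f_{\mathrm{worst}} = \sqrt{\frac{2n + 1/n - 3}{6}}
```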
......@@ -216,29 +228,26 @@ void runner_pt_master::construct_pt_chains() {
const char *pt_sweep_error =
"in parallel tempering mode, sweeps are measured in global updates and need to be the "
"same within each chain";
"same within each chain: {} = {} != {}";
int target_sweeps = task.get<int>("sweeps");
if(chain.target_sweeps < 0) {
chain.target_sweeps = target_sweeps;
} else if(target_sweeps != chain.target_sweeps) {
throw std::runtime_error{pt_sweep_error};
if(chain.target_sweeps >= 0 && target_sweeps != chain.target_sweeps) {
throw std::runtime_error{fmt::format(pt_sweep_error, "target_sweeps", chain.target_sweeps, target_sweeps)};
}
chain.target_sweeps = target_sweeps;
int target_thermalization = task.get<int>("thermalization");
if(chain.target_thermalization < 0) {
chain.target_thermalization = target_thermalization;
} else if(target_thermalization != chain.target_thermalization) {
throw std::runtime_error{pt_sweep_error};
if(chain.target_thermalization >= 0 && target_thermalization != chain.target_thermalization) {
throw std::runtime_error{fmt::format(pt_sweep_error, "thermalization", chain.target_thermalization, target_thermalization)};
}
chain.target_thermalization = target_thermalization;
int sweeps = job_.read_dump_progress(i);
if(chain.sweeps < 0) {
int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
chain.sweeps = sweeps / sweeps_per_global_update;
} else if(sweeps != chain.sweeps) {
throw std::runtime_error{pt_sweep_error};
int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
int sweeps = job_.read_dump_progress(i)/sweeps_per_global_update;
if(chain.sweeps >= 0 && sweeps != chain.sweeps) {
throw std::runtime_error{fmt::format(pt_sweep_error, "sweeps", chain.sweeps != sweeps)};
}
chain.sweeps = sweeps;
}
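Since the shared error template now ends in `{} = {} != {}`, every throw site must pass exactly three arguments: the setting's name, the stored value, and the conflicting value. A minimal sketch mirroring the usage above:

```cpp
#include <fmt/format.h>
#include <stdexcept>

// Illustrative consistency check using the three-placeholder template above.
void check_consistent(int expected, int actual) {
	const char *pt_sweep_error =
	    "in parallel tempering mode, sweeps are measured in global updates and "
	    "need to be the same within each chain: {} = {} != {}";
	if(expected != actual) {
		throw std::runtime_error{fmt::format(pt_sweep_error, "sweeps", expected, actual)};
	}
}
```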
chain_len_ = -1;
......@@ -276,6 +285,7 @@ void runner_pt_master::checkpoint_read() {
std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
if(file_exists(master_dump_name)) {
job_.log(fmt::format("master reading dump from '{}'", master_dump_name));
iodump dump = iodump::open_readonly(master_dump_name);
auto g = dump.get_root();
......@@ -293,9 +303,6 @@ void runner_pt_master::checkpoint_read() {
}
g.read("current_chain_id", current_chain_id_);
uint8_t pt_swap_odd;
g.read("pt_swap_odd", pt_swap_odd);
pt_swap_odd_ = pt_swap_odd;
}
}
......@@ -334,7 +341,6 @@ void runner_pt_master::checkpoint_write() {
}
g.write("current_chain_id", current_chain_id_);
g.write("pt_swap_odd", static_cast<uint8_t>(pt_swap_odd_));
if(use_param_optimization_) {
write_params_yaml();
......@@ -445,12 +451,13 @@ void runner_pt_master::pt_param_optimization(pt_chain &chain, pt_chain_run &chai
if(chain.histogram_entries >= chain.entries_before_optimization) {
auto [fnonlinearity, convergence] = chain.optimize_params(job_.jobfile["jobconfig"].get<int>(
"pt_parameter_optimization_linreg_len", 2));
chain.clear_histograms();
job_.log(fmt::format("chain {}: pt param optimization: entries={}, f nonlinearity={:.2g}, convergence={:.2g}",
chain.id, chain.entries_before_optimization, fnonlinearity, convergence));
chain.entries_before_optimization *= job_.jobfile["jobconfig"].get<double>(
"pt_parameter_optimization_nsamples_growth", 1.5);
checkpoint_write();
chain.clear_histograms();
}
}
......@@ -504,7 +511,7 @@ void runner_pt_master::react() {
int target_rank = rank_section * chain_len_ + target + 1;
int pos = chain_run.rank_to_pos[target];
// keep consistent with pt_global_update
int partner_pos = pos + (2 * (pos & 1) - 1) * (2 * pt_swap_odd_ - 1);
int partner_pos = pos + (2 * (pos & 1) - 1) * (2 * chain_run.swap_odd - 1);
if(partner_pos < 0 || partner_pos >= chain_len_) {
int response = GA_SKIP;
MPI_Send(&response, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
......@@ -531,12 +538,15 @@ void runner_pt_master::react() {
pt_global_update(chain, chain_run);
std::fill(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), -1);
for(int target = 0; target < chain_len_; target++) {
int new_task_id = chain.task_ids[chain_run.rank_to_pos[target]];
double new_param = chain.params[chain_run.rank_to_pos[target]];
MPI_Send(&new_task_id, 1, MPI_INT, rank_section * chain_len_ + target + 1, 0,
int partner_rank = rank_section*chain_len_ + chain_run.switch_partners[target] + 1;
int msg[2] = {new_task_id, partner_rank};
MPI_Send(msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, rank_section * chain_len_ + target + 1, 0,
MPI_COMM_WORLD);
double new_param = chain.params[chain_run.rank_to_pos[target]];
MPI_Send(&new_param, 1, MPI_DOUBLE, rank_section * chain_len_ + target + 1, 0,
MPI_COMM_WORLD);
}
......@@ -546,24 +556,38 @@ void runner_pt_master::react() {
}
void runner_pt_master::pt_global_update(pt_chain &chain, pt_chain_run &chain_run) {
for(int i = pt_swap_odd_; i < static_cast<int>(chain.task_ids.size()) - 1; i += 2) {
int i = 0;
for(auto& p : chain_run.switch_partners) {
p = i++;
}
for(int i = chain_run.swap_odd; i < static_cast<int>(chain.task_ids.size()) - 1; i += 2) {
double w1 = chain_run.weight_ratios[i];
double w2 = chain_run.weight_ratios[i + 1];
double r = rng_->random_double();
if(r < w1 * w2) {
int rank0{};
int rank1{};
int rank = 0;
for(auto &p : chain_run.rank_to_pos) {
if(p == i) {
p = i + 1;
rank0 = rank;
} else if(p == i + 1) {
p = i;
rank1 = rank;
}
rank++;
}
chain_run.rank_to_pos[rank0] = i+1;
chain_run.rank_to_pos[rank1] = i;
chain_run.switch_partners[rank0] = rank1;
chain_run.switch_partners[rank1] = rank0;
}
}
pt_swap_odd_ = !pt_swap_odd_;
chain_run.swap_odd = !chain_run.swap_odd;
}
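`pt_global_update` proposes swaps between neighboring chain positions, accepting a swap of positions i and i+1 with probability w1 * w2 (the usual replica-exchange Metropolis rule), and alternates between even and odd pairings on successive calls; `swap_odd` now lives per chain run instead of globally in the master. A standalone sketch of the alternating pairing:

```cpp
#include <cstdio>

// Prints the neighbor pairs proposed in the even and odd phases for a chain of 6.
int main() {
	int n = 6;
	for(int swap_odd = 0; swap_odd < 2; swap_odd++) {
		std::printf("phase %d:", swap_odd);
		for(int i = swap_odd; i < n - 1; i += 2) {
			std::printf(" (%d,%d)", i, i + 1); // (0,1)(2,3)(4,5) then (1,2)(3,4)
		}
		std::printf("\n");
	}
	return 0;
}
```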
runner_pt_slave::runner_pt_slave(jobinfo job, mc_factory mccreator)
......@@ -589,6 +613,8 @@ void runner_pt_slave::start() {
int action{};
do {
int timeout{TR_CHECKPOINT};
while(sweeps_since_last_query_ < sweeps_before_communication_) {
sys_->_do_update();
......@@ -600,17 +626,17 @@ void runner_pt_slave::start() {
sys_->pt_measure_statistics();
pt_global_update();
sweeps_since_last_query_++;
}
if(is_checkpoint_time() || time_is_up()) {
break;
timeout = negotiate_timeout();
if(timeout != TR_CONTINUE) {
break;
}
}
}
checkpoint_write();
MPI_Barrier(chain_comm_);
if(time_is_up()) {
if(timeout == TR_TIMEUP) {
send_status(S_TIMEUP);
job_.log(fmt::format("rank {} exits: walltime up (safety interval = {} s)", rank_,
sys_->safe_exit_interval()));
......@@ -628,6 +654,29 @@ void runner_pt_slave::send_status(int status) {
MPI_Send(&status, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD);
}
int runner_pt_slave::negotiate_timeout() {
double safe_interval = 0, max_safe_interval = 0;
if(sys_ != nullptr) {
safe_interval = sys_->safe_exit_interval();
}
MPI_Reduce(&safe_interval, &max_safe_interval, 1, MPI_DOUBLE, MPI_MAX, 0, chain_comm_);
int result = TR_CONTINUE;
if(chain_rank_ == 0) {
if(MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time) {
result = TR_CHECKPOINT;
}
if(MPI_Wtime() - time_start_ > job_.walltime - max_safe_interval) {
result = TR_TIMEUP;
}
}
MPI_Bcast(&result, 1, MPI_INT, 0, chain_comm_);
return result;
}
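`negotiate_timeout` replaces the per-rank `is_checkpoint_time`/`time_is_up` checks (removed further down) with a chain-wide consensus: the largest safe-exit interval is reduced to the chain root, the root alone decides, and the decision is broadcast so all ranks of a chain checkpoint or exit together. A standalone demo of the reduce-decide-broadcast pattern (run with, e.g., `mpirun -n 4 ./a.out`):

```cpp
#include <cstdio>
#include <mpi.h>

int main(int argc, char **argv) {
	MPI_Init(&argc, &argv);
	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
	double local = rank * 1.5, max_local = 0;
	// collect the worst-case (largest) local value on the root
	MPI_Reduce(&local, &max_local, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);
	int decision = 0;
	if(rank == 0) {
		decision = (max_local > 3.0); // only the root decides
	}
	MPI_Bcast(&decision, 1, MPI_INT, 0, MPI_COMM_WORLD); // everyone agrees
	std::printf("rank %d: decision=%d\n", rank, decision);
	MPI_Finalize();
	return 0;
}
```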
void runner_pt_slave::pt_global_update() {
if(chain_rank_ == 0) {
send_status(S_READY_FOR_GLOBAL);
......@@ -652,23 +701,19 @@ void runner_pt_slave::pt_global_update() {
// this may be a long wait
double new_param;
int new_task_id;
int msg[2];
MPI_Recv(msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
int new_task_id = msg[0];
int target_rank = msg[1];
MPI_Recv(&new_task_id, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
double new_param;
MPI_Recv(&new_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
if(new_task_id != task_id_ || current_param_ != new_param) {
sys_->measurements_write(job_.rundir(task_id_, run_id_));
}
MPI_Barrier(chain_comm_);
if(new_task_id != task_id_ || current_param_ != new_param) {
task_id_ = new_task_id;
sweeps_per_global_update_ = job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>(
"pt_sweeps_per_global_update");
sys_->_pt_update_param(param_name, new_param, job_.rundir(task_id_, run_id_));
sys_->_pt_update_param(target_rank, param_name, new_param);
}
current_param_ = new_param;
}
......@@ -757,15 +802,4 @@ void runner_pt_slave::merge_measurements() {
job_.merge_task(task_id_, evalables);
}
bool runner_pt_slave::is_checkpoint_time() {
return MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time;
}
bool runner_pt_slave::time_is_up() {
double safe_interval = 0;
if(sys_ != nullptr) {
safe_interval = sys_->safe_exit_interval();
}
return MPI_Wtime() - time_start_ > job_.walltime - safe_interval;
}
}
......@@ -9,18 +9,18 @@ struct pt_chain {
int id{};
std::vector<int> task_ids;
std::vector<double> params;
std::vector<int> nup_histogram;
std::vector<int> ndown_histogram;
int sweeps{-1};
int target_sweeps{-1};
int target_thermalization{-1};
int scheduled_runs{};
// parameter optimization
std::vector<int> nup_histogram;
std::vector<int> ndown_histogram;
int entries_before_optimization{0};
int histogram_entries{};
int scheduled_runs{};
bool is_done();
void checkpoint_read(const iodump::group &g);
void checkpoint_write(const iodump::group &g);
......@@ -36,12 +36,14 @@ private:
public:
int id{};
int run_id{};
bool swap_odd{};
pt_chain_run(const pt_chain &chain, int run_id);
static pt_chain_run checkpoint_read(const iodump::group &g);
void checkpoint_write(const iodump::group &g);
std::vector<int> rank_to_pos;
std::vector<int> switch_partners;
std::vector<double> weight_ratios;
std::vector<int> last_visited;
......@@ -57,7 +59,6 @@ private:
double time_last_checkpoint_{0};
bool use_param_optimization_{};
bool pt_swap_odd_{};
std::vector<pt_chain> pt_chains_;
std::vector<pt_chain_run> pt_chain_runs_;
int chain_len_;
......@@ -108,8 +109,8 @@ private:
void pt_global_update();
bool is_checkpoint_time();
bool time_is_up();
int negotiate_timeout();
void send_status(int status);
int recv_action();
void checkpoint_write();
......