Commit bf6aab90 authored by Lukas Weber

really make mpirun key optional
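The change is confined to the local-run branch of the job runner: the command line used to be built with jobconfig['mpirun'], which raises a KeyError when the jobconfig omits that key, whereas jobconfig.get('mpirun', 'mpirun') falls back to a plain mpirun. A minimal sketch of the resulting behaviour (the jobconfig values and binary name below are invented for illustration, not taken from a real job file):

    # hypothetical jobconfig without an 'mpirun' entry
    jobconfig = {'num_cores': 4}
    cmd = ['./my_mc_binary', 'jobfile.json']

    # dict.get falls back to plain 'mpirun' when the key is absent
    mpicmd = '{} -n {} {}'.format(jobconfig.get('mpirun', 'mpirun'),
                                  jobconfig['num_cores'], ' '.join(cmd))
    print('$ ' + mpicmd)  # $ mpirun -n 4 ./my_mc_binary jobfile.json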

parent 1c681dd7
@@ -71,7 +71,7 @@ def run(jobname, jobconfig, cmd):
print('running on system \'{}\''.format(sysinfo))
if sysinfo == 'local':
mpicmd = '{} -n {} {}'.format(jobconfig['mpirun'], jobconfig['num_cores'], ' '.join(cmd))
mpicmd = '{} -n {} {}'.format(jobconfig.get('mpirun', 'mpirun'), jobconfig['num_cores'], ' '.join(cmd))
print('$ '+mpicmd)
os.system(mpicmd)
else:
......
@@ -69,7 +69,7 @@ void mc::_do_update() {
}
}
void mc::_pt_update_param(int target_rank, const std::string& param_name, double new_param) {
void mc::_pt_update_param(int target_rank, const std::string &param_name, double new_param) {
measure.mpi_sendrecv(target_rank);
pt_update_param(param_name, new_param);
}
@@ -82,7 +82,7 @@ void mc::pt_measure_statistics() {
}
}
double mc::_pt_weight_ratio(const std::string& param_name, double new_param) {
double mc::_pt_weight_ratio(const std::string &param_name, double new_param) {
double wr = pt_weight_ratio(param_name, new_param);
return wr;
}
......
@@ -32,10 +32,10 @@ protected:
virtual void write_output(const std::string &filename);
virtual void do_update() = 0;
virtual void do_measurement() = 0;
virtual void pt_update_param(const std::string& /*param_name*/, double /*new_param*/) {
virtual void pt_update_param(const std::string & /*param_name*/, double /*new_param*/) {
throw std::runtime_error{"running parallel tempering, but pt_update_param not implemented"};
}
virtual double pt_weight_ratio(const std::string& /*param_name*/, double /*new_param*/) {
virtual double pt_weight_ratio(const std::string & /*param_name*/, double /*new_param*/) {
throw std::runtime_error{"running parallel tempering, but pt_weight_ratio not implemented"};
return 1;
}
@@ -59,8 +59,8 @@ public:
void _do_update();
void _do_measurement();
void _pt_update_param(int target_rank, const std::string& param_name, double new_param);
double _pt_weight_ratio(const std::string& param_name, double new_param);
void _pt_update_param(int target_rank, const std::string &param_name, double new_param);
double _pt_weight_ratio(const std::string &param_name, double new_param);
void pt_measure_statistics();
......
@@ -13,7 +13,8 @@ bool measurements::observable_name_is_legal(const std::string &obs_name) {
return true;
}
void measurements::register_observable(const std::string &name, size_t bin_size, size_t vector_length) {
void measurements::register_observable(const std::string &name, size_t bin_size,
size_t vector_length) {
if(!observable_name_is_legal(name)) {
throw std::runtime_error(
fmt::format("Illegal observable name '{}': names must not contain / or .", name));
@@ -54,34 +55,42 @@ void measurements::mpi_sendrecv(int target_rank) {
if(rank < target_rank) {
unsigned long obscount = observables_.size();
MPI_Send(&obscount, 1, MPI_UNSIGNED_LONG, target_rank, 0, MPI_COMM_WORLD);
for(auto& [name, obs] : observables_) {
for(auto &[name, obs] : observables_) {
(void)obs;
int size = name.size()+1;
int size = name.size() + 1;
MPI_Send(&size, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
MPI_Send(name.c_str(), size, MPI_CHAR, target_rank, 0, MPI_COMM_WORLD);
}
} else {
unsigned long obscount;
MPI_Recv(&obscount, 1, MPI_UNSIGNED_LONG, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&obscount, 1, MPI_UNSIGNED_LONG, target_rank, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
if(obscount != observables_.size()) {
throw std::runtime_error{fmt::format("ranks {}&{} have to contain identical sets of registered observables. But they contain different amounts of observables! {} != {}.", target_rank, rank, obscount, observables_.size())};
throw std::runtime_error{fmt::format(
"ranks {}&{} have to contain identical sets of registered observables. But "
"they contain different amounts of observables! {} != {}.",
target_rank, rank, obscount, observables_.size())};
}
for(auto& [name, obs] : observables_) {
for(auto &[name, obs] : observables_) {
(void)obs;
int size;
MPI_Recv(&size, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::vector<char> buf(size);
MPI_Recv(buf.data(), size, MPI_CHAR, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(buf.data(), size, MPI_CHAR, target_rank, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
if(std::string{buf.data()} != name) {
throw std::runtime_error{fmt::format("ranks {}&{} have to contain identical sets of registered observables. Found '{}' != '{}'.", target_rank, rank, name, std::string{buf.data()})};
throw std::runtime_error{
fmt::format("ranks {}&{} have to contain identical sets of registered "
"observables. Found '{}' != '{}'.",
target_rank, rank, name, std::string{buf.data()})};
}
}
}
mpi_checked_targets_.insert(target_rank);
}
for(auto& [name, obs] : observables_) {
for(auto &[name, obs] : observables_) {
(void)name;
obs.mpi_sendrecv(target_rank);
}
......
@@ -3,10 +3,10 @@
#include "iodump.h"
#include "observable.h"
#include <map>
#include <set>
#include <string>
#include <valarray>
#include <vector>
#include <set>
namespace loadl {
@@ -14,7 +14,8 @@ class measurements {
public:
static bool observable_name_is_legal(const std::string &name);
void register_observable(const std::string &name, size_t bin_size = 1, size_t vector_length = 1);
void register_observable(const std::string &name, size_t bin_size = 1,
size_t vector_length = 1);
// use this to add a measurement sample to an observable.
template<class T>
@@ -30,6 +31,7 @@ public:
// switches the content of the measurement buffers with the target_rank
// both ranks must have the same set of observables!
void mpi_sendrecv(int target_rank);
private:
std::set<int> mpi_checked_targets_;
std::map<std::string, observable> observables_;
......
@@ -124,7 +124,7 @@ results merge(const std::vector<std::string> &filenames, const std::vector<evala
size_t vector_idx = i % vector_length;
obs.mean[vector_idx] += samples[i];
if(vector_idx == vector_length-1) {
if(vector_idx == vector_length - 1) {
metadata[obs_name].sample_counter++;
}
}
@@ -134,9 +134,9 @@ results merge(const std::vector<std::string> &filenames, const std::vector<evala
for(auto &[obs_name, obs] : res.observables) {
assert(metadata[obs_name].sample_counter ==
obs.rebinning_bin_count * obs.rebinning_bin_length);
for(auto &mean : obs.mean) {
mean /= metadata[obs_name].sample_counter;
mean /= metadata[obs_name].sample_counter;
}
}
......
@@ -46,7 +46,7 @@ void observable::measurement_write(const iodump::group &meas_file) {
current_bin_ = 0;
}
void observable::checkpoint_read(const std::string& name, const iodump::group &d) {
void observable::checkpoint_read(const std::string &name, const iodump::group &d) {
name_ = name;
d.read("vector_length", vector_length_);
d.read("bin_length", bin_length_);
@@ -61,14 +61,16 @@ void observable::mpi_sendrecv(int target_rank) {
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
unsigned long msg[msg_size] = {current_bin_, vector_length_, bin_length_, current_bin_filling_};
MPI_Sendrecv_replace(msg, msg_size, MPI_UNSIGNED_LONG, target_rank, 0, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Sendrecv_replace(msg, msg_size, MPI_UNSIGNED_LONG, target_rank, 0, target_rank, 0,
MPI_COMM_WORLD, MPI_STATUS_IGNORE);
current_bin_ = msg[0];
vector_length_ = msg[1];
bin_length_ = msg[2];
current_bin_filling_ = msg[3];
std::vector<double> recvbuf((current_bin_+1)*vector_length_);
MPI_Sendrecv(samples_.data(), samples_.size(), MPI_DOUBLE, target_rank, 0, recvbuf.data(), recvbuf.size(), MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::vector<double> recvbuf((current_bin_ + 1) * vector_length_);
MPI_Sendrecv(samples_.data(), samples_.size(), MPI_DOUBLE, target_rank, 0, recvbuf.data(),
recvbuf.size(), MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
samples_ = recvbuf;
}
......
@@ -25,11 +25,12 @@ public:
// This will empty the cache of already completed bins
void measurement_write(const iodump::group &meas_file);
void checkpoint_read(const std::string& name, const iodump::group &dump_file);
void checkpoint_read(const std::string &name, const iodump::group &dump_file);
// switch copy with target rank.
// useful for parallel tempering mode
void mpi_sendrecv(int target_rank);
private:
static const size_t initial_bin_length = 1000;
......
@@ -119,18 +119,18 @@ std::tuple<double, double> pt_chain::optimize_params(int linreg_len) {
double fnonlinearity = 0;
// in the worst case, f=0 or f=1 for [1,N-2]
int n = params.size();
double fnonlinearity_worst = sqrt((2*n+1./n-3)/6.);
double fnonlinearity_worst = sqrt((2 * n + 1. / n - 3) / 6.);
for(size_t i = 0; i < params.size(); i++) {
if(nup_histogram[i] + ndown_histogram[i] == 0) {
f[i] = 0;
} else {
f[i] = nup_histogram[i] / static_cast<double>(nup_histogram[i] + ndown_histogram[i]);
}
double ideal_f = 1-i/static_cast<double>(params.size());
fnonlinearity += (f[i]-ideal_f)*(f[i]-ideal_f);
double ideal_f = 1 - i / static_cast<double>(params.size());
fnonlinearity += (f[i] - ideal_f) * (f[i] - ideal_f);
}
fnonlinearity = sqrt(fnonlinearity)/fnonlinearity_worst;
fnonlinearity = sqrt(fnonlinearity) / fnonlinearity_worst;
double norm = 0;
for(size_t i = 0; i < params.size() - 1; i++) {
@@ -142,8 +142,8 @@ std::tuple<double, double> pt_chain::optimize_params(int linreg_len) {
for(auto &v : eta) {
v /= norm;
}
double convergence = 0;
double convergence = 0;
for(size_t i = 1; i < params.size() - 1; i++) {
double target = static_cast<double>(i) / (params.size() - 1);
int etai = 0;
@@ -155,14 +155,14 @@ std::tuple<double, double> pt_chain::optimize_params(int linreg_len) {
target -= deta;
}
new_params[i] = params[etai] + target / eta[etai];
convergence += (new_params[i]-params[i])*(new_params[i]-params[i]);
convergence += (new_params[i] - params[i]) * (new_params[i] - params[i]);
}
convergence = sqrt(convergence)/(params.size()-2);
convergence = sqrt(convergence) / (params.size() - 2);
for(size_t i = 0; i < params.size(); i++) {
double relaxation_fac = 1;
params[i] = params[i]*(1-relaxation_fac) + relaxation_fac*new_params[i];
params[i] = params[i] * (1 - relaxation_fac) + relaxation_fac * new_params[i];
}
return std::tie(fnonlinearity, convergence);
@@ -232,18 +232,22 @@ void runner_pt_master::construct_pt_chains() {
int target_sweeps = task.get<int>("sweeps");
if(chain.target_sweeps >= 0 && target_sweeps != chain.target_sweeps) {
throw std::runtime_error{fmt::format(pt_sweep_error, "target_sweeps", chain.target_sweeps, target_sweeps)};
throw std::runtime_error{
fmt::format(pt_sweep_error, "target_sweeps", chain.target_sweeps, target_sweeps)};
}
chain.target_sweeps = target_sweeps;
int target_thermalization = task.get<int>("thermalization");
if(chain.target_thermalization >= 0 &&target_thermalization != chain.target_thermalization) {
throw std::runtime_error{fmt::format(pt_sweep_error, "thermalization", chain.target_thermalization, target_thermalization)};
if(chain.target_thermalization >= 0 &&
target_thermalization != chain.target_thermalization) {
throw std::runtime_error{fmt::format(pt_sweep_error, "thermalization",
chain.target_thermalization,
target_thermalization)};
}
chain.target_thermalization = target_thermalization;
int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
int sweeps = job_.read_dump_progress(i)/sweeps_per_global_update;
int sweeps = job_.read_dump_progress(i) / sweeps_per_global_update;
if(chain.sweeps >= 0 && sweeps != chain.sweeps) {
throw std::runtime_error{fmt::format(pt_sweep_error, "sweeps", chain.sweeps != sweeps)};
}
@@ -301,7 +305,6 @@ void runner_pt_master::checkpoint_read() {
int id = std::stoi(name);
pt_chains_.at(id).checkpoint_read(pt_chains.open_group(name));
}
}
}
@@ -434,25 +437,24 @@ void runner_pt_master::pt_param_optimization(pt_chain &chain, pt_chain_run &chai
if(chain_run.rank_to_pos[rank] == 0) {
chain_run.last_visited[rank] = 1;
}
if(chain_run.rank_to_pos[rank] ==
static_cast<int>(chain_run.rank_to_pos.size()) - 1) {
if(chain_run.rank_to_pos[rank] == static_cast<int>(chain_run.rank_to_pos.size()) - 1) {
chain_run.last_visited[rank] = -1;
}
chain.ndown_histogram[chain_run.rank_to_pos[rank]] +=
chain_run.last_visited[rank] == -1;
chain.nup_histogram[chain_run.rank_to_pos[rank]] +=
chain_run.last_visited[rank] == 1;
chain.ndown_histogram[chain_run.rank_to_pos[rank]] += chain_run.last_visited[rank] == -1;
chain.nup_histogram[chain_run.rank_to_pos[rank]] += chain_run.last_visited[rank] == 1;
}
chain.histogram_entries++;
if(chain.histogram_entries >= chain.entries_before_optimization) {
auto [fnonlinearity, convergence] = chain.optimize_params(job_.jobfile["jobconfig"].get<int>(
"pt_parameter_optimization_linreg_len", 2));
job_.log(fmt::format("chain {}: pt param optimization: entries={}, f nonlinearity={:.2g}, convergence={:.2g}",
chain.id, chain.entries_before_optimization, fnonlinearity, convergence));
chain.entries_before_optimization *= job_.jobfile["jobconfig"].get<double>(
"pt_parameter_optimization_nsamples_growth", 1.5);
auto [fnonlinearity, convergence] = chain.optimize_params(
job_.jobfile["jobconfig"].get<int>("pt_parameter_optimization_linreg_len", 2));
job_.log(
fmt::format("chain {}: pt param optimization: entries={}, f nonlinearity={:.2g}, "
"convergence={:.2g}",
chain.id, chain.entries_before_optimization, fnonlinearity, convergence));
chain.entries_before_optimization *=
job_.jobfile["jobconfig"].get<double>("pt_parameter_optimization_nsamples_growth", 1.5);
checkpoint_write();
chain.clear_histograms();
}
@@ -535,14 +537,13 @@ void runner_pt_master::react() {
pt_global_update(chain, chain_run);
std::fill(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), -1);
for(int target = 0; target < chain_len_; target++) {
int new_task_id = chain.task_ids[chain_run.rank_to_pos[target]];
int partner_rank = rank_section*chain_len_ + chain_run.switch_partners[target] + 1;
int partner_rank = rank_section * chain_len_ + chain_run.switch_partners[target] + 1;
int msg[2] = {new_task_id, partner_rank};
MPI_Send(msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, rank_section * chain_len_ + target + 1, 0,
MPI_COMM_WORLD);
MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT,
rank_section * chain_len_ + target + 1, 0, MPI_COMM_WORLD);
double new_param = chain.params[chain_run.rank_to_pos[target]];
MPI_Send(&new_param, 1, MPI_DOUBLE, rank_section * chain_len_ + target + 1, 0,
MPI_COMM_WORLD);
@@ -554,7 +555,7 @@ void runner_pt_master::react() {
void runner_pt_master::pt_global_update(pt_chain &chain, pt_chain_run &chain_run) {
int i = 0;
for(auto& p : chain_run.switch_partners) {
for(auto &p : chain_run.switch_partners) {
p = i++;
}
for(int i = chain_run.swap_odd; i < static_cast<int>(chain.task_ids.size()) - 1; i += 2) {
@@ -576,7 +577,7 @@ void runner_pt_master::pt_global_update(pt_chain &chain, pt_chain_run
}
rank++;
}
chain_run.rank_to_pos[rank0] = i+1;
chain_run.rank_to_pos[rank0] = i + 1;
chain_run.rank_to_pos[rank1] = i;
chain_run.switch_partners[rank0] = rank1;
@@ -683,7 +684,8 @@ void runner_pt_slave::pt_global_update() {
MPI_Recv(&response, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// job_.log(fmt::format(" * rank {}: ready for global update", rank_));
const std::string& param_name = job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");
const std::string &param_name =
job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");
if(response == GA_CALC_WEIGHT) {
double partner_param;
@@ -699,7 +701,8 @@ void runner_pt_slave::pt_global_update() {
// this may be a long wait
int msg[2];
MPI_Recv(&msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, MASTER, 0, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
int new_task_id = msg[0];
int target_rank = msg[1];
@@ -771,9 +774,7 @@ int runner_pt_slave::what_is_next(int status) {
void runner_pt_slave::checkpoint_write() {
time_last_checkpoint_ = MPI_Wtime();
sys_->_write(job_.rundir(task_id_, run_id_));
if(chain_rank_ == 0) {
job_.log(fmt::format("* rank {}: chain checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
}
job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
}
void runner_pt_master::send_action(int action, int destination) {
......
@@ -14,7 +14,7 @@ struct pt_chain {
int target_sweeps{-1};
int target_thermalization{-1};
int scheduled_runs{};
// parameter optimization
std::vector<int> nup_histogram;
std::vector<int> ndown_histogram;
@@ -37,7 +37,7 @@ public:
int id{};
int run_id{};
bool swap_odd{};
std::vector<int> rank_to_pos;
std::vector<int> last_visited;
std::vector<int> switch_partners;
......