Commit ba0bc95b authored by Lukas Weber

fix and formatting

parent 46e64651
#include "jobinfo.h"
#include "merger.h"
#include <ctime>
#include <dirent.h>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <fstream>
#include <regex>
#include <dirent.h>
#include "merger.h"
namespace loadl {
......
#pragma once
#include <vector>
#include <string>
#include "parser.h"
#include "iodump.h"
#include "evalable.h"
#include "iodump.h"
#include "parser.h"
#include <string>
#include <vector>
namespace loadl {
......
......@@ -28,13 +28,13 @@ void mc::_init() {
measure.add_observable("_ll_pt_rank", 1);
}
}
if(param.defined("seed")) {
rng.reset(new random_number_generator(param.get<uint64_t>("seed")));
} else {
rng.reset(new random_number_generator());
}
init();
}
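
The hunk above covers mc::_init's RNG setup: if the task file defines a "seed" parameter, the generator is constructed from it for reproducible runs; otherwise a default-constructed (nondeterministically seeded) generator is used. A minimal sketch of the same pattern, where rng_wrapper and make_rng are hypothetical stand-ins for loadl's random_number_generator and the surrounding logic:

#include <cstdint>
#include <memory>
#include <random>

// Stand-in for loadl's random_number_generator: seedable for
// reproducible runs, otherwise seeded from std::random_device.
struct rng_wrapper {
	std::mt19937_64 gen;
	rng_wrapper() : gen(std::random_device{}()) {}
	explicit rng_wrapper(uint64_t seed) : gen(seed) {}
};

std::unique_ptr<rng_wrapper> make_rng(bool seed_defined, uint64_t seed) {
	if(seed_defined) {
		return std::make_unique<rng_wrapper>(seed); // reproducible run
	}
	return std::make_unique<rng_wrapper>(); // nondeterministic default
}
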
......@@ -78,7 +78,12 @@ void mc::_pt_update_param(double new_param, const std::string &new_dir) {
auto unclean = measure.is_unclean();
if(unclean) {
throw std::runtime_error(fmt::format("Unclean observable: {}\nIn parallel tempering mode you have to choose the binsize for all observables so that it is commensurate with pt_sweeps_per_global_update (so that all bins are empty once it happens). If you don’t like this limitation, implement it properly.", *unclean));
throw std::runtime_error(
fmt::format("Unclean observable: {}\nIn parallel tempering mode you have to choose the "
"binsize for all observables so that it is commensurate with "
"pt_sweeps_per_global_update (so that all bins are empty once it happens). "
"If you don’t like this limitation, implement it properly.",
*unclean));
}
pt_update_param(new_param);
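
The requirement behind this exception: in parallel tempering mode, every observable bin has to be completely filled and flushed by the time a global update happens, which means each binsize must evenly divide pt_sweeps_per_global_update. A hedged sketch of such a commensurability check; find_incommensurate and the map of binsizes are illustrative, not part of the library's API:

#include <map>
#include <optional>
#include <string>

// Returns the first observable whose binsize does not divide the
// number of sweeps between global updates (hypothetical check).
std::optional<std::string> find_incommensurate(
    const std::map<std::string, int> &binsizes, int pt_sweeps_per_global_update) {
	for(auto &[name, binsize] : binsizes) {
		if(pt_sweeps_per_global_update % binsize != 0) {
			return name;
		}
	}
	return std::nullopt;
}

For example, a binsize of 100 with pt_sweeps_per_global_update = 500 gives five full bins per global update, while a binsize of 300 would leave a partially filled bin behind.
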
......@@ -143,7 +148,6 @@ double mc::safe_exit_interval() {
return 2 * (max_checkpoint_write_time_ + max_sweep_time_ + max_meas_time_) + 2;
}
bool mc::_read(const std::string &dir) {
if(!file_exists(dir + ".dump.h5")) {
return false;
......@@ -181,7 +185,7 @@ bool mc::is_thermalized() {
if(pt_mode_ && pt_sweeps_per_global_update_ > 0) {
sweep /= pt_sweeps_per_global_update_;
}
return sweep >= therm_;
}
}
......@@ -27,7 +27,7 @@ public:
void samples_write(const iodump::group &meas_file);
// returns nullopt if all observables are clean,
// otherwise the name of a non-empty observable
std::optional<std::string> is_unclean() const;
private:
......
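
As documented above, is_unclean() returns std::nullopt when every observable's bin buffer is empty and otherwise the name of a non-empty observable, which is exactly the shape the caller in mc::_pt_update_param consumes. A minimal usage sketch of that optional-based contract:

#include <optional>
#include <stdexcept>
#include <string>

// Sketch of a caller: an engaged optional names the offending observable.
void assert_clean(const std::optional<std::string> &unclean) {
	if(unclean) {
		throw std::runtime_error("unclean observable: " + *unclean);
	}
}
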
#include "merger.h"
#include "iodump.h"
#include "evalable.h"
#include "iodump.h"
#include "mc.h"
#include "measurements.h"
......@@ -107,7 +107,7 @@ results merge(const std::vector<std::string> &filenames, const std::vector<evala
for(auto &filename : filenames) {
iodump meas_file = iodump::open_readonly(filename);
auto g = meas_file.get_root();
for(auto & [obs_name, obs] : res.observables) {
for(auto &[obs_name, obs] : res.observables) {
std::vector<double> samples;
obs.name = obs_name;
......@@ -143,7 +143,7 @@ results merge(const std::vector<std::string> &filenames, const std::vector<evala
for(auto &filename : filenames) {
iodump meas_file = iodump::open_readonly(filename);
auto g = meas_file.get_root();
for(auto & [obs_name, obs] : res.observables) {
for(auto &[obs_name, obs] : res.observables) {
std::vector<double> samples;
auto &obs_meta = metadata.at(obs_name);
......
......@@ -13,7 +13,7 @@ void results::write_yaml(const std::string &filename, const std::string &taskdir
out << YAML::Key << "task" << YAML::Value << taskdir;
out << YAML::Key << "parameters" << YAML::Value << params;
out << YAML::Key << "results" << YAML::Value << YAML::BeginMap;
for(auto & [obs_name, obs] : observables) {
for(auto &[obs_name, obs] : observables) {
out << YAML::Key << obs_name;
if(obs.internal_bin_length == 0) {
out << YAML::Comment("evalable");
......
#include "runner.h"
#include "merger.h"
#include "iodump.h"
#include "merger.h"
#include "runner_pt.h"
#include <fmt/format.h>
namespace loadl {
......
......@@ -5,11 +5,11 @@
#include <ostream>
#include <vector>
#include "jobinfo.h"
#include "mc.h"
#include "measurements.h"
#include "parser.h"
#include "runner_task.h"
#include "jobinfo.h"
namespace loadl {
......
......@@ -22,11 +22,10 @@ enum {
MASTER = 0
};
pt_chain_run::pt_chain_run(const pt_chain &chain, int run_id)
: id{chain.id}, run_id{run_id} {
pt_chain_run::pt_chain_run(const pt_chain &chain, int run_id) : id{chain.id}, run_id{run_id} {
rank_to_pos.resize(chain.params.size());
weight_ratios.resize(chain.params.size(), -1);
last_visited.resize(chain.params.size());
for(size_t i = 0; i < rank_to_pos.size(); i++) {
......@@ -72,67 +71,68 @@ void pt_chain::checkpoint_write(const iodump::group &g) {
void pt_chain::clear_histograms() {
std::fill(nup_histogram.begin(), nup_histogram.end(), 0);
std::fill(ndown_histogram.begin(), ndown_histogram.end(), 0);
histogram_entries=0;
histogram_entries = 0;
}
static double linear_regression(const std::vector<double>& x, const std::vector<double>& y, int i, int range) {
int lower = std::max(0, i-range+1);
int upper = std::min(static_cast<int>(y.size()-1), i+range);
static double linear_regression(const std::vector<double> &x, const std::vector<double> &y, int i,
int range) {
int lower = std::max(0, i - range + 1);
int upper = std::min(static_cast<int>(y.size() - 1), i + range);
double sxy = 0;
double sx = 0, sy = 0;
double sx2 = 0;
for(int j = lower; j <= upper; j++) {
sxy += x[j]*y[j];
sxy += x[j] * y[j];
sx += x[j];
sy += y[j];
sx2 += x[j]*x[j];
sx2 += x[j] * x[j];
}
int n = upper-lower+1;
return (sxy-sx*sy/n)/(sx2-sx*sx/n);
int n = upper - lower + 1;
return (sxy - sx * sy / n) / (sx2 - sx * sx / n);
}
void pt_chain::optimize_params() {
void pt_chain::optimize_params(int linreg_len) {
std::vector<double> eta(params.size());
std::vector<double> f(params.size());
std::vector<double> new_params(params.size());
assert(params.size() > 1);
new_params[0] = params[0];
new_params[params.size()-1] = params[params.size()-1];
new_params[params.size() - 1] = params[params.size() - 1];
for(size_t i = 0; i < params.size(); i++) {
if(nup_histogram[i] + ndown_histogram[i] == 0) {
f[i] = 0;
} else {
f[i] = nup_histogram[i]/static_cast<double>(nup_histogram[i]+ndown_histogram[i]);
f[i] = nup_histogram[i] / static_cast<double>(nup_histogram[i] + ndown_histogram[i]);
}
}
double norm = 0;
for(size_t i = 0; i < params.size()-1; i++) {
double dfdp = linear_regression(params, f, i, job_.jobfile["jobconfig"].get<int>("pt_parameter_optimization_regression_len",2));
double dp = params[i+1] - params[i];
eta[i] = sqrt(std::max(0.01,-dfdp/dp));
norm += eta[i]*dp;
for(size_t i = 0; i < params.size() - 1; i++) {
double dfdp = linear_regression(params, f, i, linreg_len);
double dp = params[i + 1] - params[i];
eta[i] = sqrt(std::max(0.01, -dfdp / dp));
norm += eta[i] * dp;
}
for(auto &v : eta) {
v /= norm;
}
for(size_t i = 1; i < params.size()-1; i++) {
double target = static_cast<double>(i)/(params.size()-1);
for(size_t i = 1; i < params.size() - 1; i++) {
double target = static_cast<double>(i) / (params.size() - 1);
int etai = 0;
for(etai = 0; etai < static_cast<int>(params.size())-1; etai++) {
double deta = eta[etai]*(params[etai+1]-params[etai]);
for(etai = 0; etai < static_cast<int>(params.size()) - 1; etai++) {
double deta = eta[etai] * (params[etai + 1] - params[etai]);
if(deta > target) {
break;
}
target -= deta;
}
new_params[i] = params[etai] + target/eta[etai];
new_params[i] = params[etai] + target / eta[etai];
}
params = new_params;
}
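
The two functions above implement feedback parameter optimization for the parallel tempering chain, in the spirit of the feedback-optimized method of Katzgraber et al.: f[i] is the measured fraction of "up" walkers at parameter point i, linear_regression estimates the local slope df/dp by a windowed least-squares fit (slope = (Σxy - Σx Σy/n) / (Σx² - (Σx)²/n)), and the point density η ∝ sqrt(max(0.01, -f'/Δp)) is normalized so that the new interior points land at equal quantiles of η. A condensed sketch of the final respacing step, adapted from the loop above and assuming eta has already been normalized to unit total mass:

#include <vector>

// Given old points and a normalized density eta[i] on each interval
// [params[i], params[i+1]], place interior points at equal quantiles.
std::vector<double> respace(const std::vector<double> &params,
                            const std::vector<double> &eta) {
	size_t n = params.size();
	std::vector<double> out(n);
	out.front() = params.front(); // endpoints stay pinned
	out.back() = params.back();
	for(size_t i = 1; i + 1 < n; i++) {
		double target = static_cast<double>(i) / (n - 1); // i-th quantile
		size_t k = 0;
		for(; k + 1 < n; k++) { // walk intervals until the quantile falls inside
			double mass = eta[k] * (params[k + 1] - params[k]);
			if(mass > target) {
				break;
			}
			target -= mass;
		}
		out[i] = params[k] + target / eta[k]; // linear interpolation in interval k
	}
	return out;
}

Pinning the endpoints and clamping the slope term at 0.01 keep the respacing well defined even while the histograms are still noisy.
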
......@@ -171,7 +171,7 @@ void runner_pt_master::construct_pt_chains() {
for(size_t i = 0; i < job_.task_names.size(); i++) {
auto task = job_.jobfile["tasks"][job_.task_names[i]];
auto[chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
auto [chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
if(chain_id < 0 || chain_pos < 0) {
throw std::runtime_error{"parallel tempering pt_chain indices must be nonnegative"};
}
......@@ -195,32 +195,33 @@ void runner_pt_master::construct_pt_chains() {
chain.params.at(chain_pos) = task.get<double>(pt_param);
chain.task_ids.at(chain_pos) = i;
const char *pt_sweep_error =
"in parallel tempering mode, sweeps are measured in global updates and need to be the "
"same within each chain";
const char *pt_sweep_error = "in parallel tempering mode, sweeps are measured in global updates and need to be the same within each chain";
int target_sweeps = task.get<int>("sweeps");
if(chain.target_sweeps < 0) {
chain.target_sweeps = target_sweeps;
} else if(target_sweeps != chain.target_sweeps) {
throw std::runtime_error{pt_sweep_error};
}
int target_thermalization = task.get<int>("thermalization");
if(chain.target_thermalization < 0) {
chain.target_thermalization = target_thermalization;
} else if(target_thermalization != chain.target_thermalization) {
throw std::runtime_error{pt_sweep_error};
}
int sweeps = job_.read_dump_progress(i);
if(chain.sweeps < 0) {
int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
chain.sweeps = sweeps/sweeps_per_global_update;
chain.sweeps = sweeps / sweeps_per_global_update;
} else if(sweeps != chain.sweeps) {
throw std::runtime_error{pt_sweep_error};
}
}
chain_len_ = -1;
for(auto &c : pt_chains_) {
if(c.id == -1) {
......@@ -235,7 +236,8 @@ void runner_pt_master::construct_pt_chains() {
c.nup_histogram.resize(chain_len_);
c.ndown_histogram.resize(chain_len_);
c.entries_before_optimization = job_.jobfile["jobconfig"].get<int>("pt_parameter_optimization_nsamples_initial", 500);
c.entries_before_optimization =
job_.jobfile["jobconfig"].get<int>("pt_parameter_optimization_nsamples_initial", 500);
}
if(chain_len_ == -1) {
throw std::runtime_error{
......@@ -326,20 +328,21 @@ void runner_pt_master::start() {
job_.log(fmt::format("starting job '{}' in parallel tempering mode", job_.jobname));
checkpoint_read();
use_param_optimization_ = job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
use_param_optimization_ =
job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
if(use_param_optimization_) {
job_.log("using feedback parameter optimization");
}
std::vector<int> group_idx(num_active_ranks_);
for(int i = 1; i < num_active_ranks_; i++) {
group_idx[i] = (i-1)/chain_len_;
group_idx[i] = (i - 1) / chain_len_;
}
MPI_Scatter(group_idx.data(), 1, MPI_INT, group_idx.data(), 1, MPI_INT, MASTER, MPI_COMM_WORLD);
MPI_Comm tmp;
MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, 0, &tmp);
for(int rank_section = 0; rank_section < (num_active_ranks_ - 1) / chain_len_; rank_section++) {
assign_new_chain(rank_section);
}
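
The startup logic above maps MPI ranks onto chains: the master (rank 0) computes a section index (i - 1) / chain_len_ for every worker, scatters it, and then opts out of the per-chain communicator split by passing MPI_UNDEFINED, so each contiguous block of chain_len_ worker ranks ends up in its own chain communicator. A small arithmetic sketch of the mapping, no MPI needed:

#include <cstdio>

// Worker ranks 1..N are grouped into sections of chain_len ranks each;
// rank 0 is the master and belongs to no section.
int rank_section(int rank, int chain_len) {
	return (rank - 1) / chain_len; // valid for rank >= 1
}

int main() {
	const int chain_len = 4;
	for(int rank = 1; rank <= 8; rank++) {
		// ranks 1-4 -> section 0, ranks 5-8 -> section 1
		std::printf("rank %d -> section %d\n", rank, rank_section(rank, chain_len));
	}
}
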
......@@ -411,8 +414,7 @@ void runner_pt_master::react() {
int rank = stat.MPI_SOURCE - 1;
if(rank_status == S_BUSY) {
int msg[1];
MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, rank + 1, 0, MPI_COMM_WORLD,
&stat);
MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, rank + 1, 0, MPI_COMM_WORLD, &stat);
int completed_sweeps = msg[0];
int chain_run_id = rank_to_chain_run_[rank / chain_len_];
......@@ -425,9 +427,8 @@ void runner_pt_master::react() {
int action = A_NEW_JOB;
if(chain.scheduled_runs > 0) {
job_.log(
fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.",
chain.id, chain.scheduled_runs));
job_.log(fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.",
chain.id, chain.scheduled_runs));
} else {
job_.log(fmt::format("chain {} rank {} is done. Merging.", chain.id, rank + 1));
action = A_PROCESS_DATA_NEW_JOB;
......@@ -447,30 +448,37 @@ void runner_pt_master::react() {
auto &chain_run = pt_chain_runs_[chain_run_id];
auto &chain = pt_chains_[chain_run.id];
int rank_section = rank / chain_len_;
if(use_param_optimization_) {
for(size_t rank = 0; rank < chain_run.rank_to_pos.size(); rank++) {
if(chain_run.rank_to_pos[rank] == 0) {
chain_run.last_visited[rank] = 1;
}
if(chain_run.rank_to_pos[rank] == static_cast<int>(chain_run.rank_to_pos.size())-1) {
if(chain_run.rank_to_pos[rank] ==
static_cast<int>(chain_run.rank_to_pos.size()) - 1) {
chain_run.last_visited[rank] = -1;
}
chain.ndown_histogram[chain_run.rank_to_pos[rank]] += chain_run.last_visited[rank] == -1;
chain.nup_histogram[chain_run.rank_to_pos[rank]] += chain_run.last_visited[rank] == 1;
chain.ndown_histogram[chain_run.rank_to_pos[rank]] +=
chain_run.last_visited[rank] == -1;
chain.nup_histogram[chain_run.rank_to_pos[rank]] +=
chain_run.last_visited[rank] == 1;
}
chain.histogram_entries++;
if(chain.histogram_entries >= chain.entries_before_optimization) {
chain.optimize_params();
chain.optimize_params(job_.jobfile["jobconfig"].get<int>(
"pt_parameter_optimization_regression_len", 2));
chain.clear_histograms();
job_.log(fmt::format("chain {}: pt feedback optimization: {:.2g}, entries: {}", chain.id, fmt::join(chain.params, ", "), chain.entries_before_optimization));
chain.entries_before_optimization *= job_.jobfile["jobconfig"].get<double>("pt_parameter_optimization_nsamples_growth", 1.5);
job_.log(fmt::format("chain {}: pt feedback optimization: {:.2g}, entries: {}",
chain.id, fmt::join(chain.params, ", "),
chain.entries_before_optimization));
chain.entries_before_optimization *= job_.jobfile["jobconfig"].get<double>(
"pt_parameter_optimization_nsamples_growth", 1.5);
}
}
for(int target = 0; target < chain_len_; target++) {
int target_rank = rank_section*chain_len_ + target + 1;
int target_rank = rank_section * chain_len_ + target + 1;
int pos = chain_run.rank_to_pos[target];
// keep consistent with pt_global_update
int partner_pos = pos + (2 * (pos & 1) - 1) * (2 * pt_swap_odd_ - 1);
......@@ -488,7 +496,7 @@ void runner_pt_master::react() {
}
for(int target = 0; target < chain_len_; target++) {
int target_rank = rank_section*chain_len_ + target + 1;
int target_rank = rank_section * chain_len_ + target + 1;
int pos = chain_run.rank_to_pos[target];
if(chain_run.weight_ratios[pos] < 0) {
double weight;
......@@ -504,10 +512,10 @@ void runner_pt_master::react() {
for(int target = 0; target < chain_len_; target++) {
int new_task_id = chain.task_ids[chain_run.rank_to_pos[target]];
double new_param = chain.params[chain_run.rank_to_pos[target]];
MPI_Send(&new_task_id, 1, MPI_INT, rank_section * chain_len_ + target + 1,
0, MPI_COMM_WORLD);
MPI_Send(&new_param, 1, MPI_DOUBLE, rank_section * chain_len_ + target + 1,
0, MPI_COMM_WORLD);
MPI_Send(&new_task_id, 1, MPI_INT, rank_section * chain_len_ + target + 1, 0,
MPI_COMM_WORLD);
MPI_Send(&new_param, 1, MPI_DOUBLE, rank_section * chain_len_ + target + 1, 0,
MPI_COMM_WORLD);
}
} else { // S_TIMEUP
num_active_ranks_--;
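
The histogram bookkeeping in this hunk drives the feedback optimization: a replica touching position 0 becomes an "up" walker (label +1), one touching the top position a "down" walker (label -1), and at every global update each replica's label is accumulated into the histogram of the position it currently occupies. Once enough entries are collected, optimize_params runs with the configured regression window and the histograms are cleared. A standalone sketch of one bookkeeping step, with the vectors and the ±1 convention taken from the hunk above:

#include <cstddef>
#include <vector>

struct chain_stats {
	std::vector<int> nup, ndown; // per-position walker counts
};

// One bookkeeping step: refresh the labels at the chain endpoints, then
// accumulate each replica's label into the histogram of its position.
void record_walkers(chain_stats &stats, const std::vector<int> &rank_to_pos,
                    std::vector<int> &last_visited) {
	int top = static_cast<int>(rank_to_pos.size()) - 1;
	for(std::size_t r = 0; r < rank_to_pos.size(); r++) {
		if(rank_to_pos[r] == 0) {
			last_visited[r] = 1; // came from the bottom: "up" walker
		}
		if(rank_to_pos[r] == top) {
			last_visited[r] = -1; // came from the top: "down" walker
		}
		stats.nup[rank_to_pos[r]] += last_visited[r] == 1;
		stats.ndown[rank_to_pos[r]] += last_visited[r] == -1;
	}
}
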
......@@ -524,8 +532,8 @@ void runner_pt_master::pt_global_update(pt_chain &chain, pt_chain_run &chain_run
if(r < w1 * w2) {
for(auto &p : chain_run.rank_to_pos) {
if(p == i) {
p = i+1;
} else if(p == i+1) {
p = i + 1;
} else if(p == i + 1) {
p = i;
}
}
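
pt_global_update alternates between swapping even neighbor pairs (0↔1, 2↔3, ...) and odd ones (1↔2, 3↔4, ...). The branch-free expression pos + (2 * (pos & 1) - 1) * (2 * pt_swap_odd_ - 1), flagged earlier in the diff with "keep consistent with pt_global_update", yields the partner position for both phases; out-of-range partners are simply skipped. A small sketch that prints the pairing for both phases:

#include <cstdio>

// Partner position for the alternating even/odd swap pattern.
int partner(int pos, int swap_odd) {
	return pos + (2 * (pos & 1) - 1) * (2 * swap_odd - 1);
}

int main() {
	for(int swap_odd = 0; swap_odd <= 1; swap_odd++) {
		std::printf("swap_odd=%d:", swap_odd);
		for(int pos = 0; pos < 6; pos++) {
			// out-of-range partners (e.g. 0 -> -1) are skipped by the caller
			std::printf(" %d->%d", pos, partner(pos, swap_odd));
		}
		std::printf("\n");
	}
}
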
......@@ -546,10 +554,11 @@ void runner_pt_slave::start() {
int group_idx;
MPI_Scatter(&group_idx, 1, MPI_INT, &group_idx, 1, MPI_INT, MASTER, MPI_COMM_WORLD);
MPI_Comm_split(MPI_COMM_WORLD, group_idx, 0, &chain_comm_);
MPI_Comm_rank(chain_comm_, &chain_rank_);
bool use_param_optimization = job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
bool use_param_optimization =
job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
if(!accept_new_chain()) {
return;
......@@ -587,7 +596,6 @@ void runner_pt_slave::start() {
action = what_is_next(S_BUSY);
} while(action != A_EXIT);
if(action == A_EXIT) {
job_.log(fmt::format("rank {} exits: out of work", rank_));
}
......@@ -604,16 +612,16 @@ void runner_pt_slave::pt_global_update() {
int response;
MPI_Recv(&response, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
//job_.log(fmt::format(" * rank {}: ready for global update", rank_));
// job_.log(fmt::format(" * rank {}: ready for global update", rank_));
if(response == GA_CALC_WEIGHT) {
double partner_param;
MPI_Recv(&partner_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
double weight_ratio = sys_->_pt_weight_ratio(partner_param);
MPI_Send(&weight_ratio, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD);
//job_.log(fmt::format(" * rank {}: weight sent", rank_));
// job_.log(fmt::format(" * rank {}: weight sent", rank_));
} else {
//job_.log(fmt::format(" * rank {}: no weight needed", rank_));
// job_.log(fmt::format(" * rank {}: no weight needed", rank_));
}
MPI_Barrier(chain_comm_);
......@@ -621,7 +629,7 @@ void runner_pt_slave::pt_global_update() {
double new_param;
int new_task_id;
MPI_Recv(&new_task_id, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(&new_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
......@@ -633,8 +641,8 @@ void runner_pt_slave::pt_global_update() {
if(new_task_id != task_id_ || current_param_ != new_param) {
task_id_ = new_task_id;
sweeps_per_global_update_ =
job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>("pt_sweeps_per_global_update");
sweeps_per_global_update_ = job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>(
"pt_sweeps_per_global_update");
sys_->_pt_update_param(new_param, job_.rundir(task_id_, run_id_));
}
current_param_ = new_param;
......
......@@ -26,7 +26,7 @@ struct pt_chain {
void checkpoint_write(const iodump::group &g);
void clear_histograms();
void optimize_params();
void optimize_params(int linreg_len);
};
struct pt_chain_run {
......
#pragma once
#include "mc.h"
#include "jobinfo.h"
#include "mc.h"
#include "runner_task.h"
#include <functional>
#include <ostream>
......
#include "iodump.h"
#include "runner_task.h"
#include "iodump.h"
namespace loadl {
......
#include "runner.cpp"
#include "jobinfo.cpp"
int main() {
using namespace loadl;
......