Commit 46e64651 authored by Lukas Weber's avatar Lukas Weber

parallel tempering feedback optimization

parent 798b506b
......@@ -28,7 +28,7 @@ if jobdir != '':
os.chdir(jobdir)
try:
job = jobfile.JobFile('./'+os.path.basename(args.jobfile))
job = jobfile.JobFile(os.path.basename(args.jobfile))
except jobfile.JobFileGenError as e:
print('Error: {}'.format(e))
sys.exit(1)
......
......@@ -10,7 +10,9 @@ class JobFileGenError(Exception):
class JobFile:
def __init__(self, filename):
result = subprocess.run([filename], stdout=subprocess.PIPE)
env = dict(os.environ)
env['PATH'] += ':.'
result = subprocess.run([filename], stdout=subprocess.PIPE, env=env)
self.raw_jobfile = result.stdout.decode('utf-8')
if result.returncode != 0:
......
......@@ -6,24 +6,28 @@ import numpy
def _expand_path(path):
return os.path.abspath(os.path.expandvars(os.path.expanduser(path)))
def JobConfig(filename):
    """Load the YAML job configuration file ``filename`` (after path expansion) and return its contents."""
    with open(_expand_path(filename), 'r') as f:
        return yaml.safe_load(f)
class TaskMaker:
def __init__(self, name, jobconfig):
self._tm_name = os.path.splitext(name)[0]
self._tm_name = name
self._tm_tasks = []
with open(_expand_path(jobconfig), 'r') as f:
self._tm_jobconfig = yaml.safe_load(f)
if type(jobconfig) == dict:
self._tm_jobconfig = jobconfig
elif type(jobconfig) == str:
self._tm_jobconfig = JobConfig(jobconfig)
self._tm_jobconfig['mc_binary'] = _expand_path(self._tm_jobconfig['mc_binary'])
def task(self, **additional_parameters):
self._tm_tasks.append({**self.__dict__, **additional_parameters})
def write(self):
filenames = []
jobfile_dict = { 'jobname': self._tm_name, 'jobconfig': self._tm_jobconfig, 'tasks': {}}
for i, t in enumerate(self._tm_tasks):
task_name = f'task{i+1:04d}'
task_dict = {}
......
#include "dump.h"
#include "iodump.h"
#include <iostream>
#include <sstream>
#include <sys/file.h>
......
#include "jobinfo.h"
#include <ctime>
#include <iomanip>
#include <iostream>
#include <fstream>
#include <regex>
#include <dirent.h>
#include "merger.h"
namespace loadl {
// parses the duration '[[hours:]minutes:]seconds' into seconds
// replace as soon as there is an alternative
// Parses the duration '[[hours:]minutes:]seconds' into seconds.
// Throws std::runtime_error if str does not match the format.
// replace as soon as there is an alternative
static int parse_duration(const std::string &str) {
	size_t idx;
	try {
		int i1 = std::stoi(str, &idx, 10);
		if(idx == str.size()) {
			return i1;
		} else if(str[idx] == ':') {
			std::string str1 = str.substr(idx + 1);
			int i2 = std::stoi(str1, &idx, 10);
			if(idx == str1.size()) {
				return 60 * i1 + i2;
			} else if(str1[idx] == ':') { // BUGFIX: idx is an offset into str1, not str
				std::string str2 = str1.substr(idx + 1);
				int i3 = std::stoi(str2, &idx, 10);
				if(idx != str2.size()) {
					throw std::runtime_error{"minutes"};
				}
				return 60 * 60 * i1 + 60 * i2 + i3;
			} else {
				throw std::runtime_error{"hours"};
			}
		} else {
			throw std::runtime_error{"seconds"};
		}
	} catch(std::exception &e) {
		// message is identical to the former fmt::format() output
		throw std::runtime_error{"'" + str +
		                         "' does not fit time format [[hours:]minutes:]seconds: " +
		                         e.what()};
	}
}
// Directory holding all output data of this job: '<jobname>.data'.
std::string jobinfo::jobdir() const {
	return jobname + ".data";
}
// Output directory of a single task: '<jobdir>/<task_name>'.
// Throws std::out_of_range if task_id is not a valid index.
std::string jobinfo::taskdir(int task_id) const {
	return fmt::format("{}/{}", jobdir(), task_names.at(task_id));
}
// Base path (without file extension) for one run of a task:
// '<taskdir>/runNNNN' with the run id zero-padded to four digits.
std::string jobinfo::rundir(int task_id, int run_id) const {
	return fmt::format("{}/run{:04d}", taskdir(task_id), run_id);
}
// Parses the job file, collects the task names and creates the job and
// task output directories (existing directories are accepted).
jobinfo::jobinfo(const std::string &jobfile_name) : jobfile{jobfile_name} {
	for(auto node : jobfile["tasks"]) {
		std::string task_name = node.first;
		task_names.push_back(task_name);
	}
	// sort so that task ids are deterministic across runs
	std::sort(task_names.begin(), task_names.end());
	jobname = jobfile.get<std::string>("jobname");
	std::string datadir = jobdir();
	int rc = mkdir(datadir.c_str(), 0755);
	if(rc != 0 && errno != EEXIST) {
		throw std::runtime_error{
		    fmt::format("creation of output directory '{}' failed: {}", datadir, strerror(errno))};
	}
	// perhaps a bit controversially, jobinfo tries to create the task directories. TODO: single file
	// output.
	for(size_t i = 0; i < task_names.size(); i++) {
		int rc = mkdir(taskdir(i).c_str(), 0755);
		if(rc != 0 && errno != EEXIST) {
			throw std::runtime_error{fmt::format("creation of output directory '{}' failed: {}",
			                                     taskdir(i), strerror(errno))};
		}
	}
	// walltime and checkpoint interval come from the referenced jobconfig section
	parser jobconfig{jobfile["jobconfig"]};
	walltime = parse_duration(jobconfig.get<std::string>("mc_walltime"));
	checkpoint_time = parse_duration(jobconfig.get<std::string>("mc_checkpoint_time"));
}
// This function lists files that could be run files being in the taskdir
// and having the right file_ending.
// The regex has to be matched with the output of the rundir function.
std::vector<std::string> jobinfo::list_run_files(const std::string &taskdir,
                                                 const std::string &file_ending) {
	// file_ending is spliced into the regex verbatim, so it may itself
	// contain regex syntax (callers pass e.g. "dump\\.h5").
	std::regex run_filename{"run\\d{4,}\\." + file_ending};
	std::vector<std::string> results;
	DIR *dir = opendir(taskdir.c_str());
	if(dir == nullptr) {
		throw std::ios_base::failure(
		    fmt::format("could not open directory '{}': {}", taskdir, strerror(errno)));
	}
	// NOTE(review): if anything below throws, dir leaks — consider RAII; in
	// practice regex_search/emplace_back only throw on allocation failure.
	struct dirent *result;
	while((result = readdir(dir)) != nullptr) {
		std::string fname{result->d_name};
		if(std::regex_search(fname, run_filename)) {
			results.emplace_back(fmt::format("{}/{}", taskdir, fname));
		}
	}
	closedir(dir);
	return results;
}
// Sums the sweep counts stored in all run dump files of the given task.
// Returns 0 if the task directory does not exist yet.
int jobinfo::read_dump_progress(int task_id) const {
	int sweeps = 0;
	try {
		for(auto &dump_name : list_run_files(taskdir(task_id), "dump\\.h5")) {
			int dump_sweeps = 0;
			iodump d = iodump::open_readonly(dump_name);
			d.get_root().read("sweeps", dump_sweeps);
			sweeps += dump_sweeps;
		}
	} catch(std::ios_base::failure &e) {
		// might happen if the taskdir does not exist; treated as zero progress
	}
	return sweeps;
}
// Concatenates the per-task 'results.yml' files into a single
// '<jobname>.results.yml' in the working directory.
void jobinfo::concatenate_results() {
	std::ofstream cat_results{fmt::format("{}.results.yml", jobname)};
	for(size_t i = 0; i < task_names.size(); i++) {
		std::ifstream res_file{taskdir(i) + "/results.yml"};
		if(!res_file.is_open()) {
			// task may not have been merged yet — skip it instead of
			// reading from an invalid stream.
			continue;
		}
		res_file.seekg(0, res_file.end);
		auto size = res_file.tellg();
		if(size < 0) {
			// BUGFIX: tellg() returns -1 on failure; the old code cast it to
			// size_t and tried to allocate a huge buffer.
			continue;
		}
		res_file.seekg(0, res_file.beg);
		std::vector<char> buf(static_cast<size_t>(size) + 1, 0);
		res_file.read(buf.data(), size);
		cat_results << buf.data() << "\n";
	}
}
// Merges all measurement files of a task and writes the merged results
// (including evaluated quantities) to '<taskdir>/results.yml'.
void jobinfo::merge_task(int task_id, const std::vector<evalable> &evalables) {
	std::vector<std::string> meas_files = list_run_files(taskdir(task_id), "meas\\.h5");
	results results = merge(meas_files, evalables);
	std::string result_filename = fmt::format("{}/results.yml", taskdir(task_id));
	const std::string &task_name = task_names.at(task_id);
	results.write_yaml(result_filename, taskdir(task_id), jobfile["tasks"][task_name].get_yaml());
}
// Prints a message to stdout prefixed with a local-time timestamp.
// NOTE(review): std::localtime uses shared static storage — presumably only
// called from one thread; confirm before using from workers.
void jobinfo::log(const std::string &message) {
	std::time_t t = std::time(nullptr);
	std::cout << std::put_time(std::localtime(&t), "%F %T: ") << message << "\n";
}
}
#pragma once
#include <vector>
#include <string>
#include "parser.h"
#include "iodump.h"
#include "evalable.h"
namespace loadl {
// Holds the parsed job description and provides helpers for the on-disk
// layout of job, task and run output directories.
struct jobinfo {
	parser jobfile; // parsed job description file
	std::string jobname;
	std::vector<std::string> task_names; // sorted task names from the job file
	double checkpoint_time; // checkpoint interval in seconds (parsed from 'mc_checkpoint_time')
	double walltime;        // wall clock budget in seconds (parsed from 'mc_walltime')
	jobinfo(const std::string &jobfile_name);
	// path helpers: '<jobname>.data', '<jobdir>/<task>', '<taskdir>/runNNNN'
	std::string jobdir() const;
	std::string rundir(int task_id, int run_id) const;
	std::string taskdir(int task_id) const;
	// lists files in taskdir matching 'run\d{4,}.<file_ending>'
	static std::vector<std::string> list_run_files(const std::string &taskdir,
	                                               const std::string &file_ending);
	int read_dump_progress(int task_id) const;
	void merge_task(int task_id, const std::vector<evalable> &evalables);
	void concatenate_results();
	void log(const std::string &message);
};
}
......@@ -2,7 +2,8 @@
namespace loadl {
mc::mc(const parser &p) : param{p} {
therm_ = param.get<int>("thermalization");
therm_ = p.get<int>("thermalization");
pt_sweeps_per_global_update_ = p.get<int>("pt_sweeps_per_global_update", -1);
}
void mc::write_output(const std::string &) {}
......@@ -19,12 +20,13 @@ void mc::_init() {
// simple profiling support: measure the time spent for sweeps/measurements etc
measure.add_observable("_ll_checkpoint_read_time", 1);
measure.add_observable("_ll_checkpoint_write_time", 1);
measure.add_observable("_ll_measurement_time", 1000);
measure.add_observable("_ll_sweep_time", 1000);
measure.add_observable("_ll_measurement_time", pt_mode_ ? pt_sweeps_per_global_update_ : 1000);
measure.add_observable("_ll_sweep_time", pt_mode_ ? pt_sweeps_per_global_update_ : 1000);
if(param.get<bool>("pt_statistics", false)) {
measure.add_observable("_ll_pt_rank", 1);
measure.add_observable("_ll_pt_weight_ratio", 1);
if(pt_mode_) {
if(param.get<bool>("pt_statistics", false)) {
measure.add_observable("_ll_pt_rank", 1);
}
}
if(param.defined("seed")) {
......@@ -69,44 +71,54 @@ void mc::_do_update() {
void mc::_pt_update_param(double new_param, const std::string &new_dir) {
// take over the bins of the new target dir
/*{
{
iodump dump_file = iodump::open_readonly(new_dir + ".dump.h5");
measure.checkpoint_read(dump_file.get_root().open_group("measurements"));
}*/
}
auto unclean = measure.is_unclean();
if(unclean) {
throw std::runtime_error(fmt::format("Unclean observable: {}\nIn parallel tempering mode you have to choose the binsize for all observables so that it is commensurate with pt_sweeps_per_global_update (so that all bins are empty once it happens). If you don’t like this limitation, implement it properly.", *unclean));
}
pt_update_param(new_param);
}
void mc::pt_measure_statistics() {
if(param.get<bool>("pt_statistics", false)) {
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
measure.add("_ll_pt_rank", rank);
}
pt_update_param(new_param);
}
// Wraps the user-provided pt_weight_ratio and, if the 'pt_statistics'
// parameter is set, records the ratio as an observable.
double mc::_pt_weight_ratio(double new_param) {
	double wr = pt_weight_ratio(new_param);
	if(param.get<bool>("pt_statistics", false)) {
		measure.add("_ll_pt_weight_ratio", wr);
	}
	return wr;
}
void mc::_write(const std::string &dir) {
struct timespec tstart, tend;
clock_gettime(CLOCK_MONOTONIC_RAW, &tstart);
void mc::measurements_write(const std::string &dir) {
// blocks limit scopes of the dump file handles to ensure they are closed at the right time.
{
iodump meas_file = iodump::open_readwrite(dir + ".meas.h5");
measure.samples_write(meas_file.get_root());
auto g = meas_file.get_root();
measure.samples_write(g);
}
}
void mc::_write(const std::string &dir) {
struct timespec tstart, tend;
clock_gettime(CLOCK_MONOTONIC_RAW, &tstart);
measurements_write(dir);
{
iodump dump_file = iodump::create(dir + ".dump.h5.tmp");
auto g = dump_file.get_root();
measure.checkpoint_write(g.open_group("measurements"));
rng->checkpoint_write(g.open_group("random_number_generator"));
checkpoint_write(g.open_group("simulation"));
measure.checkpoint_write(g.open_group("measurements"));
g.write("max_checkpoint_write_time", max_checkpoint_write_time_);
g.write("max_sweep_time", max_sweep_time_);
......@@ -131,6 +143,7 @@ double mc::safe_exit_interval() {
return 2 * (max_checkpoint_write_time_ + max_sweep_time_ + max_meas_time_) + 2;
}
bool mc::_read(const std::string &dir) {
if(!file_exists(dir + ".dump.h5")) {
return false;
......@@ -164,6 +177,11 @@ void mc::_write_output(const std::string &filename) {
}
bool mc::is_thermalized() {
return sweep_ > therm_;
int sweep = sweep_;
if(pt_mode_ && pt_sweeps_per_global_update_ > 0) {
sweep /= pt_sweeps_per_global_update_;
}
return sweep >= therm_;
}
}
......@@ -12,8 +12,9 @@ namespace loadl {
class mc {
private:
int sweep_ = 0;
int therm_ = 0;
int sweep_{0};
int therm_{0};
int pt_sweeps_per_global_update_{-1};
// The following times in seconds are used to estimate a safe exit interval before walltime is
// up.
......@@ -40,6 +41,8 @@ protected:
}
public:
bool pt_mode_{};
double random01();
int sweep() const;
......@@ -52,6 +55,8 @@ public:
void _write(const std::string &dir);
bool _read(const std::string &dir);
void measurements_write(const std::string &dir);
void _write_output(const std::string &filename);
void _do_update();
......@@ -59,10 +64,9 @@ public:
void _pt_update_param(double new_param, const std::string &new_dir);
double _pt_weight_ratio(double new_param);
void pt_measure_statistics();
double safe_exit_interval();
// write only measurement data (useful for parallel tempering)
void measurement_write(const std::string &dir);
bool is_thermalized();
measurements measure;
......
......@@ -41,4 +41,13 @@ void measurements::samples_write(const iodump::group &meas_file) {
obs.second.measurement_write(g);
}
}
// Returns the name of the first observable that still holds unbinned
// samples, or std::nullopt if every observable is clean.
std::optional<std::string> measurements::is_unclean() const {
	for(const auto &[name, obs] : observables_) {
		if(!obs.is_clean()) {
			return name;
		}
	}
	return std::nullopt;
}
}
#pragma once
#include "dump.h"
#include "iodump.h"
#include "observable.h"
#include <map>
#include <string>
......@@ -26,6 +26,10 @@ public:
// should be opened in read/write mode.
void samples_write(const iodump::group &meas_file);
// returns nullopt if all observables are clean,
// otherwise the name of a non-empty observable
std::optional<std::string> is_unclean() const;
private:
std::map<std::string, observable> observables_;
};
......
#include "merger.h"
#include "dump.h"
#include "iodump.h"
#include "evalable.h"
#include "mc.h"
#include "measurements.h"
......
......@@ -14,8 +14,9 @@ configure_file(input : 'config.h.in',
config_inc = include_directories('.')
loadleveller_sources = files([
'dump.cpp',
'evalable.cpp',
'iodump.cpp',
'jobinfo.cpp',
'mc.cpp',
'measurements.cpp',
'merger.cpp',
......@@ -30,8 +31,9 @@ loadleveller_sources = files([
])
loadleveller_headers = files([
'dump.h',
'evalable.h',
'iodump.h',
'jobinfo.h',
'loadleveller.h',
'mc.h',
'measurements.h',
......@@ -58,7 +60,7 @@ libloadleveller = library('loadleveller',
if should_install
pkg = import('pkgconfig')
pkg.generate(libloadleveller,
description : 'Framework for distributed (Quantum) Monte Carlo codes',
description : 'Framework for distributed (quantum) Monte Carlo codes',
)
install_headers(loadleveller_headers, subdir : 'loadleveller')
......
#include "observable.h"
#include <fmt/format.h>
#include <iostream>
namespace loadl {
observable::observable(std::string name, size_t bin_length, size_t vector_length)
......@@ -56,4 +56,12 @@ void observable::checkpoint_read(const iodump::group &d) {
d.read("samples", samples_);
current_bin_ = 0;
}
// True if there are no samples left in a partially filled bin and no
// completed bins waiting to be written out.
bool observable::is_clean() const {
	// NOTE(review): removed leftover debug output that printed
	// current_bin_filling_ to stdout on every unclean check.
	return current_bin_ == 0 && current_bin_filling_ == 0;
}
}
#pragma once
#include "dump.h"
#include "iodump.h"
#include <cmath>
#include <map>
#include <string>
......@@ -27,6 +27,9 @@ public:
void checkpoint_read(const iodump::group &dump_file);
// true if there are no samples in the bin
bool is_clean() const;
private:
static const size_t initial_bin_length = 1000;
......
......@@ -3,7 +3,7 @@
#include "config.h"
#include "MersenneTwister.h"
#include "dump.h"
#include "iodump.h"
#include <random>
#include <sstream>
#include <typeinfo>
......
#include "runner.h"
#include "merger.h"
#include "iodump.h"
#include "runner_pt.h"
#include <dirent.h>
#include <fmt/format.h>
#include <fstream>
#include <iomanip>
#include <regex>
#include <sys/stat.h>
namespace loadl {
enum {
......@@ -25,153 +21,6 @@ enum {
A_PROCESS_DATA_NEW_JOB = 4,
};
// parses the duration '[[hours:]minutes:]seconds' into seconds
// replace as soon as there is an alternative
// Parses the duration '[[hours:]minutes:]seconds' into seconds.
// Throws std::runtime_error if str does not match the format.
static int parse_duration(const std::string &str) {
	size_t idx;
	try {
		int i1 = std::stoi(str, &idx, 10);
		if(idx == str.size()) {
			return i1;
		} else if(str[idx] == ':') {
			std::string str1 = str.substr(idx + 1);
			int i2 = std::stoi(str1, &idx, 10);
			if(idx == str1.size()) {
				return 60 * i1 + i2;
			} else if(str1[idx] == ':') { // BUGFIX: idx is an offset into str1, not str
				std::string str2 = str1.substr(idx + 1);
				int i3 = std::stoi(str2, &idx, 10);
				if(idx != str2.size()) {
					throw std::runtime_error{"minutes"};
				}
				return 60 * 60 * i1 + 60 * i2 + i3;
			} else {
				throw std::runtime_error{"hours"};
			}
		} else {
			throw std::runtime_error{"seconds"};
		}
	} catch(std::exception &e) {
		// message is identical to the former fmt::format() output
		throw std::runtime_error{"'" + str +
		                         "' does not fit time format [[hours:]minutes:]seconds: " +
		                         e.what()};
	}
}
// Directory holding all output data of this job: '<jobname>.data'.
std::string jobinfo::jobdir() const {
	return jobname + ".data";
}
// Output directory of a single task: '<jobdir>/<task_name>'.
// Throws std::out_of_range if task_id is not a valid index.
std::string jobinfo::taskdir(int task_id) const {
	return fmt::format("{}/{}", jobdir(), task_names.at(task_id));
}
// Base path (without file extension) for one run of a task:
// '<taskdir>/runNNNN' with the run id zero-padded to four digits.
std::string jobinfo::rundir(int task_id, int run_id) const {
	return fmt::format("{}/run{:04d}", taskdir(task_id), run_id);
}
// Parses the job file, collects the task names and creates the job and
// task output directories (existing directories are accepted).
jobinfo::jobinfo(const std::string &jobfile_name) : jobfile{jobfile_name} {
	for(auto node : jobfile["tasks"]) {
		std::string task_name = node.first;
		task_names.push_back(task_name);
	}
	// sort so that task ids are deterministic across runs
	std::sort(task_names.begin(), task_names.end());
	jobname = jobfile.get<std::string>("jobname");
	std::string datadir = jobdir();
	int rc = mkdir(datadir.c_str(), 0755);
	if(rc != 0 && errno != EEXIST) {
		throw std::runtime_error{
		    fmt::format("creation of output directory '{}' failed: {}", datadir, strerror(errno))};
	}
	// perhaps a bit controversially, jobinfo tries to create the task directories. TODO: single file
	// output.
	for(size_t i = 0; i < task_names.size(); i++) {
		int rc = mkdir(taskdir(i).c_str(), 0755);
		if(rc != 0 && errno != EEXIST) {
			throw std::runtime_error{fmt::format("creation of output directory '{}' failed: {}",
			                                     taskdir(i), strerror(errno))};
		}
	}
	// walltime and checkpoint interval come from the referenced jobconfig section
	parser jobconfig{jobfile["jobconfig"]};
	walltime = parse_duration(jobconfig.get<std::string>("mc_walltime"));
	checkpoint_time = parse_duration(jobconfig.get<std::string>("mc_checkpoint_time"));
}
// This function lists files that could be run files being in the taskdir
// and having the right file_ending.
// The regex has to be matched with the output of the rundir function.
std::vector<std::string> jobinfo::list_run_files(const std::string &taskdir,
const std::string &file_ending) {
std::regex run_filename{"run\\d{4,}\\." + file_ending};
std::vector<std::string> results;
DIR *dir = opendir(taskdir.c_str());
if(dir == nullptr) {