Commit 1cdff5f2 authored by Lukas Weber

mpi runner seems to work

parent 2eb293be
@@ -30,7 +30,7 @@ namespace load_leveller {
return run_mc<runner_single>(mccreator, argc-1, argv+1);
}
return run_mc<runner>(mccreator, argc, argv);
return run_mc<runner_mpi>(mccreator, argc, argv);
}
}
#ifndef MC_PT_H
#define MC_PT_H
#include <iostream>
#include <vector>
#include <string>
#include "measurements.h"
#include "random.h"
#include "parser.h"
#include "types.h"
using namespace std;
class mc_pt
{
private:
int Lx;
int Ly;
vector<double> Tvec;
double T;
int therm;
int sweep;
int pt_spacing;
int label;
vector<int> spin;
public:
parser param;
void param_init(string dir) {param.read_file(dir);}
bool have_random;
randomnumbergenerator * rng;
void random_init() {if (param.defined("SEED"))
rng = new randomnumbergenerator(param.value_of<luint>("SEED"));
else
rng = new randomnumbergenerator();
have_random=true;
}
void random_write(odump& d) {rng->write(d);}
void seed_write(string fn) {ofstream s; s.open(fn.c_str()); s << rng->seed()<<endl;s.close();}
void random_read(idump& d) {rng = new randomnumbergenerator();have_random=true;rng->read(d);}
void random_clear() { if (have_random) {delete rng;have_random=false;} }
double random01() {return rng->d();}
void init();
void do_update();
void do_measurement();
void write(string);
bool read(string);
void write_output(string,int);
bool is_thermalized();
bool request_global_update();
void change_parameter(int);
void change_to(int);
double get_weight(int);
int get_label();
int myrep;
vector<measurements> measure;
mc_pt(string);
~mc_pt();
};
#endif
@@ -16,8 +16,8 @@ static std::runtime_error non_map_error(const std::string& filename) {
return std::runtime_error(fmt::format("YAML: {}: trying to dereference non-map node.", filename));
}
static std::runtime_error key_error(const std::string& filename, const std::string& name) {
return std::runtime_error(fmt::format("YAML: {}: could not find required key '{}'", filename, name));
static std::runtime_error key_error(const std::string& filename, const std::string& key) {
return std::runtime_error(fmt::format("YAML: {}: could not find required key '{}'", filename, key));
}
@@ -50,30 +50,30 @@ parser::iterator parser::begin() {
parser::iterator parser::end() {
return iterator{filename_, content_.begin()};
return iterator{filename_, content_.end()};
}
bool parser::defined(const std::string& name) {
bool parser::defined(const std::string& key) {
if(!content_.IsMap()) {
return false;
}
return content_[name].IsDefined();
return content_[key].IsDefined();
}
parser parser::operator[](const std::string& name) {
parser parser::operator[](const std::string& key) {
if(!content_.IsMap()) {
throw non_map_error(filename_);
}
auto node = content_[name];
auto node = content_[key];
if(!node.IsDefined())
throw key_error(filename_, name);
throw key_error(filename_, key);
if(!node.IsMap())
throw std::runtime_error(fmt::format("YAML: {}: Found key '{}', but it has a scalar value. Was expecting it to be a map", filename_, name));
throw std::runtime_error(fmt::format("YAML: {}: Found key '{}', but it has a scalar value. Was expecting it to be a map", filename_, key));
try {
return parser{node, filename_};
} catch(YAML::Exception) {
throw key_error(filename_, name);
throw key_error(filename_, key);
}
}
@@ -29,22 +29,22 @@ public:
parser(const std::string& filename);
template<typename T>
T get(const std::string& name) {
if(!content_[name]) {
throw std::runtime_error(fmt::format("YAML: {}: required key '{}' not found.", filename_, name));
T get(const std::string& key) {
if(!content_[key]) {
throw std::runtime_error(fmt::format("YAML: {}: required key '{}' not found.", filename_, key));
}
return content_[name].as<T>();
return content_[key].as<T>();
}
template<typename T>
auto get(const std::string& name, T default_val) -> decltype(default_val) {
return content_[name].as<T>(default_val);
auto get(const std::string& key, T default_val) -> decltype(default_val) {
return content_[key].as<T>(default_val);
}
// is key name defined?
bool defined(const std::string& name);
// is key defined?
bool defined(const std::string& key);
parser operator[](const std::string& name);
parser operator[](const std::string& key);
iterator begin();
iterator end();
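// Usage sketch for the interface above (the file and key names here are
// hypothetical):
//
//   parser p{"job.yml"};
//   int sweeps = p.get<int>("sweeps");            // throws if the key is missing
//   double T = p.get<double>("temperature", 1.0); // falls back to the default
//   if(p.defined("tasks")) {
//       for(auto node : p["tasks"]) {
//           std::string task_name = node.first;   // iterate over a map node
//       }
//   }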
@@ -3,20 +3,55 @@
#include <fmt/format.h>
#include <regex>
#include <dirent.h>
#include <fstream>
std::string runner::taskdir(int task_id) const {
return fmt::format("{}.{}", jobfile_name_, task_names_.at(task_id));
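// A legend for the constants below, as they are used in this commit: MASTER
// is the rank of the scheduling process, the T_* values are MPI message
// tags, the S_* values are status codes a slave sends under T_STATUS, and
// the A_* values are actions the master answers with under T_ACTION.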
enum {
MASTER = 0,
T_STATUS = 1,
T_ACTION = 2,
T_NEW_JOB = 3,
S_IDLE = 1,
S_BUSY = 2,
A_EXIT = 1,
A_CONTINUE = 2,
A_NEW_JOB = 3,
A_PROCESS_DATA_NEW_JOB = 4,
};
std::string jobinfo::taskdir(int task_id) const {
return fmt::format("{}.{}", jobfile_name, task_names.at(task_id));
}
std::string runner::rundir(int task_id, int run_id) const {
std::string jobinfo::rundir(int task_id, int run_id) const {
return fmt::format("{}/run{:04d}", taskdir(task_id), run_id);
}
jobinfo::jobinfo(const std::string& jobfile_name)
: jobfile_name{jobfile_name}, jobfile{jobfile_name} {
for(auto node : jobfile["tasks"]) {
std::string task_name = node.first;
if(std::find(task_names.begin(), task_names.end(), task_name) != task_names.end()) {
throw std::runtime_error(fmt::format("Task '{}' occurred more than once in '{}'", task_name, jobfile_name));
}
task_names.push_back(task_name);
}
// The jobconfig file contains information about the launch options: walltime, number of cores, etc. Not sure if this is really the best way to solve the issue.
std::string jobconfig_path = jobfile.get<std::string>("jobconfig");
parser jobconfig{jobconfig_path};
walltime = jobconfig.get<double>("mc_walltime");
checkpoint_time = jobconfig.get<double>("mc_checkpoint_time");
sweeps_before_communication = jobconfig.get<int>("mc_sweeps_before_communication", 10000);
}
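// A minimal sketch of a jobconfig file matching the reads above (the key
// names are taken from this constructor; the values are only illustrative):
//
//   mc_walltime: 86400                    # wallclock budget in seconds
//   mc_checkpoint_time: 3600              # seconds between checkpoints
//   mc_sweeps_before_communication: 10000 # optional; 10000 is the default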
// This function lists the files in taskdir that could be run files, i.e.
// those that have the right file_ending.
// The regex has to match the output of the rundir function.
std::vector<std::string> runner::list_run_files(const std::string& taskdir, const std::string& file_ending) {
std::vector<std::string> jobinfo::list_run_files(const std::string& taskdir, const std::string& file_ending) {
std::regex run_filename{"run\\d{4,}."+file_ending};
std::vector<std::string> results;
DIR *dir = opendir(taskdir.c_str());
@@ -32,61 +67,47 @@ std::vector<std::string> runner::list_run_files(const std::string& taskdir, cons
return results;
}
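// Example: rundir(0, 1) yields "<taskdir>/run0001", so a state written there
// with the ending "dump.h5" shows up in this listing as "run0001.dump.h5",
// which matches run\d{4,}.dump.h5. Note that the unescaped '.' in the
// pattern is a regex wildcard; it happens to match the literal dot as well.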
int runner::start(const std::string& jobfile_name, const mc_factory& mccreator, int argc, char **argv)
{
int runner_mpi::start(const std::string& jobfile_name, const mc_factory& mccreator, int argc, char **argv) {
MPI_Init(&argc,&argv);
jobfile_name_ = jobfile_name;
jobfile_ = std::unique_ptr<parser>{new parser{jobfile_name}};
int rank;
int num_ranks;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &num_ranks);
for(auto node : (*jobfile_)["tasks"]) {
std::string task_name = node.first;
if(std::find(task_names_.begin(), task_names_.end(), task_name) != task_names_.end()) {
throw std::runtime_error(fmt::format("Task '{}' occurred more than once in '{}'", task_name, jobfile_name));
}
task_names_.push_back(task_name);
jobinfo job{jobfile_name};
if(rank == 0) {
runner_master r{std::move(job)};
r.start();
} else {
runner_slave r{std::move(job), mccreator};
r.start();
}
// The jobconfig file contains information about the launch options: walltime, number of cores, etc. Not sure if this is really the best way to solve the issue.
std::string jobconfig_path = jobfile_->get<std::string>("jobconfig");
parser jobconfig{jobconfig_path};
walltime_ = jobconfig.get<double>("mc_walltime");
chktime_ = jobconfig.get<double>("mc_checkpoint_time");
sweeps_before_communication_ = jobconfig.get<int>("mc_sweeps_before_communication", 10000);
MPI_Finalize();
mccreator_ = mccreator;
MPI_Init(&argc,&argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
time_start_ = MPI_Wtime();
time_last_chkpt_ = time_start_;
STATUS.open(fmt::format("{}.status", jobfile_name), std::ios::out|std::ios::app);
if(my_rank == MASTER) {
M_read();
M_wait();
}
else what_is_next(S_IDLE);
return 0;
}
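// In the new start(), rank 0 constructs a runner_master and every other rank
// a runner_slave; both return here, so MPI_Finalize is reached on all ranks.
// The bookkeeping below (jobfile parsing, timers, the status file) moves
// into jobinfo and the two runner classes.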
bool runner::is_chkpt_time()
{
if((MPI_Wtime() - time_last_chkpt_) > chktime_){
time_last_chkpt_ = MPI_Wtime();
return true;
}
else return false;
runner_master::runner_master(jobinfo&& job)
: job_{job} {
}
bool runner::time_is_up()
{
return (MPI_Wtime() - time_start_) > walltime_;
void runner_master::start() {
time_start_ = MPI_Wtime();
MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);
job_.status << fmt::format("0 : Starting job '{}'\n", job_.jobfile_name);
read();
while(num_active_ranks_ > 1) {
react();
}
end_of_run();
}
int runner::M_get_new_task_id(int old_id) {
int runner_master::get_new_task_id(int old_id) {
int ntasks = tasks_.size();
int i;
for(i = 1; i <= ntasks; i++) {
@@ -98,96 +119,90 @@ int runner::M_get_new_task_id(int old_id) {
return -1;
}
void runner::M_update(int node)
{
bool runner_master::time_is_up() const {
return MPI_Wtime()-time_start_ > job_.walltime;
}
void runner_master::react() {
int node_status;
MPI_Status stat;
MPI_Recv(&node_status, 1, MPI_INT, node, T_STATUS, MPI_COMM_WORLD, &stat);
//STATUS << my_rank << ": Status " << node_status << " from " << node << "\n";
MPI_Recv(&node_status, 1, MPI_INT, MPI_ANY_SOURCE, T_STATUS, MPI_COMM_WORLD, &stat);
int node = stat.MPI_SOURCE;
//job_.status << "0 : Status " << node_status << " from " << node << "\n";
if (node_status == S_IDLE) {
if(time_is_up()) {
M_send_action(A_EXIT,node);
send_action(A_EXIT, node);
} else {
current_task_id_ = M_get_new_task_id(current_task_id_);
current_task_id_ = get_new_task_id(current_task_id_);
if(current_task_id_ < 0) {
M_send_action(A_EXIT,node);
send_action(A_EXIT,node);
} else {
M_send_action(A_NEW_JOB,node);
send_action(A_NEW_JOB,node);
tasks_[current_task_id_].scheduled_runs++;
int msg[2] = { current_task_id_, tasks_[current_task_id_].scheduled_runs };
MPI_Send(&msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, node, T_NEW_JOB, MPI_COMM_WORLD);
}
}
} else { // S_BUSY or S_FINISHED
} else { // S_BUSY
int msg[2];
MPI_Recv(msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, node, T_STATUS, MPI_COMM_WORLD, &stat);
int task_id = msg[0];
int completed_sweeps = msg[1];
tasks_[task_id].sweeps += completed_sweeps;
if(tasks_[task_id].is_done()) {
tasks_[task_id].scheduled_runs--;
M_send_action((tasks_[task_id].scheduled_runs == 0) ?
send_action((tasks_[task_id].scheduled_runs == 0) ?
A_PROCESS_DATA_NEW_JOB : A_NEW_JOB, node);
} else if(time_is_up()) {
send_action(A_EXIT, node);
num_active_ranks_--;
} else {
M_send_action(time_is_up() ? A_EXIT : A_CONTINUE, node);
send_action(A_CONTINUE, node);
}
}
//M_report(); we will otherwise have a lot of output!
//report(); // disable for production!!
}
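// One full round of the protocol handled here: a slave reports S_IDLE or
// S_BUSY under T_STATUS; if it is busy, a second T_STATUS message carries
// {task_id, completed_sweeps}. The master replies with a single A_* action
// under T_ACTION and, when it assigns work to an idle slave, additionally
// sends {task_id, run_id} under T_NEW_JOB.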
void runner::M_send_action(int action, int to)
{
//STATUS << my_rank << ": Action "<<action<<" to " << to << "\n";
void runner_master::send_action(int action, int to) {
//job_.status << "0 : Action "<<action<<" to " << to << "\n";
MPI_Send(&action, 1, MPI_INT, to, T_ACTION, MPI_COMM_WORLD);
if(action == A_EXIT) N_exit ++;
}
void runner::M_wait() {
while (N_exit != world_size-1) {
MPI_Status stat;
int flag = 0;
while(!flag) {
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &stat);
}
if(stat.MPI_TAG == T_STATUS)
M_update(stat.MPI_SOURCE);
else {
STATUS << fmt::format("mpi rank {}: unknown message tag {} from {}\n", my_rank, stat.MPI_TAG, stat.MPI_SOURCE);
}
}
M_end_of_run();
}
int runner::M_read_dump_progress(int task_id) {
int runner_master::read_dump_progress(int task_id) {
int sweeps = 0;
for(auto& dump_name : list_run_files(taskdir(task_id), "dump.h5")) {
int dump_sweeps;
for(auto& dump_name : jobinfo::list_run_files(job_.taskdir(task_id), "dump.h5")) {
int dump_sweeps = 0;
try {
iodump d = iodump::open_readonly(dump_name);
d.get_root().read("sweeps", dump_sweeps);
sweeps += dump_sweeps;
} catch(iodump_exception& e) {
// dump could not be read; it simply contributes no sweeps
}
}
return sweeps;
}
void runner::M_read() {
for(size_t i = 0; i < task_names_.size(); i++) {
auto task = (*jobfile_)["tasks"][task_names_[i]];
void runner_master::read() {
for(size_t i = 0; i < job_.task_names.size(); i++) {
auto task = job_.jobfile["tasks"][job_.task_names[i]];
int target_sweeps = task.get<int>("sweeps");
int target_thermalization = task.get<int>("thermalization");
int sweeps = M_read_dump_progress(i);
int sweeps = read_dump_progress(i);
int scheduled_runs = 0;
tasks_.emplace_back(target_sweeps, target_thermalization, sweeps, scheduled_runs);
}
M_report();
report();
}
void runner::M_end_of_run()
{
void runner_master::end_of_run() {
bool need_restart = false;
for(size_t i = 0; i < tasks_.size(); i ++) {
if(!tasks_[i].is_done()) {
@@ -195,112 +210,130 @@ void runner::M_end_of_run()
break;
}
}
if(need_restart) {
std::string rfilename = jobfile_name_+".restart";
std::string rfilename = job_.jobfile_name+".restart";
std::ofstream rfile(rfilename);
rfile << "restart me\n";
rfile.close();
STATUS << my_rank << ": Restart needed" << "\n";
job_.status << "0 : Restart needed" << "\n";
}
M_report();
MPI_Finalize();
STATUS << my_rank << ": MPI finalized" << "\n";
exit(0);
report();
}
void runner::M_report() {
void runner_master::report() {
job_.status << "0 : Task progress: \n";
for(size_t i = 0; i < tasks_.size(); i ++) {
STATUS << fmt::format("{:4d} {:4d} {:3.0f}%\n", i, tasks_[i].scheduled_runs,
tasks_[i].sweeps/(double)(tasks_[i].target_sweeps + tasks_[i].target_thermalization)*100);
job_.status << fmt::format("-- {}: {:4d} active runs: {:7d} sweeps of {:7d} target\n", job_.task_names[i], tasks_[i].scheduled_runs,
tasks_[i].sweeps, tasks_[i].target_sweeps + tasks_[i].target_thermalization);
}
}
void runner::what_is_next(int status) {
runner_slave::runner_slave(jobinfo&& job, const mc_factory& mccreator)
: job_{job}, mccreator_{mccreator} {
}
void runner_slave::start() {
MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
time_start_ = MPI_Wtime();
time_last_checkpoint_ = time_start_;
while(true) {
int action = what_is_next(S_BUSY);
if(action == A_EXIT) {
break;
} else if(action == A_NEW_JOB) {
sys_ = std::unique_ptr<abstract_mc>{mccreator_(job_.jobfile_name, job_.task_names[task_id_])};
if (sys_->_read(job_.rundir(task_id_, run_id_))) {
job_.status << rank_ << ": L " << job_.rundir(task_id_, run_id_) << "\n";
} else {
job_.status << rank_ << ": I " << job_.rundir(task_id_, run_id_) << "\n";
sys_->_init();
//checkpointing();
}
}
what_is_next(S_BUSY);
while(sweeps_since_last_query_ < job_.sweeps_before_communication) {
sys_->_do_update();
sweeps_since_last_query_++;
if(sys_->is_thermalized()) {
sys_->do_measurement();
}
if(is_checkpoint_time() || time_is_up()) {
break;
}
}
checkpointing();
}
job_.status << rank_ << " : Done\n";
}
bool runner_slave::is_checkpoint_time() {
return MPI_Wtime()-time_last_checkpoint_ > job_.checkpoint_time;
}
bool runner_slave::time_is_up() {
return MPI_Wtime()-time_start_ > job_.walltime;
}
int runner_slave::what_is_next(int status) {
MPI_Send(&status, 1, MPI_INT, MASTER, T_STATUS, MPI_COMM_WORLD);
if (status==S_IDLE) {
int new_action = recv_action();
if(new_action == A_EXIT)
end_of_run();
return A_EXIT;
MPI_Status stat;
int msg[3];
int msg[2];
MPI_Recv(&msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, 0, T_NEW_JOB, MPI_COMM_WORLD, &stat);
task_id_ = msg[0];
run_id_ = msg[1];
run();
} else {
int msg[2] = { current_task_id_, sys->sweep() };
return A_NEW_JOB;
} else {
int msg[2] = { task_id_, sweeps_since_last_query_ };
MPI_Send(msg, sizeof(msg)/sizeof(msg[0]), MPI_INT, 0, T_STATUS, MPI_COMM_WORLD);
sweeps_since_last_query_ = 0;
int new_action = recv_action();
if(new_action == A_PROCESS_DATA_NEW_JOB) {
merge_measurements();
what_is_next(S_IDLE);
return what_is_next(S_IDLE);
} else if(new_action == A_NEW_JOB) {
what_is_next(S_IDLE);
return what_is_next(S_IDLE);
} else if(new_action == A_EXIT) {
end_of_run();
return A_EXIT;
}
//else, new_action == A_CONTINUE, and we
//continue from where we got here.
}
return A_CONTINUE;
}
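// Note the recursion above: after A_NEW_JOB or A_PROCESS_DATA_NEW_JOB the
// slave immediately reports S_IDLE again to request its assignment, so the
// value returned to start() is always one of A_EXIT, A_NEW_JOB, or
// A_CONTINUE.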
int runner::recv_action() {
int runner_slave::recv_action() {
MPI_Status stat;
int new_action;
MPI_Recv(&new_action, 1, MPI_INT, MASTER, T_ACTION, MPI_COMM_WORLD, &stat);
return new_action;
}
void runner::run() {
sys = std::unique_ptr<abstract_mc>{mccreator_(jobfile_name_, task_names_[task_id_])};
if (sys->_read(rundir(task_id_, run_id_))) {
STATUS << my_rank << ": L " << rundir(task_id_, run_id_) << "\n";
} else {
STATUS << my_rank << ": I " << rundir(task_id_, run_id_) << "\n";
sys->_init();
checkpointing();
}
what_is_next(S_BUSY);
while(sys->sweep() < sweeps_before_communication_) {
sys->_do_update();
if(sys->is_thermalized()) { // TODO
sys->do_measurement();
}
if(is_chkpt_time() || time_is_up()) {
checkpointing();
what_is_next(S_BUSY);
}
}
checkpointing();
what_is_next(S_BUSY);
}
void runner::checkpointing()
{
sys->_write(rundir(task_id_, run_id_));
STATUS << my_rank << ": C " << rundir(task_id_, run_id_) << "\n";
void runner_slave::checkpointing() {
time_last_checkpoint_ = MPI_Wtime();
sys_->_write(job_.rundir(task_id_, run_id_));