Commit c2d72143 authored by Lukas Weber

enlarged datatype for sweeps
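Sweep counters were stored as plain int, which a long enough simulation can overflow. Run and task bookkeeping now uses size_t, the parallel-tempering bookkeeping uses int64_t (it keeps -1 as a sentinel value), the MPI job and status messages are sent as 64-bit types (MPI_UINT64_T / MPI_INT64_T), and subtractions that could go negative are rewritten in an unsigned-safe, saturating form.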

parent 7875ed9b
@@ -113,10 +113,10 @@ std::vector<std::string> jobinfo::list_run_files(const std::string &taskdir,
 }

 int jobinfo::read_dump_progress(int task_id) const {
-	int sweeps = 0;
+	size_t sweeps = 0;
 	try {
 		for(auto &dump_name : list_run_files(taskdir(task_id), "dump\\.h5")) {
-			int dump_sweeps = 0;
+			size_t dump_sweeps = 0;
 			iodump d = iodump::open_readonly(dump_name);
 			d.get_root().read("sweeps", dump_sweeps);
 			sweeps += dump_sweeps;
...
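read_dump_progress() sums the sweep counts of every run dump of a task, so it is the first place a 32-bit counter would overflow. A minimal standalone sketch of the failure mode (hypothetical counts, not from the codebase; uint32_t stands in for the old int so the wraparound is well defined):

    #include <cstdint>
    #include <cstdio>

    int main() {
        uint64_t per_run = 1'500'000'000; // ~1.5e9 sweeps per run dump
        uint32_t narrow = 0;              // stand-in for the old 32-bit counter
        uint64_t wide = 0;                // the new, enlarged counter
        for(int i = 0; i < 3; i++) {
            narrow += static_cast<uint32_t>(per_run); // wraps past 2^32 - 1
            wide += per_run;                          // headroom up to ~1.8e19
        }
        std::printf("32-bit total: %u\n64-bit total: %llu\n", narrow,
                    static_cast<unsigned long long>(wide));
    }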
@@ -8,7 +8,7 @@ mc::mc(const parser &p) : param{p} {
 void mc::write_output(const std::string &) {}

-int mc::sweep() const {
+size_t mc::sweep() const {
 	return sweep_;
 }
@@ -82,12 +82,12 @@ void mc::_write(const std::string &dir) {
 	checkpoint_write(g.open_group("simulation"));
 	measure.checkpoint_write(g.open_group("measurements"));

-	int therm = therm_;
+	size_t therm = therm_;
 	if(pt_mode_) {
 		therm *= pt_sweeps_per_global_update_;
 	}
 	g.write("thermalization_sweeps", std::min(sweep_,therm));
-	g.write("sweeps", std::max(0,sweep_-therm));
+	g.write("sweeps", sweep_-std::min(sweep_,therm));
 }
 	clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
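Note the rewrite of the "sweeps" value: with signed ints, std::max(0, sweep_-therm) clamped a negative difference to zero, but once sweep_ and therm are size_t the subtraction wraps around before std::max can act (and std::max(0, ...) would not even compile against a size_t operand). sweep_ - std::min(sweep_, therm) is the unsigned-safe saturating equivalent. A small sketch of the two behaviours:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    int main() {
        size_t sweep = 5, therm = 8; // mid-thermalization: therm > sweep
        // Unsigned subtraction wraps first, so the clamp never triggers:
        size_t bad = std::max<size_t>(0, sweep - therm); // SIZE_MAX - 2
        // Saturating form used in mc::_write():
        size_t good = sweep - std::min(sweep, therm);    // 0
        std::printf("wrapped: %zu\nsaturated: %zu\n", bad, good);
    }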
@@ -118,7 +118,7 @@ bool mc::_read(const std::string &dir) {
 	measure.checkpoint_read(g.open_group("measurements"));
 	checkpoint_read(g.open_group("simulation"));
-	int sweeps, therm_sweeps;
+	size_t sweeps, therm_sweeps;
 	g.read("thermalization_sweeps", therm_sweeps);
 	g.read("sweeps", sweeps);
 	sweep_ = sweeps + therm_sweeps;
@@ -130,7 +130,7 @@ bool mc::_read(const std::string &dir) {
 }

 bool mc::is_thermalized() {
-	int sweep = sweep_;
+	size_t sweep = sweep_;
 	if(pt_mode_ && pt_sweeps_per_global_update_ > 0) {
 		sweep /= pt_sweeps_per_global_update_;
 	}
...
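In is_thermalized(), sweep is now size_t while pt_sweeps_per_global_update_ stays a plain int defaulting to -1. The pt_sweeps_per_global_update_ > 0 guard matters more than before: in a mixed size_t/int division the int is converted to unsigned, so a stray -1 divisor would silently yield zero (after converting to SIZE_MAX) rather than a negative count. A hypothetical illustration:

    #include <cstddef>
    #include <cstdio>

    int main() {
        size_t sweep = 100;
        int divisor = -1; // the sentinel default of pt_sweeps_per_global_update_
        // The usual arithmetic conversions turn -1 into SIZE_MAX here:
        std::printf("%zu\n", sweep / static_cast<size_t>(divisor)); // prints 0
    }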
@@ -12,8 +12,8 @@ namespace loadl {
 class mc {
 private:
-	int sweep_{0};
-	int therm_{0};
+	size_t sweep_{0};
+	size_t therm_{0};
 	int pt_sweeps_per_global_update_{-1};

 protected:
 	parser param;
@@ -39,7 +39,7 @@ protected:
 public:
 	bool pt_mode_{};

-	int sweep() const;
+	size_t sweep() const;
 	virtual void register_evalables(std::vector<evalable> &evalables) = 0;
 	virtual void write_output(const std::string &filename);
...
@@ -83,15 +83,18 @@ void runner_master::react() {
 		} else {
 			send_action(A_NEW_JOB, node);
 			tasks_[current_task_id_].scheduled_runs++;
-			int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
-			              std::max(1,tasks_[current_task_id_].target_sweeps - tasks_[current_task_id_].sweeps)};
-			MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB, MPI_COMM_WORLD);
+			size_t sweeps_until_comm = 1 + tasks_[current_task_id_].target_sweeps -
+			                           std::min(tasks_[current_task_id_].target_sweeps, tasks_[current_task_id_].sweeps);
+			assert(current_task_id_ >= 0);
+			uint64_t msg[3] = {static_cast<uint64_t>(current_task_id_), static_cast<uint64_t>(tasks_[current_task_id_].scheduled_runs),
+			                   sweeps_until_comm};
+			MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_UINT64_T, node, T_NEW_JOB, MPI_COMM_WORLD);
 		}
 	} else if(node_status == S_BUSY) {
-		int msg[2];
-		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_STATUS, MPI_COMM_WORLD, &stat);
+		uint64_t msg[2];
+		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_UINT64_T, node, T_STATUS, MPI_COMM_WORLD, &stat);
 		int task_id = msg[0];
-		int completed_sweeps = msg[1];
+		size_t completed_sweeps = msg[1];
 		tasks_[task_id].sweeps += completed_sweeps;

 		if(tasks_[task_id].is_done()) {
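Two things change in this handshake: the job and status messages are widened from int[] over MPI_INT to uint64_t[] over MPI_UINT64_T, and the old std::max(1, target_sweeps - sweeps) becomes 1 + target_sweeps - std::min(target_sweeps, sweeps), an unsigned-safe formulation that still schedules at least one sweep when a run has overshot its target. The MPI datatype argument must match the C element type on both ends of the transfer. A minimal matched pair (assumes an MPI setup with at least two ranks; not code from this repository):

    #include <mpi.h>
    #include <cstdint>

    int main(int argc, char **argv) {
        MPI_Init(&argc, &argv);
        int rank;
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        uint64_t msg[2] = {0, 0};
        if(rank == 0) {
            msg[0] = 42;
            msg[1] = 1ull << 33; // would not survive an MPI_INT round trip
            MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_UINT64_T, 1, 0, MPI_COMM_WORLD);
        } else if(rank == 1) {
            MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_UINT64_T, 0, 0, MPI_COMM_WORLD,
                     MPI_STATUS_IGNORE);
        }
        MPI_Finalize();
    }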
@@ -122,8 +125,8 @@ void runner_master::read() {
 	for(size_t i = 0; i < job_.task_names.size(); i++) {
 		auto task = job_.jobfile["tasks"][job_.task_names[i]];

-		int target_sweeps = task.get<int>("sweeps");
-		int sweeps = job_.read_dump_progress(i);
+		size_t target_sweeps = task.get<size_t>("sweeps");
+		size_t sweeps = job_.read_dump_progress(i);
 		int scheduled_runs = 0;
 		tasks_.emplace_back(target_sweeps, sweeps, scheduled_runs);
@@ -203,8 +206,8 @@ int runner_slave::what_is_next(int status) {
 			return A_EXIT;
 		}
 		MPI_Status stat;
-		int msg[3];
-		MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, T_NEW_JOB, MPI_COMM_WORLD, &stat);
+		uint64_t msg[3];
+		MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_UINT64_T, 0, T_NEW_JOB, MPI_COMM_WORLD, &stat);
 		task_id_ = msg[0];
 		run_id_ = msg[1];
 		sweeps_before_communication_ = msg[2];
@@ -212,8 +215,9 @@ int runner_slave::what_is_next(int status) {
 		return A_NEW_JOB;
 	}

-	int msg[2] = {task_id_, sweeps_since_last_query_};
-	MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, T_STATUS, MPI_COMM_WORLD);
+	assert(task_id_ >= 0);
+	uint64_t msg[2] = {static_cast<uint64_t>(task_id_), sweeps_since_last_query_};
+	MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_UINT64_T, 0, T_STATUS, MPI_COMM_WORLD);
 	sweeps_since_last_query_ = 0;
 	int new_action = recv_action();
 	if(new_action == A_PROCESS_DATA_NEW_JOB) {
...
@@ -46,8 +46,8 @@ private:
 	double time_start_{0};
 	int rank_{0};
-	int sweeps_since_last_query_{0};
-	int sweeps_before_communication_{0};
+	size_t sweeps_since_last_query_{0};
+	size_t sweeps_before_communication_{0};
 	int task_id_{-1};
 	int run_id_{-1};
...
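task_id_ and run_id_ keep their int type because -1 still serves as the "no task" sentinel; that is why the new code asserts task_id_ >= 0 (and current_task_id_ >= 0 on the master) before casting into the unsigned uint64_t message, where a stray -1 would silently wrap to 2^64 - 1.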
@@ -185,7 +185,7 @@ void runner_pt_master::construct_pt_chains() {
 		    "chain {}: task {}: in parallel tempering mode, sweeps are measured in global updates and need to be the "
 		    "same within each chain: {} = {} != {}";

-		int target_sweeps = task.get<int>("sweeps");
+		int64_t target_sweeps = task.get<int>("sweeps");
 		if(chain.target_sweeps >= 0 && target_sweeps != chain.target_sweeps) {
 			throw std::runtime_error{
 			    fmt::format(pt_sweep_error, chain.id, i, "target_sweeps", chain.target_sweeps, target_sweeps)};
@@ -201,8 +201,8 @@ void runner_pt_master::construct_pt_chains() {
 		}
 		chain.target_thermalization = target_thermalization;

-		int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
-		int sweeps = job_.read_dump_progress(i) / sweeps_per_global_update;
+		int64_t sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
+		int64_t sweeps = job_.read_dump_progress(i) / sweeps_per_global_update;
 		if(chain.sweeps >= 0 && sweeps != chain.sweeps) {
 			throw std::runtime_error{fmt::format(pt_sweep_error, chain.id, i, "sweeps", chain.sweeps, sweeps)};
 		}
@@ -403,18 +403,18 @@ int runner_pt_master::schedule_chain_run() {
 int runner_pt_master::assign_new_chain(int rank_section) {
 	int chain_run_id = schedule_chain_run();
 	for(int target = 0; target < chain_len_; target++) {
-		int msg[3] = {-1, 0, 0};
+		int64_t msg[3] = {-1, 0, 0};
 		if(chain_run_id >= 0) {
 			auto &chain_run = pt_chain_runs_[chain_run_id];
 			auto &chain = pt_chains_[chain_run.id];
 			msg[0] = chain.task_ids[target];
 			msg[1] = chain_run.run_id;
-			msg[2] = std::max(1, chain.target_sweeps - chain.sweeps);
+			msg[2] = std::max(1L, chain.target_sweeps - chain.sweeps);
 		} else {
 			// this will prompt the slave to quit
 			num_active_ranks_--;
 		}
-		MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT,
+		MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T,
 		         rank_section * chain_len_ + target + 1, 0, MPI_COMM_WORLD);
 	}
 	rank_to_chain_run_[rank_section] = chain_run_id;
@@ -442,9 +442,9 @@ void runner_pt_master::react() {
 	MPI_Recv(&rank_status, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &stat);
 	int rank = stat.MPI_SOURCE - 1;

 	if(rank_status == S_BUSY) {
-		int msg[1];
-		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, rank + 1, 0, MPI_COMM_WORLD, &stat);
-		int completed_sweeps = msg[0];
+		int64_t msg[1];
+		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T, rank + 1, 0, MPI_COMM_WORLD, &stat);
+		int64_t completed_sweeps = msg[0];
 		int chain_run_id = rank_to_chain_run_[rank / chain_len_];
 		auto &chain_run = pt_chain_runs_[chain_run_id];
@@ -695,8 +695,8 @@ void runner_pt_slave::pt_global_update() {
 }

 bool runner_pt_slave::accept_new_chain() {
-	int msg[3];
-	MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
+	int64_t msg[3];
+	MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 	task_id_ = msg[0];
 	run_id_ = msg[1];
 	sweeps_before_communication_ = msg[2];
@@ -706,7 +706,7 @@ bool runner_pt_slave::accept_new_chain() {
 	}

 	sweeps_per_global_update_ =
-	    job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>("pt_sweeps_per_global_update");
+	    job_.jobfile["tasks"][job_.task_names[task_id_]].get<int64_t>("pt_sweeps_per_global_update");

 	sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
 	sys_->pt_mode_ = true;
@@ -725,8 +725,8 @@ int runner_pt_slave::what_is_next(int status) {
 	if(chain_rank_ == 0) {
 		send_status(status);

-		int msg[1] = {sweeps_since_last_query_};
-		MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, 0, MPI_COMM_WORLD);
+		int64_t msg[1] = {sweeps_since_last_query_};
+		MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T, 0, 0, MPI_COMM_WORLD);
 	}
 	sweeps_since_last_query_ = 0;
 	int new_action = recv_action();
...
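The parallel-tempering messages stay signed (int64_t over MPI_INT64_T) rather than size_t because msg[0] = -1 is the sentinel that tells a slave to quit. The literal in std::max also had to change: std::max deduces a single template type, so std::max(1, ...) is ill-formed once the other operand is int64_t. The commit uses 1L, which matches int64_t on LP64 platforms (Linux/macOS); an explicit template argument would be the fully portable spelling. A small sketch:

    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    int main() {
        int64_t target = 10, done = 3;
        // std::max(1, target - done);           // ill-formed: deduces int vs int64_t
        int64_t a = std::max(1L, target - done); // OK where int64_t == long (LP64)
        int64_t b = std::max<int64_t>(1, target - done); // portable alternative
        std::printf("%lld %lld\n", static_cast<long long>(a), static_cast<long long>(b));
    }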
@@ -10,9 +10,9 @@ struct pt_chain {
 	std::vector<int> task_ids;
 	std::vector<double> params;

-	int sweeps{-1};
-	int target_sweeps{-1};
-	int target_thermalization{-1};
+	int64_t sweeps{-1};
+	int64_t target_sweeps{-1};
+	int64_t target_thermalization{-1};
 	int scheduled_runs{};

 	// parameter optimization
@@ -107,9 +107,9 @@ private:
 	double time_start_{0};
 	int rank_{};

-	int sweeps_since_last_query_{};
-	int sweeps_before_communication_{};
-	int sweeps_per_global_update_{};
+	int64_t sweeps_since_last_query_{};
+	int64_t sweeps_before_communication_{};
+	int64_t sweeps_per_global_update_{};
 	int task_id_{-1};
 	int run_id_{-1};
...
@@ -78,8 +78,8 @@ void runner_single::read() {
 	for(size_t i = 0; i < job_.task_names.size(); i++) {
 		auto task = job_.jobfile["tasks"][job_.task_names[i]];

-		int target_sweeps = task.get<int>("sweeps");
-		int sweeps = 0;
+		size_t target_sweeps = task.get<int>("sweeps");
+		size_t sweeps = 0;
 		sweeps = job_.read_dump_progress(i);
 		tasks_.emplace_back(target_sweeps, sweeps, 0);
...
@@ -3,7 +3,7 @@
 namespace loadl {

-runner_task::runner_task(int target_sweeps, int sweeps,
+runner_task::runner_task(size_t target_sweeps, size_t sweeps,
                          int scheduled_runs)
     : target_sweeps{target_sweeps}, sweeps{sweeps},
       scheduled_runs{scheduled_runs} {}
...
 #pragma once
+#include <cstddef>

 namespace loadl {

 // used by the runner
 struct runner_task {
-	int target_sweeps;
-	int sweeps;
+	size_t target_sweeps;
+	size_t sweeps;
 	int scheduled_runs;

 	bool is_done() const;
-	runner_task(int target_sweeps, int sweeps, int scheduled_runs);
+	runner_task(size_t target_sweeps, size_t sweeps, int scheduled_runs);
 };
 }
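The added #include <cstddef> is what makes size_t available to this header on its own, rather than relying on a transitive include from whichever file happens to include runner_task.h first.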