Commit 0c7d8274 authored by Lukas Weber's avatar Lukas Weber

Make scheduler runs stop early to ensure a checkpoint is written before the walltime limit is reached
parent 2a2dada4
......@@ -42,8 +42,12 @@ void mc::_do_measurement() {
do_measurement();
clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
measure.add("_ll_measurement_time",
(tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec));
double measurement_time = (tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
measure.add("_ll_measurement_time", measurement_time);
if(measurement_time > max_meas_time_) {
max_meas_time_ = measurement_time;
}
}
void mc::_do_update() {
......@@ -53,30 +57,49 @@ void mc::_do_update() {
do_update();
clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
measure.add("_ll_sweep_time",
(tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec));
double sweep_time = (tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
measure.add("_ll_sweep_time", sweep_time);
if(sweep_time > max_sweep_time_) {
max_sweep_time_ = sweep_time;
}
}
// Writes the collected measurement samples to `dir + ".meas.h5"` and a new
// checkpoint to `dir + ".dump.h5"`, then records how long the whole write took
// (as the "_ll_checkpoint_write_time" measurement and in
// max_checkpoint_write_time_).
//
// The checkpoint is written to `dir + ".dump.h5.tmp"` first and renamed onto
// the real dump file only after the temporary file's handle has gone out of
// scope. The final dump file is never opened directly here, so an interrupted
// write presumably leaves the previous checkpoint usable (NOTE(review): this
// relies on iodump closing the file in its destructor -- confirm).
void mc::_write(const std::string &dir) {
    struct timespec tstart, tend;
    clock_gettime(CLOCK_MONOTONIC_RAW, &tstart);

    // blocks limit scopes of the dump file handles to ensure they are closed at the right time.
    {
        iodump meas_file = iodump::open_readwrite(dir + ".meas.h5");
        measure.samples_write(meas_file.get_root());
    }

    const std::string dump_path = dir + ".dump.h5";
    const std::string tmp_path = dump_path + ".tmp";
    {
        iodump dump_file = iodump::create(tmp_path);
        auto g = dump_file.get_root();

        measure.checkpoint_write(g.open_group("measurements"));
        rng->checkpoint_write(g.open_group("random_number_generator"));
        checkpoint_write(g.open_group("simulation"));

        g.write("sweeps", sweep_);
        g.write("thermalization_sweeps", std::min(therm_, sweep_)); // only for convenience
    }

    // Replace the old dump only now that the new one is complete. A failure is
    // reported but not fatal: the previous dump is still in place and the next
    // checkpoint will try again.
    if(rename(tmp_path.c_str(), dump_path.c_str()) != 0) {
        perror("mc::_write: could not move the temporary dump file into place");
    }

    clock_gettime(CLOCK_MONOTONIC_RAW, &tend);
    double checkpoint_write_time =
        (tend.tv_sec - tstart.tv_sec) + 1e-9 * (tend.tv_nsec - tstart.tv_nsec);
    measure.add("_ll_checkpoint_write_time", checkpoint_write_time);

    // Remember the slowest write seen so far; safe_exit_interval() uses it.
    if(checkpoint_write_time > max_checkpoint_write_time_) {
        max_checkpoint_write_time_ = checkpoint_write_time;
    }
}
// Returns a time margin in seconds: twice the sum of the longest checkpoint
// write, sweep and measurement observed so far, plus a constant 2.
double mc::safe_exit_interval() {
    // The factor and the offset are heuristics, chosen in an attempt to make
    // the margin safe for as many cases as possible.
    const double slowest_cycle = max_checkpoint_write_time_ + max_sweep_time_ + max_meas_time_;
    return 2 * slowest_cycle + 2;
}
static bool file_exists(const std::string &path) {
......
......@@ -17,6 +17,10 @@ private:
int sweep_ = 0;
int therm_ = 0;
// The following times in seconds are used to estimate a safe exit interval before walltime is up.
double max_checkpoint_write_time_{0};
double max_sweep_time_{0};
double max_meas_time_{0};
protected:
parser param;
std::unique_ptr<random_number_generator> rng;
......@@ -45,6 +49,8 @@ public:
void _do_update();
void _do_measurement();
double safe_exit_interval();
bool is_thermalized();
measurements measure;
......
......@@ -17,6 +17,7 @@ enum {
S_IDLE = 1,
S_BUSY = 2,
S_TIMEUP = 3,
A_EXIT = 1,
A_CONTINUE = 2,
......@@ -173,7 +174,6 @@ int runner_mpi_start(jobinfo job, const mc_factory &mccreator, int argc, char **
// Builds the master around the given job description, which is moved into job_.
runner_master::runner_master(jobinfo job)
    : job_{std::move(job)} {
}
void runner_master::start() {
time_start_ = MPI_Wtime();
MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);
job_.log(fmt::format("Starting job '{}'", job_.jobname));
......@@ -196,34 +196,26 @@ int runner_master::get_new_task_id(int old_id) {
return -1;
}
// True once the time elapsed since time_start_ (measured with MPI_Wtime)
// exceeds the job's walltime.
bool runner_master::time_is_up() const {
    const double elapsed = MPI_Wtime() - time_start_;
    return elapsed > job_.walltime;
}
void runner_master::react() {
int node_status;
MPI_Status stat;
MPI_Recv(&node_status, 1, MPI_INT, MPI_ANY_SOURCE, T_STATUS, MPI_COMM_WORLD, &stat);
int node = stat.MPI_SOURCE;
if(node_status == S_IDLE) {
if(time_is_up()) {
current_task_id_ = get_new_task_id(current_task_id_);
if(current_task_id_ < 0) {
send_action(A_EXIT, node);
num_active_ranks_--;
} else {
current_task_id_ = get_new_task_id(current_task_id_);
if(current_task_id_ < 0) {
send_action(A_EXIT, node);
num_active_ranks_--;
} else {
send_action(A_NEW_JOB, node);
tasks_[current_task_id_].scheduled_runs++;
int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
tasks_[current_task_id_].target_sweeps};
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB,
MPI_COMM_WORLD);
}
send_action(A_NEW_JOB, node);
tasks_[current_task_id_].scheduled_runs++;
int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
tasks_[current_task_id_].target_sweeps};
MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB,
MPI_COMM_WORLD);
}
} else { // S_BUSY
} else if(node_status == S_BUSY) {
int msg[2];
MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_STATUS, MPI_COMM_WORLD, &stat);
int task_id = msg[0];
......@@ -242,14 +234,12 @@ void runner_master::react() {
send_action(A_PROCESS_DATA_NEW_JOB, node);
}
} else if(time_is_up()) {
send_action(A_EXIT, node);
num_active_ranks_--;
} else {
send_action(A_CONTINUE, node);
}
} else { // S_TIMEUP
num_active_ranks_--;
}
}
void runner_master::send_action(int action, int destination) {
......@@ -327,10 +317,17 @@ void runner_slave::start() {
}
}
checkpointing();
if(time_is_up()) {
what_is_next(S_TIMEUP);
break;
}
action = what_is_next(S_BUSY);
}
if(time_is_up()) {
job_.log(fmt::format("rank {} exits: time limit reached", rank_));
job_.log(fmt::format("rank {} exits: walltime up", rank_));
} else {
job_.log(fmt::format("rank {} exits: out of work", rank_));
}
......@@ -341,12 +338,18 @@ bool runner_slave::is_checkpoint_time() {
}
// Reports whether this rank should stop working because the walltime is
// about to run out.
//
// The limit is reached early by the simulation's safe_exit_interval(), a
// margin in seconds derived from the slowest checkpoint write, sweep and
// measurement seen so far, so that a final checkpoint can still be written
// before the walltime is up. When sys_ is null no margin is applied.
bool runner_slave::time_is_up() {
    double safe_interval = 0;
    if(sys_ != nullptr) {
        safe_interval = sys_->safe_exit_interval();
    }
    return MPI_Wtime() - time_start_ > job_.walltime - safe_interval;
}
int runner_slave::what_is_next(int status) {
MPI_Send(&status, 1, MPI_INT, MASTER, T_STATUS, MPI_COMM_WORLD);
if(status == S_IDLE) {
if(status == S_TIMEUP) {
return 0;
} else if(status == S_IDLE) {
int new_action = recv_action();
if(new_action == A_EXIT) {
return A_EXIT;
......@@ -389,7 +392,7 @@ int runner_slave::recv_action() {
// Writes a checkpoint of the current simulation into the run directory of the
// current task/run and logs it once.
void runner_slave::checkpointing() {
    // The timestamp is taken before the write starts, so the time spent
    // writing counts towards the interval until the next checkpoint.
    time_last_checkpoint_ = MPI_Wtime();
    sys_->_write(job_.rundir(task_id_, run_id_));
    job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
}
void runner_slave::merge_measurements() {
......
......@@ -41,13 +41,10 @@ private:
jobinfo job_;
int num_active_ranks_{0};
int time_start_{0};
std::vector<runner_task> tasks_;
int current_task_id_{-1};
bool time_is_up() const;
void read();
int read_dump_progress(int task_id);
int get_new_task_id(int old_id);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment