Commit e930ae33 authored by Lukas Weber's avatar Lukas Weber
Browse files

make log messages more userfriendly

parent c219b520
......@@ -5,6 +5,7 @@
#include <fstream>
#include <regex>
#include <sys/stat.h>
#include <iomanip>
namespace loadl {
......@@ -146,6 +147,11 @@ void jobinfo::merge_task(int task_id, const std::vector<evalable> &evalables) {
results.write_yaml(result_filename, taskdir(task_id), jobfile["tasks"][task_name].get_yaml());
}
void jobinfo::log(const std::string &message) {
std::time_t t = std::time(nullptr);
std::cout << std::put_time(std::localtime(&t), "%F %T: ") << message << "\n";
}
int runner_mpi_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
MPI_Init(&argc, &argv);
......@@ -171,13 +177,12 @@ void runner_master::start() {
time_start_ = MPI_Wtime();
MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);
job_.status << fmt::format("0 : Starting job '{}'\n", job_.jobfile_name);
job_.log(fmt::format("Starting job '{}'", job_.jobfile_name));
read();
while(num_active_ranks_ > 1) {
react();
}
end_of_run();
}
int runner_master::get_new_task_id(int old_id) {
......@@ -201,7 +206,6 @@ void runner_master::react() {
MPI_Status stat;
MPI_Recv(&node_status, 1, MPI_INT, MPI_ANY_SOURCE, T_STATUS, MPI_COMM_WORLD, &stat);
int node = stat.MPI_SOURCE;
// job_.status << "0 : Status " << node_status << " from " << node << "\n";
if(node_status == S_IDLE) {
if(time_is_up()) {
send_action(A_EXIT, node);
......@@ -228,11 +232,17 @@ void runner_master::react() {
tasks_[task_id].sweeps += completed_sweeps;
if(tasks_[task_id].is_done()) {
job_.status << fmt::format("0 : {} done, scheduled_runs: {}\n",
job_.task_names[task_id], tasks_[task_id].scheduled_runs);
tasks_[task_id].scheduled_runs--;
send_action((tasks_[task_id].scheduled_runs == 0) ? A_PROCESS_DATA_NEW_JOB : A_NEW_JOB,
node);
if(tasks_[task_id].scheduled_runs > 0) {
job_.log(fmt::format("{} has enough sweeps. Waiting for {} busy ranks.",
job_.task_names[task_id], tasks_[task_id].scheduled_runs));
send_action(A_NEW_JOB, node);
} else {
job_.log(fmt::format("{} is done. Merging.", job_.task_names[task_id]));
send_action(A_PROCESS_DATA_NEW_JOB, node);
}
} else if(time_is_up()) {
send_action(A_EXIT, node);
num_active_ranks_--;
......@@ -241,11 +251,9 @@ void runner_master::react() {
}
}
// report(); // disable for production!!
}
void runner_master::send_action(int action, int destination) {
// job_.status << "0 : Action "<<action<<" destination " << to << "\n";
MPI_Send(&action, 1, MPI_INT, destination, T_ACTION, MPI_COMM_WORLD);
}
......@@ -278,20 +286,6 @@ void runner_master::read() {
tasks_.emplace_back(target_sweeps, target_thermalization, sweeps, scheduled_runs);
}
report();
}
void runner_master::end_of_run() {
report();
}
void runner_master::report() {
job_.status << "0 : Task progress: \n";
for(size_t i = 0; i < tasks_.size(); i++) {
job_.status << fmt::format("-- {}: {:4d} active runs: {:7d} sweeps of {:7d} target\n",
job_.task_names[i], tasks_[i].scheduled_runs, tasks_[i].sweeps,
tasks_[i].target_sweeps + tasks_[i].target_thermalization);
}
}
runner_slave::runner_slave(jobinfo job, mc_factory mccreator)
......@@ -306,10 +300,7 @@ void runner_slave::start() {
while(action != A_EXIT) {
if(action == A_NEW_JOB) {
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile_name, job_.task_names[task_id_])};
if(sys_->_read(job_.rundir(task_id_, run_id_))) {
job_.status << rank_ << " : L " << job_.rundir(task_id_, run_id_) << "\n";
} else {
job_.status << rank_ << " : I " << job_.rundir(task_id_, run_id_) << "\n";
if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
sys_->_init();
// checkpointing();
}
......@@ -335,7 +326,7 @@ void runner_slave::start() {
checkpointing();
action = what_is_next(S_BUSY);
}
job_.status << rank_ << " : Done\n";
job_.log(fmt::format("rank {} out of work", rank_));
}
bool runner_slave::is_checkpoint_time() {
......@@ -391,7 +382,7 @@ int runner_slave::recv_action() {
void runner_slave::checkpointing() {
time_last_checkpoint_ = MPI_Wtime();
sys_->_write(job_.rundir(task_id_, run_id_));
job_.status << rank_ << " : C " << job_.rundir(task_id_, run_id_) << "\n";
job_.log(fmt::format("* rank {}: checkpointing {}", rank_, job_.rundir(task_id_, run_id_)));
}
void runner_slave::merge_measurements() {
......@@ -401,7 +392,6 @@ void runner_slave::merge_measurements() {
std::vector<evalable> evalables;
sys_->register_evalables(evalables);
job_.merge_task(task_id_, evalables);
job_.status << rank_ << " : M " << job_.taskdir(task_id_) << "\n";
}
}
......@@ -22,8 +22,6 @@ struct jobinfo {
double checkpoint_time;
double walltime;
std::ostream &status{std::cout};
jobinfo(const std::string &jobfile_name);
std::string rundir(int task_id, int run_id) const;
......@@ -33,6 +31,7 @@ struct jobinfo {
const std::string &file_ending);
void merge_task(int task_id, const std::vector<evalable> &evalables);
void concatenate_results();
void log(const std::string &message);
};
int runner_mpi_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv);
......@@ -55,10 +54,6 @@ private:
void react();
void send_action(int action, int destination);
void end_of_run();
void report();
public:
runner_master(jobinfo job);
void start();
......
......@@ -25,19 +25,16 @@ int runner_single::start() {
read();
task_id_ = get_new_task_id(task_id_);
while(task_id_ != -1 && !time_is_up()) {
sys = std::unique_ptr<mc>{mccreator_(job_.jobfile_name, job_.task_names.at(task_id_))};
if(sys->_read(job_.rundir(task_id_, 1))) {
job_.status << 0 << " : L " << job_.rundir(task_id_, 1) << "\n";
} else {
job_.status << 0 << " : I " << job_.rundir(task_id_, 1) << "\n";
sys->_init();
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile_name, job_.task_names.at(task_id_))};
if(!sys_->_read(job_.rundir(task_id_, 1))) {
sys_->_init();
}
while(!tasks_[task_id_].is_done() && !time_is_up()) {
sys->_do_update();
sys_->_do_update();
tasks_[task_id_].sweeps++;
if(sys->is_thermalized()) {
sys->_do_measurement();
if(sys_->is_thermalized()) {
sys_->_do_measurement();
}
if(is_checkpoint_time()) {
......@@ -49,7 +46,6 @@ int runner_single::start() {
merge_measurements();
task_id_ = get_new_task_id(task_id_);
}
end_of_run();
return 0;
}
......@@ -91,40 +87,23 @@ void runner_single::read() {
tasks_.emplace_back(target_sweeps, target_thermalization, sweeps, 0);
}
report();
}
void runner_single::end_of_run() {
report();
job_.status << 0 << " : finalized"
<< "\n";
}
void runner_single::report() {
for(size_t i = 0; i < tasks_.size(); i++) {
job_.status << fmt::format(
"{:4d} {:3.0f}%\n", i,
tasks_[i].sweeps /
static_cast<double>(tasks_[i].target_sweeps + tasks_[i].target_thermalization) *
100);
}
}
void runner_single::checkpointing() {
time_last_checkpoint_ = time(nullptr);
sys->_write(job_.rundir(task_id_, 1));
job_.status << "0 : C " << job_.rundir(task_id_, 1) << "\n";
sys_->_write(job_.rundir(task_id_, 1));
job_.log(fmt::format("* checkpointing {}", job_.rundir(task_id_, 1)));
}
void runner_single::merge_measurements() {
std::string unique_filename = job_.taskdir(task_id_);
sys->_write_output(unique_filename);
sys_->_write_output(unique_filename);
std::vector<evalable> evalables;
sys->register_evalables(evalables);
sys_->register_evalables(evalables);
job_.log(fmt::format("merging {}", job_.taskdir(task_id_)));
job_.merge_task(task_id_, evalables);
job_.status << "0 : M " << job_.taskdir(task_id_) << "\n";
}
}
......@@ -15,7 +15,7 @@ private:
jobinfo job_;
mc_factory mccreator_;
std::unique_ptr<mc> sys;
std::unique_ptr<mc> sys_;
int task_id_{-1};
std::vector<runner_task> tasks_;
......@@ -24,9 +24,7 @@ private:
double time_last_checkpoint_{0};
void read();
void end_of_run();
int get_new_task_id(int old_id);
void report();
bool time_is_up() const;
bool is_checkpoint_time() const;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment