runner.cpp 6.46 KB
Newer Older
1
#include "runner.h"
2
#include "iodump.h"
Lukas Weber's avatar
Lukas Weber committed
3
#include "merger.h"
4
#include "runner_pt.h"
5
#include <fmt/format.h>
6 7
namespace loadl {

Lukas Weber's avatar
Lukas Weber committed
8 9 10 11 12 13 14 15
// Constants of the master/worker message protocol. They are kept in a single
// unnamed enum so they can be passed directly as plain ints to MPI calls;
// values of different families overlap on purpose and must not be mixed.
enum {
	// Rank that runs the scheduler and receives all status reports.
	MASTER = 0,

	// MPI message tags.
	T_STATUS = 1,  // worker -> master: status code (and progress report)
	T_ACTION = 2,  // master -> worker: action code
	T_NEW_JOB = 3, // master -> worker: description of the run to work on

	// Status codes sent by a worker with tag T_STATUS.
	S_IDLE = 1,   // worker has nothing to do and asks for work
	S_BUSY = 2,   // worker is running a task and reports its progress
	S_TIMEUP = 3, // worker ran out of runtime and stops

	// Action codes sent by the master with tag T_ACTION.
	A_EXIT = 1,                 // no work left, worker should terminate
	A_CONTINUE = 2,             // keep working on the current run
	A_NEW_JOB = 3,              // switch to a new run
	A_PROCESS_DATA_NEW_JOB = 4, // merge the finished task, then ask for a new run
};

24
// Entry point of the MPI runner.
//
// If the "jobconfig" section of the jobfile defines
// "parallel_tempering_parameter", the job is handed over to runner_pt_start()
// and nothing else is done here. Otherwise MPI is initialized, rank 0 runs the
// scheduler (runner_master) and every other rank runs a worker (runner_slave).
//
// Always returns 0.
int runner_mpi_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
	if(job.jobfile["jobconfig"].defined("parallel_tempering_parameter")) {
		runner_pt_start(std::move(job), mccreator, argc, argv);
		return 0;
	}

	MPI_Init(&argc, &argv);

	int my_rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

	// Each runner object lives in its own scope so that it is destroyed before
	// MPI_Finalize() is called.
	if(my_rank != 0) {
		runner_slave worker{std::move(job), mccreator};
		worker.start();
	} else {
		runner_master scheduler{std::move(job)};
		scheduler.start();
	}

	MPI_Finalize();

	return 0;
}

48
// Takes ownership of the job description; no MPI communication happens here.
runner_master::runner_master(jobinfo job)
    : job_{std::move(job)} {
}
Stefan Weßel's avatar
ic  
Stefan Weßel committed
49

Lukas Weber's avatar
Lukas Weber committed
50 51 52
void runner_master::start() {
	MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);

53
	job_.log(fmt::format("Starting job '{}'", job_.jobname));
Lukas Weber's avatar
Lukas Weber committed
54 55 56 57 58
	read();

	while(num_active_ranks_ > 1) {
		react();
	}
Stefan Weßel's avatar
ic  
Stefan Weßel committed
59 60
}

Lukas Weber's avatar
Lukas Weber committed
61
int runner_master::get_new_task_id(int old_id) {
Lukas Weber's avatar
Lukas Weber committed
62 63 64
	int ntasks = tasks_.size();
	int i;
	for(i = 1; i <= ntasks; i++) {
65 66
		if(!tasks_[(old_id + i) % ntasks].is_done())
			return (old_id + i) % ntasks;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
67
	}
Lukas Weber's avatar
Lukas Weber committed
68 69 70

	// everything done!
	return -1;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
71 72
}

Lukas Weber's avatar
Lukas Weber committed
73
void runner_master::react() {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
74 75
	int node_status;
	MPI_Status stat;
Lukas Weber's avatar
Lukas Weber committed
76 77
	MPI_Recv(&node_status, 1, MPI_INT, MPI_ANY_SOURCE, T_STATUS, MPI_COMM_WORLD, &stat);
	int node = stat.MPI_SOURCE;
78
	if(node_status == S_IDLE) {
79 80 81
		current_task_id_ = get_new_task_id(current_task_id_);

		if(current_task_id_ < 0) {
Lukas Weber's avatar
Lukas Weber committed
82
			send_action(A_EXIT, node);
83
			num_active_ranks_--;
Lukas Weber's avatar
Lukas Weber committed
84
		} else {
85 86 87
			send_action(A_NEW_JOB, node);
			tasks_[current_task_id_].scheduled_runs++;
			int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
88 89 90 91
			              tasks_[current_task_id_].target_sweeps +
			                  tasks_[current_task_id_].target_thermalization -
			                  tasks_[current_task_id_].sweeps};
			MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB, MPI_COMM_WORLD);
Stefan Weßel's avatar
ic  
Stefan Weßel committed
92
		}
93
	} else if(node_status == S_BUSY) {
Lukas Weber's avatar
Lukas Weber committed
94
		int msg[2];
95
		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_STATUS, MPI_COMM_WORLD, &stat);
Lukas Weber's avatar
Lukas Weber committed
96 97
		int task_id = msg[0];
		int completed_sweeps = msg[1];
Lukas Weber's avatar
Lukas Weber committed
98

Lukas Weber's avatar
Lukas Weber committed
99 100 101
		tasks_[task_id].sweeps += completed_sweeps;
		if(tasks_[task_id].is_done()) {
			tasks_[task_id].scheduled_runs--;
102

103 104
			if(tasks_[task_id].scheduled_runs > 0) {
				job_.log(fmt::format("{} has enough sweeps. Waiting for {} busy ranks.",
105
				                     job_.task_names[task_id], tasks_[task_id].scheduled_runs));
106 107 108
				send_action(A_NEW_JOB, node);
			} else {
				job_.log(fmt::format("{} is done. Merging.", job_.task_names[task_id]));
109

110 111
				send_action(A_PROCESS_DATA_NEW_JOB, node);
			}
Lukas Weber's avatar
Lukas Weber committed
112
		} else {
Lukas Weber's avatar
Lukas Weber committed
113
			send_action(A_CONTINUE, node);
Lukas Weber's avatar
Lukas Weber committed
114
		}
115 116
	} else { // S_TIMEUP
		num_active_ranks_--;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
117 118 119
	}
}

120 121
// Sends a single action code (one of the A_* values) to the rank
// `destination`, using the T_ACTION tag.
void runner_master::send_action(int action, int destination) {
	const int element_count = 1;
	MPI_Send(&action, element_count, MPI_INT, destination, T_ACTION, MPI_COMM_WORLD);
}

Lukas Weber's avatar
Lukas Weber committed
124 125 126 127
void runner_master::read() {
	for(size_t i = 0; i < job_.task_names.size(); i++) {
		auto task = job_.jobfile["tasks"][job_.task_names[i]];

Lukas Weber's avatar
Lukas Weber committed
128 129
		int target_sweeps = task.get<int>("sweeps");
		int target_thermalization = task.get<int>("thermalization");
130
		int sweeps = job_.read_dump_progress(i);
Lukas Weber's avatar
Lukas Weber committed
131
		int scheduled_runs = 0;
Lukas Weber's avatar
Lukas Weber committed
132

Lukas Weber's avatar
Lukas Weber committed
133
		tasks_.emplace_back(target_sweeps, target_thermalization, sweeps, scheduled_runs);
Stefan Weßel's avatar
ic  
Stefan Weßel committed
134 135 136
	}
}

137
// Takes ownership of the job description and of the factory used to create
// the simulation object for each task; no MPI communication happens here.
runner_slave::runner_slave(jobinfo job, mc_factory mccreator)
    : job_{std::move(job)},
      mccreator_{std::move(mccreator)} {
}
Lukas Weber's avatar
Lukas Weber committed
139 140 141 142 143 144

void runner_slave::start() {
	MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
	time_start_ = MPI_Wtime();
	time_last_checkpoint_ = time_start_;

145 146 147
	int action = what_is_next(S_IDLE);
	while(action != A_EXIT) {
		if(action == A_NEW_JOB) {
148 149
			sys_ =
			    std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
150
			if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
Lukas Weber's avatar
Lukas Weber committed
151
				sys_->_init();
Lukas Weber's avatar
Lukas Weber committed
152
				job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
153
				checkpoint_write();
Lukas Weber's avatar
Lukas Weber committed
154 155
			} else {
				job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
Lukas Weber's avatar
Lukas Weber committed
156
			}
157 158
		} else {
			if(!sys_) {
159 160
				throw std::runtime_error(
				    "slave got A_CONTINUE even though there is no job to be continued");
161
			}
Lukas Weber's avatar
Lukas Weber committed
162 163
		}

164
		while(sweeps_since_last_query_ < sweeps_before_communication_) {
Lukas Weber's avatar
Lukas Weber committed
165 166 167 168
			sys_->_do_update();
			sweeps_since_last_query_++;

			if(sys_->is_thermalized()) {
169
				sys_->_do_measurement();
Lukas Weber's avatar
Lukas Weber committed
170 171 172 173 174 175
			}

			if(is_checkpoint_time() || time_is_up()) {
				break;
			}
		}
176
		checkpoint_write();
177 178 179

		if(time_is_up()) {
			what_is_next(S_TIMEUP);
180
			job_.log(fmt::format("rank {} exits: time up", rank_));
181 182
			break;
		}
183

184
		action = what_is_next(S_BUSY);
Lukas Weber's avatar
Lukas Weber committed
185
	}
186

187
	if(action == A_EXIT) {
188 189
		job_.log(fmt::format("rank {} exits: out of work", rank_));
	}
Lukas Weber's avatar
Lukas Weber committed
190 191 192
}

bool runner_slave::is_checkpoint_time() {
193
	return MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time;
Lukas Weber's avatar
Lukas Weber committed
194 195 196
}

bool runner_slave::time_is_up() {
Lukas Weber's avatar
Lukas Weber committed
197
	return MPI_Wtime() - time_start_ > job_.runtime;
Lukas Weber's avatar
Lukas Weber committed
198 199 200
}

int runner_slave::what_is_next(int status) {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
201
	MPI_Send(&status, 1, MPI_INT, MASTER, T_STATUS, MPI_COMM_WORLD);
202 203 204
	if(status == S_TIMEUP) {
		return 0;
	} else if(status == S_IDLE) {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
205
		int new_action = recv_action();
206
		if(new_action == A_EXIT) {
Lukas Weber's avatar
Lukas Weber committed
207
			return A_EXIT;
208
		}
Stefan Weßel's avatar
ic  
Stefan Weßel committed
209
		MPI_Status stat;
210
		int msg[3];
211
		MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, T_NEW_JOB, MPI_COMM_WORLD, &stat);
Lukas Weber's avatar
Lukas Weber committed
212 213
		task_id_ = msg[0];
		run_id_ = msg[1];
214
		sweeps_before_communication_ = msg[2];
Lukas Weber's avatar
Lukas Weber committed
215

Lukas Weber's avatar
Lukas Weber committed
216
		return A_NEW_JOB;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
217
	}
218

219 220
	int msg[2] = {task_id_, sweeps_since_last_query_};
	MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, T_STATUS, MPI_COMM_WORLD);
221 222 223 224 225 226 227 228 229 230 231 232
	sweeps_since_last_query_ = 0;
	int new_action = recv_action();
	if(new_action == A_PROCESS_DATA_NEW_JOB) {
		merge_measurements();
		return what_is_next(S_IDLE);
	}
	if(new_action == A_NEW_JOB) {
		return what_is_next(S_IDLE);
	}
	if(new_action == A_EXIT) {
		return A_EXIT;
	}
233

Lukas Weber's avatar
Lukas Weber committed
234
	return A_CONTINUE;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
235 236
}

Lukas Weber's avatar
Lukas Weber committed
237
int runner_slave::recv_action() {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
238 239 240 241 242 243
	MPI_Status stat;
	int new_action;
	MPI_Recv(&new_action, 1, MPI_INT, MASTER, T_ACTION, MPI_COMM_WORLD, &stat);
	return new_action;
}

244
void runner_slave::checkpoint_write() {
Lukas Weber's avatar
Lukas Weber committed
245 246
	time_last_checkpoint_ = MPI_Wtime();
	sys_->_write(job_.rundir(task_id_, run_id_));
247
	job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
Stefan Weßel's avatar
ic  
Stefan Weßel committed
248 249
}

Lukas Weber's avatar
Lukas Weber committed
250
void runner_slave::merge_measurements() {
251 252
	std::string unique_filename = job_.taskdir(task_id_);
	sys_->_write_output(unique_filename);
253

254 255 256
	std::vector<evalable> evalables;
	sys_->register_evalables(evalables);
	job_.merge_task(task_id_, evalables);
Stefan Weßel's avatar
ic  
Stefan Weßel committed
257
}
258
}