runner.cpp 6.29 KB
Newer Older
1
#include "runner.h"
2
#include "iodump.h"
Lukas Weber's avatar
Lukas Weber committed
3
#include "merger.h"
4
#include "runner_pt.h"
5
#include <fmt/format.h>
6 7
namespace loadl {

Lukas Weber's avatar
Lukas Weber committed
8 9 10 11 12 13 14 15
// Constants of the master/slave MPI protocol used by runner_master and
// runner_slave. They are unnamed enums so that the enumerators convert to
// plain int for MPI payloads, tags and ranks.

// Rank of the scheduling master.
enum { MASTER = 0 };

// MPI message tags.
enum {
	T_STATUS = 1,  // slave -> master: status word (S_*); S_BUSY is followed by {task id, sweeps}
	T_ACTION = 2,  // master -> slave: action word (A_*)
	T_NEW_JOB = 3, // master -> slave: {task id, run id, sweeps before next report}
};

// Slave status words, sent with tag T_STATUS.
enum {
	S_IDLE = 1,
	S_BUSY = 2,
	S_TIMEUP = 3,
};

// Master actions, sent with tag T_ACTION.
enum {
	A_EXIT = 1,
	A_CONTINUE = 2,
	A_NEW_JOB = 3,
	A_PROCESS_DATA_NEW_JOB = 4,
};

24
// Entry point for an MPI-parallel run of `job`.
//
// If the jobconfig defines "parallel_tempering_parameter", the whole run is
// delegated to runner_pt_start and this function itself does not call
// MPI_Init/MPI_Finalize. Otherwise MPI is initialized here; rank MASTER acts
// as the scheduling runner_master and every other rank becomes a
// runner_slave that builds its Monte Carlo objects with `mccreator`.
// Always returns 0.
int runner_mpi_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
	const bool use_parallel_tempering =
	    job.jobfile["jobconfig"].defined("parallel_tempering_parameter");
	if(use_parallel_tempering) {
		runner_pt_start(std::move(job), mccreator, argc, argv);
		return 0;
	}

	MPI_Init(&argc, &argv);

	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);

	// The runner objects are scoped so they are destroyed before MPI_Finalize.
	if(rank != MASTER) {
		runner_slave slave{std::move(job), mccreator};
		slave.start();
	} else {
		runner_master master{std::move(job)};
		master.start();
	}

	MPI_Finalize();
	return 0;
}

48
// Takes ownership of the job description; no MPI communication happens here.
runner_master::runner_master(jobinfo job) : job_(std::move(job)) {}
Stefan Weßel's avatar
ic  
Stefan Weßel committed
49

Lukas Weber's avatar
Lukas Weber committed
50 51 52
void runner_master::start() {
	MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);

53
	job_.log(fmt::format("Starting job '{}'", job_.jobname));
Lukas Weber's avatar
Lukas Weber committed
54 55 56 57 58
	read();

	while(num_active_ranks_ > 1) {
		react();
	}
Stefan Weßel's avatar
ic  
Stefan Weßel committed
59 60
}

Lukas Weber's avatar
Lukas Weber committed
61
int runner_master::get_new_task_id(int old_id) {
Lukas Weber's avatar
Lukas Weber committed
62 63 64
	int ntasks = tasks_.size();
	int i;
	for(i = 1; i <= ntasks; i++) {
65 66
		if(!tasks_[(old_id + i) % ntasks].is_done())
			return (old_id + i) % ntasks;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
67
	}
Lukas Weber's avatar
Lukas Weber committed
68 69 70

	// everything done!
	return -1;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
71 72
}

Lukas Weber's avatar
Lukas Weber committed
73
void runner_master::react() {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
74 75
	int node_status;
	MPI_Status stat;
Lukas Weber's avatar
Lukas Weber committed
76 77
	MPI_Recv(&node_status, 1, MPI_INT, MPI_ANY_SOURCE, T_STATUS, MPI_COMM_WORLD, &stat);
	int node = stat.MPI_SOURCE;
78
	if(node_status == S_IDLE) {
79 80 81
		current_task_id_ = get_new_task_id(current_task_id_);

		if(current_task_id_ < 0) {
Lukas Weber's avatar
Lukas Weber committed
82
			send_action(A_EXIT, node);
83
			num_active_ranks_--;
Lukas Weber's avatar
Lukas Weber committed
84
		} else {
85 86 87
			send_action(A_NEW_JOB, node);
			tasks_[current_task_id_].scheduled_runs++;
			int msg[3] = {current_task_id_, tasks_[current_task_id_].scheduled_runs,
Lukas Weber's avatar
Lukas Weber committed
88
			              tasks_[current_task_id_].target_sweeps -  tasks_[current_task_id_].sweeps};
89
			MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_NEW_JOB, MPI_COMM_WORLD);
Stefan Weßel's avatar
ic  
Stefan Weßel committed
90
		}
91
	} else if(node_status == S_BUSY) {
Lukas Weber's avatar
Lukas Weber committed
92
		int msg[2];
93
		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, node, T_STATUS, MPI_COMM_WORLD, &stat);
Lukas Weber's avatar
Lukas Weber committed
94 95
		int task_id = msg[0];
		int completed_sweeps = msg[1];
Lukas Weber's avatar
Lukas Weber committed
96

Lukas Weber's avatar
Lukas Weber committed
97 98 99
		tasks_[task_id].sweeps += completed_sweeps;
		if(tasks_[task_id].is_done()) {
			tasks_[task_id].scheduled_runs--;
100

101 102
			if(tasks_[task_id].scheduled_runs > 0) {
				job_.log(fmt::format("{} has enough sweeps. Waiting for {} busy ranks.",
103
				                     job_.task_names[task_id], tasks_[task_id].scheduled_runs));
104 105 106
				send_action(A_NEW_JOB, node);
			} else {
				job_.log(fmt::format("{} is done. Merging.", job_.task_names[task_id]));
107

108 109
				send_action(A_PROCESS_DATA_NEW_JOB, node);
			}
Lukas Weber's avatar
Lukas Weber committed
110
		} else {
Lukas Weber's avatar
Lukas Weber committed
111
			send_action(A_CONTINUE, node);
Lukas Weber's avatar
Lukas Weber committed
112
		}
113 114
	} else { // S_TIMEUP
		num_active_ranks_--;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
115 116 117
	}
}

118 119
// Sends the single action word `action` to rank `destination` with tag T_ACTION.
void runner_master::send_action(int action, int destination) {
	const int word_count = 1;
	MPI_Send(&action, word_count, MPI_INT, destination, T_ACTION, MPI_COMM_WORLD);
}

Lukas Weber's avatar
Lukas Weber committed
122 123 124 125
void runner_master::read() {
	for(size_t i = 0; i < job_.task_names.size(); i++) {
		auto task = job_.jobfile["tasks"][job_.task_names[i]];

Lukas Weber's avatar
Lukas Weber committed
126
		int target_sweeps = task.get<int>("sweeps");
127
		int sweeps = job_.read_dump_progress(i);
Lukas Weber's avatar
Lukas Weber committed
128
		int scheduled_runs = 0;
Lukas Weber's avatar
Lukas Weber committed
129

Lukas Weber's avatar
Lukas Weber committed
130
		tasks_.emplace_back(target_sweeps, sweeps, scheduled_runs);
Stefan Weßel's avatar
ic  
Stefan Weßel committed
131 132 133
	}
}

134
// Takes ownership of the job description and of the factory that start()
// uses to create the Monte Carlo object for each assigned task. No MPI
// communication happens here.
runner_slave::runner_slave(jobinfo job, mc_factory mccreator)
    : job_(std::move(job)), mccreator_(std::move(mccreator)) {}
Lukas Weber's avatar
Lukas Weber committed
136 137 138 139 140 141

void runner_slave::start() {
	MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
	time_start_ = MPI_Wtime();
	time_last_checkpoint_ = time_start_;

142 143 144
	int action = what_is_next(S_IDLE);
	while(action != A_EXIT) {
		if(action == A_NEW_JOB) {
145 146
			sys_ =
			    std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
147
			if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
Lukas Weber's avatar
Lukas Weber committed
148
				sys_->_init();
Lukas Weber's avatar
Lukas Weber committed
149
				job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
150
				checkpoint_write();
Lukas Weber's avatar
Lukas Weber committed
151 152
			} else {
				job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
Lukas Weber's avatar
Lukas Weber committed
153
			}
154 155
		} else {
			if(!sys_) {
156 157
				throw std::runtime_error(
				    "slave got A_CONTINUE even though there is no job to be continued");
158
			}
Lukas Weber's avatar
Lukas Weber committed
159 160
		}

161
		while(sweeps_since_last_query_ < sweeps_before_communication_) {
Lukas Weber's avatar
Lukas Weber committed
162 163 164
			sys_->_do_update();

			if(sys_->is_thermalized()) {
165
				sys_->_do_measurement();
Lukas Weber's avatar
Lukas Weber committed
166
				sweeps_since_last_query_++;
Lukas Weber's avatar
Lukas Weber committed
167 168 169 170 171 172
			}

			if(is_checkpoint_time() || time_is_up()) {
				break;
			}
		}
173
		checkpoint_write();
174 175 176

		if(time_is_up()) {
			what_is_next(S_TIMEUP);
177
			job_.log(fmt::format("rank {} exits: time up", rank_));
178 179
			break;
		}
180

181
		action = what_is_next(S_BUSY);
Lukas Weber's avatar
Lukas Weber committed
182
	}
183

184
	if(action == A_EXIT) {
185 186
		job_.log(fmt::format("rank {} exits: out of work", rank_));
	}
Lukas Weber's avatar
Lukas Weber committed
187 188 189
}

bool runner_slave::is_checkpoint_time() {
190
	return MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time;
Lukas Weber's avatar
Lukas Weber committed
191 192 193
}

bool runner_slave::time_is_up() {
Lukas Weber's avatar
Lukas Weber committed
194
	return MPI_Wtime() - time_start_ > job_.runtime;
Lukas Weber's avatar
Lukas Weber committed
195 196 197
}

int runner_slave::what_is_next(int status) {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
198
	MPI_Send(&status, 1, MPI_INT, MASTER, T_STATUS, MPI_COMM_WORLD);
199 200 201
	if(status == S_TIMEUP) {
		return 0;
	} else if(status == S_IDLE) {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
202
		int new_action = recv_action();
203
		if(new_action == A_EXIT) {
Lukas Weber's avatar
Lukas Weber committed
204
			return A_EXIT;
205
		}
Stefan Weßel's avatar
ic  
Stefan Weßel committed
206
		MPI_Status stat;
207
		int msg[3];
208
		MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, T_NEW_JOB, MPI_COMM_WORLD, &stat);
Lukas Weber's avatar
Lukas Weber committed
209 210
		task_id_ = msg[0];
		run_id_ = msg[1];
211
		sweeps_before_communication_ = msg[2];
Lukas Weber's avatar
Lukas Weber committed
212

Lukas Weber's avatar
Lukas Weber committed
213
		return A_NEW_JOB;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
214
	}
215

216 217
	int msg[2] = {task_id_, sweeps_since_last_query_};
	MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, T_STATUS, MPI_COMM_WORLD);
218 219 220 221 222 223 224 225 226 227 228 229
	sweeps_since_last_query_ = 0;
	int new_action = recv_action();
	if(new_action == A_PROCESS_DATA_NEW_JOB) {
		merge_measurements();
		return what_is_next(S_IDLE);
	}
	if(new_action == A_NEW_JOB) {
		return what_is_next(S_IDLE);
	}
	if(new_action == A_EXIT) {
		return A_EXIT;
	}
230

Lukas Weber's avatar
Lukas Weber committed
231
	return A_CONTINUE;
Stefan Weßel's avatar
ic  
Stefan Weßel committed
232 233
}

Lukas Weber's avatar
Lukas Weber committed
234
int runner_slave::recv_action() {
Stefan Weßel's avatar
ic  
Stefan Weßel committed
235 236 237 238 239 240
	MPI_Status stat;
	int new_action;
	MPI_Recv(&new_action, 1, MPI_INT, MASTER, T_ACTION, MPI_COMM_WORLD, &stat);
	return new_action;
}

241
void runner_slave::checkpoint_write() {
Lukas Weber's avatar
Lukas Weber committed
242 243
	time_last_checkpoint_ = MPI_Wtime();
	sys_->_write(job_.rundir(task_id_, run_id_));
244
	job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
Stefan Weßel's avatar
ic  
Stefan Weßel committed
245 246
}

Lukas Weber's avatar
Lukas Weber committed
247
void runner_slave::merge_measurements() {
248 249
	std::string unique_filename = job_.taskdir(task_id_);
	sys_->_write_output(unique_filename);
250

251 252 253
	std::vector<evalable> evalables;
	sys_->register_evalables(evalables);
	job_.merge_task(task_id_, evalables);
Stefan Weßel's avatar
ic  
Stefan Weßel committed
254
}
255
}