Aufgrund einer Störung des s3 Storage, könnten in nächster Zeit folgende GitLab Funktionen nicht zur Verfügung stehen: LFS, Container Registry, Job Artifacs, Uploads (Wiki, Bilder, Projekt-Exporte). Wir bitten um Verständnis. Es wird mit Hochdruck an der Behebung des Problems gearbeitet. Weitere Informationen zur Störung des Object Storage finden Sie hier: https://maintenance.itc.rwth-aachen.de/ticket/status/messages/59-object-storage-pilot

runner_pt.cpp 22.3 KB
Newer Older
1
#include "runner_pt.h"
2
#include <fstream>
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

namespace loadl {

enum {
	S_IDLE,
	S_BUSY,
	S_READY_FOR_GLOBAL,
	S_TIMEUP,

	A_CONTINUE,
	A_EXIT,
	A_NEW_JOB,
	A_PROCESS_DATA_NEW_JOB,

	// global update status response
	GA_DONE,
	GA_SKIP,
	GA_CALC_WEIGHT,

22
	MASTER = 0
23
24
};

Lukas Weber's avatar
Lukas Weber committed
25
pt_chain_run::pt_chain_run(const pt_chain &chain, int run_id) : id{chain.id}, run_id{run_id} {
26
27
	rank_to_pos.resize(chain.params.size());
	weight_ratios.resize(chain.params.size(), -1);
Lukas Weber's avatar
Lukas Weber committed
28

29
30
31
32
	last_visited.resize(chain.params.size());

	for(size_t i = 0; i < rank_to_pos.size(); i++) {
		rank_to_pos[i] = i;
33

34
		last_visited[i] = 0;
35
36
37
	}
}

38
pt_chain_run pt_chain_run::checkpoint_read(const iodump::group &g) {
39
40
41
	pt_chain_run run;
	g.read("id", run.id);
	g.read("run_id", run.run_id);
42
	g.read("rank_to_pos", run.rank_to_pos);
43

44
	run.weight_ratios.resize(run.rank_to_pos.size(), -1);
45
46
47
48

	return run;
}

49
void pt_chain_run::checkpoint_write(const iodump::group &g) {
50
51
	g.write("id", id);
	g.write("run_id", run_id);
52
53
54
55
56
57
58
59
60
61
62
63
	g.write("rank_to_pos", rank_to_pos);
}

void pt_chain::checkpoint_read(const iodump::group &g) {
	g.read("params", params);
	g.read("nup_histogram", nup_histogram);
	g.read("ndown_histogram", ndown_histogram);
	g.read("entries_before_optimization", entries_before_optimization);
	g.read("histogram_entries", histogram_entries);
}

void pt_chain::checkpoint_write(const iodump::group &g) {
64
	g.write("params", params);
65
66
67
68
69
70
71
72
73
	g.write("nup_histogram", nup_histogram);
	g.write("ndown_histogram", ndown_histogram);
	g.write("entries_before_optimization", entries_before_optimization);
	g.write("histogram_entries", histogram_entries);
}

void pt_chain::clear_histograms() {
	std::fill(nup_histogram.begin(), nup_histogram.end(), 0);
	std::fill(ndown_histogram.begin(), ndown_histogram.end(), 0);
Lukas Weber's avatar
Lukas Weber committed
74
	histogram_entries = 0;
75
76
}

Lukas Weber's avatar
Lukas Weber committed
77
78
79
80
static double linear_regression(const std::vector<double> &x, const std::vector<double> &y, int i,
                                int range) {
	int lower = std::max(0, i - range + 1);
	int upper = std::min(static_cast<int>(y.size() - 1), i + range);
81
82
83
84

	double sxy = 0;
	double sx = 0, sy = 0;
	double sx2 = 0;
Lukas Weber's avatar
Lukas Weber committed
85

86
	for(int j = lower; j <= upper; j++) {
Lukas Weber's avatar
Lukas Weber committed
87
		sxy += x[j] * y[j];
88
89
		sx += x[j];
		sy += y[j];
Lukas Weber's avatar
Lukas Weber committed
90
		sx2 += x[j] * x[j];
91
92
	}

Lukas Weber's avatar
Lukas Weber committed
93
94
	int n = upper - lower + 1;
	return (sxy - sx * sy / n) / (sx2 - sx * sx / n);
95
96
}

Lukas Weber's avatar
Lukas Weber committed
97
std::tuple<double, double> pt_chain::optimize_params(int linreg_len) {
98
99
100
101
102
	std::vector<double> eta(params.size());
	std::vector<double> f(params.size());
	std::vector<double> new_params(params.size());

	assert(params.size() > 1);
Lukas Weber's avatar
Lukas Weber committed
103

104
	new_params[0] = params[0];
Lukas Weber's avatar
Lukas Weber committed
105
	new_params[params.size() - 1] = params[params.size() - 1];
106

Lukas Weber's avatar
ehhhh  
Lukas Weber committed
107
108
109
110
	double fnonlinearity = 0;
	// in the worst case, f=0 or f=1 for [1,N-2]
	int n = params.size();
	double fnonlinearity_worst = sqrt((2*n+1./n-3)/6.)/n;
111
112
113
114
	for(size_t i = 0; i < params.size(); i++) {
		if(nup_histogram[i] + ndown_histogram[i] == 0) {
			f[i] = 0;
		} else {
Lukas Weber's avatar
Lukas Weber committed
115
			f[i] = nup_histogram[i] / static_cast<double>(nup_histogram[i] + ndown_histogram[i]);
116
		}
Lukas Weber's avatar
Lukas Weber committed
117
118
		
		double ideal_f = 1-i/static_cast<double>(params.size());
Lukas Weber's avatar
ehhhh  
Lukas Weber committed
119
		fnonlinearity += (f[i]-ideal_f)*(f[i]-ideal_f);
120
	}
Lukas Weber's avatar
ehhhh  
Lukas Weber committed
121
	fnonlinearity = sqrt(fnonlinearity)/params.size()/fnonlinearity_worst;
122
123

	double norm = 0;
Lukas Weber's avatar
Lukas Weber committed
124
125
126
127
128
	for(size_t i = 0; i < params.size() - 1; i++) {
		double dfdp = linear_regression(params, f, i, linreg_len);
		double dp = params[i + 1] - params[i];
		eta[i] = sqrt(std::max(0.01, -dfdp / dp));
		norm += eta[i] * dp;
129
130
131
132
	}
	for(auto &v : eta) {
		v /= norm;
	}
Lukas Weber's avatar
Lukas Weber committed
133
134
	
	double convergence = 0;	
Lukas Weber's avatar
Lukas Weber committed
135
136
	for(size_t i = 1; i < params.size() - 1; i++) {
		double target = static_cast<double>(i) / (params.size() - 1);
137
		int etai = 0;
Lukas Weber's avatar
Lukas Weber committed
138
139
		for(etai = 0; etai < static_cast<int>(params.size()) - 1; etai++) {
			double deta = eta[etai] * (params[etai + 1] - params[etai]);
140
141
142
143
144
			if(deta > target) {
				break;
			}
			target -= deta;
		}
Lukas Weber's avatar
Lukas Weber committed
145
		new_params[i] = params[etai] + target / eta[etai];
Lukas Weber's avatar
Lukas Weber committed
146
		convergence += (new_params[i]-params[i])*(new_params[i]-params[i]);
147
	}
Lukas Weber's avatar
Lukas Weber committed
148
149
150
151
152
153
154
155

	convergence = sqrt(convergence)/(params.size()-2);

	for(size_t i = 0; i < params.size(); i++) {
		double relaxation_fac = 1;
		params[i] = params[i]*(1-relaxation_fac) + relaxation_fac*new_params[i];
	}

Lukas Weber's avatar
ehhhh  
Lukas Weber committed
156
	return std::tie(fnonlinearity, convergence);
157
158
159
}

bool pt_chain::is_done() {
160
	return sweeps >= target_sweeps + target_thermalization;
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
}

int runner_pt_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
	MPI_Init(&argc, &argv);

	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);

	if(rank == 0) {
		runner_pt_master r{std::move(job)};
		r.start();
	} else {
		runner_pt_slave r{std::move(job), mccreator};
		r.start();
	}

	MPI_Finalize();

	return 0;
}

runner_pt_master::runner_pt_master(jobinfo job) : job_{std::move(job)} {
	rng_ = std::make_unique<random_number_generator>();
}

void runner_pt_master::construct_pt_chains() {
187
188
189
	std::string pt_param =
	    job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");

190
191
192
	for(size_t i = 0; i < job_.task_names.size(); i++) {
		auto task = job_.jobfile["tasks"][job_.task_names[i]];

Lukas Weber's avatar
Lukas Weber committed
193
		auto [chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
194
195
196
197
198
		if(chain_id < 0 || chain_pos < 0) {
			throw std::runtime_error{"parallel tempering pt_chain indices must be nonnegative"};
		}

		if(chain_id >= static_cast<int>(pt_chains_.size())) {
199
			pt_chains_.resize(chain_id + 1);
200
201
202
203
204
205
		}

		auto &chain = pt_chains_.at(chain_id);
		chain.id = chain_id;

		if(chain_pos >= static_cast<int>(chain.task_ids.size())) {
206
			chain.task_ids.resize(chain_pos + 1, -1);
207
			chain.params.resize(chain_pos + 1);
208
		}
209

210
211
212
213
		if(chain.task_ids.at(chain_pos) != -1) {
			throw std::runtime_error{"parallel tempering pt_chain map not unique"};
		}

214
		chain.params.at(chain_pos) = task.get<double>(pt_param);
215
216
		chain.task_ids.at(chain_pos) = i;

Lukas Weber's avatar
Lukas Weber committed
217
218
219
		const char *pt_sweep_error =
		    "in parallel tempering mode, sweeps are measured in global updates and need to be the "
		    "same within each chain";
220

221
222
223
224
225
226
		int target_sweeps = task.get<int>("sweeps");
		if(chain.target_sweeps < 0) {
			chain.target_sweeps = target_sweeps;
		} else if(target_sweeps != chain.target_sweeps) {
			throw std::runtime_error{pt_sweep_error};
		}
Lukas Weber's avatar
Lukas Weber committed
227

228
229
230
231
232
233
		int target_thermalization = task.get<int>("thermalization");
		if(chain.target_thermalization < 0) {
			chain.target_thermalization = target_thermalization;
		} else if(target_thermalization != chain.target_thermalization) {
			throw std::runtime_error{pt_sweep_error};
		}
Lukas Weber's avatar
Lukas Weber committed
234

235
236
237
		int sweeps = job_.read_dump_progress(i);
		if(chain.sweeps < 0) {
			int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
Lukas Weber's avatar
Lukas Weber committed
238
			chain.sweeps = sweeps / sweeps_per_global_update;
239
240
241
242
		} else if(sweeps != chain.sweeps) {
			throw std::runtime_error{pt_sweep_error};
		}
	}
Lukas Weber's avatar
Lukas Weber committed
243

244
245
246
247
248
	chain_len_ = -1;
	for(auto &c : pt_chains_) {
		if(c.id == -1) {
			throw std::runtime_error{"parallel tempering pt_chain map contains gaps"};
		}
249

250
251
252
		if(chain_len_ != -1 && chain_len_ != static_cast<int>(c.task_ids.size())) {
			throw std::runtime_error{"parallel tempering pt_chains do not have the same length"};
		}
253

254
		chain_len_ = c.task_ids.size();
255
256
257

		c.nup_histogram.resize(chain_len_);
		c.ndown_histogram.resize(chain_len_);
Lukas Weber's avatar
Lukas Weber committed
258
		c.entries_before_optimization =
Lukas Weber's avatar
Lukas Weber committed
259
		    job_.jobfile["jobconfig"].get<int>("pt_parameter_optimization_nsamples_initial", 10000);
260
261
	}
	if(chain_len_ == -1) {
262
263
264
		throw std::runtime_error{
		    "parallel tempering pt_chain mapping missing. You need to specify 'pt_chain: "
		    "[chain_id, chain_position]' for every task in the job."};
265
266
	}

267
268
269
270
	if((num_active_ranks_ - 1) % chain_len_ != 0) {
		throw std::runtime_error{
		    "parallel tempering: number of ranks should be of the form chain_length*n+1 for best "
		    "efficiency"};
271
272
273
274
275
276
277
	}
}

void runner_pt_master::checkpoint_read() {
	construct_pt_chains();

	std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
278
	if(file_exists(master_dump_name)) {
279
280
281
282
283
284
		iodump dump = iodump::open_readonly(master_dump_name);
		auto g = dump.get_root();

		rng_->checkpoint_read(g.open_group("random_number_generator"));
		auto pt_chain_runs = g.open_group("pt_chain_runs");
		for(std::string name : pt_chain_runs) {
285
286
			pt_chain_runs_.emplace_back(
			    pt_chain_run::checkpoint_read(pt_chain_runs.open_group(name)));
287
288
		}

289
290
291
292
293
294
		auto pt_chains = g.open_group("pt_chains");
		for(std::string name : pt_chains) {
			int id = std::stoi(name);
			pt_chains_.at(id).checkpoint_read(pt_chains.open_group(name));
		}

295
296
297
298
299
300
301
		g.read("current_chain_id", current_chain_id_);
		uint8_t pt_swap_odd;
		g.read("pt_swap_odd", pt_swap_odd);
		pt_swap_odd_ = pt_swap_odd;
	}
}

302
303
304
305
306
307
308
309
310
311
312
313
314
315
void runner_pt_master::write_params_yaml() {
	using namespace YAML;
	Emitter params;
	params << BeginMap;
	for(auto c : pt_chains_) {
		params << Key << fmt::format("chain{:04d}", c.id);
		params << Value << Flow << c.params;
	}
	params << EndMap;

	std::ofstream file{job_.jobdir() + "/pt_optimized_params.yml"};
	file << params.c_str() << "\n";
}

316
317
318
319
void runner_pt_master::checkpoint_write() {
	std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";

	job_.log(fmt::format("master: checkpoint {}", master_dump_name));
320

321
322
323
324
325
326
	iodump dump = iodump::create(master_dump_name);
	auto g = dump.get_root();

	rng_->checkpoint_write(g.open_group("random_number_generator"));
	auto pt_chain_runs = g.open_group("pt_chain_runs");
	for(auto &c : pt_chain_runs_) {
327
328
		c.checkpoint_write(
		    pt_chain_runs.open_group(fmt::format("chain{:04d}_run{:04d}", c.id, c.run_id)));
329
330
	}

331
332
333
334
335
	auto pt_chains = g.open_group("pt_chains");
	for(auto &c : pt_chains_) {
		c.checkpoint_write(pt_chains.open_group(fmt::format("{:04d}", c.id)));
	}

336
337
	g.write("current_chain_id", current_chain_id_);
	g.write("pt_swap_odd", static_cast<uint8_t>(pt_swap_odd_));
338
339
340
341

	if(use_param_optimization_) {
		write_params_yaml();
	}
342
343
344
345
346
}

void runner_pt_master::start() {
	MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);

347
	job_.log(fmt::format("starting job '{}' in parallel tempering mode", job_.jobname));
348
349
	checkpoint_read();

Lukas Weber's avatar
Lukas Weber committed
350
351
	use_param_optimization_ =
	    job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
352
353
354
355
356
357
	if(use_param_optimization_) {
		job_.log("using feedback parameter optimization");
	}

	std::vector<int> group_idx(num_active_ranks_);
	for(int i = 1; i < num_active_ranks_; i++) {
Lukas Weber's avatar
Lukas Weber committed
358
		group_idx[i] = (i - 1) / chain_len_;
359
	}
Lukas Weber's avatar
Lukas Weber committed
360
	MPI_Scatter(group_idx.data(), 1, MPI_INT, MPI_IN_PLACE, 1, MPI_INT, MASTER, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
361

362
363
	MPI_Comm tmp;
	MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, 0, &tmp);
Lukas Weber's avatar
Lukas Weber committed
364

365
366
	for(int rank_section = 0; rank_section < (num_active_ranks_ - 1) / chain_len_; rank_section++) {
		assign_new_chain(rank_section);
367
368
369
	}

	time_last_checkpoint_ = MPI_Wtime();
370

371
372
373
374
375
376
377
	while(num_active_ranks_ > 1) {
		react();
		if(MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time) {
			time_last_checkpoint_ = MPI_Wtime();
			checkpoint_write();
		}
	}
378
	checkpoint_write();
379
380
381
382
383
384
385
386
387
388
}

int runner_pt_master::schedule_chain_run() {
	int old_id = current_chain_id_;
	int nchains = pt_chains_.size();
	for(int i = 1; i <= nchains; i++) {
		if(!pt_chains_[(old_id + i) % nchains].is_done()) {
			int new_chain_id = (old_id + i) % nchains;
			auto &chain = pt_chains_[new_chain_id];
			chain.scheduled_runs++;
389

390
391
392
393
394
395
396
397
398
			int idx = 0;
			for(auto &run : pt_chain_runs_) {
				if(run.id == chain.id && run.run_id == chain.scheduled_runs) {
					return idx;
				}
				idx++;
			}

			pt_chain_runs_.emplace_back(chain, chain.scheduled_runs);
399
			return pt_chain_runs_.size() - 1;
400
401
402
403
404
405
406
		}
	}

	// everything done!
	return -1;
}

407
int runner_pt_master::assign_new_chain(int rank_section) {
408
409
410
	int chain_run_id = schedule_chain_run();

	for(int target = 0; target < chain_len_; target++) {
411
		int msg[3] = {-1, 0, 0};
Lukas Weber's avatar
ehhhh  
Lukas Weber committed
412
		if(chain_run_id >= 0) {
413
414
415
416
			auto &chain_run = pt_chain_runs_[chain_run_id];
			auto &chain = pt_chains_[chain_run.id];
			msg[0] = chain.task_ids[target];
			msg[1] = chain_run.run_id;
417
			msg[2] = chain.target_sweeps + chain.target_thermalization - chain.sweeps;
418
419
420
421
		} else {
			// this will prompt the slave to quit
			num_active_ranks_--;
		}
422
		MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT,
423
		         rank_section * chain_len_ + target + 1, 0, MPI_COMM_WORLD);
424
	}
425
	rank_to_chain_run_[rank_section] = chain_run_id;
426
427
428
	return chain_run_id;
}

Lukas Weber's avatar
Lukas Weber committed
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
void runner_pt_master::pt_param_optimization(pt_chain &chain, pt_chain_run &chain_run) {
	for(size_t rank = 0; rank < chain_run.rank_to_pos.size(); rank++) {
		if(chain_run.rank_to_pos[rank] == 0) {
			chain_run.last_visited[rank] = 1;
		}
		if(chain_run.rank_to_pos[rank] ==
		   static_cast<int>(chain_run.rank_to_pos.size()) - 1) {
			chain_run.last_visited[rank] = -1;
		}

		chain.ndown_histogram[chain_run.rank_to_pos[rank]] +=
		    chain_run.last_visited[rank] == -1;
		chain.nup_histogram[chain_run.rank_to_pos[rank]] +=
		    chain_run.last_visited[rank] == 1;
	}
	chain.histogram_entries++;
	if(chain.histogram_entries >= chain.entries_before_optimization) {
Lukas Weber's avatar
ehhhh  
Lukas Weber committed
446
		auto [fnonlinearity, convergence] = chain.optimize_params(job_.jobfile["jobconfig"].get<int>(
Lukas Weber's avatar
Lukas Weber committed
447
448
		    "pt_parameter_optimization_linreg_len", 2));
		chain.clear_histograms();
Lukas Weber's avatar
ehhhh  
Lukas Weber committed
449
450
		job_.log(fmt::format("chain {}: pt param optimization: entries={}, f nonlinearity={:.2g}, convergence={:.2g}",
		                     chain.id, chain.entries_before_optimization, fnonlinearity, convergence));
Lukas Weber's avatar
Lukas Weber committed
451
452
453
454
455
456

		chain.entries_before_optimization *= job_.jobfile["jobconfig"].get<double>(
		    "pt_parameter_optimization_nsamples_growth", 1.5);
	}
}

457
void runner_pt_master::react() {
458
	int rank_status;
459
	MPI_Status stat;
460
461
462
	MPI_Recv(&rank_status, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &stat);
	int rank = stat.MPI_SOURCE - 1;
	if(rank_status == S_BUSY) {
463
		int msg[1];
Lukas Weber's avatar
Lukas Weber committed
464
		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, rank + 1, 0, MPI_COMM_WORLD, &stat);
465
466
		int completed_sweeps = msg[0];

467
		int chain_run_id = rank_to_chain_run_[rank / chain_len_];
468
469
470
		auto &chain_run = pt_chain_runs_[chain_run_id];
		auto &chain = pt_chains_[chain_run.id];

471
		chain.sweeps += completed_sweeps;
472
		if(chain.is_done()) {
473
474
475
476
			chain.scheduled_runs--;
			int action = A_NEW_JOB;

			if(chain.scheduled_runs > 0) {
Lukas Weber's avatar
Lukas Weber committed
477
478
				job_.log(fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.",
				                     chain.id, chain.scheduled_runs));
479
480
481
482
			} else {
				job_.log(fmt::format("chain {} rank {} is done. Merging.", chain.id, rank + 1));
				action = A_PROCESS_DATA_NEW_JOB;
				checkpoint_write();
483
			}
484
485
486
487
			for(int target = 0; target < chain_len_; target++) {
				send_action(action, rank / chain_len_ * chain_len_ + target + 1);
			}
			assign_new_chain(rank / chain_len_);
488
		} else {
489
490
491
			for(int target = 0; target < chain_len_; target++) {
				send_action(A_CONTINUE, rank / chain_len_ * chain_len_ + target + 1);
			}
492
		}
493
494
	} else if(rank_status == S_READY_FOR_GLOBAL) {
		int chain_run_id = rank_to_chain_run_[rank / chain_len_];
495
496
		auto &chain_run = pt_chain_runs_[chain_run_id];
		auto &chain = pt_chains_[chain_run.id];
497
		int rank_section = rank / chain_len_;
Lukas Weber's avatar
Lukas Weber committed
498

499
		if(use_param_optimization_) {
Lukas Weber's avatar
Lukas Weber committed
500
			pt_param_optimization(chain, chain_run);
501
502
503
		}

		for(int target = 0; target < chain_len_; target++) {
Lukas Weber's avatar
Lukas Weber committed
504
			int target_rank = rank_section * chain_len_ + target + 1;
505
			int pos = chain_run.rank_to_pos[target];
506
			// keep consistent with pt_global_update
507
			int partner_pos = pos + (2 * (pos & 1) - 1) * (2 * pt_swap_odd_ - 1);
508
509
			if(partner_pos < 0 || partner_pos >= chain_len_) {
				int response = GA_SKIP;
510
				MPI_Send(&response, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
511
				chain_run.weight_ratios[pos] = 1;
512
513
			} else {
				int response = GA_CALC_WEIGHT;
514
				MPI_Send(&response, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
515

516
517
518
519
				double partner_param = chain.params[partner_pos];
				MPI_Send(&partner_param, 1, MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD);
			}
		}
520

521
		for(int target = 0; target < chain_len_; target++) {
Lukas Weber's avatar
Lukas Weber committed
522
			int target_rank = rank_section * chain_len_ + target + 1;
523
524
			int pos = chain_run.rank_to_pos[target];
			if(chain_run.weight_ratios[pos] < 0) {
525
				double weight;
526
				MPI_Recv(&weight, 1, MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD, &stat);
527
				assert(weight >= 0);
Lukas Weber's avatar
Lukas Weber committed
528
				chain_run.weight_ratios[pos] = weight;
529
			}
530
		}
531

532
533
534
535
536
537
		pt_global_update(chain, chain_run);
		std::fill(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), -1);

		for(int target = 0; target < chain_len_; target++) {
			int new_task_id = chain.task_ids[chain_run.rank_to_pos[target]];
			double new_param = chain.params[chain_run.rank_to_pos[target]];
Lukas Weber's avatar
Lukas Weber committed
538
539
540
541
			MPI_Send(&new_task_id, 1, MPI_INT, rank_section * chain_len_ + target + 1, 0,
			         MPI_COMM_WORLD);
			MPI_Send(&new_param, 1, MPI_DOUBLE, rank_section * chain_len_ + target + 1, 0,
			         MPI_COMM_WORLD);
542
543
544
545
546
547
548
		}
	} else { // S_TIMEUP
		num_active_ranks_--;
	}
}

void runner_pt_master::pt_global_update(pt_chain &chain, pt_chain_run &chain_run) {
549
	for(int i = pt_swap_odd_; i < static_cast<int>(chain.task_ids.size()) - 1; i += 2) {
550
		double w1 = chain_run.weight_ratios[i];
551
		double w2 = chain_run.weight_ratios[i + 1];
552
553
554

		double r = rng_->random_double();

555
		if(r < w1 * w2) {
556
			for(auto &p : chain_run.rank_to_pos) {
Lukas Weber's avatar
Lukas Weber committed
557
				if(p == i) {
Lukas Weber's avatar
Lukas Weber committed
558
559
					p = i + 1;
				} else if(p == i + 1) {
Lukas Weber's avatar
Lukas Weber committed
560
561
562
					p = i;
				}
			}
563
564
565
566
567
568
569
570
571
572
573
574
575
576
		}
	}

	pt_swap_odd_ = !pt_swap_odd_;
}

runner_pt_slave::runner_pt_slave(jobinfo job, mc_factory mccreator)
    : job_{std::move(job)}, mccreator_{std::move(mccreator)} {}

void runner_pt_slave::start() {
	MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
	time_start_ = MPI_Wtime();
	time_last_checkpoint_ = time_start_;

577
	int group_idx;
Lukas Weber's avatar
Lukas Weber committed
578
	MPI_Scatter(NULL, 1, MPI_INT, &group_idx, 1, MPI_INT, MASTER, MPI_COMM_WORLD);
579
	MPI_Comm_split(MPI_COMM_WORLD, group_idx, 0, &chain_comm_);
Lukas Weber's avatar
Lukas Weber committed
580

581
582
	MPI_Comm_rank(chain_comm_, &chain_rank_);

Lukas Weber's avatar
Lukas Weber committed
583
584
	bool use_param_optimization =
	    job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
585

586
587
588
589
590
591
592
593
594
	if(!accept_new_chain()) {
		return;
	}

	int action{};
	do {
		while(sweeps_since_last_query_ < sweeps_before_communication_) {
			sys_->_do_update();

595
			if(sys_->is_thermalized() && !use_param_optimization) {
596
597
598
599
				sys_->_do_measurement();
			}

			if(sys_->sweep() % sweeps_per_global_update_ == 0) {
600
				sys_->pt_measure_statistics();
601
				pt_global_update();
602
				sweeps_since_last_query_++;
603
604
605
606
607
608
			}

			if(is_checkpoint_time() || time_is_up()) {
				break;
			}
		}
609

610
		checkpoint_write();
611
		MPI_Barrier(chain_comm_);
612
613
614

		if(time_is_up()) {
			send_status(S_TIMEUP);
615
616
			job_.log(fmt::format("rank {} exits: walltime up (safety interval = {} s)", rank_,
			                     sys_->safe_exit_interval()));
617
618
619
620
621
622
623
624
625
626
627
			break;
		}
		action = what_is_next(S_BUSY);
	} while(action != A_EXIT);

	if(action == A_EXIT) {
		job_.log(fmt::format("rank {} exits: out of work", rank_));
	}
}

void runner_pt_slave::send_status(int status) {
628
	MPI_Send(&status, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD);
629
630
631
}

void runner_pt_slave::pt_global_update() {
632
633
634
	if(chain_rank_ == 0) {
		send_status(S_READY_FOR_GLOBAL);
	}
635
636

	int response;
637
	MPI_Recv(&response, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
Lukas Weber's avatar
Lukas Weber committed
638
	// job_.log(fmt::format(" * rank {}: ready for global update", rank_));
639

640
641
	const std::string& param_name = job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");

642
	if(response == GA_CALC_WEIGHT) {
643
		double partner_param;
644
		MPI_Recv(&partner_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
645
		double weight_ratio = sys_->_pt_weight_ratio(param_name, partner_param);
646
		MPI_Send(&weight_ratio, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
647
		// job_.log(fmt::format(" * rank {}: weight sent", rank_));
648
	} else {
Lukas Weber's avatar
Lukas Weber committed
649
		// job_.log(fmt::format(" * rank {}: no weight needed", rank_));
650
	}
651
	MPI_Barrier(chain_comm_);
652
653
654
655

	// this may be a long wait

	double new_param;
656
	int new_task_id;
Lukas Weber's avatar
Lukas Weber committed
657

658
659
660
661
662
663
664
665
	MPI_Recv(&new_task_id, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	MPI_Recv(&new_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

	if(new_task_id != task_id_ || current_param_ != new_param) {
		sys_->measurements_write(job_.rundir(task_id_, run_id_));
	}

	MPI_Barrier(chain_comm_);
666

667
668
	if(new_task_id != task_id_ || current_param_ != new_param) {
		task_id_ = new_task_id;
Lukas Weber's avatar
Lukas Weber committed
669
670
		sweeps_per_global_update_ = job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>(
		    "pt_sweeps_per_global_update");
671
		sys_->_pt_update_param(param_name, new_param, job_.rundir(task_id_, run_id_));
672
673
	}
	current_param_ = new_param;
674
675
676
677
}

bool runner_pt_slave::accept_new_chain() {
	int msg[3];
678
	MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
679
680
681
682
683
684
685
686
	task_id_ = msg[0];
	run_id_ = msg[1];
	sweeps_before_communication_ = msg[2];

	if(task_id_ < 0) {
		return false;
	}

687
688
	sweeps_per_global_update_ =
	    job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>("pt_sweeps_per_global_update");
689

690
	sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
691
	sys_->pt_mode_ = true;
692
693
694
695
696
697
698
699
700
701
702
703
	if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
		sys_->_init();
		job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
		checkpoint_write();
	} else {
		job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
	}

	return true;
}

int runner_pt_slave::what_is_next(int status) {
704
705
	if(chain_rank_ == 0) {
		send_status(status);
706

707
708
709
		int msg[1] = {sweeps_since_last_query_};
		MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, 0, MPI_COMM_WORLD);
	}
710
711
	sweeps_since_last_query_ = 0;
	int new_action = recv_action();
712

713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
	if(new_action == A_EXIT) {
		return A_EXIT;
	}

	if(new_action != A_CONTINUE) {
		if(new_action == A_PROCESS_DATA_NEW_JOB) {
			merge_measurements();
		}

		if(!accept_new_chain()) {
			return A_EXIT;
		}
	}
	return A_CONTINUE;
}

void runner_pt_slave::checkpoint_write() {
	time_last_checkpoint_ = MPI_Wtime();
	sys_->_write(job_.rundir(task_id_, run_id_));
	job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
}

void runner_pt_master::send_action(int action, int destination) {
736
	MPI_Send(&action, 1, MPI_INT, destination, 0, MPI_COMM_WORLD);
737
738
739
740
}

int runner_pt_slave::recv_action() {
	int new_action;
741
	MPI_Recv(&new_action, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
742
743
744
745
746
747
748
749
	return new_action;
}

void runner_pt_slave::merge_measurements() {
	std::string unique_filename = job_.taskdir(task_id_);
	sys_->_write_output(unique_filename);

	std::vector<evalable> evalables;
750
751
752
753
754
755
756
	if(job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false)) {
		if(rank_ == 1) {
			job_.log("Running in parameter optimization mode. No evalables are calculated.");
		}
	} else {
		sys_->register_evalables(evalables);
	}
757
758
759
760
761
762
763
764
765
766
767
768
	job_.merge_task(task_id_, evalables);
}

bool runner_pt_slave::is_checkpoint_time() {
	return MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time;
}

bool runner_pt_slave::time_is_up() {
	double safe_interval = 0;
	if(sys_ != nullptr) {
		safe_interval = sys_->safe_exit_interval();
	}
769
	return MPI_Wtime() - time_start_ > job_.walltime - safe_interval;
770
771
}
}