runner_pt.cpp 22.6 KB
Newer Older
1
#include "runner_pt.h"
2
#include "util.h"
3
#include <fstream>
4
#include <filesystem>
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

namespace loadl {

enum {
	S_BUSY,
	S_READY_FOR_GLOBAL,
	S_TIMEUP,

	A_CONTINUE,
	A_EXIT,
	A_NEW_JOB,
	A_PROCESS_DATA_NEW_JOB,

	// global update status response
	GA_DONE,
	GA_SKIP,
	GA_CALC_WEIGHT,

23
	MASTER = 0
24
25
};

26
27
28
29
30
31
enum {
	TR_CONTINUE,
	TR_CHECKPOINT,
	TR_TIMEUP,
};

Lukas Weber's avatar
Lukas Weber committed
32
pt_chain_run::pt_chain_run(const pt_chain &chain, int run_id) : id{chain.id}, run_id{run_id} {
33
34
	rank_to_pos.resize(chain.params.size());
	weight_ratios.resize(chain.params.size(), -1);
35
	switch_partners.resize(chain.params.size());
Lukas Weber's avatar
Lukas Weber committed
36

37
38
	for(size_t i = 0; i < rank_to_pos.size(); i++) {
		rank_to_pos[i] = i;
39
40
41
	}
}

Lukas Weber's avatar
fix pt    
Lukas Weber committed
42
pt_chain_run pt_chain_run::checkpoint_read(const pt_chain &chain, const iodump::group &g) {
43
44
	pt_chain_run run;
	g.read("id", run.id);
45
	assert(chain.id == run.id);
46
	g.read("run_id", run.run_id);
47
48
49
	uint8_t swap_odd;
	g.read("swap_odd", swap_odd);
	run.swap_odd = swap_odd;
50

Lukas Weber's avatar
fix pt    
Lukas Weber committed
51
52
53
54
55
56
57
58
	size_t size = chain.params.size();
	run.weight_ratios.resize(size, -1);
	run.switch_partners.resize(size);
	run.rank_to_pos.resize(size);

	for(size_t i = 0; i < run.rank_to_pos.size(); i++) {
		run.rank_to_pos[i] = i;
	}
59
60
61
62

	return run;
}

63
void pt_chain_run::checkpoint_write(const iodump::group &g) {
64
65
	g.write("id", id);
	g.write("run_id", run_id);
66
	g.write("swap_odd", static_cast<uint8_t>(swap_odd));
67
68
69
70
}

void pt_chain::checkpoint_read(const iodump::group &g) {
	g.read("params", params);
71
72
	g.read("rejection_rates", rejection_rates);
	g.read("rejection_rate_entries", rejection_rate_entries);
73
74
75
76
	g.read("entries_before_optimization", entries_before_optimization);
}

void pt_chain::checkpoint_write(const iodump::group &g) {
77
	g.write("params", params);
78
79
	g.write("rejection_rates", rejection_rates);
	g.write("rejection_rate_entries", rejection_rate_entries);
80
	g.write("entries_before_optimization", entries_before_optimization);
81
82
}

83
void pt_chain::clear_histograms() {
Lukas Weber's avatar
Lukas Weber committed
84
85
	rejection_rate_entries[0] = 0;
	rejection_rate_entries[1] = 0;
86
	std::fill(rejection_rates.begin(), rejection_rates.end(), 0);
87
88
}

89
90
91
// https://arxiv.org/pdf/1905.02939.pdf
std::tuple<double, double> pt_chain::optimize_params() {
	std::vector<double> rejection_est(rejection_rates);
Lukas Weber's avatar
Lukas Weber committed
92
	bool odd = false;
93
	for(auto &r : rejection_est) {
Lukas Weber's avatar
Lukas Weber committed
94
95
		r /= rejection_rate_entries[odd];
		odd = !odd;
96
97
98
		if(r == 0) { // ensure the comm_barrier is invertible
			r = 1e-3;
		}
99
	}
100

101
	std::vector<double> comm_barrier(params.size());
Lukas Weber's avatar
Lukas Weber committed
102

103
	double sum{};
104
	double efficiency{};
105
	for(size_t i = 0; i < comm_barrier.size() - 1; i++) {
106
		comm_barrier[i] = sum;
107
108
		sum += rejection_est[i];
		efficiency += rejection_est[i] / (1 - rejection_est[i]);
109
	}
110
	comm_barrier[comm_barrier.size() - 1] = sum;
111
112
113
114

	monotonic_interpolator lambda{comm_barrier, params};
	double convergence{};

115
116
	for(size_t i = 1; i < params.size() - 1; i++) {
		double new_param = lambda(sum * i / (params.size() - 1));
117
		double d = (new_param - params[i]);
118
119

		convergence += d * d;
120
		params[i] = new_param;
121
	}
Lukas Weber's avatar
Lukas Weber committed
122

123
	convergence = sqrt(convergence) / params.size();
Lukas Weber's avatar
Lukas Weber committed
124

125
	double round_trip_rate = (1 + sum) / (1 + efficiency);
126
127

	return std::tie(round_trip_rate, convergence);
128
129
130
}

bool pt_chain::is_done() {
Lukas Weber's avatar
Lukas Weber committed
131
	return sweeps >= target_sweeps;
132
133
134
135
136
137
138
}

int runner_pt_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
	MPI_Init(&argc, &argv);

	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);
139
	int rc = 0;
140
141
142

	if(rank == 0) {
		runner_pt_master r{std::move(job)};
143
		rc = r.start();
144
145
146
147
148
149
150
	} else {
		runner_pt_slave r{std::move(job), mccreator};
		r.start();
	}

	MPI_Finalize();

151
	return rc;
152
153
154
155
156
157
158
}

runner_pt_master::runner_pt_master(jobinfo job) : job_{std::move(job)} {
	rng_ = std::make_unique<random_number_generator>();
}

void runner_pt_master::construct_pt_chains() {
159
160
161
	std::string pt_param =
	    job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");

162
163
164
	for(size_t i = 0; i < job_.task_names.size(); i++) {
		auto task = job_.jobfile["tasks"][job_.task_names[i]];

Lukas Weber's avatar
Lukas Weber committed
165
		auto [chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
166
167
168
169
170
		if(chain_id < 0 || chain_pos < 0) {
			throw std::runtime_error{"parallel tempering pt_chain indices must be nonnegative"};
		}

		if(chain_id >= static_cast<int>(pt_chains_.size())) {
171
			pt_chains_.resize(chain_id + 1);
172
173
174
175
176
177
		}

		auto &chain = pt_chains_.at(chain_id);
		chain.id = chain_id;

		if(chain_pos >= static_cast<int>(chain.task_ids.size())) {
178
			chain.task_ids.resize(chain_pos + 1, -1);
179
			chain.params.resize(chain_pos + 1);
180
		}
181

182
183
184
185
		if(chain.task_ids.at(chain_pos) != -1) {
			throw std::runtime_error{"parallel tempering pt_chain map not unique"};
		}

186
		chain.params.at(chain_pos) = task.get<double>(pt_param);
187
188
		chain.task_ids.at(chain_pos) = i;

Lukas Weber's avatar
Lukas Weber committed
189
		const char *pt_sweep_error =
190
191
		    "chain {}: task {}: in parallel tempering mode, sweeps are measured in global updates "
		    "and need to be the "
192
		    "same within each chain: {} = {} != {}";
193

Lukas Weber's avatar
Lukas Weber committed
194
		int64_t target_sweeps = task.get<int>("sweeps");
195
		if(chain.target_sweeps >= 0 && target_sweeps != chain.target_sweeps) {
196
197
			throw std::runtime_error{fmt::format(pt_sweep_error, chain.id, i, "target_sweeps",
			                                     chain.target_sweeps, target_sweeps)};
198
		}
199
		chain.target_sweeps = target_sweeps;
Lukas Weber's avatar
Lukas Weber committed
200

201
		int target_thermalization = task.get<int>("thermalization");
Lukas Weber's avatar
Lukas Weber committed
202
203
		if(chain.target_thermalization >= 0 &&
		   target_thermalization != chain.target_thermalization) {
Lukas Weber's avatar
Lukas Weber committed
204
			throw std::runtime_error{fmt::format(pt_sweep_error, chain.id, i, "thermalization",
Lukas Weber's avatar
Lukas Weber committed
205
206
			                                     chain.target_thermalization,
			                                     target_thermalization)};
207
		}
208
		chain.target_thermalization = target_thermalization;
Lukas Weber's avatar
Lukas Weber committed
209

Lukas Weber's avatar
Lukas Weber committed
210
211
		int64_t sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
		int64_t sweeps = job_.read_dump_progress(i) / sweeps_per_global_update;
212
		if(chain.sweeps >= 0 && sweeps != chain.sweeps) {
213
214
			throw std::runtime_error{
			    fmt::format(pt_sweep_error, chain.id, i, "sweeps", chain.sweeps, sweeps)};
215
		}
216
		chain.sweeps = sweeps;
217
	}
Lukas Weber's avatar
Lukas Weber committed
218

219
220
221
222
223
	chain_len_ = -1;
	for(auto &c : pt_chains_) {
		if(c.id == -1) {
			throw std::runtime_error{"parallel tempering pt_chain map contains gaps"};
		}
224

225
226
227
		if(chain_len_ != -1 && chain_len_ != static_cast<int>(c.task_ids.size())) {
			throw std::runtime_error{"parallel tempering pt_chains do not have the same length"};
		}
228

229
		chain_len_ = c.task_ids.size();
230

231
		c.rejection_rates.resize(chain_len_ - 1);
232

233
234
235
		if(po_config_.enabled) {
			c.entries_before_optimization = po_config_.nsamples_initial;
		}
236
237
	}
	if(chain_len_ == -1) {
238
239
240
		throw std::runtime_error{
		    "parallel tempering pt_chain mapping missing. You need to specify 'pt_chain: "
		    "[chain_id, chain_position]' for every task in the job."};
241
242
	}

243
244
245
246
	if((num_active_ranks_ - 1) % chain_len_ != 0) {
		throw std::runtime_error{
		    "parallel tempering: number of ranks should be of the form chain_length*n+1 for best "
		    "efficiency"};
247
248
249
250
251
252
253
	}
}

void runner_pt_master::checkpoint_read() {
	construct_pt_chains();

	std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
254
	if(std::filesystem::exists(master_dump_name)) {
255
		job_.log(fmt::format("master reading dump from '{}'", master_dump_name));
256
257
258
259
260
		iodump dump = iodump::open_readonly(master_dump_name);
		auto g = dump.get_root();

		rng_->checkpoint_read(g.open_group("random_number_generator"));

261
262
263
264
265
		auto pt_chains = g.open_group("pt_chains");
		for(std::string name : pt_chains) {
			int id = std::stoi(name);
			pt_chains_.at(id).checkpoint_read(pt_chains.open_group(name));
		}
266

Lukas Weber's avatar
fix pt    
Lukas Weber committed
267
268
269
270
271
272
273
		auto pt_chain_runs = g.open_group("pt_chain_runs");
		for(std::string name : pt_chain_runs) {
			int id;
			pt_chain_runs.read(fmt::format("{}/id", name), id);
			pt_chain_runs_.emplace_back(
			    pt_chain_run::checkpoint_read(pt_chains_.at(id), pt_chain_runs.open_group(name)));
		}
274
275
276
	}
}

277
278
void runner_pt_master::write_params_json() {
	nlohmann::json params;
279
	for(auto c : pt_chains_) {
280
		params[fmt::format("chain{:04d}", c.id)] = c.params;
281
282
	}

283
284
	std::ofstream file{job_.jobdir() + "/pt_optimized_params.json"};
	file << params.dump(1) << "\n";
285
286
}

287
288
289
290
291
292
void runner_pt_master::write_statistics(const pt_chain_run &chain_run) {
	std::string stat_name = job_.jobdir() + "/pt_statistics.h5";
	iodump stat = iodump::open_readwrite(stat_name);
	auto g = stat.get_root();

	g.write("chain_length", chain_len_);
293

294
295
296
297
298
299
	auto cg = g.open_group(fmt::format("chain{:04d}_run{:04d}", chain_run.id, chain_run.run_id));
	cg.insert_back("rank_to_pos", chain_run.rank_to_pos);
}

void runner_pt_master::write_param_optimization_statistics() {
	std::string stat_name = job_.jobdir() + "/pt_statistics.h5";
300
301
302
303
	iodump stat = iodump::open_readwrite(stat_name);
	auto g = stat.get_root();

	g.write("chain_length", chain_len_);
304

305
306
307
	for(auto &chain : pt_chains_) {
		auto cg = g.open_group(fmt::format("chain{:04d}", chain.id));
		cg.insert_back("params", chain.params);
308
309

		std::vector<double> rejection_est(chain.rejection_rates);
Lukas Weber's avatar
Lukas Weber committed
310
		bool odd = false;
311
		for(auto &r : rejection_est) {
Lukas Weber's avatar
Lukas Weber committed
312
313
			r /= chain.rejection_rate_entries[odd];
			odd = !odd;
314
315
316
		}

		cg.insert_back("rejection_rates", rejection_est);
317
318
319
	}
}

320
321
322
323
void runner_pt_master::checkpoint_write() {
	std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";

	job_.log(fmt::format("master: checkpoint {}", master_dump_name));
324

325
326
327
328
329
330
	iodump dump = iodump::create(master_dump_name);
	auto g = dump.get_root();

	rng_->checkpoint_write(g.open_group("random_number_generator"));
	auto pt_chain_runs = g.open_group("pt_chain_runs");
	for(auto &c : pt_chain_runs_) {
331
332
		c.checkpoint_write(
		    pt_chain_runs.open_group(fmt::format("chain{:04d}_run{:04d}", c.id, c.run_id)));
333
334
	}

335
336
337
338
339
	auto pt_chains = g.open_group("pt_chains");
	for(auto &c : pt_chains_) {
		c.checkpoint_write(pt_chains.open_group(fmt::format("{:04d}", c.id)));
	}

340
	if(po_config_.enabled) {
341
		write_params_json();
342
	}
343
344
}

345
int runner_pt_master::start() {
346
347
	MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);

348
	po_config_.enabled = job_.jobfile["jobconfig"].defined("pt_parameter_optimization");
349
	if(po_config_.enabled) {
350
		job_.log("using feedback parameter optimization");
351
352
353
		const auto &po = job_.jobfile["jobconfig"]["pt_parameter_optimization"];
		po_config_.nsamples_initial = po.get<int>("nsamples_initial");
		po_config_.nsamples_growth = po.get<double>("nsamples_growth");
354
355
	}

356
357
358
	job_.log(fmt::format("starting job '{}' in parallel tempering mode", job_.jobname));
	checkpoint_read();

359
360
	std::vector<int> group_idx(num_active_ranks_);
	for(int i = 1; i < num_active_ranks_; i++) {
Lukas Weber's avatar
Lukas Weber committed
361
		group_idx[i] = (i - 1) / chain_len_;
362
	}
Lukas Weber's avatar
Lukas Weber committed
363
	MPI_Scatter(group_idx.data(), 1, MPI_INT, MPI_IN_PLACE, 1, MPI_INT, MASTER, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
364

365
366
	MPI_Comm tmp;
	MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, 0, &tmp);
Lukas Weber's avatar
Lukas Weber committed
367

Lukas Weber's avatar
Lukas Weber committed
368
369
	int chain_count = (num_active_ranks_ - 1) / chain_len_;
	for(int rank_section = 0; rank_section < chain_count; rank_section++) {
370
		assign_new_chain(rank_section);
371
372
373
	}

	time_last_checkpoint_ = MPI_Wtime();
374

375
376
377
378
379
380
381
	while(num_active_ranks_ > 1) {
		react();
		if(MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time) {
			time_last_checkpoint_ = MPI_Wtime();
			checkpoint_write();
		}
	}
382
	checkpoint_write();
383
384
385
386
387
388
389
390
391

	bool all_done = true;
	for(auto &c : pt_chains_) {
		if(!c.is_done()) {
			all_done = false;
			break;
		}
	}
	return !all_done;
392
393
394
395
396
397
398
}

int runner_pt_master::schedule_chain_run() {
	int old_id = current_chain_id_;
	int nchains = pt_chains_.size();
	for(int i = 1; i <= nchains; i++) {
		if(!pt_chains_[(old_id + i) % nchains].is_done()) {
399
400
			current_chain_id_ = (old_id + i) % nchains;
			auto &chain = pt_chains_[current_chain_id_];
401
			chain.scheduled_runs++;
402

403
404
405
406
407
408
409
410
411
			int idx = 0;
			for(auto &run : pt_chain_runs_) {
				if(run.id == chain.id && run.run_id == chain.scheduled_runs) {
					return idx;
				}
				idx++;
			}

			pt_chain_runs_.emplace_back(chain, chain.scheduled_runs);
412
			return pt_chain_runs_.size() - 1;
413
414
415
416
417
418
419
		}
	}

	// everything done!
	return -1;
}

420
int runner_pt_master::assign_new_chain(int rank_section) {
421
422
	int chain_run_id = schedule_chain_run();
	for(int target = 0; target < chain_len_; target++) {
Lukas Weber's avatar
Lukas Weber committed
423
		int64_t msg[3] = {-1, 0, 0};
Lukas Weber's avatar
ehhhh    
Lukas Weber committed
424
		if(chain_run_id >= 0) {
425
426
427
428
			auto &chain_run = pt_chain_runs_[chain_run_id];
			auto &chain = pt_chains_[chain_run.id];
			msg[0] = chain.task_ids[target];
			msg[1] = chain_run.run_id;
Lukas Weber's avatar
Lukas Weber committed
429
			msg[2] = std::max(1L, chain.target_sweeps - chain.sweeps);
430
431
432
433
		} else {
			// this will prompt the slave to quit
			num_active_ranks_--;
		}
Lukas Weber's avatar
Lukas Weber committed
434
		MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T,
435
		         rank_section * chain_len_ + target + 1, 0, MPI_COMM_WORLD);
436
	}
437
	rank_to_chain_run_[rank_section] = chain_run_id;
438
439
440
	return chain_run_id;
}

441
void runner_pt_master::pt_param_optimization(pt_chain &chain) {
442
443
	if(std::min(chain.rejection_rate_entries[0], chain.rejection_rate_entries[1]) >=
	   chain.entries_before_optimization) {
444
445
		chain.entries_before_optimization *= po_config_.nsamples_growth;

446
		auto [efficiency, convergence] = chain.optimize_params();
447
		job_.log(
448
		    fmt::format("chain {}: pt param optimization: entries={}, efficiency={:.2g}, "
449
		                "convergence={:.2g}",
450
		                chain.id, chain.rejection_rate_entries[0], efficiency, convergence));
451
		checkpoint_write();
452
		write_param_optimization_statistics();
453
		chain.clear_histograms();
Lukas Weber's avatar
Lukas Weber committed
454
455
456
	}
}

457
void runner_pt_master::react() {
458
	int rank_status;
459
	MPI_Status stat;
460
461
462
	MPI_Recv(&rank_status, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &stat);
	int rank = stat.MPI_SOURCE - 1;
	if(rank_status == S_BUSY) {
Lukas Weber's avatar
Lukas Weber committed
463
		int64_t msg[1];
464
465
		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T, rank + 1, 0, MPI_COMM_WORLD,
		         &stat);
Lukas Weber's avatar
Lukas Weber committed
466
		int64_t completed_sweeps = msg[0];
467

468
		int chain_run_id = rank_to_chain_run_[rank / chain_len_];
469
470
471
		auto &chain_run = pt_chain_runs_[chain_run_id];
		auto &chain = pt_chains_[chain_run.id];

472
		chain.sweeps += completed_sweeps;
473
		if(chain.is_done()) {
474
475
476
477
			chain.scheduled_runs--;
			int action = A_NEW_JOB;

			if(chain.scheduled_runs > 0) {
Lukas Weber's avatar
Lukas Weber committed
478
479
				job_.log(fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.",
				                     chain.id, chain.scheduled_runs));
480
481
482
483
			} else {
				job_.log(fmt::format("chain {} rank {} is done. Merging.", chain.id, rank + 1));
				action = A_PROCESS_DATA_NEW_JOB;
				checkpoint_write();
484
			}
485
486
487
488
			for(int target = 0; target < chain_len_; target++) {
				send_action(action, rank / chain_len_ * chain_len_ + target + 1);
			}
			assign_new_chain(rank / chain_len_);
489
		} else {
490
491
492
			for(int target = 0; target < chain_len_; target++) {
				send_action(A_CONTINUE, rank / chain_len_ * chain_len_ + target + 1);
			}
493
		}
494
495
	} else if(rank_status == S_READY_FOR_GLOBAL) {
		int chain_run_id = rank_to_chain_run_[rank / chain_len_];
496
497
		auto &chain_run = pt_chain_runs_[chain_run_id];
		auto &chain = pt_chains_[chain_run.id];
498
		int rank_section = rank / chain_len_;
Lukas Weber's avatar
Lukas Weber committed
499

500
		if(po_config_.enabled) {
501
			pt_param_optimization(chain);
502
503
		}

504
		std::fill(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), -1);
505
		for(int target = 0; target < chain_len_; target++) {
Lukas Weber's avatar
Lukas Weber committed
506
			int target_rank = rank_section * chain_len_ + target + 1;
507
			int pos = chain_run.rank_to_pos[target];
508
			// keep consistent with pt_global_update
509
			int partner_pos = pos + (2 * (pos & 1) - 1) * (2 * chain_run.swap_odd - 1);
510
511
			if(partner_pos < 0 || partner_pos >= chain_len_) {
				int response = GA_SKIP;
512
				MPI_Send(&response, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
513
				chain_run.weight_ratios[pos] = 1;
514
515
			} else {
				int response = GA_CALC_WEIGHT;
516
				MPI_Send(&response, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
517

518
519
520
521
				double partner_param = chain.params[partner_pos];
				MPI_Send(&partner_param, 1, MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD);
			}
		}
522

523
		for(int target = 0; target < chain_len_; target++) {
Lukas Weber's avatar
Lukas Weber committed
524
			int target_rank = rank_section * chain_len_ + target + 1;
525
526
			int pos = chain_run.rank_to_pos[target];
			if(chain_run.weight_ratios[pos] < 0) {
527
				double weight;
528
				MPI_Recv(&weight, 1, MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD, &stat);
Lukas Weber's avatar
Lukas Weber committed
529
				chain_run.weight_ratios[pos] = weight;
530
			}
531
		}
532

533
534
535
536
		pt_global_update(chain, chain_run);

		for(int target = 0; target < chain_len_; target++) {
			int new_task_id = chain.task_ids[chain_run.rank_to_pos[target]];
Lukas Weber's avatar
Lukas Weber committed
537
			int partner_rank = rank_section * chain_len_ + chain_run.switch_partners[target] + 1;
538
			int msg[2] = {new_task_id, partner_rank};
Lukas Weber's avatar
Lukas Weber committed
539
540
			MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT,
			         rank_section * chain_len_ + target + 1, 0, MPI_COMM_WORLD);
541
			double new_param = chain.params[chain_run.rank_to_pos[target]];
Lukas Weber's avatar
Lukas Weber committed
542
543
			MPI_Send(&new_param, 1, MPI_DOUBLE, rank_section * chain_len_ + target + 1, 0,
			         MPI_COMM_WORLD);
544
		}
545
546
547
548

		if(job_.jobfile["jobconfig"].get<bool>("pt_statistics", false)) {
			write_statistics(chain_run);
		}
549
550
551
552
553
554
	} else { // S_TIMEUP
		num_active_ranks_--;
	}
}

void runner_pt_master::pt_global_update(pt_chain &chain, pt_chain_run &chain_run) {
555
	int i = 0;
Lukas Weber's avatar
Lukas Weber committed
556
	for(auto &p : chain_run.switch_partners) {
557
558
559
		p = i++;
	}
	for(int i = chain_run.swap_odd; i < static_cast<int>(chain.task_ids.size()) - 1; i += 2) {
560
		double w1 = chain_run.weight_ratios[i];
561
		double w2 = chain_run.weight_ratios[i + 1];
562

563
		double p = std::min(exp(w1 + w2), 1.);
564
565
		double r = rng_->random_double();

566
567
		chain.rejection_rates[i] += 1 - p;
		if(r < p) {
568
569
570
571
			int rank0{};
			int rank1{};

			int rank = 0;
572
			for(auto &p : chain_run.rank_to_pos) {
Lukas Weber's avatar
Lukas Weber committed
573
				if(p == i) {
574
					rank0 = rank;
Lukas Weber's avatar
Lukas Weber committed
575
				} else if(p == i + 1) {
576
					rank1 = rank;
Lukas Weber's avatar
Lukas Weber committed
577
				}
578
				rank++;
Lukas Weber's avatar
Lukas Weber committed
579
			}
Lukas Weber's avatar
Lukas Weber committed
580
			chain_run.rank_to_pos[rank0] = i + 1;
581
582
583
584
			chain_run.rank_to_pos[rank1] = i;

			chain_run.switch_partners[rank0] = rank1;
			chain_run.switch_partners[rank1] = rank0;
585
586
587
		}
	}

588
	chain.rejection_rate_entries[chain_run.swap_odd]++;
589
	chain_run.swap_odd = !chain_run.swap_odd;
590
591
592
593
594
595
596
597
598
599
}

runner_pt_slave::runner_pt_slave(jobinfo job, mc_factory mccreator)
    : job_{std::move(job)}, mccreator_{std::move(mccreator)} {}

void runner_pt_slave::start() {
	MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
	time_start_ = MPI_Wtime();
	time_last_checkpoint_ = time_start_;

600
	int group_idx;
Lukas Weber's avatar
Lukas Weber committed
601
	MPI_Scatter(NULL, 1, MPI_INT, &group_idx, 1, MPI_INT, MASTER, MPI_COMM_WORLD);
602
	MPI_Comm_split(MPI_COMM_WORLD, group_idx, 0, &chain_comm_);
Lukas Weber's avatar
Lukas Weber committed
603

604
605
	MPI_Comm_rank(chain_comm_, &chain_rank_);

606
	bool use_param_optimization = job_.jobfile["jobconfig"].defined("pt_parameter_optimization");
607

608
	if(!accept_new_chain()) {
Lukas Weber's avatar
Lukas Weber committed
609
		job_.log(fmt::format("rank {} exits: out of work", rank_));
610
611
612
613
614
		return;
	}

	int action{};
	do {
615
616
		int timeout{TR_CHECKPOINT};

617
618
619
		while(sweeps_since_last_query_ < sweeps_before_communication_) {
			sys_->_do_update();

620
			if(sys_->is_thermalized() && !use_param_optimization) {
621
622
623
624
625
				sys_->_do_measurement();
			}

			if(sys_->sweep() % sweeps_per_global_update_ == 0) {
				pt_global_update();
Lukas Weber's avatar
Lukas Weber committed
626
627
628
				if(sys_->is_thermalized()) {
					sweeps_since_last_query_++;
				}
629

630
631
632
633
				timeout = negotiate_timeout();
				if(timeout != TR_CONTINUE) {
					break;
				}
634
635
			}
		}
636

637
638
		checkpoint_write();

639
		if(timeout == TR_TIMEUP) {
640
			send_status(S_TIMEUP);
Lukas Weber's avatar
Lukas Weber committed
641
			job_.log(fmt::format("rank {} exits: time up", rank_));
642
643
644
645
646
647
648
649
650
651
652
			break;
		}
		action = what_is_next(S_BUSY);
	} while(action != A_EXIT);

	if(action == A_EXIT) {
		job_.log(fmt::format("rank {} exits: out of work", rank_));
	}
}

void runner_pt_slave::send_status(int status) {
653
	MPI_Send(&status, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD);
654
655
}

656
657
658
659
660
661
662
int runner_pt_slave::negotiate_timeout() {
	int result = TR_CONTINUE;
	if(chain_rank_ == 0) {
		if(MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time) {
			result = TR_CHECKPOINT;
		}

Lukas Weber's avatar
Lukas Weber committed
663
		if(MPI_Wtime() - time_start_ > job_.runtime) {
664
665
666
667
668
669
670
671
			result = TR_TIMEUP;
		}
	}

	MPI_Bcast(&result, 1, MPI_INT, 0, chain_comm_);
	return result;
}

672
void runner_pt_slave::pt_global_update() {
673
674
675
	if(chain_rank_ == 0) {
		send_status(S_READY_FOR_GLOBAL);
	}
676
677

	int response;
678
	MPI_Recv(&response, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
Lukas Weber's avatar
Lukas Weber committed
679
	// job_.log(fmt::format(" * rank {}: ready for global update", rank_));
680

Lukas Weber's avatar
Lukas Weber committed
681
682
	const std::string &param_name =
	    job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");
683

684
	if(response == GA_CALC_WEIGHT) {
685
		double partner_param;
686
		MPI_Recv(&partner_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
687
		double weight_ratio = sys_->_pt_weight_ratio(param_name, partner_param);
688
		MPI_Send(&weight_ratio, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
689
		// job_.log(fmt::format(" * rank {}: weight sent", rank_));
690
	} else {
Lukas Weber's avatar
Lukas Weber committed
691
		// job_.log(fmt::format(" * rank {}: no weight needed", rank_));
692
	}
693
	MPI_Barrier(chain_comm_);
694
695
696

	// this may be a long wait

697
	int msg[2];
Lukas Weber's avatar
Lukas Weber committed
698
699
	MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, MASTER, 0, MPI_COMM_WORLD,
	         MPI_STATUS_IGNORE);
700
701
	int new_task_id = msg[0];
	int target_rank = msg[1];
Lukas Weber's avatar
Lukas Weber committed
702

703
	double new_param;
704
705
706
707
	MPI_Recv(&new_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

	if(new_task_id != task_id_ || current_param_ != new_param) {
		task_id_ = new_task_id;
Lukas Weber's avatar
Lukas Weber committed
708
709
		sweeps_per_global_update_ = job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>(
		    "pt_sweeps_per_global_update");
710
		sys_->_pt_update_param(target_rank, param_name, new_param);
711
712
	}
	current_param_ = new_param;
713
714
715
}

bool runner_pt_slave::accept_new_chain() {
Lukas Weber's avatar
Lukas Weber committed
716
	int64_t msg[3];
717
718
	MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T, 0, 0, MPI_COMM_WORLD,
	         MPI_STATUS_IGNORE);
719
720
721
722
723
724
725
726
	task_id_ = msg[0];
	run_id_ = msg[1];
	sweeps_before_communication_ = msg[2];

	if(task_id_ < 0) {
		return false;
	}

727
728
	sweeps_per_global_update_ = job_.jobfile["tasks"][job_.task_names[task_id_]].get<int64_t>(
	    "pt_sweeps_per_global_update");
729

730
	sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
731
	sys_->pt_mode_ = true;
732
733
734
735
736
737
738
739
740
741
742
743
	if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
		sys_->_init();
		job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
		checkpoint_write();
	} else {
		job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
	}

	return true;
}

int runner_pt_slave::what_is_next(int status) {
744
745
	if(chain_rank_ == 0) {
		send_status(status);
746

Lukas Weber's avatar
Lukas Weber committed
747
748
		int64_t msg[1] = {sweeps_since_last_query_};
		MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT64_T, 0, 0, MPI_COMM_WORLD);
749
	}
750
751
	sweeps_since_last_query_ = 0;
	int new_action = recv_action();
752

753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
	if(new_action == A_EXIT) {
		return A_EXIT;
	}

	if(new_action != A_CONTINUE) {
		if(new_action == A_PROCESS_DATA_NEW_JOB) {
			merge_measurements();
		}

		if(!accept_new_chain()) {
			return A_EXIT;
		}
	}
	return A_CONTINUE;
}

void runner_pt_slave::checkpoint_write() {
	time_last_checkpoint_ = MPI_Wtime();
	sys_->_write(job_.rundir(task_id_, run_id_));
772
773
	MPI_Barrier(chain_comm_);
	sys_->_write_finalize(job_.rundir(task_id_, run_id_));
Lukas Weber's avatar
Lukas Weber committed
774
	job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
775
776
777
}

void runner_pt_master::send_action(int action, int destination) {
778
	MPI_Send(&action, 1, MPI_INT, destination, 0, MPI_COMM_WORLD);
779
780
781
782
}

int runner_pt_slave::recv_action() {
	int new_action;
783
	MPI_Recv(&new_action, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
784
785
786
787
788
	return new_action;
}

void runner_pt_slave::merge_measurements() {
	std::string unique_filename = job_.taskdir(task_id_);
789
	sys_->write_output(unique_filename);
790

791
	job_.merge_task(task_id_);
792
793
794
}

}