runner_pt.cpp 22.1 KB
Newer Older
1
#include "runner_pt.h"
2
#include <fstream>
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

namespace loadl {

enum {
	S_IDLE,
	S_BUSY,
	S_READY_FOR_GLOBAL,
	S_TIMEUP,

	A_CONTINUE,
	A_EXIT,
	A_NEW_JOB,
	A_PROCESS_DATA_NEW_JOB,

	// global update status response
	GA_DONE,
	GA_SKIP,
	GA_CALC_WEIGHT,

22
	MASTER = 0
23
24
};

Lukas Weber's avatar
Lukas Weber committed
25
pt_chain_run::pt_chain_run(const pt_chain &chain, int run_id) : id{chain.id}, run_id{run_id} {
26
27
	rank_to_pos.resize(chain.params.size());
	weight_ratios.resize(chain.params.size(), -1);
Lukas Weber's avatar
Lukas Weber committed
28

29
30
31
32
	last_visited.resize(chain.params.size());

	for(size_t i = 0; i < rank_to_pos.size(); i++) {
		rank_to_pos[i] = i;
33

34
		last_visited[i] = 0;
35
36
37
	}
}

38
pt_chain_run pt_chain_run::checkpoint_read(const iodump::group &g) {
39
40
41
	pt_chain_run run;
	g.read("id", run.id);
	g.read("run_id", run.run_id);
42
	g.read("rank_to_pos", run.rank_to_pos);
43

44
	run.weight_ratios.resize(run.rank_to_pos.size(), -1);
45
46
47
48

	return run;
}

49
void pt_chain_run::checkpoint_write(const iodump::group &g) {
50
51
	g.write("id", id);
	g.write("run_id", run_id);
52
53
54
55
56
57
58
59
60
61
62
63
	g.write("rank_to_pos", rank_to_pos);
}

void pt_chain::checkpoint_read(const iodump::group &g) {
	g.read("params", params);
	g.read("nup_histogram", nup_histogram);
	g.read("ndown_histogram", ndown_histogram);
	g.read("entries_before_optimization", entries_before_optimization);
	g.read("histogram_entries", histogram_entries);
}

void pt_chain::checkpoint_write(const iodump::group &g) {
64
	g.write("params", params);
65
66
67
68
69
70
71
72
73
	g.write("nup_histogram", nup_histogram);
	g.write("ndown_histogram", ndown_histogram);
	g.write("entries_before_optimization", entries_before_optimization);
	g.write("histogram_entries", histogram_entries);
}

void pt_chain::clear_histograms() {
	std::fill(nup_histogram.begin(), nup_histogram.end(), 0);
	std::fill(ndown_histogram.begin(), ndown_histogram.end(), 0);
Lukas Weber's avatar
Lukas Weber committed
74
	histogram_entries = 0;
75
76
}

Lukas Weber's avatar
Lukas Weber committed
77
78
79
80
static double linear_regression(const std::vector<double> &x, const std::vector<double> &y, int i,
                                int range) {
	int lower = std::max(0, i - range + 1);
	int upper = std::min(static_cast<int>(y.size() - 1), i + range);
81
82
83
84

	double sxy = 0;
	double sx = 0, sy = 0;
	double sx2 = 0;
Lukas Weber's avatar
Lukas Weber committed
85

86
	for(int j = lower; j <= upper; j++) {
Lukas Weber's avatar
Lukas Weber committed
87
		sxy += x[j] * y[j];
88
89
		sx += x[j];
		sy += y[j];
Lukas Weber's avatar
Lukas Weber committed
90
		sx2 += x[j] * x[j];
91
92
	}

Lukas Weber's avatar
Lukas Weber committed
93
94
	int n = upper - lower + 1;
	return (sxy - sx * sy / n) / (sx2 - sx * sx / n);
95
96
}

Lukas Weber's avatar
Lukas Weber committed
97
std::tuple<double, double> pt_chain::optimize_params(int linreg_len) {
98
99
100
101
102
	std::vector<double> eta(params.size());
	std::vector<double> f(params.size());
	std::vector<double> new_params(params.size());

	assert(params.size() > 1);
Lukas Weber's avatar
Lukas Weber committed
103

104
	new_params[0] = params[0];
Lukas Weber's avatar
Lukas Weber committed
105
	new_params[params.size() - 1] = params[params.size() - 1];
106

Lukas Weber's avatar
Lukas Weber committed
107
	double flinearity = 0;
108
109
110
111
	for(size_t i = 0; i < params.size(); i++) {
		if(nup_histogram[i] + ndown_histogram[i] == 0) {
			f[i] = 0;
		} else {
Lukas Weber's avatar
Lukas Weber committed
112
			f[i] = nup_histogram[i] / static_cast<double>(nup_histogram[i] + ndown_histogram[i]);
113
		}
Lukas Weber's avatar
Lukas Weber committed
114
115
116
		
		double ideal_f = 1-i/static_cast<double>(params.size());
		flinearity += (f[i]-ideal_f)*(f[i]-ideal_f);
117
	}
Lukas Weber's avatar
Lukas Weber committed
118
	flinearity = sqrt(flinearity)/params.size();
119
120

	double norm = 0;
Lukas Weber's avatar
Lukas Weber committed
121
122
123
124
125
	for(size_t i = 0; i < params.size() - 1; i++) {
		double dfdp = linear_regression(params, f, i, linreg_len);
		double dp = params[i + 1] - params[i];
		eta[i] = sqrt(std::max(0.01, -dfdp / dp));
		norm += eta[i] * dp;
126
127
128
129
	}
	for(auto &v : eta) {
		v /= norm;
	}
Lukas Weber's avatar
Lukas Weber committed
130
131
	
	double convergence = 0;	
Lukas Weber's avatar
Lukas Weber committed
132
133
	for(size_t i = 1; i < params.size() - 1; i++) {
		double target = static_cast<double>(i) / (params.size() - 1);
134
		int etai = 0;
Lukas Weber's avatar
Lukas Weber committed
135
136
		for(etai = 0; etai < static_cast<int>(params.size()) - 1; etai++) {
			double deta = eta[etai] * (params[etai + 1] - params[etai]);
137
138
139
140
141
			if(deta > target) {
				break;
			}
			target -= deta;
		}
Lukas Weber's avatar
Lukas Weber committed
142
		new_params[i] = params[etai] + target / eta[etai];
Lukas Weber's avatar
Lukas Weber committed
143
		convergence += (new_params[i]-params[i])*(new_params[i]-params[i]);
144
	}
Lukas Weber's avatar
Lukas Weber committed
145
146
147
148
149
150
151
152
153

	convergence = sqrt(convergence)/(params.size()-2);

	for(size_t i = 0; i < params.size(); i++) {
		double relaxation_fac = 1;
		params[i] = params[i]*(1-relaxation_fac) + relaxation_fac*new_params[i];
	}

	return std::tie(flinearity, convergence);
154
155
156
}

bool pt_chain::is_done() {
157
	return sweeps >= target_sweeps + target_thermalization;
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
}

int runner_pt_start(jobinfo job, const mc_factory &mccreator, int argc, char **argv) {
	MPI_Init(&argc, &argv);

	int rank;
	MPI_Comm_rank(MPI_COMM_WORLD, &rank);

	if(rank == 0) {
		runner_pt_master r{std::move(job)};
		r.start();
	} else {
		runner_pt_slave r{std::move(job), mccreator};
		r.start();
	}

	MPI_Finalize();

	return 0;
}

runner_pt_master::runner_pt_master(jobinfo job) : job_{std::move(job)} {
	rng_ = std::make_unique<random_number_generator>();
}

void runner_pt_master::construct_pt_chains() {
184
185
186
	std::string pt_param =
	    job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");

187
188
189
	for(size_t i = 0; i < job_.task_names.size(); i++) {
		auto task = job_.jobfile["tasks"][job_.task_names[i]];

Lukas Weber's avatar
Lukas Weber committed
190
		auto [chain_id, chain_pos] = task.get<std::pair<int, int>>("pt_chain");
191
192
193
194
195
		if(chain_id < 0 || chain_pos < 0) {
			throw std::runtime_error{"parallel tempering pt_chain indices must be nonnegative"};
		}

		if(chain_id >= static_cast<int>(pt_chains_.size())) {
196
			pt_chains_.resize(chain_id + 1);
197
198
199
200
201
202
		}

		auto &chain = pt_chains_.at(chain_id);
		chain.id = chain_id;

		if(chain_pos >= static_cast<int>(chain.task_ids.size())) {
203
			chain.task_ids.resize(chain_pos + 1, -1);
204
			chain.params.resize(chain_pos + 1);
205
		}
206

207
208
209
210
		if(chain.task_ids.at(chain_pos) != -1) {
			throw std::runtime_error{"parallel tempering pt_chain map not unique"};
		}

211
		chain.params.at(chain_pos) = task.get<double>(pt_param);
212
213
		chain.task_ids.at(chain_pos) = i;

Lukas Weber's avatar
Lukas Weber committed
214
215
216
		const char *pt_sweep_error =
		    "in parallel tempering mode, sweeps are measured in global updates and need to be the "
		    "same within each chain";
217

218
219
220
221
222
223
		int target_sweeps = task.get<int>("sweeps");
		if(chain.target_sweeps < 0) {
			chain.target_sweeps = target_sweeps;
		} else if(target_sweeps != chain.target_sweeps) {
			throw std::runtime_error{pt_sweep_error};
		}
Lukas Weber's avatar
Lukas Weber committed
224

225
226
227
228
229
230
		int target_thermalization = task.get<int>("thermalization");
		if(chain.target_thermalization < 0) {
			chain.target_thermalization = target_thermalization;
		} else if(target_thermalization != chain.target_thermalization) {
			throw std::runtime_error{pt_sweep_error};
		}
Lukas Weber's avatar
Lukas Weber committed
231

232
233
234
		int sweeps = job_.read_dump_progress(i);
		if(chain.sweeps < 0) {
			int sweeps_per_global_update = task.get<int>("pt_sweeps_per_global_update");
Lukas Weber's avatar
Lukas Weber committed
235
			chain.sweeps = sweeps / sweeps_per_global_update;
236
237
238
239
		} else if(sweeps != chain.sweeps) {
			throw std::runtime_error{pt_sweep_error};
		}
	}
Lukas Weber's avatar
Lukas Weber committed
240

241
242
243
244
245
	chain_len_ = -1;
	for(auto &c : pt_chains_) {
		if(c.id == -1) {
			throw std::runtime_error{"parallel tempering pt_chain map contains gaps"};
		}
246

247
248
249
		if(chain_len_ != -1 && chain_len_ != static_cast<int>(c.task_ids.size())) {
			throw std::runtime_error{"parallel tempering pt_chains do not have the same length"};
		}
250

251
		chain_len_ = c.task_ids.size();
252
253
254

		c.nup_histogram.resize(chain_len_);
		c.ndown_histogram.resize(chain_len_);
Lukas Weber's avatar
Lukas Weber committed
255
		c.entries_before_optimization =
Lukas Weber's avatar
Lukas Weber committed
256
		    job_.jobfile["jobconfig"].get<int>("pt_parameter_optimization_nsamples_initial", 10000);
257
258
	}
	if(chain_len_ == -1) {
259
260
261
		throw std::runtime_error{
		    "parallel tempering pt_chain mapping missing. You need to specify 'pt_chain: "
		    "[chain_id, chain_position]' for every task in the job."};
262
263
	}

264
265
266
267
	if((num_active_ranks_ - 1) % chain_len_ != 0) {
		throw std::runtime_error{
		    "parallel tempering: number of ranks should be of the form chain_length*n+1 for best "
		    "efficiency"};
268
269
270
271
272
273
274
	}
}

void runner_pt_master::checkpoint_read() {
	construct_pt_chains();

	std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
275
	if(file_exists(master_dump_name)) {
276
277
278
279
280
281
		iodump dump = iodump::open_readonly(master_dump_name);
		auto g = dump.get_root();

		rng_->checkpoint_read(g.open_group("random_number_generator"));
		auto pt_chain_runs = g.open_group("pt_chain_runs");
		for(std::string name : pt_chain_runs) {
282
283
			pt_chain_runs_.emplace_back(
			    pt_chain_run::checkpoint_read(pt_chain_runs.open_group(name)));
284
285
		}

286
287
288
289
290
291
		auto pt_chains = g.open_group("pt_chains");
		for(std::string name : pt_chains) {
			int id = std::stoi(name);
			pt_chains_.at(id).checkpoint_read(pt_chains.open_group(name));
		}

292
293
294
295
296
297
298
		g.read("current_chain_id", current_chain_id_);
		uint8_t pt_swap_odd;
		g.read("pt_swap_odd", pt_swap_odd);
		pt_swap_odd_ = pt_swap_odd;
	}
}

299
300
301
302
303
304
305
306
307
308
309
310
311
312
void runner_pt_master::write_params_yaml() {
	using namespace YAML;
	Emitter params;
	params << BeginMap;
	for(auto c : pt_chains_) {
		params << Key << fmt::format("chain{:04d}", c.id);
		params << Value << Flow << c.params;
	}
	params << EndMap;

	std::ofstream file{job_.jobdir() + "/pt_optimized_params.yml"};
	file << params.c_str() << "\n";
}

313
314
315
316
void runner_pt_master::checkpoint_write() {
	std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";

	job_.log(fmt::format("master: checkpoint {}", master_dump_name));
317

318
319
320
321
322
323
	iodump dump = iodump::create(master_dump_name);
	auto g = dump.get_root();

	rng_->checkpoint_write(g.open_group("random_number_generator"));
	auto pt_chain_runs = g.open_group("pt_chain_runs");
	for(auto &c : pt_chain_runs_) {
324
325
		c.checkpoint_write(
		    pt_chain_runs.open_group(fmt::format("chain{:04d}_run{:04d}", c.id, c.run_id)));
326
327
	}

328
329
330
331
332
	auto pt_chains = g.open_group("pt_chains");
	for(auto &c : pt_chains_) {
		c.checkpoint_write(pt_chains.open_group(fmt::format("{:04d}", c.id)));
	}

333
334
	g.write("current_chain_id", current_chain_id_);
	g.write("pt_swap_odd", static_cast<uint8_t>(pt_swap_odd_));
335
336
337
338

	if(use_param_optimization_) {
		write_params_yaml();
	}
339
340
341
342
343
}

void runner_pt_master::start() {
	MPI_Comm_size(MPI_COMM_WORLD, &num_active_ranks_);

344
	job_.log(fmt::format("starting job '{}' in parallel tempering mode", job_.jobname));
345
346
	checkpoint_read();

Lukas Weber's avatar
Lukas Weber committed
347
348
	use_param_optimization_ =
	    job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
349
350
351
352
353
354
	if(use_param_optimization_) {
		job_.log("using feedback parameter optimization");
	}

	std::vector<int> group_idx(num_active_ranks_);
	for(int i = 1; i < num_active_ranks_; i++) {
Lukas Weber's avatar
Lukas Weber committed
355
		group_idx[i] = (i - 1) / chain_len_;
356
	}
Lukas Weber's avatar
Lukas Weber committed
357
	MPI_Scatter(group_idx.data(), 1, MPI_INT, MPI_IN_PLACE, 1, MPI_INT, MASTER, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
358

359
360
	MPI_Comm tmp;
	MPI_Comm_split(MPI_COMM_WORLD, MPI_UNDEFINED, 0, &tmp);
Lukas Weber's avatar
Lukas Weber committed
361

362
363
	for(int rank_section = 0; rank_section < (num_active_ranks_ - 1) / chain_len_; rank_section++) {
		assign_new_chain(rank_section);
364
365
366
	}

	time_last_checkpoint_ = MPI_Wtime();
367

368
369
370
371
372
373
374
	while(num_active_ranks_ > 1) {
		react();
		if(MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time) {
			time_last_checkpoint_ = MPI_Wtime();
			checkpoint_write();
		}
	}
375
	checkpoint_write();
376
377
378
379
380
381
382
383
384
385
}

int runner_pt_master::schedule_chain_run() {
	int old_id = current_chain_id_;
	int nchains = pt_chains_.size();
	for(int i = 1; i <= nchains; i++) {
		if(!pt_chains_[(old_id + i) % nchains].is_done()) {
			int new_chain_id = (old_id + i) % nchains;
			auto &chain = pt_chains_[new_chain_id];
			chain.scheduled_runs++;
386

387
388
389
390
391
392
393
394
395
			int idx = 0;
			for(auto &run : pt_chain_runs_) {
				if(run.id == chain.id && run.run_id == chain.scheduled_runs) {
					return idx;
				}
				idx++;
			}

			pt_chain_runs_.emplace_back(chain, chain.scheduled_runs);
396
			return pt_chain_runs_.size() - 1;
397
398
399
400
401
402
403
		}
	}

	// everything done!
	return -1;
}

404
int runner_pt_master::assign_new_chain(int rank_section) {
405
406
407
	int chain_run_id = schedule_chain_run();

	for(int target = 0; target < chain_len_; target++) {
408
		int msg[3] = {-1, 0, 0};
409
		if(chain_run_id > 0) {
410
411
412
413
			auto &chain_run = pt_chain_runs_[chain_run_id];
			auto &chain = pt_chains_[chain_run.id];
			msg[0] = chain.task_ids[target];
			msg[1] = chain_run.run_id;
414
			msg[2] = chain.target_sweeps + chain.target_thermalization - chain.sweeps;
415
416
417
418
		} else {
			// this will prompt the slave to quit
			num_active_ranks_--;
		}
419
		MPI_Send(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT,
420
		         rank_section * chain_len_ + target + 1, 0, MPI_COMM_WORLD);
421
	}
422
	rank_to_chain_run_[rank_section] = chain_run_id;
423
424
425
	return chain_run_id;
}

Lukas Weber's avatar
Lukas Weber committed
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
void runner_pt_master::pt_param_optimization(pt_chain &chain, pt_chain_run &chain_run) {
	for(size_t rank = 0; rank < chain_run.rank_to_pos.size(); rank++) {
		if(chain_run.rank_to_pos[rank] == 0) {
			chain_run.last_visited[rank] = 1;
		}
		if(chain_run.rank_to_pos[rank] ==
		   static_cast<int>(chain_run.rank_to_pos.size()) - 1) {
			chain_run.last_visited[rank] = -1;
		}

		chain.ndown_histogram[chain_run.rank_to_pos[rank]] +=
		    chain_run.last_visited[rank] == -1;
		chain.nup_histogram[chain_run.rank_to_pos[rank]] +=
		    chain_run.last_visited[rank] == 1;
	}
	chain.histogram_entries++;
	if(chain.histogram_entries >= chain.entries_before_optimization) {
		auto [flinearity, convergence] = chain.optimize_params(job_.jobfile["jobconfig"].get<int>(
		    "pt_parameter_optimization_linreg_len", 2));
		chain.clear_histograms();
		job_.log(fmt::format("chain {}: pt param optimization: entries={}, f linearity={:.2g}, convergence={:.2g}",
		                     chain.id, chain.entries_before_optimization, flinearity, convergence));

		chain.entries_before_optimization *= job_.jobfile["jobconfig"].get<double>(
		    "pt_parameter_optimization_nsamples_growth", 1.5);
	}
}

454
void runner_pt_master::react() {
455
	int rank_status;
456
	MPI_Status stat;
457
458
459
	MPI_Recv(&rank_status, 1, MPI_INT, MPI_ANY_SOURCE, 0, MPI_COMM_WORLD, &stat);
	int rank = stat.MPI_SOURCE - 1;
	if(rank_status == S_BUSY) {
460
		int msg[1];
Lukas Weber's avatar
Lukas Weber committed
461
		MPI_Recv(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, rank + 1, 0, MPI_COMM_WORLD, &stat);
462
463
		int completed_sweeps = msg[0];

464
		int chain_run_id = rank_to_chain_run_[rank / chain_len_];
465
466
467
		auto &chain_run = pt_chain_runs_[chain_run_id];
		auto &chain = pt_chains_[chain_run.id];

468
		chain.sweeps += completed_sweeps;
469
		if(chain.is_done()) {
470
471
472
473
			chain.scheduled_runs--;
			int action = A_NEW_JOB;

			if(chain.scheduled_runs > 0) {
Lukas Weber's avatar
Lukas Weber committed
474
475
				job_.log(fmt::format("chain {} has enough sweeps. Waiting for {} busy rank sets.",
				                     chain.id, chain.scheduled_runs));
476
477
478
479
			} else {
				job_.log(fmt::format("chain {} rank {} is done. Merging.", chain.id, rank + 1));
				action = A_PROCESS_DATA_NEW_JOB;
				checkpoint_write();
480
			}
481
482
483
484
			for(int target = 0; target < chain_len_; target++) {
				send_action(action, rank / chain_len_ * chain_len_ + target + 1);
			}
			assign_new_chain(rank / chain_len_);
485
		} else {
486
487
488
			for(int target = 0; target < chain_len_; target++) {
				send_action(A_CONTINUE, rank / chain_len_ * chain_len_ + target + 1);
			}
489
		}
490
491
	} else if(rank_status == S_READY_FOR_GLOBAL) {
		int chain_run_id = rank_to_chain_run_[rank / chain_len_];
492
493
		auto &chain_run = pt_chain_runs_[chain_run_id];
		auto &chain = pt_chains_[chain_run.id];
494
		int rank_section = rank / chain_len_;
Lukas Weber's avatar
Lukas Weber committed
495

496
		if(use_param_optimization_) {
Lukas Weber's avatar
Lukas Weber committed
497
			pt_param_optimization(chain, chain_run);
498
499
500
		}

		for(int target = 0; target < chain_len_; target++) {
Lukas Weber's avatar
Lukas Weber committed
501
			int target_rank = rank_section * chain_len_ + target + 1;
502
			int pos = chain_run.rank_to_pos[target];
503
			// keep consistent with pt_global_update
504
			int partner_pos = pos + (2 * (pos & 1) - 1) * (2 * pt_swap_odd_ - 1);
505
506
			if(partner_pos < 0 || partner_pos >= chain_len_) {
				int response = GA_SKIP;
507
				MPI_Send(&response, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
508
				chain_run.weight_ratios[pos] = 1;
509
510
			} else {
				int response = GA_CALC_WEIGHT;
511
				MPI_Send(&response, 1, MPI_INT, target_rank, 0, MPI_COMM_WORLD);
512

513
514
515
516
				double partner_param = chain.params[partner_pos];
				MPI_Send(&partner_param, 1, MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD);
			}
		}
517

518
		for(int target = 0; target < chain_len_; target++) {
Lukas Weber's avatar
Lukas Weber committed
519
			int target_rank = rank_section * chain_len_ + target + 1;
520
521
			int pos = chain_run.rank_to_pos[target];
			if(chain_run.weight_ratios[pos] < 0) {
522
				double weight;
523
				MPI_Recv(&weight, 1, MPI_DOUBLE, target_rank, 0, MPI_COMM_WORLD, &stat);
524
				assert(weight >= 0);
Lukas Weber's avatar
Lukas Weber committed
525
				chain_run.weight_ratios[pos] = weight;
526
			}
527
		}
528

529
530
531
532
533
534
		pt_global_update(chain, chain_run);
		std::fill(chain_run.weight_ratios.begin(), chain_run.weight_ratios.end(), -1);

		for(int target = 0; target < chain_len_; target++) {
			int new_task_id = chain.task_ids[chain_run.rank_to_pos[target]];
			double new_param = chain.params[chain_run.rank_to_pos[target]];
Lukas Weber's avatar
Lukas Weber committed
535
536
537
538
			MPI_Send(&new_task_id, 1, MPI_INT, rank_section * chain_len_ + target + 1, 0,
			         MPI_COMM_WORLD);
			MPI_Send(&new_param, 1, MPI_DOUBLE, rank_section * chain_len_ + target + 1, 0,
			         MPI_COMM_WORLD);
539
540
541
542
543
544
545
		}
	} else { // S_TIMEUP
		num_active_ranks_--;
	}
}

void runner_pt_master::pt_global_update(pt_chain &chain, pt_chain_run &chain_run) {
546
	for(int i = pt_swap_odd_; i < static_cast<int>(chain.task_ids.size()) - 1; i += 2) {
547
		double w1 = chain_run.weight_ratios[i];
548
		double w2 = chain_run.weight_ratios[i + 1];
549
550
551

		double r = rng_->random_double();

552
		if(r < w1 * w2) {
553
			for(auto &p : chain_run.rank_to_pos) {
Lukas Weber's avatar
Lukas Weber committed
554
				if(p == i) {
Lukas Weber's avatar
Lukas Weber committed
555
556
					p = i + 1;
				} else if(p == i + 1) {
Lukas Weber's avatar
Lukas Weber committed
557
558
559
					p = i;
				}
			}
560
561
562
563
564
565
566
567
568
569
570
571
572
573
		}
	}

	pt_swap_odd_ = !pt_swap_odd_;
}

runner_pt_slave::runner_pt_slave(jobinfo job, mc_factory mccreator)
    : job_{std::move(job)}, mccreator_{std::move(mccreator)} {}

void runner_pt_slave::start() {
	MPI_Comm_rank(MPI_COMM_WORLD, &rank_);
	time_start_ = MPI_Wtime();
	time_last_checkpoint_ = time_start_;

574
	int group_idx;
Lukas Weber's avatar
Lukas Weber committed
575
	MPI_Scatter(NULL, 1, MPI_INT, &group_idx, 1, MPI_INT, MASTER, MPI_COMM_WORLD);
576
	MPI_Comm_split(MPI_COMM_WORLD, group_idx, 0, &chain_comm_);
Lukas Weber's avatar
Lukas Weber committed
577

578
579
	MPI_Comm_rank(chain_comm_, &chain_rank_);

Lukas Weber's avatar
Lukas Weber committed
580
581
	bool use_param_optimization =
	    job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false);
582

583
584
585
586
587
588
589
590
591
	if(!accept_new_chain()) {
		return;
	}

	int action{};
	do {
		while(sweeps_since_last_query_ < sweeps_before_communication_) {
			sys_->_do_update();

592
			if(sys_->is_thermalized() && !use_param_optimization) {
593
594
595
596
				sys_->_do_measurement();
			}

			if(sys_->sweep() % sweeps_per_global_update_ == 0) {
597
				sys_->pt_measure_statistics();
598
				pt_global_update();
599
				sweeps_since_last_query_++;
600
601
602
603
604
605
			}

			if(is_checkpoint_time() || time_is_up()) {
				break;
			}
		}
606

607
		checkpoint_write();
608
		MPI_Barrier(chain_comm_);
609
610
611

		if(time_is_up()) {
			send_status(S_TIMEUP);
612
613
			job_.log(fmt::format("rank {} exits: walltime up (safety interval = {} s)", rank_,
			                     sys_->safe_exit_interval()));
614
615
616
617
618
619
620
621
622
623
624
			break;
		}
		action = what_is_next(S_BUSY);
	} while(action != A_EXIT);

	if(action == A_EXIT) {
		job_.log(fmt::format("rank {} exits: out of work", rank_));
	}
}

void runner_pt_slave::send_status(int status) {
625
	MPI_Send(&status, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD);
626
627
628
}

void runner_pt_slave::pt_global_update() {
629
630
631
	if(chain_rank_ == 0) {
		send_status(S_READY_FOR_GLOBAL);
	}
632
633

	int response;
634
	MPI_Recv(&response, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
Lukas Weber's avatar
Lukas Weber committed
635
	// job_.log(fmt::format(" * rank {}: ready for global update", rank_));
636

637
638
	const std::string& param_name = job_.jobfile["jobconfig"].get<std::string>("parallel_tempering_parameter");

639
	if(response == GA_CALC_WEIGHT) {
640
		double partner_param;
641
		MPI_Recv(&partner_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
642
		double weight_ratio = sys_->_pt_weight_ratio(param_name, partner_param);
643
		MPI_Send(&weight_ratio, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD);
Lukas Weber's avatar
Lukas Weber committed
644
		// job_.log(fmt::format(" * rank {}: weight sent", rank_));
645
	} else {
Lukas Weber's avatar
Lukas Weber committed
646
		// job_.log(fmt::format(" * rank {}: no weight needed", rank_));
647
	}
648
	MPI_Barrier(chain_comm_);
649
650
651
652

	// this may be a long wait

	double new_param;
653
	int new_task_id;
Lukas Weber's avatar
Lukas Weber committed
654

655
656
657
658
659
660
661
662
	MPI_Recv(&new_task_id, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
	MPI_Recv(&new_param, 1, MPI_DOUBLE, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

	if(new_task_id != task_id_ || current_param_ != new_param) {
		sys_->measurements_write(job_.rundir(task_id_, run_id_));
	}

	MPI_Barrier(chain_comm_);
663

664
665
	if(new_task_id != task_id_ || current_param_ != new_param) {
		task_id_ = new_task_id;
Lukas Weber's avatar
Lukas Weber committed
666
667
		sweeps_per_global_update_ = job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>(
		    "pt_sweeps_per_global_update");
668
		sys_->_pt_update_param(param_name, new_param, job_.rundir(task_id_, run_id_));
669
670
	}
	current_param_ = new_param;
671
672
673
674
}

bool runner_pt_slave::accept_new_chain() {
	int msg[3];
675
	MPI_Recv(&msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
676
677
678
679
680
681
682
683
	task_id_ = msg[0];
	run_id_ = msg[1];
	sweeps_before_communication_ = msg[2];

	if(task_id_ < 0) {
		return false;
	}

684
685
	sweeps_per_global_update_ =
	    job_.jobfile["tasks"][job_.task_names[task_id_]].get<int>("pt_sweeps_per_global_update");
686

687
	sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
688
	sys_->pt_mode_ = true;
689
690
691
692
693
694
695
696
697
698
699
700
	if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
		sys_->_init();
		job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
		checkpoint_write();
	} else {
		job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
	}

	return true;
}

int runner_pt_slave::what_is_next(int status) {
701
702
	if(chain_rank_ == 0) {
		send_status(status);
703

704
705
706
		int msg[1] = {sweeps_since_last_query_};
		MPI_Send(msg, sizeof(msg) / sizeof(msg[0]), MPI_INT, 0, 0, MPI_COMM_WORLD);
	}
707
708
	sweeps_since_last_query_ = 0;
	int new_action = recv_action();
709

710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
	if(new_action == A_EXIT) {
		return A_EXIT;
	}

	if(new_action != A_CONTINUE) {
		if(new_action == A_PROCESS_DATA_NEW_JOB) {
			merge_measurements();
		}

		if(!accept_new_chain()) {
			return A_EXIT;
		}
	}
	return A_CONTINUE;
}

void runner_pt_slave::checkpoint_write() {
	time_last_checkpoint_ = MPI_Wtime();
	sys_->_write(job_.rundir(task_id_, run_id_));
	job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
}

void runner_pt_master::send_action(int action, int destination) {
733
	MPI_Send(&action, 1, MPI_INT, destination, 0, MPI_COMM_WORLD);
734
735
736
737
}

int runner_pt_slave::recv_action() {
	int new_action;
738
	MPI_Recv(&new_action, 1, MPI_INT, MASTER, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
739
740
741
742
743
744
745
746
	return new_action;
}

void runner_pt_slave::merge_measurements() {
	std::string unique_filename = job_.taskdir(task_id_);
	sys_->_write_output(unique_filename);

	std::vector<evalable> evalables;
747
748
749
750
751
752
753
	if(job_.jobfile["jobconfig"].get<bool>("pt_parameter_optimization", false)) {
		if(rank_ == 1) {
			job_.log("Running in parameter optimization mode. No evalables are calculated.");
		}
	} else {
		sys_->register_evalables(evalables);
	}
754
755
756
757
758
759
760
761
762
763
764
765
	job_.merge_task(task_id_, evalables);
}

bool runner_pt_slave::is_checkpoint_time() {
	return MPI_Wtime() - time_last_checkpoint_ > job_.checkpoint_time;
}

bool runner_pt_slave::time_is_up() {
	double safe_interval = 0;
	if(sys_ != nullptr) {
		safe_interval = sys_->safe_exit_interval();
	}
766
	return MPI_Wtime() - time_start_ > job_.walltime - safe_interval;
767
768
}
}