Commit b0bbba73 authored by Lukas Weber

remove dependence on the working directory

Instead of $(pwd)/JOBNAME.data, the job directory is now the directory containing the parameter file.
This does not change the behavior of the loadl script, which changes directory and creates the parameter file at
$(pwd)/JOBNAME.data/parameters.json

However, when loadleveller was used without loadl, there were subtle complications if the working directory hopping was not handled correctly.
The new behavior is more predictable and helps with that. One slightly ugly artifact is that the .results.json file gets created in “the parent directory of the job directory”, which can get really messy if you don’t follow the same convention loadl uses. But I’ll leave it like this
for the sake of simplicity.
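
A minimal C++17 sketch of the new convention (the paths, the hard-coded jobname, and the main() wrapper are illustrative only, not the actual loadleveller API):

#include <filesystem>
#include <iostream>
#include <string>

int main() {
	// Hypothetical parameter file location; with loadl this would be
	// $(pwd)/JOBNAME.data/parameters.json.
	std::filesystem::path jobfile = "runs/silly_job.data/parameters.json";

	// The job directory is now the directory containing the parameter file,
	// independent of the current working directory.
	std::filesystem::path jobdir = jobfile.parent_path(); // "runs/silly_job.data"

	// In loadleveller the job name is read from the parameter file; hard-coded here.
	std::string jobname = "silly_job";

	// The concatenated results end up in the parent of the job directory.
	std::filesystem::path results =
	    jobdir.parent_path() / (jobname + ".results.json"); // "runs/silly_job.results.json"

	std::cout << jobdir << "\n" << results << "\n";
}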

For good measure, I have extended the use of std::filesystem::path in the code.
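
As a sketch of that pattern (the free function rundir_sketch and the example arguments are made up for illustration; the real members are jobinfo::taskdir/rundir in the diff below), path pieces are composed with std::filesystem::path's operator/ instead of gluing strings together with fmt::format:

#include <filesystem>
#include <fmt/format.h>

// Sketch only: mirrors the taskdir()/rundir() changes in this commit.
std::filesystem::path rundir_sketch(const std::filesystem::path &taskdir, int run_id) {
	return taskdir / fmt::format("run{:04d}", run_id); // e.g. <jobdir>/task0001/run0003
}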
parent f321abba
......@@ -45,19 +45,16 @@ static int parse_duration(const std::string &str) {
}
}
std::string jobinfo::jobdir() const {
return jobname + ".data";
std::filesystem::path jobinfo::taskdir(int task_id) const {
return jobdir / task_names.at(task_id);
}
std::string jobinfo::taskdir(int task_id) const {
return fmt::format("{}/{}", jobdir(), task_names.at(task_id));
std::filesystem::path jobinfo::rundir(int task_id, int run_id) const {
return taskdir(task_id) / fmt::format("run{:04d}",run_id);
}
std::string jobinfo::rundir(int task_id, int run_id) const {
return fmt::format("{}/run{:04d}", taskdir(task_id), run_id);
}
jobinfo::jobinfo(const std::string &jobfile_name, register_evalables_func evalable_func) : evalable_func_{evalable_func}, jobfile{jobfile_name} {
jobinfo::jobinfo(const std::filesystem::path &jobfile_name, register_evalables_func evalable_func)
: evalable_func_{evalable_func}, jobfile{jobfile_name}, jobdir{jobfile_name.parent_path()} {
for(auto node : jobfile["tasks"]) {
std::string task_name = node.first;
task_names.push_back(task_name);
......@@ -66,9 +63,8 @@ jobinfo::jobinfo(const std::string &jobfile_name, register_evalables_func evalab
jobname = jobfile.get<std::string>("jobname");
std::string datadir = jobdir();
std::error_code ec;
std::filesystem::create_directories(datadir, ec);
std::filesystem::create_directories(jobdir, ec);
// perhaps a bit controversially, jobinfo tries to create the task directories. TODO: single file
// output.
......@@ -85,10 +81,10 @@ jobinfo::jobinfo(const std::string &jobfile_name, register_evalables_func evalab
// This function lists the files in the taskdir that could be run files,
// i.e. that have the right file_ending.
// The regex has to match the file names produced by the rundir function.
std::vector<std::string> jobinfo::list_run_files(const std::string &taskdir,
std::vector<std::filesystem::path> jobinfo::list_run_files(const std::string &taskdir,
const std::string &file_ending) {
std::regex run_filename{"^run\\d{4,}\\." + file_ending + "$"};
std::vector<std::string> results;
std::vector<std::filesystem::path> results;
for(const auto &p : std::filesystem::directory_iterator(taskdir)) {
if(std::regex_search(p.path().filename().string(), run_filename)) {
......@@ -116,10 +112,10 @@ int jobinfo::read_dump_progress(int task_id) const {
}
void jobinfo::concatenate_results() {
std::ofstream cat_results{fmt::format("{}.results.json", jobname)};
std::ofstream cat_results{jobdir.parent_path()/fmt::format("{}.results.json", jobname)};
cat_results << "[";
for(size_t i = 0; i < task_names.size(); i++) {
std::ifstream res_file{taskdir(i) + "/results.json"};
std::ifstream res_file{taskdir(i) / "results.json"};
res_file.seekg(0, res_file.end);
size_t size = res_file.tellg();
res_file.seekg(0, res_file.beg);
......@@ -136,7 +132,7 @@ void jobinfo::concatenate_results() {
}
void jobinfo::merge_task(int task_id) {
std::vector<std::string> meas_files = list_run_files(taskdir(task_id), "meas\\.h5");
std::vector<std::filesystem::path> meas_files = list_run_files(taskdir(task_id), "meas\\.h5");
size_t rebinning_bin_length = jobfile["jobconfig"].get<size_t>("merge_rebin_length", 0);
size_t sample_skip = jobfile["jobconfig"].get<size_t>("merge_sample_skip", 0);
results results = merge(meas_files, rebinning_bin_length, sample_skip);
......@@ -145,7 +141,7 @@ void jobinfo::merge_task(int task_id) {
evalable_func_(eval, jobfile["tasks"][task_names[task_id]]);
eval.append_results();
std::string result_filename = fmt::format("{}/results.json", taskdir(task_id));
std::filesystem::path result_filename = taskdir(task_id) / "results.json";
const std::string &task_name = task_names.at(task_id);
results.write_json(result_filename, taskdir(task_id), jobfile["tasks"][task_name].get_json());
}
......
......@@ -5,6 +5,7 @@
#include "parser.h"
#include <string>
#include <vector>
#include <filesystem>
namespace loadl {
......@@ -15,6 +16,7 @@ private:
register_evalables_func evalable_func_;
public:
parser jobfile;
const std::filesystem::path jobdir;
std::string jobname;
std::vector<std::string> task_names;
......@@ -22,13 +24,12 @@ public:
double checkpoint_time{};
double runtime{};
jobinfo(const std::string &jobfile_name, register_evalables_func evalable_func);
jobinfo(const std::filesystem::path &jobfile_name, register_evalables_func evalable_func);
std::string jobdir() const;
std::string rundir(int task_id, int run_id) const;
std::string taskdir(int task_id) const;
std::filesystem::path rundir(int task_id, int run_id) const;
std::filesystem::path taskdir(int task_id) const;
static std::vector<std::string> list_run_files(const std::string &taskdir,
static std::vector<std::filesystem::path> list_run_files(const std::string &taskdir,
const std::string &file_ending);
int read_dump_progress(int task_id) const;
void merge_task(int task_id);
......
......@@ -11,7 +11,7 @@ inline int merge_only(jobinfo job, const mc_factory &, int, char **) {
for(size_t task_id = 0; task_id < job.task_names.size(); task_id++) {
job.merge_task(task_id);
std::cout << fmt::format("-- {} merged\n", job.taskdir(task_id));
std::cout << fmt::format("-- {} merged\n", job.taskdir(task_id).string());
}
return 0;
......
......@@ -9,7 +9,7 @@
namespace loadl {
results merge(const std::vector<std::string> &filenames, size_t rebinning_bin_length,
results merge(const std::vector<std::filesystem::path> &filenames, size_t rebinning_bin_length,
size_t sample_skip) {
results res;
......
......@@ -2,10 +2,11 @@
#include "evalable.h"
#include "results.h"
#include <filesystem>
namespace loadl {
// if rebinning_bin_length is 0, cbrt(total_sample_count) is used as default.
results merge(const std::vector<std::string> &filenames, size_t rebinning_bin_length = 0,
results merge(const std::vector<std::filesystem::path> &filenames, size_t rebinning_bin_length = 0,
size_t skip = 0);
}
......@@ -157,10 +157,10 @@ void runner_slave::start() {
std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names[task_id_]])};
if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
sys_->_init();
job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_).string()));
checkpoint_write();
} else {
job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_).string()));
}
} else {
if(!sys_) {
......@@ -255,11 +255,11 @@ void runner_slave::checkpoint_write() {
time_last_checkpoint_ = MPI_Wtime();
sys_->_write(job_.rundir(task_id_, run_id_));
sys_->_write_finalize(job_.rundir(task_id_, run_id_));
job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_).string()));
}
void runner_slave::merge_measurements() {
std::string unique_filename = job_.taskdir(task_id_);
std::filesystem::path unique_filename = job_.taskdir(task_id_);
sys_->write_output(unique_filename);
job_.merge_task(task_id_);
......
......@@ -250,7 +250,7 @@ void runner_pt_master::construct_pt_chains() {
void runner_pt_master::checkpoint_read() {
construct_pt_chains();
std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
std::string master_dump_name = job_.jobdir / "pt_master.dump.h5";
if(std::filesystem::exists(master_dump_name)) {
job_.log(fmt::format("master reading dump from '{}'", master_dump_name));
iodump dump = iodump::open_readonly(master_dump_name);
......@@ -280,12 +280,12 @@ void runner_pt_master::write_params_json() {
params[fmt::format("chain{:04d}", c.id)] = c.params;
}
std::ofstream file{job_.jobdir() + "/pt_optimized_params.json"};
std::ofstream file{job_.jobdir / "pt_optimized_params.json"};
file << params.dump(1) << "\n";
}
void runner_pt_master::write_statistics(const pt_chain_run &chain_run) {
std::string stat_name = job_.jobdir() + "/pt_statistics.h5";
std::string stat_name = job_.jobdir / "pt_statistics.h5";
iodump stat = iodump::open_readwrite(stat_name);
auto g = stat.get_root();
......@@ -296,7 +296,7 @@ void runner_pt_master::write_statistics(const pt_chain_run &chain_run) {
}
void runner_pt_master::write_param_optimization_statistics(const pt_chain &chain) {
std::string stat_name = job_.jobdir() + "/pt_statistics.h5";
std::string stat_name = job_.jobdir / "pt_statistics.h5";
iodump stat = iodump::open_readwrite(stat_name);
auto g = stat.get_root();
......@@ -316,7 +316,7 @@ void runner_pt_master::write_param_optimization_statistics(const pt_chain &chain
}
void runner_pt_master::checkpoint_write() {
std::string master_dump_name = job_.jobdir() + "/pt_master.dump.h5";
std::string master_dump_name = job_.jobdir / "pt_master.dump.h5";
job_.log(fmt::format("master: checkpoint {}", master_dump_name));
......@@ -729,10 +729,10 @@ bool runner_pt_slave::accept_new_chain() {
sys_->pt_mode_ = true;
if(!sys_->_read(job_.rundir(task_id_, run_id_))) {
sys_->_init();
job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_)));
job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, run_id_).string()));
checkpoint_write();
} else {
job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_)));
job_.log(fmt::format("* read {}", job_.rundir(task_id_, run_id_).string()));
}
return true;
......@@ -769,7 +769,7 @@ void runner_pt_slave::checkpoint_write() {
sys_->_write(job_.rundir(task_id_, run_id_));
MPI_Barrier(chain_comm_);
sys_->_write_finalize(job_.rundir(task_id_, run_id_));
job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_)));
job_.log(fmt::format("* rank {}: checkpoint {}", rank_, job_.rundir(task_id_, run_id_).string()));
}
void runner_pt_master::send_action(int action, int destination) {
......@@ -783,7 +783,7 @@ int runner_pt_slave::recv_action() {
}
void runner_pt_slave::merge_measurements() {
std::string unique_filename = job_.taskdir(task_id_);
std::filesystem::path unique_filename = job_.taskdir(task_id_);
sys_->write_output(unique_filename);
job_.merge_task(task_id_);
......
......@@ -27,9 +27,9 @@ int runner_single::start() {
sys_ = std::unique_ptr<mc>{mccreator_(job_.jobfile["tasks"][job_.task_names.at(task_id_)])};
if(!sys_->_read(job_.rundir(task_id_, 1))) {
sys_->_init();
job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, 1)));
job_.log(fmt::format("* initialized {}", job_.rundir(task_id_, 1).string()));
} else {
job_.log(fmt::format("* read {}", job_.rundir(task_id_, 1)));
job_.log(fmt::format("* read {}", job_.rundir(task_id_, 1).string()));
}
while(!tasks_[task_id_].is_done() && !time_is_up()) {
......@@ -88,16 +88,16 @@ void runner_single::read() {
void runner_single::checkpointing() {
time_last_checkpoint_ = time(nullptr);
sys_->_write(job_.rundir(task_id_, 1));
sys_->_write(job_.rundir(task_id_, 1).string());
sys_->_write_finalize(job_.rundir(task_id_, 1));
job_.log(fmt::format("* checkpointing {}", job_.rundir(task_id_, 1)));
job_.log(fmt::format("* checkpointing {}", job_.rundir(task_id_, 1).string()));
}
void runner_single::merge_measurements() {
std::string unique_filename = job_.taskdir(task_id_);
std::filesystem::path unique_filename = job_.taskdir(task_id_);
sys_->write_output(unique_filename);
job_.log(fmt::format("merging {}", job_.taskdir(task_id_)));
job_.log(fmt::format("merging {}", job_.taskdir(task_id_).string()));
job_.merge_task(task_id_);
}
}
......@@ -3,14 +3,19 @@
mc=$1
testparams=$2
rm -rf silly_job_long.data
jobdir=silly_job_long.data
rm -rf $jobdir
mkdir -p $jobdir
mpirun -np 3 $mc $testparams
params=$jobdir/parameters.json
cp $testparams $params
mpirun -np 3 $mc $params
if [ $? -ne 0 ]; then
exit 1
fi
mpirun -np 3 $mc $testparams
mpirun -np 3 $mc $params
if [ $? -ne 0 ]; then
exit 1
fi
......@@ -26,7 +31,7 @@ file.close()
END
echo "trying to merge..."
$mc merge $testparams
$mc merge $params
if [ $? -ne 0 ]; then
exit 1
fi
......
......@@ -14,14 +14,21 @@ parser.add_argument('silly_mc')
parser.add_argument('test_param_file')
args = parser.parse_args()
shutil.rmtree('silly_job.data', ignore_errors=True)
proc = subprocess.run(['valgrind', args.silly_mc, 'single', args.test_param_file])
jobdir = 'silly_job.data'
shutil.rmtree(jobdir, ignore_errors=True)
os.makedirs(jobdir)
param_file = jobdir + '/parameters.json'
shutil.copy(args.test_param_file, param_file)
proc = subprocess.run(['valgrind', args.silly_mc, 'single', param_file])
if proc.returncode != 0:
sys.exit(1)
with open('silly_job.results.json', 'r') as f:
results = json.load(f)
with open(args.test_param_file, 'r') as f:
with open(param_file, 'r') as f:
rebin_size = json.load(f)['jobconfig']['merge_rebin_length']
sweeps = results[0]['parameters']['sweeps']
......