Commit 0184335a authored by maximilianlohoefer

Parallel tempering now allows flow analysis with the flag MCL_HISTOGRAM and
tracking of replica positions with the flag MCL_POS. Fixed a bug that caused
the stack to grow without bound.
The Boost Mersenne twister can now be used via the flag MCL_RNG_BOOST in
random.h, but it does not yet support saving and restoring of generator states.
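A minimal sketch of how these compile-time switches are expected to be selected (assumed usage only; the project's build files are not part of this diff):

#define MCL_PT          // build the parallel-tempering runner (runner_pt)
#define MCL_HISTOGRAM   // record up/down histograms and write jobfile.flow
#define MCL_POS         // append replica positions to the positions file
#define MCL_RNG_BOOST   // use the Boost mt19937 generator in random.h
// or equivalently on the compiler command line, e.g.
//   mpicxx -DMCL_PT -DMCL_HISTOGRAM -DMCL_POS -DMCL_RNG_BOOST ...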


git-svn-id: https://svn.rwth-aachen.de/repos/sw440870_load_leveller/trunk@16 36cca2a8-63f0-4188-8a56-663e3b0d1cf3
parent 15bf0014
......@@ -5,6 +5,8 @@
#define T_NEW_JOB 3
#define T_PARTNER 4
#define T_WEIGHT 5
#define T_LABEL 6
#define T_POS 7
#define S_IDLE 1
#define S_BUSY 2
......
......@@ -20,12 +20,12 @@ using namespace std;
int main ( int argc, char *argv[] )
{
if(argc < 2)
/* if(argc < 2)
{
cerr << "Usage: " << argv[0] <<" jobfile [walltime] [checkpointtime] "<< endl;
return 1;
}
*/
#ifdef MCL_PT
runner_pt my_run(argc,argv);
#else
......
......@@ -13,11 +13,11 @@ randomnumbergenerator :: randomnumbergenerator(luint seed)
my_seed=ptr->get_seed();
}
void randomnumbergenerator :: write(odump& d)
void randomnumbergenerator :: write(odump& d)
{
MTRand::uint32* randState;
randState = new MTRand::uint32[ MTRand::SAVE ];
ptr->save( randState );
ptr->save( randState );
d.write(randState,MTRand::SAVE);
d.write(my_seed);
delete [] randState;
......@@ -27,7 +27,7 @@ void randomnumbergenerator :: read(idump& d)
{
MTRand::uint32* randState;
randState = new MTRand::uint32[ MTRand::SAVE ];
d.read(randState,MTRand::SAVE);
d.read(randState,MTRand::SAVE);
d.read(my_seed);
ptr->load( randState );
delete [] randState;
......@@ -37,21 +37,21 @@ void randomnumbergenerator :: read(idump& d)
#ifdef MCL_RNG_SPRNG_4
randomnumbergenerator :: randomnumbergenerator()
{
int gtype = 4; //Multipl. Lagg. Fib.
ptr = SelectType(gtype);
int gtype = 4; //Multipl. Lagg. Fib.
ptr = SelectType(gtype);
my_seed=make_sprng_seed();
ptr->init_sprng(0, 1, my_seed, SPRNG_DEFAULT);
ptr->init_sprng(0, 1, my_seed, SPRNG_DEFAULT);
}
randomnumbergenerator :: randomnumbergenerator(luint seed)
{
int gtype = 4; //Multipl. Lagg. Fib.
ptr = SelectType(gtype);
int gtype = 4; //Multipl. Lagg. Fib.
ptr = SelectType(gtype);
my_seed=seed;
ptr->init_sprng(0, 1, my_seed, SPRNG_DEFAULT);
ptr->init_sprng(0, 1, my_seed, SPRNG_DEFAULT);
}
void randomnumbergenerator :: write(odump& d)
void randomnumbergenerator :: write(odump& d)
{
char* buffer;
int buffersize=ptr->pack_sprng(&buffer);
......@@ -74,15 +74,32 @@ void randomnumbergenerator :: read(idump& d)
}
#endif
// #ifdef MCL_RNG_BOOST
// randomnumbergenerator :: randomnumbergenerator(int seed)
// {
// rng = new boost::mt19937;
// val = new boost::uniform_read<> (0,1);
// die = new boost::variate_generator<boost::mt19937&, boost::uniform_real<> >
// (*rng, *val);
// }
// #endif
#ifdef MCL_RNG_BOOST
void randomnumbergenerator :: write(odump& d)
{
}
void randomnumbergenerator :: read(idump& d)
{
}
randomnumbergenerator :: randomnumbergenerator()
{
rng = new boost::mt19937;
val = new boost::uniform_real<> (0.0, 1.0);
my_seed = 5489;
}
randomnumbergenerator :: randomnumbergenerator(int seed)
{
rng = new boost::mt19937;
rng->seed(seed);
val = new boost::uniform_real<> (0.0, 1.0);
my_seed = seed;
}
#endif
//
// #ifdef MCL_RNG_ACML
// randomnumbergenerator :: randomnumbergenerator(int seed)
......@@ -92,4 +109,3 @@ void randomnumbergenerator :: read(idump& d)
//
// #endif
......@@ -11,20 +11,20 @@
#include "MersenneTwister.h"
class randomnumbergenerator
{
public:
randomnumbergenerator();
randomnumbergenerator(luint seed);
~randomnumbergenerator() {delete ptr;}
void write(odump&);
void read(idump&);
luint seed() {return my_seed;}
double d() {return ptr->randDblExc();} //in (0,1)
double d(double supp) {return ptr->randDblExc(supp);} //in (0,supp)
int i(int bound) {return ptr->randInt(bound-1);} //in [0,bound)
public:
randomnumbergenerator();
randomnumbergenerator(luint seed);
~randomnumbergenerator() {delete ptr;}
void write(odump&);
void read(idump&);
luint seed() {return my_seed;}
double d() {return ptr->randDblExc();} //in (0,1)
double d(double supp) {return ptr->randDblExc(supp);} //in (0,supp)
int i(int bound) {return ptr->randInt(bound-1);} //in [0,bound)
private:
MTRand* ptr;
luint my_seed;
private:
MTRand* ptr;
luint my_seed;
};
#endif
......@@ -33,42 +33,46 @@ class randomnumbergenerator
#include "sprng_cpp.h"
class randomnumbergenerator
{
public:
randomnumbergenerator();
randomnumbergenerator(luint seed);
~randomnumbergenerator() {delete ptr;}
void write(odump&);
void read(idump&);
luint seed() {return my_seed;}
double d() {return ptr->sprng();} //in (0,1)
double d(double supp) {return supp*d();} //in (0,supp)
int i(int bound) {return int(bound*d());} //in [0,bound)
public:
randomnumbergenerator();
randomnumbergenerator(luint seed);
~randomnumbergenerator() {delete ptr;}
void write(odump&);
void read(idump&);
luint seed() {return my_seed;}
double d() {return ptr->sprng();} //in (0,1)
double d(double supp) {return supp*d();} //in (0,supp)
int i(int bound) {return int(bound*d());} //in [0,bound)
private:
Sprng * ptr;
luint my_seed;
private:
Sprng * ptr;
luint my_seed;
};
#endif
// #ifdef MCL_RNG_BOOST
// #warning RNG: BOOST
// #include "boost/random.hpp"
// class randomnumbergenerator
// {
// public:
// randomnumbergenerator(int seed);
// double d() {return (*die)();} //returns a value between 0 (excl) and 1 (excl).
// double d(double supp) {return supp*d();} //returns a value between 0 (excl) and supp (excl.)
// int i(int bound) {return int(bound*d());} //returns a value between 0 (incl) and bound (excl.)
//
// private:
// boost::mt19937 * rng;
// boost::uniform_int<> * val;
// boost::variate_generator<boost::mt19937&, boost::uniform_int<> > * die;
// };
// #endif
//
#ifdef MCL_RNG_BOOST
#warning RNG: BOOST
#include <boost/random.hpp>
class randomnumbergenerator
{
public:
randomnumbergenerator();
randomnumbergenerator(int seed);
void write(odump&);
void read(idump&);
luint seed() {return my_seed;}
double d() {return (*val)(*rng);} //returns a value between 0 (excl) and 1 (excl).
double d(double supp) {return supp*d();} //returns a value between 0 (excl) and supp (excl.)
int i(int bound) {return int(bound*d());} //returns a value between 0 (incl) and bound (excl.)
private:
boost::mt19937 * rng;
boost::uniform_real<> * val;
luint my_seed;
};
#endif
// #ifdef MCL_RNG_ACML
// #warning RNG: ACML
// #include "acml_rand.h"
......
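As a usage note for the new MCL_RNG_BOOST branch above: the generator pairs a boost::mt19937 engine with a boost::uniform_real<> distribution and draws numbers by passing the engine to the distribution, exactly as d() does. A minimal standalone sketch of that pattern (assuming the older Boost.Random API used here; this snippet is not part of the commit):

#include <boost/random.hpp>
#include <iostream>

int main()
{
    boost::mt19937 rng;                  // Mersenne twister engine
    rng.seed(5489u);                     // same default seed as the new constructor
    boost::uniform_real<> val(0.0, 1.0); // uniform draw in the unit interval
    for (int n = 0; n < 3; ++n)
        std::cout << val(rng) << "\n";   // equivalent to the d() call above
    return 0;
}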
......@@ -107,8 +107,6 @@ void runner :: M_update(int node)
}
//M_write(); we will otherwise have too much redundant IO traffic
//M_report(); we will otherwise have a lot of output!
if(N_exit == world_size-1) {M_write();M_end_of_run();}
else M_wait();
}
void runner :: M_send_action(int action, int to)
......@@ -120,18 +118,21 @@ void runner :: M_send_action(int action, int to)
void runner :: M_wait()
{
MPI_Status stat;
int flag = 0;
while(!flag) {
if (is_chkpt_time()) M_write();
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &stat);
}
if(stat.MPI_TAG == T_STATUS)
M_update(stat.MPI_SOURCE);
else {
cerr<<my_rank<<": WARNING "<< stat.MPI_TAG<<" from "<<stat.MPI_SOURCE<<endl;
M_wait();
while (N_exit != world_size-1) {
MPI_Status stat;
int flag = 0;
while(!flag) {
if (is_chkpt_time()) M_write();
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &stat);
}
if(stat.MPI_TAG == T_STATUS)
M_update(stat.MPI_SOURCE);
else {
cerr<<my_rank<<": WARNING "<< stat.MPI_TAG<<" from "<<stat.MPI_SOURCE<<endl;
}
}
M_write();
M_end_of_run();
}
void runner :: M_write()
......
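The change above (mirrored in runner_pt::M_wait below) is the fix for the stack-growth bug mentioned in the commit message: previously M_wait() and M_update() called each other recursively, so every handled status message left another frame on the stack; now a single while loop drives the master until all workers have exited. A minimal standalone sketch of the pattern (simplified names; not the actual load_leveller code):

#include <iostream>

static int pending = 5;          // stand-in for status messages still expected

static void handle_one()         // plays the role of M_update(): handle and return
{
    --pending;
    std::cout << "handled one message, " << pending << " remaining\n";
}

static void wait_loop()          // plays the role of the new M_wait()
{
    while (pending > 0)          // loop instead of M_wait -> M_update -> M_wait recursion
        handle_one();
    std::cout << "end of run\n"; // M_write()/M_end_of_run() now run exactly once
}

int main()
{
    wait_loop();
    return 0;
}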
......@@ -8,26 +8,27 @@ runner_pt::runner_pt(int argc, char *argv[])
sys = NULL;
jobfile = argv[1];
parser parsedfile(jobfile);
if (argc>2) walltime=atof(argv[2]);
else walltime = parsedfile.return_value_of<double>("walltime");
if (argc>3) chktime=atof(argv[3]);
else chktime = parsedfile.return_value_of<double>("checkpointtime");
if (argc>2) walltime=atof(argv[2]);
else walltime = parsedfile.return_value_of<double>("walltime");
if (argc>3) chktime=atof(argv[3]);
else chktime = parsedfile.return_value_of<double>("checkpointtime");
if (argc>4) {//default is seconds
if (argv[4][0]=='h') {
walltime*=3600;
chktime*=3600;
}
if (argv[4][0]=='m') {
walltime*=60;
chktime*=60;
}
}
if (argv[4][0]=='h') {
walltime*=3600;
chktime*=3600;
}
if (argv[4][0]=='m') {
walltime*=60;
chktime*=60;
}
}
taskfiles= parsedfile.return_vector< string >("@taskfiles");
time_start = MPI_Wtime();
time_last_chkpt = time_start;
statusfile = parsedfile.value_or_default<string>("statusfile",jobfile+".status");
masterfile = parsedfile.value_or_default<string>("masterfile",jobfile+".master");
acceptfile = parsedfile.value_or_default<string>("acceptfile",jobfile+".accept");
positionsfile = parsedfile.value_or_default<string>("positionfile",jobfile+".pos");
STATUS = new std::ofstream (statusfile.c_str(), std::ios::out|std::ios::app);
if(my_rank == MASTER) {
pt_rng = new randomnumbergenerator();
......@@ -77,17 +78,17 @@ void runner_pt :: M_update(int node)
MPI_Recv(&node_status, 1, MPI_INT, node, T_STATUS, MPI_COMM_WORLD, &stat);
//(*STATUS) << my_rank << ": Status " << node_status << " from " << node << "\n";
if(node_status == S_GLOBAL_UPDATE) {
pt_node_waits_for_global_update[node-1]=true;
pt_N_global_update_waiter++;
if (pt_N_global_update_waiter == world_size-1) {
for (int i=1;i<world_size;++i) {
M_send_action(A_GLOBAL_UPDATE, i);
pt_node_waits_for_global_update[i-1]=false;
}
pt_N_global_update_waiter=0;
M_global_update();
}
}
pt_node_waits_for_global_update[node-1]=true;
pt_N_global_update_waiter++;
if (pt_N_global_update_waiter == world_size-1) {
for (int i=1;i<world_size;++i) {
M_send_action(A_GLOBAL_UPDATE, i);
pt_node_waits_for_global_update[i-1]=false;
}
pt_N_global_update_waiter=0;
M_global_update();
}
}
if (node_status == S_CHECKPOINT) {
one_task act;
int task_comm_size = sizeof(one_task) / sizeof(char);
......@@ -95,16 +96,16 @@ void runner_pt :: M_update(int node)
pt_node_steps_done[node-1]=act.steps_done;
pt_node_mes_done[node-1]=act.mes_done;
}
if (node_status == S_IDLE) {
if(time_is_up()) M_send_action(A_EXIT,node);
else {
if (node_status == S_IDLE) {
if(time_is_up()) M_send_action(A_EXIT,node);
else {
if (pt_node_order[node-1]==-1) {
++pt_node_task[node-1];
if (pt_node_task[node-1]==(int)tasks.size())
pt_node_task[node-1]=-1;
if(pt_node_task[node-1]<0) M_send_action(A_EXIT,node);
else {
M_send_action(A_NEW_JOB,node);
else {
M_send_action(A_NEW_JOB,node);
pt_node_order[node-1] =node-1;
pt_node_mes_done[node-1] =0;
pt_node_steps_done[node-1]=0;
......@@ -112,9 +113,9 @@ void runner_pt :: M_update(int node)
tasks[pt_node_task[node-1]].mes_done =pt_node_mes_done[node-1];
tasks[pt_node_task[node-1]].steps_done=pt_node_steps_done[node-1];
tasks[pt_node_task[node-1]].pt_pos =pt_node_order[node-1];
char * ptr = (char*) &tasks[pt_node_task[node-1]];
int task_comm_size = sizeof(one_task) / sizeof(char);
MPI_Send(ptr, task_comm_size, MPI_CHAR, node, T_NEW_JOB, MPI_COMM_WORLD);
char * ptr = (char*) &tasks[pt_node_task[node-1]];
int task_comm_size = sizeof(one_task) / sizeof(char);
MPI_Send(ptr, task_comm_size, MPI_CHAR, node, T_NEW_JOB, MPI_COMM_WORLD);
}
}
else {
......@@ -122,13 +123,13 @@ void runner_pt :: M_update(int node)
tasks[pt_node_task[node-1]].mes_done =pt_node_mes_done[node-1];
tasks[pt_node_task[node-1]].steps_done=pt_node_steps_done[node-1];
tasks[pt_node_task[node-1]].pt_pos =pt_node_order[node-1];
char * ptr = (char*) &tasks[pt_node_task[node-1]];
int task_comm_size = sizeof(one_task) / sizeof(char);
MPI_Send(ptr, task_comm_size, MPI_CHAR, node, T_NEW_JOB, MPI_COMM_WORLD);
}
}
}
char * ptr = (char*) &tasks[pt_node_task[node-1]];
int task_comm_size = sizeof(one_task) / sizeof(char);
MPI_Send(ptr, task_comm_size, MPI_CHAR, node, T_NEW_JOB, MPI_COMM_WORLD);
}
}
}
if (node_status == S_BUSY) {
M_send_action( ((time_is_up()) ? A_EXIT : A_CONTINUE), node);
}
......@@ -142,7 +143,7 @@ void runner_pt :: M_update(int node)
tasks[act.task_id].is_done = 1;
M_send_action(A_PROCESS_DATA_NEW_JOB,node);
M_report_acc_ratio();
pt_moves=0;
pt_moves = 0;
for (uint i=0;i<pt_accepted.size();++i)
pt_accepted[i]=0;
}
......@@ -152,8 +153,6 @@ void runner_pt :: M_update(int node)
M_write();
M_report();
}
if(N_exit + pt_N_global_update_waiter == world_size-1) M_end_of_run();
else M_wait();
}
void runner_pt :: M_send_action(int action, int to)
......@@ -166,16 +165,18 @@ void runner_pt :: M_send_action(int action, int to)
void runner_pt :: M_wait()
{
//(*STATUS) << my_rank << ": Waiting..." <<"\n";
MPI_Status stat;
int flag = 0;
while(!flag)
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &stat);
if(stat.MPI_TAG == T_STATUS)
M_update(stat.MPI_SOURCE);
else {
cerr<<my_rank<<": WARNING "<< stat.MPI_TAG<<" from "<<stat.MPI_SOURCE<<"\n";
M_wait();
while (N_exit + pt_N_global_update_waiter != world_size-1) {
MPI_Status stat;
int flag = 0;
while(!flag)
MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, &stat);
if(stat.MPI_TAG == T_STATUS)
M_update(stat.MPI_SOURCE);
else {
cerr<<my_rank<<": WARNING "<< stat.MPI_TAG<<" from "<<stat.MPI_SOURCE<<"\n";
}
}
M_end_of_run();
}
void runner_pt :: M_write()
......@@ -188,6 +189,10 @@ void runner_pt :: M_write()
d.write< int >(pt_node_mes_done);
d.write< int >(pt_node_steps_done);
d.write< int >(pt_accepted);
#ifdef MCL_HISTOGRAM
d.write< int >(histogram_up);
d.write< int >(histogram_down);
#endif
d.close();
}
......@@ -202,6 +207,10 @@ void runner_pt :: M_read()
d.read< int >(pt_node_mes_done);
d.read< int >(pt_node_steps_done);
d.read< int >(pt_accepted);
#ifdef MCL_HISTOGRAM
d.read< int >(histogram_up);
d.read< int >(histogram_down);
#endif
}
else {
pt_moves = 0;
......@@ -210,6 +219,10 @@ void runner_pt :: M_read()
pt_node_order.push_back(-1);
pt_node_mes_done.push_back(0);
pt_node_steps_done.push_back(0);
#ifdef MCL_HISTOGRAM
histogram_up.push_back(0);
histogram_down.push_back(0);
#endif
}
for (int i=1;i<world_size-1;++i)
pt_accepted.push_back(0);
......@@ -261,9 +274,21 @@ void runner_pt :: M_end_of_run()
rfile.close();
(*STATUS) << my_rank << ": Restart needed" << "\n";
}
#ifdef MCL_HISTOGRAM
else {
stringstream ss;
for (size_t i = 0; i < histogram_up.size(); ++i) {
ss << (double) histogram_up[i]/(histogram_up[i] + histogram_down[i]) << " ";
}
string flowfile = jobfile + ".flow";
ofstream flowf(flowfile.c_str());
flowf << ss.str();
flowf.close();
}
#endif
M_report();
M_write();
MPI_Finalize();
MPI_Finalize();
(*STATUS) << my_rank << ": MPI finalized" << "\n";
exit(0);
}
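The numbers written to the *.flow file above are the per-position flow fractions commonly used to analyse replica flow in parallel tempering (this reading of the output is an assumption; the commit itself only writes the ratio):

    f(i) = histogram_up[i] / (histogram_up[i] + histogram_down[i])

i.e. for each position i, the fraction of labelled replica visits that were moving "up" (label +1) rather than "down" (label -1), using the counts accumulated in M_global_update().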
......@@ -272,10 +297,10 @@ void runner_pt :: M_report()
{
for(uint i = 0; i < pt_node_steps_done.size(); i ++) {
(*STATUS)
<< i+1 << "\t"
<< pt_node_steps_done[i] << "\t"
<< pt_node_mes_done[i] << "\t"
<< "\n";
<< i+1 << "\t"
<< pt_node_steps_done[i] << "\t"
<< pt_node_mes_done[i] << "\t"
<< "\n";
}
(*STATUS) << "\n";
}
......@@ -291,59 +316,81 @@ void runner_pt :: M_report_acc_ratio() {
void runner_pt :: M_global_update()
{
//(*STATUS) << my_rank << ": start global update"<< "\n";
++pt_moves;
for(int i = 0; i < world_size-2; i ++) {
++pt_moves;
int a=i;
int b=i+1;
MPI_Send(&a, 1, MPI_INT, pt_node_order[i+1]+1, T_PARTNER, MPI_COMM_WORLD);
MPI_Send(&b , 1, MPI_INT, pt_node_order[i]+1, T_PARTNER, MPI_COMM_WORLD);
// cerr<<"Send call to "<<pt_node_order[i+1]+1<<" and " << pt_node_order[i]+1<<"\n";
MPI_Status stat;
double W1,W2;
MPI_Recv(&W1, 1, MPI_DOUBLE, pt_node_order[i+1]+1, T_WEIGHT, MPI_COMM_WORLD, &stat);
MPI_Recv(&W2, 1, MPI_DOUBLE, pt_node_order[i]+1, T_WEIGHT, MPI_COMM_WORLD, &stat);
// cerr<<"Recieved weights"<<"\n";
if(pt_rng->d()<exp(W1+W2)) {
int tmp = pt_node_order[i];
pt_node_order[i] = pt_node_order[i+1];
pt_node_order[i+1] = tmp;
++pt_accepted[i];
}
// cerr<<"Send new positions"<<"\n";
MPI_Send(&b, 1, MPI_INT, pt_node_order[i+1]+1, T_PARTNER, MPI_COMM_WORLD);
MPI_Send(&a, 1, MPI_INT, pt_node_order[i]+1, T_PARTNER, MPI_COMM_WORLD);
// cerr<<"Sending done"<<"\n";
}
int tmp=-1;
for(int i = 1; i < world_size; i ++)
MPI_Send(&tmp, 1, MPI_INT, i, T_PARTNER, MPI_COMM_WORLD);
//(*STATUS) << my_rank << ": did global update"<< "\n";
//(*STATUS) << my_rank << ": start global update"<< "\n";
#ifdef MCL_POS
std::ofstream afile(positionsfile.c_str(),std::ios::out|std::ios::app);
for (uint i = 0; i < pt_node_order.size(); ++i)
afile<< pt_node_order[i] << " ";
afile << "\n";
afile.close();
#endif
++pt_moves;
for(int i = 0; i < world_size-2; i ++) {
int a=i;
int b=i+1;
MPI_Send(&a, 1, MPI_INT, pt_node_order[i+1]+1, T_PARTNER, MPI_COMM_WORLD);
MPI_Send(&b , 1, MPI_INT, pt_node_order[i]+1, T_PARTNER, MPI_COMM_WORLD);
//cerr<<"Send call to "<<pt_node_order[i+1]+1<<" and " << pt_node_order[i]+1<<"\n";
MPI_Status stat;
double W1,W2;
MPI_Recv(&W1, 1, MPI_DOUBLE, pt_node_order[i+1]+1, T_WEIGHT, MPI_COMM_WORLD, &stat);
MPI_Recv(&W2, 1, MPI_DOUBLE, pt_node_order[i]+1, T_WEIGHT, MPI_COMM_WORLD, &stat);
//cerr<<"Recieved weights"<<"\n";
if(pt_rng->d()<exp(W1+W2)) {
int tmp = pt_node_order[i];
pt_node_order[i] = pt_node_order[i+1];
pt_node_order[i+1] = tmp;
++pt_accepted[i];
}
//cerr<<"Send new positions"<<"\n";
MPI_Send(&b, 1, MPI_INT, pt_node_order[i+1]+1, T_PARTNER, MPI_COMM_WORLD);
MPI_Send(&a, 1, MPI_INT, pt_node_order[i]+1, T_PARTNER, MPI_COMM_WORLD);
//cerr<<"Sending done"<<"\n";
}
int tmp=-1;
for(int i = 1; i < world_size; i ++) {
MPI_Send(&tmp, 1, MPI_INT, i, T_PARTNER, MPI_COMM_WORLD);
#ifdef MCL_HISTOGRAM
int label, pos;
MPI_Status stat;
MPI_Recv(&pos, 1, MPI_INT, i, T_POS, MPI_COMM_WORLD, &stat);
MPI_Recv(&label, 1, MPI_INT, i, T_LABEL, MPI_COMM_WORLD, &stat);
if (label == 1)
histogram_up[pos]++;
else if (label == -1)
histogram_down[pos]++;
#endif
}
//(*STATUS) << my_rank << ": did global update"<< "\n";
}
void runner_pt :: global_update()
{
int ini_pos=my_task.pt_pos;
bool finished = false;
while(!finished) {
MPI_Status stat;
int partner;
MPI_Recv(&partner, 1, MPI_INT, MASTER, T_PARTNER, MPI_COMM_WORLD, &stat);
if (partner == -1) finished = true;
else {
double weight = (*sys).get_weight(partner);
MPI_Send(&weight, 1, MPI_DOUBLE, MASTER, T_WEIGHT, MPI_COMM_WORLD);
int new_pos;
MPI_Recv(&new_pos, 1, MPI_INT, MASTER, T_PARTNER, MPI_COMM_WORLD, &stat);
if(new_pos != my_task.pt_pos) {
my_task.pt_pos = new_pos;