Commit 62fbc7d2 authored by Sonja Happ

starting implementation of async bcast

parent 45989650
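The diff below replaces the blocking MPI_Bcast that signalled sweep completion with a non-blocking MPI_Ibcast whose completion, polled via MPI_Test, becomes the stop criterion of each sweep loop. This way, ranks that do not own the slack no longer block in a collective at every iteration. A minimal, self-contained sketch of that pattern, not project code (rank 0 and a fixed iteration count stand in for the slack-owning rank and the real convergence check):

// async_bcast_sketch.cpp -- sketch of the MPI_Ibcast/MPI_Test termination pattern
#include <mpi.h>
#include <cstdio>

int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);
    int rank, world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    bool finished = false;
    MPI_Request req = MPI_REQUEST_NULL;
    int complete = 0;

    // non-root ranks post the broadcast as receivers before entering the loop
    if (rank != 0 && world_size > 1)
        MPI_Ibcast(&finished, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD, &req);

    int iteration = 0;
    while (!complete) {
        ++iteration;                       // stand-in for one sweep iteration
        if (rank == 0 && iteration == 5) { // stand-in for the convergence check
            finished = true;
            if (world_size > 1)            // root posts the matching broadcast
                MPI_Ibcast(&finished, 1, MPI_C_BOOL, 0, MPI_COMM_WORLD, &req);
        }
        // MPI_Test is the loop's stop criterion (and drives MPI progress);
        // fall back to the local flag while no request has been posted yet
        if (req != MPI_REQUEST_NULL)
            MPI_Test(&req, &complete, MPI_STATUS_IGNORE);
        else
            complete = finished;
    }
    std::printf("rank %d stopped after %d iterations\n", rank, iteration);
    MPI_Finalize();
    return 0;
}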
@@ -1501,7 +1501,25 @@ void Model::forward_backward_sweep() {
IO->reset_number_of_fbs_sweeps();
IO->reset_loop_counters();
while (!forward_backward_sweep_finished) {
MPI_Request FBS_convergence_bcast = MPI_REQUEST_NULL;
MPI_Status MPI_bcast_status;
int MPI_bcast_complete = 0;
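// Async termination handshake: every rank that does not own the slack posts a
// non-blocking broadcast (MPI_Ibcast) as receiver before entering the loop; the
// slack-owning rank posts the matching broadcast as root once the sweep has
// converged. MPI_Test at the end of each iteration serves as the stop criterion.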
//check if finished
if(!context.contains(repast::AgentId(SLACK_ID,rank,TYPE_SLACK_INT, rank))){
// if this rank does not hold the slack
// it issues an asynchronous broadcast
if(world_size>1) {
IO->start_time_measurement(IO->tm_bcast_FBS);
MPI_Ibcast(&forward_backward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD, &FBS_convergence_bcast);
IO->stop_time_measurement(IO->tm_bcast_FBS);
}
}
while (!MPI_bcast_complete) { // previously: while (!forward_backward_sweep_finished)
IO->log_info("####### number of loops: " + std::to_string(IO->get_number_of_fbs_sweeps()+1) + " #######");
IO->log_info("################ calculate components");
// calculation of all components
@@ -1521,15 +1539,17 @@ void Model::forward_backward_sweep() {
// IO->log_info(IO->id2str(i->getId()));
//}
if(!local_agents.empty()) {
this->do_forward_sweep();
}
else {
IO->start_time_measurement(IO->tm_bcast_forward_sweep);
//TODO use MPI_Ibcast instead
MPI_Bcast(&forward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD); // root was 0
IO->stop_time_measurement(IO->tm_bcast_forward_sweep);
}
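// every rank now enters the forward sweep directly; ranks without local agents
// poll the asynchronous broadcast inside do_forward_sweep()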
this->do_forward_sweep();
// if(!local_agents.empty()) {
// this->do_forward_sweep();
// }
// else {
// IO->start_time_measurement(IO->tm_bcast_forward_sweep);
// //TODO use MPI_Ibcast instead
// MPI_Bcast(&forward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD); // root was 0
// IO->stop_time_measurement(IO->tm_bcast_forward_sweep);
// }
@@ -1542,19 +1562,25 @@ void Model::forward_backward_sweep() {
": Stop loop and send broadcast");
forward_backward_sweep_finished = true;
if(world_size>1) {
IO->start_time_measurement(IO->tm_bcast_FBS);
MPI_Ibcast(&forward_backward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD, &FBS_convergence_bcast);
IO->stop_time_measurement(IO->tm_bcast_FBS);
}
}
}
if(world_size>1) {
IO->start_time_measurement(IO->tm_bcast_FBS);
//TODO use MPI_Ibcast instead
MPI_Bcast(&forward_backward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD);
IO->stop_time_measurement(IO->tm_bcast_FBS);
}
IO->increment_number_of_fbs_sweeps();
//synchronize changes: not needed because in last forward sweep iteration latest changes have been synchronized
//this->synchronize();
IO->flush_log();
// check status of the asynchronous MPI broadcast as stop criterion of the while loop;
// only test once a request has actually been posted (the slack rank posts it on
// convergence; with world_size == 1 no broadcast is posted at all)
if (FBS_convergence_bcast != MPI_REQUEST_NULL) {
MPI_Test(&FBS_convergence_bcast, &MPI_bcast_complete, &MPI_bcast_status);
} else {
MPI_bcast_complete = forward_backward_sweep_finished;
}
IO->log_info("MPI-Test has returned: " + std::to_string(MPI_bcast_complete));
} //while
forward_backward_sweep_finished = false;
@@ -1579,7 +1605,21 @@ void Model::do_backward_sweep() {
int counter= 0;
uint have_backward_sweeped = 0;
std::vector<bool> state(agents_scheduling.size(), false);
while(!backward_sweep_finished) {
MPI_Request backward_complete_bcast = MPI_REQUEST_NULL;
MPI_Status MPI_bcast_status;
int MPI_bcast_complete = 0;
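// same async handshake as in forward_backward_sweep(): non-slack ranks post the
// broadcast as receivers up front, the slack rank posts it as root on completion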
if (!context.contains(repast::AgentId(SLACK_ID, rank, TYPE_SLACK_INT, rank))) {
// if rank does not contain slack it starts an async. broadcast here
if(world_size > 1){
IO->log_info("Backward sweep: issuing bcast as receiver");
MPI_Ibcast(&backward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD, &backward_complete_bcast);
}
}
while (!MPI_bcast_complete) { // previously: while (!backward_sweep_finished)
if(have_backward_sweeped < agents_scheduling.size()){
//perform backward sweep for all node agents if not all node agents have finished backward sweeping
@@ -1611,13 +1651,15 @@ void Model::do_backward_sweep() {
if (slack->get_next_action_expected() == FORWARD_SWEEP) {
backward_sweep_finished = true;
IO->log_info("backward_sweep_finished = " + std::to_string(backward_sweep_finished));
if(world_size > 1){
IO->log_info("Backward sweep finished, sending bcast");
MPI_Ibcast(&backward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD, &backward_complete_bcast);
}
}
}
if(world_size > 1){
//TODO use MPI_Ibcast instead
MPI_Bcast(&backward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD);
}
IO->log_info("Backward sweep finished: " + std::to_string(backward_sweep_finished));
IO->stop_time_measurement(IO->tm_bcast_backward_sweep);
@@ -1627,6 +1669,10 @@ void Model::do_backward_sweep() {
IO->start_time_measurement(IO->tm_backward_sweep);
IO->flush_log();
// check status of the asynchronous MPI broadcast as stop criterion of the while loop;
// only test once a request has actually been posted
if (backward_complete_bcast != MPI_REQUEST_NULL) {
MPI_Test(&backward_complete_bcast, &MPI_bcast_complete, &MPI_bcast_status);
} else {
MPI_bcast_complete = backward_sweep_finished;
}
IO->log_info("Backward Sweep MPI-Test has returned: " + std::to_string(MPI_bcast_complete));
}
IO->add_to_backward_loop_counter(counter);
IO->log_info("backward sweep took " + std::to_string(counter) + " iterations.");
@@ -1643,7 +1689,21 @@ void Model::do_forward_sweep() {
bool started_convergence_check = false;
uint checked_convergence = 0;
std::vector<uint> state(agents_scheduling.size(), DONE_NOTHING);
while(!forward_sweep_finished){
MPI_Request forward_complete_bcast = MPI_REQUEST_NULL;
MPI_Status MPI_bcast_status;
int MPI_bcast_complete = 0;
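// same asynchronous termination handshake as in do_backward_sweep()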
if (!context.contains(repast::AgentId(SLACK_ID, rank, TYPE_SLACK_INT, rank))) {
// if rank does not contain slack it starts an async. broadcast here
if(world_size > 1){
IO->log_info("Forward sweep: issuing bcast as receiver");
MPI_Ibcast(&forward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD, &forward_complete_bcast);
}
}
while (!MPI_bcast_complete) { // previously: while (!forward_sweep_finished)
if(checked_convergence < agents_scheduling.size()){
if(!started_convergence_check) {
//perform forward sweep for all node agents
@@ -1690,10 +1750,15 @@ void Model::do_forward_sweep() {
Agent * slack = context.getAgent(repast::AgentId(SLACK_ID,rank,TYPE_SLACK_INT, rank));
if (slack->get_next_action_expected() == BACKWARD_SWEEP) {
forward_sweep_finished = true;
if(world_size > 1){
IO->log_info("Forward sweep finished, sending bcast");
MPI_Ibcast(&forward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD, &forward_complete_bcast);
}
}
}
//TODO use MPI_Ibcast instead
MPI_Bcast(&forward_sweep_finished, 1, MPI_C_BOOL, agent_rank_relation[SLACK_ID-1], MPI_COMM_WORLD); // root was 0
IO->stop_time_measurement(IO->tm_bcast_forward_sweep);
// in forward sweep info about next and previous electrical nodes are required
@@ -1703,6 +1768,9 @@ void Model::do_forward_sweep() {
IO->flush_log();
// check status of the asynchronous MPI broadcast as stop criterion of the while loop;
// only test once a request has actually been posted
if (forward_complete_bcast != MPI_REQUEST_NULL) {
MPI_Test(&forward_complete_bcast, &MPI_bcast_complete, &MPI_bcast_status);
} else {
MPI_bcast_complete = forward_sweep_finished;
}
IO->log_info("Forward Sweep MPI-Test has returned: " + std::to_string(MPI_bcast_complete));
}
IO->add_to_forward_loop_counter(counter);
IO->log_info("forward sweep took " + std::to_string(counter) + " iterations.");