// Select Git revision
// ResourceStructuralData.cs
// Code owners
// Assign users and groups as approvers for specific file changes. Learn more.
// main.cpp 15.15 KiB
#include "remote-worker.grpc.pb.h"

#include <algorithm>
#include <atomic>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <future>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <CLI/App.hpp>
#include <CLI/Config.hpp>
#include <CLI/Formatter.hpp>
#include <ITA/SimulationScheduler/RoomAcoustics/rir_simulation_result.h>
#include <ITA/SimulationScheduler/Utils/json_config_utils.h>
#include <ITA/SimulationScheduler/scheduler_interface.h>
#include <ITA/SimulationScheduler/worker_remote.h>
#include <grpcpp/ext/proto_server_reflection_plugin.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/health_check_service_interface.h>
#include <tbb/concurrent_queue.h>
//--- global variables ---
///
/// \brief Global exit request.
///
/// This promise object is used to signal the shutdown of the server.
/// It is fulfilled by the signal handler installed in RunServer and by the Shutdown RPC when the service is not configured to treat a shutdown as a reset.
/// RunServer waits on the future of this promise and shuts the gRPC server down once it is set.
/// \note A std::promise can be satisfied only once; a further set_value() call throws std::future_error.
///
std::promise<void> exit_requested;
///
/// \brief Encoders.
///
/// Transform the ITASimulationScheduler data structures to gRPC data structures.
/// \{
///
/// \brief Encode a sample frame as its protobuf counterpart.
///
/// The frame is taken by const reference so that its sample data is not copied just to be read.
/// \param objIn the sample frame to encode.
/// \return the protobuf message carrying the frame length and one buffer of samples per channel.
///
ITA::SimulationScheduler::RemoteWorkerRPC::SampleFrame encode( const ITASampleFrame& objIn )
{
    ITA::SimulationScheduler::RemoteWorkerRPC::SampleFrame objOut;
    objOut.set_nsamples( objIn.GetLength( ) );

    // The frame is not modified while encoding, so the loop bounds are queried only once.
    const int numChannels = objIn.channels( );
    const int numSamples  = objIn.length( );

    for( int channel = 0; channel < numChannels; ++channel )
    {
        const auto buf = objOut.add_buffers( );
        for( int sample = 0; sample < numSamples; ++sample )
        {
            buf->add_samples( objIn[channel][sample] );
        }
    }
    return objOut;
}
///
/// \brief Encode a 3D object as its protobuf counterpart.
///
/// Copies the id, the type, the position (x, y, z) and the orientation quaternion (x, y, z, w).
/// Every object type other than source is encoded as a receiver.
/// \param objIn the object to encode; it is dereferenced without a null check.
/// \return the protobuf message describing the object.
///
ITA::SimulationScheduler::RemoteWorkerRPC::Object3D encode( const ITA::SimulationScheduler::C3DObject* objIn )
{
    namespace RPC = ITA::SimulationScheduler::RemoteWorkerRPC;

    RPC::Object3D message;
    message.set_id( objIn->GetId( ) );

    if( objIn->GetType( ) == ITA::SimulationScheduler::C3DObject::Type::source )
        message.set_type( RPC::Object3D_Type_SOURCE );
    else
        message.set_type( RPC::Object3D_Type_RECEIVER );

    auto* position = message.mutable_position( );
    position->set_x( objIn->GetPosition( )[0] );
    position->set_y( objIn->GetPosition( )[1] );
    position->set_z( objIn->GetPosition( )[2] );

    auto* orientation = message.mutable_quaternion( );
    orientation->set_x( objIn->GetOrientation( )[0] );
    orientation->set_y( objIn->GetOrientation( )[1] );
    orientation->set_z( objIn->GetOrientation( )[2] );
    orientation->set_w( objIn->GetOrientation( )[3] );

    return message;
}
///
/// \brief Encode an impulse response simulation result as its protobuf counterpart.
///
/// Fills in the receiver, the source, the time stamp and the impulse response part of the message
/// (sample frame, leading and trailing zeros, zeros-stripped flag).
/// \param objIn the result to encode; it is dereferenced without a null check.
/// \return the protobuf message describing the result.
///
ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult encode( const ITA::SimulationScheduler::CIRSimulationResult* objIn )
{
    ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult message;

    // Common part of the result: who was simulated and when.
    message.mutable_receiver( )->CopyFrom( encode( objIn->sourceReceiverPair.receiver ) );
    message.mutable_source( )->CopyFrom( encode( objIn->sourceReceiverPair.source ) );
    message.set_time_stamp( objIn->dTimeStamp );

    // Impulse response specific part of the result.
    auto* irResult = message.mutable_ir_simulation_result( );
    irResult->mutable_result( )->CopyFrom( encode( objIn->sfResult ) );
    irResult->set_leading_zero( objIn->iLeadingZeros );
    irResult->set_trailing_zeros( objIn->iTrailingZeros );
    irResult->set_zeros_stripped( objIn->bZerosStripped );

    return message;
}
///
/// \brief Encode a room impulse response simulation result as its protobuf counterpart.
///
/// The impulse response part is encoded by the overload for ITA::SimulationScheduler::CIRSimulationResult;
/// this overload adds the field of duty and the same-room flag.
/// A field of duty other than direct sound, early reflections or diffuse decay leaves the result type of the message untouched.
/// \param objIn the result to encode; it is dereferenced without a null check.
/// \return the protobuf message describing the result.
///
ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult encode( const ITA::SimulationScheduler::RoomAcoustics::CRIRSimulationResult* objIn )
{
    namespace RPC = ITA::SimulationScheduler::RemoteWorkerRPC;

    RPC::SimulationResult message = encode( static_cast<const ITA::SimulationScheduler::CIRSimulationResult*>( objIn ) );
    auto* rirResult               = message.mutable_rir_simulation_result( );

    switch( objIn->eResultType )
    {
        case ITA::SimulationScheduler::RoomAcoustics::FieldOfDuty::directSound:
            rirResult->set_result_type( RPC::RIRSimulationResult::FieldOfDuty::RIRSimulationResult_FieldOfDuty_DIRECT_SOUND );
            break;
        case ITA::SimulationScheduler::RoomAcoustics::FieldOfDuty::earlyReflections:
            rirResult->set_result_type( RPC::RIRSimulationResult::FieldOfDuty::RIRSimulationResult_FieldOfDuty_EARLY_REFLECTIONS );
            break;
        case ITA::SimulationScheduler::RoomAcoustics::FieldOfDuty::diffuseDecay:
            rirResult->set_result_type( RPC::RIRSimulationResult::FieldOfDuty::RIRSimulationResult_FieldOfDuty_DIFFUSE_DECAY );
            break;
        default:
            break;
    }

    rirResult->set_same_room( objIn->bSameRoom );
    return message;
}
/// \}
///
/// \brief Decoders.
///
/// Transform the gRPC data structures to ITASimulationScheduler data structures.
/// \{
///
/// \brief Decode a protobuf 3D object.
///
/// The type mapping mirrors the encoder: a message of type SOURCE becomes a source, every other type becomes a receiver.
/// This way the type is always initialized, also for messages carrying an unexpected type value.
/// \param objIn the protobuf message to decode.
/// \return the decoded object, built from the position, orientation, type and id of the message.
///
std::unique_ptr<ITA::SimulationScheduler::C3DObject> decode( const ITA::SimulationScheduler::RemoteWorkerRPC::Object3D& objIn )
{
    VistaVector3D position( objIn.position( ).x( ), objIn.position( ).y( ), objIn.position( ).z( ) );
    VistaQuaternion rotation( objIn.quaternion( ).x( ), objIn.quaternion( ).y( ), objIn.quaternion( ).z( ), objIn.quaternion( ).w( ) );

    const ITA::SimulationScheduler::C3DObject::Type type = objIn.type( ) == ITA::SimulationScheduler::RemoteWorkerRPC::Object3D_Type_SOURCE ?
                                                               ITA::SimulationScheduler::C3DObject::Type::source :
                                                               ITA::SimulationScheduler::C3DObject::Type::receiver;

    return std::make_unique<ITA::SimulationScheduler::C3DObject>( position, rotation, type, objIn.id( ) );
}
///
/// \brief Decode a protobuf update message.
/// \param msg the protobuf message to decode.
/// \return the scene update with the time stamp, source and receiver of the message, or nullptr if the message does not contain a scene.
///
std::unique_ptr<ITA::SimulationScheduler::CUpdateScene> decode( const ITA::SimulationScheduler::RemoteWorkerRPC::UpdateMessage& msg )
{
    // Only scene updates can be decoded.
    if( !msg.has_scene( ) )
        return nullptr;

    auto sceneUpdate = std::make_unique<ITA::SimulationScheduler::CUpdateScene>( msg.scene( ).time_stamp( ) );
    sceneUpdate->SetSourceReceiverPair( decode( msg.scene( ).source( ) ), decode( msg.scene( ).receiver( ) ) );
    return sceneUpdate;
}
/// \}
///
/// \brief Implementation of the worker service for the RPC.
///
/// This class also inherits the ITA::SimulationScheduler::ISchedulerInterface to be able to receive simulation results for the workers the service contains.
/// Depending on the option given at construction, a Shutdown RPC either discards the workers and keeps the service running or requests the termination of the application.
/// \note NOTE(review): the worker list is accessed by several RPCs without a lock; this presumably relies on the client issuing these calls one after another - confirm.
/// \todo Does reset and shutdown perform as expected aka in the way that the parent scheduler still works correctly?
///
class WorkerServiceImpl final
    : public ITA::SimulationScheduler::RemoteWorkerRPC::RemoteWorker::Service
    , ITA::SimulationScheduler::ISchedulerInterface
{
public:
    ///
    /// \brief Create the service.
    /// \param shutdownIsReset_ if true, a Shutdown RPC only discards the workers instead of requesting the application exit.
    ///
    explicit WorkerServiceImpl( bool shutdownIsReset_ ) : shutdownIsReset( shutdownIsReset_ ) {}

    ///
    /// \brief Dummy implementation for the ITA::SimulationScheduler::ISchedulerInterface.
    ///
    void PushUpdate( std::unique_ptr<ITA::SimulationScheduler::IUpdateMessage> pUpdateMessage ) override { ITA_EXCEPT_NOT_IMPLEMENTED }

    ///
    /// \brief Dummy implementation for the ITA::SimulationScheduler::ISchedulerInterface.
    ///
    void DetachResultHandler( ITA::SimulationScheduler::IResultHandler* pResultHandler ) override { ITA_EXCEPT_NOT_IMPLEMENTED }

    ///
    /// \brief Handle a finished simulation from the workers.
    ///
    /// Enqueue the result in the resultQueue for ::ResultFinished to send to the client.
    ///
    void HandleSimulationFinished( std::unique_ptr<ITA::SimulationScheduler::CSimulationResult> pResult ) override { resultQueue.push( std::move( pResult ) ); }

    ///
    /// \brief Dummy implementation for the ITA::SimulationScheduler::ISchedulerInterface.
    ///
    bool IsBusy( ) const override { return false; }

    ///
    /// \brief Handle an update received from the client.
    ///
    /// Decodes the protobuf object and pushes the object to the first worker that is not busy.
    /// If all workers are busy, the update is dropped and OK is returned.
    /// \param context the context for the RPC call.
    /// \param request the request object.
    /// \param reply the reply object.
    /// \return INVALID_ARGUMENT if the message does not contain a scene update, OK otherwise.
    ///
    grpc::Status PushUpdate( grpc::ServerContext* context, const ITA::SimulationScheduler::RemoteWorkerRPC::UpdateMessage* request,
                             google::protobuf::Empty* reply ) override
    {
        // decode() yields nullptr for messages without a scene; such an update must never reach a worker.
        auto update = decode( *request );
        if( !update )
            return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "Update message does not contain a scene update." );

        for( auto&& worker: workers )
        {
            if( !worker->IsBusy( ) )
            {
                worker->PushUpdate( std::move( update ) );
                break;
            }
        }
        return grpc::Status::OK;
    }

    ///
    /// \brief Stream finished results back to the client.
    ///
    /// Uses a server streaming RPC to send all finished results to the client.
    /// This call finishes when a shutdown is signaled, when the client cancels the call or when writing to the stream fails.
    /// While it is running, it tries to get any results from the ::resultQueue, encodes them and writes them to the stream.
    /// Results that are not room impulse response results are taken from the queue but not sent.
    /// \param context the context for the RPC call.
    /// \param request the request object.
    /// \param replyWriter the writer for the replies.
    /// \return OK if the stream ended due to a shutdown, CANCELLED if the client cancelled the call or the stream broke.
    ///
    grpc::Status ResultFinished( grpc::ServerContext* context, const google::protobuf::Empty* request,
                                 grpc::ServerWriter<ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult>* replyWriter ) override
    {
        shutdownSignaled = false;
        std::unique_ptr<ITA::SimulationScheduler::CSimulationResult> result;
        while( !shutdownSignaled )
        {
            // Without this check the loop would keep spinning for a client that is long gone.
            if( context->IsCancelled( ) )
                return grpc::Status::CANCELLED;

            while( resultQueue.try_pop( result ) )
            {
                if( const auto pRIRResult = dynamic_cast<ITA::SimulationScheduler::RoomAcoustics::CRIRSimulationResult*>( result.get( ) ) )
                {
                    const auto res = encode( pRIRResult );
                    // Write() reports false once the stream is broken; nothing more can be delivered then.
                    if( !replyWriter->Write( res ) )
                        return grpc::Status::CANCELLED;
                }
            }

            // The queue is empty: let other threads run instead of polling back to back.
            std::this_thread::yield( );
        }
        return grpc::Status::OK;
    }

    ///
    /// \brief Initiate the workers for the service.
    ///
    /// The workers are added to the service only if the whole configuration could be processed.
    /// \param context the context for the RPC call.
    /// \param request the request object. Contains the json config of the remote worker, including the workers that are supposed to be running in the service.
    /// \param reply the reply object.
    /// \return INVALID_ARGUMENT carrying the error message if processing the configuration raised a std::exception, OK otherwise.
    ///
    grpc::Status InitWorker( grpc::ServerContext* context, const ITA::SimulationScheduler::RemoteWorkerRPC::InitMessage* request,
                             ITA::SimulationScheduler::RemoteWorkerRPC::Reply* reply ) override
    {
        try
        {
            ITA::SimulationScheduler::CWorkerRemote::WorkerRemoteConfig config;
            const auto jnRoot      = nlohmann::json::parse( request->config( ) );
            const auto vistaConfig = ITA::SimulationScheduler::Utils::JSONConfigUtils::ReadVistaPropertyListFromJSON( jnRoot );
            config.Load( vistaConfig );

            // Create all workers first, so that a failure half way through leaves the service untouched.
            decltype( workers ) createdWorkers;
            for( const auto& workerConfig: config.vpWorkerConfigs )
            {
                createdWorkers.push_back( ITA::SimulationScheduler::CWorkerFactory::CreateWorker( workerConfig, this ) );
            }
            for( auto& worker: createdWorkers )
            {
                workers.push_back( std::move( worker ) );
            }
        }
        catch( const std::exception& e )
        {
            reply->set_success( false );
            return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, e.what( ) );
        }

        reply->set_success( true );
        return grpc::Status::OK;
    }

    ///
    /// \brief Check if the RPC service is busy.
    ///
    /// This is only the case if all contained workers are busy.
    /// A service without any workers reports busy as well.
    /// \param context the context for the RPC call.
    /// \param request the request object.
    /// \param reply the reply object.
    ///
    grpc::Status IsBusy( grpc::ServerContext* context, const google::protobuf::Empty* request, google::protobuf::BoolValue* reply ) override
    {
        reply->set_value( std::all_of( workers.begin( ), workers.end( ),
                                       []( const std::unique_ptr<ITA::SimulationScheduler::IWorkerInterface>& worker ) { return worker->IsBusy( ); } ) );
        return grpc::Status::OK;
    }

    ///
    /// \brief Reset the service.
    ///
    /// Each worker is reset exactly once, as soon as it is found not to be busy.
    /// The call returns when all workers have been reset.
    /// \param context the context for the RPC call.
    /// \param request the request object.
    /// \param reply the reply object.
    ///
    grpc::Status Reset( grpc::ServerContext* context, const google::protobuf::Empty* request, ITA::SimulationScheduler::RemoteWorkerRPC::Reply* reply ) override
    {
        std::cout << "Reset received, forwarding to workers.\n";

        std::vector<bool> workerReset( workers.size( ), false );
        std::size_t pendingResets = workerReset.size( );
        while( pendingResets > 0 )
        {
            for( std::size_t index = 0; index < workerReset.size( ); ++index )
            {
                // Skip workers that were already reset in an earlier pass.
                if( !workerReset[index] && !workers[index]->IsBusy( ) )
                {
                    workers[index]->Reset( );
                    workerReset[index] = true;
                    --pendingResets;
                }
            }

            // Some workers are still busy: let them run before polling again.
            if( pendingResets > 0 )
                std::this_thread::yield( );
        }

        reply->set_success( true );
        return grpc::Status::OK;
    }

    ///
    /// \brief Shutdown the service and the server.
    ///
    /// Ends a running ::ResultFinished stream.
    /// If the service treats a shutdown as a reset, the workers are discarded; otherwise the termination of the application is requested.
    /// \param context the context for the RPC call.
    /// \param request the request object.
    /// \param reply the reply object.
    ///
    grpc::Status Shutdown( grpc::ServerContext* context, const google::protobuf::Empty* request, ITA::SimulationScheduler::RemoteWorkerRPC::Reply* reply ) override
    {
        std::cout << "Shutdown requested.\n";
        shutdownSignaled = true;
        if( shutdownIsReset )
        {
            workers.clear( );
        }
        else
        {
            try
            {
                exit_requested.set_value( );
            }
            catch( const std::future_error& )
            {
                // The exit was already requested (earlier Shutdown call or signal); nothing left to do.
            }
        }
        reply->set_success( true );
        return grpc::Status::OK;
    }

private:
    ///
    /// \brief Workers contained in this service.
    ///
    std::vector<std::unique_ptr<ITA::SimulationScheduler::IWorkerInterface>> workers;

    ///
    /// \brief Signal for a shutdown; ends a running ::ResultFinished stream.
    ///
    std::atomic_bool shutdownSignaled = false;

    ///
    /// \brief Thread-safe queue for finished results.
    ///
    /// Results are pushed on the queue when a worker finished a simulation and are removed when they get sent to the client in ::ResultFinished
    ///
    tbb::concurrent_queue<std::unique_ptr<ITA::SimulationScheduler::CSimulationResult>> resultQueue;

    ///
    /// \brief Indicate if a shutdown request resets the service.
    ///
    bool shutdownIsReset = false;
};
///
/// \brief Run the RPC server on the given port.
///
/// Installs signal handlers that request the exit, starts the server and blocks until the exit is requested,
/// either by one of the signals or through ::exit_requested being fulfilled elsewhere.
/// If the server cannot be started, an error is printed and the function returns right away.
/// \param port the port to listen on for RPC calls.
/// \param serverShutdownIsReset indicate if a shutdown request resets the application.
///
void RunServer( int port, bool serverShutdownIsReset )
{
    // shutdown implementation from https://stackoverflow.com/questions/32504208/how-to-shutdown-grpc-server-from-client-using-rpc-function
    auto handler = []( int )
    {
        try
        {
            exit_requested.set_value( );
        }
        catch( const std::future_error& )
        {
            // The exit was already requested; an exception must not leave the signal handler.
        }
    };
    std::signal( SIGABRT, handler );
#ifdef SIGBREAK
    // SIGBREAK is not available on every platform.
    std::signal( SIGBREAK, handler );
#endif
    std::signal( SIGINT, handler );
    std::signal( SIGTERM, handler );

    const std::string server_address( "0.0.0.0:" + std::to_string( port ) );
    WorkerServiceImpl service( serverShutdownIsReset );

    grpc::EnableDefaultHealthCheckService( true );
    grpc::reflection::InitProtoReflectionServerBuilderPlugin( );

    grpc::ServerBuilder builder;
    // Listen on the given address without any authentication mechanism.
    builder.AddListeningPort( server_address, grpc::InsecureServerCredentials( ) );
    // Register "service" as the instance through which we'll communicate with clients.
    builder.RegisterService( &service );

    // Assemble the server.
    const std::unique_ptr<grpc::Server> server( builder.BuildAndStart( ) );
    if( !server )
    {
        // No server was created, e.g. because the address could not be bound; there is nothing to wait for.
        std::cerr << "Failed to start server on " << server_address << '\n';
        return;
    }

    std::cout << "Server listening on " << server_address << '\n';
    std::cout << "To manually shut down press CTRL+C or close the window\n";

    std::thread serving_thread( [&]( ) { server->Wait( ); } );

    // Block until the exit is requested, then stop the server and wait for the serving thread to return.
    const auto f = exit_requested.get_future( );
    f.wait( );
    server->Shutdown( );
    serving_thread.join( );
    std::cout << "Server shutting down\n";
}
int main( int argc, char** argv )
{
CLI::App app {
R"(
Remote worker application for the ITASimulationScheduler.
This application runs a RPC server that listens for RPC calls on the given port.
)"
};
uint16_t port = 50052;
app.add_option( "port,-p,--port", port, "Port to listen for RPC calls." )
->check( CLI::Range( static_cast<uint16_t>( 1024 ), std::numeric_limits<uint16_t>::max( ) ) )
->capture_default_str( );
bool serverShutdownIsReset = false;
app.add_flag( "-s,--shutdown-is-reset", serverShutdownIsReset, "If given, the application is reset after a shutdown is received." );
try
{
app.parse( argc, argv );
}
catch( const CLI::ParseError& e )
{
return app.exit( e );
}
RunServer( port, serverShutdownIsReset );
return 0;
}