Skip to content
Snippets Groups Projects
Select Git revision
  • 08af9ed06d05b87ee3b29592702cdeeb94096278
  • main default protected
  • gitkeep
  • dev protected
  • Issue/2914-trellisMigrator
  • Issue/2847-reporting
  • Hotfix/2776-workingNewVersion
  • Hotfix/xxxx-correctAssignments
  • Issue/2666-adminCronjobs-theSequal
  • Issue/2666-adminCronjobs
  • Issue/2518-docs
  • Hotfix/xxxx-coscineGraph
  • Issue/2304-virtuosoRoars
  • Fix/v0.1.7-dependencies
  • Hotfix/2212-fixFiles
  • Issue/2222-resourceDateCreated
  • Issue/2221-projectDateCreated
  • Hotfix/xxxx-changeUrls
  • Issue/1321-pidEnquiryOverhaul
  • Issue/1782-structualDataIntegration
  • Issue/2084-migrateResourceStructuralData
  • v0.1.24
  • v0.1.23
  • v0.1.22
  • v0.1.21
  • v0.1.20
  • v0.1.19
  • v0.1.18
  • v0.1.17
  • v0.1.16
  • v0.1.15
  • v0.1.14
  • v0.1.13
  • v0.1.12
  • v0.1.11
  • v0.1.10
  • v0.1.9
  • v0.1.7
  • v0.1.8
  • v0.1.6
  • v0.1.5
41 results

ResourceStructuralData.cs

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    main.cpp 15.15 KiB
    #include "remote-worker.grpc.pb.h"
    
    #include <CLI/App.hpp>
    #include <CLI/Config.hpp>
    #include <CLI/Formatter.hpp>
    #include <ITA/SimulationScheduler/RoomAcoustics/rir_simulation_result.h>
    #include <ITA/SimulationScheduler/Utils/json_config_utils.h>
    #include <ITA/SimulationScheduler/scheduler_interface.h>
    #include <ITA/SimulationScheduler/worker_remote.h>
    #include <algorithm>
    #include <atomic>
    #include <chrono>
    #include <csignal>
    #include <cstddef>
    #include <future>
    #include <grpcpp/ext/proto_server_reflection_plugin.h>
    #include <grpcpp/grpcpp.h>
    #include <grpcpp/health_check_service_interface.h>
    #include <iostream>
    #include <memory>
    #include <string>
    #include <tbb/concurrent_queue.h>
    #include <thread>
    #include <vector>
    
    //--- global variables ---
    
    ///
    /// \brief Global exit request.
    ///
    /// This promise object is used to signal the shutdown of the server.
    /// RunServer obtains the associated future and waits on it before shutting the server down;
    /// WorkerServiceImpl::Shutdown fulfils the promise when the service is not in reset mode.
    /// \note A std::promise can be satisfied only once: a second set_value() throws std::future_error.
    ///
    std::promise<void> exit_requested;
    
    
    ///
    /// \brief Encoders.
    ///
    /// Transform the ITASimulationScheduler data structures to gRPC data structures.
    /// \{
    ///
    /// \brief Encode an ITASampleFrame as an RPC SampleFrame message.
    ///
    /// One buffer is added to the message per channel, and every sample of that channel is appended to it.
    /// The frame is taken by const reference so that its sample data is not copied just to be serialized.
    /// \param objIn the sample frame to encode.
    /// \return the RPC message, with nsamples set to the frame length.
    ///
    ITA::SimulationScheduler::RemoteWorkerRPC::SampleFrame encode( const ITASampleFrame& objIn )
    {
    	ITA::SimulationScheduler::RemoteWorkerRPC::SampleFrame objOut;
    	objOut.set_nsamples( objIn.GetLength( ) );
    
    	// The frame is not modified here, so the loop bounds can be read once.
    	const auto numChannels = objIn.channels( );
    	const auto numSamples  = objIn.length( );
    
    	for( int channel = 0; channel < numChannels; ++channel )
    	{
    		auto* const buf = objOut.add_buffers( );
    
    		for( int sample = 0; sample < numSamples; ++sample )
    		{
    			buf->add_samples( objIn[channel][sample] );
    		}
    	}
    
    	return objOut;
    }
    
    ///
    /// \brief Encode a C3DObject as an RPC Object3D message.
    ///
    /// Copies the id, the type (source, otherwise receiver), the three position components
    /// and the four orientation components (x, y, z, w) into the message.
    /// \param objIn the object to encode; it is dereferenced without a null check.
    /// \return the filled RPC message.
    ///
    ITA::SimulationScheduler::RemoteWorkerRPC::Object3D encode( const ITA::SimulationScheduler::C3DObject* objIn )
    {
    	namespace RPC = ITA::SimulationScheduler::RemoteWorkerRPC;
    
    	RPC::Object3D message;
    	message.set_id( objIn->GetId( ) );
    
    	// Everything that is not a source is sent as a receiver.
    	if( objIn->GetType( ) == ITA::SimulationScheduler::C3DObject::Type::source )
    		message.set_type( RPC::Object3D_Type_SOURCE );
    	else
    		message.set_type( RPC::Object3D_Type_RECEIVER );
    
    	auto* const position = message.mutable_position( );
    	position->set_x( objIn->GetPosition( )[0] );
    	position->set_y( objIn->GetPosition( )[1] );
    	position->set_z( objIn->GetPosition( )[2] );
    
    	auto* const quaternion = message.mutable_quaternion( );
    	quaternion->set_x( objIn->GetOrientation( )[0] );
    	quaternion->set_y( objIn->GetOrientation( )[1] );
    	quaternion->set_z( objIn->GetOrientation( )[2] );
    	quaternion->set_w( objIn->GetOrientation( )[3] );
    
    	return message;
    }
    
    ///
    /// \brief Encode the impulse-response part of a simulation result as an RPC SimulationResult message.
    ///
    /// Fills receiver, source and time stamp, plus the ir_simulation_result sub-message
    /// (sample frame, leading/trailing zero counts and the zeros-stripped flag).
    /// \param objIn the result to encode; it is dereferenced without a null check.
    /// \return the filled RPC message.
    ///
    ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult encode( const ITA::SimulationScheduler::CIRSimulationResult* objIn )
    {
    	ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult message;
    
    	// Generic result data.
    	message.mutable_receiver( )->CopyFrom( encode( objIn->sourceReceiverPair.receiver ) );
    	message.mutable_source( )->CopyFrom( encode( objIn->sourceReceiverPair.source ) );
    	message.set_time_stamp( objIn->dTimeStamp );
    
    	// Impulse response specific data.
    	auto* const irResult = message.mutable_ir_simulation_result( );
    	irResult->mutable_result( )->CopyFrom( encode( objIn->sfResult ) );
    	irResult->set_leading_zero( objIn->iLeadingZeros );
    	irResult->set_trailing_zeros( objIn->iTrailingZeros );
    	irResult->set_zeros_stripped( objIn->bZerosStripped );
    
    	return message;
    }
    
    ///
    /// \brief Encode a room acoustic impulse response result as an RPC SimulationResult message.
    ///
    /// The base part is encoded via the CIRSimulationResult overload; afterwards the
    /// rir_simulation_result sub-message receives the field of duty and the same-room flag.
    /// The result type is only set for the three fields of duty handled below.
    /// \param objIn the result to encode; it is dereferenced without a null check.
    /// \return the filled RPC message.
    ///
    ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult encode( const ITA::SimulationScheduler::RoomAcoustics::CRIRSimulationResult* objIn )
    {
    	using SchedulerFieldOfDuty = ITA::SimulationScheduler::RoomAcoustics::FieldOfDuty;
    	using RPCFieldOfDuty       = ITA::SimulationScheduler::RemoteWorkerRPC::RIRSimulationResult::FieldOfDuty;
    
    	auto message = encode( static_cast<const ITA::SimulationScheduler::CIRSimulationResult*>( objIn ) );
    
    	auto* const rirResult = message.mutable_rir_simulation_result( );
    
    	switch( objIn->eResultType )
    	{
    		case SchedulerFieldOfDuty::directSound:
    			rirResult->set_result_type( RPCFieldOfDuty::RIRSimulationResult_FieldOfDuty_DIRECT_SOUND );
    			break;
    		case SchedulerFieldOfDuty::earlyReflections:
    			rirResult->set_result_type( RPCFieldOfDuty::RIRSimulationResult_FieldOfDuty_EARLY_REFLECTIONS );
    			break;
    		case SchedulerFieldOfDuty::diffuseDecay:
    			rirResult->set_result_type( RPCFieldOfDuty::RIRSimulationResult_FieldOfDuty_DIFFUSE_DECAY );
    			break;
    		default:
    			// Any other value leaves the result type of the message untouched.
    			break;
    	}
    
    	rirResult->set_same_room( objIn->bSameRoom );
    
    	return message;
    }
    /// \}
    
    
    ///
    /// \brief Decoders.
    ///
    /// Transform the gRPC data structures to ITASimulationScheduler data structures.
    /// \{
    ///
    /// \brief Decode an RPC Object3D message into a C3DObject.
    ///
    /// The object type is always initialized: SOURCE maps to a source, every other wire value
    /// (including values this build does not know) maps to a receiver. This mirrors the encoder,
    /// which sends everything that is not a source as RECEIVER.
    /// \param objIn the message to decode.
    /// \return the newly created object.
    ///
    std::unique_ptr<ITA::SimulationScheduler::C3DObject> decode( const ITA::SimulationScheduler::RemoteWorkerRPC::Object3D& objIn )
    {
    	VistaVector3D position( objIn.position( ).x( ), objIn.position( ).y( ), objIn.position( ).z( ) );
    	VistaQuaternion rotation( objIn.quaternion( ).x( ), objIn.quaternion( ).y( ), objIn.quaternion( ).z( ), objIn.quaternion( ).w( ) );
    
    	// Previously the type stayed uninitialized when the message carried neither RECEIVER nor SOURCE.
    	const auto type = objIn.type( ) == ITA::SimulationScheduler::RemoteWorkerRPC::Object3D_Type_SOURCE ? ITA::SimulationScheduler::C3DObject::Type::source :
    	                                                                                                      ITA::SimulationScheduler::C3DObject::Type::receiver;
    
    	// Returning the prvalue directly lets the compiler elide the move.
    	return std::make_unique<ITA::SimulationScheduler::C3DObject>( position, rotation, type, objIn.id( ) );
    }
    
    ///
    /// \brief Decode an RPC UpdateMessage into a scene update.
    /// \param msg the message to decode.
    /// \return the scene update, or nullptr if the message does not contain a scene.
    ///
    std::unique_ptr<ITA::SimulationScheduler::CUpdateScene> decode( const ITA::SimulationScheduler::RemoteWorkerRPC::UpdateMessage& msg )
    {
    	if( !msg.has_scene( ) )
    		return nullptr;
    
    	const auto& scene = msg.scene( );
    
    	auto updateScene = std::make_unique<ITA::SimulationScheduler::CUpdateScene>( scene.time_stamp( ) );
    	updateScene->SetSourceReceiverPair( decode( scene.source( ) ), decode( scene.receiver( ) ) );
    
    	// A plain return of the local is moved implicitly; an explicit std::move would only block copy elision.
    	return updateScene;
    }
    /// \}
    
    ///
    /// \brief Implementation of the worker service for the RPC.
    ///
    /// This class also inherits the ITA::SimulationScheduler::ISchedulerInterface to be able to receive simulation results for the workers the service contains.
    /// When constructed with shutdownIsReset_ set, a Shutdown RPC only discards the workers, so that the user does not have to restart the application.
    /// \todo Does reset and shutdown perform as expected aka in the way that the parent scheduler still works correctly?
    ///
    class WorkerServiceImpl final
        : public ITA::SimulationScheduler::RemoteWorkerRPC::RemoteWorker::Service
        , ITA::SimulationScheduler::ISchedulerInterface
    {
    public:
    	///
    	/// \brief Create the service.
    	/// \param shutdownIsReset_ if true, a Shutdown RPC clears the workers instead of requesting the application exit.
    	///
    	explicit WorkerServiceImpl( bool shutdownIsReset_ ) : shutdownIsReset( shutdownIsReset_ ) {}
    
    	///
    	/// \brief Dummy implementation for the ITA::SimulationScheduler::ISchedulerInterface.
    	///
    	void PushUpdate( std::unique_ptr<ITA::SimulationScheduler::IUpdateMessage> pUpdateMessage ) override { ITA_EXCEPT_NOT_IMPLEMENTED }
    
    	///
    	/// \brief Dummy implementation for the ITA::SimulationScheduler::ISchedulerInterface.
    	///
    	void DetachResultHandler( ITA::SimulationScheduler::IResultHandler* pResultHandler ) override { ITA_EXCEPT_NOT_IMPLEMENTED }
    
    	///
    	/// \brief Handle a finished simulation from the workers.
    	///
    	/// Enqueue the result in the resultQueue for ::ResultFinished to send to the client.
    	///
    	void HandleSimulationFinished( std::unique_ptr<ITA::SimulationScheduler::CSimulationResult> pResult ) override { resultQueue.push( std::move( pResult ) ); }
    
    	///
    	/// \brief Dummy implementation for the ITA::SimulationScheduler::ISchedulerInterface.
    	///
    	bool IsBusy( ) const override { return false; }
    
    	///
    	/// \brief Handle an update received from the client.
    	///
    	/// Decodes the protobuf object and pushes the object to the first worker that is not busy.
    	/// If every worker is busy the update is dropped and OK is still returned.
    	/// \param context the context for the RPC call.
    	/// \param request the request object.
    	/// \param reply the reply object.
    	/// \return INVALID_ARGUMENT if the message contains no scene, OK otherwise.
    	///
    	grpc::Status PushUpdate( grpc::ServerContext* context, const ITA::SimulationScheduler::RemoteWorkerRPC::UpdateMessage* request,
    	                         google::protobuf::Empty* reply ) override
    	{
    		// decode() returns nullptr for a message without a scene; reject it instead of handing a null update to a worker.
    		if( !request->has_scene( ) )
    			return grpc::Status( grpc::StatusCode::INVALID_ARGUMENT, "UpdateMessage does not contain a scene." );
    
    		const auto freeWorker = std::find_if( workers.begin( ), workers.end( ), []( const auto& worker ) { return !worker->IsBusy( ); } );
    		if( freeWorker != workers.end( ) )
    			( *freeWorker )->PushUpdate( decode( *request ) );
    
    		return grpc::Status::OK;
    	}
    
    	///
    	/// \brief Stream finished results back to the client.
    	///
    	/// Uses a server streaming RPC to send all finished results to the client.
    	/// The call finishes when a shutdown is signaled, when the call is cancelled, or when writing to the stream fails.
    	/// While it is running, it drains the ::resultQueue, encodes each room acoustic result and writes it to the stream;
    	/// results of any other type are discarded. Between two drains the thread sleeps for ::pollInterval.
    	/// \param context the context for the RPC call.
    	/// \param request the request object.
    	/// \param replyWriter the writer for the replies.
    	/// \return CANCELLED if a write to the stream failed, OK otherwise.
    	///
    	grpc::Status ResultFinished( grpc::ServerContext* context, const google::protobuf::Empty* request,
    	                             grpc::ServerWriter<ITA::SimulationScheduler::RemoteWorkerRPC::SimulationResult>* replyWriter ) override
    	{
    		shutdownSignaled = false;
    
    		std::unique_ptr<ITA::SimulationScheduler::CSimulationResult> result;
    		while( !shutdownSignaled && !context->IsCancelled( ) )
    		{
    			while( resultQueue.try_pop( result ) )
    			{
    				if( const auto pRIRResult = dynamic_cast<ITA::SimulationScheduler::RoomAcoustics::CRIRSimulationResult*>( result.get( ) ) )
    				{
    					const auto res = encode( pRIRResult );
    
    					// A failed write means the stream is broken; there is nobody left to stream to.
    					if( !replyWriter->Write( res ) )
    						return grpc::Status::CANCELLED;
    				}
    			}
    
    			// Nothing queued right now: back off briefly instead of spinning on the queue.
    			std::this_thread::sleep_for( pollInterval );
    		}
    
    		return grpc::Status::OK;
    	}
    
    	///
    	/// \brief Initiate the workers for the service.
    	///
    	/// The created workers are appended to the workers the service already contains.
    	/// \param context the context for the RPC call.
    	/// \param request the request object. Contains the json config of the remote worker, including the workers that are supposed to be running in the service.
    	/// \param reply the reply object.
    	///
    	grpc::Status InitWorker( grpc::ServerContext* context, const ITA::SimulationScheduler::RemoteWorkerRPC::InitMessage* request,
    	                         ITA::SimulationScheduler::RemoteWorkerRPC::Reply* reply ) override
    	{
    		ITA::SimulationScheduler::CWorkerRemote::WorkerRemoteConfig config;
    
    		const auto jnRoot = nlohmann::json::parse( request->config( ) );
    
    		const auto vistaConfig = ITA::SimulationScheduler::Utils::JSONConfigUtils::ReadVistaPropertyListFromJSON( jnRoot );
    
    		config.Load( vistaConfig );
    
    		for( const auto& workerConfig: config.vpWorkerConfigs )
    		{
    			// The service registers itself as the parent so that results arrive in HandleSimulationFinished.
    			workers.push_back( ITA::SimulationScheduler::CWorkerFactory::CreateWorker( workerConfig, this ) );
    		}
    
    		reply->set_success( true );
    
    		return grpc::Status::OK;
    	}
    
    	///
    	/// \brief Check if the RPC service is busy.
    	///
    	/// This is only the case if all contained workers are busy.
    	/// \param context the context for the RPC call.
    	/// \param request the request object.
    	/// \param reply the reply object.
    	///
    	grpc::Status IsBusy( grpc::ServerContext* context, const google::protobuf::Empty* request, google::protobuf::BoolValue* reply ) override
    	{
    		reply->set_value( std::all_of( workers.begin( ), workers.end( ),
    		                               []( const std::unique_ptr<ITA::SimulationScheduler::IWorkerInterface>& worker ) { return worker->IsBusy( ); } ) );
    		return grpc::Status::OK;
    	}
    
    	///
    	/// \brief Reset the service.
    	///
    	/// Every worker is reset exactly once, as soon as it is found not to be busy.
    	/// The call blocks, polling every ::pollInterval, until all workers have been reset.
    	/// \param context the context for the RPC call.
    	/// \param request the request object.
    	/// \param reply the reply object.
    	///
    	grpc::Status Reset( grpc::ServerContext* context, const google::protobuf::Empty* request, ITA::SimulationScheduler::RemoteWorkerRPC::Reply* reply ) override
    	{
    		std::cout << "Reset received, forwarding to workers.\n";
    
    		std::vector<bool> workerReset( workers.size( ), false );
    		std::size_t remaining = workers.size( );
    
    		while( remaining > 0 )
    		{
    			for( std::size_t index = 0; index < workers.size( ); ++index )
    			{
    				// Skip workers handled in an earlier pass so that no worker is reset twice.
    				if( !workerReset[index] && !workers[index]->IsBusy( ) )
    				{
    					workers[index]->Reset( );
    					workerReset[index] = true;
    					--remaining;
    				}
    			}
    
    			// Some workers are still busy: wait a moment before asking again.
    			if( remaining > 0 )
    				std::this_thread::sleep_for( pollInterval );
    		}
    
    		reply->set_success( true );
    		return grpc::Status::OK;
    	}
    
    	///
    	/// \brief Shutdown the service and the server.
    	///
    	/// Signals ::ResultFinished to stop streaming. In reset mode the workers are discarded and the application keeps running;
    	/// otherwise the global exit request is fulfilled, which makes the application terminate.
    	/// \param context the context for the RPC call.
    	/// \param request the request object.
    	/// \param reply the reply object.
    	///
    	grpc::Status Shutdown( grpc::ServerContext* context, const google::protobuf::Empty* request, ITA::SimulationScheduler::RemoteWorkerRPC::Reply* reply ) override
    	{
    		std::cout << "Shutdown requested.\n";
    		shutdownSignaled = true;
    
    		if( shutdownIsReset )
    		{
    			workers.clear( );
    		}
    		else
    		{
    			try
    			{
    				exit_requested.set_value( );
    			}
    			catch( const std::future_error& )
    			{
    				// The exit was already requested earlier; a promise can only be satisfied once.
    			}
    		}
    		reply->set_success( true );
    		return grpc::Status::OK;
    	}
    
    private:
    	///
    	/// \brief Pause between two polls in ::ResultFinished and ::Reset.
    	///
    	static constexpr std::chrono::milliseconds pollInterval { 1 };
    
    	///
    	/// \brief Workers contained in this service.
    	///
    	/// NOTE(review): the vector itself is not synchronized; this presumably relies on the client not issuing
    	/// InitWorker or Shutdown concurrently with the other calls - confirm.
    	///
    	std::vector<std::unique_ptr<ITA::SimulationScheduler::IWorkerInterface>> workers;
    
    	///
    	/// \brief Signal for ::ResultFinished to stop streaming; set by ::Shutdown.
    	///
    	std::atomic_bool shutdownSignaled = false;
    
    	///
    	/// \brief Thread-safe queue for finished results.
    	///
    	/// Results are pushed on the queue when a worker finished a simulation and are removed when they get sent to the client in ::ResultFinished
    	///
    	tbb::concurrent_queue<std::unique_ptr<ITA::SimulationScheduler::CSimulationResult>> resultQueue;
    
    	///
    	/// \brief Indicate if a shutdown request resets the service.
    	///
    	bool shutdownIsReset = false;
    };
    
    namespace
    {
    	///
    	/// \brief Set to 1 by the signal handler installed in RunServer.
    	///
    	/// Writing a volatile std::sig_atomic_t is one of the few operations that is safe inside a signal handler.
    	///
    	volatile std::sig_atomic_t termination_signal_received = 0;
    } // namespace
    
    ///
    /// \brief Run the RPC server on the given port.
    ///
    /// Blocks until either a termination signal is received or the global exit request is fulfilled,
    /// then shuts the server down. If the server cannot be started, an error is printed and the function returns.
    /// \param port the port to listen on for RPC calls.
    /// \param serverShutdownIsReset indicate if a shutdown request resets the application.
    ///
    void RunServer( int port, bool serverShutdownIsReset )
    {
    	// shutdown implementation from https://stackoverflow.com/questions/32504208/how-to-shutdown-grpc-server-from-client-using-rpc-function
    	// The handler only records the signal: fulfilling a std::promise is not async-signal-safe
    	// and would throw if a second signal arrived.
    	auto handler = []( int )
    	{
    		termination_signal_received = 1;
    	};
    
    	std::signal( SIGABRT, handler );
    #ifdef SIGBREAK
    	// SIGBREAK only exists on Windows.
    	std::signal( SIGBREAK, handler );
    #endif
    	std::signal( SIGINT, handler );
    	std::signal( SIGTERM, handler );
    
    	const std::string server_address( "0.0.0.0:" + std::to_string( port ) );
    	WorkerServiceImpl service( serverShutdownIsReset );
    
    	grpc::EnableDefaultHealthCheckService( true );
    	grpc::reflection::InitProtoReflectionServerBuilderPlugin( );
    	grpc::ServerBuilder builder;
    
    	// Listen on the given address without any authentication mechanism.
    	builder.AddListeningPort( server_address, grpc::InsecureServerCredentials( ) );
    
    	// Register "service" as the instance through which we'll communicate with clients.
    	builder.RegisterService( &service );
    
    	// Assemble the server.
    	const std::unique_ptr<grpc::Server> server( builder.BuildAndStart( ) );
    	if( !server )
    	{
    		// BuildAndStart yields a null pointer on failure, e.g. when the port cannot be bound.
    		std::cerr << "Failed to start server on " << server_address << '\n';
    		return;
    	}
    	std::cout << "Server listening on " << server_address << '\n';
    	std::cout << "To manually shut down press CTRL+C or close the window\n";
    
    	std::thread serving_thread( [&]( ) { server->Wait( ); } );
    
    	// Wake up regularly so that a signal recorded by the handler is noticed as well.
    	const auto exitFuture = exit_requested.get_future( );
    	while( termination_signal_received == 0 && exitFuture.wait_for( std::chrono::milliseconds( 100 ) ) != std::future_status::ready )
    	{
    	}
    
    	// Give in-flight calls a grace period; after the deadline gRPC cancels the calls that are still pending.
    	server->Shutdown( std::chrono::system_clock::now( ) + std::chrono::seconds( 5 ) );
    	serving_thread.join( );
    	std::cout << "Server shutting down\n";
    }
    
    int main( int argc, char** argv )
    {
    	CLI::App app {
    		R"(
    Remote worker application for the ITASimulationScheduler.
    
    This application runs a RPC server that listens for RPC calls on the given port.
    		)"
    	};
    
    	uint16_t port = 50052;
    	app.add_option( "port,-p,--port", port, "Port to listen for RPC calls." )
    	    ->check( CLI::Range( static_cast<uint16_t>( 1024 ), std::numeric_limits<uint16_t>::max( ) ) )
    	    ->capture_default_str( );
    
    	bool serverShutdownIsReset = false;
    	app.add_flag( "-s,--shutdown-is-reset", serverShutdownIsReset, "If given, the application is reset after a shutdown is received." );
    
    	try
    	{
    		app.parse( argc, argv );
    	}
    	catch( const CLI::ParseError& e )
    	{
    		return app.exit( e );
    	}
    
    	RunServer( port, serverShutdownIsReset );
    
    	return 0;
    }