Commit bc38cf1d authored by Pascal Palenda's avatar Pascal Palenda
Browse files

Add CScheduler class and tests - WIP

parent b8206201
#ifndef INCLUDE_WATCHER_ITA_SIMULATION_SCHEDULER_ROOM_ACOUSTICS_SCHEDULER
#define INCLUDE_WATCHER_ITA_SIMULATION_SCHEDULER_ROOM_ACOUSTICS_SCHEDULER
// std includes
#include <memory>
#include <vector>
#include <unordered_map>
#include <list>
// API includes
#include <ITA/simulation_scheduler/definitions.h>
// simulation scheduler includes
#include <ITA/simulation_scheduler/room_acoustics/scheduler_interface.h>
#include <ITA/simulation_scheduler/room_acoustics/simulation_result.h>
#include <ITA/simulation_scheduler/config.h>
#include <ITA/simulation_scheduler/types.h>
#include <ITA/simulation_scheduler/update_message.h>
#include <ITA/simulation_scheduler/update_scene.h>
#include <ITA/simulation_scheduler/update_config.h>
#include <ITA/simulation_scheduler/audibility_filter/audibility_filter_interface.h>
#include <ITA/simulation_scheduler/room_acoustics/worker_interface.h>
// VISTA includes
#include <VistaInterProcComm/Concurrency/VistaThreadLoop.h>
#include <VistaInterProcComm/Concurrency/VistaThreadEvent.h>
#include <VistaAspects/VistaPropertyList.h>
// ITA includes
#include <ITAAtomicPrimitives.h>
// Other includes
#include <tbb/concurrent_queue.h>
namespace ITA
{
namespace simulation_scheduler
{
class CUpdateScene;
class CUpdateConfig;
namespace room_acoustics
{
class CSimulationResult;
class IWorkerInterface;
///
/// \brief Handles the filtering and scheduling of updates to its workers
///
/// CScheduler is kind of the main class of the SimulationScheduler Library.
/// This scheduler runs local to where it was created.
/// Each CScheduler is responsible for one FieldOfDuty.
/// It receives updates, depending on what type od update it is, it either applies the config change (CUpdateConfig), or processes the CUpdateScene.
/// For this, the CScheduler uses its audibility filters to determine if the update is indeed audible.
/// If the update is in fact audible, CScheduler distributes the update to idle workers who actually simulate the update.
/// When the update is simulated, the result is passed to all attached result handlers.
/// \todo Determine where whe save the last simulated update, before sending it to the worker or when receiving it from the worker.
///
class ITA_SIMULATION_SCHEDULER_API CScheduler : public ISchedulerInterface, public VistaThreadLoop
{
public:
///
/// \brief Configuration class for local schedulers.
///
/// This is a basic overload of VistaPropertyList.
/// Its main purpose is to distinguish a scheduler config and allow easy access to the configs for both worker an d filter.
/// \todo Allow for duplicate filters?
///
class ITA_SIMULATION_SCHEDULER_API SchedulerConfig : public VistaPropertyList
{
public:
///
/// \brief Returns the default Configuration for the local scheduler.
/// \return the default config for the local scheduler.
///
static SchedulerConfig getDefaultLocalSchedulerConfig ( )
{
auto config = SchedulerConfig ( );
config.SetValueInSubList ( "ReplaceUpdates", "Scheduler", true );
config.SetValueInSubList ( "FieldOfDuty", "Scheduler", as_integer ( room_acoustics::FieldOfDuty::directSound ) );
return config;
}
///
/// \brief Set the worker config of the SchedulerConfig.
/// \param vWorkerConfig all workers to add to the config.
/// \todo Do we set or add worker configs? If we set, do we overwrite?
///
void setWorkerConfig ( const std::vector<IWorkerInterface::WorkerConfig>& vWorkerConfig );
///
/// \brief Set the filter config of the SchedulerConfig.
///
/// The order of the filters is also saved in the config.
/// \param vFilterConfig all filter to add to the config.
/// \remark Note the use of std::list instead of std::vector as the filter possess an order.
/// \todo Do we set or add filter configs? If we set, do we overwrite?
///
void setAudibilityFilterConfig (
const std::list<audibility_filter::IAudibilityFilter::AudibilityFilterConfig>& vFilterConfig );
///
/// \brief Get all worker configs from the SchedulerConfig.
/// \return a vector with all worker configs.
///
std::vector<IWorkerInterface::WorkerConfig> getWorkerConfig ( ) const;
///
/// \brief Get all filter configs from the SchedulerConfig.
///
/// The order in which the filters were set in the config is also represented in the list.
/// \return a list with all filter configs.
/// \remark Note the use of std::list instead of std::vector as the filter possess an order.
///
std::list<audibility_filter::IAudibilityFilter::AudibilityFilterConfig> getAudibilityFilterConfig ( ) const;
};
CScheduler ( ) = delete;
///
/// \brief Constructor for a local CScheduler.
///
/// The scheduler is configured via its SchedulerConfig.
/// \param pConfig the SchedulerConfig used to configure the CScheduler.
///
CScheduler ( const SchedulerConfig& pConfig );
///
/// \brief Destructor of CScheduler.
///
/// It stops the loop.
///
virtual ~CScheduler ( );
///
/// \copydoc ISchedulerInterface::postUpdate()
///
virtual void postUpdate ( std::unique_ptr<IUpdateMessage> pUpdateMessage ) override;
///
/// \copydoc ISchedulerInterface::attachResultHandler()
///
virtual void attachResultHandler ( IResultHandler* pResultHandler ) override;
///
/// \copydoc ISchedulerInterface::detachResultHandler()
///
virtual void detachResultHandler ( IResultHandler* pResultHandler ) override;
///
/// \brief Handle when a result is finished.
///
/// This function is primarily called by workers who finished a simulation.
/// The result of this simulation is passed back to the scheduler and distributed to the result handlers.
/// \remark This function takes ownership of the result.
/// \param pResult a finished result.
///
void handleSimulationFinished ( std::unique_ptr<CSimulationResult> pResult );
protected:
///
/// \brief Loop body for the CScheduler.
///
/// The following steps are done in the loop:
/// -# Wait for a new update in m_qUpdateQueue.
/// -# move the updates from the concurrent queue to a list.
/// -# The updates are processed depending on their type
/// - \b CUpdateConfig: The config change is applied.
/// - \b CUpdateScene: The audibility of the update is determined and if the update is audible, placed in m_lPendingUpdateList.
/// -# The m_lUpdateList gets cleared for the next loop iteration.
/// -# If the updates should be filtered, filterReplace() is called.
/// -# The updates are distributed to the workers.
/// -# When a update is posted to a worker, it is saved as the latest update for its source-receiver-pair for the filters to reference..
/// \return true if VistaThreadLoop::ThreadBody() shall call VistaThread::YieldThread() before the next loop, false else
///
bool LoopBody ( ) override;
///
/// \brief Gets a vector of pointers to the workers.
/// \note This function is primarily for testing.
/// However, it could also be useful in the long run.
/// \return a vector of pointers to the workers.
///
std::vector<IWorkerInterface*> getWorker ( );
///
/// \brief Gets a vector of pointers to the filters.
/// \note This function is primarily for testing.
/// However, it could also be useful in the long run.
/// \return a vector of pointers to the filters.
///
std::vector<audibility_filter::IAudibilityFilter*> getFilter ( );
private:
///
/// \brief Process a CUpdateScene.
///
/// In this function the CUpdateScene is compared to the latest simulated CUpdateScene that was simulated to determine its audibility.
/// If the update is audible, it is moved to m_lPendingUpdateList.
/// \param pUpdate the CUpdateScene to check for audibility.
///
void processUpdateScene ( std::unique_ptr<CUpdateScene> pUpdate );
///
/// \brief Process a CUpdateConfig and apply it.
/// \param pUpdate the CUpdateConfig to apply.
///
void processUpdateConfig ( std::unique_ptr <CUpdateConfig> pUpdate );
///
/// \brief Perform a reset.
///
/// This function signals a reset to all workers and clears all update lists.
/// \todo Make the reset process more elegant if possible.
///
void reset ( );
///
/// \brief Vector of all attached IResultHandlers.
///
std::vector<IResultHandler*> m_vpResultHandlers;
///
/// \brief All workers belonging to the CScheduler.
///
std::vector<std::unique_ptr<IWorkerInterface>> m_vWorker;
///
/// \brief List of the audibility filters belonging to CScheduler.
/// \remark The order of the filters matters, thus this is a std::list.
///
std::list<std::unique_ptr<audibility_filter::IAudibilityFilter>> m_lAudibilityFilters;
///
/// \brief Concurrent queue to handle the input of new Updates.
///
/// This queue is thus accessed by two threads the calling and the loop thread.
///
tbb::concurrent_queue<std::unique_ptr<IUpdateMessage>> m_qUpdateQueue;
///
/// \brief List to temporarily move the new updates in from the queue.
///
/// This list simplifies the access to the updates, as this list now only is accessed by one thread.
///
std::list<std::unique_ptr<IUpdateMessage>> m_lUpdateList;
///
/// \brief List with CUpdateScenes pending for simulation.
///
std::list<std::unique_ptr<CUpdateScene>> m_lPendingUpdateList;
///
/// \brief Map containing all latest simulated updates.
///
/// This is map is used for the filters to be able to reference the latest simulated update for its decision.
/// \remark Note the use of unordered_map. This type of map has on average constant complexity for \p find.
///
std::unordered_map<int, CUpdateScene*> m_mPreviousStates;
///
/// \brief The #FieldOfDuty of the CScheduler.
/// \todo Pass the FOD to the workers and filters via the config in some way, shape or form.
///
FieldOfDuty m_eFieldOfDuty;
///
/// \brief True if the controller should filter its received updates using filterReplace().
///
bool m_bReplaceUpdates;
///
/// \brief Trigger for starting a thread loop.
///
VistaThreadEvent m_evTriggerLoop;
///
/// \{
/// \brief Bools for handling the stop of the loop.
///
/// As these will be accessed by two threads, they have to be atomic.
///
ITAAtomicBool m_bStopIndicated = false;
ITAAtomicBool m_bStopACK = false;
/// \}
///
/// \brief Bool for handeling the reset.
/// \todo This probably does not have to be atomic as the reset happens in the same thread.
///
ITAAtomicBool m_bResetIndicated = false;
};
} // namespace room_acoustics
} // namespace simulation_scheduler
} // namespace ITA
#endif // INCLUDE_WATCHER_ITA_SIMULATION_SCHEDULER_ROOM_ACOUSTICS_SCHEDULER
\ No newline at end of file
// Header include
#include <ITA/simulation_scheduler/room_acoustics/scheduler.h>
// simulation scheduler includes
#include <ITA/simulation_scheduler/room_acoustics/result_handler.h>
#include "../src/ITA/simulation_scheduler/room_acoustics/replacement_filter.h"
// Vista includes
#include <VistaBase/VistaTimeUtils.h>
// ITA include
#include <ITAException.h>
namespace ITA
{
namespace simulation_scheduler
{
namespace room_acoustics
{
void CScheduler::SchedulerConfig::setWorkerConfig (
const std::vector<IWorkerInterface::WorkerConfig>& vWorkerConfig )
{
if ( HasSubList ( "Scheduler" ) )
{
auto&& schedulerConfig = GetSubListRef ( "Scheduler" );
auto config = VistaPropertyList ( );
for ( auto workerConfig : vWorkerConfig )
{
// Check whether there is only one item in workerConfig.
// If so, begin() points to this item.
// So we check if it is a property list so we can use it as the config.
if ( workerConfig.size ( ) == 1 &&
workerConfig.begin ( )->second.GetPropertyType ( ) == VistaProperty::ePropType::PROPT_PROPERTYLIST )
config.MergeWith ( workerConfig );
else
ITA_EXCEPT_INVALID_PARAMETER ( "The given config should only have one sub list entry." );
}
schedulerConfig.SetPropertyListValue ( "Scheduler/Worker", config );
}
else
ITA_EXCEPT_INVALID_PARAMETER ( "Config is not a Scheduler config." )
}
void CScheduler::SchedulerConfig::setAudibilityFilterConfig (
const std::list<audibility_filter::IAudibilityFilter::AudibilityFilterConfig>& vFilterConfig )
{
if ( HasSubList ( "Scheduler" ) )
{
auto&& schedulerConfig = GetSubListRef ( "Scheduler" );
auto config = VistaPropertyList ( );
int counter = 0;
for ( auto filterConfig : vFilterConfig )
{
// Check whether there is only one item in filterConfig.
// If so, begin() points to this item.
// So we check if it is a property list so we can use it as the config.
if ( filterConfig.size ( ) == 1 &&
filterConfig.begin ( )->second.GetPropertyType ( ) == VistaProperty::ePropType::PROPT_PROPERTYLIST )
{
schedulerConfig.SetValueInSubList ( std::to_string ( counter ), "Scheduler/AudibilityFilterOrder", filterConfig.begin ( )->first );
config.MergeWith ( filterConfig );
}
else
ITA_EXCEPT_INVALID_PARAMETER ( "The given config should only have one sub list entry." );
counter++;
}
schedulerConfig.SetPropertyListValue ( "Scheduler/AudibilityFilter", config );
}
else
ITA_EXCEPT_INVALID_PARAMETER ( "Config is not a Scheduler config." )
}
std::vector<IWorkerInterface::WorkerConfig> CScheduler::SchedulerConfig::
getWorkerConfig ( ) const
{
if ( HasSubList ( "Scheduler" ) )
{
const auto schedulerConfig = GetSubListConstRef ( "Scheduler" );
if ( schedulerConfig.HasSubList ( "Scheduler/Worker" ) )
{
const auto workerConfigs = schedulerConfig.GetSubListConstRef ( "Scheduler/Worker" );
std::vector<IWorkerInterface::WorkerConfig> configs;
for ( auto iter = workerConfigs.begin ( ); iter != workerConfigs.end ( ); ++iter )
{
IWorkerInterface::WorkerConfig workerConfig;
workerConfig.SetPropertyListValue ( iter->first, iter->second.GetPropertyListValue ( ) );
configs.push_back ( workerConfig );
}
return configs;
}
else
return std::vector<IWorkerInterface::WorkerConfig> ( );
}
else
ITA_EXCEPT_INVALID_PARAMETER ( "Config is not a Scheduler config." )
}
std::list<audibility_filter::IAudibilityFilter::AudibilityFilterConfig> CScheduler::SchedulerConfig::
getAudibilityFilterConfig ( ) const
{
if ( HasSubList ( "Scheduler" ) )
{
const auto schedulerConfig = GetSubListConstRef ( "Scheduler" );
if ( schedulerConfig.HasSubList ( "Scheduler/AudibilityFilter" ) && schedulerConfig.HasSubList ( "Scheduler/AudibilityFilterOrder" ) )
{
const auto filterConfigs = schedulerConfig.GetSubListConstRef ( "Scheduler/AudibilityFilter" );
const auto filterOrder = schedulerConfig.GetSubListConstRef ( "Scheduler/AudibilityFilterOrder" );
std::list<audibility_filter::IAudibilityFilter::AudibilityFilterConfig> configs;
for ( int i = 0; i < filterOrder.size ( ); ++i )
{
auto filterName = filterOrder.GetValue<std::string> ( std::to_string ( i ) );
audibility_filter::IAudibilityFilter::AudibilityFilterConfig filterConfig;
filterConfig.SetPropertyListValue ( filterName, filterConfigs.GetSubListCopy ( filterName ) );
configs.push_back ( filterConfig );
}
return configs;
}
else
return std::list<audibility_filter::IAudibilityFilter::AudibilityFilterConfig> ( );
}
else
ITA_EXCEPT_INVALID_PARAMETER ( "Config is not a Scheduler config." )
}
CScheduler::CScheduler ( const SchedulerConfig& pConfig )
{
if ( pConfig.HasSubList ( "Scheduler" ) )
{
m_bReplaceUpdates = pConfig.GetValueInSubList<bool> ( "ReplaceUpdates", "Scheduler" );
m_eFieldOfDuty = static_cast< FieldOfDuty >( pConfig.GetValueInSubList<int> ( "FieldOfDuty", "Scheduler" ) );
auto filterConfigs = pConfig.getAudibilityFilterConfig ( );
auto workerConfigs = pConfig.getWorkerConfig ( );
for ( const auto& filterConfig : filterConfigs )
{
m_lAudibilityFilters.push_back ( std::move ( audibility_filter::CAudibilityFilterFactory::createFilter ( filterConfig ) ) );
}
for ( const auto& workerConfig : workerConfigs )
{
m_vWorker.push_back ( std::move ( CWorkerFactory::createWorker ( workerConfig, this ) ) );
}
Run ( );
}
else
ITA_EXCEPT_INVALID_PARAMETER ( "Config is not a Scheduler config." )
}
CScheduler::~CScheduler ( )
{
m_bStopIndicated = true;
m_evTriggerLoop.SignalEvent ( );
while ( m_bStopACK != true )
VistaTimeUtils::Sleep ( 250 );
m_evTriggerLoop.SignalEvent ( );
VistaThreadLoop::StopGently ( true );
}
void CScheduler::postUpdate ( std::unique_ptr<IUpdateMessage> pUpdateMessage )
{
m_qUpdateQueue.push ( std::move ( pUpdateMessage ) );
m_evTriggerLoop.SignalEvent ( );
}
void CScheduler::attachResultHandler ( IResultHandler* pResultHandler )
{
m_vpResultHandlers.push_back ( pResultHandler );
}
void CScheduler::detachResultHandler ( IResultHandler* pResultHandler )
{
const auto iterator = std::remove ( m_vpResultHandlers.begin ( ), m_vpResultHandlers.end ( ), pResultHandler );
m_vpResultHandlers.erase ( iterator );
}
void CScheduler::handleSimulationFinished ( std::unique_ptr<CSimulationResult> pResult )
{
for ( auto handler : m_vpResultHandlers )
handler->postResultReceived ( std::make_unique<CSimulationResult> ( *pResult ) );
}
void CScheduler::processUpdateScene ( std::unique_ptr <CUpdateScene> pUpdate )
{
auto bAudible = true;
// Check if a previous update exists.
if ( m_mPreviousStates.find ( pUpdate->getReferenceID ( ) ) != m_mPreviousStates.end ( ) )
{
// Test if the update is audible.
// Note the extra condition in the for loop, this gains performance if a previous update indicates non audibility.
for ( auto filterIter = m_lAudibilityFilters.begin ( );
bAudible && filterIter != m_lAudibilityFilters.end ( );
++filterIter )
{
bAudible &= ( *filterIter )->filter ( *m_mPreviousStates.at ( pUpdate->getReferenceID ( ) ), *pUpdate );
}
}
if ( bAudible )
m_lPendingUpdateList.push_back ( std::move ( pUpdate ) );
}
void CScheduler::processUpdateConfig ( std::unique_ptr <CUpdateConfig> pUpdate )
{
switch ( pUpdate->getType ( ) )
{
case CUpdateConfig::ConfigChangeType::changeDirectivity:
ITA_EXCEPT_NOT_IMPLEMENTED;
break;
case CUpdateConfig::ConfigChangeType::changeHRTF:
ITA_EXCEPT_NOT_IMPLEMENTED;
break;
case CUpdateConfig::ConfigChangeType::resetAll:
reset ( );
break;
default:
ITA_EXCEPT_INVALID_PARAMETER ( "Unkown type of config change" );
}
}
void CScheduler::reset ( )
{
// signal the reset to the workers
for ( auto& worker : m_vWorker )
{
while ( worker->isBusy ( ) )
VistaTimeUtils::Sleep ( 250 );
worker->reset ( );
}
// The loop was on hold, clear all update lists.
m_qUpdateQueue.clear ( );
m_lUpdateList.clear ( );
m_lPendingUpdateList.clear ( );
// start the loop again.
m_bResetIndicated = true;
m_evTriggerLoop.SignalEvent ( );
}
bool CScheduler::LoopBody ( )
{
// Wait for trigger
m_evTriggerLoop.WaitForEvent ( true );
m_evTriggerLoop.ResetThisEvent ( );
if ( m_bResetIndicated )
{
m_bResetIndicated = false;
}
// Handle the stop of the loop
if ( m_bStopIndicated )
{
IndicateLoopEnd ( );
m_bStopACK = true;
return false;
}
// get the updates from the queue to a list.
std::unique_ptr<IUpdateMessage> updateMessage;
while ( m_qUpdateQueue.try_pop ( updateMessage ) )
{
m_lUpdateList.push_back ( std::move ( updateMessage ) );
}
// process the updates
for ( auto& update : m_lUpdateList )
{