Skip to content
Snippets Groups Projects
Commit a8b719da authored by Tim Übelhör's avatar Tim Übelhör
Browse files

Implemented the QueueDispatcher

parent 676259c9
No related branches found
No related tags found
No related merge requests found
...@@ -171,6 +171,7 @@ ...@@ -171,6 +171,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClCompile Include="EngineObserver.cpp" /> <ClCompile Include="EngineObserver.cpp" />
<ClCompile Include="QueueDispatcherTest.cpp" />
<ClCompile Include="stdafx.cpp"> <ClCompile Include="stdafx.cpp">
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">Create</PrecompiledHeader> <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">Create</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader> <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader>
......
...@@ -38,5 +38,8 @@ ...@@ -38,5 +38,8 @@
<ClCompile Include="WaitableQueueTest.cpp"> <ClCompile Include="WaitableQueueTest.cpp">
<Filter>Quelldateien</Filter> <Filter>Quelldateien</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="QueueDispatcherTest.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
</Project> </Project>
\ No newline at end of file
#include "stdafx.h"
#include "CppUnitTest.h"
#include "QueueDispatcher.h"
#include <iostream>
#include <future>
using namespace Microsoft::VisualStudio::CppUnitTestFramework;
using namespace std::placeholders;
namespace BackendTests
{
TEST_CLASS(QueueDispatcherTest)
{
private:
Utility::QueueDispatcher m_dispatcher;
public:
TEST_METHOD(TestQueueDispatcher)
{
auto addOne = [](int n)
{
return n + 1;
};
auto get42 = [addOne]()
{
return addOne(41);
};
// Fail
Assert::IsFalse(m_dispatcher.execute_one());
// Add an action manually
auto dispFuture = m_dispatcher.push<int>(get42);
m_dispatcher.execute_one();
Assert::AreEqual(42, dispFuture.get());
// Fail
Assert::IsFalse(m_dispatcher.execute_one());
// More fun with execution in auto mode and two threads
m_dispatcher.set_automatic();
const int COUNT = 10000;
std::thread one([&]()
{
for (int i = 0; i < COUNT; i++)
{
auto addOneToI = [addOne, i]()
{
return addOne(i);
};
auto res = m_dispatcher.push<int>(addOneToI);
Assert::AreEqual(i + 1, res.get());
}
});
std::thread two([&]()
{
for (int i = COUNT; i > 0; i--)
{
auto addOneToI = [addOne, i]()
{
return addOne(i);
};
auto res = m_dispatcher.push<int>(addOneToI);
Assert::AreEqual(i + 1, res.get());
}
});
one.join();
two.join();
}
};
}
\ No newline at end of file
...@@ -15,18 +15,13 @@ namespace BackendTests ...@@ -15,18 +15,13 @@ namespace BackendTests
TEST_METHOD(TestFmuEngine) TEST_METHOD(TestFmuEngine)
{ {
// Basic setup // Basic setup
Simulator fmuEngine("TestID"); Simulator simulator;
auto observer = std::make_shared<EngineObserver>(); auto observer = std::make_shared<EngineObserver>();
// Add & remove observer // Add & remove observer
fmuEngine.AddObserver(observer); simulator.AddObserver(observer);
fmuEngine.AddObserver(observer); // double insert simulator.AddObserver(observer); // double insert
fmuEngine.RemoveObserver(observer); simulator.RemoveObserver(observer);
fmuEngine.RemoveObserver(observer); // Remove Empty simulator.RemoveObserver(observer); // Remove Empty
// Add & remove channel link
fmuEngine.AddChannelLink("master", "slave", 1, 2, 3, 4);
fmuEngine.AddChannelLink("master", "slave", 1, 2, 3, 4);
fmuEngine.RemoveChannelLink("master", "slave", 1, 2);
fmuEngine.RemoveChannelLink("master", "slave", 1, 2);
} }
}; };
......
...@@ -9,20 +9,20 @@ namespace BackendTests ...@@ -9,20 +9,20 @@ namespace BackendTests
TEST_CLASS(WaitableQueueTest) TEST_CLASS(WaitableQueueTest)
{ {
private: private:
Utility::WaitableQueue<int> _queue; Utility::WaitableQueue<int> m_queue;
public: public:
TEST_METHOD_INITIALIZE(TestWaitableQueueInitialize) TEST_METHOD_INITIALIZE(TestWaitableQueueInitialize)
{ {
// Boost none is expected // Boost none is expected
Assert::IsTrue(!_queue.pop()); Assert::IsTrue(!m_queue.pop());
} }
void produce(int n) void produce(int n)
{ {
for (int i = 0; i < n; i++) for (int i = 0; i < n; i++)
{ {
_queue.push(i); m_queue.push(i);
} }
} }
...@@ -30,7 +30,7 @@ namespace BackendTests ...@@ -30,7 +30,7 @@ namespace BackendTests
{ {
for (int i = 0; i < n; i++) for (int i = 0; i < n; i++)
{ {
Assert::AreEqual(i, _queue.wait_and_pop()); Assert::AreEqual(i, m_queue.wait_and_pop());
} }
} }
...@@ -44,9 +44,22 @@ namespace BackendTests ...@@ -44,9 +44,22 @@ namespace BackendTests
producer.join(); producer.join();
consumer.join(); consumer.join();
// One more availabe to test regular pop // One more availabe to test regular pop
Assert::AreEqual(COUNT, _queue.pop().get()); Assert::AreEqual(COUNT, m_queue.pop().get());
// Test waitfor
std::thread popTimeout([&]()
{
Assert::IsTrue(!m_queue.waitfor_and_pop(100));
});
std::this_thread::sleep_for(std::chrono::milliseconds(300));
popTimeout.join();
std::thread popNoTimeout([&]()
{
Assert::AreEqual(42, m_queue.waitfor_and_pop(300).get());
});
m_queue.push(42);
popNoTimeout.join();
// Nothing available // Nothing available
Assert::IsTrue(!_queue.pop()); Assert::IsTrue(!m_queue.pop());
} }
}; };
......
#include "QueueDispatcher.h"
namespace Utility
{
QueueDispatcher::QueueDispatcher()
{
m_manual = true;
}
QueueDispatcher::~QueueDispatcher()
{
// Stop execution
set_manual();
}
void QueueDispatcher::set_manual()
{
m_manual = true;
if (t_execution.joinable())
{
t_execution.join();
}
}
bool QueueDispatcher::execute_one()
{
if (m_manual)
{
if (auto action = m_queue.pop())
{
action.get().set_value();
return true;
}
else
{
return false;
}
}
else
{
return false;
}
}
void QueueDispatcher::set_automatic()
{
m_manual = false;
// Execute in new thread
t_execution = std::thread(std::bind(&QueueDispatcher::execute_loop, this));
}
void QueueDispatcher::execute_loop()
{
while (!m_manual)
{
if (auto action = m_queue.waitfor_and_pop(100))
{
action.get().set_value();
}
}
}
}
\ No newline at end of file
#pragma once
#include "WaitableQueue.h"
#include <atomic>
#include <future>
#include <functional>
#include <thread>
#include <utility>
namespace Utility
{
/// Use this class to dispatch actions sequencially.
/// There are two modes available: Run automaic or execute manually.
class QueueDispatcher
{
public:
/// Creates a QueueDispacther in manual mode
QueueDispatcher();
// Only one dispatcher may be responsible
~QueueDispatcher();
/// Switch to manual execution
void set_manual();
/// Execute one action, returns false if no action is available or automatic mode is active
bool execute_one();
/// Switch to automatic execution
void set_automatic();
/// Pushes the action to the execuion queue.
template <typename T>
std::future<T> push(std::function<T()> fn)
{
// Wrapper that waits for a signal
auto wrapperFn = [](std::function<T()> fn, std::future<void> signal)
{
signal.wait();
return fn();
};
std::promise<void> signaler;
// Get the async operation ready before adding the signaler
std::future<T> future = async(std::launch::async, wrapperFn, fn, signaler.get_future());
m_queue.push(std::move(signaler));
return future;
}
private:
/// Signal by setting the promises
WaitableQueue<std::promise<void>> m_queue;
std::atomic_bool m_manual;
std::thread t_execution;
void execute_loop();
};
}
\ No newline at end of file
...@@ -15,6 +15,10 @@ ...@@ -15,6 +15,10 @@
<ProjectCapability Include="SourceItemsFromImports" /> <ProjectCapability Include="SourceItemsFromImports" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="$(MSBuildThisFileDirectory)QueueDispatcher.h" />
<ClInclude Include="$(MSBuildThisFileDirectory)WaitableQueue.h" /> <ClInclude Include="$(MSBuildThisFileDirectory)WaitableQueue.h" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<ClCompile Include="$(MSBuildThisFileDirectory)QueueDispatcher.cpp" />
</ItemGroup>
</Project> </Project>
\ No newline at end of file
...@@ -13,7 +13,7 @@ namespace Utility ...@@ -13,7 +13,7 @@ namespace Utility
class WaitableQueue class WaitableQueue
{ {
private: private:
std::queue<T> _queue; std::queue<T> m_queue;
std::mutex _mutex; std::mutex _mutex;
std::condition_variable _cond_var; std::condition_variable _cond_var;
...@@ -33,17 +33,38 @@ namespace Utility ...@@ -33,17 +33,38 @@ namespace Utility
boost::optional<T> pop() boost::optional<T> pop()
{ {
std::unique_lock<std::mutex> lock(_mutex); std::unique_lock<std::mutex> lock(_mutex);
if (_queue.empty()) if (m_queue.empty())
{ {
return boost::none; return boost::none;
} }
else else
{ {
auto item = _queue.front(); auto item = std::move(m_queue.front());
_queue.pop(); m_queue.pop();
return item; return item;
} }
} }
boost::optional<T> waitfor_and_pop(long milliseconds)
{
// Wait for signal
std::unique_lock<std::mutex> lock(_mutex);
// Prevent spurious wakeups by checking the queue
if (_cond_var.wait_for(lock, std::chrono::milliseconds(milliseconds), [&]()
{
return !m_queue.empty();
}))
{
// Return the front
auto item = std::move(m_queue.front());
m_queue.pop();
return item;
}
else
{
// Timed out
return boost::none;
}
}
/// Waits for data to become available and returns the first element. /// Waits for data to become available and returns the first element.
T wait_and_pop() T wait_and_pop()
{ {
...@@ -52,11 +73,11 @@ namespace Utility ...@@ -52,11 +73,11 @@ namespace Utility
// Prevent spurious wakeups by checking the queue // Prevent spurious wakeups by checking the queue
_cond_var.wait(lock, [&]() _cond_var.wait(lock, [&]()
{ {
return !_queue.empty(); return !m_queue.empty();
}); });
// Return the front // Return the front
auto item = _queue.front(); auto item = std::move(m_queue.front());
_queue.pop(); m_queue.pop();
return item; return item;
} }
/// Add an item to the end of the queue /// Add an item to the end of the queue
...@@ -64,7 +85,7 @@ namespace Utility ...@@ -64,7 +85,7 @@ namespace Utility
{ {
executeAndNotify([&]() executeAndNotify([&]()
{ {
_queue.push(item); m_queue.push(item);
}); });
} }
/// Add an item to the end of the queue /// Add an item to the end of the queue
...@@ -72,7 +93,7 @@ namespace Utility ...@@ -72,7 +93,7 @@ namespace Utility
{ {
executeAndNotify([&]() executeAndNotify([&]()
{ {
_queue.push(std::move(item)); m_queue.push(std::move(item));
}); });
} }
}; };
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment