From a8b719da03f3209957ff71df92af65140d10e4d0 Mon Sep 17 00:00:00 2001
From: Tim Uebelhoer <tim.uebelhoer@rwth-aachen.de>
Date: Wed, 3 Jan 2018 18:18:34 +0100
Subject: [PATCH] Implemented the QueueDispatcher

---
 BackendTests/BackendTests.vcxproj         |  1 +
 BackendTests/BackendTests.vcxproj.filters |  3 +
 BackendTests/QueueDispatcherTest.cpp      | 67 +++++++++++++++++++++++
 BackendTests/SimulatorTest.cpp            | 15 ++---
 BackendTests/WaitableQueueTest.cpp        | 25 +++++++--
 UtilityClasses/QueueDispatcher.cpp        | 61 +++++++++++++++++++++
 UtilityClasses/QueueDispatcher.h          | 52 ++++++++++++++++++
 UtilityClasses/UtilityClasses.vcxitems    |  4 ++
 UtilityClasses/WaitableQueue.h            | 39 ++++++++++---
 9 files changed, 242 insertions(+), 25 deletions(-)
 create mode 100644 BackendTests/QueueDispatcherTest.cpp
 create mode 100644 UtilityClasses/QueueDispatcher.cpp
 create mode 100644 UtilityClasses/QueueDispatcher.h

diff --git a/BackendTests/BackendTests.vcxproj b/BackendTests/BackendTests.vcxproj
index 0d8ab30..5a9bea4 100644
--- a/BackendTests/BackendTests.vcxproj
+++ b/BackendTests/BackendTests.vcxproj
@@ -171,6 +171,7 @@
   </ItemGroup>
   <ItemGroup>
     <ClCompile Include="EngineObserver.cpp" />
+    <ClCompile Include="QueueDispatcherTest.cpp" />
     <ClCompile Include="stdafx.cpp">
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|x64'">Create</PrecompiledHeader>
       <PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader>
diff --git a/BackendTests/BackendTests.vcxproj.filters b/BackendTests/BackendTests.vcxproj.filters
index de4ac7c..227ed27 100644
--- a/BackendTests/BackendTests.vcxproj.filters
+++ b/BackendTests/BackendTests.vcxproj.filters
@@ -38,5 +38,8 @@
     <ClCompile Include="WaitableQueueTest.cpp">
       <Filter>Quelldateien</Filter>
     </ClCompile>
+    <ClCompile Include="QueueDispatcherTest.cpp">
+      <Filter>Quelldateien</Filter>
+    </ClCompile>
   </ItemGroup>
 </Project>
\ No newline at end of file
diff --git a/BackendTests/QueueDispatcherTest.cpp b/BackendTests/QueueDispatcherTest.cpp
new file mode 100644
index 0000000..cc61c4c
--- /dev/null
+++ b/BackendTests/QueueDispatcherTest.cpp
@@ -0,0 +1,67 @@
+#include "stdafx.h"
+#include "CppUnitTest.h"
+#include "QueueDispatcher.h"
+#include <iostream>
+#include <future>
+
+using namespace Microsoft::VisualStudio::CppUnitTestFramework;
+using namespace std::placeholders;
+
+namespace BackendTests
+{
+    TEST_CLASS(QueueDispatcherTest)
+    {
+    private:
+        Utility::QueueDispatcher m_dispatcher;
+    public:
+        TEST_METHOD(TestQueueDispatcher)
+        {
+            auto addOne = [](int n)
+            {
+                return n + 1;
+            };
+            auto get42 = [addOne]()
+            {
+                return addOne(41);
+            };
+            // Fail
+            Assert::IsFalse(m_dispatcher.execute_one());
+            // Add an action manually
+            auto dispFuture = m_dispatcher.push<int>(get42);
+            m_dispatcher.execute_one();
+            Assert::AreEqual(42, dispFuture.get());
+            // Fail
+            Assert::IsFalse(m_dispatcher.execute_one());
+            // More fun with execution in auto mode and two threads
+            m_dispatcher.set_automatic();
+            const int COUNT = 10000;
+            std::thread one([&]()
+            {
+                for (int i = 0; i < COUNT; i++)
+                {
+                    auto addOneToI = [addOne, i]()
+                    {
+                        return addOne(i);
+                    };
+                    auto res = m_dispatcher.push<int>(addOneToI);
+                    Assert::AreEqual(i + 1, res.get());
+                }
+            });
+            std::thread two([&]()
+            {
+                for (int i = COUNT; i > 0; i--)
+                {
+                    auto addOneToI = [addOne, i]()
+                    {
+                        return addOne(i);
+                    };
+                    auto res = m_dispatcher.push<int>(addOneToI);
+                    Assert::AreEqual(i + 1, res.get());
+                }
+            });
+            one.join();
+            two.join();
+        }
+
+    };
+}
\ No newline at end of file
diff --git a/BackendTests/SimulatorTest.cpp b/BackendTests/SimulatorTest.cpp
index 31f7927..fc9c07a 100644
--- a/BackendTests/SimulatorTest.cpp
+++ b/BackendTests/SimulatorTest.cpp
@@ -15,18 +15,13 @@ namespace BackendTests
 		TEST_METHOD(TestFmuEngine)
 		{
 			// Basic setup
-			Simulator fmuEngine("TestID");
+			Simulator simulator;
 			auto observer = std::make_shared<EngineObserver>();
 			// Add & remove observer
-			fmuEngine.AddObserver(observer);
-			fmuEngine.AddObserver(observer);	// double insert
-			fmuEngine.RemoveObserver(observer);
-			fmuEngine.RemoveObserver(observer);	// Remove Empty
-			// Add & remove channel link
-			fmuEngine.AddChannelLink("master", "slave", 1, 2, 3, 4);
-			fmuEngine.AddChannelLink("master", "slave", 1, 2, 3, 4);
-			fmuEngine.RemoveChannelLink("master", "slave", 1, 2);
-			fmuEngine.RemoveChannelLink("master", "slave", 1, 2);
+			simulator.AddObserver(observer);
+			simulator.AddObserver(observer);	// double insert
+			simulator.RemoveObserver(observer);
+			simulator.RemoveObserver(observer);	// Remove Empty
 		}
 
 	};
diff --git a/BackendTests/WaitableQueueTest.cpp b/BackendTests/WaitableQueueTest.cpp
index 9c6519f..d295340 100644
--- a/BackendTests/WaitableQueueTest.cpp
+++ b/BackendTests/WaitableQueueTest.cpp
@@ -9,20 +9,20 @@ namespace BackendTests
     TEST_CLASS(WaitableQueueTest)
     {
     private:
-        Utility::WaitableQueue<int> _queue;
+        Utility::WaitableQueue<int> m_queue;
     public:
 
         TEST_METHOD_INITIALIZE(TestWaitableQueueInitialize)
         {
             // Boost none is expected
-            Assert::IsTrue(!_queue.pop());
+            Assert::IsTrue(!m_queue.pop());
         }
 
         void produce(int n)
         {
             for (int i = 0; i < n; i++)
             {
-                _queue.push(i);
+                m_queue.push(i);
             }
         }
 
@@ -30,7 +30,7 @@ namespace BackendTests
         {
             for (int i = 0; i < n; i++)
             {
-                Assert::AreEqual(i, _queue.wait_and_pop());
+                Assert::AreEqual(i, m_queue.wait_and_pop());
             }
         }
 
@@ -44,9 +44,22 @@ namespace BackendTests
             producer.join();
             consumer.join();
             // One more availabe to test regular pop
-            Assert::AreEqual(COUNT, _queue.pop().get());
+            Assert::AreEqual(COUNT, m_queue.pop().get());
+            // Test waitfor
+            std::thread popTimeout([&]()
+            {
+                Assert::IsTrue(!m_queue.waitfor_and_pop(100));
+            });
+            std::this_thread::sleep_for(std::chrono::milliseconds(300));
+            popTimeout.join();
+            std::thread popNoTimeout([&]()
+            {
+                Assert::AreEqual(42, m_queue.waitfor_and_pop(300).get());
+            });
+            m_queue.push(42);
+            popNoTimeout.join();
             // Nothing available
-            Assert::IsTrue(!_queue.pop());
+            Assert::IsTrue(!m_queue.pop());
         }
 
     };
diff --git a/UtilityClasses/QueueDispatcher.cpp b/UtilityClasses/QueueDispatcher.cpp
new file mode 100644
index 0000000..86eb599
--- /dev/null
+++ b/UtilityClasses/QueueDispatcher.cpp
@@ -0,0 +1,61 @@
+#include "QueueDispatcher.h"
+
+namespace Utility
+{
+
+    QueueDispatcher::QueueDispatcher()
+    {
+        m_manual = true;
+    }
+
+
+    QueueDispatcher::~QueueDispatcher()
+    {
+        // Stop execution
+        set_manual();
+    }
+    void QueueDispatcher::set_manual()
+    {
+        m_manual = true;
+        if (t_execution.joinable())
+        {
+            t_execution.join();
+        }
+    }
+    bool QueueDispatcher::execute_one()
+    {
+        if (m_manual)
+        {
+            if (auto action = m_queue.pop())
+            {
+                action.get().set_value();
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+        }
+        else
+        {
+            return false;
+        }
+    }
+    void QueueDispatcher::set_automatic()
+    {
+        m_manual = false;
+        // Execute in new thread
+        t_execution = std::thread(std::bind(&QueueDispatcher::execute_loop, this));
+    }
+
+    void QueueDispatcher::execute_loop()
+    {
+        while (!m_manual)
+        {
+            if (auto action = m_queue.waitfor_and_pop(100))
+            {
+                action.get().set_value();
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/UtilityClasses/QueueDispatcher.h b/UtilityClasses/QueueDispatcher.h
new file mode 100644
index 0000000..943a974
--- /dev/null
+++ b/UtilityClasses/QueueDispatcher.h
@@ -0,0 +1,52 @@
+#pragma once
+#include "WaitableQueue.h"
+#include <atomic>
+#include <future>
+#include <functional>
+#include <thread>
+#include <utility>
+
+namespace Utility
+{
+    /// Use this class to dispatch actions sequencially.
+    /// There are two modes available: Run automaic or execute manually.
+    class QueueDispatcher
+    {
+    public:
+        /// Creates a QueueDispacther in manual mode
+        QueueDispatcher();
+        // Only one dispatcher may be responsible
+        ~QueueDispatcher();
+
+        /// Switch to manual execution
+        void set_manual();
+        /// Execute one action, returns false if no action is available or automatic mode is active
+        bool execute_one();
+        /// Switch to automatic execution
+        void set_automatic();
+
+        /// Pushes the action to the execuion queue.
+        template <typename T>
+        std::future<T> push(std::function<T()> fn)
+        {
+            // Wrapper that waits for a signal
+            auto wrapperFn = [](std::function<T()> fn, std::future<void> signal)
+            {
+                signal.wait();
+                return fn();
+            };
+            std::promise<void> signaler;
+            // Get the async operation ready before adding the signaler
+            std::future<T> future = async(std::launch::async, wrapperFn, fn, signaler.get_future());
+            m_queue.push(std::move(signaler));
+            return future;
+        }
+
+    private:
+        /// Signal by setting the promises
+        WaitableQueue<std::promise<void>> m_queue;
+        std::atomic_bool m_manual;
+        std::thread t_execution;
+        void execute_loop();
+    };
+}
\ No newline at end of file
diff --git a/UtilityClasses/UtilityClasses.vcxitems b/UtilityClasses/UtilityClasses.vcxitems
index 358818e..637c601 100644
--- a/UtilityClasses/UtilityClasses.vcxitems
+++ b/UtilityClasses/UtilityClasses.vcxitems
@@ -15,6 +15,10 @@
     <ProjectCapability Include="SourceItemsFromImports" />
   </ItemGroup>
   <ItemGroup>
+    <ClInclude Include="$(MSBuildThisFileDirectory)QueueDispatcher.h" />
     <ClInclude Include="$(MSBuildThisFileDirectory)WaitableQueue.h" />
   </ItemGroup>
+  <ItemGroup>
+    <ClCompile Include="$(MSBuildThisFileDirectory)QueueDispatcher.cpp" />
+  </ItemGroup>
 </Project>
\ No newline at end of file
diff --git a/UtilityClasses/WaitableQueue.h b/UtilityClasses/WaitableQueue.h
index cf5cf87..eeb03a9 100644
--- a/UtilityClasses/WaitableQueue.h
+++ b/UtilityClasses/WaitableQueue.h
@@ -13,7 +13,7 @@ namespace Utility
     class WaitableQueue
     {
     private:
-        std::queue<T> _queue;
+        std::queue<T> m_queue;
         std::mutex _mutex;
         std::condition_variable _cond_var;
 
@@ -33,17 +33,38 @@ namespace Utility
         boost::optional<T> pop()
         {
             std::unique_lock<std::mutex> lock(_mutex);
-            if (_queue.empty())
+            if (m_queue.empty())
             {
                 return boost::none;
             }
             else
             {
-                auto item = _queue.front();
-                _queue.pop();
+                auto item = std::move(m_queue.front());
+                m_queue.pop();
                 return item;
             }
         }
+        boost::optional<T> waitfor_and_pop(long milliseconds)
+        {
+            // Wait for signal
+            std::unique_lock<std::mutex> lock(_mutex);
+            // Prevent spurious wakeups by checking the queue
+            if (_cond_var.wait_for(lock, std::chrono::milliseconds(milliseconds), [&]()
+            {
+                return !m_queue.empty();
+            }))
+            {
+                // Return the front
+                auto item = std::move(m_queue.front());
+                m_queue.pop();
+                return item;
+            }
+            else
+            {
+                // Timed out
+                return boost::none;
+            }
+        }
         /// Waits for data to become available and returns the first element.
         T wait_and_pop()
         {
@@ -52,11 +73,11 @@ namespace Utility
             // Prevent spurious wakeups by checking the queue
             _cond_var.wait(lock, [&]()
             {
-                return !_queue.empty();
+                return !m_queue.empty();
             });
             // Return the front
-            auto item = _queue.front();
-            _queue.pop();
+            auto item = std::move(m_queue.front());
+            m_queue.pop();
             return item;
         }
         /// Add an item to the end of the queue
@@ -64,7 +85,7 @@ namespace Utility
         {
             executeAndNotify([&]()
             {
-                _queue.push(item);
+                m_queue.push(item);
             });
         }
         /// Add an item to the end of the queue
@@ -72,7 +93,7 @@ namespace Utility
         {
             executeAndNotify([&]()
             {
-                _queue.push(std::move(item));
+                m_queue.push(std::move(item));
             });
         }
     };
-- 
GitLab