Skip to content
Snippets Groups Projects
Select Git revision
  • f508ac00a8c9782285fc97baa71b2ee68ff37046
  • master default protected
  • refactor_naming
  • grpc
  • State_Machine
5 results

ConcurrentQueue.h

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    ConcurrentQueue.h 3.93 KiB
    #pragma once
    #include "boost/optional.hpp"
    #include <condition_variable>
    #include <functional>
    #include <queue>
    #include <mutex>
    
    namespace Utility
    {
        /// Thread-safe FIFO queue for handing items from producer threads to consumer threads.
        ///
        /// Every access to the underlying std::queue and to the "finished" flag happens
        /// while holding one mutex; blocked consumers sleep on a condition variable
        /// instead of polling.
        /// Once finish() has been called, push() silently discards new items, while items
        /// that are already queued can still be popped.
        /// No strong exception guarantee: if moving an item out of the queue throws,
        /// that item may be lost.
        template <typename T>
        class ConcurrentQueue
        {
        private:
            bool m_finished = false;
            std::queue<T> m_queue;
            // mutable so that the const observer is_finished() can take the lock.
            mutable std::mutex m_mutex;
            std::condition_variable m_cond_var;

            /// Wake-up condition for blocked consumers: there is an item to take, or
            /// no item will ever arrive. The caller must hold m_mutex.
            bool has_item_or_finished_locked() const
            {
                return !m_queue.empty() || m_finished;
            }

            /// Removes and returns the front item, or boost::none if the queue is empty.
            /// The caller must hold m_mutex.
            boost::optional<T> take_front_locked()
            {
                if (m_queue.empty())
                {
                    return boost::none;
                }
                boost::optional<T> item(std::move(m_queue.front()));
                m_queue.pop();
                return item;
            }

            /// Shared implementation of both push() overloads.
            /// The finished flag is checked under the same lock that guards the insert,
            /// so an item can never slip in after finish() has completed.
            template <typename U>
            void enqueue(U&& item)
            {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    if (m_finished)
                    {
                        return;
                    }
                    m_queue.push(std::forward<U>(item));
                }
                // Notify after releasing the lock so the woken consumer does not
                // immediately block on the mutex again.
                m_cond_var.notify_one();
            }

        public:
            /// Returns true if finish() has been called.
            bool is_finished() const
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                return m_finished;
            }

            /// Immediately returns the item from the front (or boost::none if the queue is empty).
            boost::optional<T> pop()
            {
                std::lock_guard<std::mutex> lock(m_mutex);
                return take_front_locked();
            }

            /// Waits at most the given number of milliseconds for an item and returns it.
            /// Returns boost::none if the timeout expires while the queue is still empty,
            /// or if finish() has been called and the queue is empty.
            boost::optional<T> waitfor_and_pop(long milliseconds)
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                // The predicate overload handles spurious wakeups, measures the timeout
                // against a single deadline, and re-checks the queue after a timeout.
                m_cond_var.wait_for(lock, std::chrono::milliseconds(milliseconds),
                                    [this] { return has_item_or_finished_locked(); });
                // An empty queue here means timeout or finished; both yield boost::none.
                return take_front_locked();
            }

            /// Waits for data to become available and returns the first element.
            /// Returns boost::none if finish() has been called and the queue is empty,
            /// including when finish() was called before this method.
            boost::optional<T> wait_and_pop()
            {
                std::unique_lock<std::mutex> lock(m_mutex);
                // The predicate is evaluated before the first wait, so a queue that is
                // already finished never blocks.
                m_cond_var.wait(lock, [this] { return has_item_or_finished_locked(); });
                return take_front_locked();
            }

            /// Adds a copy of the item to the end of the queue.
            /// The item is discarded if finish() has already been called.
            void push(const T& item)
            {
                enqueue(item);
            }

            /// Moves the item to the end of the queue.
            /// The item is discarded (and left unmoved) if finish() has already been called.
            void push(T&& item)
            {
                enqueue(std::move(item));
            }

            /// The queue won't accept any new items.
            /// Unblocks every consumer currently waiting in wait_and_pop() or waitfor_and_pop().
            void finish()
            {
                {
                    std::lock_guard<std::mutex> lock(m_mutex);
                    m_finished = true;
                }
                // notify_all: every blocked consumer has to observe the finished state,
                // not just one of them.
                m_cond_var.notify_all();
            }
        };
    }