Select Git revision
ConcurrentQueue.h
Tim Übelhör authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
ConcurrentQueue.h 3.93 KiB
#pragma once
#include "boost/optional.hpp"
#include <condition_variable>
#include <functional>
#include <queue>
#include <mutex>
namespace Utility
{
/// Thread-safe FIFO queue for handing items from producer threads to consumer threads.
///
/// Every access to the underlying std::queue and to the "finished" flag happens
/// while holding one mutex; blocking pops sleep on a condition variable instead
/// of polling.
///
/// Life cycle: items can be pushed until finish() is called. After that, pushes
/// are silently ignored, items that are already queued can still be popped, and
/// once the queue is drained every pop variant returns boost::none without
/// blocking.
///
/// The strong exception guarantee is not provided, in exchange for pop
/// functions that return the element by value.
template <typename T>
class ConcurrentQueue
{
private:
    /// Set once by finish(); guarded by _mutex.
    bool m_finished = false;
    /// The stored items; guarded by _mutex.
    std::queue<T> m_queue;
    /// Guards m_finished and m_queue. Mutable so that is_finished() can be const.
    mutable std::mutex _mutex;
    /// Signalled when an item is pushed (one waiter) or finish() is called (all waiters).
    std::condition_variable _cond_var;

    /// Removes and returns the front item, or boost::none if the queue is empty.
    /// The caller must already hold _mutex.
    boost::optional<T> take_front_locked()
    {
        if (m_queue.empty())
        {
            return boost::none;
        }
        boost::optional<T> item(std::move(m_queue.front()));
        m_queue.pop();
        return item;
    }

    /// Wake-up condition shared by the blocking pops. The caller must hold _mutex.
    bool ready_locked() const
    {
        return m_finished || !m_queue.empty();
    }

public:
    /// Returns true if finish() has been called.
    bool is_finished() const
    {
        // The flag is written under the mutex, so it has to be read under it too.
        std::lock_guard<std::mutex> lock(_mutex);
        return m_finished;
    }

    /// Immediately returns the item from the front (or boost::none if the queue is empty).
    boost::optional<T> pop()
    {
        std::lock_guard<std::mutex> lock(_mutex);
        return take_front_locked();
    }

    /// Waits at most @p milliseconds for an item and returns it.
    /// Returns boost::none if no item became available within that time, or if
    /// finish() has been called and the queue is empty.
    boost::optional<T> waitfor_and_pop(long milliseconds)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // The predicate overload re-checks the condition after every wakeup and
        // measures the timeout against a single deadline, so spurious wakeups
        // neither return early nor restart the full timeout.
        _cond_var.wait_for(lock, std::chrono::milliseconds(milliseconds),
                           [this] { return ready_locked(); });
        // Covers all outcomes: item available, timeout, or finished and empty.
        return take_front_locked();
    }

    /// Waits for data to become available and returns the first element.
    /// Returns boost::none if finish() has been called and the queue is empty.
    boost::optional<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        // The predicate is evaluated before the first wait, so a queue that was
        // finished before this call does not block.
        _cond_var.wait(lock, [this] { return ready_locked(); });
        return take_front_locked();
    }

    /// Adds a copy of @p item to the end of the queue.
    /// Does nothing if finish() has been called.
    void push(const T& item)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (m_finished)
            {
                return;
            }
            m_queue.push(item);
        }
        // Notify after releasing the lock so the woken consumer can take it immediately.
        _cond_var.notify_one();
    }

    /// Moves @p item to the end of the queue.
    /// Does nothing (and leaves @p item untouched) if finish() has been called.
    void push(T&& item)
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            if (m_finished)
            {
                return;
            }
            m_queue.push(std::move(item));
        }
        _cond_var.notify_one();
    }

    /// The queue won't accept any new items.
    /// Releases every thread currently blocked in wait_and_pop() or waitfor_and_pop().
    void finish()
    {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            m_finished = true;
        }
        // All waiters must observe the finished state, not just one of them.
        _cond_var.notify_all();
    }
};
}