summaryrefslogtreecommitdiff
path: root/src/common
diff options
context:
space:
mode:
Diffstat (limited to 'src/common')
-rw-r--r--src/common/Threading/PCQueue.h25
1 files changed, 14 insertions, 11 deletions
diff --git a/src/common/Threading/PCQueue.h b/src/common/Threading/PCQueue.h
index 57f3b08efb..1efb494ba0 100644
--- a/src/common/Threading/PCQueue.h
+++ b/src/common/Threading/PCQueue.h
@@ -28,10 +28,11 @@ private:
std::mutex _queueLock;
std::queue<T> _queue;
std::condition_variable _condition;
+ std::atomic<bool> _cancel;
std::atomic<bool> _shutdown;
public:
- ProducerConsumerQueue() : _shutdown(false) { }
+ ProducerConsumerQueue() : _cancel(false), _shutdown(false) { }
void Push(const T& value)
{
@@ -57,10 +58,8 @@ public:
{
std::lock_guard<std::mutex> lock(_queueLock);
- if (_queue.empty() || _shutdown)
- {
+ if (_queue.empty() || _cancel)
return false;
- }
value = _queue.front();
@@ -75,21 +74,18 @@ public:
// we could be using .wait(lock, predicate) overload here but it is broken
// https://connect.microsoft.com/VisualStudio/feedback/details/1098841
- while (_queue.empty() && !_shutdown)
- {
+ while (_queue.empty() && !_cancel && !_shutdown)
_condition.wait(lock);
- }
- if (_queue.empty() || _shutdown)
- {
+ if (_queue.empty() || _cancel)
return;
- }
value = _queue.front();
_queue.pop();
}
+ // Clears the queue and will immediately stop any consumers
void Cancel()
{
std::unique_lock<std::mutex> lock(_queueLock);
@@ -103,8 +99,15 @@ public:
_queue.pop();
}
- _shutdown = true;
+ _cancel = true;
+
+ _condition.notify_all();
+ }
+ // Graceful stop, will wait for queue to become empty before stopping consumers
+ void Shutdown()
+ {
+ _shutdown = true;
_condition.notify_all();
}