diff options
Diffstat (limited to 'src/common')
-rw-r--r-- | src/common/Threading/LockedQueue.h | 128 | ||||
-rw-r--r-- | src/common/Threading/MPSCQueue.h | 149 |
2 files changed, 189 insertions, 88 deletions
diff --git a/src/common/Threading/LockedQueue.h b/src/common/Threading/LockedQueue.h index 04ff6fca8e..76266a6b21 100644 --- a/src/common/Threading/LockedQueue.h +++ b/src/common/Threading/LockedQueue.h @@ -18,40 +18,49 @@ #ifndef LOCKEDQUEUE_H #define LOCKEDQUEUE_H +#include <atomic> #include <deque> +#include <memory> #include <mutex> template <class T, typename StorageType = std::deque<T>> class LockedQueue { - //! Lock access to the queue. - std::mutex _lock; + mutable std::mutex _lock; ///< Mutex to protect access to the queue - //! Storage backing the queue. - StorageType _queue; + std::atomic<bool> _canceled{false}; ///< Flag indicating if the queue is canceled - //! Cancellation flag. - volatile bool _canceled{false}; + StorageType _queue; ///< Storage container for the queue public: - //! Create a LockedQueue. + /** + * @brief Default constructor to create an empty LockedQueue. + */ LockedQueue() = default; - //! Destroy a LockedQueue. + /** + * @brief Destructor for LockedQueue. + */ virtual ~LockedQueue() = default; - //! Adds an item to the queue. + /** + * @brief Adds an item to the back of the queue. + * + * @param item The item to be added to the queue. + */ void add(const T& item) { - lock(); - - _queue.push_back(item); - - unlock(); + std::lock_guard<std::mutex> lock(_lock); + _queue.push_back(std::move(item)); } - //! Adds items back to front of the queue + /** + * @brief Adds a range of items to the front of the queue. + * + * @param begin Iterator pointing to the beginning of the range of items to be added. + * @param end Iterator pointing to the end of the range of items to be added. + */ template<class Iterator> void readd(Iterator begin, Iterator end) { @@ -59,33 +68,42 @@ public: _queue.insert(_queue.begin(), begin, end); } - //! Gets the next result in the queue, if any. + /** + * @brief Gets the next item in the queue and removes it. + * + * @param result The variable where the next item will be stored. + * @return true if an item was retrieved and removed, false if the queue is empty. + */ bool next(T& result) { std::lock_guard<std::mutex> lock(_lock); - if (_queue.empty()) { return false; } - result = _queue.front(); + result = std::move(_queue.front()); _queue.pop_front(); - return true; } + /** + * @brief Retrieves the next item from the queue if it satisfies the provided checker. + * + * @param result The variable where the next item will be stored. + * @param check A checker object that will be used to validate the item. + * @return true if an item was retrieved, checked, and removed; false otherwise. + */ template<class Checker> bool next(T& result, Checker& check) { std::lock_guard<std::mutex> lock(_lock); - if (_queue.empty()) { return false; } - result = _queue.front(); + result = std::move(_queue.front()); if (!check.Process(result)) { return false; @@ -95,60 +113,54 @@ public: return true; } - //! Peeks at the top of the queue. Check if the queue is empty before calling! Remember to unlock after use if autoUnlock == false. - T& peek(bool autoUnlock = false) - { - lock(); - - T& result = _queue.front(); - - if (autoUnlock) - { - unlock(); - } - - return result; - } - - //! Cancels the queue. - void cancel() + /** + * @brief Peeks at the top of the queue without removing it. + * + * @return A reference to the item at the front of the queue, assuming there's an item in the queue (as per previous implementation) + */ + T& peek() { std::lock_guard<std::mutex> lock(_lock); - - _canceled = true; + return _queue.front(); } - //! Checks if the queue is cancelled. - bool cancelled() + /** + * @brief Cancels the queue, preventing further processing of items. + */ + void cancel() { - std::lock_guard<std::mutex> lock(_lock); - return _canceled; + _canceled.store(true, std::memory_order_release); } - //! Locks the queue for access. - void lock() + /** + * @brief Checks if the queue has been canceled. + * + * @return true if the queue is canceled, false otherwise. + */ + bool cancelled() const { - this->_lock.lock(); + return _canceled.load(std::memory_order_acquire); } - //! Unlocks the queue. - void unlock() + /** + * @brief Checks if the queue is empty. + * + * @return true if the queue is empty, false otherwise. + */ + bool empty() const { - this->_lock.unlock(); + std::lock_guard<std::mutex> lock(_lock); + return _queue.empty(); } - ///! Calls pop_front of the queue + /** + * @brief Removes the item at the front of the queue. + */ void pop_front() { std::lock_guard<std::mutex> lock(_lock); _queue.pop_front(); } - - ///! Checks if we're empty or not with locks held - bool empty() - { - std::lock_guard<std::mutex> lock(_lock); - return _queue.empty(); - } }; + #endif diff --git a/src/common/Threading/MPSCQueue.h b/src/common/Threading/MPSCQueue.h index 15176178b0..2bc2ec8262 100644 --- a/src/common/Threading/MPSCQueue.h +++ b/src/common/Threading/MPSCQueue.h @@ -19,41 +19,81 @@ #define MPSCQueue_h__ #include <atomic> +#include <memory> namespace Acore::Impl { - // C++ implementation of Dmitry Vyukov's lock free MPSC queue - // http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + /** + * @brief C++ implementation of Dmitry Vyukov's lock-free MPSC queue (Non-Intrusive). + * + * This queue allows multiple producers to enqueue items concurrently, but only one consumer + * can dequeue items. The queue is lock-free and non-intrusive, meaning it does not modify + * the data types that are enqueued. + * + * @tparam T The type of data that is being enqueued in the queue. + */ template<typename T> class MPSCQueueNonIntrusive { public: - MPSCQueueNonIntrusive() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed)) + /** + * @brief Constructs a new MPSCQueueNonIntrusive object. + * + * Initializes the queue with a dummy node and sets up atomic pointers to the head and tail. + */ + MPSCQueueNonIntrusive() + : _head(new Node(nullptr)), _tail(_head.load(std::memory_order_acquire)) { - Node* front = _head.load(std::memory_order_relaxed); - front->Next.store(nullptr, std::memory_order_relaxed); + Node* front = _head.load(std::memory_order_acquire); + front->Next.store(nullptr, std::memory_order_release); ///< Store with release to ensure visibility } + /** + * @brief Destroys the MPSCQueueNonIntrusive object. + * + * Dequeues all items and deletes them, followed by proper cleanup of remaining nodes in the queue. + */ ~MPSCQueueNonIntrusive() { T* output; while (Dequeue(output)) delete output; - Node* front = _head.load(std::memory_order_relaxed); - delete front; + // Properly delete remaining nodes + Node* front = _head.load(std::memory_order_acquire); + while (front) + { + Node* next = front->Next.load(std::memory_order_acquire); + delete front; + front = next; + } } + /** + * @brief Enqueues a new item in the queue. + * + * This function adds a new item at the head of the queue. + * + * @param input Pointer to the item to be enqueued. + */ void Enqueue(T* input) { Node* node = new Node(input); - Node* prevHead = _head.exchange(node, std::memory_order_acq_rel); + Node* prevHead = _head.exchange(node, std::memory_order_acq_rel); ///< Exchange with acquire-release semantics prevHead->Next.store(node, std::memory_order_release); } + /** + * @brief Dequeues an item from the queue. + * + * This function removes the item at the front of the queue and returns it. + * + * @param result Reference to a pointer where the dequeued item will be stored. + * @return True if an item was successfully dequeued, false if the queue was empty. + */ bool Dequeue(T*& result) { - Node* tail = _tail.load(std::memory_order_relaxed); + Node* tail = _tail.load(std::memory_order_acquire); Node* next = tail->Next.load(std::memory_order_acquire); if (!next) return false; @@ -65,39 +105,63 @@ namespace Acore::Impl } private: + /** + * @brief A structure representing a node in the queue. + * + * Each node holds a pointer to data and an atomic pointer to the next node in the queue. + */ struct Node { Node() = default; explicit Node(T* data) : Data(data) { - Next.store(nullptr, std::memory_order_relaxed); + Next.store(nullptr, std::memory_order_release); } - T* Data; - std::atomic<Node*> Next; + T* Data; ///< Data stored in the node + std::atomic<Node*> Next; ///< Atomic pointer to the next node }; - std::atomic<Node*> _head; - std::atomic<Node*> _tail; + std::atomic<Node*> _head; ///< Atomic pointer to the head node of the queue + std::atomic<Node*> _tail; ///< Atomic pointer to the tail node of the queue - MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const&) = delete; - MPSCQueueNonIntrusive& operator=(MPSCQueueNonIntrusive const&) = delete; + MPSCQueueNonIntrusive(MPSCQueueNonIntrusive const&) = delete; ///< Deleted copy constructor + MPSCQueueNonIntrusive& operator=(MPSCQueueNonIntrusive const&) = delete; ///< Deleted copy assignment operator }; - // C++ implementation of Dmitry Vyukov's lock free MPSC queue - // http://www.1024cores.net/home/lock-free-algorithms/queues/intrusive-mpsc-node-based-queue + /** + * @brief C++ implementation of Dmitry Vyukov's lock-free MPSC queue (Intrusive). + * + * This queue allows multiple producers to enqueue items concurrently, but only one consumer + * can dequeue items. The queue is lock-free and intrusive, meaning that the enqueued objects + * must have an atomic link to the next item in the queue. + * + * @tparam T The type of data that is being enqueued in the queue. + * @tparam IntrusiveLink A member pointer to the atomic link used for linking the nodes. + */ template<typename T, std::atomic<T*> T::* IntrusiveLink> class MPSCQueueIntrusive { public: - MPSCQueueIntrusive() : _dummyPtr(reinterpret_cast<T*>(std::addressof(_dummy))), _head(_dummyPtr), _tail(_dummyPtr) + /** + * @brief Constructs a new MPSCQueueIntrusive object. + * + * Initializes the queue with a dummy node and sets up atomic pointers to the head and tail. + * The dummy node's atomic link is initialized. + */ + MPSCQueueIntrusive() + : _dummyPtr(reinterpret_cast<T*>(std::addressof(_dummy))), _head(_dummyPtr), _tail(_dummyPtr) { - // _dummy is constructed from aligned_storage and is intentionally left uninitialized (it might not be default constructible) - // so we init only its IntrusiveLink here + // Initialize the intrusive link in the dummy node std::atomic<T*>* dummyNext = new (&(_dummyPtr->*IntrusiveLink)) std::atomic<T*>(); - dummyNext->store(nullptr, std::memory_order_relaxed); + dummyNext->store(nullptr, std::memory_order_release); } + /** + * @brief Destroys the MPSCQueueIntrusive object. + * + * Dequeues all items and deletes them. + */ ~MPSCQueueIntrusive() { T* output; @@ -105,16 +169,32 @@ namespace Acore::Impl delete output; } + /** + * @brief Enqueues a new item in the queue. + * + * This function adds a new item at the head of the queue. + * + * @param input Pointer to the item to be enqueued. + */ void Enqueue(T* input) { + // Set the next link to nullptr initially (input->*IntrusiveLink).store(nullptr, std::memory_order_release); - T* prevHead = _head.exchange(input, std::memory_order_acq_rel); + T* prevHead = _head.exchange(input, std::memory_order_acq_rel); ///< Exchange with acquire-release semantics (prevHead->*IntrusiveLink).store(input, std::memory_order_release); } + /** + * @brief Dequeues an item from the queue. + * + * This function removes the item at the front of the queue and returns it. + * + * @param result Reference to a pointer where the dequeued item will be stored. + * @return True if an item was successfully dequeued, false if the queue was empty. + */ bool Dequeue(T*& result) { - T* tail = _tail.load(std::memory_order_relaxed); + T* tail = _tail.load(std::memory_order_acquire); T* next = (tail->*IntrusiveLink).load(std::memory_order_acquire); if (tail == _dummyPtr) { @@ -149,16 +229,25 @@ namespace Acore::Impl } private: - std::aligned_storage_t<sizeof(T), alignof(T)> _dummy; - T* _dummyPtr; - std::atomic<T*> _head; - std::atomic<T*> _tail; + std::aligned_storage_t<sizeof(T), alignof(T)> _dummy; ///< Storage for the dummy object + T* _dummyPtr; ///< Pointer to the dummy object + std::atomic<T*> _head; ///< Atomic pointer to the head node of the queue + std::atomic<T*> _tail; ///< Atomic pointer to the tail node of the queue - MPSCQueueIntrusive(MPSCQueueIntrusive const&) = delete; - MPSCQueueIntrusive& operator=(MPSCQueueIntrusive const&) = delete; + MPSCQueueIntrusive(MPSCQueueIntrusive const&) = delete; ///< Deleted copy constructor + MPSCQueueIntrusive& operator=(MPSCQueueIntrusive const&) = delete; ///< Deleted copy assignment operator }; } +/** + * @brief Conditional type alias for MPSCQueue. + * + * This alias provides the appropriate type of MPSCQueue based on whether the queue is intrusive + * or non-intrusive. + * + * @tparam T The type of data that is being enqueued in the queue. + * @tparam IntrusiveLink If provided, the queue will be intrusive, otherwise, it will be non-intrusive. + */ template<typename T, std::atomic<T*> T::* IntrusiveLink = nullptr> using MPSCQueue = std::conditional_t<IntrusiveLink != nullptr, Acore::Impl::MPSCQueueIntrusive<T, IntrusiveLink>, Acore::Impl::MPSCQueueNonIntrusive<T>>; |