diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/common/Threading/PCQueue.h | 56 | ||||
-rw-r--r-- | src/server/game/Maps/MapUpdater.cpp | 62 | ||||
-rw-r--r-- | src/server/game/Maps/MapUpdater.h | 11 |
3 files changed, 62 insertions, 67 deletions
diff --git a/src/common/Threading/PCQueue.h b/src/common/Threading/PCQueue.h index 1efb494ba0..57ea0b2d34 100644 --- a/src/common/Threading/PCQueue.h +++ b/src/common/Threading/PCQueue.h @@ -20,51 +20,51 @@ #include <condition_variable> #include <queue> +#include <atomic> +#include <mutex> template <typename T> class ProducerConsumerQueue { private: - std::mutex _queueLock; + mutable std::mutex _queueLock; std::queue<T> _queue; std::condition_variable _condition; - std::atomic<bool> _cancel; - std::atomic<bool> _shutdown; + std::atomic<bool> _cancel{}; + std::atomic<bool> _shutdown{}; public: - ProducerConsumerQueue() : _cancel(false), _shutdown(false) { } + ProducerConsumerQueue() = default; void Push(const T& value) { - std::lock_guard<std::mutex> lock(_queueLock); - _queue.push(std::move(value)); - + { + std::lock_guard<std::mutex> lock(_queueLock); + _queue.push(std::move(value)); + } _condition.notify_one(); } - bool Empty() + bool Empty() const { std::lock_guard<std::mutex> lock(_queueLock); - return _queue.empty(); } [[nodiscard]] std::size_t Size() const { + std::lock_guard<std::mutex> lock(_queueLock); return _queue.size(); } bool Pop(T& value) { std::lock_guard<std::mutex> lock(_queueLock); - if (_queue.empty() || _cancel) return false; - value = _queue.front(); - + value = std::move(_queue.front()); _queue.pop(); - return true; } @@ -72,39 +72,30 @@ public: { std::unique_lock<std::mutex> lock(_queueLock); - // we could be using .wait(lock, predicate) overload here but it is broken - // https://connect.microsoft.com/VisualStudio/feedback/details/1098841 - while (_queue.empty() && !_cancel && !_shutdown) - _condition.wait(lock); + // Wait for the queue to have an element or the cancel/shutdown flag + _condition.wait(lock, [this] { return !_queue.empty() || _cancel || _shutdown; }); if (_queue.empty() || _cancel) return; - value = _queue.front(); - + value = std::move(_queue.front()); _queue.pop(); } - // Clears the queue and will immediately stop any consumers + // Clears the queue and immediately stops any consumers. void Cancel() { - std::unique_lock<std::mutex> lock(_queueLock); - - while (!_queue.empty()) - { + std::lock_guard<std::mutex> lock(_queueLock); + while (!_queue.empty()) { T& value = _queue.front(); - DeleteQueuedObject(value); - _queue.pop(); } - _cancel = true; - _condition.notify_all(); } - // Graceful stop, will wait for queue to become empty before stopping consumers + // Graceful stop: waits for the queue to become empty before stopping consumers. void Shutdown() { _shutdown = true; @@ -113,10 +104,13 @@ public: private: template<typename E = T> - typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj) { delete obj; } + typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj) + { + delete obj; + } template<typename E = T> - typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*packet*/) { } + typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*obj*/) { } }; #endif diff --git a/src/server/game/Maps/MapUpdater.cpp b/src/server/game/Maps/MapUpdater.cpp index 552b48d7e3..8f8704da5b 100644 --- a/src/server/game/Maps/MapUpdater.cpp +++ b/src/server/game/Maps/MapUpdater.cpp @@ -67,7 +67,7 @@ private: uint32 m_diff; }; -MapUpdater::MapUpdater(): pending_requests(0) +MapUpdater::MapUpdater() : pending_requests(0), _cancelationToken(false) { } @@ -84,10 +84,11 @@ void MapUpdater::deactivate() { _cancelationToken = true; - wait(); + wait(); // This is where we wait for tasks to complete - _queue.Cancel(); + _queue.Cancel(); // Cancel the queue to prevent further task processing + // Join all worker threads for (auto& thread : _workerThreads) { if (thread.joinable()) @@ -99,44 +100,45 @@ void MapUpdater::deactivate() void MapUpdater::wait() { - std::unique_lock<std::mutex> guard(_lock); + std::unique_lock<std::mutex> guard(_lock); // Guard lock for safe waiting - while (pending_requests > 0) - _condition.wait(guard); + // Wait until there are no pending requests + _condition.wait(guard, [this] { + return pending_requests.load(std::memory_order_acquire) == 0; + }); +} - guard.unlock(); +void MapUpdater::schedule_task(UpdateRequest* request) +{ + // Atomic increment for pending_requests + pending_requests.fetch_add(1, std::memory_order_release); + _queue.Push(request); } void MapUpdater::schedule_update(Map& map, uint32 diff, uint32 s_diff) { - std::lock_guard<std::mutex> guard(_lock); - - ++pending_requests; - - _queue.Push(new MapUpdateRequest(map, *this, diff, s_diff)); + schedule_task(new MapUpdateRequest(map, *this, diff, s_diff)); } void MapUpdater::schedule_lfg_update(uint32 diff) { - std::lock_guard<std::mutex> guard(_lock); - - ++pending_requests; - - _queue.Push(new LFGUpdateRequest(*this, diff)); + schedule_task(new LFGUpdateRequest(*this, diff)); } bool MapUpdater::activated() { - return _workerThreads.size() > 0; + return !_workerThreads.empty(); } void MapUpdater::update_finished() { - std::lock_guard<std::mutex> lock(_lock); - - --pending_requests; - - _condition.notify_all(); + // Atomic decrement for pending_requests + if (pending_requests.fetch_sub(1, std::memory_order_acquire) == 1) + { + // Only notify when pending_requests becomes 0 (i.e., all tasks are finished) + std::lock_guard<std::mutex> lock(_lock); // Lock only for condition variable notification + _condition.notify_all(); // Notify waiting threads that all requests are complete + } } void MapUpdater::WorkerThread() @@ -145,16 +147,16 @@ void MapUpdater::WorkerThread() CharacterDatabase.WarnAboutSyncQueries(true); WorldDatabase.WarnAboutSyncQueries(true); - while (1) + while (!_cancelationToken) { UpdateRequest* request = nullptr; - _queue.WaitAndPop(request); - if (_cancelationToken) - return; + _queue.WaitAndPop(request); // Wait for and pop a request from the queue - request->call(); - - delete request; + if (!_cancelationToken && request) + { + request->call(); // Execute the request + delete request; // Clean up after processing + } } } diff --git a/src/server/game/Maps/MapUpdater.h b/src/server/game/Maps/MapUpdater.h index e65cb01939..174cef9771 100644 --- a/src/server/game/Maps/MapUpdater.h +++ b/src/server/game/Maps/MapUpdater.h @@ -22,6 +22,7 @@ #include "PCQueue.h" #include <condition_variable> #include <thread> +#include <atomic> class Map; class UpdateRequest; @@ -32,6 +33,7 @@ public: MapUpdater(); ~MapUpdater() = default; + void schedule_task(UpdateRequest* request); void schedule_update(Map& map, uint32 diff, uint32 s_diff); void schedule_lfg_update(uint32 diff); void wait(); @@ -42,15 +44,12 @@ public: private: void WorkerThread(); - ProducerConsumerQueue<UpdateRequest*> _queue; - + std::atomic<int> pending_requests; // Use std::atomic for pending_requests to avoid lock contention + std::atomic<bool> _cancelationToken; // Atomic flag for cancellation to avoid race conditions std::vector<std::thread> _workerThreads; - std::atomic<bool> _cancelationToken; - - std::mutex _lock; + std::mutex _lock; // Mutex and condition variable for synchronization std::condition_variable _condition; - std::size_t pending_requests; }; #endif //_MAP_UPDATER_H_INCLUDED |