diff options
Diffstat (limited to 'src/common/Threading')
| -rw-r--r-- | src/common/Threading/Callback.h | 209 | ||||
| -rw-r--r-- | src/common/Threading/LockedQueue.h | 8 | ||||
| -rw-r--r-- | src/common/Threading/MPSCQueue.h | 83 |
3 files changed, 91 insertions, 209 deletions
diff --git a/src/common/Threading/Callback.h b/src/common/Threading/Callback.h deleted file mode 100644 index f7eab57ddda..00000000000 --- a/src/common/Threading/Callback.h +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _CALLBACK_H -#define _CALLBACK_H - -#include <future> -#include "QueryResult.h" - -typedef std::future<QueryResult> QueryResultFuture; -typedef std::promise<QueryResult> QueryResultPromise; -typedef std::future<PreparedQueryResult> PreparedQueryResultFuture; -typedef std::promise<PreparedQueryResult> PreparedQueryResultPromise; - -#define CALLBACK_STAGE_INVALID uint8(-1) - -template <typename Result, typename ParamType, bool chain = false> -class QueryCallback -{ - public: - QueryCallback() : _param(), _stage(chain ? 0 : CALLBACK_STAGE_INVALID) { } - - //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery - void SetFutureResult(std::future<Result> value) - { - _result = std::move(value); - } - - std::future<Result>& GetFutureResult() - { - return _result; - } - - int IsReady() - { - return _result.valid() && _result.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - } - - void GetResult(Result& res) - { - res = _result.get(); - } - - void FreeResult() - { - // Nothing to do here, the constructor of std::future will take care of the cleanup - } - - void SetParam(ParamType value) - { - _param = value; - } - - ParamType GetParam() - { - return _param; - } - - //! Resets the stage of the callback chain - void ResetStage() - { - if (!chain) - return; - - _stage = 0; - } - - //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly - void NextStage() - { - if (!chain) - return; - - ++_stage; - } - - //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) - uint8 GetStage() - { - return _stage; - } - - //! Resets all underlying variables (param, result and stage) - void Reset() - { - SetParam(ParamType()); - FreeResult(); - ResetStage(); - } - - private: - std::future<Result> _result; - ParamType _param; - uint8 _stage; - - QueryCallback(QueryCallback const& right) = delete; - QueryCallback& operator=(QueryCallback const& right) = delete; -}; - -template <typename Result, typename ParamType1, typename ParamType2, bool chain = false> -class QueryCallback_2 -{ - public: - QueryCallback_2() : _stage(chain ? 0 : CALLBACK_STAGE_INVALID) { } - - //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery - void SetFutureResult(std::future<Result> value) - { - _result = std::move(value); - } - - std::future<Result>& GetFutureResult() - { - return _result; - } - - int IsReady() - { - return _result.valid() && _result.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - } - - void GetResult(Result& res) - { - res = _result.get(); - } - - void FreeResult() - { - // Nothing to do here, the constructor of std::future will take care of the cleanup - } - - void SetFirstParam(ParamType1 value) - { - _param_1 = value; - } - - void SetSecondParam(ParamType2 value) - { - _param_2 = value; - } - - ParamType1 GetFirstParam() - { - return _param_1; - } - - ParamType2 GetSecondParam() - { - return _param_2; - } - - //! Resets the stage of the callback chain - void ResetStage() - { - if (!chain) - return; - - _stage = 0; - } - - //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly - void NextStage() - { - if (!chain) - return; - - ++_stage; - } - - //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) - uint8 GetStage() - { - return _stage; - } - - //! Resets all underlying variables (param, result and stage) - void Reset() - { - SetFirstParam(NULL); - SetSecondParam(NULL); - FreeResult(); - ResetStage(); - } - - private: - std::future<Result> _result; - ParamType1 _param_1; - ParamType2 _param_2; - uint8 _stage; - - QueryCallback_2(QueryCallback_2 const& right) = delete; - QueryCallback_2& operator=(QueryCallback_2 const& right) = delete; -}; - -#endif diff --git a/src/common/Threading/LockedQueue.h b/src/common/Threading/LockedQueue.h index c6faaaf81ca..21a29d7e53b 100644 --- a/src/common/Threading/LockedQueue.h +++ b/src/common/Threading/LockedQueue.h @@ -57,6 +57,14 @@ public: unlock(); } + //! Adds items back to front of the queue + template<class Iterator> + void readd(Iterator begin, Iterator end) + { + std::lock_guard<std::mutex> lock(_lock); + _queue.insert(_queue.begin(), begin, end); + } + //! Gets the next result in the queue, if any. bool next(T& result) { diff --git a/src/common/Threading/MPSCQueue.h b/src/common/Threading/MPSCQueue.h new file mode 100644 index 00000000000..09648b844be --- /dev/null +++ b/src/common/Threading/MPSCQueue.h @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef MPSCQueue_h__ +#define MPSCQueue_h__ + +#include <atomic> +#include <utility> + +// C++ implementation of Dmitry Vyukov's lock free MPSC queue +// http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue +template<typename T> +class MPSCQueue +{ +public: + MPSCQueue() : _head(new Node()), _tail(_head.load(std::memory_order_relaxed)) + { + Node* front = _head.load(std::memory_order_relaxed); + front->Next.store(nullptr, std::memory_order_relaxed); + } + + ~MPSCQueue() + { + T* output; + while (this->Dequeue(output)) + ; + + Node* front = _head.load(std::memory_order_relaxed); + delete front; + } + + void Enqueue(T* input) + { + Node* node = new Node(input); + Node* prevHead = _head.exchange(node, std::memory_order_acq_rel); + prevHead->Next.store(node, std::memory_order_release); + } + + bool Dequeue(T*& result) + { + Node* tail = _tail.load(std::memory_order_relaxed); + Node* next = tail->Next.load(std::memory_order_acquire); + if (!next) + return false; + + result = next->Data; + _tail.store(next, std::memory_order_release); + delete tail; + return true; + } + +private: + struct Node + { + Node() = default; + explicit Node(T* data) : Data(data) { Next.store(nullptr, std::memory_order_relaxed); } + + T* Data; + std::atomic<Node*> Next; + }; + + std::atomic<Node*> _head; + std::atomic<Node*> _tail; + + MPSCQueue(MPSCQueue const&) = delete; + MPSCQueue& operator=(MPSCQueue const&) = delete; +}; + +#endif // MPSCQueue_h__ |
