diff options
Diffstat (limited to 'src/common/Threading')
-rw-r--r-- | src/common/Threading/Callback.h | 209 | ||||
-rw-r--r-- | src/common/Threading/LockedQueue.h | 144 | ||||
-rw-r--r-- | src/common/Threading/ProcessPriority.h | 105 | ||||
-rw-r--r-- | src/common/Threading/ProducerConsumerQueue.h | 112 |
4 files changed, 570 insertions, 0 deletions
diff --git a/src/common/Threading/Callback.h b/src/common/Threading/Callback.h new file mode 100644 index 00000000000..1f4ffc97cfc --- /dev/null +++ b/src/common/Threading/Callback.h @@ -0,0 +1,209 @@ +/* + * Copyright (C) 2008-2015 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _CALLBACK_H +#define _CALLBACK_H + +#include <future> +#include "QueryResult.h" + +typedef std::future<QueryResult> QueryResultFuture; +typedef std::promise<QueryResult> QueryResultPromise; +typedef std::future<PreparedQueryResult> PreparedQueryResultFuture; +typedef std::promise<PreparedQueryResult> PreparedQueryResultPromise; + +#define CALLBACK_STAGE_INVALID uint8(-1) + +template <typename Result, typename ParamType, bool chain = false> +class QueryCallback +{ + public: + QueryCallback() : _param(), _stage(chain ? 0 : CALLBACK_STAGE_INVALID) { } + + //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery + void SetFutureResult(std::future<Result> value) + { + _result = std::move(value); + } + + std::future<Result>& GetFutureResult() + { + return _result; + } + + int IsReady() + { + return _result.valid() && _result.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + } + + void GetResult(Result& res) + { + res = _result.get(); + } + + void FreeResult() + { + // Nothing to do here, the constructor of std::future will take care of the cleanup + } + + void SetParam(ParamType value) + { + _param = value; + } + + ParamType GetParam() + { + return _param; + } + + //! Resets the stage of the callback chain + void ResetStage() + { + if (!chain) + return; + + _stage = 0; + } + + //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly + void NextStage() + { + if (!chain) + return; + + ++_stage; + } + + //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) + uint8 GetStage() + { + return _stage; + } + + //! Resets all underlying variables (param, result and stage) + void Reset() + { + SetParam(ParamType()); + FreeResult(); + ResetStage(); + } + + private: + std::future<Result> _result; + ParamType _param; + uint8 _stage; + + QueryCallback(QueryCallback const& right) = delete; + QueryCallback& operator=(QueryCallback const& right) = delete; +}; + +template <typename Result, typename ParamType1, typename ParamType2, bool chain = false> +class QueryCallback_2 +{ + public: + QueryCallback_2() : _stage(chain ? 0 : CALLBACK_STAGE_INVALID) { } + + //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery + void SetFutureResult(std::future<Result> value) + { + _result = std::move(value); + } + + std::future<Result>& GetFutureResult() + { + return _result; + } + + int IsReady() + { + return _result.valid() && _result.wait_for(std::chrono::seconds(0)) == std::future_status::ready; + } + + void GetResult(Result& res) + { + res = _result.get(); + } + + void FreeResult() + { + // Nothing to do here, the constructor of std::future will take care of the cleanup + } + + void SetFirstParam(ParamType1 value) + { + _param_1 = value; + } + + void SetSecondParam(ParamType2 value) + { + _param_2 = value; + } + + ParamType1 GetFirstParam() + { + return _param_1; + } + + ParamType2 GetSecondParam() + { + return _param_2; + } + + //! Resets the stage of the callback chain + void ResetStage() + { + if (!chain) + return; + + _stage = 0; + } + + //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly + void NextStage() + { + if (!chain) + return; + + ++_stage; + } + + //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) + uint8 GetStage() + { + return _stage; + } + + //! Resets all underlying variables (param, result and stage) + void Reset() + { + SetFirstParam(NULL); + SetSecondParam(NULL); + FreeResult(); + ResetStage(); + } + + private: + std::future<Result> _result; + ParamType1 _param_1; + ParamType2 _param_2; + uint8 _stage; + + QueryCallback_2(QueryCallback_2 const& right) = delete; + QueryCallback_2& operator=(QueryCallback_2 const& right) = delete; +}; + +#endif diff --git a/src/common/Threading/LockedQueue.h b/src/common/Threading/LockedQueue.h new file mode 100644 index 00000000000..3abb0f4b8bc --- /dev/null +++ b/src/common/Threading/LockedQueue.h @@ -0,0 +1,144 @@ +/* + * Copyright (C) 2008-2015 TrinityCore <http://www.trinitycore.org/> + * Copyright (C) 2005-2008 MaNGOS <http://getmangos.com/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef LOCKEDQUEUE_H +#define LOCKEDQUEUE_H + +#include <deque> +#include <mutex> + +template <class T, typename StorageType = std::deque<T> > +class LockedQueue +{ + //! Lock access to the queue. + std::mutex _lock; + + //! Storage backing the queue. + StorageType _queue; + + //! Cancellation flag. + volatile bool _canceled; + +public: + + //! Create a LockedQueue. + LockedQueue() + : _canceled(false) + { + } + + //! Destroy a LockedQueue. + virtual ~LockedQueue() + { + } + + //! Adds an item to the queue. + void add(const T& item) + { + lock(); + + _queue.push_back(item); + + unlock(); + } + + //! Gets the next result in the queue, if any. + bool next(T& result) + { + std::lock_guard<std::mutex> lock(_lock); + + if (_queue.empty()) + return false; + + result = _queue.front(); + _queue.pop_front(); + + return true; + } + + template<class Checker> + bool next(T& result, Checker& check) + { + std::lock_guard<std::mutex> lock(_lock); + + if (_queue.empty()) + return false; + + result = _queue.front(); + if (!check.Process(result)) + return false; + + _queue.pop_front(); + return true; + } + + //! Peeks at the top of the queue. Check if the queue is empty before calling! Remember to unlock after use if autoUnlock == false. + T& peek(bool autoUnlock = false) + { + lock(); + + T& result = _queue.front(); + + if (autoUnlock) + unlock(); + + return result; + } + + //! Cancels the queue. + void cancel() + { + std::lock_guard<std::mutex> lock(_lock); + + _canceled = true; + } + + //! Checks if the queue is cancelled. + bool cancelled() + { + std::lock_guard<std::mutex> lock(_lock); + return _canceled; + } + + //! Locks the queue for access. + void lock() + { + this->_lock.lock(); + } + + //! Unlocks the queue. + void unlock() + { + this->_lock.unlock(); + } + + ///! Calls pop_front of the queue + void pop_front() + { + std::lock_guard<std::mutex> lock(_lock); + _queue.pop_front(); + } + + ///! Checks if we're empty or not with locks held + bool empty() + { + std::lock_guard<std::mutex> lock(_lock); + return _queue.empty(); + } +}; +#endif diff --git a/src/common/Threading/ProcessPriority.h b/src/common/Threading/ProcessPriority.h new file mode 100644 index 00000000000..2a8501a0249 --- /dev/null +++ b/src/common/Threading/ProcessPriority.h @@ -0,0 +1,105 @@ +/* +* Copyright (C) 2008-2015 TrinityCore <http://www.trinitycore.org/> +* +* This program is free software; you can redistribute it and/or modify it +* under the terms of the GNU General Public License as published by the +* Free Software Foundation; either version 2 of the License, or (at your +* option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +* more details. +* +* You should have received a copy of the GNU General Public License along +* with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _PROCESSPRIO_H +#define _PROCESSPRIO_H + +#include "Configuration/Config.h" + +#ifdef __linux__ +#include <sched.h> +#include <sys/resource.h> +#define PROCESS_HIGH_PRIORITY -15 // [-20, 19], default is 0 +#endif + +void SetProcessPriority(const std::string& logChannel) +{ +// Suppresses Mac OS X Warning since logChannel isn't used. +#if PLATFORM_APPLE + (void)logChannel; +#endif + +#if defined(_WIN32) || defined(__linux__) + + ///- Handle affinity for multiple processors and process priority + uint32 affinity = sConfigMgr->GetIntDefault("UseProcessors", 0); + bool highPriority = sConfigMgr->GetBoolDefault("ProcessPriority", false); + +#ifdef _WIN32 // Windows + + HANDLE hProcess = GetCurrentProcess(); + if (affinity > 0) + { + ULONG_PTR appAff; + ULONG_PTR sysAff; + + if (GetProcessAffinityMask(hProcess, &appAff, &sysAff)) + { + // remove non accessible processors + ULONG_PTR currentAffinity = affinity & appAff; + + if (!currentAffinity) + TC_LOG_ERROR(logChannel, "Processors marked in UseProcessors bitmask (hex) %x are not accessible. Accessible processors bitmask (hex): %x", affinity, appAff); + else if (SetProcessAffinityMask(hProcess, currentAffinity)) + TC_LOG_INFO(logChannel, "Using processors (bitmask, hex): %x", currentAffinity); + else + TC_LOG_ERROR(logChannel, "Can't set used processors (hex): %x", currentAffinity); + } + } + + if (highPriority) + { + if (SetPriorityClass(hProcess, HIGH_PRIORITY_CLASS)) + TC_LOG_INFO(logChannel, "Process priority class set to HIGH"); + else + TC_LOG_ERROR(logChannel, "Can't set process priority class."); + } + +#else // Linux + + if (affinity > 0) + { + cpu_set_t mask; + CPU_ZERO(&mask); + + for (unsigned int i = 0; i < sizeof(affinity) * 8; ++i) + if (affinity & (1 << i)) + CPU_SET(i, &mask); + + if (sched_setaffinity(0, sizeof(mask), &mask)) + TC_LOG_ERROR(logChannel, "Can't set used processors (hex): %x, error: %s", affinity, strerror(errno)); + else + { + CPU_ZERO(&mask); + sched_getaffinity(0, sizeof(mask), &mask); + TC_LOG_INFO(logChannel, "Using processors (bitmask, hex): %lx", *(__cpu_mask*)(&mask)); + } + } + + if (highPriority) + { + if (setpriority(PRIO_PROCESS, 0, PROCESS_HIGH_PRIORITY)) + TC_LOG_ERROR(logChannel, "Can't set process priority class, error: %s", strerror(errno)); + else + TC_LOG_INFO(logChannel, "Process priority class set to %i", getpriority(PRIO_PROCESS, 0)); + } + +#endif +#endif +} + +#endif diff --git a/src/common/Threading/ProducerConsumerQueue.h b/src/common/Threading/ProducerConsumerQueue.h new file mode 100644 index 00000000000..96546960393 --- /dev/null +++ b/src/common/Threading/ProducerConsumerQueue.h @@ -0,0 +1,112 @@ +/* +* Copyright (C) 2008-2015 TrinityCore <http://www.trinitycore.org/> +* +* This program is free software; you can redistribute it and/or modify it +* under the terms of the GNU General Public License as published by the +* Free Software Foundation; either version 2 of the License, or (at your +* option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +* more details. +* +* You should have received a copy of the GNU General Public License along +* with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#ifndef _PCQ_H +#define _PCQ_H + +#include <condition_variable> +#include <mutex> +#include <queue> +#include <atomic> +#include <type_traits> + +template <typename T> +class ProducerConsumerQueue +{ +private: + std::mutex _queueLock; + std::queue<T> _queue; + std::condition_variable _condition; + std::atomic<bool> _shutdown; + +public: + + ProducerConsumerQueue<T>() : _shutdown(false) { } + + void Push(const T& value) + { + std::lock_guard<std::mutex> lock(_queueLock); + _queue.push(std::move(value)); + + _condition.notify_one(); + } + + bool Empty() + { + std::lock_guard<std::mutex> lock(_queueLock); + + return _queue.empty(); + } + + bool Pop(T& value) + { + std::lock_guard<std::mutex> lock(_queueLock); + + if (_queue.empty() || _shutdown) + return false; + + value = _queue.front(); + + _queue.pop(); + + return true; + } + + void WaitAndPop(T& value) + { + std::unique_lock<std::mutex> lock(_queueLock); + + // we could be using .wait(lock, predicate) overload here but it is broken + // https://connect.microsoft.com/VisualStudio/feedback/details/1098841 + while (_queue.empty() && !_shutdown) + _condition.wait(lock); + + if (_queue.empty() || _shutdown) + return; + + value = _queue.front(); + + _queue.pop(); + } + + void Cancel() + { + std::unique_lock<std::mutex> lock(_queueLock); + + while (!_queue.empty()) + { + T& value = _queue.front(); + + DeleteQueuedObject(value); + + _queue.pop(); + } + + _shutdown = true; + + _condition.notify_all(); + } + +private: + template<typename E = T> + typename std::enable_if<std::is_pointer<E>::value>::type DeleteQueuedObject(E& obj) { delete obj; } + + template<typename E = T> + typename std::enable_if<!std::is_pointer<E>::value>::type DeleteQueuedObject(E const& /*packet*/) { } +}; + +#endif |