diff options
| author | Yehonal <yehonal.azeroth@gmail.com> | 2016-08-11 20:25:27 +0200 |
|---|---|---|
| committer | Yehonal <yehonal.azeroth@gmail.com> | 2016-08-11 20:25:27 +0200 |
| commit | 0f85ce1c54810148a5bfcfcedf64a573f4e6e482 (patch) | |
| tree | f2483faec39ed958bdd935dba9c71af35c71e8b8 /src/server/shared/Threading | |
| parent | c62a72c0a8fc84a1c0af34f483e22a2e821640f8 (diff) | |
Big re-organization of repository [W.I.P]
Diffstat (limited to 'src/server/shared/Threading')
| -rw-r--r-- | src/server/shared/Threading/Callback.h | 310 | ||||
| -rw-r--r-- | src/server/shared/Threading/DelayExecutor.cpp | 122 | ||||
| -rw-r--r-- | src/server/shared/Threading/DelayExecutor.h | 38 | ||||
| -rw-r--r-- | src/server/shared/Threading/LockedQueue.h | 158 | ||||
| -rw-r--r-- | src/server/shared/Threading/Threading.cpp | 235 | ||||
| -rw-r--r-- | src/server/shared/Threading/Threading.h | 110 |
6 files changed, 0 insertions, 973 deletions
diff --git a/src/server/shared/Threading/Callback.h b/src/server/shared/Threading/Callback.h deleted file mode 100644 index 0b1e7ee86c..0000000000 --- a/src/server/shared/Threading/Callback.h +++ /dev/null @@ -1,310 +0,0 @@ -/* - * Copyright (C) - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _CALLBACK_H -#define _CALLBACK_H - -#include <ace/Future.h> -#include <ace/Future_Set.h> -#include "QueryResult.h" - -typedef ACE_Future<QueryResult> QueryResultFuture; -typedef ACE_Future<PreparedQueryResult> PreparedQueryResultFuture; - -/*! A simple template using ACE_Future to manage callbacks from the thread and object that - issued the request. <ParamType> is variable type of parameter that is used as parameter - for the callback function. -*/ -#define CALLBACK_STAGE_INVALID uint8(-1) - -template <typename Result, typename ParamType, bool chain = false> -class QueryCallback -{ - public: - QueryCallback() : _param(), _stage(chain ? 0 : CALLBACK_STAGE_INVALID) {} - - //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery - void SetFutureResult(ACE_Future<Result> value) - { - _result = value; - } - - ACE_Future<Result> GetFutureResult() - { - return _result; - } - - int IsReady() - { - return _result.ready(); - } - - void GetResult(Result& res) - { - _result.get(res); - } - - void FreeResult() - { - _result.cancel(); - } - - void SetParam(ParamType value) - { - _param = value; - } - - ParamType GetParam() - { - return _param; - } - - //! Resets the stage of the callback chain - void ResetStage() - { - if (!chain) - return; - - _stage = 0; - } - - //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly - void NextStage() - { - if (!chain) - return; - - ++_stage; - } - - //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) - uint8 GetStage() - { - return _stage; - } - - //! Resets all underlying variables (param, result and stage) - void Reset() - { - SetParam(NULL); - FreeResult(); - ResetStage(); - } - - private: - ACE_Future<Result> _result; - ParamType _param; - uint8 _stage; -}; - -template <typename Result, typename ParamType1, typename ParamType2, bool chain = false> -class QueryCallback_2 -{ - public: - QueryCallback_2() : _stage(chain ? 0 : CALLBACK_STAGE_INVALID) {} - - //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery - void SetFutureResult(ACE_Future<Result> value) - { - _result = value; - } - - ACE_Future<Result> GetFutureResult() - { - return _result; - } - - int IsReady() - { - return _result.ready(); - } - - void GetResult(Result& res) - { - _result.get(res); - } - - void FreeResult() - { - _result.cancel(); - } - - void SetFirstParam(ParamType1 value) - { - _param_1 = value; - } - - void SetSecondParam(ParamType2 value) - { - _param_2 = value; - } - - ParamType1 GetFirstParam() - { - return _param_1; - } - - ParamType2 GetSecondParam() - { - return _param_2; - } - - //! Resets the stage of the callback chain - void ResetStage() - { - if (!chain) - return; - - _stage = 0; - } - - //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly - void NextStage() - { - if (!chain) - return; - - ++_stage; - } - - //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) - uint8 GetStage() - { - return _stage; - } - - //! Resets all underlying variables (param, result and stage) - void Reset() - { - SetFirstParam(0); - SetSecondParam(NULL); - FreeResult(); - ResetStage(); - } - - private: - ACE_Future<Result> _result; - ParamType1 _param_1; - ParamType2 _param_2; - uint8 _stage; -}; - -template <typename Result, typename ParamType1, typename ParamType2, typename ParamType3, bool chain = false> -class QueryCallback_3 -{ - public: - QueryCallback_3() : _stage(chain ? 0 : CALLBACK_STAGE_INVALID) {} - - //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery - void SetFutureResult(ACE_Future<Result> value) - { - _result = value; - } - - ACE_Future<Result> GetFutureResult() - { - return _result; - } - - int IsReady() - { - return _result.ready(); - } - - void GetResult(Result& res) - { - _result.get(res); - } - - void FreeResult() - { - _result.cancel(); - } - - void SetFirstParam(ParamType1 value) - { - _param_1 = value; - } - - void SetSecondParam(ParamType2 value) - { - _param_2 = value; - } - - void SetThirdParam(ParamType3 value) - { - _param_3 = value; - } - - ParamType1 GetFirstParam() - { - return _param_1; - } - - ParamType2 GetSecondParam() - { - return _param_2; - } - - ParamType3 GetThirdParam() - { - return _param_3; - } - - //! Resets the stage of the callback chain - void ResetStage() - { - if (!chain) - return; - - _stage = 0; - } - - //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly - void NextStage() - { - if (!chain) - return; - - ++_stage; - } - - //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) - uint8 GetStage() - { - return _stage; - } - - //! Resets all underlying variables (param, result and stage) - void Reset() - { - SetFirstParam(NULL); - SetSecondParam(NULL); - SetThirdParam(NULL); - FreeResult(); - ResetStage(); - } - - private: - ACE_Future<Result> _result; - ParamType1 _param_1; - ParamType2 _param_2; - ParamType3 _param_3; - uint8 _stage; -}; - -#endif
\ No newline at end of file diff --git a/src/server/shared/Threading/DelayExecutor.cpp b/src/server/shared/Threading/DelayExecutor.cpp deleted file mode 100644 index b64f8d65ab..0000000000 --- a/src/server/shared/Threading/DelayExecutor.cpp +++ /dev/null @@ -1,122 +0,0 @@ -#include <ace/Singleton.h> -#include <ace/Thread_Mutex.h> -#include <ace/Log_Msg.h> -#include "Threading.h" - -#include "DelayExecutor.h" - -DelayExecutor* DelayExecutor::instance() -{ - return ACE_Singleton<DelayExecutor, ACE_Thread_Mutex>::instance(); -} - -DelayExecutor::DelayExecutor() - : pre_svc_hook_(0), post_svc_hook_(0), activated_(false), mqueue_(1*1024*1024, 1*1024*1024), queue_(&mqueue_) -{ -} - -DelayExecutor::~DelayExecutor() -{ - if (pre_svc_hook_) - delete pre_svc_hook_; - - if (post_svc_hook_) - delete post_svc_hook_; - - deactivate(); -} - -int DelayExecutor::deactivate() -{ - if (!activated()) - return -1; - - activated(false); - queue_.queue()->deactivate(); - wait(); - - return 0; -} - -int DelayExecutor::svc() -{ - if (pre_svc_hook_) - pre_svc_hook_->call(); - - for (;;) - { - ACE_Method_Request* rq = queue_.dequeue(); - - if (!rq) - break; - - rq->call(); - delete rq; - } - - if (post_svc_hook_) - post_svc_hook_->call(); - - return 0; -} - -int DelayExecutor::start(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook) -{ - if (activated()) - return -1; - - if (num_threads < 1) - return -1; - - if (pre_svc_hook_) - delete pre_svc_hook_; - - if (post_svc_hook_) - delete post_svc_hook_; - - pre_svc_hook_ = pre_svc_hook; - post_svc_hook_ = post_svc_hook; - - queue_.queue()->activate(); - - // pussywizard: - ACE_Based::ThreadPriority tp; - int _priority = tp.getPriority(ACE_Based::Highest); - if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE, num_threads, 0, _priority) == -1) - return -1; - - //if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1) - // return -1; - - activated(true); - - return true; -} - -int DelayExecutor::execute(ACE_Method_Request* new_req) -{ - if (new_req == NULL) - return -1; - - // pussywizard: NULL as param for enqueue - wait until the action is possible! - // new tasks are added to the queue during map update (schedule_update in MapInstanced::Update) - // the queue can be momentarily blocked by map threads constantly waiting for tasks (for (;;) { queue_.dequeue();... } in DelayExecutor::svc()) - // so just wait a moment, don't drop the task xDddd - if (queue_.enqueue(new_req, /*(ACE_Time_Value*)&ACE_Time_Value::zero*/ NULL) == -1) - { - delete new_req; - ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1); - } - - return 0; -} - -bool DelayExecutor::activated() -{ - return activated_; -} - -void DelayExecutor::activated(bool s) -{ - activated_ = s; -} diff --git a/src/server/shared/Threading/DelayExecutor.h b/src/server/shared/Threading/DelayExecutor.h deleted file mode 100644 index 4e588cae5e..0000000000 --- a/src/server/shared/Threading/DelayExecutor.h +++ /dev/null @@ -1,38 +0,0 @@ -#ifndef _M_DELAY_EXECUTOR_H -#define _M_DELAY_EXECUTOR_H - -#include <ace/Task.h> -#include <ace/Activation_Queue.h> -#include <ace/Method_Request.h> - -class DelayExecutor : protected ACE_Task_Base -{ - public: - - DelayExecutor(); - virtual ~DelayExecutor(); - - static DelayExecutor* instance(); - - int execute(ACE_Method_Request* new_req); - - int start(int num_threads = 1, ACE_Method_Request* pre_svc_hook = NULL, ACE_Method_Request* post_svc_hook = NULL); - - int deactivate(); - - bool activated(); - - virtual int svc(); - - private: - - ACE_Message_Queue<ACE_SYNCH> mqueue_; - ACE_Activation_Queue queue_; - ACE_Method_Request* pre_svc_hook_; - ACE_Method_Request* post_svc_hook_; - bool activated_; - - void activated(bool s); -}; - -#endif // _M_DELAY_EXECUTOR_H diff --git a/src/server/shared/Threading/LockedQueue.h b/src/server/shared/Threading/LockedQueue.h deleted file mode 100644 index 31d81cdb10..0000000000 --- a/src/server/shared/Threading/LockedQueue.h +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Copyright (C) - * Copyright (C) - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef LOCKEDQUEUE_H -#define LOCKEDQUEUE_H - -#include <ace/Guard_T.h> -#include <ace/Thread_Mutex.h> -#include <deque> -#include <assert.h> -#include "Debugging/Errors.h" - -namespace ACE_Based -{ - template <class T, class LockType, typename StorageType=std::deque<T> > - class LockedQueue - { - //! Lock access to the queue. - LockType _lock; - - //! Storage backing the queue. - StorageType _queue; - - //! Cancellation flag. - volatile bool _canceled; - - public: - - //! Create a LockedQueue. - LockedQueue() - : _canceled(false) - { - } - - //! Destroy a LockedQueue. - virtual ~LockedQueue() - { - } - - //! Adds an item to the queue. - void add(const T& item) - { - lock(); - - //ASSERT(!this->_canceled); - // throw Cancellation_Exception(); - - _queue.push_back(item); - - unlock(); - } - - //! Gets the next result in the queue, if any. - bool next(T& result) - { - // ACE_Guard<LockType> g(this->_lock); - ACE_GUARD_RETURN (LockType, g, this->_lock, false); - - if (_queue.empty()) - return false; - - //ASSERT (!_queue.empty() || !this->_canceled); - // throw Cancellation_Exception(); - result = _queue.front(); - _queue.pop_front(); - - return true; - } - - template<class Checker> - bool next(T& result, Checker& check) - { - ACE_Guard<LockType> g(this->_lock); - - if (_queue.empty()) - return false; - - result = _queue.front(); - if (!check.Process(result)) - return false; - - _queue.pop_front(); - return true; - } - - //! Peeks at the top of the queue. Check if the queue is empty before calling! Remember to unlock after use if autoUnlock == false. - T& peek(bool autoUnlock = false) - { - lock(); - - T& result = _queue.front(); - - if (autoUnlock) - unlock(); - - return result; - } - - //! Cancels the queue. - void cancel() - { - lock(); - - _canceled = true; - - unlock(); - } - - //! Checks if the queue is cancelled. - bool cancelled() - { - ACE_Guard<LockType> g(this->_lock); - return _canceled; - } - - //! Locks the queue for access. - void lock() - { - this->_lock.acquire(); - } - - //! Unlocks the queue. - void unlock() - { - this->_lock.release(); - } - - ///! Calls pop_front of the queue - void pop_front() - { - ACE_GUARD (LockType, g, this->_lock); - _queue.pop_front(); - } - - ///! Checks if we're empty or not with locks held - bool empty() - { - ACE_GUARD_RETURN (LockType, g, this->_lock, false); - return _queue.empty(); - } - }; -} -#endif diff --git a/src/server/shared/Threading/Threading.cpp b/src/server/shared/Threading/Threading.cpp deleted file mode 100644 index 023939fad6..0000000000 --- a/src/server/shared/Threading/Threading.cpp +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Copyright (C) - * Copyright (C) - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "Threading.h" -#include "Errors.h" -#include <ace/OS_NS_unistd.h> -#include <ace/Sched_Params.h> -#include <vector> - -using namespace ACE_Based; - -ThreadPriority::ThreadPriority() -{ - for (int i = Idle; i < MAXPRIORITYNUM; ++i) - m_priority[i] = ACE_THR_PRI_OTHER_DEF; - - m_priority[Idle] = ACE_Sched_Params::priority_min(ACE_SCHED_OTHER); - m_priority[Realtime] = ACE_Sched_Params::priority_max(ACE_SCHED_OTHER); - - std::vector<int> _tmp; - - ACE_Sched_Params::Policy _policy = ACE_SCHED_OTHER; - ACE_Sched_Priority_Iterator pr_iter(_policy); - - while (pr_iter.more()) - { - _tmp.push_back(pr_iter.priority()); - pr_iter.next(); - } - - ASSERT (!_tmp.empty()); - - if (_tmp.size() >= MAXPRIORITYNUM) - { - const size_t max_pos = _tmp.size(); - size_t min_pos = 1; - size_t norm_pos = 0; - for (size_t i = 0; i < max_pos; ++i) - { - if (_tmp[i] == ACE_THR_PRI_OTHER_DEF) - { - norm_pos = i + 1; - break; - } - } - - // since we have only 7(seven) values in enum Priority - // and 3 we know already (Idle, Normal, Realtime) so - // we need to split each list [Idle...Normal] and [Normal...Realtime] - // into pieces - const size_t _divider = 4; - size_t _div = (norm_pos - min_pos) / _divider; - if (_div == 0) - _div = 1; - - min_pos = (norm_pos - 1); - - m_priority[Low] = _tmp[min_pos -= _div]; - m_priority[Lowest] = _tmp[min_pos -= _div ]; - - _div = (max_pos - norm_pos) / _divider; - if (_div == 0) - _div = 1; - - min_pos = norm_pos - 1; - - m_priority[High] = _tmp[min_pos += _div]; - m_priority[Highest] = _tmp[min_pos += _div]; - } -} - -int ThreadPriority::getPriority(Priority p) const -{ - if (p < Idle) - p = Idle; - - if (p > Realtime) - p = Realtime; - - return m_priority[p]; -} - -#define THREADFLAG (THR_NEW_LWP | THR_SCHED_DEFAULT| THR_JOINABLE) - -Thread::Thread(): m_iThreadId(0), m_hThreadHandle(0), m_task(0) -{ - -} - -Thread::Thread(Runnable* instance): m_iThreadId(0), m_hThreadHandle(0), m_task(instance) -{ - // register reference to m_task to prevent it deeltion until destructor - if (m_task) - m_task->incReference(); - - bool _start = start(); - ASSERT (_start); -} - -Thread::~Thread() -{ - //Wait(); - - // deleted runnable object (if no other references) - if (m_task) - m_task->decReference(); -} - -//initialize Thread's class static member -Thread::ThreadStorage Thread::m_ThreadStorage; -ThreadPriority Thread::m_TpEnum; - -bool Thread::start() -{ - if (m_task == 0 || m_iThreadId != 0) - return false; - - // incRef before spawing the thread, otherwise Thread::ThreadTask() might call decRef and delete m_task - m_task->incReference(); - - bool res = (ACE_Thread::spawn(&Thread::ThreadTask, (void*)m_task, THREADFLAG, &m_iThreadId, &m_hThreadHandle) == 0); - - if (!res) - m_task->decReference(); - - return res; -} - -bool Thread::wait() -{ - if (!m_hThreadHandle || !m_task) - return false; - - ACE_THR_FUNC_RETURN _value = ACE_THR_FUNC_RETURN(-1); - int _res = ACE_Thread::join(m_hThreadHandle, &_value); - - m_iThreadId = 0; - m_hThreadHandle = 0; - - return (_res == 0); -} - -void Thread::destroy() -{ - if (!m_iThreadId || !m_task) - return; - - if (ACE_Thread::kill(m_iThreadId, -1) != 0) - return; - - m_iThreadId = 0; - m_hThreadHandle = 0; - - // reference set at ACE_Thread::spawn - m_task->decReference(); -} - -void Thread::suspend() -{ - ACE_Thread::suspend(m_hThreadHandle); -} - -void Thread::resume() -{ - ACE_Thread::resume(m_hThreadHandle); -} - -ACE_THR_FUNC_RETURN Thread::ThreadTask(void * param) -{ - Runnable* _task = (Runnable*)param; - _task->run(); - - // task execution complete, free referecne added at - _task->decReference(); - - return (ACE_THR_FUNC_RETURN)0; -} - -ACE_thread_t Thread::currentId() -{ - return ACE_Thread::self(); -} - -ACE_hthread_t Thread::currentHandle() -{ - ACE_hthread_t _handle; - ACE_Thread::self(_handle); - - return _handle; -} - -Thread * Thread::current() -{ - Thread * _thread = m_ThreadStorage.ts_object(); - if (!_thread) - { - _thread = new Thread(); - _thread->m_iThreadId = Thread::currentId(); - _thread->m_hThreadHandle = Thread::currentHandle(); - - Thread * _oldValue = m_ThreadStorage.ts_object(_thread); - if (_oldValue) - delete _oldValue; - } - - return _thread; -} - -void Thread::setPriority(Priority type) -{ - int _priority = m_TpEnum.getPriority(type); - int _ok = ACE_Thread::setprio(m_hThreadHandle, _priority); - //remove this ASSERT in case you don't want to know is thread priority change was successful or not - ASSERT (_ok == 0); -} - -void Thread::Sleep(unsigned long msecs) -{ - ACE_OS::sleep(ACE_Time_Value(0, 1000 * msecs)); -} diff --git a/src/server/shared/Threading/Threading.h b/src/server/shared/Threading/Threading.h deleted file mode 100644 index 8ec585382d..0000000000 --- a/src/server/shared/Threading/Threading.h +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright (C) - * Copyright (C) - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef THREADING_H -#define THREADING_H - -#include "Common.h" -#include <ace/ACE.h> -#include <ace/Thread.h> -#include <ace/TSS_T.h> -#include <ace/Atomic_Op.h> -#include <assert.h> - -namespace ACE_Based -{ - - class Runnable - { - public: - virtual ~Runnable() { } - virtual void run() = 0; - - void incReference() { ++m_refs; } - void decReference() - { - if (!--m_refs) - delete this; - } - private: - ACE_Atomic_Op<ACE_Thread_Mutex, long> m_refs; - }; - - enum Priority - { - Idle, - Lowest, - Low, - Normal, - High, - Highest, - Realtime - }; - -#define MAXPRIORITYNUM (Realtime + 1) - - class ThreadPriority - { - public: - ThreadPriority(); - int getPriority(Priority p) const; - - private: - int m_priority[MAXPRIORITYNUM]; - }; - - class Thread - { - public: - Thread(); - explicit Thread(Runnable* instance); - ~Thread(); - - bool start(); - bool wait(); - void destroy(); - - void suspend(); - void resume(); - - void setPriority(Priority type); - - static void Sleep(unsigned long msecs); - static ACE_thread_t currentId(); - static ACE_hthread_t currentHandle(); - static Thread * current(); - - private: - Thread(const Thread&); - Thread& operator=(const Thread&); - - static ACE_THR_FUNC_RETURN ThreadTask(void * param); - - ACE_thread_t m_iThreadId; - ACE_hthread_t m_hThreadHandle; - Runnable* m_task; - - typedef ACE_TSS<Thread> ThreadStorage; - //global object - container for Thread class representation of every thread - static ThreadStorage m_ThreadStorage; - //use this object to determine current OS thread priority values mapped to enum Priority{ } - static ThreadPriority m_TpEnum; - }; - -} -#endif |
