diff options
Diffstat (limited to 'src/server/shared/Threading')
| -rw-r--r-- | src/server/shared/Threading/DelayExecutor.cpp | 111 | ||||
| -rw-r--r-- | src/server/shared/Threading/DelayExecutor.h | 37 | ||||
| -rw-r--r-- | src/server/shared/Threading/LockedQueue.h | 139 | ||||
| -rw-r--r-- | src/server/shared/Threading/Threading.cpp | 232 | ||||
| -rw-r--r-- | src/server/shared/Threading/Threading.h | 108 |
5 files changed, 627 insertions, 0 deletions
diff --git a/src/server/shared/Threading/DelayExecutor.cpp b/src/server/shared/Threading/DelayExecutor.cpp new file mode 100644 index 00000000000..9a718823232 --- /dev/null +++ b/src/server/shared/Threading/DelayExecutor.cpp @@ -0,0 +1,111 @@ +#include <ace/Singleton.h> +#include <ace/Thread_Mutex.h> +#include <ace/Log_Msg.h> + +#include "DelayExecutor.h" + +DelayExecutor* DelayExecutor::instance() +{ + return ACE_Singleton<DelayExecutor, ACE_Thread_Mutex>::instance(); +} + +DelayExecutor::DelayExecutor() + : activated_(false), pre_svc_hook_(0), post_svc_hook_(0) +{ +} + +DelayExecutor::~DelayExecutor() +{ + if (pre_svc_hook_) + delete pre_svc_hook_; + + if (post_svc_hook_) + delete post_svc_hook_; + + deactivate(); +} + +int DelayExecutor::deactivate() +{ + if (!activated()) + return -1; + + activated(false); + queue_.queue()->deactivate(); + wait(); + + return 0; +} + +int DelayExecutor::svc() +{ + if (pre_svc_hook_) + pre_svc_hook_->call(); + + for (;;) + { + ACE_Method_Request* rq = queue_.dequeue(); + + if (!rq) + break; + + rq->call(); + delete rq; + } + + if (post_svc_hook_) + post_svc_hook_->call(); + + return 0; +} + +int DelayExecutor::activate(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook) +{ + if (activated()) + return -1; + + if (num_threads < 1) + return -1; + + if (pre_svc_hook_) + delete pre_svc_hook_; + + if (post_svc_hook_) + delete post_svc_hook_; + + pre_svc_hook_ = pre_svc_hook; + post_svc_hook_ = post_svc_hook; + + queue_.queue()->activate(); + + if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1) + return -1; + + activated(true); + + return true; +} + +int DelayExecutor::execute(ACE_Method_Request* new_req) +{ + if (new_req == NULL) + return -1; + + if (queue_.enqueue(new_req, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) + { + delete new_req; + ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1); + } + + return 0; +} + +bool DelayExecutor::activated() +{ + return activated_; +} + +void DelayExecutor::activated(bool s) +{ + activated_ = s; +} diff --git a/src/server/shared/Threading/DelayExecutor.h b/src/server/shared/Threading/DelayExecutor.h new file mode 100644 index 00000000000..664d8ca78a2 --- /dev/null +++ b/src/server/shared/Threading/DelayExecutor.h @@ -0,0 +1,37 @@ +#ifndef _M_DELAY_EXECUTOR_H +#define _M_DELAY_EXECUTOR_H + +#include <ace/Task.h> +#include <ace/Activation_Queue.h> +#include <ace/Method_Request.h> + +class DelayExecutor : protected ACE_Task_Base +{ + public: + + DelayExecutor(); + virtual ~DelayExecutor(); + + static DelayExecutor* instance(); + + int execute(ACE_Method_Request* new_req); + + int activate(int num_threads = 1, ACE_Method_Request* pre_svc_hook = NULL, ACE_Method_Request* post_svc_hook = NULL); + + int deactivate(); + + bool activated(); + + virtual int svc(); + + private: + + ACE_Activation_Queue queue_; + ACE_Method_Request* pre_svc_hook_; + ACE_Method_Request* post_svc_hook_; + bool activated_; + + void activated(bool s); +}; + +#endif // _M_DELAY_EXECUTOR_H diff --git a/src/server/shared/Threading/LockedQueue.h b/src/server/shared/Threading/LockedQueue.h new file mode 100644 index 00000000000..9f8afae6c14 --- /dev/null +++ b/src/server/shared/Threading/LockedQueue.h @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2009 MaNGOS <http://getmangos.com/> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef LOCKEDQUEUE_H +#define LOCKEDQUEUE_H + +#include <ace/Guard_T.h> +#include <ace/Thread_Mutex.h> +#include <deque> +#include <assert.h> +#include "Errors.h" + +namespace ACE_Based +{ + template <class T, class LockType, typename StorageType=std::deque<T> > + class LockedQueue + { + //! Lock access to the queue. + LockType _lock; + + //! Storage backing the queue. + StorageType _queue; + + //! Cancellation flag. + volatile bool _canceled; + + public: + + //! Create a LockedQueue. + LockedQueue() + : _canceled(false) + { + } + + //! Destroy a LockedQueue. + virtual ~LockedQueue() + { + } + + //! Adds an item to the queue. + void add(const T& item) + { + lock(); + + //ASSERT(!this->_canceled); + // throw Cancellation_Exception(); + + _queue.push_back(item); + + unlock(); + } + + //! Gets the next result in the queue, if any. + bool next(T& result) + { + // ACE_Guard<LockType> g(this->_lock); + ACE_GUARD_RETURN (LockType, g, this->_lock, false); + + if (_queue.empty()) + return false; + + //ASSERT (!_queue.empty() || !this->_canceled); + // throw Cancellation_Exception(); + result = _queue.front(); + _queue.pop_front(); + + return true; + } + + //! Peeks at the top of the queue. Remember to unlock after use. + T& peek() + { + lock(); + + T& result = _queue.front(); + + return result; + } + + //! Cancels the queue. + void cancel() + { + lock(); + + _canceled = true; + + unlock(); + } + + //! Checks if the queue is cancelled. + bool cancelled() + { + ACE_Guard<LockType> g(this->_lock); + return _canceled; + } + + //! Locks the queue for access. + void lock() + { + this->_lock.acquire(); + } + + //! Unlocks the queue. + void unlock() + { + this->_lock.release(); + } + + ///! Calls pop_front of the queue + void pop_front() + { + ACE_GUARD (LockType, g, this->_lock); + _queue.pop_front(); + } + + ///! Checks if we're empty or not with locks held + bool empty() + { + ACE_GUARD_RETURN (LockType, g, this->_lock, false); + return _queue.empty(); + } + }; +} +#endif diff --git a/src/server/shared/Threading/Threading.cpp b/src/server/shared/Threading/Threading.cpp new file mode 100644 index 00000000000..3938286dbdb --- /dev/null +++ b/src/server/shared/Threading/Threading.cpp @@ -0,0 +1,232 @@ +/* + * Copyright (C) 2009 MaNGOS <http://getmangos.com/> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#include "Threading.h" +#include "Errors.h" +#include <ace/OS_NS_unistd.h> +#include <ace/Sched_Params.h> +#include <vector> + +using namespace ACE_Based; + +ThreadPriority::ThreadPriority() +{ + for (int i = Idle; i < MAXPRIORITYNUM; ++i) + m_priority[i] = ACE_THR_PRI_OTHER_DEF; + + m_priority[Idle] = ACE_Sched_Params::priority_min(ACE_SCHED_OTHER); + m_priority[Realtime] = ACE_Sched_Params::priority_max(ACE_SCHED_OTHER); + + std::vector<int> _tmp; + + ACE_Sched_Params::Policy _policy = ACE_SCHED_OTHER; + ACE_Sched_Priority_Iterator pr_iter(_policy); + + while (pr_iter.more()) + { + _tmp.push_back(pr_iter.priority()); + pr_iter.next(); + } + + ASSERT (!_tmp.empty()); + + if(_tmp.size() >= MAXPRIORITYNUM) + { + const size_t max_pos = _tmp.size(); + size_t min_pos = 1; + size_t norm_pos = 0; + for (size_t i = 0; i < max_pos; ++i) + { + if(_tmp[i] == ACE_THR_PRI_OTHER_DEF) + { + norm_pos = i + 1; + break; + } + } + + //since we have only 7(seven) values in enum Priority + //and 3 we know already (Idle, Normal, Realtime) so + //we need to split each list [Idle...Normal] and [Normal...Realtime] + //into ¹ piesces + const size_t _divider = 4; + size_t _div = (norm_pos - min_pos) / _divider; + if(_div == 0) + _div = 1; + + min_pos = (norm_pos - 1); + + m_priority[Low] = _tmp[min_pos -= _div]; + m_priority[Lowest] = _tmp[min_pos -= _div ]; + + _div = (max_pos - norm_pos) / _divider; + if(_div == 0) + _div = 1; + + min_pos = norm_pos - 1; + + m_priority[High] = _tmp[min_pos += _div]; + m_priority[Highest] = _tmp[min_pos += _div]; + } +} + +int ThreadPriority::getPriority(Priority p) const +{ + if(p < Idle) + p = Idle; + + if(p > Realtime) + p = Realtime; + + return m_priority[p]; +} + +#define THREADFLAG (THR_NEW_LWP | THR_SCHED_DEFAULT| THR_JOINABLE) + +Thread::Thread() : m_task(0), m_iThreadId(0), m_hThreadHandle(0) +{ + +} + +Thread::Thread(Runnable* instance) : m_task(instance), m_iThreadId(0), m_hThreadHandle(0) +{ + // register reference to m_task to prevent it deeltion until destructor + if (m_task) + m_task->incReference(); + + bool _start = start(); + ASSERT (_start); +} + +Thread::~Thread() +{ + //Wait(); + + // deleted runnable object (if no other references) + if (m_task) + m_task->decReference(); +} + +//initialize Thread's class static member +Thread::ThreadStorage Thread::m_ThreadStorage; +ThreadPriority Thread::m_TpEnum; + +bool Thread::start() +{ + if (m_task == 0 || m_iThreadId != 0) + return false; + + bool res = (ACE_Thread::spawn(&Thread::ThreadTask, (void*)m_task, THREADFLAG, &m_iThreadId, &m_hThreadHandle) == 0); + + if (res) + m_task->incReference(); + + return res; +} + +bool Thread::wait() +{ + if (!m_hThreadHandle || !m_task) + return false; + + ACE_THR_FUNC_RETURN _value = ACE_THR_FUNC_RETURN(-1); + int _res = ACE_Thread::join(m_hThreadHandle, &_value); + + m_iThreadId = 0; + m_hThreadHandle = 0; + + return (_res == 0); +} + +void Thread::destroy() +{ + if (!m_iThreadId || !m_task) + return; + + if (ACE_Thread::kill(m_iThreadId, -1) != 0) + return; + + m_iThreadId = 0; + m_hThreadHandle = 0; + + // reference set at ACE_Thread::spawn + m_task->decReference(); +} + +void Thread::suspend() +{ + ACE_Thread::suspend(m_hThreadHandle); +} + +void Thread::resume() +{ + ACE_Thread::resume(m_hThreadHandle); +} + +ACE_THR_FUNC_RETURN Thread::ThreadTask(void * param) +{ + Runnable * _task = (Runnable*)param; + _task->run(); + + // task execution complete, free referecne added at + _task->decReference(); + + return (ACE_THR_FUNC_RETURN)0; +} + +ACE_thread_t Thread::currentId() +{ + return ACE_Thread::self(); +} + +ACE_hthread_t Thread::currentHandle() +{ + ACE_hthread_t _handle; + ACE_Thread::self(_handle); + + return _handle; +} + +Thread * Thread::current() +{ + Thread * _thread = m_ThreadStorage.ts_object(); + if(!_thread) + { + _thread = new Thread(); + _thread->m_iThreadId = Thread::currentId(); + _thread->m_hThreadHandle = Thread::currentHandle(); + + Thread * _oldValue = m_ThreadStorage.ts_object(_thread); + if(_oldValue) + delete _oldValue; + } + + return _thread; +} + +void Thread::setPriority(Priority type) +{ + int _priority = m_TpEnum.getPriority(type); + int _ok = ACE_Thread::setprio(m_hThreadHandle, _priority); + //remove this ASSERT in case you don't want to know is thread priority change was successful or not + ASSERT (_ok == 0); +} + +void Thread::Sleep(unsigned long msecs) +{ + ACE_OS::sleep(ACE_Time_Value(0, 1000 * msecs)); +} diff --git a/src/server/shared/Threading/Threading.h b/src/server/shared/Threading/Threading.h new file mode 100644 index 00000000000..fa046117c6e --- /dev/null +++ b/src/server/shared/Threading/Threading.h @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2009 MaNGOS <http://getmangos.com/> + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +#ifndef THREADING_H +#define THREADING_H + +#include <ace/Thread.h> +#include <ace/TSS_T.h> +#include "ace/Atomic_Op.h" +#include <assert.h> + +namespace ACE_Based +{ + + class Runnable + { + public: + virtual ~Runnable() {} + virtual void run() = 0; + + void incReference() { ++m_refs; } + void decReference() + { + if(!--m_refs) + delete this; + } + private: + ACE_Atomic_Op<ACE_Thread_Mutex, long> m_refs; + }; + + enum Priority + { + Idle, + Lowest, + Low, + Normal, + High, + Highest, + Realtime, + }; + +#define MAXPRIORITYNUM (Realtime + 1) + + class ThreadPriority + { + public: + ThreadPriority(); + int getPriority(Priority p) const; + + private: + int m_priority[MAXPRIORITYNUM]; + }; + + class Thread + { + public: + Thread(); + explicit Thread(Runnable* instance); + ~Thread(); + + bool start(); + bool wait(); + void destroy(); + + void suspend(); + void resume(); + + void setPriority(Priority type); + + static void Sleep(unsigned long msecs); + static ACE_thread_t currentId(); + static ACE_hthread_t currentHandle(); + static Thread * current(); + + private: + Thread(const Thread&); + Thread& operator=(const Thread&); + + static ACE_THR_FUNC_RETURN ThreadTask(void * param); + + ACE_thread_t m_iThreadId; + ACE_hthread_t m_hThreadHandle; + Runnable * m_task; + + typedef ACE_TSS<Thread> ThreadStorage; + //global object - container for Thread class representation of every thread + static ThreadStorage m_ThreadStorage; + //use this object to determine current OS thread priority values mapped to enum Priority{} + static ThreadPriority m_TpEnum; + }; + +} +#endif |
