aboutsummaryrefslogtreecommitdiff
path: root/src/server/shared/Threading
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/shared/Threading')
-rw-r--r--src/server/shared/Threading/DelayExecutor.cpp111
-rw-r--r--src/server/shared/Threading/DelayExecutor.h37
-rw-r--r--src/server/shared/Threading/LockedQueue.h139
-rw-r--r--src/server/shared/Threading/Threading.cpp232
-rw-r--r--src/server/shared/Threading/Threading.h108
5 files changed, 627 insertions, 0 deletions
diff --git a/src/server/shared/Threading/DelayExecutor.cpp b/src/server/shared/Threading/DelayExecutor.cpp
new file mode 100644
index 00000000000..9a718823232
--- /dev/null
+++ b/src/server/shared/Threading/DelayExecutor.cpp
@@ -0,0 +1,111 @@
+#include <ace/Singleton.h>
+#include <ace/Thread_Mutex.h>
+#include <ace/Log_Msg.h>
+
+#include "DelayExecutor.h"
+
+DelayExecutor* DelayExecutor::instance()
+{
+ return ACE_Singleton<DelayExecutor, ACE_Thread_Mutex>::instance();
+}
+
+DelayExecutor::DelayExecutor()
+ : activated_(false), pre_svc_hook_(0), post_svc_hook_(0)
+{
+}
+
+DelayExecutor::~DelayExecutor()
+{
+ if (pre_svc_hook_)
+ delete pre_svc_hook_;
+
+ if (post_svc_hook_)
+ delete post_svc_hook_;
+
+ deactivate();
+}
+
+int DelayExecutor::deactivate()
+{
+ if (!activated())
+ return -1;
+
+ activated(false);
+ queue_.queue()->deactivate();
+ wait();
+
+ return 0;
+}
+
+int DelayExecutor::svc()
+{
+ if (pre_svc_hook_)
+ pre_svc_hook_->call();
+
+ for (;;)
+ {
+ ACE_Method_Request* rq = queue_.dequeue();
+
+ if (!rq)
+ break;
+
+ rq->call();
+ delete rq;
+ }
+
+ if (post_svc_hook_)
+ post_svc_hook_->call();
+
+ return 0;
+}
+
+int DelayExecutor::activate(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook)
+{
+ if (activated())
+ return -1;
+
+ if (num_threads < 1)
+ return -1;
+
+ if (pre_svc_hook_)
+ delete pre_svc_hook_;
+
+ if (post_svc_hook_)
+ delete post_svc_hook_;
+
+ pre_svc_hook_ = pre_svc_hook;
+ post_svc_hook_ = post_svc_hook;
+
+ queue_.queue()->activate();
+
+ if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1)
+ return -1;
+
+ activated(true);
+
+ return true;
+}
+
+int DelayExecutor::execute(ACE_Method_Request* new_req)
+{
+ if (new_req == NULL)
+ return -1;
+
+ if (queue_.enqueue(new_req, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
+ {
+ delete new_req;
+ ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1);
+ }
+
+ return 0;
+}
+
+bool DelayExecutor::activated()
+{
+ return activated_;
+}
+
+void DelayExecutor::activated(bool s)
+{
+ activated_ = s;
+}
diff --git a/src/server/shared/Threading/DelayExecutor.h b/src/server/shared/Threading/DelayExecutor.h
new file mode 100644
index 00000000000..664d8ca78a2
--- /dev/null
+++ b/src/server/shared/Threading/DelayExecutor.h
@@ -0,0 +1,37 @@
+#ifndef _M_DELAY_EXECUTOR_H
+#define _M_DELAY_EXECUTOR_H
+
+#include <ace/Task.h>
+#include <ace/Activation_Queue.h>
+#include <ace/Method_Request.h>
+
+class DelayExecutor : protected ACE_Task_Base
+{
+ public:
+
+ DelayExecutor();
+ virtual ~DelayExecutor();
+
+ static DelayExecutor* instance();
+
+ int execute(ACE_Method_Request* new_req);
+
+ int activate(int num_threads = 1, ACE_Method_Request* pre_svc_hook = NULL, ACE_Method_Request* post_svc_hook = NULL);
+
+ int deactivate();
+
+ bool activated();
+
+ virtual int svc();
+
+ private:
+
+ ACE_Activation_Queue queue_;
+ ACE_Method_Request* pre_svc_hook_;
+ ACE_Method_Request* post_svc_hook_;
+ bool activated_;
+
+ void activated(bool s);
+};
+
+#endif // _M_DELAY_EXECUTOR_H
diff --git a/src/server/shared/Threading/LockedQueue.h b/src/server/shared/Threading/LockedQueue.h
new file mode 100644
index 00000000000..9f8afae6c14
--- /dev/null
+++ b/src/server/shared/Threading/LockedQueue.h
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2009 MaNGOS <http://getmangos.com/>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef LOCKEDQUEUE_H
+#define LOCKEDQUEUE_H
+
+#include <ace/Guard_T.h>
+#include <ace/Thread_Mutex.h>
+#include <deque>
+#include <assert.h>
+#include "Errors.h"
+
+namespace ACE_Based
+{
+ template <class T, class LockType, typename StorageType=std::deque<T> >
+ class LockedQueue
+ {
+ //! Lock access to the queue.
+ LockType _lock;
+
+ //! Storage backing the queue.
+ StorageType _queue;
+
+ //! Cancellation flag.
+ volatile bool _canceled;
+
+ public:
+
+ //! Create a LockedQueue.
+ LockedQueue()
+ : _canceled(false)
+ {
+ }
+
+ //! Destroy a LockedQueue.
+ virtual ~LockedQueue()
+ {
+ }
+
+ //! Adds an item to the queue.
+ void add(const T& item)
+ {
+ lock();
+
+ //ASSERT(!this->_canceled);
+ // throw Cancellation_Exception();
+
+ _queue.push_back(item);
+
+ unlock();
+ }
+
+ //! Gets the next result in the queue, if any.
+ bool next(T& result)
+ {
+ // ACE_Guard<LockType> g(this->_lock);
+ ACE_GUARD_RETURN (LockType, g, this->_lock, false);
+
+ if (_queue.empty())
+ return false;
+
+ //ASSERT (!_queue.empty() || !this->_canceled);
+ // throw Cancellation_Exception();
+ result = _queue.front();
+ _queue.pop_front();
+
+ return true;
+ }
+
+ //! Peeks at the top of the queue. Remember to unlock after use.
+ T& peek()
+ {
+ lock();
+
+ T& result = _queue.front();
+
+ return result;
+ }
+
+ //! Cancels the queue.
+ void cancel()
+ {
+ lock();
+
+ _canceled = true;
+
+ unlock();
+ }
+
+ //! Checks if the queue is cancelled.
+ bool cancelled()
+ {
+ ACE_Guard<LockType> g(this->_lock);
+ return _canceled;
+ }
+
+ //! Locks the queue for access.
+ void lock()
+ {
+ this->_lock.acquire();
+ }
+
+ //! Unlocks the queue.
+ void unlock()
+ {
+ this->_lock.release();
+ }
+
+ ///! Calls pop_front of the queue
+ void pop_front()
+ {
+ ACE_GUARD (LockType, g, this->_lock);
+ _queue.pop_front();
+ }
+
+ ///! Checks if we're empty or not with locks held
+ bool empty()
+ {
+ ACE_GUARD_RETURN (LockType, g, this->_lock, false);
+ return _queue.empty();
+ }
+ };
+}
+#endif
diff --git a/src/server/shared/Threading/Threading.cpp b/src/server/shared/Threading/Threading.cpp
new file mode 100644
index 00000000000..3938286dbdb
--- /dev/null
+++ b/src/server/shared/Threading/Threading.cpp
@@ -0,0 +1,232 @@
+/*
+ * Copyright (C) 2009 MaNGOS <http://getmangos.com/>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include "Threading.h"
+#include "Errors.h"
+#include <ace/OS_NS_unistd.h>
+#include <ace/Sched_Params.h>
+#include <vector>
+
+using namespace ACE_Based;
+
+ThreadPriority::ThreadPriority()
+{
+ for (int i = Idle; i < MAXPRIORITYNUM; ++i)
+ m_priority[i] = ACE_THR_PRI_OTHER_DEF;
+
+ m_priority[Idle] = ACE_Sched_Params::priority_min(ACE_SCHED_OTHER);
+ m_priority[Realtime] = ACE_Sched_Params::priority_max(ACE_SCHED_OTHER);
+
+ std::vector<int> _tmp;
+
+ ACE_Sched_Params::Policy _policy = ACE_SCHED_OTHER;
+ ACE_Sched_Priority_Iterator pr_iter(_policy);
+
+ while (pr_iter.more())
+ {
+ _tmp.push_back(pr_iter.priority());
+ pr_iter.next();
+ }
+
+ ASSERT (!_tmp.empty());
+
+ if(_tmp.size() >= MAXPRIORITYNUM)
+ {
+ const size_t max_pos = _tmp.size();
+ size_t min_pos = 1;
+ size_t norm_pos = 0;
+ for (size_t i = 0; i < max_pos; ++i)
+ {
+ if(_tmp[i] == ACE_THR_PRI_OTHER_DEF)
+ {
+ norm_pos = i + 1;
+ break;
+ }
+ }
+
+ //since we have only 7(seven) values in enum Priority
+ //and 3 we know already (Idle, Normal, Realtime) so
+ //we need to split each list [Idle...Normal] and [Normal...Realtime]
+ //into ¹ piesces
+ const size_t _divider = 4;
+ size_t _div = (norm_pos - min_pos) / _divider;
+ if(_div == 0)
+ _div = 1;
+
+ min_pos = (norm_pos - 1);
+
+ m_priority[Low] = _tmp[min_pos -= _div];
+ m_priority[Lowest] = _tmp[min_pos -= _div ];
+
+ _div = (max_pos - norm_pos) / _divider;
+ if(_div == 0)
+ _div = 1;
+
+ min_pos = norm_pos - 1;
+
+ m_priority[High] = _tmp[min_pos += _div];
+ m_priority[Highest] = _tmp[min_pos += _div];
+ }
+}
+
+int ThreadPriority::getPriority(Priority p) const
+{
+ if(p < Idle)
+ p = Idle;
+
+ if(p > Realtime)
+ p = Realtime;
+
+ return m_priority[p];
+}
+
+#define THREADFLAG (THR_NEW_LWP | THR_SCHED_DEFAULT| THR_JOINABLE)
+
+Thread::Thread() : m_task(0), m_iThreadId(0), m_hThreadHandle(0)
+{
+
+}
+
+Thread::Thread(Runnable* instance) : m_task(instance), m_iThreadId(0), m_hThreadHandle(0)
+{
+ // register reference to m_task to prevent it deeltion until destructor
+ if (m_task)
+ m_task->incReference();
+
+ bool _start = start();
+ ASSERT (_start);
+}
+
+Thread::~Thread()
+{
+ //Wait();
+
+ // deleted runnable object (if no other references)
+ if (m_task)
+ m_task->decReference();
+}
+
+//initialize Thread's class static member
+Thread::ThreadStorage Thread::m_ThreadStorage;
+ThreadPriority Thread::m_TpEnum;
+
+bool Thread::start()
+{
+ if (m_task == 0 || m_iThreadId != 0)
+ return false;
+
+ bool res = (ACE_Thread::spawn(&Thread::ThreadTask, (void*)m_task, THREADFLAG, &m_iThreadId, &m_hThreadHandle) == 0);
+
+ if (res)
+ m_task->incReference();
+
+ return res;
+}
+
+bool Thread::wait()
+{
+ if (!m_hThreadHandle || !m_task)
+ return false;
+
+ ACE_THR_FUNC_RETURN _value = ACE_THR_FUNC_RETURN(-1);
+ int _res = ACE_Thread::join(m_hThreadHandle, &_value);
+
+ m_iThreadId = 0;
+ m_hThreadHandle = 0;
+
+ return (_res == 0);
+}
+
+void Thread::destroy()
+{
+ if (!m_iThreadId || !m_task)
+ return;
+
+ if (ACE_Thread::kill(m_iThreadId, -1) != 0)
+ return;
+
+ m_iThreadId = 0;
+ m_hThreadHandle = 0;
+
+ // reference set at ACE_Thread::spawn
+ m_task->decReference();
+}
+
+void Thread::suspend()
+{
+ ACE_Thread::suspend(m_hThreadHandle);
+}
+
+void Thread::resume()
+{
+ ACE_Thread::resume(m_hThreadHandle);
+}
+
+ACE_THR_FUNC_RETURN Thread::ThreadTask(void * param)
+{
+ Runnable * _task = (Runnable*)param;
+ _task->run();
+
+ // task execution complete, free referecne added at
+ _task->decReference();
+
+ return (ACE_THR_FUNC_RETURN)0;
+}
+
+ACE_thread_t Thread::currentId()
+{
+ return ACE_Thread::self();
+}
+
+ACE_hthread_t Thread::currentHandle()
+{
+ ACE_hthread_t _handle;
+ ACE_Thread::self(_handle);
+
+ return _handle;
+}
+
+Thread * Thread::current()
+{
+ Thread * _thread = m_ThreadStorage.ts_object();
+ if(!_thread)
+ {
+ _thread = new Thread();
+ _thread->m_iThreadId = Thread::currentId();
+ _thread->m_hThreadHandle = Thread::currentHandle();
+
+ Thread * _oldValue = m_ThreadStorage.ts_object(_thread);
+ if(_oldValue)
+ delete _oldValue;
+ }
+
+ return _thread;
+}
+
+void Thread::setPriority(Priority type)
+{
+ int _priority = m_TpEnum.getPriority(type);
+ int _ok = ACE_Thread::setprio(m_hThreadHandle, _priority);
+ //remove this ASSERT in case you don't want to know is thread priority change was successful or not
+ ASSERT (_ok == 0);
+}
+
+void Thread::Sleep(unsigned long msecs)
+{
+ ACE_OS::sleep(ACE_Time_Value(0, 1000 * msecs));
+}
diff --git a/src/server/shared/Threading/Threading.h b/src/server/shared/Threading/Threading.h
new file mode 100644
index 00000000000..fa046117c6e
--- /dev/null
+++ b/src/server/shared/Threading/Threading.h
@@ -0,0 +1,108 @@
+/*
+ * Copyright (C) 2009 MaNGOS <http://getmangos.com/>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef THREADING_H
+#define THREADING_H
+
+#include <ace/Thread.h>
+#include <ace/TSS_T.h>
+#include "ace/Atomic_Op.h"
+#include <assert.h>
+
+namespace ACE_Based
+{
+
+ class Runnable
+ {
+ public:
+ virtual ~Runnable() {}
+ virtual void run() = 0;
+
+ void incReference() { ++m_refs; }
+ void decReference()
+ {
+ if(!--m_refs)
+ delete this;
+ }
+ private:
+ ACE_Atomic_Op<ACE_Thread_Mutex, long> m_refs;
+ };
+
+ enum Priority
+ {
+ Idle,
+ Lowest,
+ Low,
+ Normal,
+ High,
+ Highest,
+ Realtime,
+ };
+
+#define MAXPRIORITYNUM (Realtime + 1)
+
+ class ThreadPriority
+ {
+ public:
+ ThreadPriority();
+ int getPriority(Priority p) const;
+
+ private:
+ int m_priority[MAXPRIORITYNUM];
+ };
+
+ class Thread
+ {
+ public:
+ Thread();
+ explicit Thread(Runnable* instance);
+ ~Thread();
+
+ bool start();
+ bool wait();
+ void destroy();
+
+ void suspend();
+ void resume();
+
+ void setPriority(Priority type);
+
+ static void Sleep(unsigned long msecs);
+ static ACE_thread_t currentId();
+ static ACE_hthread_t currentHandle();
+ static Thread * current();
+
+ private:
+ Thread(const Thread&);
+ Thread& operator=(const Thread&);
+
+ static ACE_THR_FUNC_RETURN ThreadTask(void * param);
+
+ ACE_thread_t m_iThreadId;
+ ACE_hthread_t m_hThreadHandle;
+ Runnable * m_task;
+
+ typedef ACE_TSS<Thread> ThreadStorage;
+ //global object - container for Thread class representation of every thread
+ static ThreadStorage m_ThreadStorage;
+ //use this object to determine current OS thread priority values mapped to enum Priority{}
+ static ThreadPriority m_TpEnum;
+ };
+
+}
+#endif