diff options
author | Yehonal <yehonal.azeroth@gmail.com> | 2016-08-16 00:01:37 +0200 |
---|---|---|
committer | Yehonal <yehonal.azeroth@gmail.com> | 2016-08-19 10:58:37 +0200 |
commit | ea286f7332894ddc74f8ab524e42193b57d99f47 (patch) | |
tree | 6b2dc94ac298003d242d13e3ad36c02251553a76 /modules | |
parent | 3761e9d8433f4d031b3477b787134a1d868a8971 (diff) |
Rewritten Threading system using c++11 std instead of ACE
It also allow full compilation with clang under all supported platforms
Need tests
Diffstat (limited to 'modules')
-rw-r--r-- | modules/acore/framework/Database/MySQLThreading.h | 6 | ||||
-rw-r--r-- | modules/acore/framework/Threading/DelayExecutor.cpp | 12 | ||||
-rw-r--r-- | modules/acore/framework/Threading/Threading.cpp | 204 | ||||
-rw-r--r-- | modules/acore/framework/Threading/Threading.h | 67 |
4 files changed, 72 insertions, 217 deletions
diff --git a/modules/acore/framework/Database/MySQLThreading.h b/modules/acore/framework/Database/MySQLThreading.h index fd2ddfe214..17e6a4c9b6 100644 --- a/modules/acore/framework/Database/MySQLThreading.h +++ b/modules/acore/framework/Database/MySQLThreading.h @@ -24,7 +24,7 @@ class MySQL { mysql_thread_init(); sLog->outSQLDriver("Core thread with ID [" UI64FMTD "] initializing MySQL thread.", - (uint64)ACE_Based::Thread::currentId()); + ACORE::Thread::currentId()); } /*! Shuts down MySQL thread and frees resources, should only be called @@ -35,7 +35,7 @@ class MySQL { mysql_thread_end(); sLog->outSQLDriver("Core thread with ID [" UI64FMTD "] shutting down MySQL thread.", - (uint64)ACE_Based::Thread::currentId()); + ACORE::Thread::currentId()); } static void Library_Init() @@ -49,4 +49,4 @@ class MySQL } }; -#endif
\ No newline at end of file +#endif diff --git a/modules/acore/framework/Threading/DelayExecutor.cpp b/modules/acore/framework/Threading/DelayExecutor.cpp index b64f8d65ab..99d8133f81 100644 --- a/modules/acore/framework/Threading/DelayExecutor.cpp +++ b/modules/acore/framework/Threading/DelayExecutor.cpp @@ -80,14 +80,14 @@ int DelayExecutor::start(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_ queue_.queue()->activate(); // pussywizard: - ACE_Based::ThreadPriority tp; - int _priority = tp.getPriority(ACE_Based::Highest); - if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE, num_threads, 0, _priority) == -1) - return -1; - - //if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1) + //ACORE::ThreadPriority tp; + //int _priority = tp.getPriority(ACORE::Priority_Highest); + //if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE, num_threads, 0, _priority) == -1) // return -1; + if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1) + return -1; + activated(true); return true; diff --git a/modules/acore/framework/Threading/Threading.cpp b/modules/acore/framework/Threading/Threading.cpp index 47ad54eefa..eac2178ec8 100644 --- a/modules/acore/framework/Threading/Threading.cpp +++ b/modules/acore/framework/Threading/Threading.cpp @@ -6,218 +6,100 @@ #include "Threading.h" #include "Errors.h" -#include <ace/OS_NS_unistd.h> -#include <ace/Sched_Params.h> -#include <vector> -using namespace ACE_Based; +#include <chrono> +#include <system_error> -ThreadPriority::ThreadPriority() -{ - for (int i = Idle; i < MAXPRIORITYNUM; ++i) - m_priority[i] = ACE_THR_PRI_OTHER_DEF; - - m_priority[Idle] = ACE_Sched_Params::priority_min(ACE_SCHED_OTHER); - m_priority[Realtime] = ACE_Sched_Params::priority_max(ACE_SCHED_OTHER); - - std::vector<int> _tmp; - - ACE_Sched_Params::Policy _policy = ACE_SCHED_OTHER; - ACE_Sched_Priority_Iterator pr_iter(_policy); - - while (pr_iter.more()) - { - _tmp.push_back(pr_iter.priority()); - pr_iter.next(); - } +using namespace ACORE; - ASSERT (!_tmp.empty()); - - if (_tmp.size() >= MAXPRIORITYNUM) - { - const size_t max_pos = _tmp.size(); - size_t min_pos = 1; - size_t norm_pos = 0; - for (size_t i = 0; i < max_pos; ++i) - { - if (_tmp[i] == ACE_THR_PRI_OTHER_DEF) - { - norm_pos = i + 1; - break; - } - } - - // since we have only 7(seven) values in enum Priority - // and 3 we know already (Idle, Normal, Realtime) so - // we need to split each list [Idle...Normal] and [Normal...Realtime] - // into pieces - const size_t _divider = 4; - size_t _div = (norm_pos - min_pos) / _divider; - if (_div == 0) - _div = 1; - - min_pos = (norm_pos - 1); - - m_priority[Low] = _tmp[min_pos -= _div]; - m_priority[Lowest] = _tmp[min_pos -= _div ]; - - _div = (max_pos - norm_pos) / _divider; - if (_div == 0) - _div = 1; - - min_pos = norm_pos - 1; - - m_priority[High] = _tmp[min_pos += _div]; - m_priority[Highest] = _tmp[min_pos += _div]; - } -} - -int ThreadPriority::getPriority(Priority p) const +Thread::Thread() : m_task(nullptr), m_iThreadId(), m_ThreadImp() { - if (p < Idle) - p = Idle; - - if (p > Realtime) - p = Realtime; - - return m_priority[p]; } -#define THREADFLAG (THR_NEW_LWP | THR_SCHED_DEFAULT| THR_JOINABLE) - -Thread::Thread(): m_iThreadId(0), m_hThreadHandle(0), m_task(0) +Thread::Thread(Runnable* instance) : m_task(instance), m_ThreadImp(&Thread::ThreadTask, (void*)m_task) { + m_iThreadId = m_ThreadImp.get_id(); -} - -Thread::Thread(Runnable* instance): m_iThreadId(0), m_hThreadHandle(0), m_task(instance) -{ // register reference to m_task to prevent it deeltion until destructor if (m_task) m_task->incReference(); - - bool _start = start(); - ASSERT (_start); } Thread::~Thread() { - //Wait(); + // Wait(); // deleted runnable object (if no other references) if (m_task) m_task->decReference(); } -//initialize Thread's class static member -Thread::ThreadStorage Thread::m_ThreadStorage; -ThreadPriority Thread::m_TpEnum; - -bool Thread::start() +bool Thread::wait() { - if (m_task == 0 || m_iThreadId != 0) + if (m_iThreadId == std::thread::id() || !m_task) return false; - // incRef before spawing the thread, otherwise Thread::ThreadTask() might call decRef and delete m_task - m_task->incReference(); + bool res = true; - bool res = (ACE_Thread::spawn(&Thread::ThreadTask, (void*)m_task, THREADFLAG, &m_iThreadId, &m_hThreadHandle) == 0); + try + { + m_ThreadImp.join(); + } + catch (std::system_error&) + { + res = false; + } - if (!res) - m_task->decReference(); + m_iThreadId = std::thread::id(); return res; } -bool Thread::wait() -{ - if (!m_hThreadHandle || !m_task) - return false; - - ACE_THR_FUNC_RETURN _value = ACE_THR_FUNC_RETURN(-1); - int _res = ACE_Thread::join(m_hThreadHandle, &_value); - - m_iThreadId = 0; - m_hThreadHandle = 0; - - return (_res == 0); -} - void Thread::destroy() { - if (!m_iThreadId || !m_task) - return; - - if (ACE_Thread::kill(m_iThreadId, -1) != 0) + if (m_iThreadId == std::thread::id() || !m_task) return; - m_iThreadId = 0; - m_hThreadHandle = 0; - - // reference set at ACE_Thread::spawn - m_task->decReference(); -} - -void Thread::suspend() -{ - ACE_Thread::suspend(m_hThreadHandle); -} - -void Thread::resume() -{ - ACE_Thread::resume(m_hThreadHandle); + // FIXME: We need to make sure that all threads can be trusted to + // halt execution on their own as this is not an interrupt + m_ThreadImp.join(); + m_iThreadId = std::thread::id(); } -ACE_THR_FUNC_RETURN Thread::ThreadTask(void * param) +void Thread::ThreadTask(void* param) { Runnable* _task = (Runnable*)param; _task->run(); - - // task execution complete, free referecne added at - _task->decReference(); - - return (ACE_THR_FUNC_RETURN)0; } -ACE_thread_t Thread::currentId() +std::thread::id Thread::currentId() { - return ACE_Thread::self(); + return std::this_thread::get_id(); } -ACE_hthread_t Thread::currentHandle() +void Thread::setPriority(Priority priority) { - ACE_hthread_t _handle; - ACE_Thread::self(_handle); + std::thread::native_handle_type handle = m_ThreadImp.native_handle(); + bool _ok = true; +#ifdef WIN32 - return _handle; -} - -Thread * Thread::current() -{ - Thread * _thread = m_ThreadStorage.ts_object(); - if (!_thread) + switch (priority) { - _thread = new Thread(); - _thread->m_iThreadId = Thread::currentId(); - _thread->m_hThreadHandle = Thread::currentHandle(); - - Thread * _oldValue = m_ThreadStorage.ts_object(_thread); - if (_oldValue) - delete _oldValue; + case Priority_Realtime: _ok = SetThreadPriority(handle, THREAD_PRIORITY_TIME_CRITICAL); break; + case Priority_Highest: _ok = SetThreadPriority(handle, THREAD_PRIORITY_HIGHEST); break; + case Priority_High: _ok = SetThreadPriority(handle, THREAD_PRIORITY_ABOVE_NORMAL); break; + case Priority_Normal: _ok = SetThreadPriority(handle, THREAD_PRIORITY_NORMAL); break; + case Priority_Low: _ok = SetThreadPriority(handle, THREAD_PRIORITY_BELOW_NORMAL); break; + case Priority_Lowest: _ok = SetThreadPriority(handle, THREAD_PRIORITY_LOWEST); break; + case Priority_Idle: _ok = SetThreadPriority(handle, THREAD_PRIORITY_IDLE); break; } +#endif - return _thread; -} - -void Thread::setPriority(Priority type) -{ - int _priority = m_TpEnum.getPriority(type); - int _ok = ACE_Thread::setprio(m_hThreadHandle, _priority); - //remove this ASSERT in case you don't want to know is thread priority change was successful or not - ASSERT (_ok == 0); + // remove this ASSERT in case you don't want to know is thread priority change was successful or not + ASSERT(_ok); } void Thread::Sleep(unsigned long msecs) { - ACE_OS::sleep(ACE_Time_Value(0, 1000 * msecs)); + std::this_thread::sleep_for(std::chrono::milliseconds(msecs)); } diff --git a/modules/acore/framework/Threading/Threading.h b/modules/acore/framework/Threading/Threading.h index f71d2a3afe..ad6d52639c 100644 --- a/modules/acore/framework/Threading/Threading.h +++ b/modules/acore/framework/Threading/Threading.h @@ -7,20 +7,18 @@ #ifndef THREADING_H #define THREADING_H -#include "Common.h" -#include <ace/ACE.h> -#include <ace/Thread.h> -#include <ace/TSS_T.h> -#include <ace/Atomic_Op.h> -#include <assert.h> +#include <thread> +#include <atomic> -namespace ACE_Based -{ +#include <thread> +#include <atomic> +namespace ACORE +{ class Runnable { public: - virtual ~Runnable() { } + virtual ~Runnable() {} virtual void run() = 0; void incReference() { ++m_refs; } @@ -30,30 +28,18 @@ namespace ACE_Based delete this; } private: - ACE_Atomic_Op<ACE_Thread_Mutex, long> m_refs; + std::atomic_long m_refs; }; enum Priority { - Idle, - Lowest, - Low, - Normal, - High, - Highest, - Realtime - }; - -#define MAXPRIORITYNUM (Realtime + 1) - - class ThreadPriority - { - public: - ThreadPriority(); - int getPriority(Priority p) const; - - private: - int m_priority[MAXPRIORITYNUM]; + Priority_Idle, + Priority_Lowest, + Priority_Low, + Priority_Normal, + Priority_High, + Priority_Highest, + Priority_Realtime, }; class Thread @@ -63,36 +49,23 @@ namespace ACE_Based explicit Thread(Runnable* instance); ~Thread(); - bool start(); bool wait(); void destroy(); - void suspend(); - void resume(); - void setPriority(Priority type); static void Sleep(unsigned long msecs); - static ACE_thread_t currentId(); - static ACE_hthread_t currentHandle(); - static Thread * current(); + static std::thread::id currentId(); private: Thread(const Thread&); Thread& operator=(const Thread&); - static ACE_THR_FUNC_RETURN ThreadTask(void * param); - - ACE_thread_t m_iThreadId; - ACE_hthread_t m_hThreadHandle; - Runnable* m_task; + static void ThreadTask(void* param); - typedef ACE_TSS<Thread> ThreadStorage; - //global object - container for Thread class representation of every thread - static ThreadStorage m_ThreadStorage; - //use this object to determine current OS thread priority values mapped to enum Priority{ } - static ThreadPriority m_TpEnum; + Runnable* const m_task; + std::thread::id m_iThreadId; + std::thread m_ThreadImp; }; - } #endif |