diff options
author | leak <leak@bitmx.net> | 2014-07-02 00:41:30 +0200 |
---|---|---|
committer | leak <leak@bitmx.net> | 2014-07-02 00:41:30 +0200 |
commit | 25e633aa34ef227841b4092270d862b0864dc372 (patch) | |
tree | ddbc654fed3dc969dac57c32338e7a501354823e /src | |
parent | f8e829da65c41cb753cec6cd9c59ae8a65434bce (diff) |
Replaced ACE_Method_Request based DelayExecutor by PCQ impl
Untested due to worldserver still breaking because of ACE threading fails
Diffstat (limited to 'src')
-rw-r--r-- | src/server/game/Maps/MapManager.cpp | 4 | ||||
-rw-r--r-- | src/server/game/Maps/MapUpdater.cpp | 71 | ||||
-rw-r--r-- | src/server/game/Maps/MapUpdater.h | 21 | ||||
-rw-r--r-- | src/server/game/Weather/WeatherMgr.cpp | 1 | ||||
-rw-r--r-- | src/server/shared/AutoPtr.h | 53 | ||||
-rw-r--r-- | src/server/shared/Threading/DelayExecutor.cpp | 126 | ||||
-rw-r--r-- | src/server/shared/Threading/DelayExecutor.h | 35 | ||||
-rw-r--r-- | src/server/worldserver/Master.cpp | 1 |
8 files changed, 54 insertions, 258 deletions
diff --git a/src/server/game/Maps/MapManager.cpp b/src/server/game/Maps/MapManager.cpp index 2b8dbde8280..1de13d137c4 100644 --- a/src/server/game/Maps/MapManager.cpp +++ b/src/server/game/Maps/MapManager.cpp @@ -52,8 +52,8 @@ void MapManager::Initialize() int num_threads(sWorld->getIntConfig(CONFIG_NUMTHREADS)); // Start mtmaps if needed. - if (num_threads > 0 && m_updater.activate(num_threads) == -1) - abort(); + if (num_threads > 0) + m_updater.activate(num_threads); } void MapManager::InitializeVisibilityDistanceInfo() diff --git a/src/server/game/Maps/MapUpdater.cpp b/src/server/game/Maps/MapUpdater.cpp index 2a6b810fcef..4ab95d87d48 100644 --- a/src/server/game/Maps/MapUpdater.cpp +++ b/src/server/game/Maps/MapUpdater.cpp @@ -18,57 +18,61 @@ #include <mutex> #include <condition_variable> -#include <ace/Method_Request.h> #include "MapUpdater.h" -#include "DelayExecutor.h" #include "Map.h" -#include "DatabaseEnv.h" -class MapUpdateRequest : public ACE_Method_Request +class MapUpdateRequest { private: Map& m_map; MapUpdater& m_updater; - ACE_UINT32 m_diff; + uint32 m_diff; public: - MapUpdateRequest(Map& m, MapUpdater& u, ACE_UINT32 d) + MapUpdateRequest(Map& m, MapUpdater& u, uint32 d) : m_map(m), m_updater(u), m_diff(d) { } - virtual int call() + void call() { m_map.Update (m_diff); m_updater.update_finished(); - return 0; } }; -MapUpdater::MapUpdater(): m_executor(), pending_requests(0) { } +MapUpdater::MapUpdater(): pending_requests(0) { } MapUpdater::~MapUpdater() { deactivate(); } -int MapUpdater::activate(size_t num_threads) +void MapUpdater::activate(size_t num_threads) { - return m_executor.start((int)num_threads); + for (size_t i = 0; i < num_threads; ++i) + { + _workerThreads.push_back(std::thread(&MapUpdater::WorkerThread, this)); + } } -int MapUpdater::deactivate() +void MapUpdater::deactivate() { wait(); - return m_executor.deactivate(); + _queue.Cancel(); + + for (auto& thread : _workerThreads) + { + thread.join(); + } } -int MapUpdater::wait() +void MapUpdater::wait() { std::unique_lock<std::mutex> lock(_lock); @@ -76,43 +80,44 @@ int MapUpdater::wait() _condition.wait(lock); lock.unlock(); - - return 0; } -int MapUpdater::schedule_update(Map& map, ACE_UINT32 diff) +void MapUpdater::schedule_update(Map& map, uint32 diff) { std::lock_guard<std::mutex> lock(_lock); ++pending_requests; - if (m_executor.execute(new MapUpdateRequest(map, *this, diff)) == -1) - { - ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) \n"), ACE_TEXT("Failed to schedule Map Update"))); - - --pending_requests; - return -1; - } - - return 0; + _queue.Push(new MapUpdateRequest(map, *this, diff)); } bool MapUpdater::activated() { - return m_executor.activated(); + return _workerThreads.size() > 0; } void MapUpdater::update_finished() { std::lock_guard<std::mutex> lock(_lock); - if (pending_requests == 0) - { - ACE_ERROR((LM_ERROR, ACE_TEXT("(%t)\n"), ACE_TEXT("MapUpdater::update_finished BUG, report to devs"))); - return; - } - --pending_requests; _condition.notify_all(); } + +void MapUpdater::WorkerThread() +{ + while (1) + { + MapUpdateRequest* request = nullptr; + + _queue.WaitAndPop(request); + + if (_cancelationToken) + return; + + request->call(); + + delete request; + } +} diff --git a/src/server/game/Maps/MapUpdater.h b/src/server/game/Maps/MapUpdater.h index 8461b53e992..ff1d85a23e9 100644 --- a/src/server/game/Maps/MapUpdater.h +++ b/src/server/game/Maps/MapUpdater.h @@ -19,10 +19,11 @@ #ifndef _MAP_UPDATER_H_INCLUDED #define _MAP_UPDATER_H_INCLUDED +#include "Define.h" #include <mutex> +#include <thread> #include <condition_variable> - -#include "DelayExecutor.h" +#include "ProducerConsumerQueue.h" class Map; @@ -35,24 +36,30 @@ class MapUpdater friend class MapUpdateRequest; - int schedule_update(Map& map, ACE_UINT32 diff); + void schedule_update(Map& map, uint32 diff); - int wait(); + void wait(); - int activate(size_t num_threads); + void activate(size_t num_threads); - int deactivate(); + void deactivate(); bool activated(); private: - DelayExecutor m_executor; + ProducerConsumerQueue <MapUpdateRequest*> _queue; + + std::vector<std::thread> _workerThreads; + std::atomic_bool _cancelationToken; + std::mutex _lock; std::condition_variable _condition; size_t pending_requests; void update_finished(); + + void WorkerThread(); }; #endif //_MAP_UPDATER_H_INCLUDED diff --git a/src/server/game/Weather/WeatherMgr.cpp b/src/server/game/Weather/WeatherMgr.cpp index 5274dcd358d..938c91b0228 100644 --- a/src/server/game/Weather/WeatherMgr.cpp +++ b/src/server/game/Weather/WeatherMgr.cpp @@ -24,7 +24,6 @@ #include "Weather.h" #include "Log.h" #include "ObjectMgr.h" -#include "AutoPtr.h" #include "Player.h" #include "WorldPacket.h" #include "WorldSession.h" diff --git a/src/server/shared/AutoPtr.h b/src/server/shared/AutoPtr.h deleted file mode 100644 index 96ecbfc79fe..00000000000 --- a/src/server/shared/AutoPtr.h +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _TRINITY_AUTO_PTR_H -#define _TRINITY_AUTO_PTR_H - -#include <ace/Bound_Ptr.h> - -namespace Trinity -{ - -template <class Pointer, class Lock> -class AutoPtr : public ACE_Strong_Bound_Ptr<Pointer, Lock> -{ - typedef ACE_Strong_Bound_Ptr<Pointer, Lock> Base; - -public: - AutoPtr() - : Base() - { } - - AutoPtr(Pointer* x) - : Base(x) - { } - - operator bool() const - { - return !Base::null(); - } - - bool operator !() const - { - return Base::null(); - } -}; - -} // namespace Trinity - -#endif diff --git a/src/server/shared/Threading/DelayExecutor.cpp b/src/server/shared/Threading/DelayExecutor.cpp deleted file mode 100644 index ef3c028fd97..00000000000 --- a/src/server/shared/Threading/DelayExecutor.cpp +++ /dev/null @@ -1,126 +0,0 @@ -/* -* Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> -* -* This program is free software; you can redistribute it and/or modify it -* under the terms of the GNU General Public License as published by the -* Free Software Foundation; either version 2 of the License, or (at your -* option) any later version. -* -* This program is distributed in the hope that it will be useful, but WITHOUT -* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or -* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for -* more details. -* -* You should have received a copy of the GNU General Public License along -* with this program. If not, see <http://www.gnu.org/licenses/>. -*/ - -#include <ace/Thread_Mutex.h> -#include <ace/Log_Msg.h> - -#include "DelayExecutor.h" - -DelayExecutor* DelayExecutor::instance() -{ - static DelayExecutor* instance = new DelayExecutor(); - return instance; -} - -DelayExecutor::DelayExecutor() - : pre_svc_hook_(0), post_svc_hook_(0), activated_(false) { } - -DelayExecutor::~DelayExecutor() -{ - if (pre_svc_hook_) - delete pre_svc_hook_; - - if (post_svc_hook_) - delete post_svc_hook_; - - deactivate(); -} - -int DelayExecutor::deactivate() -{ - if (!activated()) - return -1; - - activated(false); - queue_.queue()->deactivate(); - wait(); - - return 0; -} - -int DelayExecutor::svc() -{ - if (pre_svc_hook_) - pre_svc_hook_->call(); - - for (;;) - { - ACE_Method_Request* rq = queue_.dequeue(); - - if (!rq) - break; - - rq->call(); - delete rq; - } - - if (post_svc_hook_) - post_svc_hook_->call(); - - return 0; -} - -int DelayExecutor::start(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook) -{ - if (activated()) - return -1; - - if (num_threads < 1) - return -1; - - if (pre_svc_hook_) - delete pre_svc_hook_; - - if (post_svc_hook_) - delete post_svc_hook_; - - pre_svc_hook_ = pre_svc_hook; - post_svc_hook_ = post_svc_hook; - - queue_.queue()->activate(); - - if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1) - return -1; - - activated(true); - - return true; -} - -int DelayExecutor::execute(ACE_Method_Request* new_req) -{ - if (new_req == NULL) - return -1; - - if (queue_.enqueue(new_req, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) - { - delete new_req; - ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1); - } - - return 0; -} - -bool DelayExecutor::activated() -{ - return activated_; -} - -void DelayExecutor::activated(bool s) -{ - activated_ = s; -} diff --git a/src/server/shared/Threading/DelayExecutor.h b/src/server/shared/Threading/DelayExecutor.h deleted file mode 100644 index 2a3721263fc..00000000000 --- a/src/server/shared/Threading/DelayExecutor.h +++ /dev/null @@ -1,35 +0,0 @@ -#ifndef _M_DELAY_EXECUTOR_H -#define _M_DELAY_EXECUTOR_H - -#include <ace/Task.h> -#include <ace/Activation_Queue.h> -#include <ace/Method_Request.h> - -class DelayExecutor : protected ACE_Task_Base -{ - public: - DelayExecutor(); - virtual ~DelayExecutor(); - - static DelayExecutor* instance(); - - int execute(ACE_Method_Request* new_req); - - int start(int num_threads = 1, ACE_Method_Request* pre_svc_hook = NULL, ACE_Method_Request* post_svc_hook = NULL); - - int deactivate(); - - bool activated(); - - virtual int svc(); - - private: - ACE_Activation_Queue queue_; - ACE_Method_Request* pre_svc_hook_; - ACE_Method_Request* post_svc_hook_; - bool activated_; - - void activated(bool s); -}; - -#endif // _M_DELAY_EXECUTOR_H diff --git a/src/server/worldserver/Master.cpp b/src/server/worldserver/Master.cpp index 41060e37b38..ecc3e31a349 100644 --- a/src/server/worldserver/Master.cpp +++ b/src/server/worldserver/Master.cpp @@ -24,7 +24,6 @@ #include "Common.h" #include "SystemConfig.h" -#include "SignalHandler.h" #include "World.h" #include "WorldRunnable.h" #include "WorldSocket.h" |