diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/game/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/game/Map.cpp | 10 | ||||
-rw-r--r-- | src/game/MapManager.cpp | 42 | ||||
-rw-r--r-- | src/game/MapManager.h | 2 | ||||
-rw-r--r-- | src/game/MapUpdater.cpp | 126 | ||||
-rw-r--r-- | src/game/MapUpdater.h | 36 | ||||
-rw-r--r-- | src/game/ObjectAccessor.cpp | 1 | ||||
-rw-r--r-- | src/game/ObjectAccessor.h | 7 | ||||
-rw-r--r-- | src/game/Player.cpp | 9 | ||||
-rw-r--r-- | src/game/Unit.cpp | 7 | ||||
-rw-r--r-- | src/shared/CMakeLists.txt | 3 | ||||
-rw-r--r-- | src/shared/DelayExecutor.cpp | 114 | ||||
-rw-r--r-- | src/shared/DelayExecutor.h | 33 | ||||
-rw-r--r-- | src/shared/Util.cpp | 56 |
14 files changed, 355 insertions, 94 deletions
diff --git a/src/game/CMakeLists.txt b/src/game/CMakeLists.txt index 663e827812b..0131437b732 100644 --- a/src/game/CMakeLists.txt +++ b/src/game/CMakeLists.txt @@ -150,6 +150,8 @@ SET(game_STAT_SRCS MapInstanced.h MapManager.cpp MapManager.h + MapUpdater.cpp + MapUpdater.h MapReference.h MapRefManager.h MiscHandler.cpp @@ -294,6 +296,5 @@ SET(game_STAT_SRCS OutdoorPvPImpl.h ZoneScript.h ) -add_definitions(-fopenmp) add_library(game STATIC ${game_STAT_SRCS}) ADD_DEPENDENCIES(game revision.h) diff --git a/src/game/Map.cpp b/src/game/Map.cpp index 53702235f7a..a34ce673d9a 100644 --- a/src/game/Map.cpp +++ b/src/game/Map.cpp @@ -706,10 +706,12 @@ void Map::RemoveUnitFromNotify(Unit *unit) void Map::Update(const uint32 &t_diff) { /// update players at tick - for (m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter) - if (Player* plr = m_mapRefIter->getSource()) - if (plr && plr->IsInWorld()) - plr->Update(t_diff); + for(m_mapRefIter = m_mapRefManager.begin(); m_mapRefIter != m_mapRefManager.end(); ++m_mapRefIter) + { + Player* plr = m_mapRefIter->getSource(); + if(plr && plr->IsInWorld()) + plr->Update(t_diff); + } m_notifyTimer.Update(t_diff); if (m_notifyTimer.Passed()) diff --git a/src/game/MapManager.cpp b/src/game/MapManager.cpp index ae6a01003cf..5121b9dc574 100644 --- a/src/game/MapManager.cpp +++ b/src/game/MapManager.cpp @@ -18,9 +18,6 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#ifdef MULTI_THREAD_MAP -#include <omp.h> -#endif #include "MapManager.h" #include "InstanceSaveMgr.h" #include "Policies/SingletonImp.h" @@ -32,6 +29,7 @@ #include "MapInstanced.h" #include "InstanceData.h" #include "DestinationHolderImp.h" +#include "Config/ConfigEnv.h" #include "World.h" #include "CellImpl.h" #include "Corpse.h" @@ -72,6 +70,12 @@ MapManager::Initialize() i_GridStateErrorCount = 0; } +#ifdef MULTI_THREAD_MAP + int num_threads(sWorld.getConfig(CONFIG_NUMTHREADS)); + // Start mtmaps if needed. + if(num_threads > 0 && m_updater.activate(num_threads) == -1) + abort(); +#endif InitMaxInstanceId(); } @@ -299,22 +303,17 @@ MapManager::Update(uint32 diff) MapMapType::iterator iter = i_maps.begin(); #ifdef MULTI_THREAD_MAP - std::vector<Map*> update_queue(i_maps.size()); - int omp_set_num_threads(sWorld.getConfig(CONFIG_NUMTHREADS)); - for (uint32 i = 0; iter != i_maps.end(); ++iter, ++i) - update_queue[i] = iter->second; -/* - gomp in gcc <4.4 version cannot parallelise loops using random access iterators - so until gcc 4.4 isnt standard, we need the update_queue workaround -*/ -#pragma omp parallel for schedule(dynamic) private(i) shared(update_queue) - for (uint32 i = 0; i < i_maps.size(); ++i) + for(; iter != i_maps.end(); ++iter) { - checkAndCorrectGridStatesArray(); // debugging code, should be deleted some day - update_queue[i]->Update(i_timer.GetCurrent()); - sWorld.RecordTimeDiff("UpdateMap %u", update_queue[i]->GetId()); - //sLog.outError("This is thread %d out of %d threads,updating map %u",omp_get_thread_num(),omp_get_num_threads(),iter->second->GetId()); + if (m_updater.activated()) + m_updater.schedule_update(*iter->second, i_timer.GetCurrent()); + else + { + iter->second->Update(i_timer.GetCurrent()); + } } + if (m_updater.activated()) + m_updater.wait(); #else for (; iter != i_maps.end(); ++iter) { @@ -377,6 +376,11 @@ void MapManager::UnloadAll() delete i_maps.begin()->second; i_maps.erase(i_maps.begin()); } + +#ifdef MULTI_THREAD_MAP + if (m_updater.activated()) + m_updater.deactivate(); +#endif } void MapManager::InitMaxInstanceId() @@ -393,6 +397,8 @@ void MapManager::InitMaxInstanceId() uint32 MapManager::GetNumInstances() { + Guard guard(*this); + uint32 ret = 0; for (MapMapType::iterator itr = i_maps.begin(); itr != i_maps.end(); ++itr) { @@ -407,6 +413,8 @@ uint32 MapManager::GetNumInstances() uint32 MapManager::GetNumPlayersInInstances() { + Guard guard(*this); + uint32 ret = 0; for (MapMapType::iterator itr = i_maps.begin(); itr != i_maps.end(); ++itr) { diff --git a/src/game/MapManager.h b/src/game/MapManager.h index f9e64240686..05fd3db9951 100644 --- a/src/game/MapManager.h +++ b/src/game/MapManager.h @@ -27,6 +27,7 @@ #include "Common.h" #include "Map.h" #include "GridStates.h" +#include "MapUpdater.h" class Transport; @@ -152,6 +153,7 @@ class MANGOS_DLL_DECL MapManager : public MaNGOS::Singleton<MapManager, MaNGOS:: IntervalTimer i_timer; uint32 i_MaxInstanceId; + MapUpdater m_updater; }; #endif diff --git a/src/game/MapUpdater.cpp b/src/game/MapUpdater.cpp new file mode 100644 index 00000000000..8162efa0d06 --- /dev/null +++ b/src/game/MapUpdater.cpp @@ -0,0 +1,126 @@ +#include "MapUpdater.h" +#include "DelayExecutor.h" +#include "Map.h" +#include "Database/DatabaseEnv.h" + +#include <ace/Guard_T.h> +#include <ace/Method_Request.h> + +//the reason this things are here is that i want to make +//the netcode patch and the multithreaded maps independant +//once they are merged 1 class should be used +class WDBThreadStartReq1 : public ACE_Method_Request +{ + public: + WDBThreadStartReq1(){} + virtual int + + call (void) + { + WorldDatabase.ThreadStart(); + return 0; + } +}; + +class WDBThreadEndReq1 : public ACE_Method_Request +{ + public: + WDBThreadEndReq1(){} + virtual int + + call (void) + { + WorldDatabase.ThreadEnd(); + return 0; + } +}; + +class MapUpdateRequest : public ACE_Method_Request +{ + public: + Map& m_map; + MapUpdater& m_updater; + ACE_UINT32 m_diff; + MapUpdateRequest(Map& m, MapUpdater& u, ACE_UINT32 d) : m_map(m), m_updater(u), m_diff(d){} + virtual int + + call (void) + { + m_map.Update (m_diff); + m_updater.update_finished (); + return 0; + } +}; + +MapUpdater::MapUpdater() : +m_mutex(), +m_condition(m_mutex), +m_executor(), +pedning_requests(0) +{ + return; +} + +MapUpdater::~MapUpdater() +{ + this->deactivate(); +} + +int MapUpdater::activate(size_t num_threads) +{ + return this->m_executor.activate(static_cast<int> (num_threads), new WDBThreadStartReq1, new WDBThreadEndReq1); +} + +int MapUpdater::deactivate(void) +{ + this->wait(); + + return this->m_executor.deactivate(); +} + +int MapUpdater::wait() +{ + ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->m_mutex, -1); + + while(this->pedning_requests > 0) + this->m_condition.wait(); + + return 0; +} + +int MapUpdater::schedule_update(Map& map, ACE_UINT32 diff) +{ + ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->m_mutex, -1); + + ++this->pedning_requests; + + if (this->m_executor.execute(new MapUpdateRequest(map, *this, diff)) == -1) + { + ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) \n"), ACE_TEXT("Failed to schedule Map Update"))); + + --this->pedning_requests; + return -1; + } + + return 0; +} + +bool MapUpdater::activated() +{ + return m_executor.activated(); +} + +void MapUpdater::update_finished() +{ + ACE_GUARD(ACE_Thread_Mutex, guard, this->m_mutex); + + if (this->pedning_requests == 0) + { + ACE_ERROR((LM_ERROR,ACE_TEXT("(%t)\n"), ACE_TEXT("MapUpdater::update_finished BUG, report to devs"))); + return; + } + + --this->pedning_requests; + + this->m_condition.broadcast(); +} diff --git a/src/game/MapUpdater.h b/src/game/MapUpdater.h new file mode 100644 index 00000000000..3909569e163 --- /dev/null +++ b/src/game/MapUpdater.h @@ -0,0 +1,36 @@ +#ifndef _MAP_UPDATER_H_INCLUDED +#define _MAP_UPDATER_H_INCLUDED + +#include <ace/Thread_Mutex.h> +#include <ace/Condition_Thread_Mutex.h> + +#include "DelayExecutor.h" + +class Map; + +class MapUpdater +{ + public: + MapUpdater(); + virtual ~MapUpdater(); + + friend class MapUpdateRequest; + + int schedule_update(Map& map, ACE_UINT32 diff); + + int wait(); + + int activate(size_t num_threads); + + int deactivate(void); + + bool activated(); + private: + void update_finished(); + + DelayExecutor m_executor; + ACE_Condition_Thread_Mutex m_condition; + ACE_Thread_Mutex m_mutex; + size_t pedning_requests; +}; +#endif //_MAP_UPDATER_H_INCLUDED diff --git a/src/game/ObjectAccessor.cpp b/src/game/ObjectAccessor.cpp index 72e448a5dd8..2dfd9c9ae7b 100644 --- a/src/game/ObjectAccessor.cpp +++ b/src/game/ObjectAccessor.cpp @@ -162,6 +162,7 @@ Player* ObjectAccessor::FindPlayerByName(const char *name) { //TODO: Player Guard + Guard guard(*HashMapHolder<Player>::GetLock()); HashMapHolder<Player>::MapType& m = HashMapHolder<Player>::GetContainer(); HashMapHolder<Player>::MapType::iterator iter = m.begin(); for (; iter != m.end(); ++iter) diff --git a/src/game/ObjectAccessor.h b/src/game/ObjectAccessor.h index 67ef5efef89..79afa78fb6a 100644 --- a/src/game/ObjectAccessor.h +++ b/src/game/ObjectAccessor.h @@ -52,7 +52,11 @@ class HashMapHolder typedef ACE_Thread_Mutex LockType; typedef MaNGOS::GeneralLock<LockType > Guard; - static void Insert(T* o) { m_objectMap[o->GetGUID()] = o; } + static void Insert(T* o) + { + Guard guard(i_lock); + m_objectMap[o->GetGUID()] = o; + } static void Remove(T* o) { @@ -62,6 +66,7 @@ class HashMapHolder static T* Find(uint64 guid) { + Guard guard(i_lock); typename MapType::iterator itr = m_objectMap.find(guid); return (itr != m_objectMap.end()) ? itr->second : NULL; } diff --git a/src/game/Player.cpp b/src/game/Player.cpp index ba5fddd3637..7b50c44ea16 100644 --- a/src/game/Player.cpp +++ b/src/game/Player.cpp @@ -1121,12 +1121,6 @@ void Player::SetDrunkValue(uint16 newDrunkenValue, uint32 itemId) void Player::Update( uint32 p_time ) { - // Until Trinity is thread safe, anything that could result in a - // map, zone, or area change in this Update should be preceded by: - // #pragma omp critical(UpdateThreadSafety) - // This will only allow one thread at a time to process a "UpdateThreadSafety" block. - // NOTE: I'm only certain about the map change part. The zone and area #pragma is just a precaution. - if (!IsInWorld()) return; @@ -1294,7 +1288,6 @@ void Player::Update( uint32 p_time ) m_weaponChangeTimer -= p_time; } - #pragma omp critical(UpdateThreadSafety) if (m_zoneUpdateTimer > 0) { if (p_time >= m_zoneUpdateTimer) @@ -1374,7 +1367,6 @@ void Player::Update( uint32 p_time ) // not auto-free ghost from body in instances if(m_deathTimer > 0 && !GetBaseMap()->Instanceable()) { - #pragma omp critical(UpdateThreadSafety) if(p_time >= m_deathTimer) { m_deathTimer = 0; @@ -1398,7 +1390,6 @@ void Player::Update( uint32 p_time ) //we should execute delayed teleports only for alive(!) players //because we don't want player's ghost teleported from graveyard - #pragma omp critical(UpdateThreadSafety) if (IsHasDelayedTeleport() && isAlive()) TeleportTo(m_teleport_dest, m_teleport_options); } diff --git a/src/game/Unit.cpp b/src/game/Unit.cpp index 74580d45f9b..90836cd23ea 100644 --- a/src/game/Unit.cpp +++ b/src/game/Unit.cpp @@ -225,11 +225,8 @@ void Unit::Update(uint32 p_time) // WARNING! Order of execution here is important, do not change. // Spells must be processed with event system BEFORE they go to _UpdateSpells. // Or else we may have some SPELL_STATE_FINISHED spells stalled in pointers, that is bad. - #pragma omp critical(UpdateThreadSafety) - { - m_Events.Update(p_time); - _UpdateSpells(p_time); - } + m_Events.Update(p_time); + _UpdateSpells(p_time); // If this is set during update SetCantProc(false) call is missing somewhere in the code // Having this would prevent spells from being proced, so let's crash diff --git a/src/shared/CMakeLists.txt b/src/shared/CMakeLists.txt index de73967546b..5349a89f613 100644 --- a/src/shared/CMakeLists.txt +++ b/src/shared/CMakeLists.txt @@ -9,6 +9,8 @@ SET(shared_STAT_SRCS ByteBuffer.h Common.cpp Common.h + DelayExecutor.cpp + DelayExecutor.h Errors.h Log.cpp Log.h @@ -21,7 +23,6 @@ SET(shared_STAT_SRCS WorldPacket.h SystemConfig.h ) -add_definitions(-fopenmp) add_library(shared STATIC ${shared_STAT_SRCS}) target_link_libraries( shared diff --git a/src/shared/DelayExecutor.cpp b/src/shared/DelayExecutor.cpp new file mode 100644 index 00000000000..07a691d6357 --- /dev/null +++ b/src/shared/DelayExecutor.cpp @@ -0,0 +1,114 @@ +#include <ace/Singleton.h> +#include <ace/Thread_Mutex.h> +#include <ace/Log_Msg.h> + +#include "DelayExecutor.h" + +DelayExecutor* +DelayExecutor::instance() +{ + return ACE_Singleton<DelayExecutor, ACE_Thread_Mutex>::instance(); +} + +DelayExecutor::DelayExecutor(): +activated_ (false), +pre_svc_hook_ (0), +post_svc_hook_ (0) {} + +DelayExecutor::~DelayExecutor() +{ + if (pre_svc_hook_) + delete pre_svc_hook_; + + if (post_svc_hook_) + delete post_svc_hook_; + + this->deactivate (); +} + +int DelayExecutor::deactivate() +{ + if (!this->activated()) + return -1; + + this->activated(false); + + this->queue_.queue()->deactivate(); + + this->wait(); + + return 0; +} + +int DelayExecutor::svc (void) +{ + if (pre_svc_hook_) + pre_svc_hook_->call(); + + for (;;) + { + ACE_Method_Request* rq = this->queue_.dequeue(); + + if (!rq) + break; + + rq->call(); + + delete rq; + } + + if (post_svc_hook_) + post_svc_hook_->call(); + + return 0; +} + +int DelayExecutor::activate(int num_threads, ACE_Method_Request* pre_svc_hook, ACE_Method_Request* post_svc_hook) +{ + if (this->activated()) + return -1; + + if (num_threads < 1) + return -1; + + if (pre_svc_hook_) + delete pre_svc_hook_; + + if (post_svc_hook_) + delete post_svc_hook_; + + this->pre_svc_hook_ = pre_svc_hook; + this->post_svc_hook_ = post_svc_hook; + + this->queue_.queue ()->activate (); + + if (ACE_Task_Base::activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, num_threads) == -1) + return -1; + + this->activated(true); + + return true; +} + +int DelayExecutor::execute(ACE_Method_Request* new_req) +{ + if (new_req == NULL) + return -1; + + if (this->queue_.enqueue(new_req,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1) + { + delete new_req; + ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%t) %p\n"), ACE_TEXT("DelayExecutor::execute enqueue")), -1); + } + return 0; +} + +bool DelayExecutor::activated() +{ + return this->activated_; +} + +void DelayExecutor::activated(bool s) +{ + this->activated_ = s; +} diff --git a/src/shared/DelayExecutor.h b/src/shared/DelayExecutor.h new file mode 100644 index 00000000000..7a160d5ec92 --- /dev/null +++ b/src/shared/DelayExecutor.h @@ -0,0 +1,33 @@ +#ifndef _M_DELAY_EXECUTOR_H +#define _M_DELAY_EXECUTOR_H + +#include <ace/Task.h> +#include <ace/Activation_Queue.h> +#include <ace/Method_Request.h> + +class DelayExecutor : protected ACE_Task_Base +{ + public: + DelayExecutor(); + virtual ~DelayExecutor(); + + static DelayExecutor* instance(); + + int execute(ACE_Method_Request* new_req); + + int activate(int num_threads = 1, ACE_Method_Request* pre_svc_hook = 0, ACE_Method_Request* post_svc_hook = 0); + + int deactivate(); + + bool activated(); + + virtual int svc(void); + private: + ACE_Activation_Queue queue_; + ACE_Method_Request* pre_svc_hook_; + ACE_Method_Request* post_svc_hook_; + + void activated(bool s); + bool activated_; +}; +#endif // _M_DELAY_EXECUTOR_H diff --git a/src/shared/Util.cpp b/src/shared/Util.cpp index 9846b48286c..c5384b6f32e 100644 --- a/src/shared/Util.cpp +++ b/src/shared/Util.cpp @@ -28,60 +28,6 @@ typedef ACE_TSS<MTRand> MTRandTSS; static MTRandTSS mtRand; -#ifdef MULTI_THREAD_MAP - -int32 irand (int32 min, int32 max) -{ - int32 result; -#pragma omp critical (mtrand) -{ - result = int32 (mtRand->randInt (max - min)) + min; -} - return result; -} - -uint32 urand (uint32 min, uint32 max) -{ - uint32 result; -#pragma omp critical (mtrand) -{ - result = mtRand->randInt (max - min) + min; -} - return result; -} - -int32 rand32 () -{ - int32 result; -#pragma omp critical (mtrand) -{ - result = mtRand->randInt (); -} - return result; -} - -double rand_norm(void) -{ - double result; -#pragma omp critical (mtrand) -{ - result = mtRand->randExc (); -} - return result; -} - -double rand_chance (void) -{ - double result; -#pragma omp critical (mtrand) -{ - result = mtRand->randExc (100.0); -} - return result; -} - -#else - int32 irand (int32 min, int32 max) { return int32 (mtRand->randInt (max - min)) + min; @@ -107,8 +53,6 @@ double rand_chance (void) return mtRand->randExc (100.0); } -#endif - Tokens StrSplit(const std::string &src, const std::string &sep) { Tokens r; |