From eb36acd1522a2e5b8a7d2b4b4a67fc34fc777f03 Mon Sep 17 00:00:00 2001 From: leak Date: Mon, 30 Jun 2014 14:44:52 +0200 Subject: Replaced ACE_Task_Base based LogWorker with ProducerConsumerQueue --- .../shared/Threading/ProducerConsumerQueue.h | 102 +++++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 src/server/shared/Threading/ProducerConsumerQueue.h (limited to 'src/server/shared/Threading/ProducerConsumerQueue.h') diff --git a/src/server/shared/Threading/ProducerConsumerQueue.h b/src/server/shared/Threading/ProducerConsumerQueue.h new file mode 100644 index 00000000000..961cb9f9c82 --- /dev/null +++ b/src/server/shared/Threading/ProducerConsumerQueue.h @@ -0,0 +1,102 @@ +/* +* Copyright (C) 2008-2014 TrinityCore +* +* This program is free software; you can redistribute it and/or modify it +* under the terms of the GNU General Public License as published by the +* Free Software Foundation; either version 2 of the License, or (at your +* option) any later version. +* +* This program is distributed in the hope that it will be useful, but WITHOUT +* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +* more details. +* +* You should have received a copy of the GNU General Public License along +* with this program. If not, see . +*/ + +#ifndef _PCQ_H +#define _PCQ_H + +#include +#include +#include + +template +class ProducerConsumerQueue +{ +private: + std::mutex _queueLock; + std::queue _queue; + std::condition_variable _condition; + +public: + + void Push(const T& value) + { + _queueLock.lock(); + + _queue.push(std::move(value)); + + _queueLock.unlock(); + + _condition.notify_one(); + } + + bool Empty() const + { + std::lock_guard lock(_queueLock); + + return _queue.empty(); + } + + bool Pop(T& value) + { + std::lock_guard lock(_queueLock); + + if (_queue.empty()) + return false; + + value = _queue.front(); + + _queue.pop(); + + return true; + } + + void WaitAndPop(T& value) + { + std::unique_lock lock(_queueLock); + + _condition.wait(lock, [this](){ return !_queue.empty(); }); + + if (_queue.empty()) + return; + + value = _queue.front(); + + _queue.pop(); + } + + void Cancel() + { + _queueLock.lock(); + + while (!_queue.empty()) + { + T& value = _queue.front(); + + delete &value; + + _queue.pop(); + } + + _queueLock.unlock(); + + _condition.notify_all(); + } +}; + +#endif + + -- cgit v1.2.3 From d39a013b6b979a5158bf86c37a197cb902b2c2f9 Mon Sep 17 00:00:00 2001 From: leak Date: Mon, 30 Jun 2014 18:37:23 +0200 Subject: Replaced ACE_Task_Base based DatabaseWorker with PCQ Note: Not exactly sure how shutdown should be handled, currently the queue clears itself out before shutting down This might need to be changed if the queue should be fully processed before being deleted --- src/server/shared/Database/DatabaseWorker.cpp | 44 +++++++++++++--------- src/server/shared/Database/DatabaseWorker.h | 21 ++++++----- src/server/shared/Database/DatabaseWorkerPool.h | 20 ++++------ .../Database/Implementation/CharacterDatabase.h | 2 +- .../shared/Database/Implementation/LoginDatabase.h | 2 +- .../shared/Database/Implementation/WorldDatabase.h | 2 +- src/server/shared/Database/MySQLConnection.cpp | 13 +++---- src/server/shared/Database/MySQLConnection.h | 5 ++- src/server/shared/Database/SQLOperation.h | 5 +-- .../shared/Threading/ProducerConsumerQueue.h | 11 +++++- 10 files changed, 68 insertions(+), 57 deletions(-) (limited to 'src/server/shared/Threading/ProducerConsumerQueue.h') diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp index 3581f8e0211..3944c008652 100644 --- a/src/server/shared/Database/DatabaseWorker.cpp +++ b/src/server/shared/Database/DatabaseWorker.cpp @@ -20,32 +20,42 @@ #include "SQLOperation.h" #include "MySQLConnection.h" #include "MySQLThreading.h" +#include "ProducerConsumerQueue.h" -DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) : -m_queue(new_queue), -m_conn(con) +DatabaseWorker::DatabaseWorker(ProducerConsumerQueue* newQueue, MySQLConnection* connection) { - /// Assign thread to task - activate(); + _connection = connection; + _queue = newQueue; + _cancelationToken = false; + _workerThread = std::thread(&DatabaseWorker::WorkerThread, this); } -int DatabaseWorker::svc() +DatabaseWorker::~DatabaseWorker() { - if (!m_queue) - return -1; + _cancelationToken = true; + + _queue->Cancel(); + + _workerThread.join(); +} + +void DatabaseWorker::WorkerThread() +{ + if (!_queue) + return; - SQLOperation *request = NULL; while (1) { - request = (SQLOperation*)(m_queue->dequeue()); - if (!request) - break; + SQLOperation* operation = nullptr; - request->SetConnection(m_conn); - request->call(); + _queue->WaitAndPop(operation); - delete request; - } + if (_cancelationToken) + return; - return 0; + operation->SetConnection(_connection); + operation->call(); + + delete operation; + } } diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h index dc883dd3428..9b45318e39f 100644 --- a/src/server/shared/Database/DatabaseWorker.h +++ b/src/server/shared/Database/DatabaseWorker.h @@ -19,23 +19,24 @@ #define _WORKERTHREAD_H #include "Define.h" -#include -#include +#include "ProducerConsumerQueue.h" class MySQLConnection; -class DatabaseWorker : protected ACE_Task_Base +class DatabaseWorker { public: - DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con); - - ///- Inherited from ACE_Task_Base - int svc(); - int wait() { return ACE_Task_Base::wait(); } + DatabaseWorker(ProducerConsumerQueue* newQueue, MySQLConnection* connection); + ~DatabaseWorker(); private: - ACE_Activation_Queue* m_queue; - MySQLConnection* m_conn; + ProducerConsumerQueue* _queue; + MySQLConnection* _connection; + + void WorkerThread(); + std::thread _workerThread; + + std::atomic_bool _cancelationToken; DatabaseWorker(DatabaseWorker const& right) = delete; DatabaseWorker& operator=(DatabaseWorker const& right) = delete; diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index 078b97762b5..e56dcc329cd 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -52,7 +52,7 @@ class DatabaseWorkerPool DatabaseWorkerPool() : _connectionInfo(NULL) { _messageQueue = new ACE_Message_Queue(8 * 1024 * 1024, 8 * 1024 * 1024); - _queue = new ACE_Activation_Queue(_messageQueue); + _queue = new ProducerConsumerQueue(); memset(_connectionCount, 0, sizeof(_connectionCount)); _connections.resize(IDX_SIZE); @@ -107,16 +107,10 @@ class DatabaseWorkerPool { TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); - //! Shuts down delaythreads for this connection pool by underlying deactivate(). - //! The next dequeue attempt in the worker thread tasks will result in an error, - //! ultimately ending the worker thread task. - _queue->queue()->close(); - for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) { T* t = _connections[IDX_ASYNC][i]; DatabaseWorker* worker = t->m_worker; - worker->wait(); //! Block until no more threads are running this task. delete worker; t->Close(); //! Closes the actualy MySQL connection. } @@ -488,7 +482,7 @@ class DatabaseWorkerPool void Enqueue(SQLOperation* op) { - _queue->enqueue(op); + _queue->Push(op); } //! Gets a free connection in the synchronous connection pool. @@ -523,11 +517,11 @@ class DatabaseWorkerPool IDX_SIZE }; - ACE_Message_Queue* _messageQueue; //! Message Queue used by ACE_Activation_Queue - ACE_Activation_Queue* _queue; //! Queue shared by async worker threads. - std::vector< std::vector > _connections; - uint32 _connectionCount[2]; //! Counter of MySQL connections; - MySQLConnectionInfo* _connectionInfo; + ACE_Message_Queue* _messageQueue; //! Message Queue used by ACE_Activation_Queue + ProducerConsumerQueue* _queue; //! Queue shared by async worker threads. + std::vector< std::vector > _connections; + uint32 _connectionCount[2]; //! Counter of MySQL connections; + MySQLConnectionInfo* _connectionInfo; }; #endif diff --git a/src/server/shared/Database/Implementation/CharacterDatabase.h b/src/server/shared/Database/Implementation/CharacterDatabase.h index 98d7fe231f1..61167681b0b 100644 --- a/src/server/shared/Database/Implementation/CharacterDatabase.h +++ b/src/server/shared/Database/Implementation/CharacterDatabase.h @@ -26,7 +26,7 @@ class CharacterDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections CharacterDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - CharacterDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + CharacterDatabaseConnection(ProducerConsumerQueue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/Implementation/LoginDatabase.h b/src/server/shared/Database/Implementation/LoginDatabase.h index 604e9d39551..7fa2ff49324 100644 --- a/src/server/shared/Database/Implementation/LoginDatabase.h +++ b/src/server/shared/Database/Implementation/LoginDatabase.h @@ -26,7 +26,7 @@ class LoginDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections LoginDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - LoginDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + LoginDatabaseConnection(ProducerConsumerQueue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/Implementation/WorldDatabase.h b/src/server/shared/Database/Implementation/WorldDatabase.h index a815373a1c6..c8c38d8a629 100644 --- a/src/server/shared/Database/Implementation/WorldDatabase.h +++ b/src/server/shared/Database/Implementation/WorldDatabase.h @@ -26,7 +26,7 @@ class WorldDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections WorldDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - WorldDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + WorldDatabaseConnection(ProducerConsumerQueue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp index 0d2d97f7f28..8b24f508331 100644 --- a/src/server/shared/Database/MySQLConnection.cpp +++ b/src/server/shared/Database/MySQLConnection.cpp @@ -33,8 +33,7 @@ #include "DatabaseWorker.h" #include "Timer.h" #include "Log.h" - -#include +#include "ProducerConsumerQueue.h" MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) : m_reconnecting(false), @@ -45,7 +44,7 @@ m_Mysql(NULL), m_connectionInfo(connInfo), m_connectionFlags(CONNECTION_SYNCH) { } -MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) : +MySQLConnection::MySQLConnection(ProducerConsumerQueue* queue, MySQLConnectionInfo& connInfo) : m_reconnecting(false), m_prepareError(false), m_queue(queue), @@ -502,8 +501,8 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) } uint32 lErrno = mysql_errno(GetHandle()); // It's possible this attempted reconnect throws 2006 at us. To prevent crazy recursive calls, sleep here. - ACE_OS::sleep(3); // Sleep 3 seconds - return _HandleMySQLErrno(lErrno); // Call self (recursive) + std::this_thread::sleep_for(std::chrono::seconds(3)); // Sleep 3 seconds + return _HandleMySQLErrno(lErrno); // Call self (recursive) } case ER_LOCK_DEADLOCK: @@ -517,12 +516,12 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) case ER_BAD_FIELD_ERROR: case ER_NO_SUCH_TABLE: TC_LOG_ERROR("sql.sql", "Your database structure is not up to date. Please make sure you've executed all queries in the sql/updates folders."); - ACE_OS::sleep(10); + std::this_thread::sleep_for(std::chrono::seconds(10)); std::abort(); return false; case ER_PARSE_ERROR: TC_LOG_ERROR("sql.sql", "Error while parsing SQL. Core fix required."); - ACE_OS::sleep(10); + std::this_thread::sleep_for(std::chrono::seconds(10)); std::abort(); return false; default: diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h index 512df7c16c7..3b7efeb5846 100644 --- a/src/server/shared/Database/MySQLConnection.h +++ b/src/server/shared/Database/MySQLConnection.h @@ -20,6 +20,7 @@ #include "DatabaseWorkerPool.h" #include "Transaction.h" #include "Util.h" +#include "ProducerConsumerQueue.h" #ifndef _MYSQLCONNECTION_H #define _MYSQLCONNECTION_H @@ -70,7 +71,7 @@ class MySQLConnection public: MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchronous connections. - MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections. + MySQLConnection(ProducerConsumerQueue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections. virtual ~MySQLConnection(); virtual bool Open(); @@ -125,7 +126,7 @@ class MySQLConnection bool _HandleMySQLErrno(uint32 errNo); private: - ACE_Activation_Queue* m_queue; //! Queue shared with other asynchronous connections. + ProducerConsumerQueue* m_queue; //! Queue shared with other asynchronous connections. DatabaseWorker* m_worker; //! Core worker task. MYSQL * m_Mysql; //! MySQL Handle. MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging) diff --git a/src/server/shared/Database/SQLOperation.h b/src/server/shared/Database/SQLOperation.h index 6f933a051e3..64fc64e2c2e 100644 --- a/src/server/shared/Database/SQLOperation.h +++ b/src/server/shared/Database/SQLOperation.h @@ -18,9 +18,6 @@ #ifndef _SQLOPERATION_H #define _SQLOPERATION_H -#include -#include - #include "QueryResult.h" //- Forward declare (don't include header to prevent circular includes) @@ -56,7 +53,7 @@ union SQLResultSetUnion class MySQLConnection; -class SQLOperation : public ACE_Method_Request +class SQLOperation { public: SQLOperation(): m_conn(NULL) { } diff --git a/src/server/shared/Threading/ProducerConsumerQueue.h b/src/server/shared/Threading/ProducerConsumerQueue.h index 961cb9f9c82..41bff445c2e 100644 --- a/src/server/shared/Threading/ProducerConsumerQueue.h +++ b/src/server/shared/Threading/ProducerConsumerQueue.h @@ -21,6 +21,7 @@ #include #include #include +#include template class ProducerConsumerQueue @@ -29,9 +30,12 @@ private: std::mutex _queueLock; std::queue _queue; std::condition_variable _condition; + std::atomic _shutdown; public: + ProducerConsumerQueue() : _shutdown(false) { } + void Push(const T& value) { _queueLock.lock(); @@ -68,7 +72,10 @@ public: { std::unique_lock lock(_queueLock); - _condition.wait(lock, [this](){ return !_queue.empty(); }); + while (_queue.empty() && !_shutdown) + { + _condition.wait(lock); + } if (_queue.empty()) return; @@ -91,6 +98,8 @@ public: _queue.pop(); } + _shutdown = true; + _queueLock.unlock(); _condition.notify_all(); -- cgit v1.2.3 From ecde28d1c114a151c9828fff8898def5e74de4e1 Mon Sep 17 00:00:00 2001 From: leak Date: Mon, 14 Jul 2014 22:51:13 +0200 Subject: Replacing ACE based multi threading with PCQ --- src/server/collision/Management/VMapManager2.cpp | 1 - .../shared/Threading/ProducerConsumerQueue.h | 2 +- src/tools/mmaps_generator/CMakeLists.txt | 2 + src/tools/mmaps_generator/MapBuilder.cpp | 83 +++++++++++++--------- src/tools/mmaps_generator/MapBuilder.h | 9 +++ src/tools/mmaps_generator/TerrainBuilder.cpp | 2 - 6 files changed, 62 insertions(+), 37 deletions(-) (limited to 'src/server/shared/Threading/ProducerConsumerQueue.h') diff --git a/src/server/collision/Management/VMapManager2.cpp b/src/server/collision/Management/VMapManager2.cpp index 00381cb1205..1d267cbd2a5 100644 --- a/src/server/collision/Management/VMapManager2.cpp +++ b/src/server/collision/Management/VMapManager2.cpp @@ -18,7 +18,6 @@ #include #include -#include #include #include #include "VMapManager2.h" diff --git a/src/server/shared/Threading/ProducerConsumerQueue.h b/src/server/shared/Threading/ProducerConsumerQueue.h index 41bff445c2e..3fefd27ba6e 100644 --- a/src/server/shared/Threading/ProducerConsumerQueue.h +++ b/src/server/shared/Threading/ProducerConsumerQueue.h @@ -47,7 +47,7 @@ public: _condition.notify_one(); } - bool Empty() const + bool Empty() { std::lock_guard lock(_queueLock); diff --git a/src/tools/mmaps_generator/CMakeLists.txt b/src/tools/mmaps_generator/CMakeLists.txt index f38ecc5144d..a2b729ca998 100644 --- a/src/tools/mmaps_generator/CMakeLists.txt +++ b/src/tools/mmaps_generator/CMakeLists.txt @@ -22,6 +22,7 @@ set(mmap_gen_Includes ${CMAKE_SOURCE_DIR}/dep/recastnavigation/Detour/Include ${CMAKE_SOURCE_DIR}/src/server/shared ${CMAKE_SOURCE_DIR}/src/server/shared/Utilities + ${CMAKE_SOURCE_DIR}/src/server/shared/Threading ${CMAKE_SOURCE_DIR}/src/server/game/Conditions ${CMAKE_SOURCE_DIR}/src/server/collision ${CMAKE_SOURCE_DIR}/src/server/collision/Management @@ -47,6 +48,7 @@ target_link_libraries(mmaps_generator Detour ${BZIP2_LIBRARIES} ${ZLIB_LIBRARIES} + ${Boost_LIBRARIES} ) if( UNIX ) diff --git a/src/tools/mmaps_generator/MapBuilder.cpp b/src/tools/mmaps_generator/MapBuilder.cpp index fdd8a7d177a..3b1516b3d11 100644 --- a/src/tools/mmaps_generator/MapBuilder.cpp +++ b/src/tools/mmaps_generator/MapBuilder.cpp @@ -165,42 +165,59 @@ namespace MMAP } /**************************************************************************/ + + void MapBuilder::WorkerThread() + { + while (1) + { + uint32 mapId; + + _queue.WaitAndPop(mapId); + + if (_cancelationToken) + return; + + buildMap(mapId); + } + } + void MapBuilder::buildAllMaps(int threads) { + for (size_t i = 0; i < threads; ++i) + { + _workerThreads.push_back(std::thread(&MapBuilder::WorkerThread, this)); + } + + m_tiles.sort([](MapTiles a, MapTiles b) + { + return a.m_tiles->size() > b.m_tiles->size(); + }); -// TODO fix that shit -// std::vector _threads; -// -// BuilderThreadPool* pool = threads > 0 ? new BuilderThreadPool() : NULL; -// -// m_tiles.sort([](MapTiles a, MapTiles b) -// { -// return a.m_tiles->size() > b.m_tiles->size(); -// }); -// -// for (TileList::iterator it = m_tiles.begin(); it != m_tiles.end(); ++it) -// { -// uint32 mapID = it->m_mapId; -// if (!shouldSkipMap(mapID)) -// { -// if (threads > 0) -// pool->Enqueue(new MapBuildRequest(mapID)); -// else -// buildMap(mapID); -// } -// } -// -// for (int i = 0; i < threads; ++i) -// _threads.push_back(new BuilderThread(this, pool->Queue())); -// -// // Free memory -// for (std::vector::iterator _th = _threads.begin(); _th != _threads.end(); ++_th) -// { -// (*_th)->wait(); -// delete *_th; -// } -// -// delete pool; + for (TileList::iterator it = m_tiles.begin(); it != m_tiles.end(); ++it) + { + uint32 mapID = it->m_mapId; + if (!shouldSkipMap(mapID)) + { + if (threads > 0) + _queue.Push(mapID); + else + buildMap(mapID); + } + } + + while (!_queue.Empty()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + _cancelationToken = true; + + _queue.Cancel(); + + for (auto& thread : _workerThreads) + { + thread.join(); + } } /**************************************************************************/ diff --git a/src/tools/mmaps_generator/MapBuilder.h b/src/tools/mmaps_generator/MapBuilder.h index c9ea03190a8..4cd36602a4b 100644 --- a/src/tools/mmaps_generator/MapBuilder.h +++ b/src/tools/mmaps_generator/MapBuilder.h @@ -23,12 +23,15 @@ #include #include #include +#include +#include #include "TerrainBuilder.h" #include "IntermediateValues.h" #include "Recast.h" #include "DetourNavMesh.h" +#include "ProducerConsumerQueue.h" using namespace VMAP; @@ -96,6 +99,8 @@ namespace MMAP // builds list of maps, then builds all of mmap tiles (based on the skip settings) void buildAllMaps(int threads); + void MapBuilder::WorkerThread(); + private: // detect maps and tiles void discoverTiles(); @@ -138,6 +143,10 @@ namespace MMAP // build performance - not really used for now rcContext* m_rcContext; + + std::vector _workerThreads; + ProducerConsumerQueue _queue; + std::atomic _cancelationToken; }; } diff --git a/src/tools/mmaps_generator/TerrainBuilder.cpp b/src/tools/mmaps_generator/TerrainBuilder.cpp index 7832cef18de..771275a1757 100644 --- a/src/tools/mmaps_generator/TerrainBuilder.cpp +++ b/src/tools/mmaps_generator/TerrainBuilder.cpp @@ -18,13 +18,11 @@ #include "TerrainBuilder.h" -#include "PathCommon.h" #include "MapBuilder.h" #include "VMapManager2.h" #include "MapTree.h" #include "ModelInstance.h" -#include // ****************************************** // Map file format defines -- cgit v1.2.3