diff options
-rw-r--r-- | src/server/database/Database/DatabaseWorkerPool.h | 150 |
1 files changed, 67 insertions, 83 deletions
diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h index d5a254647eb..36e12e46f4f 100644 --- a/src/server/database/Database/DatabaseWorkerPool.h +++ b/src/server/database/Database/DatabaseWorkerPool.h @@ -32,6 +32,7 @@ #include <mysqld_error.h> #include <memory> +#include <array> #define MIN_MYSQL_SERVER_VERSION 50100u #define MIN_MYSQL_CLIENT_VERSION 50100u @@ -62,9 +63,6 @@ class DatabaseWorkerPool DatabaseWorkerPool() : _queue(new ProducerConsumerQueue<SQLOperation*>()), _async_threads(0), _synch_threads(0) { - memset(_connectionCount, 0, sizeof(_connectionCount)); - _connections.resize(IDX_SIZE); - WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below 5.1"); WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s) does not match the version used to compile TrinityCore (%s).", @@ -78,7 +76,7 @@ class DatabaseWorkerPool void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads) { - _connectionInfo.reset(new MySQLConnectionInfo(infoString)); + _connectionInfo = Trinity::make_unique<MySQLConnectionInfo>(infoString); _async_threads = asyncThreads; _synch_threads = synchThreads; @@ -100,8 +98,8 @@ class DatabaseWorkerPool if (!error) { - TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(), - (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC])); + TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.", + GetDatabaseName(), (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size())); } return error; @@ -111,11 +109,8 @@ class DatabaseWorkerPool { TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); - for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) - { - T* t = _connections[IDX_ASYNC][i]; - t->Close(); //! Closes the actualy MySQL connection. - } + //! Closes the actualy MySQL connection. + _connections[IDX_ASYNC].clear(); TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.", GetDatabaseName()); @@ -124,8 +119,7 @@ class DatabaseWorkerPool //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close //! should only be called after any other thread tasks in the core have exited, //! meaning there can be no concurrent access at this point. - for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) - _connections[IDX_SYNCH][i]->Close(); + _connections[IDX_SYNCH].clear(); TC_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName()); } @@ -133,19 +127,18 @@ class DatabaseWorkerPool //! Prepares all prepared statements bool PrepareStatements() { - for (uint8 i = 0; i < IDX_SIZE; ++i) - for (uint32 c = 0; c < _connectionCount[i]; ++c) + for (auto& connections : _connections) + for (auto& connection : connections) { - T* t = _connections[i][c]; - t->LockIfReady(); - if (!t->PrepareStatements()) + connection->LockIfReady(); + if (!connection->PrepareStatements()) { - t->Unlock(); + connection->Unlock(); Close(); return false; } else - t->Unlock(); + connection->Unlock(); } return true; @@ -201,9 +194,9 @@ class DatabaseWorkerPool if (!sql) return; - T* t = GetFreeConnection(); - t->Execute(sql); - t->Unlock(); + T* connection = GetFreeConnection(); + connection->Execute(sql); + connection->Unlock(); } //! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished. @@ -221,9 +214,9 @@ class DatabaseWorkerPool //! Statement must be prepared with the CONNECTION_SYNCH flag. void DirectExecute(PreparedStatement* stmt) { - T* t = GetFreeConnection(); - t->Execute(stmt); - t->Unlock(); + T* connection = GetFreeConnection(); + connection->Execute(stmt); + connection->Unlock(); //! Delete proxy-class. Not needed anymore delete stmt; @@ -235,13 +228,13 @@ class DatabaseWorkerPool //! Directly executes an SQL query in string format that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. - QueryResult Query(const char* sql, T* conn = nullptr) + QueryResult Query(const char* sql, T* connection = nullptr) { - if (!conn) - conn = GetFreeConnection(); + if (!connection) + connection = GetFreeConnection(); - ResultSet* result = conn->Query(sql); - conn->Unlock(); + ResultSet* result = connection->Query(sql); + connection->Unlock(); if (!result || !result->GetRowCount() || !result->NextRow()) { delete result; @@ -278,9 +271,9 @@ class DatabaseWorkerPool //! Statement must be prepared with CONNECTION_SYNCH flag. PreparedQueryResult Query(PreparedStatement* stmt) { - T* t = GetFreeConnection(); - PreparedResultSet* ret = t->Query(stmt); - t->Unlock(); + auto connection = GetFreeConnection(); + PreparedResultSet* ret = connection->Query(stmt); + connection->Unlock(); //! Delete proxy-class. Not needed anymore delete stmt; @@ -380,11 +373,11 @@ class DatabaseWorkerPool //! were appended to the transaction will be respected during execution. void DirectCommitTransaction(SQLTransaction& transaction) { - T* con = GetFreeConnection(); - int errorCode = con->ExecuteTransaction(transaction); + T* connection = GetFreeConnection(); + int errorCode = connection->ExecuteTransaction(transaction); if (!errorCode) { - con->Unlock(); // OK, operation succesful + connection->Unlock(); // OK, operation succesful return; } @@ -395,7 +388,7 @@ class DatabaseWorkerPool uint8 loopBreaker = 5; for (uint8 i = 0; i < loopBreaker; ++i) { - if (!con->ExecuteTransaction(transaction)) + if (!connection->ExecuteTransaction(transaction)) break; } } @@ -403,7 +396,7 @@ class DatabaseWorkerPool //! Clean up now. transaction->Cleanup(); - con->Unlock(); + connection->Unlock(); } //! Method used to execute prepared statements in a diverse context. @@ -456,63 +449,55 @@ class DatabaseWorkerPool void KeepAlive() { //! Ping synchronous connections - for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) + for (auto& connection : _connections[IDX_SYNCH]) { - T* t = _connections[IDX_SYNCH][i]; - if (t->LockIfReady()) + if (connection->LockIfReady()) { - t->Ping(); - t->Unlock(); + connection->Ping(); + connection->Unlock(); } } //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter //! as the sole purpose is to prevent connections from idling. - for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i) + for (auto& connection : _connections[IDX_ASYNC]) Enqueue(new PingOperation); } private: uint32 OpenConnections(InternalIndex type, uint8 numConnections) { - _connections[type].resize(numConnections); for (uint8 i = 0; i < numConnections; ++i) { - T* t; - - if (type == IDX_ASYNC) - t = new T(_queue.get(), *_connectionInfo); - else if (type == IDX_SYNCH) - t = new T(*_connectionInfo); - else - ABORT(); - - _connections[type][i] = t; - ++_connectionCount[type]; - - uint32 error = t->Open(); - - if (!error) - { - if (mysql_get_server_version(t->GetHandle()) < MIN_MYSQL_SERVER_VERSION) + // Create the connection + auto connection = [&] { + switch (type) { - TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1"); - error = 1; + case IDX_ASYNC: + return Trinity::make_unique<T>(_queue.get(), *_connectionInfo); + case IDX_SYNCH: + return Trinity::make_unique<T>(*_connectionInfo); + default: + ABORT(); } - } + }(); - // Failed to open a connection or invalid version, abort and cleanup - if (error) + if (uint32 error = connection->Open()) { - while (_connectionCount[type] != 0) - { - t = _connections[type][i--]; - delete t; - --_connectionCount[type]; - } + // Failed to open a connection or invalid version, abort and cleanup + _connections[type].clear(); return error; } + else if (mysql_get_server_version(connection->GetHandle()) < MIN_MYSQL_SERVER_VERSION) + { + TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1"); + return 1; + } + else + { + _connections[type].push_back(std::move(connection)); + } } // Everything is fine @@ -524,7 +509,8 @@ class DatabaseWorkerPool if (!to || !from || !length) return 0; - return mysql_real_escape_string(_connections[IDX_SYNCH][0]->GetHandle(), to, from, length); + return mysql_real_escape_string( + _connections[IDX_SYNCH].front()->GetHandle(), to, from, length); } void Enqueue(SQLOperation* op) @@ -537,18 +523,18 @@ class DatabaseWorkerPool T* GetFreeConnection() { uint8 i = 0; - size_t num_cons = _connectionCount[IDX_SYNCH]; - T* t = NULL; + auto const num_cons = _connections[IDX_SYNCH].size(); + T* connection = nullptr; //! Block forever until a connection is free for (;;) { - t = _connections[IDX_SYNCH][++i % num_cons]; + connection = _connections[IDX_SYNCH][++i % num_cons].get(); //! Must be matched with t->Unlock() or you will get deadlocks - if (t->LockIfReady()) + if (connection->LockIfReady()) break; } - return t; + return connection; } char const* GetDatabaseName() const @@ -558,9 +544,7 @@ class DatabaseWorkerPool //! Queue shared by async worker threads. std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue; - std::vector<std::vector<T*>> _connections; - //! Counter of MySQL connections; - uint32 _connectionCount[IDX_SIZE]; + std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections; std::unique_ptr<MySQLConnectionInfo> _connectionInfo; uint8 _async_threads, _synch_threads; }; |