diff options
author | Naios <naios-dev@live.de> | 2016-03-03 01:31:51 +0100 |
---|---|---|
committer | Naios <naios-dev@live.de> | 2016-03-03 01:51:49 +0100 |
commit | 31c8f9a7ed9acee9a02be690bfb70a820f9c53e8 (patch) | |
tree | 0876b8533f042bec336d360370e0e3a5edbed21d | |
parent | 6a4a65521dbfbf3b270f385554b89c70728d1214 (diff) |
Core/Database: Move DatabaseWorkerPool into it's own translation unit
(cherry picked from commit 09fa0ab46a487ea85d475ac0b3495724c170fe1a)
-rw-r--r-- | src/server/database/Database/DatabaseWorkerPool.cpp | 323 | ||||
-rw-r--r-- | src/server/database/Database/DatabaseWorkerPool.h | 277 |
2 files changed, 339 insertions, 261 deletions
diff --git a/src/server/database/Database/DatabaseWorkerPool.cpp b/src/server/database/Database/DatabaseWorkerPool.cpp new file mode 100644 index 00000000000..6c352bb11b8 --- /dev/null +++ b/src/server/database/Database/DatabaseWorkerPool.cpp @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "DatabaseWorkerPool.h" +#include "DatabaseEnv.h" + +#define MIN_MYSQL_SERVER_VERSION 50100u +#define MIN_MYSQL_CLIENT_VERSION 50100u + +template <class T> +DatabaseWorkerPool<T>::DatabaseWorkerPool() + : _queue(new ProducerConsumerQueue<SQLOperation*>()), + _async_threads(0), _synch_threads(0) +{ + WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); + WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below 5.1"); + WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s) does not match the version used to compile TrinityCore (%s).", + mysql_get_client_info(), MYSQL_SERVER_VERSION); +} + +template <class T> +void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString, + uint8 const asyncThreads, uint8 const synchThreads) +{ + _connectionInfo = Trinity::make_unique<MySQLConnectionInfo>(infoString); + + _async_threads = asyncThreads; + _synch_threads = synchThreads; +} + +template <class T> +uint32 DatabaseWorkerPool<T>::Open() +{ + WPFatal(_connectionInfo.get(), "Connection info was not set!"); + + TC_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. " + "Asynchronous connections: %u, synchronous connections: %u.", + GetDatabaseName(), _async_threads, _synch_threads); + + uint32 error = OpenConnections(IDX_ASYNC, _async_threads); + + if (error) + return error; + + error = OpenConnections(IDX_SYNCH, _synch_threads); + + if (!error) + { + TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. " SZFMTD + " total connections running.", GetDatabaseName(), + (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size())); + } + + return error; +} + +template <class T> +void DatabaseWorkerPool<T>::Close() +{ + TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); + + //! Closes the actualy MySQL connection. + _connections[IDX_ASYNC].clear(); + + TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. " + "Proceeding with synchronous connections.", + GetDatabaseName()); + + //! Shut down the synchronous connections + //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close + //! should only be called after any other thread tasks in the core have exited, + //! meaning there can be no concurrent access at this point. + _connections[IDX_SYNCH].clear(); + + TC_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName()); +} + +template <class T> +bool DatabaseWorkerPool<T>::PrepareStatements() +{ + for (auto& connections : _connections) + for (auto& connection : connections) + { + connection->LockIfReady(); + if (!connection->PrepareStatements()) + { + connection->Unlock(); + Close(); + return false; + } + else + connection->Unlock(); + } + + return true; +} + +template <class T> +QueryResult DatabaseWorkerPool<T>::Query(const char* sql, T* connection /*= nullptr*/) +{ + if (!connection) + connection = GetFreeConnection(); + + ResultSet* result = connection->Query(sql); + connection->Unlock(); + if (!result || !result->GetRowCount() || !result->NextRow()) + { + delete result; + return QueryResult(NULL); + } + + return QueryResult(result); +} + +template <class T> +PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement* stmt) +{ + auto connection = GetFreeConnection(); + PreparedResultSet* ret = connection->Query(stmt); + connection->Unlock(); + + //! Delete proxy-class. Not needed anymore + delete stmt; + + if (!ret || !ret->GetRowCount()) + { + delete ret; + return PreparedQueryResult(NULL); + } + + return PreparedQueryResult(ret); +} + +template <class T> +QueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(const char* sql) +{ + BasicStatementTask* task = new BasicStatementTask(sql, true); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + QueryResultFuture result = task->GetFuture(); + Enqueue(task); + return result; +} + +template <class T> +PreparedQueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement* stmt) +{ + PreparedStatementTask* task = new PreparedStatementTask(stmt, true); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + PreparedQueryResultFuture result = task->GetFuture(); + Enqueue(task); + return result; +} + +template <class T> +QueryResultHolderFuture DatabaseWorkerPool<T>::DelayQueryHolder(SQLQueryHolder* holder) +{ + SQLQueryHolderTask* task = new SQLQueryHolderTask(holder); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + QueryResultHolderFuture result = task->GetFuture(); + Enqueue(task); + return result; +} + +template <class T> +void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction transaction) +{ +#ifdef TRINITY_DEBUG + //! Only analyze transaction weaknesses in Debug mode. + //! Ideally we catch the faults in Debug mode and then correct them, + //! so there's no need to waste these CPU cycles in Release mode. + switch (transaction->GetSize()) + { + case 0: + TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing."); + return; + case 1: + TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code."); + break; + default: + break; + } +#endif // TRINITY_DEBUG + + Enqueue(new TransactionTask(transaction)); +} + +template <class T> +void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction& transaction) +{ + T* connection = GetFreeConnection(); + int errorCode = connection->ExecuteTransaction(transaction); + if (!errorCode) + { + connection->Unlock(); // OK, operation succesful + return; + } + + //! Handle MySQL Errno 1213 without extending deadlock to the core itself + /// @todo More elegant way + if (errorCode == ER_LOCK_DEADLOCK) + { + uint8 loopBreaker = 5; + for (uint8 i = 0; i < loopBreaker; ++i) + { + if (!connection->ExecuteTransaction(transaction)) + break; + } + } + + //! Clean up now. + transaction->Cleanup(); + + connection->Unlock(); +} + +template <class T> +void DatabaseWorkerPool<T>::EscapeString(std::string& str) +{ + if (str.empty()) + return; + + char* buf = new char[str.size() * 2 + 1]; + EscapeString(buf, str.c_str(), str.size()); + str = buf; + delete[] buf; +} + +template <class T> +void DatabaseWorkerPool<T>::KeepAlive() +{ + //! Ping synchronous connections + for (auto& connection : _connections[IDX_SYNCH]) + { + if (connection->LockIfReady()) + { + connection->Ping(); + connection->Unlock(); + } + } + + //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request + //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter + //! as the sole purpose is to prevent connections from idling. + auto const count = _connections[IDX_ASYNC].size(); + for (uint8 i = 0; i < count; ++i) + Enqueue(new PingOperation); +} + +template <class T> +uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections) +{ + for (uint8 i = 0; i < numConnections; ++i) + { + // Create the connection + auto connection = [&] { + switch (type) + { + case IDX_ASYNC: + return Trinity::make_unique<T>(_queue.get(), *_connectionInfo); + case IDX_SYNCH: + return Trinity::make_unique<T>(*_connectionInfo); + default: + ABORT(); + } + }(); + + if (uint32 error = connection->Open()) + { + // Failed to open a connection or invalid version, abort and cleanup + _connections[type].clear(); + return error; + } + else if (mysql_get_server_version(connection->GetHandle()) < MIN_MYSQL_SERVER_VERSION) + { + TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1"); + return 1; + } + else + { + _connections[type].push_back(std::move(connection)); + } + } + + // Everything is fine + return 0; +} + +template <class T> +T* DatabaseWorkerPool<T>::GetFreeConnection() +{ + uint8 i = 0; + auto const num_cons = _connections[IDX_SYNCH].size(); + T* connection = nullptr; + //! Block forever until a connection is free + for (;;) + { + connection = _connections[IDX_SYNCH][++i % num_cons].get(); + //! Must be matched with t->Unlock() or you will get deadlocks + if (connection->LockIfReady()) + break; + } + + return connection; +} + +template class DatabaseWorkerPool<LoginDatabaseConnection>; +template class DatabaseWorkerPool<WorldDatabaseConnection>; +template class DatabaseWorkerPool<CharacterDatabaseConnection>; +template class DatabaseWorkerPool<HotfixDatabaseConnection>; diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h index c2b73437455..d883366237f 100644 --- a/src/server/database/Database/DatabaseWorkerPool.h +++ b/src/server/database/Database/DatabaseWorkerPool.h @@ -34,9 +34,6 @@ #include <memory> #include <array> -#define MIN_MYSQL_SERVER_VERSION 50100u -#define MIN_MYSQL_CLIENT_VERSION 50100u - class PingOperation : public SQLOperation { //! Operation for idle delaythreads @@ -60,89 +57,21 @@ class DatabaseWorkerPool public: /* Activity state */ - DatabaseWorkerPool() : _queue(new ProducerConsumerQueue<SQLOperation*>()), - _async_threads(0), _synch_threads(0) - { - WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); - WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below 5.1"); - WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s) does not match the version used to compile TrinityCore (%s).", - mysql_get_client_info(), MYSQL_SERVER_VERSION); - } + DatabaseWorkerPool(); ~DatabaseWorkerPool() { _queue->Cancel(); } - void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads) - { - _connectionInfo = Trinity::make_unique<MySQLConnectionInfo>(infoString); - - _async_threads = asyncThreads; - _synch_threads = synchThreads; - } - - uint32 Open() - { - WPFatal(_connectionInfo.get(), "Connection info was not set!"); - - TC_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.", - GetDatabaseName(), _async_threads, _synch_threads); - - uint32 error = OpenConnections(IDX_ASYNC, _async_threads); - - if (error) - return error; - - error = OpenConnections(IDX_SYNCH, _synch_threads); + void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads); - if (!error) - { - TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.", - GetDatabaseName(), (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size())); - } - - return error; - } - - void Close() - { - TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); + uint32 Open(); - //! Closes the actualy MySQL connection. - _connections[IDX_ASYNC].clear(); - - TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.", - GetDatabaseName()); - - //! Shut down the synchronous connections - //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close - //! should only be called after any other thread tasks in the core have exited, - //! meaning there can be no concurrent access at this point. - _connections[IDX_SYNCH].clear(); - - TC_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName()); - } + void Close(); //! Prepares all prepared statements - bool PrepareStatements() - { - for (auto& connections : _connections) - for (auto& connection : connections) - { - connection->LockIfReady(); - if (!connection->PrepareStatements()) - { - connection->Unlock(); - Close(); - return false; - } - else - connection->Unlock(); - } - - return true; - } + bool PrepareStatements(); inline MySQLConnectionInfo const* GetConnectionInfo() const { @@ -228,21 +157,7 @@ class DatabaseWorkerPool //! Directly executes an SQL query in string format that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. - QueryResult Query(const char* sql, T* connection = nullptr) - { - if (!connection) - connection = GetFreeConnection(); - - ResultSet* result = connection->Query(sql); - connection->Unlock(); - if (!result || !result->GetRowCount() || !result->NextRow()) - { - delete result; - return QueryResult(NULL); - } - - return QueryResult(result); - } + QueryResult Query(const char* sql, T* connection = nullptr); //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. @@ -269,23 +184,7 @@ class DatabaseWorkerPool //! Directly executes an SQL query in prepared format that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. //! Statement must be prepared with CONNECTION_SYNCH flag. - PreparedQueryResult Query(PreparedStatement* stmt) - { - auto connection = GetFreeConnection(); - PreparedResultSet* ret = connection->Query(stmt); - connection->Unlock(); - - //! Delete proxy-class. Not needed anymore - delete stmt; - - if (!ret || !ret->GetRowCount()) - { - delete ret; - return PreparedQueryResult(NULL); - } - - return PreparedQueryResult(ret); - } + PreparedQueryResult Query(PreparedStatement* stmt); /** Asynchronous query (with resultset) methods. @@ -293,14 +192,7 @@ class DatabaseWorkerPool //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. - QueryResultFuture AsyncQuery(const char* sql) - { - BasicStatementTask* task = new BasicStatementTask(sql, true); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - QueryResultFuture result = task->GetFuture(); - Enqueue(task); - return result; - } + QueryResultFuture AsyncQuery(const char* sql); //! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. @@ -313,27 +205,13 @@ class DatabaseWorkerPool //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. //! Statement must be prepared with CONNECTION_ASYNC flag. - PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt) - { - PreparedStatementTask* task = new PreparedStatementTask(stmt, true); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - PreparedQueryResultFuture result = task->GetFuture(); - Enqueue(task); - return result; - } + PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt); //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture //! return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. //! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag. - QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) - { - SQLQueryHolderTask* task = new SQLQueryHolderTask(holder); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - QueryResultHolderFuture result = task->GetFuture(); - Enqueue(task); - return result; - } + QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder); /** Transaction context methods. @@ -347,57 +225,11 @@ class DatabaseWorkerPool //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations //! were appended to the transaction will be respected during execution. - void CommitTransaction(SQLTransaction transaction) - { - #ifdef TRINITY_DEBUG - //! Only analyze transaction weaknesses in Debug mode. - //! Ideally we catch the faults in Debug mode and then correct them, - //! so there's no need to waste these CPU cycles in Release mode. - switch (transaction->GetSize()) - { - case 0: - TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing."); - return; - case 1: - TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code."); - break; - default: - break; - } - #endif // TRINITY_DEBUG - - Enqueue(new TransactionTask(transaction)); - } + void CommitTransaction(SQLTransaction transaction); //! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations //! were appended to the transaction will be respected during execution. - void DirectCommitTransaction(SQLTransaction& transaction) - { - T* connection = GetFreeConnection(); - int errorCode = connection->ExecuteTransaction(transaction); - if (!errorCode) - { - connection->Unlock(); // OK, operation succesful - return; - } - - //! Handle MySQL Errno 1213 without extending deadlock to the core itself - /// @todo More elegant way - if (errorCode == ER_LOCK_DEADLOCK) - { - uint8 loopBreaker = 5; - for (uint8 i = 0; i < loopBreaker; ++i) - { - if (!connection->ExecuteTransaction(transaction)) - break; - } - } - - //! Clean up now. - transaction->Cleanup(); - - connection->Unlock(); - } + void DirectCommitTransaction(SQLTransaction& transaction); //! Method used to execute prepared statements in a diverse context. //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone. @@ -434,75 +266,13 @@ class DatabaseWorkerPool } //! Apply escape string'ing for current collation. (utf8) - void EscapeString(std::string& str) - { - if (str.empty()) - return; - - char* buf = new char[str.size() * 2 + 1]; - EscapeString(buf, str.c_str(), uint32(str.length())); - str = buf; - delete[] buf; - } + void EscapeString(std::string& str); //! Keeps all our MySQL connections alive, prevent the server from disconnecting us. - void KeepAlive() - { - //! Ping synchronous connections - for (auto& connection : _connections[IDX_SYNCH]) - { - if (connection->LockIfReady()) - { - connection->Ping(); - connection->Unlock(); - } - } - - //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request - //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter - //! as the sole purpose is to prevent connections from idling. - for (auto& connection : _connections[IDX_ASYNC]) - Enqueue(new PingOperation); - } + void KeepAlive(); private: - uint32 OpenConnections(InternalIndex type, uint8 numConnections) - { - for (uint8 i = 0; i < numConnections; ++i) - { - // Create the connection - auto connection = [&] { - switch (type) - { - case IDX_ASYNC: - return Trinity::make_unique<T>(_queue.get(), *_connectionInfo); - case IDX_SYNCH: - return Trinity::make_unique<T>(*_connectionInfo); - default: - ABORT(); - } - }(); - - if (uint32 error = connection->Open()) - { - // Failed to open a connection or invalid version, abort and cleanup - _connections[type].clear(); - return error; - } - else if (mysql_get_server_version(connection->GetHandle()) < MIN_MYSQL_SERVER_VERSION) - { - TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1"); - return 1; - } - else - { - _connections[type].push_back(std::move(connection)); - } - } - - // Everything is fine - return 0; - } + uint32 OpenConnections(InternalIndex type, uint8 numConnections); unsigned long EscapeString(char *to, const char *from, unsigned long length) { @@ -520,22 +290,7 @@ class DatabaseWorkerPool //! Gets a free connection in the synchronous connection pool. //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks. - T* GetFreeConnection() - { - uint8 i = 0; - auto const num_cons = _connections[IDX_SYNCH].size(); - T* connection = nullptr; - //! Block forever until a connection is free - for (;;) - { - connection = _connections[IDX_SYNCH][++i % num_cons].get(); - //! Must be matched with t->Unlock() or you will get deadlocks - if (connection->LockIfReady()) - break; - } - - return connection; - } + T* GetFreeConnection(); char const* GetDatabaseName() const { |