diff options
Diffstat (limited to 'src/server/database')
| -rw-r--r-- | src/server/database/Database/DatabaseLoader.cpp | 67 | ||||
| -rw-r--r-- | src/server/database/Database/DatabaseLoader.h | 13 | ||||
| -rw-r--r-- | src/server/database/Database/DatabaseWorkerPool.cpp | 323 | ||||
| -rw-r--r-- | src/server/database/Database/DatabaseWorkerPool.h | 313 | ||||
| -rw-r--r-- | src/server/database/Database/Implementation/CharacterDatabase.cpp | 2 | ||||
| -rw-r--r-- | src/server/database/Database/MySQLConnection.cpp | 87 | ||||
| -rw-r--r-- | src/server/database/Database/MySQLConnection.h | 8 | ||||
| -rw-r--r-- | src/server/database/Updater/DBUpdater.cpp | 33 | ||||
| -rw-r--r-- | src/server/database/Updater/DBUpdater.h | 4 | ||||
| -rw-r--r-- | src/server/database/Updater/UpdateFetcher.cpp | 29 | ||||
| -rw-r--r-- | src/server/database/Updater/UpdateFetcher.h | 8 |
11 files changed, 479 insertions, 408 deletions
diff --git a/src/server/database/Database/DatabaseLoader.cpp b/src/server/database/Database/DatabaseLoader.cpp index 8a4c704697c..2cbf31dbef8 100644 --- a/src/server/database/Database/DatabaseLoader.cpp +++ b/src/server/database/Database/DatabaseLoader.cpp @@ -32,7 +32,7 @@ DatabaseLoader& DatabaseLoader::AddDatabase(DatabaseWorkerPool<T>& pool, std::st { bool const updatesEnabledForThis = DBUpdater<T>::IsEnabled(_updateFlags); - _open.push(std::make_pair([this, name, updatesEnabledForThis, &pool]() -> bool + _open.push([this, name, updatesEnabledForThis, &pool]() -> bool { std::string const dbString = sConfigMgr->GetStringDefault(name + "DatabaseInfo", ""); if (dbString.empty()) @@ -71,12 +71,13 @@ DatabaseLoader& DatabaseLoader::AddDatabase(DatabaseWorkerPool<T>& pool, std::st return false; } } + // Add the close operation + _close.push([&pool] + { + pool.Close(); + }); return true; - }, - [&pool]() - { - pool.Close(); - })); + }); // Populate and update only if updates are enabled for this pool if (updatesEnabledForThis) @@ -137,38 +138,7 @@ bool DatabaseLoader::Load() bool DatabaseLoader::OpenDatabases() { - while (!_open.empty()) - { - std::pair<Predicate, std::function<void()>> const load = _open.top(); - if (load.first()) - _close.push(load.second); - else - { - // Close all loaded databases - while (!_close.empty()) - { - _close.top()(); - _close.pop(); - } - return false; - } - - _open.pop(); - } - return true; -} - -// Processes the elements of the given stack until a predicate returned false. -bool DatabaseLoader::Process(std::stack<Predicate>& stack) -{ - while (!stack.empty()) - { - if (!stack.top()()) - return false; - - stack.pop(); - } - return true; + return Process(_open); } bool DatabaseLoader::PopulateDatabases() @@ -186,6 +156,27 @@ bool DatabaseLoader::PrepareStatements() return Process(_prepare); } +bool DatabaseLoader::Process(std::queue<Predicate>& queue) +{ + while (!queue.empty()) + { + if (!queue.front()()) + { + // Close all open databases which have a registered close operation + while (!_close.empty()) + { + _close.top()(); + _close.pop(); + } + + return false; + } + + queue.pop(); + } + return true; +} + template DatabaseLoader& DatabaseLoader::AddDatabase<LoginDatabaseConnection>(DatabaseWorkerPool<LoginDatabaseConnection>& pool, std::string const& name); template diff --git a/src/server/database/Database/DatabaseLoader.h b/src/server/database/Database/DatabaseLoader.h index be3bfd6fdb7..1b31d3691c4 100644 --- a/src/server/database/Database/DatabaseLoader.h +++ b/src/server/database/Database/DatabaseLoader.h @@ -21,8 +21,9 @@ #include "DatabaseWorkerPool.h" #include "DatabaseEnv.h" -#include <stack> #include <functional> +#include <stack> +#include <queue> // A helper class to initiate all database worker pools, // handles updating, delays preparing of statements and cleans up on failure. @@ -57,16 +58,18 @@ private: bool PrepareStatements(); using Predicate = std::function<bool()>; + using Closer = std::function<void()>; - static bool Process(std::stack<Predicate>& stack); + // Invokes all functions in the given queue and closes the databases on errors. + // Returns false when there was an error. + bool Process(std::queue<Predicate>& queue); std::string const _logger; bool const _autoSetup; uint32 const _updateFlags; - std::stack<std::pair<Predicate, std::function<void()>>> _open; - std::stack<std::function<void()>> _close; - std::stack<Predicate> _populate, _update, _prepare; + std::queue<Predicate> _open, _populate, _update, _prepare; + std::stack<Closer> _close; }; #endif // DatabaseLoader_h__ diff --git a/src/server/database/Database/DatabaseWorkerPool.cpp b/src/server/database/Database/DatabaseWorkerPool.cpp new file mode 100644 index 00000000000..6c352bb11b8 --- /dev/null +++ b/src/server/database/Database/DatabaseWorkerPool.cpp @@ -0,0 +1,323 @@ +/* + * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "DatabaseWorkerPool.h" +#include "DatabaseEnv.h" + +#define MIN_MYSQL_SERVER_VERSION 50100u +#define MIN_MYSQL_CLIENT_VERSION 50100u + +template <class T> +DatabaseWorkerPool<T>::DatabaseWorkerPool() + : _queue(new ProducerConsumerQueue<SQLOperation*>()), + _async_threads(0), _synch_threads(0) +{ + WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); + WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below 5.1"); + WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s) does not match the version used to compile TrinityCore (%s).", + mysql_get_client_info(), MYSQL_SERVER_VERSION); +} + +template <class T> +void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString, + uint8 const asyncThreads, uint8 const synchThreads) +{ + _connectionInfo = Trinity::make_unique<MySQLConnectionInfo>(infoString); + + _async_threads = asyncThreads; + _synch_threads = synchThreads; +} + +template <class T> +uint32 DatabaseWorkerPool<T>::Open() +{ + WPFatal(_connectionInfo.get(), "Connection info was not set!"); + + TC_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. " + "Asynchronous connections: %u, synchronous connections: %u.", + GetDatabaseName(), _async_threads, _synch_threads); + + uint32 error = OpenConnections(IDX_ASYNC, _async_threads); + + if (error) + return error; + + error = OpenConnections(IDX_SYNCH, _synch_threads); + + if (!error) + { + TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. " SZFMTD + " total connections running.", GetDatabaseName(), + (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size())); + } + + return error; +} + +template <class T> +void DatabaseWorkerPool<T>::Close() +{ + TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); + + //! Closes the actualy MySQL connection. + _connections[IDX_ASYNC].clear(); + + TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. " + "Proceeding with synchronous connections.", + GetDatabaseName()); + + //! Shut down the synchronous connections + //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close + //! should only be called after any other thread tasks in the core have exited, + //! meaning there can be no concurrent access at this point. + _connections[IDX_SYNCH].clear(); + + TC_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName()); +} + +template <class T> +bool DatabaseWorkerPool<T>::PrepareStatements() +{ + for (auto& connections : _connections) + for (auto& connection : connections) + { + connection->LockIfReady(); + if (!connection->PrepareStatements()) + { + connection->Unlock(); + Close(); + return false; + } + else + connection->Unlock(); + } + + return true; +} + +template <class T> +QueryResult DatabaseWorkerPool<T>::Query(const char* sql, T* connection /*= nullptr*/) +{ + if (!connection) + connection = GetFreeConnection(); + + ResultSet* result = connection->Query(sql); + connection->Unlock(); + if (!result || !result->GetRowCount() || !result->NextRow()) + { + delete result; + return QueryResult(NULL); + } + + return QueryResult(result); +} + +template <class T> +PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement* stmt) +{ + auto connection = GetFreeConnection(); + PreparedResultSet* ret = connection->Query(stmt); + connection->Unlock(); + + //! Delete proxy-class. Not needed anymore + delete stmt; + + if (!ret || !ret->GetRowCount()) + { + delete ret; + return PreparedQueryResult(NULL); + } + + return PreparedQueryResult(ret); +} + +template <class T> +QueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(const char* sql) +{ + BasicStatementTask* task = new BasicStatementTask(sql, true); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + QueryResultFuture result = task->GetFuture(); + Enqueue(task); + return result; +} + +template <class T> +PreparedQueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement* stmt) +{ + PreparedStatementTask* task = new PreparedStatementTask(stmt, true); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + PreparedQueryResultFuture result = task->GetFuture(); + Enqueue(task); + return result; +} + +template <class T> +QueryResultHolderFuture DatabaseWorkerPool<T>::DelayQueryHolder(SQLQueryHolder* holder) +{ + SQLQueryHolderTask* task = new SQLQueryHolderTask(holder); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + QueryResultHolderFuture result = task->GetFuture(); + Enqueue(task); + return result; +} + +template <class T> +void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction transaction) +{ +#ifdef TRINITY_DEBUG + //! Only analyze transaction weaknesses in Debug mode. + //! Ideally we catch the faults in Debug mode and then correct them, + //! so there's no need to waste these CPU cycles in Release mode. + switch (transaction->GetSize()) + { + case 0: + TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing."); + return; + case 1: + TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code."); + break; + default: + break; + } +#endif // TRINITY_DEBUG + + Enqueue(new TransactionTask(transaction)); +} + +template <class T> +void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction& transaction) +{ + T* connection = GetFreeConnection(); + int errorCode = connection->ExecuteTransaction(transaction); + if (!errorCode) + { + connection->Unlock(); // OK, operation succesful + return; + } + + //! Handle MySQL Errno 1213 without extending deadlock to the core itself + /// @todo More elegant way + if (errorCode == ER_LOCK_DEADLOCK) + { + uint8 loopBreaker = 5; + for (uint8 i = 0; i < loopBreaker; ++i) + { + if (!connection->ExecuteTransaction(transaction)) + break; + } + } + + //! Clean up now. + transaction->Cleanup(); + + connection->Unlock(); +} + +template <class T> +void DatabaseWorkerPool<T>::EscapeString(std::string& str) +{ + if (str.empty()) + return; + + char* buf = new char[str.size() * 2 + 1]; + EscapeString(buf, str.c_str(), str.size()); + str = buf; + delete[] buf; +} + +template <class T> +void DatabaseWorkerPool<T>::KeepAlive() +{ + //! Ping synchronous connections + for (auto& connection : _connections[IDX_SYNCH]) + { + if (connection->LockIfReady()) + { + connection->Ping(); + connection->Unlock(); + } + } + + //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request + //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter + //! as the sole purpose is to prevent connections from idling. + auto const count = _connections[IDX_ASYNC].size(); + for (uint8 i = 0; i < count; ++i) + Enqueue(new PingOperation); +} + +template <class T> +uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections) +{ + for (uint8 i = 0; i < numConnections; ++i) + { + // Create the connection + auto connection = [&] { + switch (type) + { + case IDX_ASYNC: + return Trinity::make_unique<T>(_queue.get(), *_connectionInfo); + case IDX_SYNCH: + return Trinity::make_unique<T>(*_connectionInfo); + default: + ABORT(); + } + }(); + + if (uint32 error = connection->Open()) + { + // Failed to open a connection or invalid version, abort and cleanup + _connections[type].clear(); + return error; + } + else if (mysql_get_server_version(connection->GetHandle()) < MIN_MYSQL_SERVER_VERSION) + { + TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1"); + return 1; + } + else + { + _connections[type].push_back(std::move(connection)); + } + } + + // Everything is fine + return 0; +} + +template <class T> +T* DatabaseWorkerPool<T>::GetFreeConnection() +{ + uint8 i = 0; + auto const num_cons = _connections[IDX_SYNCH].size(); + T* connection = nullptr; + //! Block forever until a connection is free + for (;;) + { + connection = _connections[IDX_SYNCH][++i % num_cons].get(); + //! Must be matched with t->Unlock() or you will get deadlocks + if (connection->LockIfReady()) + break; + } + + return connection; +} + +template class DatabaseWorkerPool<LoginDatabaseConnection>; +template class DatabaseWorkerPool<WorldDatabaseConnection>; +template class DatabaseWorkerPool<CharacterDatabaseConnection>; +template class DatabaseWorkerPool<HotfixDatabaseConnection>; diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h index f2d0d101d56..d883366237f 100644 --- a/src/server/database/Database/DatabaseWorkerPool.h +++ b/src/server/database/Database/DatabaseWorkerPool.h @@ -32,9 +32,7 @@ #include <mysqld_error.h> #include <memory> - -#define MIN_MYSQL_SERVER_VERSION 50100u -#define MIN_MYSQL_CLIENT_VERSION 50100u +#include <array> class PingOperation : public SQLOperation { @@ -59,97 +57,21 @@ class DatabaseWorkerPool public: /* Activity state */ - DatabaseWorkerPool() : _queue(new ProducerConsumerQueue<SQLOperation*>()), - _async_threads(0), _synch_threads(0) - { - memset(_connectionCount, 0, sizeof(_connectionCount)); - _connections.resize(IDX_SIZE); - - WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); - WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below 5.1"); - WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s) does not match the version used to compile TrinityCore (%s).", - mysql_get_client_info(), MYSQL_SERVER_VERSION); - } + DatabaseWorkerPool(); ~DatabaseWorkerPool() { _queue->Cancel(); } - void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads) - { - _connectionInfo.reset(new MySQLConnectionInfo(infoString)); - - _async_threads = asyncThreads; - _synch_threads = synchThreads; - } - - uint32 Open() - { - WPFatal(_connectionInfo.get(), "Connection info was not set!"); - - TC_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.", - GetDatabaseName(), _async_threads, _synch_threads); - - uint32 error = OpenConnections(IDX_ASYNC, _async_threads); - - if (error) - return error; - - error = OpenConnections(IDX_SYNCH, _synch_threads); - - if (!error) - { - TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(), - (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC])); - } - - return error; - } - - void Close() - { - TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); + void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads); - for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) - { - T* t = _connections[IDX_ASYNC][i]; - t->Close(); //! Closes the actualy MySQL connection. - } + uint32 Open(); - TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.", - GetDatabaseName()); - - //! Shut down the synchronous connections - //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close - //! should only be called after any other thread tasks in the core have exited, - //! meaning there can be no concurrent access at this point. - for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) - _connections[IDX_SYNCH][i]->Close(); - - TC_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName()); - } + void Close(); //! Prepares all prepared statements - bool PrepareStatements() - { - for (uint8 i = 0; i < IDX_SIZE; ++i) - for (uint32 c = 0; c < _connectionCount[i]; ++c) - { - T* t = _connections[i][c]; - t->LockIfReady(); - if (!t->PrepareStatements()) - { - t->Unlock(); - Close(); - return false; - } - else - t->Unlock(); - } - - return true; - } + bool PrepareStatements(); inline MySQLConnectionInfo const* GetConnectionInfo() const { @@ -201,9 +123,9 @@ class DatabaseWorkerPool if (!sql) return; - T* t = GetFreeConnection(); - t->Execute(sql); - t->Unlock(); + T* connection = GetFreeConnection(); + connection->Execute(sql); + connection->Unlock(); } //! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished. @@ -221,9 +143,9 @@ class DatabaseWorkerPool //! Statement must be prepared with the CONNECTION_SYNCH flag. void DirectExecute(PreparedStatement* stmt) { - T* t = GetFreeConnection(); - t->Execute(stmt); - t->Unlock(); + T* connection = GetFreeConnection(); + connection->Execute(stmt); + connection->Unlock(); //! Delete proxy-class. Not needed anymore delete stmt; @@ -235,21 +157,7 @@ class DatabaseWorkerPool //! Directly executes an SQL query in string format that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. - QueryResult Query(const char* sql, T* conn = nullptr) - { - if (!conn) - conn = GetFreeConnection(); - - ResultSet* result = conn->Query(sql); - conn->Unlock(); - if (!result || !result->GetRowCount() || !result->NextRow()) - { - delete result; - return QueryResult(NULL); - } - - return QueryResult(result); - } + QueryResult Query(const char* sql, T* connection = nullptr); //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. @@ -276,23 +184,7 @@ class DatabaseWorkerPool //! Directly executes an SQL query in prepared format that will block the calling thread until finished. //! Returns reference counted auto pointer, no need for manual memory management in upper level code. //! Statement must be prepared with CONNECTION_SYNCH flag. - PreparedQueryResult Query(PreparedStatement* stmt) - { - T* t = GetFreeConnection(); - PreparedResultSet* ret = t->Query(stmt); - t->Unlock(); - - //! Delete proxy-class. Not needed anymore - delete stmt; - - if (!ret || !ret->GetRowCount()) - { - delete ret; - return PreparedQueryResult(NULL); - } - - return PreparedQueryResult(ret); - } + PreparedQueryResult Query(PreparedStatement* stmt); /** Asynchronous query (with resultset) methods. @@ -300,14 +192,7 @@ class DatabaseWorkerPool //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. - QueryResultFuture AsyncQuery(const char* sql) - { - BasicStatementTask* task = new BasicStatementTask(sql, true); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - QueryResultFuture result = task->GetFuture(); - Enqueue(task); - return result; - } + QueryResultFuture AsyncQuery(const char* sql); //! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. @@ -320,27 +205,13 @@ class DatabaseWorkerPool //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. //! Statement must be prepared with CONNECTION_ASYNC flag. - PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt) - { - PreparedStatementTask* task = new PreparedStatementTask(stmt, true); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - PreparedQueryResultFuture result = task->GetFuture(); - Enqueue(task); - return result; - } + PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt); //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture //! return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. //! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag. - QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) - { - SQLQueryHolderTask* task = new SQLQueryHolderTask(holder); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - QueryResultHolderFuture result = task->GetFuture(); - Enqueue(task); - return result; - } + QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder); /** Transaction context methods. @@ -354,57 +225,11 @@ class DatabaseWorkerPool //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations //! were appended to the transaction will be respected during execution. - void CommitTransaction(SQLTransaction transaction) - { - #ifdef TRINITY_DEBUG - //! Only analyze transaction weaknesses in Debug mode. - //! Ideally we catch the faults in Debug mode and then correct them, - //! so there's no need to waste these CPU cycles in Release mode. - switch (transaction->GetSize()) - { - case 0: - TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing."); - return; - case 1: - TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code."); - break; - default: - break; - } - #endif // TRINITY_DEBUG - - Enqueue(new TransactionTask(transaction)); - } + void CommitTransaction(SQLTransaction transaction); //! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations //! were appended to the transaction will be respected during execution. - void DirectCommitTransaction(SQLTransaction& transaction) - { - T* con = GetFreeConnection(); - int errorCode = con->ExecuteTransaction(transaction); - if (!errorCode) - { - con->Unlock(); // OK, operation succesful - return; - } - - //! Handle MySQL Errno 1213 without extending deadlock to the core itself - /// @todo More elegant way - if (errorCode == ER_LOCK_DEADLOCK) - { - uint8 loopBreaker = 5; - for (uint8 i = 0; i < loopBreaker; ++i) - { - if (!con->ExecuteTransaction(transaction)) - break; - } - } - - //! Clean up now. - transaction->Cleanup(); - - con->Unlock(); - } + void DirectCommitTransaction(SQLTransaction& transaction); //! Method used to execute prepared statements in a diverse context. //! Will be wrapped in a transaction if valid object is present, otherwise executed standalone. @@ -441,90 +266,21 @@ class DatabaseWorkerPool } //! Apply escape string'ing for current collation. (utf8) - void EscapeString(std::string& str) - { - if (str.empty()) - return; - - char* buf = new char[str.size() * 2 + 1]; - EscapeString(buf, str.c_str(), uint32(str.length())); - str = buf; - delete[] buf; - } + void EscapeString(std::string& str); //! Keeps all our MySQL connections alive, prevent the server from disconnecting us. - void KeepAlive() - { - //! Ping synchronous connections - for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) - { - T* t = _connections[IDX_SYNCH][i]; - if (t->LockIfReady()) - { - t->Ping(); - t->Unlock(); - } - } - - //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request - //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter - //! as the sole purpose is to prevent connections from idling. - for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i) - Enqueue(new PingOperation); - } + void KeepAlive(); private: - uint32 OpenConnections(InternalIndex type, uint8 numConnections) - { - _connections[type].resize(numConnections); - for (uint8 i = 0; i < numConnections; ++i) - { - T* t; - - if (type == IDX_ASYNC) - t = new T(_queue.get(), *_connectionInfo); - else if (type == IDX_SYNCH) - t = new T(*_connectionInfo); - else - ABORT(); - - _connections[type][i] = t; - ++_connectionCount[type]; - - uint32 error = t->Open(); - - if (!error) - { - if (mysql_get_server_version(t->GetHandle()) < MIN_MYSQL_SERVER_VERSION) - { - TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1"); - error = 1; - } - } - - // Failed to open a connection or invalid version, abort and cleanup - if (error) - { - while (_connectionCount[type] != 0) - { - t = _connections[type][i--]; - delete t; - --_connectionCount[type]; - } - return error; - } - } - - // Everything is fine - return 0; - } + uint32 OpenConnections(InternalIndex type, uint8 numConnections); unsigned long EscapeString(char *to, const char *from, unsigned long length) { if (!to || !from || !length) return 0; - return mysql_real_escape_string(_connections[IDX_SYNCH][0]->GetHandle(), to, from, length); + return mysql_real_escape_string( + _connections[IDX_SYNCH].front()->GetHandle(), to, from, length); } void Enqueue(SQLOperation* op) @@ -534,22 +290,7 @@ class DatabaseWorkerPool //! Gets a free connection in the synchronous connection pool. //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks. - T* GetFreeConnection() - { - uint8 i = 0; - size_t num_cons = _connectionCount[IDX_SYNCH]; - T* t = NULL; - //! Block forever until a connection is free - for (;;) - { - t = _connections[IDX_SYNCH][++i % num_cons]; - //! Must be matched with t->Unlock() or you will get deadlocks - if (t->LockIfReady()) - break; - } - - return t; - } + T* GetFreeConnection(); char const* GetDatabaseName() const { @@ -558,9 +299,7 @@ class DatabaseWorkerPool //! Queue shared by async worker threads. std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue; - std::vector<std::vector<T*>> _connections; - //! Counter of MySQL connections; - uint32 _connectionCount[IDX_SIZE]; + std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections; std::unique_ptr<MySQLConnectionInfo> _connectionInfo; uint8 _async_threads, _synch_threads; }; diff --git a/src/server/database/Database/Implementation/CharacterDatabase.cpp b/src/server/database/Database/Implementation/CharacterDatabase.cpp index 2b802a634e0..babf39ec3e5 100644 --- a/src/server/database/Database/Implementation/CharacterDatabase.cpp +++ b/src/server/database/Database/Implementation/CharacterDatabase.cpp @@ -619,7 +619,7 @@ void CharacterDatabaseConnection::DoPrepareStatements() // Pet PrepareStatement(CHAR_SEL_PET_SLOTS, "SELECT owner, slot FROM character_pet WHERE owner = ? AND slot >= ? AND slot <= ? ORDER BY slot", CONNECTION_ASYNC); - PrepareStatement(CHAR_SEL_PET_SLOTS_DETAIL, "SELECT owner, id, entry, level, name FROM character_pet WHERE owner = ? AND slot >= ? AND slot <= ? ORDER BY slot", CONNECTION_ASYNC); + PrepareStatement(CHAR_SEL_PET_SLOTS_DETAIL, "SELECT owner, id, entry, level, name, modelid FROM character_pet WHERE owner = ? AND slot >= ? AND slot <= ? ORDER BY slot", CONNECTION_ASYNC); PrepareStatement(CHAR_SEL_PET_ENTRY, "SELECT entry FROM character_pet WHERE owner = ? AND id = ? AND slot >= ? AND slot <= ?", CONNECTION_ASYNC); PrepareStatement(CHAR_SEL_PET_SLOT_BY_ID, "SELECT slot, entry FROM character_pet WHERE owner = ? AND id = ?", CONNECTION_ASYNC); PrepareStatement(CHAR_SEL_PET_SPELL_LIST, "SELECT DISTINCT pet_spell.spell FROM pet_spell, character_pet WHERE character_pet.owner = ? AND character_pet.id = pet_spell.guid AND character_pet.id <> ?", CONNECTION_SYNCH); diff --git a/src/server/database/Database/MySQLConnection.cpp b/src/server/database/Database/MySQLConnection.cpp index a7afaac41c5..fed368f2aac 100644 --- a/src/server/database/Database/MySQLConnection.cpp +++ b/src/server/database/Database/MySQLConnection.cpp @@ -39,7 +39,6 @@ MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) : m_reconnecting(false), m_prepareError(false), m_queue(NULL), -m_worker(NULL), m_Mysql(NULL), m_connectionInfo(connInfo), m_connectionFlags(CONNECTION_SYNCH) { } @@ -52,24 +51,26 @@ m_Mysql(NULL), m_connectionInfo(connInfo), m_connectionFlags(CONNECTION_ASYNC) { - m_worker = new DatabaseWorker(m_queue, this); + m_worker = Trinity::make_unique<DatabaseWorker>(m_queue, this); } MySQLConnection::~MySQLConnection() { - delete m_worker; - - for (size_t i = 0; i < m_stmts.size(); ++i) - delete m_stmts[i]; - - if (m_Mysql) - mysql_close(m_Mysql); + Close(); } void MySQLConnection::Close() { - /// Only close us if we're not operating - delete this; + // Stop the worker thread before the statements are cleared + m_worker.reset(); + + m_stmts.clear(); + + if (m_Mysql) + { + mysql_close(m_Mysql); + m_Mysql = nullptr; + } } uint32 MySQLConnection::Open() @@ -414,7 +415,7 @@ int MySQLConnection::ExecuteTransaction(SQLTransaction& transaction) MySQLPreparedStatement* MySQLConnection::GetPreparedStatement(uint32 index) { ASSERT(index < m_stmts.size()); - MySQLPreparedStatement* ret = m_stmts[index]; + MySQLPreparedStatement* ret = m_stmts[index].get(); if (!ret) TC_LOG_ERROR("sql.sql", "Could not fetch prepared statement %u on database `%s`, connection type: %s.", index, m_connectionInfo.database.c_str(), (m_connectionFlags & CONNECTION_ASYNC) ? "asynchronous" : "synchronous"); @@ -426,16 +427,12 @@ void MySQLConnection::PrepareStatement(uint32 index, const char* sql, Connection { m_queries.insert(PreparedStatementMap::value_type(index, std::make_pair(sql, flags))); - // For reconnection case - if (m_reconnecting) - delete m_stmts[index]; - // Check if specified query should be prepared on this connection // i.e. don't prepare async statements on synchronous connections // to save memory that will not be used. if (!(m_connectionFlags & flags)) { - m_stmts[index] = NULL; + m_stmts[index].reset(); return; } @@ -457,8 +454,7 @@ void MySQLConnection::PrepareStatement(uint32 index, const char* sql, Connection } else { - MySQLPreparedStatement* mStmt = new MySQLPreparedStatement(stmt); - m_stmts[index] = mStmt; + m_stmts[index] = Trinity::make_unique<MySQLPreparedStatement>(stmt); } } } @@ -479,7 +475,7 @@ PreparedResultSet* MySQLConnection::Query(PreparedStatement* stmt) return new PreparedResultSet(stmt->m_stmt->GetSTMT(), result, rowCount, fieldCount); } -bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) +bool MySQLConnection::_HandleMySQLErrno(uint32 errNo, uint8 attempts /*= 5*/) { switch (errNo) { @@ -488,9 +484,21 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) case CR_INVALID_CONN_HANDLE: case CR_SERVER_LOST_EXTENDED: { + if (m_Mysql) + { + TC_LOG_ERROR("sql.sql", "Lost the connection to the MySQL server!"); + + mysql_close(GetHandle()); + m_Mysql = nullptr; + } + + /*no break*/ + } + case CR_CONN_HOST_ERROR: + { + TC_LOG_INFO("sql.sql", "Attempting to reconnect to the MySQL server..."); + m_reconnecting = true; - uint64 oldThreadId = mysql_thread_id(GetHandle()); - mysql_close(GetHandle()); uint32 const lErrno = Open(); if (!lErrno) @@ -498,24 +506,37 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) // Don't remove 'this' pointer unless you want to skip loading all prepared statements... if (!this->PrepareStatements()) { - TC_LOG_ERROR("sql.sql", "Could not re-prepare statements!"); - Close(); - return false; + TC_LOG_FATAL("sql.sql", "Could not re-prepare statements!"); + std::this_thread::sleep_for(std::chrono::seconds(10)); + std::abort(); } - TC_LOG_INFO("sql.sql", "Connection to the MySQL server is active."); - if (oldThreadId != mysql_thread_id(GetHandle())) - TC_LOG_INFO("sql.sql", "Successfully reconnected to %s @%s:%s (%s).", - m_connectionInfo.database.c_str(), m_connectionInfo.host.c_str(), m_connectionInfo.port_or_socket.c_str(), - (m_connectionFlags & CONNECTION_ASYNC) ? "asynchronous" : "synchronous"); + TC_LOG_INFO("sql.sql", "Successfully reconnected to %s @%s:%s (%s).", + m_connectionInfo.database.c_str(), m_connectionInfo.host.c_str(), m_connectionInfo.port_or_socket.c_str(), + (m_connectionFlags & CONNECTION_ASYNC) ? "asynchronous" : "synchronous"); m_reconnecting = false; return true; } - // It's possible this attempted reconnect throws 2006 at us. To prevent crazy recursive calls, sleep here. - std::this_thread::sleep_for(std::chrono::seconds(3)); // Sleep 3 seconds - return _HandleMySQLErrno(lErrno); // Call self (recursive) + if ((--attempts) == 0) + { + // Shut down the server when the mysql server isn't + // reachable for some time + TC_LOG_FATAL("sql.sql", "Failed to reconnect to the MySQL server, " + "terminating the server to prevent data corruption!"); + + // We could also initiate a shutdown through using std::raise(SIGTERM) + std::this_thread::sleep_for(std::chrono::seconds(10)); + std::abort(); + } + else + { + // It's possible this attempted reconnect throws 2006 at us. + // To prevent crazy recursive calls, sleep here. + std::this_thread::sleep_for(std::chrono::seconds(3)); // Sleep 3 seconds + return _HandleMySQLErrno(lErrno, attempts); // Call self (recursive) + } } case ER_LOCK_DEADLOCK: diff --git a/src/server/database/Database/MySQLConnection.h b/src/server/database/Database/MySQLConnection.h index a981caa607e..a0b908593df 100644 --- a/src/server/database/Database/MySQLConnection.h +++ b/src/server/database/Database/MySQLConnection.h @@ -116,18 +116,18 @@ class MySQLConnection virtual void DoPrepareStatements() = 0; protected: - std::vector<MySQLPreparedStatement*> m_stmts; //! PreparedStatements storage + std::vector<std::unique_ptr<MySQLPreparedStatement>> m_stmts; //! PreparedStatements storage PreparedStatementMap m_queries; //! Query storage bool m_reconnecting; //! Are we reconnecting? bool m_prepareError; //! Was there any error while preparing statements? private: - bool _HandleMySQLErrno(uint32 errNo); + bool _HandleMySQLErrno(uint32 errNo, uint8 attempts = 5); private: ProducerConsumerQueue<SQLOperation*>* m_queue; //! Queue shared with other asynchronous connections. - DatabaseWorker* m_worker; //! Core worker task. - MYSQL * m_Mysql; //! MySQL Handle. + std::unique_ptr<DatabaseWorker> m_worker; //! Core worker task. + MYSQL* m_Mysql; //! MySQL Handle. MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging) ConnectionFlags m_connectionFlags; //! Connection flags (for preparing relevant statements) std::mutex m_Mutex; diff --git a/src/server/database/Updater/DBUpdater.cpp b/src/server/database/Updater/DBUpdater.cpp index b36ccd37691..ab4b767b104 100644 --- a/src/server/database/Updater/DBUpdater.cpp +++ b/src/server/database/Updater/DBUpdater.cpp @@ -21,6 +21,7 @@ #include "UpdateFetcher.h" #include "DatabaseLoader.h" #include "Config.h" +#include "BuiltInConfig.h" #include <fstream> #include <iostream> @@ -35,23 +36,17 @@ using namespace boost::process; using namespace boost::process::initializers; using namespace boost::iostreams; -std::string DBUpdaterUtil::GetMySqlCli() +std::string DBUpdaterUtil::GetCorrectedMySQLExecutable() { if (!corrected_path().empty()) return corrected_path(); else - { - std::string const entry = sConfigMgr->GetStringDefault("Updates.MySqlCLIPath", ""); - if (!entry.empty()) - return entry; - else - return GitRevision::GetMySQLExecutable(); - } + return BuiltInConfig::GetMySQLExecutable(); } bool DBUpdaterUtil::CheckExecutable() { - boost::filesystem::path exe(GetMySqlCli()); + boost::filesystem::path exe(GetCorrectedMySQLExecutable()); if (!exists(exe)) { exe.clear(); @@ -85,16 +80,6 @@ std::string& DBUpdaterUtil::corrected_path() return path; } -template<class T> -std::string DBUpdater<T>::GetSourceDirectory() -{ - std::string const entry = sConfigMgr->GetStringDefault("Updates.SourcePath", ""); - if (!entry.empty()) - return entry; - else - return GitRevision::GetSourceDirectory(); -} - // Auth Database template<> std::string DBUpdater<LoginDatabaseConnection>::GetConfigEntry() @@ -111,7 +96,8 @@ std::string DBUpdater<LoginDatabaseConnection>::GetTableName() template<> std::string DBUpdater<LoginDatabaseConnection>::GetBaseFile() { - return DBUpdater<LoginDatabaseConnection>::GetSourceDirectory() + "/sql/base/auth_database.sql"; + return BuiltInConfig::GetSourceDirectory() + + "/sql/base/auth_database.sql"; } template<> @@ -169,7 +155,8 @@ std::string DBUpdater<CharacterDatabaseConnection>::GetTableName() template<> std::string DBUpdater<CharacterDatabaseConnection>::GetBaseFile() { - return DBUpdater<CharacterDatabaseConnection>::GetSourceDirectory() + "/sql/base/characters_database.sql"; + return BuiltInConfig::GetSourceDirectory() + + "/sql/base/characters_database.sql"; } template<> @@ -271,7 +258,7 @@ bool DBUpdater<T>::Update(DatabaseWorkerPool<T>& pool) TC_LOG_INFO("sql.updates", "Updating %s database...", DBUpdater<T>::GetTableName().c_str()); - Path const sourceDirectory(GetSourceDirectory()); + Path const sourceDirectory(BuiltInConfig::GetSourceDirectory()); if (!is_directory(sourceDirectory)) { @@ -442,7 +429,7 @@ void DBUpdater<T>::ApplyFile(DatabaseWorkerPool<T>& pool, std::string const& hos boost::process::pipe errPipe = create_pipe(); child c = execute(run_exe( - boost::filesystem::absolute(DBUpdaterUtil::GetMySqlCli()).generic_string()), + boost::filesystem::absolute(DBUpdaterUtil::GetCorrectedMySQLExecutable()).generic_string()), set_args(args), bind_stdin(source), throw_on_error(), bind_stdout(file_descriptor_sink(outPipe.sink, close_handle)), bind_stderr(file_descriptor_sink(errPipe.sink, close_handle))); diff --git a/src/server/database/Updater/DBUpdater.h b/src/server/database/Updater/DBUpdater.h index c9792ffe060..dbb897d2527 100644 --- a/src/server/database/Updater/DBUpdater.h +++ b/src/server/database/Updater/DBUpdater.h @@ -57,7 +57,7 @@ struct UpdateResult class DBUpdaterUtil { public: - static std::string GetMySqlCli(); + static std::string GetCorrectedMySQLExecutable(); static bool CheckExecutable(); @@ -71,8 +71,6 @@ class DBUpdater public: using Path = boost::filesystem::path; - static std::string GetSourceDirectory(); - static inline std::string GetConfigEntry(); static inline std::string GetTableName(); diff --git a/src/server/database/Updater/UpdateFetcher.cpp b/src/server/database/Updater/UpdateFetcher.cpp index 001fdf20610..2d60cdb92ef 100644 --- a/src/server/database/Updater/UpdateFetcher.cpp +++ b/src/server/database/Updater/UpdateFetcher.cpp @@ -137,22 +137,33 @@ UpdateFetcher::AppliedFileStorage UpdateFetcher::ReceiveAppliedFiles() const return map; } -UpdateFetcher::SQLUpdate UpdateFetcher::ReadSQLUpdate(boost::filesystem::path const& file) const +std::string UpdateFetcher::ReadSQLUpdate(boost::filesystem::path const& file) const { std::ifstream in(file.c_str()); - WPFatal(in.is_open(), "Could not read an update file."); + if (!in.is_open()) + { + TC_LOG_FATAL("sql.updates", "Failed to open the sql update \"%s\" for reading! " + "Stopping the server to keep the database integrity, " + "try to identify and solve the issue or disabled the database updater.", + file.generic_string().c_str()); + + throw UpdateException("Opening the sql update failed!"); + } auto update = [&in] { std::ostringstream ss; ss << in.rdbuf(); - return Trinity::make_unique<std::string>(ss.str()); + return ss.str(); }(); in.close(); return update; } -UpdateResult UpdateFetcher::Update(bool const redundancyChecks, bool const allowRehash, bool const archivedRedundancy, int32 const cleanDeadReferencesMaxCount) const +UpdateResult UpdateFetcher::Update(bool const redundancyChecks, + bool const allowRehash, + bool const archivedRedundancy, + int32 const cleanDeadReferencesMaxCount) const { LocaleFileStorage const available = GetFileList(); AppliedFileStorage applied = ReceiveAppliedFiles(); @@ -198,11 +209,9 @@ UpdateResult UpdateFetcher::Update(bool const redundancyChecks, bool const allow } } - // Read update from file - SQLUpdate const update = ReadSQLUpdate(availableQuery.first); - // Calculate hash - std::string const hash = CalculateHash(update); + std::string const hash = + CalculateHash(ReadSQLUpdate(availableQuery.first)); UpdateMode mode = MODE_APPLY; @@ -325,11 +334,11 @@ UpdateResult UpdateFetcher::Update(bool const redundancyChecks, bool const allow return UpdateResult(importedUpdates, countRecentUpdates, countArchivedUpdates); } -std::string UpdateFetcher::CalculateHash(SQLUpdate const& query) const +std::string UpdateFetcher::CalculateHash(std::string const& query) const { // Calculate a Sha1 hash based on query content. unsigned char digest[SHA_DIGEST_LENGTH]; - SHA1((unsigned char*)query->c_str(), query->length(), (unsigned char*)&digest); + SHA1((unsigned char*)query.c_str(), query.length(), (unsigned char*)&digest); return ByteArrayToHexStr(digest, SHA_DIGEST_LENGTH); } diff --git a/src/server/database/Updater/UpdateFetcher.h b/src/server/database/Updater/UpdateFetcher.h index 32f8516413d..c87efea2b02 100644 --- a/src/server/database/Updater/UpdateFetcher.h +++ b/src/server/database/Updater/UpdateFetcher.h @@ -103,16 +103,16 @@ private: typedef std::unordered_map<std::string, std::string> HashToFileNameStorage; typedef std::unordered_map<std::string, AppliedFileEntry> AppliedFileStorage; typedef std::vector<UpdateFetcher::DirectoryEntry> DirectoryStorage; - typedef std::unique_ptr<std::string> SQLUpdate; LocaleFileStorage GetFileList() const; - void FillFileListRecursively(Path const& path, LocaleFileStorage& storage, State const state, uint32 const depth) const; + void FillFileListRecursively(Path const& path, LocaleFileStorage& storage, + State const state, uint32 const depth) const; DirectoryStorage ReceiveIncludedDirectories() const; AppliedFileStorage ReceiveAppliedFiles() const; - SQLUpdate ReadSQLUpdate(Path const& file) const; - std::string CalculateHash(SQLUpdate const& query) const; + std::string ReadSQLUpdate(Path const& file) const; + std::string CalculateHash(std::string const& query) const; uint32 Apply(Path const& path) const; |
