/*
 * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2 of the License, or (at your
 * option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "DatabaseWorkerPool.h"
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
#include "IoContext.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Implementation/CharacterDatabase.h"
#include "Implementation/HotfixDatabase.h"
#include "Log.h"
#include "MySQLPreparedStatement.h"
#include "PreparedStatement.h"
#include "ProducerConsumerQueue.h"
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "Transaction.h"
#include "MySQLWorkaround.h"
// NOTE(review): the system header names below were lost in the damaged copy of this
// file and are restored from what the code uses (boost::asio::use_future,
// ER_LOCK_DEADLOCK, std::exchange, boost::stacktrace) - confirm against upstream.
#include <boost/asio/use_future.hpp>
#include <mysqld_error.h>
#include <utility>
#ifdef TRINITY_DEBUG
#include <boost/stacktrace.hpp>
#endif

//! Converts a dotted version string into a single numeric id at compile time.
//! Up to three dot-separated numeric components are accepted and combined as
//! first * 10000 + second * 100 + third (e.g. "8.0.34" -> 80034).
//! A third '.' or any character other than a digit or '.' reaches a throw
//! expression, which makes the constant evaluation (and so the build) fail.
static consteval uint32 ParseVersionString(std::string_view chars)
{
    uint32 result = 0;
    uint32 partialResult = 0;
    uint32 multiplier = 10000;
    for (char const c : chars)
    {
        if (c == '.')
        {
            // multiplier goes 10000 -> 100 -> 1, so a value below 100 here means this is the third '.'
            if (multiplier < 100)
                throw "Too many . characters in version string";

            result += partialResult * multiplier;
            multiplier /= 100;
            partialResult = 0;
        }
        else if (c >= '0' && c <= '9')
        {
            partialResult *= 10;
            partialResult += c - '0';
        }
        else
            throw "Invalid input character";
    }

    // the last component has no trailing '.', add it here
    result += partialResult * multiplier;
    return result;
}

namespace
{
#ifdef TRINITY_DEBUG
//! Per-thread, per-database-type switch read by GetFreeConnection() to decide
//! whether a warning with a stack trace is logged for synchronous queries.
template<typename Database>
thread_local bool WarnSyncQueries = false;
#endif
}

//! Scope guard that keeps DatabaseWorkerPool::_queueSize in step with the number
//! of live trackers: the counter is incremented when a tracker is constructed or
//! copied and decremented when a tracker that still refers to a pool is destroyed.
//! A moved-from tracker refers to no pool and no longer touches the counter.
template<typename T>
struct DatabaseWorkerPool<T>::QueueSizeTracker
{
    explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool)
    {
        ++_pool->_queueSize;
    }

    QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool)
    {
        ++_pool->_queueSize;
    }

    QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr))
    {
    }

    QueueSizeTracker& operator=(QueueSizeTracker const& other)
    {
        if (this != &other)
        {
            // only adjust the counters when the tracked pool actually changes
            if (_pool != other._pool)
            {
                if (_pool)
                    --_pool->_queueSize;
                if (other._pool)
                    ++other._pool->_queueSize;
            }
            _pool = other._pool;
        }
        return *this;
    }

    QueueSizeTracker& operator=(QueueSizeTracker&& other) noexcept
    {
        if (this != &other)
        {
            // release our own count; the count held by 'other' is taken over unchanged
            if (_pool != other._pool)
            {
                if (_pool)
                    --_pool->_queueSize;
            }
            _pool = std::exchange(other._pool, nullptr);
        }
        return *this;
    }

    ~QueueSizeTracker()
    {
        if (_pool)
            --_pool->_queueSize;
    }

private:
    DatabaseWorkerPool* _pool;
};

//! Creates an empty pool with zero configured connections.
//! On Windows builds it additionally aborts (WPFatal) when the client library
//! loaded at runtime reports a different version id than the one compiled against.
template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
    : _async_threads(0), _synch_threads(0)
{
    // We only need check compiled version match on Windows
    // because on other platforms ABI compatibility is ensured by SOVERSION
    // and Windows MySQL releases don't even have abi-version-like component in their dll file name
#if TRINITY_PLATFORM == TRINITY_PLATFORM_WINDOWS
#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
#define TRINITY_COMPILED_CLIENT_VERSION MARIADB_PACKAGE_VERSION_ID
#else
#define TRINITY_COMPILED_CLIENT_VERSION MYSQL_VERSION_ID
#endif
    WPFatal(mysql_get_client_version() == TRINITY_COMPILED_CLIENT_VERSION, "Used " TRINITY_MYSQL_FLAVOR " library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.",
        mysql_get_client_info(), mysql_get_client_version(), TRINITY_COMPILED_CLIENT_VERSION);
#undef TRINITY_COMPILED_CLIENT_VERSION
#endif
}

template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
}

//! Stores the connection string and the number of asynchronous / synchronous
//! connections that Open() will create. Does not open anything by itself.
template <class T>
void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
    uint8 const asyncThreads, uint8 const synchThreads)
{
    // NOTE(review): element type name restored (template argument lost in the damaged copy) - confirm
    _connectionInfo = std::make_unique<MySQLConnectionInfo>(infoString);

    _async_threads = asyncThreads;
    _synch_threads = synchThreads;
}

//! Opens the configured asynchronous and synchronous connections and starts one
//! worker thread per asynchronous connection.
//! @return 0 on success, otherwise the error returned by OpenConnections().
//! Requires SetConnectionInfo() to have been called (enforced with WPFatal).
template <class T>
uint32 DatabaseWorkerPool<T>::Open()
{
    WPFatal(_connectionInfo.get(), "Connection info was not set!");

    TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
        "Asynchronous connections: {}, synchronous connections: {}.",
        GetDatabaseName(), _async_threads, _synch_threads);

    // NOTE(review): element type name restored (template argument lost in the damaged copy) - confirm
    _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);

    uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
    if (error)
        return error;

    error = OpenConnections(IDX_SYNCH, _synch_threads);
    if (error)
        return error;

    // worker threads are only started once every connection has been opened
    for (std::unique_ptr<T> const& connection : _connections[IDX_ASYNC])
        connection->StartWorkerThread(_ioContext.get());

    TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
        "{} total connections running.", GetDatabaseName(),
        (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));

    return 0;
}

//! Stops the io context, then destroys the asynchronous connections followed by
//! the synchronous ones.
template <class T>
void DatabaseWorkerPool<T>::Close()
{
    TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());

    if (_ioContext)
        _ioContext->stop();

    //! Closes the actual MySQL connections.
    _connections[IDX_ASYNC].clear();

    _ioContext.reset();

    TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
        "Proceeding with synchronous connections.",
        GetDatabaseName());

    //! Shut down the synchronous connections
    //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
    //! should only be called after any other thread tasks in the core have exited,
    //! meaning there can be no concurrent access at this point.
    _connections[IDX_SYNCH].clear();

    TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}

//! Prepares the statements of every connection in the pool and records, per
//! statement index, how many parameters the statement takes.
//! @return false (after closing the whole pool) as soon as one connection fails
//! to prepare its statements, true otherwise.
template <class T>
bool DatabaseWorkerPool<T>::PrepareStatements()
{
    for (auto& connections : _connections)
    {
        for (auto& connection : connections)
        {
            connection->LockIfReady();
            bool const prepared = connection->PrepareStatements();
            connection->Unlock();

            if (!prepared)
            {
                Close();
                return false;
            }

            size_t const preparedSize = connection->m_stmts.size();
            if (_preparedStatementSize.size() < preparedSize)
                _preparedStatementSize.resize(preparedSize);

            for (size_t i = 0; i < preparedSize; ++i)
            {
                // already set by another connection
                // (each connection only has prepared statements of its own type sync/async)
                if (_preparedStatementSize[i] > 0)
                    continue;

                if (MySQLPreparedStatement* stmt = connection->m_stmts[i].get())
                {
                    uint32 const paramCount = stmt->GetParameterCount();

                    // TC only supports uint8 indices.
                    ASSERT(paramCount < std::numeric_limits<uint8>::max());

                    _preparedStatementSize[i] = static_cast<uint8>(paramCount);
                }
            }
        }
    }

    return true;
}

//! Runs an ad-hoc query synchronously.
//! @param sql        statement text
//! @param connection connection to use; when null a free synchronous connection is acquired.
//! The connection is unlocked before returning in both cases, so a caller-supplied
//! connection must be locked by the caller.
template <class T>
QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= nullptr*/)
{
    if (!connection)
        connection = GetFreeConnection();

    QueryResult result = BasicStatementTask::Query(connection, sql);
    connection->Unlock();

    return result;
}

//! Runs a prepared statement synchronously on a free synchronous connection.
//! Takes ownership of stmt and deletes it.
template <class T>
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt)
{
    T* connection = GetFreeConnection();
    PreparedQueryResult ret = PreparedStatementTask::Query(connection, stmt);
    connection->Unlock();

    //! Delete proxy-class. Not needed anymore
    delete stmt;

    return ret;
}

//! Posts an ad-hoc query to the io context and returns a callback wrapping the
//! future result. The statement text is copied into the task.
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql)
{
    std::future<QueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
    {
        T* conn = GetAsyncConnectionForCurrentThread();
        return BasicStatementTask::Query(conn, sql.c_str());
    }));
    return QueryCallback(std::move(result));
}

//! Posts a prepared statement to the io context and returns a callback wrapping
//! the future result. Takes ownership of stmt (held by the posted task).
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt)
{
    std::future<PreparedQueryResult> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
    {
        T* conn = GetAsyncConnectionForCurrentThread();
        return PreparedStatementTask::Query(conn, stmt.get());
    }));
    return QueryCallback(std::move(result));
}

//! Posts execution of a query holder to the io context.
//! The posted task keeps its own shared reference to the holder; the returned
//! callback receives the caller's reference together with the completion future.
template <class T>
SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder)
{
    std::future<void> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)]
    {
        T* conn = GetAsyncConnectionForCurrentThread();
        SQLQueryHolderTask::Execute(conn, holder.get());
    }));
    return { std::move(holder), std::move(result) };
}

//! Creates a new, empty transaction object.
template <class T>
SQLTransaction<T> DatabaseWorkerPool<T>::BeginTransaction()
{
    return std::make_shared<Transaction<T>>();
}

//! Posts a transaction to the io context without reporting its outcome.
//! In TRINITY_DEBUG builds an empty transaction is logged and not posted at all.
template <class T>
void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
    //! Only analyze transaction weaknesses in Debug mode.
    //! Ideally we catch the faults in Debug mode and then correct them,
    //! so there's no need to waste these CPU cycles in Release mode.
    switch (transaction->GetSize())
    {
        case 0:
            TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
            return;
        case 1:
            TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
            break;
        default:
            break;
    }
#endif // TRINITY_DEBUG

    boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)]
    {
        T* conn = GetAsyncConnectionForCurrentThread();
        TransactionTask::Execute(conn, transaction);
    });
}

//! Posts a transaction to the io context and returns a callback wrapping the
//! future outcome of TransactionTask::Execute.
//! Unlike CommitTransaction(), an empty transaction is still posted in
//! TRINITY_DEBUG builds because a callback has to be returned.
template <class T>
TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
    //! Only analyze transaction weaknesses in Debug mode.
    //! Ideally we catch the faults in Debug mode and then correct them,
    //! so there's no need to waste these CPU cycles in Release mode.
    switch (transaction->GetSize())
    {
        case 0:
            TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
            break;
        case 1:
            TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
            break;
        default:
            break;
    }
#endif // TRINITY_DEBUG

    std::future<bool> result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)]
    {
        T* conn = GetAsyncConnectionForCurrentThread();
        return TransactionTask::Execute(conn, transaction);
    }));

    return TransactionCallback(std::move(result));
}

//! Executes a transaction synchronously on a free synchronous connection.
//! When the first attempt fails with ER_LOCK_DEADLOCK the transaction is retried
//! up to five more times; after any failure the transaction is cleaned up.
template <class T>
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction)
{
    T* connection = GetFreeConnection();
    int errorCode = connection->ExecuteTransaction(transaction);
    if (!errorCode)
    {
        connection->Unlock(); // OK, operation successful
        return;
    }

    //! Handle MySQL Errno 1213 without extending deadlock to the core itself
    /// @todo More elegant way
    if (errorCode == ER_LOCK_DEADLOCK)
    {
        //todo: handle multiple sync threads deadlocking in a similar way as async threads
        constexpr uint8 loopBreaker = 5;
        for (uint8 i = 0; i < loopBreaker; ++i)
        {
            if (!connection->ExecuteTransaction(transaction))
                break;
        }
    }

    //! Clean up now.
    transaction->Cleanup();

    connection->Unlock();
}

//! Allocates a prepared statement proxy for the given index, sized with the
//! parameter count recorded by PrepareStatements(). The caller owns the result
//! until it is handed to one of the Query/Execute/Append functions.
template <class T>
PreparedStatement<T>* DatabaseWorkerPool<T>::GetPreparedStatement(PreparedStatementIndex index)
{
    return new PreparedStatement<T>(index, _preparedStatementSize[index]);
}

//! Escapes str in place using the first synchronous connection.
//! An empty string is left untouched.
template <class T>
void DatabaseWorkerPool<T>::EscapeString(std::string& str)
{
    if (str.empty())
        return;

    // Worst case every input byte becomes two output bytes, plus the terminator.
    // The buffer is zero-filled so it is always NUL terminated, even if the
    // escape call writes nothing, and it is released automatically.
    std::string escaped(str.size() * 2 + 1, '\0');
    EscapeString(escaped.data(), str.c_str(), static_cast<unsigned long>(str.size()));
    str = escaped.c_str();
}

//! Pings every synchronous connection that is currently free and posts one ping
//! task per asynchronous connection to the io context.
template <class T>
void DatabaseWorkerPool<T>::KeepAlive()
{
    //! Ping synchronous connections
    for (auto& connection : _connections[IDX_SYNCH])
    {
        if (connection->LockIfReady())
        {
            connection->Ping();
            connection->Unlock();
        }
    }

    //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
    //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
    //! as the sole purpose is to prevent connections from idling.
    std::size_t const count = _connections[IDX_ASYNC].size();
    for (std::size_t i = 0; i < count; ++i)
    {
        boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)]
        {
            T* conn = GetAsyncConnectionForCurrentThread();
            conn->Ping();
        });
    }
}

#ifdef TRINITY_DEBUG
//! Enables or disables, for the calling thread, the warning logged by
//! GetFreeConnection() whenever a synchronous connection is requested.
template <class T>
void DatabaseWorkerPool<T>::WarnAboutSyncQueries([[maybe_unused]] bool warn)
{
    WarnSyncQueries<T> = warn;
}
#endif

//! Opens numConnections connections of the given kind and stores them in the pool.
//! @return 0 on success, the connection's own error code when opening fails, or
//! 1 when the server version is below TRINITY_REQUIRED_MYSQL_VERSION.
//! On either failure the connections of this kind opened so far are discarded.
template <class T>
uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
{
    // evaluated once at compile time instead of at every use
    constexpr uint32 requiredVersion = ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION);

    for (uint8 i = 0; i < numConnections; ++i)
    {
        // Create the connection
        constexpr std::array<ConnectionFlags, IDX_SIZE> flags = { { CONNECTION_ASYNC, CONNECTION_SYNCH } };

        std::unique_ptr<T> connection = std::make_unique<T>(*_connectionInfo, flags[type]);

        if (uint32 error = connection->Open())
        {
            // Failed to open a connection or invalid version, abort and cleanup
            _connections[type].clear();
            return error;
        }

        uint32 const serverVersion = connection->GetServerVersion();
        if (serverVersion < requiredVersion)
        {
            TC_LOG_ERROR("sql.driver", "TrinityCore does not support " TRINITY_MYSQL_FLAVOR " versions below " TRINITY_REQUIRED_MYSQL_VERSION " (found id {}, need id >= {}), please update your " TRINITY_MYSQL_FLAVOR " server", serverVersion, requiredVersion);

            // same cleanup as for a failed Open() so both abort paths leave the pool in the same state
            _connections[type].clear();
            return 1;
        }

        _connections[type].push_back(std::move(connection));
    }

    // Everything is fine
    return 0;
}

//! Escapes length bytes of from into to using the first synchronous connection.
//! @return 0 without touching 'to' when any argument is null/zero, otherwise
//! whatever the connection's EscapeString returns.
//! Precondition: at least one synchronous connection is open.
template <class T>
unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, unsigned long length)
{
    if (!to || !from || !length)
        return 0;

    return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
}

//! Returns the current value of the queue size counter maintained by QueueSizeTracker.
template <class T>
size_t DatabaseWorkerPool<T>::QueueSize() const
{
    return _queueSize;
}

//! Cycles over the synchronous connections until one of them can be locked and
//! returns it locked. The caller must call Unlock() on the returned connection.
//! Never returns null; spins for as long as every connection is busy.
template <class T>
T* DatabaseWorkerPool<T>::GetFreeConnection()
{
#ifdef TRINITY_DEBUG
    if (WarnSyncQueries<T>)
    {
        TC_LOG_WARN("sql.performances", "Sync query at:\n{}",
            boost::stacktrace::to_string(boost::stacktrace::stacktrace()));
    }
#endif

    auto const& synchConnections = _connections[IDX_SYNCH];

    // without this the modulo below would divide by zero
    ASSERT(!synchConnections.empty());

    //! Block forever until a connection is free
    for (std::size_t i = 0;; ++i)
    {
        T* connection = synchConnections[i % synchConnections.size()].get();

        //! Must be matched with t->Unlock() or you will get deadlocks
        if (connection->LockIfReady())
            return connection;
    }
}

//! Returns the asynchronous connection whose worker thread id equals the calling
//! thread's id, or nullptr when the calling thread is not one of the workers.
template <class T>
T* DatabaseWorkerPool<T>::GetAsyncConnectionForCurrentThread() const
{
    std::thread::id id = std::this_thread::get_id();
    for (auto&& connection : _connections[IDX_ASYNC])
        if (connection->GetWorkerThreadId() == id)
            return connection.get();

    return nullptr;
}

//! Returns the database name from the stored connection info.
//! Requires SetConnectionInfo() to have been called.
template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{
    return _connectionInfo->database.c_str();
}

//! Posts an ad-hoc statement to the io context without reporting a result.
//! A null sql pointer is ignored. The statement text is copied into the task.
template <class T>
void DatabaseWorkerPool<T>::Execute(char const* sql)
{
    if (!sql)
        return;

    boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
    {
        T* conn = GetAsyncConnectionForCurrentThread();
        BasicStatementTask::Execute(conn, sql.c_str());
    });
}

//! Posts a prepared statement to the io context without reporting a result.
//! Takes ownership of stmt (held by the posted task).
template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt)
{
    boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
    {
        T* conn = GetAsyncConnectionForCurrentThread();
        PreparedStatementTask::Execute(conn, stmt.get());
    });
}

//! Executes an ad-hoc statement synchronously on a free synchronous connection.
//! A null sql pointer is ignored.
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(char const* sql)
{
    if (!sql)
        return;

    T* connection = GetFreeConnection();
    BasicStatementTask::Execute(connection, sql);
    connection->Unlock();
}

//! Executes a prepared statement synchronously on a free synchronous connection.
//! Takes ownership of stmt and deletes it.
template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt)
{
    T* connection = GetFreeConnection();
    PreparedStatementTask::Execute(connection, stmt);
    connection->Unlock();

    //! Delete proxy-class. Not needed anymore
    delete stmt;
}

//! Appends the statement to trans when a transaction is given, otherwise
//! executes it asynchronously on its own.
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, char const* sql)
{
    if (!trans)
        Execute(sql);
    else
        trans->Append(sql);
}

//! Appends the prepared statement to trans when a transaction is given,
//! otherwise executes it asynchronously on its own.
template <class T>
void DatabaseWorkerPool<T>::ExecuteOrAppend(SQLTransaction<T>& trans, PreparedStatement<T>* stmt)
{
    if (!trans)
        Execute(stmt);
    else
        trans->Append(stmt);
}

// Explicit instantiations, one per database implementation header included above.
// NOTE(review): connection class names restored (template arguments lost in the damaged copy) - confirm.
template class TC_DATABASE_API DatabaseWorkerPool<LoginDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<WorldDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<CharacterDatabaseConnection>;
template class TC_DATABASE_API DatabaseWorkerPool<HotfixDatabaseConnection>;