/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "DatabaseWorkerPool.h"
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
#include "IoContext.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Implementation/CharacterDatabase.h"
#include "Implementation/HotfixDatabase.h"
#include "Log.h"
#include "MySQLPreparedStatement.h"
#include "PreparedStatement.h"
#include "ProducerConsumerQueue.h"
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "Transaction.h"
#include "MySQLWorkaround.h"
#include
#include
#include
#ifdef TRINITY_DEBUG
#include
#endif
/**
 * Converts a dotted version string such as "8.0.30" into the numeric id
 * major * 10000 + minor * 100 + patch, entirely at compile time.
 *
 * Trailing components that are not present count as zero ("8.0" -> 80000).
 * The function is consteval, so reaching any `throw` below makes the call
 * ill-formed and is reported as a compile error.
 *
 * Rejected inputs:
 *  - any character other than a decimal digit or '.'
 *  - more than three components
 *  - a minor or patch component above 99, which would otherwise spill into
 *    the next more significant field ("8.100" would encode the same id as "9.0")
 *
 * @param chars dotted version string
 * @return encoded version id
 */
static consteval uint32 ParseVersionString(std::string_view chars)
{
    uint32 result = 0;
    uint32 component = 0;
    uint32 multiplier = 10000; // weight of the component currently being read

    for (char const c : chars)
    {
        if (c == '.')
        {
            // multiplier is already 1 after the second '.', a third one has no slot left
            if (multiplier < 100)
                throw "Too many . characters in version string";

            result += component * multiplier;
            multiplier /= 100;
            component = 0;
        }
        else if (c >= '0' && c <= '9')
        {
            component = component * 10 + static_cast<uint32>(c - '0');

            // Only the leading (major) component has room to grow past two digits
            if (multiplier < 10000 && component > 99)
                throw "Version component does not fit in two decimal digits";
        }
        else
            throw "Invalid input character";
    }

    // The last component is not followed by a '.', so fold it in here
    return result + component * multiplier;
}
// File-local state, not visible outside this translation unit.
namespace
{
#ifdef TRINITY_DEBUG
// Debug-only, per-thread flag. While it is true, GetFreeConnection() logs a
// stack trace for every synchronous query issued from that thread.
// It is written by DatabaseWorkerPool::WarnAboutSyncQueries().
template
thread_local bool WarnSyncQueries = false;
#endif
}
template
struct DatabaseWorkerPool::QueueSizeTracker
{
explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool)
{
++_pool->_queueSize;
}
QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool) { ++_pool->_queueSize; }
QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr)) { }
QueueSizeTracker& operator=(QueueSizeTracker const& other)
{
if (this != &other)
{
if (_pool != other._pool)
{
if (_pool)
--_pool->_queueSize;
if (other._pool)
++other._pool->_queueSize;
}
_pool = other._pool;
}
return *this;
}
QueueSizeTracker& operator=(QueueSizeTracker&& other) noexcept
{
if (this != &other)
{
if (_pool != other._pool)
{
if (_pool)
--_pool->_queueSize;
}
_pool = std::exchange(other._pool, nullptr);
}
return *this;
}
~QueueSizeTracker()
{
if (_pool)
--_pool->_queueSize;
}
private:
DatabaseWorkerPool* _pool;
};
//! Creates a pool with zero asynchronous and zero synchronous connections configured.
//! On Windows builds it also checks, through WPFatal, that the client library loaded at
//! runtime reports the same version id as the one the core was compiled against.
template
DatabaseWorkerPool::DatabaseWorkerPool()
: _async_threads(0), _synch_threads(0)
{
// We only need check compiled version match on Windows
// because on other platforms ABI compatibility is ensured by SOVERSION
// and Windows MySQL releases don't even have abi-version-like component in their dll file name
#if TRINITY_PLATFORM == TRINITY_PLATFORM_WINDOWS
// MariaDB client libraries from package version id 30200 on expose their own version id macro;
// everything else falls back to MYSQL_VERSION_ID.
#if defined(LIBMARIADB) && MARIADB_PACKAGE_VERSION_ID >= 30200
#define TRINITY_COMPILED_CLIENT_VERSION MARIADB_PACKAGE_VERSION_ID
#else
#define TRINITY_COMPILED_CLIENT_VERSION MYSQL_VERSION_ID
#endif
WPFatal(mysql_get_client_version() == TRINITY_COMPILED_CLIENT_VERSION, "Used " TRINITY_MYSQL_FLAVOR " library version (%s id %lu) does not match the version id used to compile TrinityCore (id %u). Search on forum for TCE00011.", mysql_get_client_info(), mysql_get_client_version(), TRINITY_COMPILED_CLIENT_VERSION);
// The helper macro is only needed for the check above.
#undef TRINITY_COMPILED_CLIENT_VERSION
#endif
}
//! Empty body: no explicit teardown is performed here.
//! Connections are shut down explicitly through Close().
template
DatabaseWorkerPool::~DatabaseWorkerPool()
{
}
//! Stores the connection settings used by a later call to Open().
//! @param infoString    connection description; used to construct the stored connection info object
//! @param asyncThreads  number of asynchronous connections Open() will create
//! @param synchThreads  number of synchronous connections Open() will create
template
void DatabaseWorkerPool::SetConnectionInfo(std::string const& infoString,
uint8 const asyncThreads, uint8 const synchThreads)
{
_connectionInfo = std::make_unique(infoString);
_async_threads = asyncThreads;
_synch_threads = synchThreads;
}
//! Opens all configured connections and starts the asynchronous workers.
//! SetConnectionInfo() must have been called first (enforced by WPFatal).
//! @return 0 on success, otherwise the error code reported by OpenConnections()
template
uint32 DatabaseWorkerPool::Open()
{
WPFatal(_connectionInfo.get(), "Connection info was not set!");
TC_LOG_INFO("sql.driver", "Opening DatabasePool '{}'. "
"Asynchronous connections: {}, synchronous connections: {}.",
GetDatabaseName(), _async_threads, _synch_threads);
// The io context is created before any connection so the workers started below can be attached to it.
_ioContext = std::make_unique(_async_threads);
uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
if (error)
return error;
// NOTE(review): if this second stage fails, the asynchronous connections opened above are
// not closed here; cleanup is left to the caller (e.g. via Close()).
error = OpenConnections(IDX_SYNCH, _synch_threads);
if (error)
return error;
// Worker threads are only started once every connection of both kinds opened successfully.
for (std::unique_ptr const& connection : _connections[IDX_ASYNC])
connection->StartWorkerThread(_ioContext.get());
TC_LOG_INFO("sql.driver", "DatabasePool '{}' opened successfully. "
"{} total connections running.", GetDatabaseName(),
(_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
return 0;
}
//! Shuts the pool down: stops the io context, destroys the asynchronous connections,
//! releases the io context and finally destroys the synchronous connections.
template
void DatabaseWorkerPool::Close()
{
TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
// _ioContext is null if Open() was never called or Close() already ran.
if (_ioContext)
_ioContext->stop();
//! Closes the actual MySQL connections.
_connections[IDX_ASYNC].clear();
// The io context is released only after the asynchronous connections that were started with it are gone.
_ioContext.reset();
TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
"Proceeding with synchronous connections.",
GetDatabaseName());
//! Shut down the synchronous connections
//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
//! should only be called after any other thread tasks in the core have exited,
//! meaning there can be no concurrent access at this point.
_connections[IDX_SYNCH].clear();
TC_LOG_INFO("sql.driver", "All connections on DatabasePool '{}' closed.", GetDatabaseName());
}
//! Prepares the statements of every connection (asynchronous and synchronous) and records,
//! per statement index, how many parameters the statement takes. GetPreparedStatement()
//! later reads that table.
//! @return true on success; false if any connection failed, in which case the pool has been closed
template
bool DatabaseWorkerPool::PrepareStatements()
{
for (auto& connections : _connections)
{
for (auto& connection : connections)
{
// NOTE(review): the result of LockIfReady() is not checked here.
connection->LockIfReady();
if (!connection->PrepareStatements())
{
connection->Unlock();
Close();
return false;
}
else
connection->Unlock();
// Grow the parameter-count table so it covers every statement index of this connection.
size_t const preparedSize = connection->m_stmts.size();
if (_preparedStatementSize.size() < preparedSize)
_preparedStatementSize.resize(preparedSize);
for (size_t i = 0; i < preparedSize; ++i)
{
// already set by another connection
// (each connection only has prepared statements of its own type sync/async)
if (_preparedStatementSize[i] > 0)
continue;
// Slots without a statement on this connection are skipped.
if (MySQLPreparedStatement * stmt = connection->m_stmts[i].get())
{
uint32 const paramCount = stmt->GetParameterCount();
// TC only supports uint8 indices.
ASSERT(paramCount < std::numeric_limits::max());
_preparedStatementSize[i] = static_cast(paramCount);
}
}
}
}
return true;
}
//! Runs an ad-hoc SQL query synchronously on the calling thread and returns its result.
//! @param sql         query text
//! @param connection  connection to use; when null, a free synchronous connection is acquired
//!                    via GetFreeConnection()
//! @note The connection is unlocked before returning in both cases, including when the
//!       caller supplied it.
template
QueryResult DatabaseWorkerPool::Query(char const* sql, T* connection /*= nullptr*/)
{
if (!connection)
connection = GetFreeConnection();
QueryResult result = BasicStatementTask::Query(connection, sql);
connection->Unlock();
return result;
}
//! Runs a prepared statement synchronously on a free synchronous connection and returns its result.
//! @param stmt  statement to execute; this function takes ownership and deletes it before returning
template
PreparedQueryResult DatabaseWorkerPool::Query(PreparedStatement* stmt)
{
T* connection = GetFreeConnection();
PreparedQueryResult ret = PreparedStatementTask::Query(connection, stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
return ret;
}
//! Queues an ad-hoc SQL query on the pool's io context and returns a callback wrapping the
//! future of its result.
//! @param sql  query text; copied into the queued task, so the caller's buffer need not outlive this call
template
QueryCallback DatabaseWorkerPool::AsyncQuery(char const* sql)
{
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
std::future result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
{
// Pick the connection that belongs to the thread executing this task.
T* conn = GetAsyncConnectionForCurrentThread();
return BasicStatementTask::Query(conn, sql.c_str());
}));
return QueryCallback(std::move(result));
}
//! Queues a prepared statement on the pool's io context and returns a callback wrapping the
//! future of its result.
//! @param stmt  statement to execute; ownership moves into the queued task (held by a unique_ptr)
template
QueryCallback DatabaseWorkerPool::AsyncQuery(PreparedStatement* stmt)
{
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
std::future result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr>(stmt), tracker = QueueSizeTracker(this)]
{
// Pick the connection that belongs to the thread executing this task.
T* conn = GetAsyncConnectionForCurrentThread();
return PreparedStatementTask::Query(conn, stmt.get());
}));
return QueryCallback(std::move(result));
}
//! Queues execution of a query holder on the pool's io context.
//! @param holder  shared holder; one reference is kept by the queued task and one is moved
//!                into the returned callback
//! @return callback object built from the holder and the future of the queued task
template
SQLQueryHolderCallback DatabaseWorkerPool::DelayQueryHolder(std::shared_ptr> holder)
{
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
std::future result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)]
{
// Pick the connection that belongs to the thread executing this task.
T* conn = GetAsyncConnectionForCurrentThread();
SQLQueryHolderTask::Execute(conn, holder.get());
}));
return { std::move(holder), std::move(result) };
}
//! Creates a new, shared transaction object.
//! Nothing is sent to the database here.
template
SQLTransaction DatabaseWorkerPool::BeginTransaction()
{
return std::make_shared>();
}
//! Queues a transaction for execution on the pool's io context without reporting a result.
//! @param transaction  transaction to execute; a reference is kept by the queued task
//! @note In debug builds an empty transaction is not queued at all.
template
void DatabaseWorkerPool::CommitTransaction(SQLTransaction transaction)
{
#ifdef TRINITY_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
case 0:
TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
return;
case 1:
TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)]
{
// Pick the connection that belongs to the thread executing this task.
T* conn = GetAsyncConnectionForCurrentThread();
TransactionTask::Execute(conn, transaction);
});
}
//! Queues a transaction for execution on the pool's io context and returns a callback
//! wrapping the future of its outcome.
//! @param transaction  transaction to execute; a reference is kept by the queued task
//! @note Unlike CommitTransaction(), an empty transaction is only logged in debug builds
//!       and is still queued, because a callback has to be returned.
template
TransactionCallback DatabaseWorkerPool::AsyncCommitTransaction(SQLTransaction transaction)
{
#ifdef TRINITY_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
case 0:
TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
break;
case 1:
TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
std::future result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)]
{
// Pick the connection that belongs to the thread executing this task.
T* conn = GetAsyncConnectionForCurrentThread();
return TransactionTask::Execute(conn, transaction);
}));
return TransactionCallback(std::move(result));
}
//! Executes a transaction synchronously on a free synchronous connection.
//! If the first attempt fails with ER_LOCK_DEADLOCK it is retried up to five more times.
//! @param transaction  transaction to execute; Cleanup() is called on it after any failed
//!                     first attempt, whether or not a retry succeeded
template
void DatabaseWorkerPool::DirectCommitTransaction(SQLTransaction& transaction)
{
T* connection = GetFreeConnection();
int errorCode = connection->ExecuteTransaction(transaction);
if (!errorCode)
{
connection->Unlock(); // OK, operation successful
return;
}
//! Handle MySQL Errno 1213 without extending deadlock to the core itself
/// @todo More elegant way
if (errorCode == ER_LOCK_DEADLOCK)
{
//todo: handle multiple sync threads deadlocking in a similar way as async threads
uint8 loopBreaker = 5;
for (uint8 i = 0; i < loopBreaker; ++i)
{
// ExecuteTransaction() returns 0 on success, which ends the retry loop.
if (!connection->ExecuteTransaction(transaction))
break;
}
}
//! Clean up now.
transaction->Cleanup();
connection->Unlock();
}
//! Allocates a new prepared statement object for @p index, sized with the parameter count
//! recorded by PrepareStatements().
//! @return heap-allocated statement; the Query/Execute/DirectExecute overloads taking a
//!         PreparedStatement pointer take over ownership
//! @note @p index is not range-checked against _preparedStatementSize.
template
PreparedStatement* DatabaseWorkerPool::GetPreparedStatement(PreparedStatementIndex index)
{
return new PreparedStatement(index, _preparedStatementSize[index]);
}
template
void DatabaseWorkerPool::EscapeString(std::string& str)
{
if (str.empty())
return;
char* buf = new char[str.size() * 2 + 1];
EscapeString(buf, str.c_str(), uint32(str.size()));
str = buf;
delete[] buf;
}
//! Pings the pool's connections so they do not sit idle.
//! Synchronous connections are pinged directly when they can be locked right away;
//! for the asynchronous side one ping task per connection is queued on the io context.
template
void DatabaseWorkerPool::KeepAlive()
{
//! Ping synchronous connections
for (auto& connection : _connections[IDX_SYNCH])
{
// Connections that cannot be locked right now are skipped.
if (connection->LockIfReady())
{
connection->Ping();
connection->Unlock();
}
}
//! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
//! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
//! as the sole purpose is to prevent connections from idling.
auto const count = _connections[IDX_ASYNC].size();
for (uint8 i = 0; i < count; ++i)
{
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)]
{
// Ping whichever connection belongs to the thread that picked up this task.
T* conn = GetAsyncConnectionForCurrentThread();
conn->Ping();
});
}
}
#ifdef TRINITY_DEBUG
//! Debug builds only: turns the synchronous-query warning on or off for the calling thread.
//! While it is on, GetFreeConnection() logs a stack trace each time it is called.
//! @param warn  new value of the thread-local flag
template
void DatabaseWorkerPool::WarnAboutSyncQueries([[maybe_unused]] bool warn)
{
WarnSyncQueries = warn;
}
#endif
//! Opens @p numConnections connections of the given kind and appends them to the pool.
//! @param type            which connection list to fill (IDX_ASYNC or IDX_SYNCH)
//! @param numConnections  how many connections to open
//! @return 0 on success; the error from the connection's Open() if opening failed; 1 if the
//!         server version is below TRINITY_REQUIRED_MYSQL_VERSION
template
uint32 DatabaseWorkerPool::OpenConnections(InternalIndex type, uint8 numConnections)
{
for (uint8 i = 0; i < numConnections; ++i)
{
// Create the connection
// The flag is looked up by connection kind: index 0 yields CONNECTION_ASYNC, index 1 CONNECTION_SYNCH.
constexpr std::array flags = { { CONNECTION_ASYNC, CONNECTION_SYNCH } };
std::unique_ptr connection = std::make_unique(*_connectionInfo, flags[type]);
if (uint32 error = connection->Open())
{
// Failed to open a connection or invalid version, abort and cleanup
_connections[type].clear();
return error;
}
// The required version string is converted to a numeric id at compile time by ParseVersionString().
// NOTE(review): unlike the branch above, this path returns without clearing _connections[type].
else if (uint32 serverVersion = connection->GetServerVersion(); serverVersion < ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION))
{
TC_LOG_ERROR("sql.driver", "TrinityCore does not support " TRINITY_MYSQL_FLAVOR " versions below " TRINITY_REQUIRED_MYSQL_VERSION " (found id {}, need id >= {}), please update your " TRINITY_MYSQL_FLAVOR " server", serverVersion, ParseVersionString(TRINITY_REQUIRED_MYSQL_VERSION));
return 1;
}
else
{
_connections[type].push_back(std::move(connection));
}
}
// Everything is fine
return 0;
}
//! Escapes @p length bytes of @p from into @p to using the first synchronous connection.
//! @param to      destination buffer; the in-place overload sizes it as 2 * length + 1 bytes
//! @param from    source bytes
//! @param length  number of bytes to read from @p from
//! @return the value returned by the connection's EscapeString(), or 0 when any argument is
//!         null/zero (in which case @p to is not written)
//! @note Requires at least one synchronous connection; front() is called without an emptiness check.
template
unsigned long DatabaseWorkerPool::EscapeString(char* to, char const* from, unsigned long length)
{
if (!to || !from || !length)
return 0;
return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
}
//! Returns the current value of _queueSize, the counter maintained by QueueSizeTracker
//! (one unit per live tracker, i.e. per task queued on or running in the io context).
template
size_t DatabaseWorkerPool::QueueSize() const
{
return _queueSize;
}
//! Returns a locked synchronous connection, cycling over the synchronous connections until
//! LockIfReady() succeeds on one of them.
//! @return the locked connection; the caller must call Unlock() on it when done
//! @note NOTE(review): with zero synchronous connections num_cons is 0 and the modulo below
//!       divides by zero.
template
T* DatabaseWorkerPool::GetFreeConnection()
{
#ifdef TRINITY_DEBUG
// Debug aid: report where synchronous queries come from when enabled via WarnAboutSyncQueries().
if (WarnSyncQueries)
{
TC_LOG_WARN("sql.performances", "Sync query at:\n{}", boost::stacktrace::to_string(boost::stacktrace::stacktrace()));
}
#endif
// uint8 counter: it wraps around at 256, the modulo keeps the index in range regardless.
uint8 i = 0;
auto const num_cons = _connections[IDX_SYNCH].size();
T* connection = nullptr;
//! Block forever until a connection is free
// The loop spins without sleeping or yielding between attempts.
for (;;)
{
connection = _connections[IDX_SYNCH][i++ % num_cons].get();
//! Must be matched with t->Unlock() or you will get deadlocks
if (connection->LockIfReady())
break;
}
return connection;
}
//! Finds the asynchronous connection whose worker thread id equals the calling thread's id.
//! @return that connection, or nullptr when the calling thread is not the worker thread of
//!         any asynchronous connection
template
T* DatabaseWorkerPool::GetAsyncConnectionForCurrentThread() const
{
std::thread::id id = std::this_thread::get_id();
// Linear search over the asynchronous connections.
for (auto&& connection : _connections[IDX_ASYNC])
if (connection->GetWorkerThreadId() == id)
return connection.get();
return nullptr;
}
//! Returns the database name stored in the connection info.
//! @note Dereferences _connectionInfo without a null check, so SetConnectionInfo() must have been called.
//! @return pointer into the connection info's string; valid while that object is unchanged
template
char const* DatabaseWorkerPool::GetDatabaseName() const
{
return _connectionInfo->database.c_str();
}
//! Queues an ad-hoc SQL statement for execution on the pool's io context; no result is reported.
//! @param sql  statement text; ignored when null, otherwise copied into the queued task
template
void DatabaseWorkerPool::Execute(char const* sql)
{
if (!sql)
return;
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
{
// Pick the connection that belongs to the thread executing this task.
T* conn = GetAsyncConnectionForCurrentThread();
BasicStatementTask::Execute(conn, sql.c_str());
});
}
//! Queues a prepared statement for execution on the pool's io context; no result is reported.
//! @param stmt  statement to execute; ownership moves into the queued task (held by a unique_ptr)
template
void DatabaseWorkerPool::Execute(PreparedStatement* stmt)
{
// The captured tracker keeps _queueSize incremented for as long as the task object exists.
boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr>(stmt), tracker = QueueSizeTracker(this)]
{
// Pick the connection that belongs to the thread executing this task.
T* conn = GetAsyncConnectionForCurrentThread();
PreparedStatementTask::Execute(conn, stmt.get());
});
}
//! Executes an ad-hoc SQL statement synchronously on a free synchronous connection.
//! @param sql  statement text; the call is a no-op when null
template
void DatabaseWorkerPool::DirectExecute(char const* sql)
{
if (!sql)
return;
// GetFreeConnection() hands back a locked connection, which is unlocked again below.
T* connection = GetFreeConnection();
BasicStatementTask::Execute(connection, sql);
connection->Unlock();
}
//! Executes a prepared statement synchronously on a free synchronous connection.
//! @param stmt  statement to execute; this function takes ownership and deletes it before returning
template
void DatabaseWorkerPool::DirectExecute(PreparedStatement* stmt)
{
// GetFreeConnection() hands back a locked connection, which is unlocked again below.
T* connection = GetFreeConnection();
PreparedStatementTask::Execute(connection, stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
}
//! Appends @p sql to @p trans when a transaction is given, otherwise queues it on its own via Execute().
//! @param trans  transaction handle; a null handle means "no transaction"
//! @param sql    statement text
template
void DatabaseWorkerPool::ExecuteOrAppend(SQLTransaction& trans, char const* sql)
{
if (!trans)
Execute(sql);
else
trans->Append(sql);
}
//! Appends @p stmt to @p trans when a transaction is given, otherwise queues it on its own via Execute().
//! @param trans  transaction handle; a null handle means "no transaction"
//! @param stmt   prepared statement; handed on to Execute() or to the transaction's Append()
template
void DatabaseWorkerPool::ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt)
{
if (!trans)
Execute(stmt);
else
trans->Append(stmt);
}
// Explicit instantiation definitions: the member definitions above are emitted from this
// translation unit (TC_DATABASE_API is presumably the export/visibility macro).
// There are four of them, presumably one per database implementation header included at
// the top of this file (login, world, character, hotfix) - TODO confirm.
template class TC_DATABASE_API DatabaseWorkerPool;
template class TC_DATABASE_API DatabaseWorkerPool;
template class TC_DATABASE_API DatabaseWorkerPool;
template class TC_DATABASE_API DatabaseWorkerPool;