Core/Database: Replace DatabaseWorker with asio io_context

This commit is contained in:
Shauren
2023-12-15 12:06:59 +01:00
parent ee95a5e00f
commit d958bfd0f3
19 changed files with 236 additions and 402 deletions

View File

@@ -18,6 +18,7 @@
#ifndef IoContext_h__
#define IoContext_h__
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
@@ -28,6 +29,8 @@ namespace Trinity
class IoContext
{
public:
using Executor = boost::asio::io_context::executor_type;
IoContext() : _impl() { }
explicit IoContext(int concurrency_hint) : _impl(concurrency_hint) { }
@@ -35,9 +38,13 @@ namespace Trinity
operator boost::asio::io_context const&() const { return _impl; }
std::size_t run() { return _impl.run(); }
std::size_t poll() { return _impl.poll(); }
void stop() { _impl.stop(); }
boost::asio::io_context::executor_type get_executor() noexcept { return _impl.get_executor(); }
bool stopped() const { return _impl.stopped(); }
void restart() { return _impl.restart(); }
Executor get_executor() noexcept { return _impl.get_executor(); }
private:
boost::asio::io_context _impl;
@@ -49,6 +56,14 @@ namespace Trinity
return boost::asio::post(ioContext, std::forward<T>(t));
}
template<typename T>
inline decltype(auto) post(boost::asio::io_context::executor_type& executor, T&& t)
{
return boost::asio::post(executor, std::forward<T>(t));
}
using boost::asio::bind_executor;
template<typename T>
inline decltype(auto) get_io_context(T&& ioObject)
{

View File

@@ -19,7 +19,6 @@
#define Strand_h__
#include "IoContext.h"
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/strand.hpp>
namespace Trinity

View File

@@ -18,41 +18,21 @@
#include "AdhocStatement.h"
#include "MySQLConnection.h"
#include "QueryResult.h"
#include <cstdlib>
#include <cstring>
/*! Basic, ad-hoc queries. */
BasicStatementTask::BasicStatementTask(char const* sql, bool async) :
m_result(nullptr)
QueryResult BasicStatementTask::Query(MySQLConnection* conn, char const* sql)
{
m_sql = strdup(sql);
m_has_result = async; // If the operation is async, then there's a result
if (async)
m_result = new QueryResultPromise();
}
BasicStatementTask::~BasicStatementTask()
{
free((void*)m_sql);
if (m_has_result && m_result != nullptr)
delete m_result;
}
bool BasicStatementTask::Execute()
{
if (m_has_result)
ResultSet* result = conn->Query(sql);
if (!result || !result->GetRowCount() || !result->NextRow())
{
ResultSet* result = m_conn->Query(m_sql);
if (!result || !result->GetRowCount() || !result->NextRow())
{
delete result;
m_result->set_value(QueryResult(nullptr));
return false;
}
m_result->set_value(QueryResult(result));
return true;
delete result;
result = nullptr;
}
return m_conn->Execute(m_sql);
return QueryResult(result);
}
bool BasicStatementTask::Execute(MySQLConnection* conn, char const* sql)
{
return conn->Execute(sql);
}

View File

@@ -18,24 +18,17 @@
#ifndef _ADHOCSTATEMENT_H
#define _ADHOCSTATEMENT_H
#include "Define.h"
#include "DatabaseEnvFwd.h"
#include "SQLOperation.h"
#include "Define.h"
class MySQLConnection;
/*! Raw, ad-hoc query. */
class TC_DATABASE_API BasicStatementTask : public SQLOperation
class TC_DATABASE_API BasicStatementTask
{
public:
BasicStatementTask(char const* sql, bool async = false);
~BasicStatementTask();
bool Execute() override;
QueryResultFuture GetFuture() const { return m_result->get_future(); }
private:
char const* m_sql; //- Raw query to be executed
bool m_has_result;
QueryResultPromise* m_result;
public:
static QueryResult Query(MySQLConnection* conn, char const* sql);
static bool Execute(MySQLConnection* conn, char const* sql);
};
#endif

View File

@@ -1,58 +0,0 @@
/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "DatabaseWorker.h"
#include "SQLOperation.h"
#include "ProducerConsumerQueue.h"
DatabaseWorker::DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection)
{
_connection = connection;
_queue = newQueue;
_cancelationToken = false;
_workerThread = std::thread(&DatabaseWorker::WorkerThread, this);
}
DatabaseWorker::~DatabaseWorker()
{
_cancelationToken = true;
_queue->Cancel();
_workerThread.join();
}
void DatabaseWorker::WorkerThread()
{
if (!_queue)
return;
for (;;)
{
SQLOperation* operation = nullptr;
_queue->WaitAndPop(operation);
if (_cancelationToken || !operation)
return;
operation->SetConnection(_connection);
operation->call();
delete operation;
}
}

View File

@@ -1,50 +0,0 @@
/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _WORKERTHREAD_H
#define _WORKERTHREAD_H
#include "Define.h"
#include <atomic>
#include <thread>
template <typename T>
class ProducerConsumerQueue;
class MySQLConnection;
class SQLOperation;
class TC_DATABASE_API DatabaseWorker
{
public:
DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection);
~DatabaseWorker();
private:
ProducerConsumerQueue<SQLOperation*>* _queue;
MySQLConnection* _connection;
void WorkerThread();
std::thread _workerThread;
std::atomic<bool> _cancelationToken;
DatabaseWorker(DatabaseWorker const& right) = delete;
DatabaseWorker& operator=(DatabaseWorker const& right) = delete;
};
#endif

View File

@@ -19,6 +19,7 @@
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
#include "IoContext.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Implementation/CharacterDatabase.h"
@@ -30,9 +31,9 @@
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
#include "SQLOperation.h"
#include "Transaction.h"
#include "MySQLWorkaround.h"
#include <boost/asio/use_future.hpp>
#include <mysqld_error.h>
#ifdef TRINITY_DEBUG
#include <sstream>
@@ -49,20 +50,59 @@
#define MIN_MARIADB_CLIENT_VERSION 30003u
#define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3"
class PingOperation : public SQLOperation
template<typename T>
struct DatabaseWorkerPool<T>::QueueSizeTracker
{
//! Operation for idle delaythreads
bool Execute() override
explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool)
{
m_conn->Ping();
return true;
++_pool->_queueSize;
}
QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool) { ++_pool->_queueSize; }
QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr)) { }
QueueSizeTracker& operator=(QueueSizeTracker const& other)
{
if (this != &other)
{
if (_pool != other._pool)
{
if (_pool)
--_pool->_queueSize;
if (other._pool)
++other._pool->_queueSize;
}
_pool = other._pool;
}
return *this;
}
QueueSizeTracker& operator=(QueueSizeTracker&& other) noexcept
{
if (this != &other)
{
if (_pool != other._pool)
{
if (_pool)
--_pool->_queueSize;
}
_pool = std::exchange(other._pool, nullptr);
}
return *this;
}
~QueueSizeTracker()
{
if (_pool)
--_pool->_queueSize;
}
private:
DatabaseWorkerPool* _pool;
};
template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
: _queue(new ProducerConsumerQueue<SQLOperation*>()),
_async_threads(0), _synch_threads(0)
: _async_threads(0), _synch_threads(0)
{
WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
@@ -78,7 +118,6 @@ DatabaseWorkerPool<T>::DatabaseWorkerPool()
template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
_queue->Cancel();
}
template <class T>
@@ -100,6 +139,8 @@ uint32 DatabaseWorkerPool<T>::Open()
"Asynchronous connections: {}, synchronous connections: {}.",
GetDatabaseName(), _async_threads, _synch_threads);
_ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);
uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
if (error)
@@ -122,9 +163,13 @@ void DatabaseWorkerPool<T>::Close()
{
TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
_ioContext->stop();
//! Closes the actualy MySQL connection.
_connections[IDX_ASYNC].clear();
_ioContext.reset();
TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
"Proceeding with synchronous connections.",
GetDatabaseName());
@@ -188,63 +233,55 @@ QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= null
if (!connection)
connection = GetFreeConnection();
ResultSet* result = connection->Query(sql);
QueryResult result = BasicStatementTask::Query(connection, sql);
connection->Unlock();
if (!result || !result->GetRowCount() || !result->NextRow())
{
delete result;
return QueryResult(nullptr);
}
return QueryResult(result);
return result;
}
template <class T>
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt)
{
auto connection = GetFreeConnection();
PreparedResultSet* ret = connection->Query(stmt);
T* connection = GetFreeConnection();
PreparedQueryResult ret = PreparedStatementTask::Query(connection, stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
if (!ret || !ret->GetRowCount())
{
delete ret;
return PreparedQueryResult(nullptr);
}
return PreparedQueryResult(ret);
return ret;
}
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql)
{
BasicStatementTask* task = new BasicStatementTask(sql, true);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
QueryResultFuture result = task->GetFuture();
Enqueue(task);
QueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
return BasicStatementTask::Query(conn, sql.c_str());
}));
return QueryCallback(std::move(result));
}
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt)
{
PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
PreparedQueryResultFuture result = task->GetFuture();
Enqueue(task);
PreparedQueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
return PreparedStatementTask::Query(conn, stmt.get());
}));
return QueryCallback(std::move(result));
}
template <class T>
SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder)
{
SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
// Store future result before enqueueing - task might get already processed and deleted before returning from this method
QueryResultHolderFuture result = task->GetFuture();
Enqueue(task);
QueryResultHolderFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
SQLQueryHolderTask::Execute(conn, holder.get());
}));
return { std::move(holder), std::move(result) };
}
@@ -274,7 +311,11 @@ void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
}
#endif // TRINITY_DEBUG
Enqueue(new TransactionTask(transaction));
boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
TransactionTask::Execute(conn, transaction);
});
}
template <class T>
@@ -297,9 +338,11 @@ TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction
}
#endif // TRINITY_DEBUG
TransactionWithResultTask* task = new TransactionWithResultTask(transaction);
TransactionFuture result = task->GetFuture();
Enqueue(task);
TransactionFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
return TransactionTask::Execute(conn, transaction);
}));
return TransactionCallback(std::move(result));
}
@@ -369,7 +412,13 @@ void DatabaseWorkerPool<T>::KeepAlive()
//! as the sole purpose is to prevent connections from idling.
auto const count = _connections[IDX_ASYNC].size();
for (uint8 i = 0; i < count; ++i)
Enqueue(new PingOperation);
{
boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
conn->Ping();
});
}
}
template <class T>
@@ -384,7 +433,7 @@ uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConne
case IDX_ASYNC:
{
auto c = std::make_unique<T>(*_connectionInfo, CONNECTION_ASYNC);
c->StartDatabaseWorkerThread(_queue.get());
c->StartWorkerThread(_ioContext.get());
return c;
}
case IDX_SYNCH:
@@ -433,16 +482,10 @@ unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, un
return _connections[IDX_SYNCH].front()->EscapeString(to, from, length);
}
template <class T>
void DatabaseWorkerPool<T>::Enqueue(SQLOperation* op)
{
_queue->Push(op);
}
template <class T>
size_t DatabaseWorkerPool<T>::QueueSize() const
{
return _queue->Size();
return _queueSize;
}
template <class T>
@@ -472,6 +515,17 @@ T* DatabaseWorkerPool<T>::GetFreeConnection()
return connection;
}
template <class T>
T* DatabaseWorkerPool<T>::GetAsyncConnectionForCurrentThread() const
{
std::thread::id id = std::this_thread::get_id();
for (auto&& connection : _connections[IDX_ASYNC])
if (connection->GetWorkerThreadId() == id)
return connection.get();
return nullptr;
}
template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{
@@ -484,15 +538,21 @@ void DatabaseWorkerPool<T>::Execute(char const* sql)
if (!sql)
return;
BasicStatementTask* task = new BasicStatementTask(sql);
Enqueue(task);
boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
BasicStatementTask::Execute(conn, sql.c_str());
});
}
template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt)
{
PreparedStatementTask* task = new PreparedStatementTask(stmt);
Enqueue(task);
boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
{
T* conn = GetAsyncConnectionForCurrentThread();
PreparedStatementTask::Execute(conn, stmt.get());
});
}
template <class T>
@@ -502,7 +562,7 @@ void DatabaseWorkerPool<T>::DirectExecute(char const* sql)
return;
T* connection = GetFreeConnection();
connection->Execute(sql);
BasicStatementTask::Execute(connection, sql);
connection->Unlock();
}
@@ -510,7 +570,7 @@ template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt)
{
T* connection = GetFreeConnection();
connection->Execute(stmt);
PreparedStatementTask::Execute(connection, stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore

View File

@@ -18,6 +18,7 @@
#ifndef _DATABASEWORKERPOOL_H
#define _DATABASEWORKERPOOL_H
#include "AsioHacksFwd.h"
#include "Define.h"
#include "DatabaseEnvFwd.h"
#include "StringFormat.h"
@@ -25,10 +26,6 @@
#include <string>
#include <vector>
template <typename T>
class ProducerConsumerQueue;
class SQLOperation;
struct MySQLConnectionInfo;
template <class T>
@@ -220,16 +217,20 @@ class DatabaseWorkerPool
unsigned long EscapeString(char* to, char const* from, unsigned long length);
void Enqueue(SQLOperation* op);
//! Gets a free connection in the synchronous connection pool.
//! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
T* GetFreeConnection();
T* GetAsyncConnectionForCurrentThread() const;
char const* GetDatabaseName() const;
struct QueueSizeTracker;
friend QueueSizeTracker;
//! Queue shared by async worker threads.
std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
std::unique_ptr<Trinity::Asio::IoContext> _ioContext;
std::atomic<size_t> _queueSize;
std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
std::vector<uint8> _preparedStatementSize;

View File

@@ -17,7 +17,7 @@
#include "MySQLConnection.h"
#include "Common.h"
#include "DatabaseWorker.h"
#include "IoContext.h"
#include "Log.h"
#include "MySQLHacks.h"
#include "MySQLPreparedStatement.h"
@@ -64,7 +64,11 @@ MySQLConnection::~MySQLConnection()
void MySQLConnection::Close()
{
// Stop the worker thread before the statements are cleared
m_worker.reset();
if (m_workerThread)
{
m_workerThread->join();
m_workerThread.reset();
}
m_stmts.clear();
@@ -442,9 +446,22 @@ uint32 MySQLConnection::GetLastError()
return mysql_errno(m_Mysql);
}
void MySQLConnection::StartDatabaseWorkerThread(ProducerConsumerQueue<SQLOperation*>* queue)
void MySQLConnection::StartWorkerThread(Trinity::Asio::IoContext* context)
{
m_worker = std::make_unique<DatabaseWorker>(queue, this);
m_workerThread = std::make_unique<std::thread>([context]
{
boost::asio::executor_work_guard executorWorkGuard = boost::asio::make_work_guard(context->get_executor());
context->run();
});
}
std::thread::id MySQLConnection::GetWorkerThreadId() const
{
if (m_workerThread)
return m_workerThread->get_id();
return {};
}
bool MySQLConnection::LockIfReady()

View File

@@ -18,6 +18,7 @@
#ifndef _MYSQLCONNECTION_H
#define _MYSQLCONNECTION_H
#include "AsioHacksFwd.h"
#include "Define.h"
#include "DatabaseEnvFwd.h"
#include <memory>
@@ -25,12 +26,7 @@
#include <string>
#include <vector>
template <typename T>
class ProducerConsumerQueue;
class DatabaseWorker;
class MySQLPreparedStatement;
class SQLOperation;
enum ConnectionFlags
{
@@ -81,7 +77,8 @@ class TC_DATABASE_API MySQLConnection
uint32 GetLastError();
void StartDatabaseWorkerThread(ProducerConsumerQueue<SQLOperation*>* queue);
void StartWorkerThread(Trinity::Asio::IoContext* context);
std::thread::id GetWorkerThreadId() const;
protected:
/// Tries to acquire lock. If lock is acquired by another thread
@@ -106,7 +103,7 @@ class TC_DATABASE_API MySQLConnection
private:
bool _HandleMySQLErrno(uint32 errNo, uint8 attempts = 5);
std::unique_ptr<DatabaseWorker> m_worker; //!< Core worker task.
std::unique_ptr<std::thread> m_workerThread; //!< Core worker thread.
MySQLHandle* m_Mysql; //!< MySQL Handle.
MySQLConnectionInfo& m_connectionInfo; //!< Connection info (used for logging)
ConnectionFlags m_connectionFlags; //!< Connection flags (for preparing relevant statements)

View File

@@ -118,37 +118,21 @@ void PreparedStatementBase::setNull(const uint8 index)
}
//- Execution
PreparedStatementTask::PreparedStatementTask(PreparedStatementBase* stmt, bool async) :
m_stmt(stmt), m_result(nullptr)
PreparedQueryResult PreparedStatementTask::Query(MySQLConnection* conn, PreparedStatementBase* stmt)
{
m_has_result = async; // If it's async, then there's a result
if (async)
m_result = new PreparedQueryResultPromise();
}
PreparedStatementTask::~PreparedStatementTask()
{
delete m_stmt;
if (m_has_result && m_result != nullptr)
delete m_result;
}
bool PreparedStatementTask::Execute()
{
if (m_has_result)
PreparedResultSet* result = conn->Query(stmt);
if (!result || !result->GetRowCount())
{
PreparedResultSet* result = m_conn->Query(m_stmt);
if (!result || !result->GetRowCount())
{
delete result;
m_result->set_value(PreparedQueryResult(nullptr));
return false;
}
m_result->set_value(PreparedQueryResult(result));
return true;
delete result;
result = nullptr;
}
return m_conn->Execute(m_stmt);
return PreparedQueryResult(result);
}
bool PreparedStatementTask::Execute(MySQLConnection* conn, PreparedStatementBase* stmt)
{
return conn->Execute(stmt);
}
template<typename T>

View File

@@ -18,11 +18,14 @@
#ifndef _PREPAREDSTATEMENT_H
#define _PREPAREDSTATEMENT_H
#include "DatabaseEnvFwd.h"
#include "Define.h"
#include "SQLOperation.h"
#include <future>
#include <vector>
#include <array>
#include <string>
#include <variant>
#include <vector>
class MySQLConnection;
struct PreparedStatementData
{
@@ -112,18 +115,11 @@ private:
};
//- Lower-level class, enqueuable operation
class TC_DATABASE_API PreparedStatementTask : public SQLOperation
class TC_DATABASE_API PreparedStatementTask
{
public:
PreparedStatementTask(PreparedStatementBase* stmt, bool async = false);
~PreparedStatementTask();
bool Execute() override;
PreparedQueryResultFuture GetFuture() { return m_result->get_future(); }
protected:
PreparedStatementBase* m_stmt;
bool m_has_result;
PreparedQueryResultPromise* m_result;
public:
static PreparedQueryResult Query(MySQLConnection* conn, PreparedStatementBase* stmt);
static bool Execute(MySQLConnection* conn, PreparedStatementBase* stmt);
};
#endif

View File

@@ -72,16 +72,13 @@ void SQLQueryHolderBase::SetSize(size_t size)
m_queries.resize(size);
}
SQLQueryHolderTask::~SQLQueryHolderTask() = default;
bool SQLQueryHolderTask::Execute()
bool SQLQueryHolderTask::Execute(MySQLConnection* conn, SQLQueryHolderBase* holder)
{
/// execute all queries in the holder and pass the results
for (size_t i = 0; i < m_holder->m_queries.size(); ++i)
if (PreparedStatementBase* stmt = m_holder->m_queries[i].first)
m_holder->SetPreparedResult(i, m_conn->Query(stmt));
for (size_t i = 0; i < holder->m_queries.size(); ++i)
if (PreparedStatementBase* stmt = holder->m_queries[i].first)
holder->SetPreparedResult(i, conn->Query(stmt));
m_result.set_value();
return true;
}

View File

@@ -18,9 +18,12 @@
#ifndef _QUERYHOLDER_H
#define _QUERYHOLDER_H
#include "SQLOperation.h"
#include "Define.h"
#include "DatabaseEnvFwd.h"
#include <vector>
class MySQLConnection;
class TC_DATABASE_API SQLQueryHolderBase
{
friend class SQLQueryHolderTask;
@@ -47,20 +50,10 @@ public:
}
};
class TC_DATABASE_API SQLQueryHolderTask : public SQLOperation
class TC_DATABASE_API SQLQueryHolderTask
{
private:
std::shared_ptr<SQLQueryHolderBase> m_holder;
QueryResultHolderPromise m_result;
public:
explicit SQLQueryHolderTask(std::shared_ptr<SQLQueryHolderBase> holder)
: m_holder(std::move(holder)) { }
~SQLQueryHolderTask();
bool Execute() override;
QueryResultHolderFuture GetFuture() { return m_result.get_future(); }
public:
static bool Execute(MySQLConnection* conn, SQLQueryHolderBase* holder);
};
class TC_DATABASE_API SQLQueryHolderCallback

View File

@@ -18,7 +18,6 @@
#ifndef _SQLOPERATION_H
#define _SQLOPERATION_H
#include "Define.h"
#include "DatabaseEnvFwd.h"
//- Union that holds element data
@@ -42,27 +41,4 @@ struct SQLElementData
SQLElementDataType type;
};
class MySQLConnection;
class TC_DATABASE_API SQLOperation
{
public:
SQLOperation(): m_conn(nullptr) { }
virtual ~SQLOperation() { }
virtual int call()
{
Execute();
return 0;
}
virtual bool Execute() = 0;
virtual void SetConnection(MySQLConnection* con) { m_conn = con; }
MySQLConnection* m_conn;
private:
SQLOperation(SQLOperation const& right) = delete;
SQLOperation& operator=(SQLOperation const& right) = delete;
};
#endif

View File

@@ -70,9 +70,9 @@ void TransactionBase::Cleanup()
_cleanedUp = true;
}
bool TransactionTask::Execute()
bool TransactionTask::Execute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans)
{
int errorCode = TryExecute();
int errorCode = TryExecute(conn, trans);
if (!errorCode)
return true;
@@ -91,7 +91,7 @@ bool TransactionTask::Execute()
for (uint32 loopDuration = 0, startMSTime = getMSTime(); loopDuration <= DEADLOCK_MAX_RETRY_TIME_MS; loopDuration = GetMSTimeDiffToNow(startMSTime))
{
if (!TryExecute())
if (!TryExecute(conn, trans))
return true;
TC_LOG_WARN("sql.sql", "Deadlocked SQL Transaction, retrying. Loop timer: {} ms, Thread Id: {}", loopDuration, threadId);
@@ -101,61 +101,14 @@ bool TransactionTask::Execute()
}
// Clean up now.
CleanupOnFailure();
trans->Cleanup();
return false;
}
int TransactionTask::TryExecute()
int TransactionTask::TryExecute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans)
{
return m_conn->ExecuteTransaction(m_trans);
}
void TransactionTask::CleanupOnFailure()
{
m_trans->Cleanup();
}
bool TransactionWithResultTask::Execute()
{
int errorCode = TryExecute();
if (!errorCode)
{
m_result.set_value(true);
return true;
}
if (errorCode == ER_LOCK_DEADLOCK)
{
std::string threadId = []()
{
// wrapped in lambda to fix false positive analysis warning C26115
std::ostringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
return threadIdStream.str();
}();
// Make sure only 1 async thread retries a transaction so they don't keep dead-locking each other
std::lock_guard<std::mutex> lock(_deadlockLock);
for (uint32 loopDuration = 0, startMSTime = getMSTime(); loopDuration <= DEADLOCK_MAX_RETRY_TIME_MS; loopDuration = GetMSTimeDiffToNow(startMSTime))
{
if (!TryExecute())
{
m_result.set_value(true);
return true;
}
TC_LOG_WARN("sql.sql", "Deadlocked SQL Transaction, retrying. Loop timer: {} ms, Thread Id: {}", loopDuration, threadId);
}
TC_LOG_ERROR("sql.sql", "Fatal deadlocked SQL Transaction, it will not be retried anymore. Thread Id: {}", threadId);
}
// Clean up now.
CleanupOnFailure();
m_result.set_value(false);
return false;
return conn->ExecuteTransaction(trans);
}
bool TransactionCallback::InvokeIfReady()

View File

@@ -26,6 +26,8 @@
#include <mutex>
#include <vector>
class MySQLConnection;
/*! Transactions, high level class. */
class TC_DATABASE_API TransactionBase
{
@@ -69,36 +71,15 @@ public:
};
/*! Low level class*/
class TC_DATABASE_API TransactionTask : public SQLOperation
{
template <class T> friend class DatabaseWorkerPool;
friend class DatabaseWorker;
friend class TransactionCallback;
public:
TransactionTask(std::shared_ptr<TransactionBase> trans) : m_trans(trans) { }
~TransactionTask() { }
protected:
bool Execute() override;
int TryExecute();
void CleanupOnFailure();
std::shared_ptr<TransactionBase> m_trans;
static std::mutex _deadlockLock;
};
class TC_DATABASE_API TransactionWithResultTask : public TransactionTask
class TC_DATABASE_API TransactionTask
{
public:
TransactionWithResultTask(std::shared_ptr<TransactionBase> trans) : TransactionTask(trans) { }
static bool Execute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans);
TransactionFuture GetFuture() { return m_result.get_future(); }
private:
static int TryExecute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans);
protected:
bool Execute() override;
TransactionPromise m_result;
static std::mutex _deadlockLock;
};
class TC_DATABASE_API TransactionCallback

View File

@@ -27,10 +27,10 @@
#include "AuctionHouseBot.h"
#include "AuctionHouseMgr.h"
#include "AuthenticationPackets.h"
#include "BattlePetMgr.h"
#include "BattlefieldMgr.h"
#include "BattlegroundMgr.h"
#include "BattlenetRpcErrorCodes.h"
#include "BattlePetMgr.h"
#include "BlackMarketMgr.h"
#include "CalendarMgr.h"
#include "ChannelMgr.h"
@@ -46,8 +46,8 @@
#include "CreatureAIRegistry.h"
#include "CreatureGroups.h"
#include "CreatureTextMgr.h"
#include "DatabaseEnv.h"
#include "DB2Stores.h"
#include "DatabaseEnv.h"
#include "DetourMemoryFunctions.h"
#include "DisableMgr.h"
#include "GameEventMgr.h"
@@ -59,21 +59,21 @@
#include "GridNotifiersImpl.h"
#include "GroupMgr.h"
#include "GuildMgr.h"
#include "InstanceLockMgr.h"
#include "IPLocation.h"
#include "InstanceLockMgr.h"
#include "ItemBonusMgr.h"
#include "LFGMgr.h"
#include "Language.h"
#include "LanguageMgr.h"
#include "LFGMgr.h"
#include "Log.h"
#include "LootItemStorage.h"
#include "LootMgr.h"
#include "M2Stores.h"
#include "MMapFactory.h"
#include "Map.h"
#include "MapManager.h"
#include "Metric.h"
#include "MiscPackets.h"
#include "MMapFactory.h"
#include "ObjectAccessor.h"
#include "ObjectMgr.h"
#include "OutdoorPvPMgr.h"
@@ -88,8 +88,8 @@
#include "ScriptReloadMgr.h"
#include "SkillDiscovery.h"
#include "SkillExtraItems.h"
#include "SpellMgr.h"
#include "SmartScriptMgr.h"
#include "SpellMgr.h"
#include "SupportMgr.h"
#include "TaxiPathGraph.h"
#include "TerrainMgr.h"

View File

@@ -754,7 +754,7 @@ class TC_GAME_API World
void ForceGameEventUpdate();
void UpdateRealmCharCount(uint32 accid);
void UpdateRealmCharCount(uint32 accountId);
LocaleConstant GetAvailableDbcLocale(LocaleConstant locale) const { if (m_availableDbcLocaleMask & (1 << locale)) return locale; else return m_defaultDbcLocale; }