aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShauren <shauren.trinity@gmail.com>2023-12-15 12:06:59 +0100
committerfunjoker <funjoker109@gmail.com>2023-12-19 14:14:23 +0100
commite0d45f6dff3afe5caa38c48646f23d966c8c03a1 (patch)
tree6d1b9fef794cec268fa86398d9bb8957cf99f5ba
parent575fc7fde329ab47e517357d74a47316c6413544 (diff)
Core/Database: Replace DatabaseWorker with asio io_context
(cherry picked from commit d958bfd0f32bfe798809b72c1b51c990edfe141c)
-rw-r--r--src/common/Asio/IoContext.h17
-rw-r--r--src/common/Asio/Strand.h1
-rw-r--r--src/server/database/Database/AdhocStatement.cpp40
-rw-r--r--src/server/database/Database/AdhocStatement.h21
-rw-r--r--src/server/database/Database/DatabaseWorker.cpp58
-rw-r--r--src/server/database/Database/DatabaseWorker.h50
-rw-r--r--src/server/database/Database/DatabaseWorkerPool.cpp172
-rw-r--r--src/server/database/Database/DatabaseWorkerPool.h15
-rw-r--r--src/server/database/Database/MySQLConnection.cpp25
-rw-r--r--src/server/database/Database/MySQLConnection.h11
-rw-r--r--src/server/database/Database/PreparedStatement.cpp36
-rw-r--r--src/server/database/Database/PreparedStatement.h26
-rw-r--r--src/server/database/Database/QueryHolder.cpp11
-rw-r--r--src/server/database/Database/QueryHolder.h21
-rw-r--r--src/server/database/Database/SQLOperation.h24
-rw-r--r--src/server/database/Database/Transaction.cpp59
-rw-r--r--src/server/database/Database/Transaction.h33
-rw-r--r--src/server/game/World/World.cpp12
-rw-r--r--src/server/game/World/World.h2
19 files changed, 234 insertions, 400 deletions
diff --git a/src/common/Asio/IoContext.h b/src/common/Asio/IoContext.h
index b26de194b5c..4b469d3baa2 100644
--- a/src/common/Asio/IoContext.h
+++ b/src/common/Asio/IoContext.h
@@ -18,6 +18,7 @@
#ifndef IoContext_h__
#define IoContext_h__
+#include <boost/asio/bind_executor.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
@@ -28,6 +29,8 @@ namespace Trinity
class IoContext
{
public:
+ using Executor = boost::asio::io_context::executor_type;
+
IoContext() : _impl() { }
explicit IoContext(int concurrency_hint) : _impl(concurrency_hint) { }
@@ -35,9 +38,13 @@ namespace Trinity
operator boost::asio::io_context const&() const { return _impl; }
std::size_t run() { return _impl.run(); }
+ std::size_t poll() { return _impl.poll(); }
void stop() { _impl.stop(); }
- boost::asio::io_context::executor_type get_executor() noexcept { return _impl.get_executor(); }
+ bool stopped() const { return _impl.stopped(); }
+ void restart() { return _impl.restart(); }
+
+ Executor get_executor() noexcept { return _impl.get_executor(); }
private:
boost::asio::io_context _impl;
@@ -50,6 +57,14 @@ namespace Trinity
}
template<typename T>
+ inline decltype(auto) post(boost::asio::io_context::executor_type& executor, T&& t)
+ {
+ return boost::asio::post(executor, std::forward<T>(t));
+ }
+
+ using boost::asio::bind_executor;
+
+ template<typename T>
inline decltype(auto) get_io_context(T&& ioObject)
{
return ioObject.get_executor().context();
diff --git a/src/common/Asio/Strand.h b/src/common/Asio/Strand.h
index 942ddf55cd4..5228a30103b 100644
--- a/src/common/Asio/Strand.h
+++ b/src/common/Asio/Strand.h
@@ -19,7 +19,6 @@
#define Strand_h__
#include "IoContext.h"
-#include <boost/asio/bind_executor.hpp>
#include <boost/asio/strand.hpp>
namespace Trinity
diff --git a/src/server/database/Database/AdhocStatement.cpp b/src/server/database/Database/AdhocStatement.cpp
index 39a753f0212..68c5e835b2c 100644
--- a/src/server/database/Database/AdhocStatement.cpp
+++ b/src/server/database/Database/AdhocStatement.cpp
@@ -18,41 +18,21 @@
#include "AdhocStatement.h"
#include "MySQLConnection.h"
#include "QueryResult.h"
-#include <cstdlib>
-#include <cstring>
/*! Basic, ad-hoc queries. */
-BasicStatementTask::BasicStatementTask(char const* sql, bool async) :
-m_result(nullptr)
+QueryResult BasicStatementTask::Query(MySQLConnection* conn, char const* sql)
{
- m_sql = strdup(sql);
- m_has_result = async; // If the operation is async, then there's a result
- if (async)
- m_result = new QueryResultPromise();
-}
+ ResultSet* result = conn->Query(sql);
+ if (!result || !result->GetRowCount() || !result->NextRow())
+ {
+ delete result;
+ result = nullptr;
+ }
-BasicStatementTask::~BasicStatementTask()
-{
- free((void*)m_sql);
- if (m_has_result && m_result != nullptr)
- delete m_result;
+ return QueryResult(result);
}
-bool BasicStatementTask::Execute()
+bool BasicStatementTask::Execute(MySQLConnection* conn, char const* sql)
{
- if (m_has_result)
- {
- ResultSet* result = m_conn->Query(m_sql);
- if (!result || !result->GetRowCount() || !result->NextRow())
- {
- delete result;
- m_result->set_value(QueryResult(nullptr));
- return false;
- }
-
- m_result->set_value(QueryResult(result));
- return true;
- }
-
- return m_conn->Execute(m_sql);
+ return conn->Execute(sql);
}
diff --git a/src/server/database/Database/AdhocStatement.h b/src/server/database/Database/AdhocStatement.h
index b9e197398cc..c74b15ccd97 100644
--- a/src/server/database/Database/AdhocStatement.h
+++ b/src/server/database/Database/AdhocStatement.h
@@ -18,24 +18,17 @@
#ifndef _ADHOCSTATEMENT_H
#define _ADHOCSTATEMENT_H
-#include "Define.h"
#include "DatabaseEnvFwd.h"
-#include "SQLOperation.h"
+#include "Define.h"
+
+class MySQLConnection;
/*! Raw, ad-hoc query. */
-class TC_DATABASE_API BasicStatementTask : public SQLOperation
+class TC_DATABASE_API BasicStatementTask
{
- public:
- BasicStatementTask(char const* sql, bool async = false);
- ~BasicStatementTask();
-
- bool Execute() override;
- QueryResultFuture GetFuture() const { return m_result->get_future(); }
-
- private:
- char const* m_sql; //- Raw query to be executed
- bool m_has_result;
- QueryResultPromise* m_result;
+public:
+ static QueryResult Query(MySQLConnection* conn, char const* sql);
+ static bool Execute(MySQLConnection* conn, char const* sql);
};
#endif
diff --git a/src/server/database/Database/DatabaseWorker.cpp b/src/server/database/Database/DatabaseWorker.cpp
deleted file mode 100644
index f379f442500..00000000000
--- a/src/server/database/Database/DatabaseWorker.cpp
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "DatabaseWorker.h"
-#include "SQLOperation.h"
-#include "ProducerConsumerQueue.h"
-
-DatabaseWorker::DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection)
-{
- _connection = connection;
- _queue = newQueue;
- _cancelationToken = false;
- _workerThread = std::thread(&DatabaseWorker::WorkerThread, this);
-}
-
-DatabaseWorker::~DatabaseWorker()
-{
- _cancelationToken = true;
-
- _queue->Cancel();
-
- _workerThread.join();
-}
-
-void DatabaseWorker::WorkerThread()
-{
- if (!_queue)
- return;
-
- for (;;)
- {
- SQLOperation* operation = nullptr;
-
- _queue->WaitAndPop(operation);
-
- if (_cancelationToken || !operation)
- return;
-
- operation->SetConnection(_connection);
- operation->call();
-
- delete operation;
- }
-}
diff --git a/src/server/database/Database/DatabaseWorker.h b/src/server/database/Database/DatabaseWorker.h
deleted file mode 100644
index 6245332e614..00000000000
--- a/src/server/database/Database/DatabaseWorker.h
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef _WORKERTHREAD_H
-#define _WORKERTHREAD_H
-
-#include "Define.h"
-#include <atomic>
-#include <thread>
-
-template <typename T>
-class ProducerConsumerQueue;
-
-class MySQLConnection;
-class SQLOperation;
-
-class TC_DATABASE_API DatabaseWorker
-{
- public:
- DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection);
- ~DatabaseWorker();
-
- private:
- ProducerConsumerQueue<SQLOperation*>* _queue;
- MySQLConnection* _connection;
-
- void WorkerThread();
- std::thread _workerThread;
-
- std::atomic<bool> _cancelationToken;
-
- DatabaseWorker(DatabaseWorker const& right) = delete;
- DatabaseWorker& operator=(DatabaseWorker const& right) = delete;
-};
-
-#endif
diff --git a/src/server/database/Database/DatabaseWorkerPool.cpp b/src/server/database/Database/DatabaseWorkerPool.cpp
index 0b205e7c3c8..6e9f8a8df1a 100644
--- a/src/server/database/Database/DatabaseWorkerPool.cpp
+++ b/src/server/database/Database/DatabaseWorkerPool.cpp
@@ -19,6 +19,7 @@
#include "AdhocStatement.h"
#include "Common.h"
#include "Errors.h"
+#include "IoContext.h"
#include "Implementation/LoginDatabase.h"
#include "Implementation/WorldDatabase.h"
#include "Implementation/CharacterDatabase.h"
@@ -30,9 +31,9 @@
#include "QueryCallback.h"
#include "QueryHolder.h"
#include "QueryResult.h"
-#include "SQLOperation.h"
#include "Transaction.h"
#include "MySQLWorkaround.h"
+#include <boost/asio/use_future.hpp>
#include <mysqld_error.h>
#ifdef TRINITY_DEBUG
#include <sstream>
@@ -49,20 +50,59 @@
#define MIN_MARIADB_CLIENT_VERSION 30003u
#define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3"
-class PingOperation : public SQLOperation
+template<typename T>
+struct DatabaseWorkerPool<T>::QueueSizeTracker
{
- //! Operation for idle delaythreads
- bool Execute() override
+ explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool)
{
- m_conn->Ping();
- return true;
+ ++_pool->_queueSize;
}
+
+ QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool) { ++_pool->_queueSize; }
+ QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr)) { }
+
+ QueueSizeTracker& operator=(QueueSizeTracker const& other)
+ {
+ if (this != &other)
+ {
+ if (_pool != other._pool)
+ {
+ if (_pool)
+ --_pool->_queueSize;
+ if (other._pool)
+ ++other._pool->_queueSize;
+ }
+ _pool = other._pool;
+ }
+ return *this;
+ }
+ QueueSizeTracker& operator=(QueueSizeTracker&& other) noexcept
+ {
+ if (this != &other)
+ {
+ if (_pool != other._pool)
+ {
+ if (_pool)
+ --_pool->_queueSize;
+ }
+ _pool = std::exchange(other._pool, nullptr);
+ }
+ return *this;
+ }
+
+ ~QueueSizeTracker()
+ {
+ if (_pool)
+ --_pool->_queueSize;
+ }
+
+private:
+ DatabaseWorkerPool* _pool;
};
template <class T>
DatabaseWorkerPool<T>::DatabaseWorkerPool()
- : _queue(new ProducerConsumerQueue<SQLOperation*>()),
- _async_threads(0), _synch_threads(0)
+ : _async_threads(0), _synch_threads(0)
{
WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
@@ -78,7 +118,6 @@ DatabaseWorkerPool<T>::DatabaseWorkerPool()
template <class T>
DatabaseWorkerPool<T>::~DatabaseWorkerPool()
{
- _queue->Cancel();
}
template <class T>
@@ -100,6 +139,8 @@ uint32 DatabaseWorkerPool<T>::Open()
"Asynchronous connections: {}, synchronous connections: {}.",
GetDatabaseName(), _async_threads, _synch_threads);
+ _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads);
+
uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
if (error)
@@ -122,9 +163,13 @@ void DatabaseWorkerPool<T>::Close()
{
TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName());
+ _ioContext->stop();
+
//! Closes the actualy MySQL connection.
_connections[IDX_ASYNC].clear();
+ _ioContext.reset();
+
TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. "
"Proceeding with synchronous connections.",
GetDatabaseName());
@@ -188,63 +233,55 @@ QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= null
if (!connection)
connection = GetFreeConnection();
- ResultSet* result = connection->Query(sql);
+ QueryResult result = BasicStatementTask::Query(connection, sql);
connection->Unlock();
- if (!result || !result->GetRowCount() || !result->NextRow())
- {
- delete result;
- return QueryResult(nullptr);
- }
- return QueryResult(result);
+ return result;
}
template <class T>
PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt)
{
- auto connection = GetFreeConnection();
- PreparedResultSet* ret = connection->Query(stmt);
+ T* connection = GetFreeConnection();
+ PreparedQueryResult ret = PreparedStatementTask::Query(connection, stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
- if (!ret || !ret->GetRowCount())
- {
- delete ret;
- return PreparedQueryResult(nullptr);
- }
-
- return PreparedQueryResult(ret);
+ return ret;
}
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql)
{
- BasicStatementTask* task = new BasicStatementTask(sql, true);
- // Store future result before enqueueing - task might get already processed and deleted before returning from this method
- QueryResultFuture result = task->GetFuture();
- Enqueue(task);
+ QueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ return BasicStatementTask::Query(conn, sql.c_str());
+ }));
return QueryCallback(std::move(result));
}
template <class T>
QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt)
{
- PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
- // Store future result before enqueueing - task might get already processed and deleted before returning from this method
- PreparedQueryResultFuture result = task->GetFuture();
- Enqueue(task);
+ PreparedQueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ return PreparedStatementTask::Query(conn, stmt.get());
+ }));
return QueryCallback(std::move(result));
}
template <class T>
SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder)
{
- SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
- // Store future result before enqueueing - task might get already processed and deleted before returning from this method
- QueryResultHolderFuture result = task->GetFuture();
- Enqueue(task);
+ QueryResultHolderFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ SQLQueryHolderTask::Execute(conn, holder.get());
+ }));
return { std::move(holder), std::move(result) };
}
@@ -274,7 +311,11 @@ void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
}
#endif // TRINITY_DEBUG
- Enqueue(new TransactionTask(transaction));
+ boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ TransactionTask::Execute(conn, transaction);
+ });
}
template <class T>
@@ -297,9 +338,11 @@ TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction
}
#endif // TRINITY_DEBUG
- TransactionWithResultTask* task = new TransactionWithResultTask(transaction);
- TransactionFuture result = task->GetFuture();
- Enqueue(task);
+ TransactionFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ return TransactionTask::Execute(conn, transaction);
+ }));
return TransactionCallback(std::move(result));
}
@@ -369,7 +412,13 @@ void DatabaseWorkerPool<T>::KeepAlive()
//! as the sole purpose is to prevent connections from idling.
auto const count = _connections[IDX_ASYNC].size();
for (uint8 i = 0; i < count; ++i)
- Enqueue(new PingOperation);
+ {
+ boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ conn->Ping();
+ });
+ }
}
template <class T>
@@ -384,7 +433,7 @@ uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConne
case IDX_ASYNC:
{
auto c = std::make_unique<T>(*_connectionInfo, CONNECTION_ASYNC);
- c->StartDatabaseWorkerThread(_queue.get());
+ c->StartWorkerThread(_ioContext.get());
return c;
}
case IDX_SYNCH:
@@ -434,15 +483,9 @@ unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, un
}
template <class T>
-void DatabaseWorkerPool<T>::Enqueue(SQLOperation* op)
-{
- _queue->Push(op);
-}
-
-template <class T>
size_t DatabaseWorkerPool<T>::QueueSize() const
{
- return _queue->Size();
+ return _queueSize;
}
template <class T>
@@ -473,6 +516,17 @@ T* DatabaseWorkerPool<T>::GetFreeConnection()
}
template <class T>
+T* DatabaseWorkerPool<T>::GetAsyncConnectionForCurrentThread() const
+{
+ std::thread::id id = std::this_thread::get_id();
+ for (auto&& connection : _connections[IDX_ASYNC])
+ if (connection->GetWorkerThreadId() == id)
+ return connection.get();
+
+ return nullptr;
+}
+
+template <class T>
char const* DatabaseWorkerPool<T>::GetDatabaseName() const
{
return _connectionInfo->database.c_str();
@@ -484,15 +538,21 @@ void DatabaseWorkerPool<T>::Execute(char const* sql)
if (!sql)
return;
- BasicStatementTask* task = new BasicStatementTask(sql);
- Enqueue(task);
+ boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ BasicStatementTask::Execute(conn, sql.c_str());
+ });
}
template <class T>
void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt)
{
- PreparedStatementTask* task = new PreparedStatementTask(stmt);
- Enqueue(task);
+ boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)]
+ {
+ T* conn = GetAsyncConnectionForCurrentThread();
+ PreparedStatementTask::Execute(conn, stmt.get());
+ });
}
template <class T>
@@ -502,7 +562,7 @@ void DatabaseWorkerPool<T>::DirectExecute(char const* sql)
return;
T* connection = GetFreeConnection();
- connection->Execute(sql);
+ BasicStatementTask::Execute(connection, sql);
connection->Unlock();
}
@@ -510,7 +570,7 @@ template <class T>
void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt)
{
T* connection = GetFreeConnection();
- connection->Execute(stmt);
+ PreparedStatementTask::Execute(connection, stmt);
connection->Unlock();
//! Delete proxy-class. Not needed anymore
diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h
index 6e18d466b7f..70c7a022510 100644
--- a/src/server/database/Database/DatabaseWorkerPool.h
+++ b/src/server/database/Database/DatabaseWorkerPool.h
@@ -18,6 +18,7 @@
#ifndef _DATABASEWORKERPOOL_H
#define _DATABASEWORKERPOOL_H
+#include "AsioHacksFwd.h"
#include "Define.h"
#include "DatabaseEnvFwd.h"
#include "StringFormat.h"
@@ -25,10 +26,6 @@
#include <string>
#include <vector>
-template <typename T>
-class ProducerConsumerQueue;
-
-class SQLOperation;
struct MySQLConnectionInfo;
template <class T>
@@ -220,16 +217,20 @@ class DatabaseWorkerPool
unsigned long EscapeString(char* to, char const* from, unsigned long length);
- void Enqueue(SQLOperation* op);
-
//! Gets a free connection in the synchronous connection pool.
//! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
T* GetFreeConnection();
+ T* GetAsyncConnectionForCurrentThread() const;
+
char const* GetDatabaseName() const;
+ struct QueueSizeTracker;
+ friend QueueSizeTracker;
+
//! Queue shared by async worker threads.
- std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue;
+ std::unique_ptr<Trinity::Asio::IoContext> _ioContext;
+ std::atomic<size_t> _queueSize;
std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections;
std::unique_ptr<MySQLConnectionInfo> _connectionInfo;
std::vector<uint8> _preparedStatementSize;
diff --git a/src/server/database/Database/MySQLConnection.cpp b/src/server/database/Database/MySQLConnection.cpp
index c42ca4e2903..56e384520c2 100644
--- a/src/server/database/Database/MySQLConnection.cpp
+++ b/src/server/database/Database/MySQLConnection.cpp
@@ -17,7 +17,7 @@
#include "MySQLConnection.h"
#include "Common.h"
-#include "DatabaseWorker.h"
+#include "IoContext.h"
#include "Log.h"
#include "MySQLHacks.h"
#include "MySQLPreparedStatement.h"
@@ -64,7 +64,11 @@ MySQLConnection::~MySQLConnection()
void MySQLConnection::Close()
{
// Stop the worker thread before the statements are cleared
- m_worker.reset();
+ if (m_workerThread)
+ {
+ m_workerThread->join();
+ m_workerThread.reset();
+ }
m_stmts.clear();
@@ -442,9 +446,22 @@ uint32 MySQLConnection::GetLastError()
return mysql_errno(m_Mysql);
}
-void MySQLConnection::StartDatabaseWorkerThread(ProducerConsumerQueue<SQLOperation*>* queue)
+void MySQLConnection::StartWorkerThread(Trinity::Asio::IoContext* context)
+{
+ m_workerThread = std::make_unique<std::thread>([context]
+ {
+ boost::asio::executor_work_guard executorWorkGuard = boost::asio::make_work_guard(context->get_executor());
+
+ context->run();
+ });
+}
+
+std::thread::id MySQLConnection::GetWorkerThreadId() const
{
- m_worker = std::make_unique<DatabaseWorker>(queue, this);
+ if (m_workerThread)
+ return m_workerThread->get_id();
+
+ return {};
}
bool MySQLConnection::LockIfReady()
diff --git a/src/server/database/Database/MySQLConnection.h b/src/server/database/Database/MySQLConnection.h
index b6656d42d91..df42cd3a342 100644
--- a/src/server/database/Database/MySQLConnection.h
+++ b/src/server/database/Database/MySQLConnection.h
@@ -18,6 +18,7 @@
#ifndef _MYSQLCONNECTION_H
#define _MYSQLCONNECTION_H
+#include "AsioHacksFwd.h"
#include "Define.h"
#include "DatabaseEnvFwd.h"
#include <memory>
@@ -25,12 +26,7 @@
#include <string>
#include <vector>
-template <typename T>
-class ProducerConsumerQueue;
-
-class DatabaseWorker;
class MySQLPreparedStatement;
-class SQLOperation;
enum ConnectionFlags
{
@@ -81,7 +77,8 @@ class TC_DATABASE_API MySQLConnection
uint32 GetLastError();
- void StartDatabaseWorkerThread(ProducerConsumerQueue<SQLOperation*>* queue);
+ void StartWorkerThread(Trinity::Asio::IoContext* context);
+ std::thread::id GetWorkerThreadId() const;
protected:
/// Tries to acquire lock. If lock is acquired by another thread
@@ -106,7 +103,7 @@ class TC_DATABASE_API MySQLConnection
private:
bool _HandleMySQLErrno(uint32 errNo, uint8 attempts = 5);
- std::unique_ptr<DatabaseWorker> m_worker; //!< Core worker task.
+ std::unique_ptr<std::thread> m_workerThread; //!< Core worker thread.
MySQLHandle* m_Mysql; //!< MySQL Handle.
MySQLConnectionInfo& m_connectionInfo; //!< Connection info (used for logging)
ConnectionFlags m_connectionFlags; //!< Connection flags (for preparing relevant statements)
diff --git a/src/server/database/Database/PreparedStatement.cpp b/src/server/database/Database/PreparedStatement.cpp
index b806fb2ee48..fe20f1c0a7e 100644
--- a/src/server/database/Database/PreparedStatement.cpp
+++ b/src/server/database/Database/PreparedStatement.cpp
@@ -118,37 +118,21 @@ void PreparedStatementBase::setNull(const uint8 index)
}
//- Execution
-PreparedStatementTask::PreparedStatementTask(PreparedStatementBase* stmt, bool async) :
-m_stmt(stmt), m_result(nullptr)
+PreparedQueryResult PreparedStatementTask::Query(MySQLConnection* conn, PreparedStatementBase* stmt)
{
- m_has_result = async; // If it's async, then there's a result
- if (async)
- m_result = new PreparedQueryResultPromise();
-}
+ PreparedResultSet* result = conn->Query(stmt);
+ if (!result || !result->GetRowCount())
+ {
+ delete result;
+ result = nullptr;
+ }
-PreparedStatementTask::~PreparedStatementTask()
-{
- delete m_stmt;
- if (m_has_result && m_result != nullptr)
- delete m_result;
+ return PreparedQueryResult(result);
}
-bool PreparedStatementTask::Execute()
+bool PreparedStatementTask::Execute(MySQLConnection* conn, PreparedStatementBase* stmt)
{
- if (m_has_result)
- {
- PreparedResultSet* result = m_conn->Query(m_stmt);
- if (!result || !result->GetRowCount())
- {
- delete result;
- m_result->set_value(PreparedQueryResult(nullptr));
- return false;
- }
- m_result->set_value(PreparedQueryResult(result));
- return true;
- }
-
- return m_conn->Execute(m_stmt);
+ return conn->Execute(stmt);
}
template<typename T>
diff --git a/src/server/database/Database/PreparedStatement.h b/src/server/database/Database/PreparedStatement.h
index af4db9c310e..9fde41e6f2a 100644
--- a/src/server/database/Database/PreparedStatement.h
+++ b/src/server/database/Database/PreparedStatement.h
@@ -18,11 +18,14 @@
#ifndef _PREPAREDSTATEMENT_H
#define _PREPAREDSTATEMENT_H
+#include "DatabaseEnvFwd.h"
#include "Define.h"
-#include "SQLOperation.h"
-#include <future>
-#include <vector>
+#include <array>
+#include <string>
#include <variant>
+#include <vector>
+
+class MySQLConnection;
struct PreparedStatementData
{
@@ -112,18 +115,11 @@ private:
};
//- Lower-level class, enqueuable operation
-class TC_DATABASE_API PreparedStatementTask : public SQLOperation
+class TC_DATABASE_API PreparedStatementTask
{
- public:
- PreparedStatementTask(PreparedStatementBase* stmt, bool async = false);
- ~PreparedStatementTask();
-
- bool Execute() override;
- PreparedQueryResultFuture GetFuture() { return m_result->get_future(); }
-
- protected:
- PreparedStatementBase* m_stmt;
- bool m_has_result;
- PreparedQueryResultPromise* m_result;
+public:
+ static PreparedQueryResult Query(MySQLConnection* conn, PreparedStatementBase* stmt);
+ static bool Execute(MySQLConnection* conn, PreparedStatementBase* stmt);
};
+
#endif
diff --git a/src/server/database/Database/QueryHolder.cpp b/src/server/database/Database/QueryHolder.cpp
index 7908c2c0206..ee76a087cf0 100644
--- a/src/server/database/Database/QueryHolder.cpp
+++ b/src/server/database/Database/QueryHolder.cpp
@@ -72,16 +72,13 @@ void SQLQueryHolderBase::SetSize(size_t size)
m_queries.resize(size);
}
-SQLQueryHolderTask::~SQLQueryHolderTask() = default;
-
-bool SQLQueryHolderTask::Execute()
+bool SQLQueryHolderTask::Execute(MySQLConnection* conn, SQLQueryHolderBase* holder)
{
/// execute all queries in the holder and pass the results
- for (size_t i = 0; i < m_holder->m_queries.size(); ++i)
- if (PreparedStatementBase* stmt = m_holder->m_queries[i].first)
- m_holder->SetPreparedResult(i, m_conn->Query(stmt));
+ for (size_t i = 0; i < holder->m_queries.size(); ++i)
+ if (PreparedStatementBase* stmt = holder->m_queries[i].first)
+ holder->SetPreparedResult(i, conn->Query(stmt));
- m_result.set_value();
return true;
}
diff --git a/src/server/database/Database/QueryHolder.h b/src/server/database/Database/QueryHolder.h
index 25687fedd81..e63fe4add66 100644
--- a/src/server/database/Database/QueryHolder.h
+++ b/src/server/database/Database/QueryHolder.h
@@ -18,9 +18,12 @@
#ifndef _QUERYHOLDER_H
#define _QUERYHOLDER_H
-#include "SQLOperation.h"
+#include "Define.h"
+#include "DatabaseEnvFwd.h"
#include <vector>
+class MySQLConnection;
+
class TC_DATABASE_API SQLQueryHolderBase
{
friend class SQLQueryHolderTask;
@@ -47,20 +50,10 @@ public:
}
};
-class TC_DATABASE_API SQLQueryHolderTask : public SQLOperation
+class TC_DATABASE_API SQLQueryHolderTask
{
- private:
- std::shared_ptr<SQLQueryHolderBase> m_holder;
- QueryResultHolderPromise m_result;
-
- public:
- explicit SQLQueryHolderTask(std::shared_ptr<SQLQueryHolderBase> holder)
- : m_holder(std::move(holder)) { }
-
- ~SQLQueryHolderTask();
-
- bool Execute() override;
- QueryResultHolderFuture GetFuture() { return m_result.get_future(); }
+public:
+ static bool Execute(MySQLConnection* conn, SQLQueryHolderBase* holder);
};
class TC_DATABASE_API SQLQueryHolderCallback
diff --git a/src/server/database/Database/SQLOperation.h b/src/server/database/Database/SQLOperation.h
index 2e85f72c17c..c4b5d325576 100644
--- a/src/server/database/Database/SQLOperation.h
+++ b/src/server/database/Database/SQLOperation.h
@@ -18,7 +18,6 @@
#ifndef _SQLOPERATION_H
#define _SQLOPERATION_H
-#include "Define.h"
#include "DatabaseEnvFwd.h"
//- Union that holds element data
@@ -42,27 +41,4 @@ struct SQLElementData
SQLElementDataType type;
};
-class MySQLConnection;
-
-class TC_DATABASE_API SQLOperation
-{
- public:
- SQLOperation(): m_conn(nullptr) { }
- virtual ~SQLOperation() { }
-
- virtual int call()
- {
- Execute();
- return 0;
- }
- virtual bool Execute() = 0;
- virtual void SetConnection(MySQLConnection* con) { m_conn = con; }
-
- MySQLConnection* m_conn;
-
- private:
- SQLOperation(SQLOperation const& right) = delete;
- SQLOperation& operator=(SQLOperation const& right) = delete;
-};
-
#endif
diff --git a/src/server/database/Database/Transaction.cpp b/src/server/database/Database/Transaction.cpp
index bacfbd6bd4c..00ab5cbc7fa 100644
--- a/src/server/database/Database/Transaction.cpp
+++ b/src/server/database/Database/Transaction.cpp
@@ -70,9 +70,9 @@ void TransactionBase::Cleanup()
_cleanedUp = true;
}
-bool TransactionTask::Execute()
+bool TransactionTask::Execute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans)
{
- int errorCode = TryExecute();
+ int errorCode = TryExecute(conn, trans);
if (!errorCode)
return true;
@@ -91,7 +91,7 @@ bool TransactionTask::Execute()
for (uint32 loopDuration = 0, startMSTime = getMSTime(); loopDuration <= DEADLOCK_MAX_RETRY_TIME_MS; loopDuration = GetMSTimeDiffToNow(startMSTime))
{
- if (!TryExecute())
+ if (!TryExecute(conn, trans))
return true;
TC_LOG_WARN("sql.sql", "Deadlocked SQL Transaction, retrying. Loop timer: {} ms, Thread Id: {}", loopDuration, threadId);
@@ -101,61 +101,14 @@ bool TransactionTask::Execute()
}
// Clean up now.
- CleanupOnFailure();
+ trans->Cleanup();
return false;
}
-int TransactionTask::TryExecute()
+int TransactionTask::TryExecute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans)
{
- return m_conn->ExecuteTransaction(m_trans);
-}
-
-void TransactionTask::CleanupOnFailure()
-{
- m_trans->Cleanup();
-}
-
-bool TransactionWithResultTask::Execute()
-{
- int errorCode = TryExecute();
- if (!errorCode)
- {
- m_result.set_value(true);
- return true;
- }
-
- if (errorCode == ER_LOCK_DEADLOCK)
- {
- std::string threadId = []()
- {
- // wrapped in lambda to fix false positive analysis warning C26115
- std::ostringstream threadIdStream;
- threadIdStream << std::this_thread::get_id();
- return threadIdStream.str();
- }();
-
- // Make sure only 1 async thread retries a transaction so they don't keep dead-locking each other
- std::lock_guard<std::mutex> lock(_deadlockLock);
- for (uint32 loopDuration = 0, startMSTime = getMSTime(); loopDuration <= DEADLOCK_MAX_RETRY_TIME_MS; loopDuration = GetMSTimeDiffToNow(startMSTime))
- {
- if (!TryExecute())
- {
- m_result.set_value(true);
- return true;
- }
-
- TC_LOG_WARN("sql.sql", "Deadlocked SQL Transaction, retrying. Loop timer: {} ms, Thread Id: {}", loopDuration, threadId);
- }
-
- TC_LOG_ERROR("sql.sql", "Fatal deadlocked SQL Transaction, it will not be retried anymore. Thread Id: {}", threadId);
- }
-
- // Clean up now.
- CleanupOnFailure();
- m_result.set_value(false);
-
- return false;
+ return conn->ExecuteTransaction(trans);
}
bool TransactionCallback::InvokeIfReady()
diff --git a/src/server/database/Database/Transaction.h b/src/server/database/Database/Transaction.h
index ab6ef73e6af..5fc81bd05c9 100644
--- a/src/server/database/Database/Transaction.h
+++ b/src/server/database/Database/Transaction.h
@@ -26,6 +26,8 @@
#include <mutex>
#include <vector>
+class MySQLConnection;
+
/*! Transactions, high level class. */
class TC_DATABASE_API TransactionBase
{
@@ -69,36 +71,15 @@ public:
};
/*! Low level class*/
-class TC_DATABASE_API TransactionTask : public SQLOperation
-{
- template <class T> friend class DatabaseWorkerPool;
- friend class DatabaseWorker;
- friend class TransactionCallback;
-
- public:
- TransactionTask(std::shared_ptr<TransactionBase> trans) : m_trans(trans) { }
- ~TransactionTask() { }
-
- protected:
- bool Execute() override;
- int TryExecute();
- void CleanupOnFailure();
-
- std::shared_ptr<TransactionBase> m_trans;
- static std::mutex _deadlockLock;
-};
-
-class TC_DATABASE_API TransactionWithResultTask : public TransactionTask
+class TC_DATABASE_API TransactionTask
{
public:
- TransactionWithResultTask(std::shared_ptr<TransactionBase> trans) : TransactionTask(trans) { }
-
- TransactionFuture GetFuture() { return m_result.get_future(); }
+ static bool Execute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans);
-protected:
- bool Execute() override;
+private:
+ static int TryExecute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans);
- TransactionPromise m_result;
+ static std::mutex _deadlockLock;
};
class TC_DATABASE_API TransactionCallback
diff --git a/src/server/game/World/World.cpp b/src/server/game/World/World.cpp
index b8cddbc80cb..ec97769bf1d 100644
--- a/src/server/game/World/World.cpp
+++ b/src/server/game/World/World.cpp
@@ -27,10 +27,10 @@
#include "AuctionHouseBot.h"
#include "AuctionHouseMgr.h"
#include "AuthenticationPackets.h"
+#include "BattlePetMgr.h"
#include "BattlefieldMgr.h"
#include "BattlegroundMgr.h"
#include "BattlenetRpcErrorCodes.h"
-#include "BattlePetMgr.h"
#include "BlackMarketMgr.h"
#include "CalendarMgr.h"
#include "ChannelMgr.h"
@@ -46,8 +46,8 @@
#include "CreatureAIRegistry.h"
#include "CreatureGroups.h"
#include "CreatureTextMgr.h"
-#include "DatabaseEnv.h"
#include "DB2Stores.h"
+#include "DatabaseEnv.h"
#include "DetourMemoryFunctions.h"
#include "DisableMgr.h"
#include "GameEventMgr.h"
@@ -58,20 +58,20 @@
#include "GridNotifiersImpl.h"
#include "GroupMgr.h"
#include "GuildMgr.h"
-#include "InstanceLockMgr.h"
#include "IPLocation.h"
+#include "InstanceLockMgr.h"
+#include "LFGMgr.h"
#include "Language.h"
#include "LanguageMgr.h"
-#include "LFGMgr.h"
#include "Log.h"
#include "LootItemStorage.h"
#include "LootMgr.h"
#include "M2Stores.h"
+#include "MMapFactory.h"
#include "Map.h"
#include "MapManager.h"
#include "Metric.h"
#include "MiscPackets.h"
-#include "MMapFactory.h"
#include "ObjectAccessor.h"
#include "ObjectMgr.h"
#include "OutdoorPvPMgr.h"
@@ -86,8 +86,8 @@
#include "ScriptReloadMgr.h"
#include "SkillDiscovery.h"
#include "SkillExtraItems.h"
-#include "SpellMgr.h"
#include "SmartScriptMgr.h"
+#include "SpellMgr.h"
#include "SupportMgr.h"
#include "TaxiPathGraph.h"
#include "TerrainMgr.h"
diff --git a/src/server/game/World/World.h b/src/server/game/World/World.h
index 86d994a6001..a377701631b 100644
--- a/src/server/game/World/World.h
+++ b/src/server/game/World/World.h
@@ -755,7 +755,7 @@ class TC_GAME_API World
void ForceGameEventUpdate();
- void UpdateRealmCharCount(uint32 accid);
+ void UpdateRealmCharCount(uint32 accountId);
LocaleConstant GetAvailableDbcLocale(LocaleConstant locale) const { if (m_availableDbcLocaleMask & (1 << locale)) return locale; else return m_defaultDbcLocale; }