diff options
author | Shauren <shauren.trinity@gmail.com> | 2023-12-15 12:06:59 +0100 |
---|---|---|
committer | funjoker <funjoker109@gmail.com> | 2023-12-19 14:14:23 +0100 |
commit | e0d45f6dff3afe5caa38c48646f23d966c8c03a1 (patch) | |
tree | 6d1b9fef794cec268fa86398d9bb8957cf99f5ba /src | |
parent | 575fc7fde329ab47e517357d74a47316c6413544 (diff) |
Core/Database: Replace DatabaseWorker with asio io_context
(cherry picked from commit d958bfd0f32bfe798809b72c1b51c990edfe141c)
Diffstat (limited to 'src')
19 files changed, 234 insertions, 400 deletions
diff --git a/src/common/Asio/IoContext.h b/src/common/Asio/IoContext.h index b26de194b5c..4b469d3baa2 100644 --- a/src/common/Asio/IoContext.h +++ b/src/common/Asio/IoContext.h @@ -18,6 +18,7 @@ #ifndef IoContext_h__ #define IoContext_h__ +#include <boost/asio/bind_executor.hpp> #include <boost/asio/io_context.hpp> #include <boost/asio/post.hpp> @@ -28,6 +29,8 @@ namespace Trinity class IoContext { public: + using Executor = boost::asio::io_context::executor_type; + IoContext() : _impl() { } explicit IoContext(int concurrency_hint) : _impl(concurrency_hint) { } @@ -35,9 +38,13 @@ namespace Trinity operator boost::asio::io_context const&() const { return _impl; } std::size_t run() { return _impl.run(); } + std::size_t poll() { return _impl.poll(); } void stop() { _impl.stop(); } - boost::asio::io_context::executor_type get_executor() noexcept { return _impl.get_executor(); } + bool stopped() const { return _impl.stopped(); } + void restart() { return _impl.restart(); } + + Executor get_executor() noexcept { return _impl.get_executor(); } private: boost::asio::io_context _impl; @@ -50,6 +57,14 @@ namespace Trinity } template<typename T> + inline decltype(auto) post(boost::asio::io_context::executor_type& executor, T&& t) + { + return boost::asio::post(executor, std::forward<T>(t)); + } + + using boost::asio::bind_executor; + + template<typename T> inline decltype(auto) get_io_context(T&& ioObject) { return ioObject.get_executor().context(); diff --git a/src/common/Asio/Strand.h b/src/common/Asio/Strand.h index 942ddf55cd4..5228a30103b 100644 --- a/src/common/Asio/Strand.h +++ b/src/common/Asio/Strand.h @@ -19,7 +19,6 @@ #define Strand_h__ #include "IoContext.h" -#include <boost/asio/bind_executor.hpp> #include <boost/asio/strand.hpp> namespace Trinity diff --git a/src/server/database/Database/AdhocStatement.cpp b/src/server/database/Database/AdhocStatement.cpp index 39a753f0212..68c5e835b2c 100644 --- a/src/server/database/Database/AdhocStatement.cpp +++ b/src/server/database/Database/AdhocStatement.cpp @@ -18,41 +18,21 @@ #include "AdhocStatement.h" #include "MySQLConnection.h" #include "QueryResult.h" -#include <cstdlib> -#include <cstring> /*! Basic, ad-hoc queries. */ -BasicStatementTask::BasicStatementTask(char const* sql, bool async) : -m_result(nullptr) +QueryResult BasicStatementTask::Query(MySQLConnection* conn, char const* sql) { - m_sql = strdup(sql); - m_has_result = async; // If the operation is async, then there's a result - if (async) - m_result = new QueryResultPromise(); -} + ResultSet* result = conn->Query(sql); + if (!result || !result->GetRowCount() || !result->NextRow()) + { + delete result; + result = nullptr; + } -BasicStatementTask::~BasicStatementTask() -{ - free((void*)m_sql); - if (m_has_result && m_result != nullptr) - delete m_result; + return QueryResult(result); } -bool BasicStatementTask::Execute() +bool BasicStatementTask::Execute(MySQLConnection* conn, char const* sql) { - if (m_has_result) - { - ResultSet* result = m_conn->Query(m_sql); - if (!result || !result->GetRowCount() || !result->NextRow()) - { - delete result; - m_result->set_value(QueryResult(nullptr)); - return false; - } - - m_result->set_value(QueryResult(result)); - return true; - } - - return m_conn->Execute(m_sql); + return conn->Execute(sql); } diff --git a/src/server/database/Database/AdhocStatement.h b/src/server/database/Database/AdhocStatement.h index b9e197398cc..c74b15ccd97 100644 --- a/src/server/database/Database/AdhocStatement.h +++ b/src/server/database/Database/AdhocStatement.h @@ -18,24 +18,17 @@ #ifndef _ADHOCSTATEMENT_H #define _ADHOCSTATEMENT_H -#include "Define.h" #include "DatabaseEnvFwd.h" -#include "SQLOperation.h" +#include "Define.h" + +class MySQLConnection; /*! Raw, ad-hoc query. */ -class TC_DATABASE_API BasicStatementTask : public SQLOperation +class TC_DATABASE_API BasicStatementTask { - public: - BasicStatementTask(char const* sql, bool async = false); - ~BasicStatementTask(); - - bool Execute() override; - QueryResultFuture GetFuture() const { return m_result->get_future(); } - - private: - char const* m_sql; //- Raw query to be executed - bool m_has_result; - QueryResultPromise* m_result; +public: + static QueryResult Query(MySQLConnection* conn, char const* sql); + static bool Execute(MySQLConnection* conn, char const* sql); }; #endif diff --git a/src/server/database/Database/DatabaseWorker.cpp b/src/server/database/Database/DatabaseWorker.cpp deleted file mode 100644 index f379f442500..00000000000 --- a/src/server/database/Database/DatabaseWorker.cpp +++ /dev/null @@ -1,58 +0,0 @@ -/* - * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "DatabaseWorker.h" -#include "SQLOperation.h" -#include "ProducerConsumerQueue.h" - -DatabaseWorker::DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection) -{ - _connection = connection; - _queue = newQueue; - _cancelationToken = false; - _workerThread = std::thread(&DatabaseWorker::WorkerThread, this); -} - -DatabaseWorker::~DatabaseWorker() -{ - _cancelationToken = true; - - _queue->Cancel(); - - _workerThread.join(); -} - -void DatabaseWorker::WorkerThread() -{ - if (!_queue) - return; - - for (;;) - { - SQLOperation* operation = nullptr; - - _queue->WaitAndPop(operation); - - if (_cancelationToken || !operation) - return; - - operation->SetConnection(_connection); - operation->call(); - - delete operation; - } -} diff --git a/src/server/database/Database/DatabaseWorker.h b/src/server/database/Database/DatabaseWorker.h deleted file mode 100644 index 6245332e614..00000000000 --- a/src/server/database/Database/DatabaseWorker.h +++ /dev/null @@ -1,50 +0,0 @@ -/* - * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _WORKERTHREAD_H -#define _WORKERTHREAD_H - -#include "Define.h" -#include <atomic> -#include <thread> - -template <typename T> -class ProducerConsumerQueue; - -class MySQLConnection; -class SQLOperation; - -class TC_DATABASE_API DatabaseWorker -{ - public: - DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection); - ~DatabaseWorker(); - - private: - ProducerConsumerQueue<SQLOperation*>* _queue; - MySQLConnection* _connection; - - void WorkerThread(); - std::thread _workerThread; - - std::atomic<bool> _cancelationToken; - - DatabaseWorker(DatabaseWorker const& right) = delete; - DatabaseWorker& operator=(DatabaseWorker const& right) = delete; -}; - -#endif diff --git a/src/server/database/Database/DatabaseWorkerPool.cpp b/src/server/database/Database/DatabaseWorkerPool.cpp index 0b205e7c3c8..6e9f8a8df1a 100644 --- a/src/server/database/Database/DatabaseWorkerPool.cpp +++ b/src/server/database/Database/DatabaseWorkerPool.cpp @@ -19,6 +19,7 @@ #include "AdhocStatement.h" #include "Common.h" #include "Errors.h" +#include "IoContext.h" #include "Implementation/LoginDatabase.h" #include "Implementation/WorldDatabase.h" #include "Implementation/CharacterDatabase.h" @@ -30,9 +31,9 @@ #include "QueryCallback.h" #include "QueryHolder.h" #include "QueryResult.h" -#include "SQLOperation.h" #include "Transaction.h" #include "MySQLWorkaround.h" +#include <boost/asio/use_future.hpp> #include <mysqld_error.h> #ifdef TRINITY_DEBUG #include <sstream> @@ -49,20 +50,59 @@ #define MIN_MARIADB_CLIENT_VERSION 30003u #define MIN_MARIADB_CLIENT_VERSION_STRING "3.0.3" -class PingOperation : public SQLOperation +template<typename T> +struct DatabaseWorkerPool<T>::QueueSizeTracker { - //! Operation for idle delaythreads - bool Execute() override + explicit QueueSizeTracker(DatabaseWorkerPool* pool) : _pool(pool) { - m_conn->Ping(); - return true; + ++_pool->_queueSize; } + + QueueSizeTracker(QueueSizeTracker const& other) : _pool(other._pool) { ++_pool->_queueSize; } + QueueSizeTracker(QueueSizeTracker&& other) noexcept : _pool(std::exchange(other._pool, nullptr)) { } + + QueueSizeTracker& operator=(QueueSizeTracker const& other) + { + if (this != &other) + { + if (_pool != other._pool) + { + if (_pool) + --_pool->_queueSize; + if (other._pool) + ++other._pool->_queueSize; + } + _pool = other._pool; + } + return *this; + } + QueueSizeTracker& operator=(QueueSizeTracker&& other) noexcept + { + if (this != &other) + { + if (_pool != other._pool) + { + if (_pool) + --_pool->_queueSize; + } + _pool = std::exchange(other._pool, nullptr); + } + return *this; + } + + ~QueueSizeTracker() + { + if (_pool) + --_pool->_queueSize; + } + +private: + DatabaseWorkerPool* _pool; }; template <class T> DatabaseWorkerPool<T>::DatabaseWorkerPool() - : _queue(new ProducerConsumerQueue<SQLOperation*>()), - _async_threads(0), _synch_threads(0) + : _async_threads(0), _synch_threads(0) { WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe."); @@ -78,7 +118,6 @@ DatabaseWorkerPool<T>::DatabaseWorkerPool() template <class T> DatabaseWorkerPool<T>::~DatabaseWorkerPool() { - _queue->Cancel(); } template <class T> @@ -100,6 +139,8 @@ uint32 DatabaseWorkerPool<T>::Open() "Asynchronous connections: {}, synchronous connections: {}.", GetDatabaseName(), _async_threads, _synch_threads); + _ioContext = std::make_unique<Trinity::Asio::IoContext>(_async_threads); + uint32 error = OpenConnections(IDX_ASYNC, _async_threads); if (error) @@ -122,9 +163,13 @@ void DatabaseWorkerPool<T>::Close() { TC_LOG_INFO("sql.driver", "Closing down DatabasePool '{}'.", GetDatabaseName()); + _ioContext->stop(); + //! Closes the actualy MySQL connection. _connections[IDX_ASYNC].clear(); + _ioContext.reset(); + TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '{}' terminated. " "Proceeding with synchronous connections.", GetDatabaseName()); @@ -188,63 +233,55 @@ QueryResult DatabaseWorkerPool<T>::Query(char const* sql, T* connection /*= null if (!connection) connection = GetFreeConnection(); - ResultSet* result = connection->Query(sql); + QueryResult result = BasicStatementTask::Query(connection, sql); connection->Unlock(); - if (!result || !result->GetRowCount() || !result->NextRow()) - { - delete result; - return QueryResult(nullptr); - } - return QueryResult(result); + return result; } template <class T> PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement<T>* stmt) { - auto connection = GetFreeConnection(); - PreparedResultSet* ret = connection->Query(stmt); + T* connection = GetFreeConnection(); + PreparedQueryResult ret = PreparedStatementTask::Query(connection, stmt); connection->Unlock(); //! Delete proxy-class. Not needed anymore delete stmt; - if (!ret || !ret->GetRowCount()) - { - delete ret; - return PreparedQueryResult(nullptr); - } - - return PreparedQueryResult(ret); + return ret; } template <class T> QueryCallback DatabaseWorkerPool<T>::AsyncQuery(char const* sql) { - BasicStatementTask* task = new BasicStatementTask(sql, true); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - QueryResultFuture result = task->GetFuture(); - Enqueue(task); + QueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, sql = std::string(sql), tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + return BasicStatementTask::Query(conn, sql.c_str()); + })); return QueryCallback(std::move(result)); } template <class T> QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement<T>* stmt) { - PreparedStatementTask* task = new PreparedStatementTask(stmt, true); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - PreparedQueryResultFuture result = task->GetFuture(); - Enqueue(task); + PreparedQueryResultFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + return PreparedStatementTask::Query(conn, stmt.get()); + })); return QueryCallback(std::move(result)); } template <class T> SQLQueryHolderCallback DatabaseWorkerPool<T>::DelayQueryHolder(std::shared_ptr<SQLQueryHolder<T>> holder) { - SQLQueryHolderTask* task = new SQLQueryHolderTask(holder); - // Store future result before enqueueing - task might get already processed and deleted before returning from this method - QueryResultHolderFuture result = task->GetFuture(); - Enqueue(task); + QueryResultHolderFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, holder, tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + SQLQueryHolderTask::Execute(conn, holder.get()); + })); return { std::move(holder), std::move(result) }; } @@ -274,7 +311,11 @@ void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction) } #endif // TRINITY_DEBUG - Enqueue(new TransactionTask(transaction)); + boost::asio::post(_ioContext->get_executor(), [this, transaction, tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + TransactionTask::Execute(conn, transaction); + }); } template <class T> @@ -297,9 +338,11 @@ TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction } #endif // TRINITY_DEBUG - TransactionWithResultTask* task = new TransactionWithResultTask(transaction); - TransactionFuture result = task->GetFuture(); - Enqueue(task); + TransactionFuture result = boost::asio::post(_ioContext->get_executor(), boost::asio::use_future([this, transaction, tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + return TransactionTask::Execute(conn, transaction); + })); return TransactionCallback(std::move(result)); } @@ -369,7 +412,13 @@ void DatabaseWorkerPool<T>::KeepAlive() //! as the sole purpose is to prevent connections from idling. auto const count = _connections[IDX_ASYNC].size(); for (uint8 i = 0; i < count; ++i) - Enqueue(new PingOperation); + { + boost::asio::post(_ioContext->get_executor(), [this, tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + conn->Ping(); + }); + } } template <class T> @@ -384,7 +433,7 @@ uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConne case IDX_ASYNC: { auto c = std::make_unique<T>(*_connectionInfo, CONNECTION_ASYNC); - c->StartDatabaseWorkerThread(_queue.get()); + c->StartWorkerThread(_ioContext.get()); return c; } case IDX_SYNCH: @@ -434,15 +483,9 @@ unsigned long DatabaseWorkerPool<T>::EscapeString(char* to, char const* from, un } template <class T> -void DatabaseWorkerPool<T>::Enqueue(SQLOperation* op) -{ - _queue->Push(op); -} - -template <class T> size_t DatabaseWorkerPool<T>::QueueSize() const { - return _queue->Size(); + return _queueSize; } template <class T> @@ -473,6 +516,17 @@ T* DatabaseWorkerPool<T>::GetFreeConnection() } template <class T> +T* DatabaseWorkerPool<T>::GetAsyncConnectionForCurrentThread() const +{ + std::thread::id id = std::this_thread::get_id(); + for (auto&& connection : _connections[IDX_ASYNC]) + if (connection->GetWorkerThreadId() == id) + return connection.get(); + + return nullptr; +} + +template <class T> char const* DatabaseWorkerPool<T>::GetDatabaseName() const { return _connectionInfo->database.c_str(); @@ -484,15 +538,21 @@ void DatabaseWorkerPool<T>::Execute(char const* sql) if (!sql) return; - BasicStatementTask* task = new BasicStatementTask(sql); - Enqueue(task); + boost::asio::post(_ioContext->get_executor(), [this, sql = std::string(sql), tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + BasicStatementTask::Execute(conn, sql.c_str()); + }); } template <class T> void DatabaseWorkerPool<T>::Execute(PreparedStatement<T>* stmt) { - PreparedStatementTask* task = new PreparedStatementTask(stmt); - Enqueue(task); + boost::asio::post(_ioContext->get_executor(), [this, stmt = std::unique_ptr<PreparedStatement<T>>(stmt), tracker = QueueSizeTracker(this)] + { + T* conn = GetAsyncConnectionForCurrentThread(); + PreparedStatementTask::Execute(conn, stmt.get()); + }); } template <class T> @@ -502,7 +562,7 @@ void DatabaseWorkerPool<T>::DirectExecute(char const* sql) return; T* connection = GetFreeConnection(); - connection->Execute(sql); + BasicStatementTask::Execute(connection, sql); connection->Unlock(); } @@ -510,7 +570,7 @@ template <class T> void DatabaseWorkerPool<T>::DirectExecute(PreparedStatement<T>* stmt) { T* connection = GetFreeConnection(); - connection->Execute(stmt); + PreparedStatementTask::Execute(connection, stmt); connection->Unlock(); //! Delete proxy-class. Not needed anymore diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h index 6e18d466b7f..70c7a022510 100644 --- a/src/server/database/Database/DatabaseWorkerPool.h +++ b/src/server/database/Database/DatabaseWorkerPool.h @@ -18,6 +18,7 @@ #ifndef _DATABASEWORKERPOOL_H #define _DATABASEWORKERPOOL_H +#include "AsioHacksFwd.h" #include "Define.h" #include "DatabaseEnvFwd.h" #include "StringFormat.h" @@ -25,10 +26,6 @@ #include <string> #include <vector> -template <typename T> -class ProducerConsumerQueue; - -class SQLOperation; struct MySQLConnectionInfo; template <class T> @@ -220,16 +217,20 @@ class DatabaseWorkerPool unsigned long EscapeString(char* to, char const* from, unsigned long length); - void Enqueue(SQLOperation* op); - //! Gets a free connection in the synchronous connection pool. //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks. T* GetFreeConnection(); + T* GetAsyncConnectionForCurrentThread() const; + char const* GetDatabaseName() const; + struct QueueSizeTracker; + friend QueueSizeTracker; + //! Queue shared by async worker threads. - std::unique_ptr<ProducerConsumerQueue<SQLOperation*>> _queue; + std::unique_ptr<Trinity::Asio::IoContext> _ioContext; + std::atomic<size_t> _queueSize; std::array<std::vector<std::unique_ptr<T>>, IDX_SIZE> _connections; std::unique_ptr<MySQLConnectionInfo> _connectionInfo; std::vector<uint8> _preparedStatementSize; diff --git a/src/server/database/Database/MySQLConnection.cpp b/src/server/database/Database/MySQLConnection.cpp index c42ca4e2903..56e384520c2 100644 --- a/src/server/database/Database/MySQLConnection.cpp +++ b/src/server/database/Database/MySQLConnection.cpp @@ -17,7 +17,7 @@ #include "MySQLConnection.h" #include "Common.h" -#include "DatabaseWorker.h" +#include "IoContext.h" #include "Log.h" #include "MySQLHacks.h" #include "MySQLPreparedStatement.h" @@ -64,7 +64,11 @@ MySQLConnection::~MySQLConnection() void MySQLConnection::Close() { // Stop the worker thread before the statements are cleared - m_worker.reset(); + if (m_workerThread) + { + m_workerThread->join(); + m_workerThread.reset(); + } m_stmts.clear(); @@ -442,9 +446,22 @@ uint32 MySQLConnection::GetLastError() return mysql_errno(m_Mysql); } -void MySQLConnection::StartDatabaseWorkerThread(ProducerConsumerQueue<SQLOperation*>* queue) +void MySQLConnection::StartWorkerThread(Trinity::Asio::IoContext* context) +{ + m_workerThread = std::make_unique<std::thread>([context] + { + boost::asio::executor_work_guard executorWorkGuard = boost::asio::make_work_guard(context->get_executor()); + + context->run(); + }); +} + +std::thread::id MySQLConnection::GetWorkerThreadId() const { - m_worker = std::make_unique<DatabaseWorker>(queue, this); + if (m_workerThread) + return m_workerThread->get_id(); + + return {}; } bool MySQLConnection::LockIfReady() diff --git a/src/server/database/Database/MySQLConnection.h b/src/server/database/Database/MySQLConnection.h index b6656d42d91..df42cd3a342 100644 --- a/src/server/database/Database/MySQLConnection.h +++ b/src/server/database/Database/MySQLConnection.h @@ -18,6 +18,7 @@ #ifndef _MYSQLCONNECTION_H #define _MYSQLCONNECTION_H +#include "AsioHacksFwd.h" #include "Define.h" #include "DatabaseEnvFwd.h" #include <memory> @@ -25,12 +26,7 @@ #include <string> #include <vector> -template <typename T> -class ProducerConsumerQueue; - -class DatabaseWorker; class MySQLPreparedStatement; -class SQLOperation; enum ConnectionFlags { @@ -81,7 +77,8 @@ class TC_DATABASE_API MySQLConnection uint32 GetLastError(); - void StartDatabaseWorkerThread(ProducerConsumerQueue<SQLOperation*>* queue); + void StartWorkerThread(Trinity::Asio::IoContext* context); + std::thread::id GetWorkerThreadId() const; protected: /// Tries to acquire lock. If lock is acquired by another thread @@ -106,7 +103,7 @@ class TC_DATABASE_API MySQLConnection private: bool _HandleMySQLErrno(uint32 errNo, uint8 attempts = 5); - std::unique_ptr<DatabaseWorker> m_worker; //!< Core worker task. + std::unique_ptr<std::thread> m_workerThread; //!< Core worker thread. MySQLHandle* m_Mysql; //!< MySQL Handle. MySQLConnectionInfo& m_connectionInfo; //!< Connection info (used for logging) ConnectionFlags m_connectionFlags; //!< Connection flags (for preparing relevant statements) diff --git a/src/server/database/Database/PreparedStatement.cpp b/src/server/database/Database/PreparedStatement.cpp index b806fb2ee48..fe20f1c0a7e 100644 --- a/src/server/database/Database/PreparedStatement.cpp +++ b/src/server/database/Database/PreparedStatement.cpp @@ -118,37 +118,21 @@ void PreparedStatementBase::setNull(const uint8 index) } //- Execution -PreparedStatementTask::PreparedStatementTask(PreparedStatementBase* stmt, bool async) : -m_stmt(stmt), m_result(nullptr) +PreparedQueryResult PreparedStatementTask::Query(MySQLConnection* conn, PreparedStatementBase* stmt) { - m_has_result = async; // If it's async, then there's a result - if (async) - m_result = new PreparedQueryResultPromise(); -} + PreparedResultSet* result = conn->Query(stmt); + if (!result || !result->GetRowCount()) + { + delete result; + result = nullptr; + } -PreparedStatementTask::~PreparedStatementTask() -{ - delete m_stmt; - if (m_has_result && m_result != nullptr) - delete m_result; + return PreparedQueryResult(result); } -bool PreparedStatementTask::Execute() +bool PreparedStatementTask::Execute(MySQLConnection* conn, PreparedStatementBase* stmt) { - if (m_has_result) - { - PreparedResultSet* result = m_conn->Query(m_stmt); - if (!result || !result->GetRowCount()) - { - delete result; - m_result->set_value(PreparedQueryResult(nullptr)); - return false; - } - m_result->set_value(PreparedQueryResult(result)); - return true; - } - - return m_conn->Execute(m_stmt); + return conn->Execute(stmt); } template<typename T> diff --git a/src/server/database/Database/PreparedStatement.h b/src/server/database/Database/PreparedStatement.h index af4db9c310e..9fde41e6f2a 100644 --- a/src/server/database/Database/PreparedStatement.h +++ b/src/server/database/Database/PreparedStatement.h @@ -18,11 +18,14 @@ #ifndef _PREPAREDSTATEMENT_H #define _PREPAREDSTATEMENT_H +#include "DatabaseEnvFwd.h" #include "Define.h" -#include "SQLOperation.h" -#include <future> -#include <vector> +#include <array> +#include <string> #include <variant> +#include <vector> + +class MySQLConnection; struct PreparedStatementData { @@ -112,18 +115,11 @@ private: }; //- Lower-level class, enqueuable operation -class TC_DATABASE_API PreparedStatementTask : public SQLOperation +class TC_DATABASE_API PreparedStatementTask { - public: - PreparedStatementTask(PreparedStatementBase* stmt, bool async = false); - ~PreparedStatementTask(); - - bool Execute() override; - PreparedQueryResultFuture GetFuture() { return m_result->get_future(); } - - protected: - PreparedStatementBase* m_stmt; - bool m_has_result; - PreparedQueryResultPromise* m_result; +public: + static PreparedQueryResult Query(MySQLConnection* conn, PreparedStatementBase* stmt); + static bool Execute(MySQLConnection* conn, PreparedStatementBase* stmt); }; + #endif diff --git a/src/server/database/Database/QueryHolder.cpp b/src/server/database/Database/QueryHolder.cpp index 7908c2c0206..ee76a087cf0 100644 --- a/src/server/database/Database/QueryHolder.cpp +++ b/src/server/database/Database/QueryHolder.cpp @@ -72,16 +72,13 @@ void SQLQueryHolderBase::SetSize(size_t size) m_queries.resize(size); } -SQLQueryHolderTask::~SQLQueryHolderTask() = default; - -bool SQLQueryHolderTask::Execute() +bool SQLQueryHolderTask::Execute(MySQLConnection* conn, SQLQueryHolderBase* holder) { /// execute all queries in the holder and pass the results - for (size_t i = 0; i < m_holder->m_queries.size(); ++i) - if (PreparedStatementBase* stmt = m_holder->m_queries[i].first) - m_holder->SetPreparedResult(i, m_conn->Query(stmt)); + for (size_t i = 0; i < holder->m_queries.size(); ++i) + if (PreparedStatementBase* stmt = holder->m_queries[i].first) + holder->SetPreparedResult(i, conn->Query(stmt)); - m_result.set_value(); return true; } diff --git a/src/server/database/Database/QueryHolder.h b/src/server/database/Database/QueryHolder.h index 25687fedd81..e63fe4add66 100644 --- a/src/server/database/Database/QueryHolder.h +++ b/src/server/database/Database/QueryHolder.h @@ -18,9 +18,12 @@ #ifndef _QUERYHOLDER_H #define _QUERYHOLDER_H -#include "SQLOperation.h" +#include "Define.h" +#include "DatabaseEnvFwd.h" #include <vector> +class MySQLConnection; + class TC_DATABASE_API SQLQueryHolderBase { friend class SQLQueryHolderTask; @@ -47,20 +50,10 @@ public: } }; -class TC_DATABASE_API SQLQueryHolderTask : public SQLOperation +class TC_DATABASE_API SQLQueryHolderTask { - private: - std::shared_ptr<SQLQueryHolderBase> m_holder; - QueryResultHolderPromise m_result; - - public: - explicit SQLQueryHolderTask(std::shared_ptr<SQLQueryHolderBase> holder) - : m_holder(std::move(holder)) { } - - ~SQLQueryHolderTask(); - - bool Execute() override; - QueryResultHolderFuture GetFuture() { return m_result.get_future(); } +public: + static bool Execute(MySQLConnection* conn, SQLQueryHolderBase* holder); }; class TC_DATABASE_API SQLQueryHolderCallback diff --git a/src/server/database/Database/SQLOperation.h b/src/server/database/Database/SQLOperation.h index 2e85f72c17c..c4b5d325576 100644 --- a/src/server/database/Database/SQLOperation.h +++ b/src/server/database/Database/SQLOperation.h @@ -18,7 +18,6 @@ #ifndef _SQLOPERATION_H #define _SQLOPERATION_H -#include "Define.h" #include "DatabaseEnvFwd.h" //- Union that holds element data @@ -42,27 +41,4 @@ struct SQLElementData SQLElementDataType type; }; -class MySQLConnection; - -class TC_DATABASE_API SQLOperation -{ - public: - SQLOperation(): m_conn(nullptr) { } - virtual ~SQLOperation() { } - - virtual int call() - { - Execute(); - return 0; - } - virtual bool Execute() = 0; - virtual void SetConnection(MySQLConnection* con) { m_conn = con; } - - MySQLConnection* m_conn; - - private: - SQLOperation(SQLOperation const& right) = delete; - SQLOperation& operator=(SQLOperation const& right) = delete; -}; - #endif diff --git a/src/server/database/Database/Transaction.cpp b/src/server/database/Database/Transaction.cpp index bacfbd6bd4c..00ab5cbc7fa 100644 --- a/src/server/database/Database/Transaction.cpp +++ b/src/server/database/Database/Transaction.cpp @@ -70,9 +70,9 @@ void TransactionBase::Cleanup() _cleanedUp = true; } -bool TransactionTask::Execute() +bool TransactionTask::Execute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans) { - int errorCode = TryExecute(); + int errorCode = TryExecute(conn, trans); if (!errorCode) return true; @@ -91,7 +91,7 @@ bool TransactionTask::Execute() for (uint32 loopDuration = 0, startMSTime = getMSTime(); loopDuration <= DEADLOCK_MAX_RETRY_TIME_MS; loopDuration = GetMSTimeDiffToNow(startMSTime)) { - if (!TryExecute()) + if (!TryExecute(conn, trans)) return true; TC_LOG_WARN("sql.sql", "Deadlocked SQL Transaction, retrying. Loop timer: {} ms, Thread Id: {}", loopDuration, threadId); @@ -101,61 +101,14 @@ bool TransactionTask::Execute() } // Clean up now. - CleanupOnFailure(); + trans->Cleanup(); return false; } -int TransactionTask::TryExecute() +int TransactionTask::TryExecute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans) { - return m_conn->ExecuteTransaction(m_trans); -} - -void TransactionTask::CleanupOnFailure() -{ - m_trans->Cleanup(); -} - -bool TransactionWithResultTask::Execute() -{ - int errorCode = TryExecute(); - if (!errorCode) - { - m_result.set_value(true); - return true; - } - - if (errorCode == ER_LOCK_DEADLOCK) - { - std::string threadId = []() - { - // wrapped in lambda to fix false positive analysis warning C26115 - std::ostringstream threadIdStream; - threadIdStream << std::this_thread::get_id(); - return threadIdStream.str(); - }(); - - // Make sure only 1 async thread retries a transaction so they don't keep dead-locking each other - std::lock_guard<std::mutex> lock(_deadlockLock); - for (uint32 loopDuration = 0, startMSTime = getMSTime(); loopDuration <= DEADLOCK_MAX_RETRY_TIME_MS; loopDuration = GetMSTimeDiffToNow(startMSTime)) - { - if (!TryExecute()) - { - m_result.set_value(true); - return true; - } - - TC_LOG_WARN("sql.sql", "Deadlocked SQL Transaction, retrying. Loop timer: {} ms, Thread Id: {}", loopDuration, threadId); - } - - TC_LOG_ERROR("sql.sql", "Fatal deadlocked SQL Transaction, it will not be retried anymore. Thread Id: {}", threadId); - } - - // Clean up now. - CleanupOnFailure(); - m_result.set_value(false); - - return false; + return conn->ExecuteTransaction(trans); } bool TransactionCallback::InvokeIfReady() diff --git a/src/server/database/Database/Transaction.h b/src/server/database/Database/Transaction.h index ab6ef73e6af..5fc81bd05c9 100644 --- a/src/server/database/Database/Transaction.h +++ b/src/server/database/Database/Transaction.h @@ -26,6 +26,8 @@ #include <mutex> #include <vector> +class MySQLConnection; + /*! Transactions, high level class. */ class TC_DATABASE_API TransactionBase { @@ -69,36 +71,15 @@ public: }; /*! Low level class*/ -class TC_DATABASE_API TransactionTask : public SQLOperation -{ - template <class T> friend class DatabaseWorkerPool; - friend class DatabaseWorker; - friend class TransactionCallback; - - public: - TransactionTask(std::shared_ptr<TransactionBase> trans) : m_trans(trans) { } - ~TransactionTask() { } - - protected: - bool Execute() override; - int TryExecute(); - void CleanupOnFailure(); - - std::shared_ptr<TransactionBase> m_trans; - static std::mutex _deadlockLock; -}; - -class TC_DATABASE_API TransactionWithResultTask : public TransactionTask +class TC_DATABASE_API TransactionTask { public: - TransactionWithResultTask(std::shared_ptr<TransactionBase> trans) : TransactionTask(trans) { } - - TransactionFuture GetFuture() { return m_result.get_future(); } + static bool Execute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans); -protected: - bool Execute() override; +private: + static int TryExecute(MySQLConnection* conn, std::shared_ptr<TransactionBase> trans); - TransactionPromise m_result; + static std::mutex _deadlockLock; }; class TC_DATABASE_API TransactionCallback diff --git a/src/server/game/World/World.cpp b/src/server/game/World/World.cpp index b8cddbc80cb..ec97769bf1d 100644 --- a/src/server/game/World/World.cpp +++ b/src/server/game/World/World.cpp @@ -27,10 +27,10 @@ #include "AuctionHouseBot.h" #include "AuctionHouseMgr.h" #include "AuthenticationPackets.h" +#include "BattlePetMgr.h" #include "BattlefieldMgr.h" #include "BattlegroundMgr.h" #include "BattlenetRpcErrorCodes.h" -#include "BattlePetMgr.h" #include "BlackMarketMgr.h" #include "CalendarMgr.h" #include "ChannelMgr.h" @@ -46,8 +46,8 @@ #include "CreatureAIRegistry.h" #include "CreatureGroups.h" #include "CreatureTextMgr.h" -#include "DatabaseEnv.h" #include "DB2Stores.h" +#include "DatabaseEnv.h" #include "DetourMemoryFunctions.h" #include "DisableMgr.h" #include "GameEventMgr.h" @@ -58,20 +58,20 @@ #include "GridNotifiersImpl.h" #include "GroupMgr.h" #include "GuildMgr.h" -#include "InstanceLockMgr.h" #include "IPLocation.h" +#include "InstanceLockMgr.h" +#include "LFGMgr.h" #include "Language.h" #include "LanguageMgr.h" -#include "LFGMgr.h" #include "Log.h" #include "LootItemStorage.h" #include "LootMgr.h" #include "M2Stores.h" +#include "MMapFactory.h" #include "Map.h" #include "MapManager.h" #include "Metric.h" #include "MiscPackets.h" -#include "MMapFactory.h" #include "ObjectAccessor.h" #include "ObjectMgr.h" #include "OutdoorPvPMgr.h" @@ -86,8 +86,8 @@ #include "ScriptReloadMgr.h" #include "SkillDiscovery.h" #include "SkillExtraItems.h" -#include "SpellMgr.h" #include "SmartScriptMgr.h" +#include "SpellMgr.h" #include "SupportMgr.h" #include "TaxiPathGraph.h" #include "TerrainMgr.h" diff --git a/src/server/game/World/World.h b/src/server/game/World/World.h index 86d994a6001..a377701631b 100644 --- a/src/server/game/World/World.h +++ b/src/server/game/World/World.h @@ -755,7 +755,7 @@ class TC_GAME_API World void ForceGameEventUpdate(); - void UpdateRealmCharCount(uint32 accid); + void UpdateRealmCharCount(uint32 accountId); LocaleConstant GetAvailableDbcLocale(LocaleConstant locale) const { if (m_availableDbcLocaleMask & (1 << locale)) return locale; else return m_defaultDbcLocale; } |