diff options
22 files changed, 247 insertions, 144 deletions
diff --git a/src/common/Utilities/AsyncCallbackProcessor.h b/src/common/Utilities/AsyncCallbackProcessor.h new file mode 100644 index 00000000000..76781f49425 --- /dev/null +++ b/src/common/Utilities/AsyncCallbackProcessor.h @@ -0,0 +1,63 @@ +/* + * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef AsyncCallbackProcessor_h__ +#define AsyncCallbackProcessor_h__ + +#include "Define.h" +#include <algorithm> +#include <vector> + +//template <class T> +//concept AsyncCallback = requires(T t) { { t.InvokeIfReady() } -> std::convertible_to<bool> }; + +template<typename T> // requires AsyncCallback<T> +class AsyncCallbackProcessor +{ +public: + AsyncCallbackProcessor() = default; + ~AsyncCallbackProcessor() = default; + + T& AddCallback(T&& query) + { + _callbacks.emplace_back(std::move(query)); + return _callbacks.back(); + } + + void ProcessReadyCallbacks() + { + if (_callbacks.empty()) + return; + + std::vector<T> updateCallbacks{ std::move(_callbacks) }; + + updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](T& callback) + { + return callback.InvokeIfReady(); + }), updateCallbacks.end()); + + _callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end())); + } + +private: + AsyncCallbackProcessor(AsyncCallbackProcessor const&) = delete; + AsyncCallbackProcessor& operator=(AsyncCallbackProcessor const&) = delete; + + std::vector<T> _callbacks; +}; + +#endif // AsyncCallbackProcessor_h__ diff --git a/src/server/bnetserver/REST/LoginRESTService.cpp b/src/server/bnetserver/REST/LoginRESTService.cpp index a94537ca05c..5ea40fc8267 100644 --- a/src/server/bnetserver/REST/LoginRESTService.cpp +++ b/src/server/bnetserver/REST/LoginRESTService.cpp @@ -47,7 +47,7 @@ public: bool InvokeIfReady() { ASSERT(_callback); - return _callback->InvokeIfReady() == QueryCallback::Completed; + return _callback->InvokeIfReady(); } soap* GetClient() { return &_client; } diff --git a/src/server/bnetserver/Server/Session.cpp b/src/server/bnetserver/Server/Session.cpp index f3e89becb36..d9e7bfad4c5 100644 --- a/src/server/bnetserver/Server/Session.cpp +++ b/src/server/bnetserver/Server/Session.cpp @@ -94,7 +94,7 @@ void Battlenet::Session::Start() LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_IP_INFO); stmt->setString(0, ip_address); - _queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&Battlenet::Session::CheckIpCallback, this, std::placeholders::_1))); + _queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&Battlenet::Session::CheckIpCallback, this, std::placeholders::_1))); } void Battlenet::Session::CheckIpCallback(PreparedQueryResult result) @@ -126,7 +126,7 @@ bool Battlenet::Session::Update() if (!BattlenetSocket::Update()) return false; - _queryProcessor.ProcessReadyQueries(); + _queryProcessor.ProcessReadyCallbacks(); return true; } @@ -258,7 +258,7 @@ uint32 Battlenet::Session::VerifyWebCredentials(std::string const& webCredential std::function<void(ServiceBase*, uint32, ::google::protobuf::Message const*)> asyncContinuation = std::move(continuation); std::shared_ptr<AccountInfo> accountInfo = std::make_shared<AccountInfo>(); - _queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithChainingPreparedCallback([this, accountInfo, asyncContinuation](QueryCallback& callback, PreparedQueryResult result) + _queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithChainingPreparedCallback([this, accountInfo, asyncContinuation](QueryCallback& callback, PreparedQueryResult result) { Battlenet::Services::Authentication asyncContinuationService(this); NoData response; diff --git a/src/server/bnetserver/Server/Session.h b/src/server/bnetserver/Server/Session.h index 49a93303a1d..5738b43e381 100644 --- a/src/server/bnetserver/Server/Session.h +++ b/src/server/bnetserver/Server/Session.h @@ -18,13 +18,13 @@ #ifndef Session_h__ #define Session_h__ +#include "AsyncCallbackProcessor.h" #include "Realm.h" #include "SslContext.h" #include "SslSocket.h" #include "Socket.h" #include "BigNumber.h" #include "QueryResult.h" -#include "QueryCallbackProcessor.h" #include <boost/asio/ip/tcp.hpp> #include <boost/asio/ssl.hpp> #include <google/protobuf/message.h> diff --git a/src/server/database/Database/DatabaseEnvFwd.h b/src/server/database/Database/DatabaseEnvFwd.h index b743c88506d..e981be5c026 100644 --- a/src/server/database/Database/DatabaseEnvFwd.h +++ b/src/server/database/Database/DatabaseEnvFwd.h @@ -24,9 +24,9 @@ class Field; class ResultSet; -typedef std::shared_ptr<ResultSet> QueryResult; -typedef std::future<QueryResult> QueryResultFuture; -typedef std::promise<QueryResult> QueryResultPromise; +using QueryResult = std::shared_ptr<ResultSet>; +using QueryResultFuture = std::future<QueryResult>; +using QueryResultPromise = std::promise<QueryResult>; class CharacterDatabaseConnection; class HotfixDatabaseConnection; @@ -44,17 +44,27 @@ using LoginDatabasePreparedStatement = PreparedStatement<LoginDatabaseConnection using WorldDatabasePreparedStatement = PreparedStatement<WorldDatabaseConnection>; class PreparedResultSet; -typedef std::shared_ptr<PreparedResultSet> PreparedQueryResult; -typedef std::future<PreparedQueryResult> PreparedQueryResultFuture; -typedef std::promise<PreparedQueryResult> PreparedQueryResultPromise; +using PreparedQueryResult = std::shared_ptr<PreparedResultSet>; +using PreparedQueryResultFuture = std::future<PreparedQueryResult>; +using PreparedQueryResultPromise = std::promise<PreparedQueryResult>; class QueryCallback; +template<typename T> +class AsyncCallbackProcessor; + +using QueryCallbackProcessor = AsyncCallbackProcessor<QueryCallback>; + class TransactionBase; +using TransactionFuture = std::future<bool>; +using TransactionPromise = std::promise<bool>; + template<typename T> class Transaction; +class TransactionCallback; + template<typename T> using SQLTransaction = std::shared_ptr<Transaction<T>>; @@ -64,8 +74,8 @@ using LoginDatabaseTransaction = SQLTransaction<LoginDatabaseConnection>; using WorldDatabaseTransaction = SQLTransaction<WorldDatabaseConnection>; class SQLQueryHolderBase; -typedef std::future<SQLQueryHolderBase*> QueryResultHolderFuture; -typedef std::promise<SQLQueryHolderBase*> QueryResultHolderPromise; +using QueryResultHolderFuture = std::future<SQLQueryHolderBase*>; +using QueryResultHolderPromise = std::promise<SQLQueryHolderBase*>; template<typename T> class SQLQueryHolder; diff --git a/src/server/database/Database/DatabaseWorkerPool.cpp b/src/server/database/Database/DatabaseWorkerPool.cpp index daf27b206fd..b5eb4cfaeee 100644 --- a/src/server/database/Database/DatabaseWorkerPool.cpp +++ b/src/server/database/Database/DatabaseWorkerPool.cpp @@ -262,6 +262,32 @@ void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction) } template <class T> +TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction) +{ +#ifdef TRINITY_DEBUG + //! Only analyze transaction weaknesses in Debug mode. + //! Ideally we catch the faults in Debug mode and then correct them, + //! so there's no need to waste these CPU cycles in Release mode. + switch (transaction->GetSize()) + { + case 0: + TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing."); + return; + case 1: + TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code."); + break; + default: + break; + } +#endif // TRINITY_DEBUG + + TransactionWithResultTask* task = new TransactionWithResultTask(transaction); + TransactionFuture result = task->GetFuture(); + Enqueue(task); + return TransactionCallback(std::move(result)); +} + +template <class T> void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction) { T* connection = GetFreeConnection(); diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h index d321c78fa56..e84b1072733 100644 --- a/src/server/database/Database/DatabaseWorkerPool.h +++ b/src/server/database/Database/DatabaseWorkerPool.h @@ -173,6 +173,10 @@ class DatabaseWorkerPool //! were appended to the transaction will be respected during execution. void CommitTransaction(SQLTransaction<T> transaction); + //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations + //! were appended to the transaction will be respected during execution. + TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction); + //! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations //! were appended to the transaction will be respected during execution. void DirectCommitTransaction(SQLTransaction<T>& transaction); diff --git a/src/server/database/Database/QueryCallback.cpp b/src/server/database/Database/QueryCallback.cpp index 65b52ff7627..02365a76fe7 100644 --- a/src/server/database/Database/QueryCallback.cpp +++ b/src/server/database/Database/QueryCallback.cpp @@ -175,7 +175,7 @@ void QueryCallback::SetNextQuery(QueryCallback&& next) MoveFrom(this, std::move(next)); } -QueryCallback::Status QueryCallback::InvokeIfReady() +bool QueryCallback::InvokeIfReady() { QueryCallbackData& callback = _callbacks.front(); auto checkStateAndReturnCompletion = [this]() @@ -185,15 +185,15 @@ QueryCallback::Status QueryCallback::InvokeIfReady() if (_callbacks.empty()) { ASSERT(!hasNext); - return Completed; + return true; } // abort chain if (!hasNext) - return Completed; + return true; ASSERT(_isPrepared == _callbacks.front()._isPrepared); - return NextStep; + return false; }; if (!_isPrepared) @@ -217,5 +217,5 @@ QueryCallback::Status QueryCallback::InvokeIfReady() } } - return NotReady; + return false; } diff --git a/src/server/database/Database/QueryCallback.h b/src/server/database/Database/QueryCallback.h index 1d7e835e999..6cc315964b7 100644 --- a/src/server/database/Database/QueryCallback.h +++ b/src/server/database/Database/QueryCallback.h @@ -44,14 +44,8 @@ public: // Moves std::future from next to this object void SetNextQuery(QueryCallback&& next); - enum Status - { - NotReady, - NextStep, - Completed - }; - - Status InvokeIfReady(); + // returns true when completed + bool InvokeIfReady(); private: QueryCallback(QueryCallback const& right) = delete; diff --git a/src/server/database/Database/QueryCallbackProcessor.cpp b/src/server/database/Database/QueryCallbackProcessor.cpp deleted file mode 100644 index 546a6d17c04..00000000000 --- a/src/server/database/Database/QueryCallbackProcessor.cpp +++ /dev/null @@ -1,48 +0,0 @@ -/* - * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "QueryCallbackProcessor.h" -#include "QueryCallback.h" -#include <algorithm> - -QueryCallbackProcessor::QueryCallbackProcessor() -{ -} - -QueryCallbackProcessor::~QueryCallbackProcessor() -{ -} - -void QueryCallbackProcessor::AddQuery(QueryCallback&& query) -{ - _callbacks.emplace_back(std::move(query)); -} - -void QueryCallbackProcessor::ProcessReadyQueries() -{ - if (_callbacks.empty()) - return; - - std::vector<QueryCallback> updateCallbacks{ std::move(_callbacks) }; - - updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](QueryCallback& callback) - { - return callback.InvokeIfReady() == QueryCallback::Completed; - }), updateCallbacks.end()); - - _callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end())); -} diff --git a/src/server/database/Database/QueryCallbackProcessor.h b/src/server/database/Database/QueryCallbackProcessor.h deleted file mode 100644 index 85596e90092..00000000000 --- a/src/server/database/Database/QueryCallbackProcessor.h +++ /dev/null @@ -1,42 +0,0 @@ -/* - * This file is part of the TrinityCore Project. See AUTHORS file for Copyright information - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef QueryCallbackProcessor_h__ -#define QueryCallbackProcessor_h__ - -#include "Define.h" -#include <vector> - -class QueryCallback; - -class TC_DATABASE_API QueryCallbackProcessor -{ -public: - QueryCallbackProcessor(); - ~QueryCallbackProcessor(); - - void AddQuery(QueryCallback&& query); - void ProcessReadyQueries(); - -private: - QueryCallbackProcessor(QueryCallbackProcessor const&) = delete; - QueryCallbackProcessor& operator=(QueryCallbackProcessor const&) = delete; - - std::vector<QueryCallback> _callbacks; -}; - -#endif // QueryCallbackProcessor_h__ diff --git a/src/server/database/Database/Transaction.cpp b/src/server/database/Database/Transaction.cpp index ecf0ba25a7b..27aa2a4808c 100644 --- a/src/server/database/Database/Transaction.cpp +++ b/src/server/database/Database/Transaction.cpp @@ -65,7 +65,7 @@ void TransactionBase::Cleanup() bool TransactionTask::Execute() { - int errorCode = m_conn->ExecuteTransaction(m_trans); + int errorCode = TryExecute(); if (!errorCode) return true; @@ -75,12 +75,64 @@ bool TransactionTask::Execute() std::lock_guard<std::mutex> lock(_deadlockLock); uint8 loopBreaker = 5; // Handle MySQL Errno 1213 without extending deadlock to the core itself for (uint8 i = 0; i < loopBreaker; ++i) - if (!m_conn->ExecuteTransaction(m_trans)) + if (!TryExecute()) return true; } // Clean up now. + CleanupOnFailure(); + + return false; +} + +int TransactionTask::TryExecute() +{ + return m_conn->ExecuteTransaction(m_trans); +} + +void TransactionTask::CleanupOnFailure() +{ m_trans->Cleanup(); +} + +bool TransactionWithResultTask::Execute() +{ + int errorCode = TryExecute(); + if (!errorCode) + { + m_result.set_value(true); + return true; + } + + if (errorCode == ER_LOCK_DEADLOCK) + { + // Make sure only 1 async thread retries a transaction so they don't keep dead-locking each other + std::lock_guard<std::mutex> lock(_deadlockLock); + uint8 loopBreaker = 5; // Handle MySQL Errno 1213 without extending deadlock to the core itself + for (uint8 i = 0; i < loopBreaker; ++i) + { + if (!TryExecute()) + { + m_result.set_value(true); + return true; + } + } + } + + // Clean up now. + CleanupOnFailure(); + m_result.set_value(false); + + return false; +} + +bool TransactionCallback::InvokeIfReady() +{ + if (m_future.valid() && m_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready) + { + m_callback(m_future.get()); + return true; + } return false; } diff --git a/src/server/database/Database/Transaction.h b/src/server/database/Database/Transaction.h index 16fc08fb0d8..36f7d3ec749 100644 --- a/src/server/database/Database/Transaction.h +++ b/src/server/database/Database/Transaction.h @@ -22,6 +22,7 @@ #include "DatabaseEnvFwd.h" #include "SQLOperation.h" #include "StringFormat.h" +#include <functional> #include <mutex> #include <vector> @@ -72,6 +73,7 @@ class TC_DATABASE_API TransactionTask : public SQLOperation { template <class T> friend class DatabaseWorkerPool; friend class DatabaseWorker; + friend class TransactionCallback; public: TransactionTask(std::shared_ptr<TransactionBase> trans) : m_trans(trans) { } @@ -79,9 +81,43 @@ class TC_DATABASE_API TransactionTask : public SQLOperation protected: bool Execute() override; + int TryExecute(); + void CleanupOnFailure(); std::shared_ptr<TransactionBase> m_trans; static std::mutex _deadlockLock; }; +class TC_DATABASE_API TransactionWithResultTask : public TransactionTask +{ +public: + TransactionWithResultTask(std::shared_ptr<TransactionBase> trans) : TransactionTask(trans) { } + + TransactionFuture GetFuture() { return m_result.get_future(); } + +protected: + bool Execute() override; + + TransactionPromise m_result; +}; + +class TC_DATABASE_API TransactionCallback +{ +public: + TransactionCallback(TransactionFuture&& future) : m_future(std::move(future)) { } + TransactionCallback(TransactionCallback&&) = default; + + TransactionCallback& operator=(TransactionCallback&&) = default; + + void AfterComplete(std::function<void(bool)> callback) & + { + m_callback = std::move(callback); + } + + bool InvokeIfReady(); + + TransactionFuture m_future; + std::function<void(bool)> m_callback; +}; + #endif diff --git a/src/server/game/Handlers/CharacterHandler.cpp b/src/server/game/Handlers/CharacterHandler.cpp index bfc3bfbedc4..98eb6410960 100644 --- a/src/server/game/Handlers/CharacterHandler.cpp +++ b/src/server/game/Handlers/CharacterHandler.cpp @@ -402,7 +402,7 @@ void WorldSession::HandleCharEnumOpcode(WorldPackets::Character::EnumCharacters& stmt->setUInt8(0, PET_SAVE_AS_CURRENT); stmt->setUInt32(1, GetAccountId()); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharEnum, this, std::placeholders::_1))); + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharEnum, this, std::placeholders::_1))); } void WorldSession::HandleCharUndeleteEnum(PreparedQueryResult result) @@ -444,7 +444,7 @@ void WorldSession::HandleCharUndeleteEnumOpcode(WorldPackets::Character::EnumCha stmt->setUInt8(0, PET_SAVE_AS_CURRENT); stmt->setUInt32(1, GetAccountId()); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharUndeleteEnum, this, std::placeholders::_1))); + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharUndeleteEnum, this, std::placeholders::_1))); } void WorldSession::HandleCharCreateOpcode(WorldPackets::Character::CreateCharacter& charCreate) @@ -582,7 +582,7 @@ void WorldSession::HandleCharCreateOpcode(WorldPackets::Character::CreateCharact CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHECK_NAME); stmt->setString(0, charCreate.CreateInfo->Name); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt) + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt) .WithChainingPreparedCallback([this](QueryCallback& queryCallback, PreparedQueryResult result) { if (result) @@ -1305,7 +1305,7 @@ void WorldSession::HandleCharRenameOpcode(WorldPackets::Character::CharacterRena stmt->setUInt64(0, request.RenameInfo->Guid.GetCounter()); stmt->setString(1, request.RenameInfo->NewName); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt) + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt) .WithPreparedCallback(std::bind(&WorldSession::HandleCharRenameCallBack, this, request.RenameInfo, std::placeholders::_1))); } @@ -1514,7 +1514,7 @@ void WorldSession::HandleCharCustomizeOpcode(WorldPackets::Character::CharCustom CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHAR_CUSTOMIZE_INFO); stmt->setUInt64(0, packet.CustomizeInfo->CharGUID.GetCounter()); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt) + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt) .WithPreparedCallback(std::bind(&WorldSession::HandleCharCustomizeCallback, this, packet.CustomizeInfo, std::placeholders::_1))); } @@ -1799,7 +1799,7 @@ void WorldSession::HandleCharRaceOrFactionChangeOpcode(WorldPackets::Character:: CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHAR_RACE_OR_FACTION_CHANGE_INFOS); stmt->setUInt64(0, packet.RaceOrFactionChangeInfo->Guid.GetCounter()); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt) + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt) .WithPreparedCallback(std::bind(&WorldSession::HandleCharRaceOrFactionChangeCallback, this, packet.RaceOrFactionChangeInfo, std::placeholders::_1))); } @@ -2385,7 +2385,7 @@ void WorldSession::HandleGetUndeleteCooldownStatus(WorldPackets::Character::GetU LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_LAST_CHAR_UNDELETE); stmt->setUInt32(0, GetBattlenetAccountId()); - _queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUndeleteCooldownStatusCallback, this, std::placeholders::_1))); + _queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUndeleteCooldownStatusCallback, this, std::placeholders::_1))); } void WorldSession::HandleUndeleteCooldownStatusCallback(PreparedQueryResult result) @@ -2415,7 +2415,7 @@ void WorldSession::HandleCharUndeleteOpcode(WorldPackets::Character::UndeleteCha stmt->setUInt32(0, GetBattlenetAccountId()); std::shared_ptr<WorldPackets::Character::CharacterUndeleteInfo> undeleteInfo = undeleteCharacter.UndeleteInfo; - _queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt) + _queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt) .WithChainingPreparedCallback([this, undeleteInfo](QueryCallback& queryCallback, PreparedQueryResult result) { if (result) diff --git a/src/server/game/Handlers/NPCHandler.cpp b/src/server/game/Handlers/NPCHandler.cpp index 84238bc686b..08981180970 100644 --- a/src/server/game/Handlers/NPCHandler.cpp +++ b/src/server/game/Handlers/NPCHandler.cpp @@ -375,7 +375,7 @@ void WorldSession::SendStablePet(ObjectGuid guid) stmt->setUInt8(1, PET_SAVE_FIRST_STABLE_SLOT); stmt->setUInt8(2, PET_SAVE_LAST_STABLE_SLOT); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::SendStablePetCallback, this, guid, std::placeholders::_1))); + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::SendStablePetCallback, this, guid, std::placeholders::_1))); } void WorldSession::SendStablePetCallback(ObjectGuid guid, PreparedQueryResult result) @@ -474,7 +474,7 @@ void WorldSession::HandleStablePet(WorldPacket& recvData) stmt->setUInt8(1, PET_SAVE_FIRST_STABLE_SLOT); stmt->setUInt8(2, PET_SAVE_LAST_STABLE_SLOT); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStablePetCallback, this, std::placeholders::_1))); + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStablePetCallback, this, std::placeholders::_1))); } void WorldSession::HandleStablePetCallback(PreparedQueryResult result) @@ -534,7 +534,7 @@ void WorldSession::HandleUnstablePet(WorldPacket& recvData) stmt->setUInt8(2, PET_SAVE_FIRST_STABLE_SLOT); stmt->setUInt8(3, PET_SAVE_LAST_STABLE_SLOT); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUnstablePetCallback, this, petnumber, std::placeholders::_1))); + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUnstablePetCallback, this, petnumber, std::placeholders::_1))); } void WorldSession::HandleUnstablePetCallback(uint32 petId, PreparedQueryResult result) @@ -658,7 +658,7 @@ void WorldSession::HandleStableSwapPet(WorldPacket& recvData) stmt->setUInt64(0, _player->GetGUID().GetCounter()); stmt->setUInt32(1, petId); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStableSwapPetCallback, this, petId, std::placeholders::_1))); + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStableSwapPetCallback, this, petId, std::placeholders::_1))); } void WorldSession::HandleStableSwapPetCallback(uint32 petId, PreparedQueryResult result) diff --git a/src/server/game/Handlers/SpellHandler.cpp b/src/server/game/Handlers/SpellHandler.cpp index e9e83554996..bf2f9dc3cd5 100644 --- a/src/server/game/Handlers/SpellHandler.cpp +++ b/src/server/game/Handlers/SpellHandler.cpp @@ -193,7 +193,7 @@ void WorldSession::HandleOpenItemOpcode(WorldPackets::Spells::OpenItem& packet) { CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHARACTER_GIFT_BY_ITEM); stmt->setUInt64(0, item->GetGUID().GetCounter()); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt) + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt) .WithPreparedCallback(std::bind(&WorldSession::HandleOpenWrappedItemCallback, this, item->GetPos(), item->GetGUID(), std::placeholders::_1))); } else diff --git a/src/server/game/Server/WorldSession.cpp b/src/server/game/Server/WorldSession.cpp index 21993a4d311..0b7c1583faf 100644 --- a/src/server/game/Server/WorldSession.cpp +++ b/src/server/game/Server/WorldSession.cpp @@ -234,7 +234,7 @@ void WorldSession::SendPacket(WorldPacket const* packet, bool forced /*= false*/ { if (packet->GetConnection() != CONNECTION_TYPE_INSTANCE && IsInstanceOnlyOpcode(packet->GetOpcode())) { - TC_LOG_ERROR("network.opcode", "Prevented sending of instance only opcode %u with connection type %u to %s", packet->GetOpcode(), packet->GetConnection(), GetPlayerInfo().c_str()); + TC_LOG_ERROR("network.opcode", "Prevented sending of instance only opcode %u with connection type %u to %s", packet->GetOpcode(), uint32(packet->GetConnection()), GetPlayerInfo().c_str()); return; } @@ -243,7 +243,7 @@ void WorldSession::SendPacket(WorldPacket const* packet, bool forced /*= false*/ if (!m_Socket[conIdx]) { - TC_LOG_ERROR("network.opcode", "Prevented sending of %s to non existent socket %u to %s", GetOpcodeNameForLogging(static_cast<OpcodeServer>(packet->GetOpcode())).c_str(), conIdx, GetPlayerInfo().c_str()); + TC_LOG_ERROR("network.opcode", "Prevented sending of %s to non existent socket %u to %s", GetOpcodeNameForLogging(static_cast<OpcodeServer>(packet->GetOpcode())).c_str(), uint32(conIdx), GetPlayerInfo().c_str()); return; } @@ -883,7 +883,8 @@ void WorldSession::SetPlayer(Player* player) void WorldSession::ProcessQueryCallbacks() { - _queryProcessor.ProcessReadyQueries(); + _queryProcessor.ProcessReadyCallbacks(); + _transactionCallbacks.ProcessReadyCallbacks(); if (_realmAccountLoginCallback.valid() && _realmAccountLoginCallback.wait_for(std::chrono::seconds(0)) == std::future_status::ready && _accountLoginCallback.valid() && _accountLoginCallback.wait_for(std::chrono::seconds(0)) == std::future_status::ready) @@ -895,6 +896,11 @@ void WorldSession::ProcessQueryCallbacks() HandlePlayerLogin(reinterpret_cast<LoginQueryHolder*>(_charLoginCallback.get())); } +TransactionCallback& WorldSession::AddTransactionCallback(TransactionCallback&& callback) +{ + return _transactionCallbacks.AddCallback(std::move(callback)); +} + void WorldSession::InitWarden(BigNumber* k) { if (_os == "Win") diff --git a/src/server/game/Server/WorldSession.h b/src/server/game/Server/WorldSession.h index 635d2a16ce1..1f8b8100b02 100644 --- a/src/server/game/Server/WorldSession.h +++ b/src/server/game/Server/WorldSession.h @@ -23,11 +23,11 @@ #define __WORLDSESSION_H #include "Common.h" +#include "AsyncCallbackProcessor.h" #include "DatabaseEnvFwd.h" #include "LockedQueue.h" #include "ObjectGuid.h" #include "Packet.h" -#include "QueryCallbackProcessor.h" #include "SharedDefines.h" #include <array> #include <unordered_map> @@ -1746,6 +1746,7 @@ class TC_GAME_API WorldSession public: QueryCallbackProcessor& GetQueryProcessor() { return _queryProcessor; } + TransactionCallback& AddTransactionCallback(TransactionCallback&& callback); private: void ProcessQueryCallbacks(); @@ -1755,6 +1756,7 @@ class TC_GAME_API WorldSession QueryResultHolderFuture _charLoginCallback; QueryCallbackProcessor _queryProcessor; + AsyncCallbackProcessor<TransactionCallback> _transactionCallbacks; friend class World; protected: diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp index 9c4ea31db48..36363f3c3aa 100644 --- a/src/server/game/Server/WorldSocket.cpp +++ b/src/server/game/Server/WorldSocket.cpp @@ -92,7 +92,7 @@ void WorldSocket::Start() LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_IP_INFO); stmt->setString(0, ip_address); - _queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1))); + _queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1))); } void WorldSocket::CheckIpCallback(PreparedQueryResult result) @@ -230,7 +230,7 @@ bool WorldSocket::Update() if (!BaseSocket::Update()) return false; - _queryProcessor.ProcessReadyQueries(); + _queryProcessor.ProcessReadyCallbacks(); return true; } @@ -653,7 +653,7 @@ void WorldSocket::HandleAuthSession(std::shared_ptr<WorldPackets::Auth::AuthSess stmt->setInt32(0, int32(realm.Id.Realm)); stmt->setString(1, authSession->RealmJoinTicket); - _queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1))); + _queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1))); } void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthSession> authSession, PreparedQueryResult result) @@ -852,7 +852,7 @@ void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth:: if (wardenActive) _worldSession->InitWarden(&_sessionKey); - _queryProcessor.AddQuery(_worldSession->LoadPermissionsAsync().WithPreparedCallback(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1))); + _queryProcessor.AddCallback(_worldSession->LoadPermissionsAsync().WithPreparedCallback(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1))); AsyncRead(); } @@ -881,7 +881,7 @@ void WorldSocket::HandleAuthContinuedSession(std::shared_ptr<WorldPackets::Auth: LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_ACCOUNT_INFO_CONTINUED_SESSION); stmt->setUInt32(0, accountId); - _queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1))); + _queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1))); } void WorldSocket::HandleAuthContinuedSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthContinuedSession> authSession, PreparedQueryResult result) diff --git a/src/server/game/Server/WorldSocket.h b/src/server/game/Server/WorldSocket.h index 48ad3c02bb5..572df283d8a 100644 --- a/src/server/game/Server/WorldSocket.h +++ b/src/server/game/Server/WorldSocket.h @@ -19,10 +19,10 @@ #define __WORLDSOCKET_H__ #include "Common.h" +#include "AsyncCallbackProcessor.h" #include "BigNumber.h" #include "DatabaseEnvFwd.h" #include "MessageBuffer.h" -#include "QueryCallbackProcessor.h" #include "Socket.h" #include "WorldPacketCrypt.h" #include "MPSCQueue.h" diff --git a/src/server/game/World/World.cpp b/src/server/game/World/World.cpp index d470c4746a1..028f1bb7800 100644 --- a/src/server/game/World/World.cpp +++ b/src/server/game/World/World.cpp @@ -3087,7 +3087,7 @@ void World::UpdateRealmCharCount(uint32 accountId) { CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHARACTER_COUNT); stmt->setUInt32(0, accountId); - _queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&World::_UpdateRealmCharCount, this, std::placeholders::_1))); + _queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&World::_UpdateRealmCharCount, this, std::placeholders::_1))); } void World::_UpdateRealmCharCount(PreparedQueryResult resultCharCount) @@ -3508,7 +3508,7 @@ uint32 World::getWorldState(uint32 index) const void World::ProcessQueryCallbacks() { - _queryProcessor.ProcessReadyQueries(); + _queryProcessor.ProcessReadyCallbacks(); } void World::ReloadRBAC() diff --git a/src/server/game/World/World.h b/src/server/game/World/World.h index b1ae0edc8d5..0384ce86174 100644 --- a/src/server/game/World/World.h +++ b/src/server/game/World/World.h @@ -23,10 +23,10 @@ #define __WORLD_H #include "Common.h" +#include "AsyncCallbackProcessor.h" #include "DatabaseEnvFwd.h" #include "LockedQueue.h" #include "ObjectGuid.h" -#include "QueryCallbackProcessor.h" #include "SharedDefines.h" #include "Timer.h" |