Core/DBLayer: Implement async transaction completion callbacks

This commit is contained in:
Shauren
2020-04-14 16:23:44 +02:00
parent 34b393ab64
commit 0f0ca3a919
22 changed files with 248 additions and 145 deletions

View File

@@ -0,0 +1,63 @@
/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef AsyncCallbackProcessor_h__
#define AsyncCallbackProcessor_h__
#include "Define.h"
#include <algorithm>
#include <vector>
//template <class T>
//concept AsyncCallback = requires(T t) { { t.InvokeIfReady() } -> std::convertible_to<bool> };
template<typename T> // requires AsyncCallback<T>
class AsyncCallbackProcessor
{
public:
AsyncCallbackProcessor() = default;
~AsyncCallbackProcessor() = default;
T& AddCallback(T&& query)
{
_callbacks.emplace_back(std::move(query));
return _callbacks.back();
}
void ProcessReadyCallbacks()
{
if (_callbacks.empty())
return;
std::vector<T> updateCallbacks{ std::move(_callbacks) };
updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](T& callback)
{
return callback.InvokeIfReady();
}), updateCallbacks.end());
_callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end()));
}
private:
AsyncCallbackProcessor(AsyncCallbackProcessor const&) = delete;
AsyncCallbackProcessor& operator=(AsyncCallbackProcessor const&) = delete;
std::vector<T> _callbacks;
};
#endif // AsyncCallbackProcessor_h__

View File

@@ -47,7 +47,7 @@ public:
bool InvokeIfReady()
{
ASSERT(_callback);
return _callback->InvokeIfReady() == QueryCallback::Completed;
return _callback->InvokeIfReady();
}
soap* GetClient() { return &_client; }

View File

@@ -94,7 +94,7 @@ void Battlenet::Session::Start()
LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_IP_INFO);
stmt->setString(0, ip_address);
_queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&Battlenet::Session::CheckIpCallback, this, std::placeholders::_1)));
_queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&Battlenet::Session::CheckIpCallback, this, std::placeholders::_1)));
}
void Battlenet::Session::CheckIpCallback(PreparedQueryResult result)
@@ -126,7 +126,7 @@ bool Battlenet::Session::Update()
if (!BattlenetSocket::Update())
return false;
_queryProcessor.ProcessReadyQueries();
_queryProcessor.ProcessReadyCallbacks();
return true;
}
@@ -258,7 +258,7 @@ uint32 Battlenet::Session::VerifyWebCredentials(std::string const& webCredential
std::function<void(ServiceBase*, uint32, ::google::protobuf::Message const*)> asyncContinuation = std::move(continuation);
std::shared_ptr<AccountInfo> accountInfo = std::make_shared<AccountInfo>();
_queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithChainingPreparedCallback([this, accountInfo, asyncContinuation](QueryCallback& callback, PreparedQueryResult result)
_queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithChainingPreparedCallback([this, accountInfo, asyncContinuation](QueryCallback& callback, PreparedQueryResult result)
{
Battlenet::Services::Authentication asyncContinuationService(this);
NoData response;

View File

@@ -18,13 +18,13 @@
#ifndef Session_h__
#define Session_h__
#include "AsyncCallbackProcessor.h"
#include "Realm.h"
#include "SslContext.h"
#include "SslSocket.h"
#include "Socket.h"
#include "BigNumber.h"
#include "QueryResult.h"
#include "QueryCallbackProcessor.h"
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <google/protobuf/message.h>

View File

@@ -24,9 +24,9 @@
class Field;
class ResultSet;
typedef std::shared_ptr<ResultSet> QueryResult;
typedef std::future<QueryResult> QueryResultFuture;
typedef std::promise<QueryResult> QueryResultPromise;
using QueryResult = std::shared_ptr<ResultSet>;
using QueryResultFuture = std::future<QueryResult>;
using QueryResultPromise = std::promise<QueryResult>;
class CharacterDatabaseConnection;
class HotfixDatabaseConnection;
@@ -44,17 +44,27 @@ using LoginDatabasePreparedStatement = PreparedStatement<LoginDatabaseConnection
using WorldDatabasePreparedStatement = PreparedStatement<WorldDatabaseConnection>;
class PreparedResultSet;
typedef std::shared_ptr<PreparedResultSet> PreparedQueryResult;
typedef std::future<PreparedQueryResult> PreparedQueryResultFuture;
typedef std::promise<PreparedQueryResult> PreparedQueryResultPromise;
using PreparedQueryResult = std::shared_ptr<PreparedResultSet>;
using PreparedQueryResultFuture = std::future<PreparedQueryResult>;
using PreparedQueryResultPromise = std::promise<PreparedQueryResult>;
class QueryCallback;
template<typename T>
class AsyncCallbackProcessor;
using QueryCallbackProcessor = AsyncCallbackProcessor<QueryCallback>;
class TransactionBase;
using TransactionFuture = std::future<bool>;
using TransactionPromise = std::promise<bool>;
template<typename T>
class Transaction;
class TransactionCallback;
template<typename T>
using SQLTransaction = std::shared_ptr<Transaction<T>>;
@@ -64,8 +74,8 @@ using LoginDatabaseTransaction = SQLTransaction<LoginDatabaseConnection>;
using WorldDatabaseTransaction = SQLTransaction<WorldDatabaseConnection>;
class SQLQueryHolderBase;
typedef std::future<SQLQueryHolderBase*> QueryResultHolderFuture;
typedef std::promise<SQLQueryHolderBase*> QueryResultHolderPromise;
using QueryResultHolderFuture = std::future<SQLQueryHolderBase*>;
using QueryResultHolderPromise = std::promise<SQLQueryHolderBase*>;
template<typename T>
class SQLQueryHolder;

View File

@@ -261,6 +261,32 @@ void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
Enqueue(new TransactionTask(transaction));
}
template <class T>
TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
case 0:
TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
return;
case 1:
TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
TransactionWithResultTask* task = new TransactionWithResultTask(transaction);
TransactionFuture result = task->GetFuture();
Enqueue(task);
return TransactionCallback(std::move(result));
}
template <class T>
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction)
{

View File

@@ -173,6 +173,10 @@ class DatabaseWorkerPool
//! were appended to the transaction will be respected during execution.
void CommitTransaction(SQLTransaction<T> transaction);
//! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction);
//! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
void DirectCommitTransaction(SQLTransaction<T>& transaction);

View File

@@ -175,7 +175,7 @@ void QueryCallback::SetNextQuery(QueryCallback&& next)
MoveFrom(this, std::move(next));
}
QueryCallback::Status QueryCallback::InvokeIfReady()
bool QueryCallback::InvokeIfReady()
{
QueryCallbackData& callback = _callbacks.front();
auto checkStateAndReturnCompletion = [this]()
@@ -185,15 +185,15 @@ QueryCallback::Status QueryCallback::InvokeIfReady()
if (_callbacks.empty())
{
ASSERT(!hasNext);
return Completed;
return true;
}
// abort chain
if (!hasNext)
return Completed;
return true;
ASSERT(_isPrepared == _callbacks.front()._isPrepared);
return NextStep;
return false;
};
if (!_isPrepared)
@@ -217,5 +217,5 @@ QueryCallback::Status QueryCallback::InvokeIfReady()
}
}
return NotReady;
return false;
}

View File

@@ -44,14 +44,8 @@ public:
// Moves std::future from next to this object
void SetNextQuery(QueryCallback&& next);
enum Status
{
NotReady,
NextStep,
Completed
};
Status InvokeIfReady();
// returns true when completed
bool InvokeIfReady();
private:
QueryCallback(QueryCallback const& right) = delete;

View File

@@ -1,48 +0,0 @@
/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "QueryCallbackProcessor.h"
#include "QueryCallback.h"
#include <algorithm>
QueryCallbackProcessor::QueryCallbackProcessor()
{
}
QueryCallbackProcessor::~QueryCallbackProcessor()
{
}
void QueryCallbackProcessor::AddQuery(QueryCallback&& query)
{
_callbacks.emplace_back(std::move(query));
}
void QueryCallbackProcessor::ProcessReadyQueries()
{
if (_callbacks.empty())
return;
std::vector<QueryCallback> updateCallbacks{ std::move(_callbacks) };
updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](QueryCallback& callback)
{
return callback.InvokeIfReady() == QueryCallback::Completed;
}), updateCallbacks.end());
_callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end()));
}

View File

@@ -1,42 +0,0 @@
/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QueryCallbackProcessor_h__
#define QueryCallbackProcessor_h__
#include "Define.h"
#include <vector>
class QueryCallback;
class TC_DATABASE_API QueryCallbackProcessor
{
public:
QueryCallbackProcessor();
~QueryCallbackProcessor();
void AddQuery(QueryCallback&& query);
void ProcessReadyQueries();
private:
QueryCallbackProcessor(QueryCallbackProcessor const&) = delete;
QueryCallbackProcessor& operator=(QueryCallbackProcessor const&) = delete;
std::vector<QueryCallback> _callbacks;
};
#endif // QueryCallbackProcessor_h__

View File

@@ -65,7 +65,7 @@ void TransactionBase::Cleanup()
bool TransactionTask::Execute()
{
int errorCode = m_conn->ExecuteTransaction(m_trans);
int errorCode = TryExecute();
if (!errorCode)
return true;
@@ -75,12 +75,64 @@ bool TransactionTask::Execute()
std::lock_guard<std::mutex> lock(_deadlockLock);
uint8 loopBreaker = 5; // Handle MySQL Errno 1213 without extending deadlock to the core itself
for (uint8 i = 0; i < loopBreaker; ++i)
if (!m_conn->ExecuteTransaction(m_trans))
if (!TryExecute())
return true;
}
// Clean up now.
m_trans->Cleanup();
CleanupOnFailure();
return false;
}
int TransactionTask::TryExecute()
{
return m_conn->ExecuteTransaction(m_trans);
}
void TransactionTask::CleanupOnFailure()
{
m_trans->Cleanup();
}
bool TransactionWithResultTask::Execute()
{
int errorCode = TryExecute();
if (!errorCode)
{
m_result.set_value(true);
return true;
}
if (errorCode == ER_LOCK_DEADLOCK)
{
// Make sure only 1 async thread retries a transaction so they don't keep dead-locking each other
std::lock_guard<std::mutex> lock(_deadlockLock);
uint8 loopBreaker = 5; // Handle MySQL Errno 1213 without extending deadlock to the core itself
for (uint8 i = 0; i < loopBreaker; ++i)
{
if (!TryExecute())
{
m_result.set_value(true);
return true;
}
}
}
// Clean up now.
CleanupOnFailure();
m_result.set_value(false);
return false;
}
bool TransactionCallback::InvokeIfReady()
{
if (m_future.valid() && m_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
m_callback(m_future.get());
return true;
}
return false;
}

View File

@@ -22,6 +22,7 @@
#include "DatabaseEnvFwd.h"
#include "SQLOperation.h"
#include "StringFormat.h"
#include <functional>
#include <mutex>
#include <vector>
@@ -72,6 +73,7 @@ class TC_DATABASE_API TransactionTask : public SQLOperation
{
template <class T> friend class DatabaseWorkerPool;
friend class DatabaseWorker;
friend class TransactionCallback;
public:
TransactionTask(std::shared_ptr<TransactionBase> trans) : m_trans(trans) { }
@@ -79,9 +81,43 @@ class TC_DATABASE_API TransactionTask : public SQLOperation
protected:
bool Execute() override;
int TryExecute();
void CleanupOnFailure();
std::shared_ptr<TransactionBase> m_trans;
static std::mutex _deadlockLock;
};
class TC_DATABASE_API TransactionWithResultTask : public TransactionTask
{
public:
TransactionWithResultTask(std::shared_ptr<TransactionBase> trans) : TransactionTask(trans) { }
TransactionFuture GetFuture() { return m_result.get_future(); }
protected:
bool Execute() override;
TransactionPromise m_result;
};
class TC_DATABASE_API TransactionCallback
{
public:
TransactionCallback(TransactionFuture&& future) : m_future(std::move(future)) { }
TransactionCallback(TransactionCallback&&) = default;
TransactionCallback& operator=(TransactionCallback&&) = default;
void AfterComplete(std::function<void(bool)> callback) &
{
m_callback = std::move(callback);
}
bool InvokeIfReady();
TransactionFuture m_future;
std::function<void(bool)> m_callback;
};
#endif

View File

@@ -402,7 +402,7 @@ void WorldSession::HandleCharEnumOpcode(WorldPackets::Character::EnumCharacters&
stmt->setUInt8(0, PET_SAVE_AS_CURRENT);
stmt->setUInt32(1, GetAccountId());
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharEnum, this, std::placeholders::_1)));
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharEnum, this, std::placeholders::_1)));
}
void WorldSession::HandleCharUndeleteEnum(PreparedQueryResult result)
@@ -444,7 +444,7 @@ void WorldSession::HandleCharUndeleteEnumOpcode(WorldPackets::Character::EnumCha
stmt->setUInt8(0, PET_SAVE_AS_CURRENT);
stmt->setUInt32(1, GetAccountId());
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharUndeleteEnum, this, std::placeholders::_1)));
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleCharUndeleteEnum, this, std::placeholders::_1)));
}
void WorldSession::HandleCharCreateOpcode(WorldPackets::Character::CreateCharacter& charCreate)
@@ -582,7 +582,7 @@ void WorldSession::HandleCharCreateOpcode(WorldPackets::Character::CreateCharact
CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHECK_NAME);
stmt->setString(0, charCreate.CreateInfo->Name);
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt)
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt)
.WithChainingPreparedCallback([this](QueryCallback& queryCallback, PreparedQueryResult result)
{
if (result)
@@ -1305,7 +1305,7 @@ void WorldSession::HandleCharRenameOpcode(WorldPackets::Character::CharacterRena
stmt->setUInt64(0, request.RenameInfo->Guid.GetCounter());
stmt->setString(1, request.RenameInfo->NewName);
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt)
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt)
.WithPreparedCallback(std::bind(&WorldSession::HandleCharRenameCallBack, this, request.RenameInfo, std::placeholders::_1)));
}
@@ -1514,7 +1514,7 @@ void WorldSession::HandleCharCustomizeOpcode(WorldPackets::Character::CharCustom
CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHAR_CUSTOMIZE_INFO);
stmt->setUInt64(0, packet.CustomizeInfo->CharGUID.GetCounter());
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt)
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt)
.WithPreparedCallback(std::bind(&WorldSession::HandleCharCustomizeCallback, this, packet.CustomizeInfo, std::placeholders::_1)));
}
@@ -1799,7 +1799,7 @@ void WorldSession::HandleCharRaceOrFactionChangeOpcode(WorldPackets::Character::
CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHAR_RACE_OR_FACTION_CHANGE_INFOS);
stmt->setUInt64(0, packet.RaceOrFactionChangeInfo->Guid.GetCounter());
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt)
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt)
.WithPreparedCallback(std::bind(&WorldSession::HandleCharRaceOrFactionChangeCallback, this, packet.RaceOrFactionChangeInfo, std::placeholders::_1)));
}
@@ -2385,7 +2385,7 @@ void WorldSession::HandleGetUndeleteCooldownStatus(WorldPackets::Character::GetU
LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_LAST_CHAR_UNDELETE);
stmt->setUInt32(0, GetBattlenetAccountId());
_queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUndeleteCooldownStatusCallback, this, std::placeholders::_1)));
_queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUndeleteCooldownStatusCallback, this, std::placeholders::_1)));
}
void WorldSession::HandleUndeleteCooldownStatusCallback(PreparedQueryResult result)
@@ -2415,7 +2415,7 @@ void WorldSession::HandleCharUndeleteOpcode(WorldPackets::Character::UndeleteCha
stmt->setUInt32(0, GetBattlenetAccountId());
std::shared_ptr<WorldPackets::Character::CharacterUndeleteInfo> undeleteInfo = undeleteCharacter.UndeleteInfo;
_queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt)
_queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt)
.WithChainingPreparedCallback([this, undeleteInfo](QueryCallback& queryCallback, PreparedQueryResult result)
{
if (result)

View File

@@ -375,7 +375,7 @@ void WorldSession::SendStablePet(ObjectGuid guid)
stmt->setUInt8(1, PET_SAVE_FIRST_STABLE_SLOT);
stmt->setUInt8(2, PET_SAVE_LAST_STABLE_SLOT);
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::SendStablePetCallback, this, guid, std::placeholders::_1)));
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::SendStablePetCallback, this, guid, std::placeholders::_1)));
}
void WorldSession::SendStablePetCallback(ObjectGuid guid, PreparedQueryResult result)
@@ -474,7 +474,7 @@ void WorldSession::HandleStablePet(WorldPacket& recvData)
stmt->setUInt8(1, PET_SAVE_FIRST_STABLE_SLOT);
stmt->setUInt8(2, PET_SAVE_LAST_STABLE_SLOT);
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStablePetCallback, this, std::placeholders::_1)));
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStablePetCallback, this, std::placeholders::_1)));
}
void WorldSession::HandleStablePetCallback(PreparedQueryResult result)
@@ -534,7 +534,7 @@ void WorldSession::HandleUnstablePet(WorldPacket& recvData)
stmt->setUInt8(2, PET_SAVE_FIRST_STABLE_SLOT);
stmt->setUInt8(3, PET_SAVE_LAST_STABLE_SLOT);
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUnstablePetCallback, this, petnumber, std::placeholders::_1)));
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleUnstablePetCallback, this, petnumber, std::placeholders::_1)));
}
void WorldSession::HandleUnstablePetCallback(uint32 petId, PreparedQueryResult result)
@@ -658,7 +658,7 @@ void WorldSession::HandleStableSwapPet(WorldPacket& recvData)
stmt->setUInt64(0, _player->GetGUID().GetCounter());
stmt->setUInt32(1, petId);
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStableSwapPetCallback, this, petId, std::placeholders::_1)));
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSession::HandleStableSwapPetCallback, this, petId, std::placeholders::_1)));
}
void WorldSession::HandleStableSwapPetCallback(uint32 petId, PreparedQueryResult result)

View File

@@ -193,7 +193,7 @@ void WorldSession::HandleOpenItemOpcode(WorldPackets::Spells::OpenItem& packet)
{
CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHARACTER_GIFT_BY_ITEM);
stmt->setUInt64(0, item->GetGUID().GetCounter());
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt)
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt)
.WithPreparedCallback(std::bind(&WorldSession::HandleOpenWrappedItemCallback, this, item->GetPos(), item->GetGUID(), std::placeholders::_1)));
}
else

View File

@@ -234,7 +234,7 @@ void WorldSession::SendPacket(WorldPacket const* packet, bool forced /*= false*/
{
if (packet->GetConnection() != CONNECTION_TYPE_INSTANCE && IsInstanceOnlyOpcode(packet->GetOpcode()))
{
TC_LOG_ERROR("network.opcode", "Prevented sending of instance only opcode %u with connection type %u to %s", packet->GetOpcode(), packet->GetConnection(), GetPlayerInfo().c_str());
TC_LOG_ERROR("network.opcode", "Prevented sending of instance only opcode %u with connection type %u to %s", packet->GetOpcode(), uint32(packet->GetConnection()), GetPlayerInfo().c_str());
return;
}
@@ -243,7 +243,7 @@ void WorldSession::SendPacket(WorldPacket const* packet, bool forced /*= false*/
if (!m_Socket[conIdx])
{
TC_LOG_ERROR("network.opcode", "Prevented sending of %s to non existent socket %u to %s", GetOpcodeNameForLogging(static_cast<OpcodeServer>(packet->GetOpcode())).c_str(), conIdx, GetPlayerInfo().c_str());
TC_LOG_ERROR("network.opcode", "Prevented sending of %s to non existent socket %u to %s", GetOpcodeNameForLogging(static_cast<OpcodeServer>(packet->GetOpcode())).c_str(), uint32(conIdx), GetPlayerInfo().c_str());
return;
}
@@ -883,7 +883,8 @@ void WorldSession::SetPlayer(Player* player)
void WorldSession::ProcessQueryCallbacks()
{
_queryProcessor.ProcessReadyQueries();
_queryProcessor.ProcessReadyCallbacks();
_transactionCallbacks.ProcessReadyCallbacks();
if (_realmAccountLoginCallback.valid() && _realmAccountLoginCallback.wait_for(std::chrono::seconds(0)) == std::future_status::ready &&
_accountLoginCallback.valid() && _accountLoginCallback.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
@@ -895,6 +896,11 @@ void WorldSession::ProcessQueryCallbacks()
HandlePlayerLogin(reinterpret_cast<LoginQueryHolder*>(_charLoginCallback.get()));
}
TransactionCallback& WorldSession::AddTransactionCallback(TransactionCallback&& callback)
{
return _transactionCallbacks.AddCallback(std::move(callback));
}
void WorldSession::InitWarden(BigNumber* k)
{
if (_os == "Win")

View File

@@ -23,11 +23,11 @@
#define __WORLDSESSION_H
#include "Common.h"
#include "AsyncCallbackProcessor.h"
#include "DatabaseEnvFwd.h"
#include "LockedQueue.h"
#include "ObjectGuid.h"
#include "Packet.h"
#include "QueryCallbackProcessor.h"
#include "SharedDefines.h"
#include <array>
#include <unordered_map>
@@ -1746,6 +1746,7 @@ class TC_GAME_API WorldSession
public:
QueryCallbackProcessor& GetQueryProcessor() { return _queryProcessor; }
TransactionCallback& AddTransactionCallback(TransactionCallback&& callback);
private:
void ProcessQueryCallbacks();
@@ -1755,6 +1756,7 @@ class TC_GAME_API WorldSession
QueryResultHolderFuture _charLoginCallback;
QueryCallbackProcessor _queryProcessor;
AsyncCallbackProcessor<TransactionCallback> _transactionCallbacks;
friend class World;
protected:

View File

@@ -92,7 +92,7 @@ void WorldSocket::Start()
LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_IP_INFO);
stmt->setString(0, ip_address);
_queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1)));
_queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1)));
}
void WorldSocket::CheckIpCallback(PreparedQueryResult result)
@@ -230,7 +230,7 @@ bool WorldSocket::Update()
if (!BaseSocket::Update())
return false;
_queryProcessor.ProcessReadyQueries();
_queryProcessor.ProcessReadyCallbacks();
return true;
}
@@ -653,7 +653,7 @@ void WorldSocket::HandleAuthSession(std::shared_ptr<WorldPackets::Auth::AuthSess
stmt->setInt32(0, int32(realm.Id.Realm));
stmt->setString(1, authSession->RealmJoinTicket);
_queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1)));
_queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1)));
}
void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthSession> authSession, PreparedQueryResult result)
@@ -852,7 +852,7 @@ void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth::
if (wardenActive)
_worldSession->InitWarden(&_sessionKey);
_queryProcessor.AddQuery(_worldSession->LoadPermissionsAsync().WithPreparedCallback(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1)));
_queryProcessor.AddCallback(_worldSession->LoadPermissionsAsync().WithPreparedCallback(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1)));
AsyncRead();
}
@@ -881,7 +881,7 @@ void WorldSocket::HandleAuthContinuedSession(std::shared_ptr<WorldPackets::Auth:
LoginDatabasePreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_ACCOUNT_INFO_CONTINUED_SESSION);
stmt->setUInt32(0, accountId);
_queryProcessor.AddQuery(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1)));
_queryProcessor.AddCallback(LoginDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1)));
}
void WorldSocket::HandleAuthContinuedSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthContinuedSession> authSession, PreparedQueryResult result)

View File

@@ -19,10 +19,10 @@
#define __WORLDSOCKET_H__
#include "Common.h"
#include "AsyncCallbackProcessor.h"
#include "BigNumber.h"
#include "DatabaseEnvFwd.h"
#include "MessageBuffer.h"
#include "QueryCallbackProcessor.h"
#include "Socket.h"
#include "WorldPacketCrypt.h"
#include "MPSCQueue.h"

View File

@@ -3087,7 +3087,7 @@ void World::UpdateRealmCharCount(uint32 accountId)
{
CharacterDatabasePreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_SEL_CHARACTER_COUNT);
stmt->setUInt32(0, accountId);
_queryProcessor.AddQuery(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&World::_UpdateRealmCharCount, this, std::placeholders::_1)));
_queryProcessor.AddCallback(CharacterDatabase.AsyncQuery(stmt).WithPreparedCallback(std::bind(&World::_UpdateRealmCharCount, this, std::placeholders::_1)));
}
void World::_UpdateRealmCharCount(PreparedQueryResult resultCharCount)
@@ -3508,7 +3508,7 @@ uint32 World::getWorldState(uint32 index) const
void World::ProcessQueryCallbacks()
{
_queryProcessor.ProcessReadyQueries();
_queryProcessor.ProcessReadyCallbacks();
}
void World::ReloadRBAC()

View File

@@ -23,10 +23,10 @@
#define __WORLD_H
#include "Common.h"
#include "AsyncCallbackProcessor.h"
#include "DatabaseEnvFwd.h"
#include "LockedQueue.h"
#include "ObjectGuid.h"
#include "QueryCallbackProcessor.h"
#include "SharedDefines.h"
#include "Timer.h"