Core/DBLayer: Implement async transaction completion callbacks

This commit is contained in:
Shauren
2020-04-15 21:41:14 +02:00
committed by Ovahlord
parent 882c275e65
commit ca881f7e3f
22 changed files with 246 additions and 141 deletions

View File

@@ -24,9 +24,9 @@
class Field;
class ResultSet;
typedef std::shared_ptr<ResultSet> QueryResult;
typedef std::future<QueryResult> QueryResultFuture;
typedef std::promise<QueryResult> QueryResultPromise;
using QueryResult = std::shared_ptr<ResultSet>;
using QueryResultFuture = std::future<QueryResult>;
using QueryResultPromise = std::promise<QueryResult>;
class CharacterDatabaseConnection;
class HotfixDatabaseConnection;
@@ -44,17 +44,27 @@ using LoginDatabasePreparedStatement = PreparedStatement<LoginDatabaseConnection
using WorldDatabasePreparedStatement = PreparedStatement<WorldDatabaseConnection>;
class PreparedResultSet;
typedef std::shared_ptr<PreparedResultSet> PreparedQueryResult;
typedef std::future<PreparedQueryResult> PreparedQueryResultFuture;
typedef std::promise<PreparedQueryResult> PreparedQueryResultPromise;
using PreparedQueryResult = std::shared_ptr<PreparedResultSet>;
using PreparedQueryResultFuture = std::future<PreparedQueryResult>;
using PreparedQueryResultPromise = std::promise<PreparedQueryResult>;
class QueryCallback;
template<typename T>
class AsyncCallbackProcessor;
using QueryCallbackProcessor = AsyncCallbackProcessor<QueryCallback>;
class TransactionBase;
using TransactionFuture = std::future<bool>;
using TransactionPromise = std::promise<bool>;
template<typename T>
class Transaction;
class TransactionCallback;
template<typename T>
using SQLTransaction = std::shared_ptr<Transaction<T>>;
@@ -64,8 +74,8 @@ using LoginDatabaseTransaction = SQLTransaction<LoginDatabaseConnection>;
using WorldDatabaseTransaction = SQLTransaction<WorldDatabaseConnection>;
class SQLQueryHolderBase;
typedef std::future<SQLQueryHolderBase*> QueryResultHolderFuture;
typedef std::promise<SQLQueryHolderBase*> QueryResultHolderPromise;
using QueryResultHolderFuture = std::future<SQLQueryHolderBase*>;
using QueryResultHolderPromise = std::promise<SQLQueryHolderBase*>;
template<typename T>
class SQLQueryHolder;

View File

@@ -261,6 +261,32 @@ void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction<T> transaction)
Enqueue(new TransactionTask(transaction));
}
template <class T>
TransactionCallback DatabaseWorkerPool<T>::AsyncCommitTransaction(SQLTransaction<T> transaction)
{
#ifdef TRINITY_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
case 0:
TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
return;
case 1:
TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
TransactionWithResultTask* task = new TransactionWithResultTask(transaction);
TransactionFuture result = task->GetFuture();
Enqueue(task);
return TransactionCallback(std::move(result));
}
template <class T>
void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction<T>& transaction)
{

View File

@@ -173,6 +173,10 @@ class DatabaseWorkerPool
//! were appended to the transaction will be respected during execution.
void CommitTransaction(SQLTransaction<T> transaction);
//! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
TransactionCallback AsyncCommitTransaction(SQLTransaction<T> transaction);
//! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
void DirectCommitTransaction(SQLTransaction<T>& transaction);

View File

@@ -175,7 +175,7 @@ void QueryCallback::SetNextQuery(QueryCallback&& next)
MoveFrom(this, std::move(next));
}
QueryCallback::Status QueryCallback::InvokeIfReady()
bool QueryCallback::InvokeIfReady()
{
QueryCallbackData& callback = _callbacks.front();
auto checkStateAndReturnCompletion = [this]()
@@ -185,15 +185,15 @@ QueryCallback::Status QueryCallback::InvokeIfReady()
if (_callbacks.empty())
{
ASSERT(!hasNext);
return Completed;
return true;
}
// abort chain
if (!hasNext)
return Completed;
return true;
ASSERT(_isPrepared == _callbacks.front()._isPrepared);
return NextStep;
return false;
};
if (!_isPrepared)
@@ -217,5 +217,5 @@ QueryCallback::Status QueryCallback::InvokeIfReady()
}
}
return NotReady;
return false;
}

View File

@@ -44,14 +44,8 @@ public:
// Moves std::future from next to this object
void SetNextQuery(QueryCallback&& next);
enum Status
{
NotReady,
NextStep,
Completed
};
Status InvokeIfReady();
// returns true when completed
bool InvokeIfReady();
private:
QueryCallback(QueryCallback const& right) = delete;

View File

@@ -1,48 +0,0 @@
/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "QueryCallbackProcessor.h"
#include "QueryCallback.h"
#include <algorithm>
QueryCallbackProcessor::QueryCallbackProcessor()
{
}
QueryCallbackProcessor::~QueryCallbackProcessor()
{
}
void QueryCallbackProcessor::AddQuery(QueryCallback&& query)
{
_callbacks.emplace_back(std::move(query));
}
void QueryCallbackProcessor::ProcessReadyQueries()
{
if (_callbacks.empty())
return;
std::vector<QueryCallback> updateCallbacks{ std::move(_callbacks) };
updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](QueryCallback& callback)
{
return callback.InvokeIfReady() == QueryCallback::Completed;
}), updateCallbacks.end());
_callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end()));
}

View File

@@ -1,42 +0,0 @@
/*
* This file is part of the TrinityCore Project. See AUTHORS file for Copyright information
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef QueryCallbackProcessor_h__
#define QueryCallbackProcessor_h__
#include "Define.h"
#include <vector>
class QueryCallback;
class TC_DATABASE_API QueryCallbackProcessor
{
public:
QueryCallbackProcessor();
~QueryCallbackProcessor();
void AddQuery(QueryCallback&& query);
void ProcessReadyQueries();
private:
QueryCallbackProcessor(QueryCallbackProcessor const&) = delete;
QueryCallbackProcessor& operator=(QueryCallbackProcessor const&) = delete;
std::vector<QueryCallback> _callbacks;
};
#endif // QueryCallbackProcessor_h__

View File

@@ -70,7 +70,7 @@ void TransactionBase::Cleanup()
bool TransactionTask::Execute()
{
int errorCode = m_conn->ExecuteTransaction(m_trans);
int errorCode = TryExecute();
if (!errorCode)
return true;
@@ -85,7 +85,7 @@ bool TransactionTask::Execute()
for (uint32 loopDuration = 0, startMSTime = getMSTime(); loopDuration <= DEADLOCK_MAX_RETRY_TIME_MS; loopDuration = GetMSTimeDiffToNow(startMSTime))
{
if (!m_conn->ExecuteTransaction(m_trans))
if (!TryExecute())
return true;
TC_LOG_WARN("sql.sql", "Deadlocked SQL Transaction, retrying. Loop timer: %u ms, Thread Id: %s", loopDuration, threadId.c_str());
@@ -95,7 +95,60 @@ bool TransactionTask::Execute()
}
// Clean up now.
m_trans->Cleanup();
CleanupOnFailure();
return false;
}
int TransactionTask::TryExecute()
{
return m_conn->ExecuteTransaction(m_trans);
}
void TransactionTask::CleanupOnFailure()
{
m_trans->Cleanup();
}
bool TransactionWithResultTask::Execute()
{
int errorCode = TryExecute();
if (!errorCode)
{
m_result.set_value(true);
return true;
}
if (errorCode == ER_LOCK_DEADLOCK)
{
// Make sure only 1 async thread retries a transaction so they don't keep dead-locking each other
std::lock_guard<std::mutex> lock(_deadlockLock);
uint8 loopBreaker = 5; // Handle MySQL Errno 1213 without extending deadlock to the core itself
for (uint8 i = 0; i < loopBreaker; ++i)
{
if (!TryExecute())
{
m_result.set_value(true);
return true;
}
}
}
// Clean up now.
CleanupOnFailure();
m_result.set_value(false);
return false;
}
bool TransactionCallback::InvokeIfReady()
{
if (m_future.valid() && m_future.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
m_callback(m_future.get());
return true;
}
return false;
}

View File

@@ -22,6 +22,7 @@
#include "DatabaseEnvFwd.h"
#include "SQLOperation.h"
#include "StringFormat.h"
#include <functional>
#include <mutex>
#include <vector>
@@ -73,6 +74,7 @@ class TC_DATABASE_API TransactionTask : public SQLOperation
{
template <class T> friend class DatabaseWorkerPool;
friend class DatabaseWorker;
friend class TransactionCallback;
public:
TransactionTask(std::shared_ptr<TransactionBase> trans) : m_trans(trans) { }
@@ -80,9 +82,43 @@ class TC_DATABASE_API TransactionTask : public SQLOperation
protected:
bool Execute() override;
int TryExecute();
void CleanupOnFailure();
std::shared_ptr<TransactionBase> m_trans;
static std::mutex _deadlockLock;
};
class TC_DATABASE_API TransactionWithResultTask : public TransactionTask
{
public:
TransactionWithResultTask(std::shared_ptr<TransactionBase> trans) : TransactionTask(trans) { }
TransactionFuture GetFuture() { return m_result.get_future(); }
protected:
bool Execute() override;
TransactionPromise m_result;
};
class TC_DATABASE_API TransactionCallback
{
public:
TransactionCallback(TransactionFuture&& future) : m_future(std::move(future)) { }
TransactionCallback(TransactionCallback&&) = default;
TransactionCallback& operator=(TransactionCallback&&) = default;
void AfterComplete(std::function<void(bool)> callback) &
{
m_callback = std::move(callback);
}
bool InvokeIfReady();
TransactionFuture m_future;
std::function<void(bool)> m_callback;
};
#endif