diff options
author | Shauren <shauren.trinity@gmail.com> | 2017-01-13 21:38:03 +0100 |
---|---|---|
committer | Shauren <shauren.trinity@gmail.com> | 2017-01-13 21:38:03 +0100 |
commit | 8e2634b2b49eb814b8cc425a060b2f160dbb49b7 (patch) | |
tree | 9fc926ba9f77539c3eb847e37f3f9d7061f9d47a /src/server/database/Database | |
parent | 0f432edc4b48a8692e41fc30aef7751d295a7176 (diff) |
Core/DBLayer: Convert async queries to new query callbacks and remove old callback handling
Diffstat (limited to 'src/server/database/Database')
-rw-r--r-- | src/server/database/Database/DatabaseWorkerPool.cpp | 9 | ||||
-rw-r--r-- | src/server/database/Database/DatabaseWorkerPool.h | 9 | ||||
-rw-r--r-- | src/server/database/Database/QueryCallback.cpp | 173 | ||||
-rw-r--r-- | src/server/database/Database/QueryCallback.h | 218 | ||||
-rw-r--r-- | src/server/database/Database/QueryCallbackProcessor.cpp | 15 | ||||
-rw-r--r-- | src/server/database/Database/QueryCallbackProcessor.h | 6 |
6 files changed, 162 insertions, 268 deletions
diff --git a/src/server/database/Database/DatabaseWorkerPool.cpp b/src/server/database/Database/DatabaseWorkerPool.cpp index 0a3865009b0..e680ed834a3 100644 --- a/src/server/database/Database/DatabaseWorkerPool.cpp +++ b/src/server/database/Database/DatabaseWorkerPool.cpp @@ -17,6 +17,7 @@ #include "DatabaseWorkerPool.h" #include "DatabaseEnv.h" +#include "QueryCallback.h" #define MIN_MYSQL_SERVER_VERSION 50100u #define MIN_MYSQL_CLIENT_VERSION 50100u @@ -146,23 +147,23 @@ PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement* stmt) } template <class T> -QueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(const char* sql) +QueryCallback DatabaseWorkerPool<T>::AsyncQuery(const char* sql) { BasicStatementTask* task = new BasicStatementTask(sql, true); // Store future result before enqueueing - task might get already processed and deleted before returning from this method QueryResultFuture result = task->GetFuture(); Enqueue(task); - return result; + return QueryCallback(std::move(result)); } template <class T> -PreparedQueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement* stmt) +QueryCallback DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement* stmt) { PreparedStatementTask* task = new PreparedStatementTask(stmt, true); // Store future result before enqueueing - task might get already processed and deleted before returning from this method PreparedQueryResultFuture result = task->GetFuture(); Enqueue(task); - return result; + return QueryCallback(std::move(result)); } template <class T> diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h index a082a9b6b0e..b195d2d6a28 100644 --- a/src/server/database/Database/DatabaseWorkerPool.h +++ b/src/server/database/Database/DatabaseWorkerPool.h @@ -19,7 +19,6 @@ #define _DATABASEWORKERPOOL_H #include "Common.h" -#include "QueryCallback.h" #include "MySQLConnection.h" #include "Transaction.h" #include "DatabaseWorker.h" @@ -34,6 +33,8 @@ #include <memory> #include <array> +class QueryCallback; + class PingOperation : public SQLOperation { //! Operation for idle delaythreads @@ -192,12 +193,12 @@ class DatabaseWorkerPool //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. - QueryResultFuture AsyncQuery(const char* sql); + QueryCallback AsyncQuery(const char* sql); //! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. template<typename Format, typename... Args> - QueryResultFuture AsyncPQuery(Format&& sql, Args&&... args) + QueryCallback AsyncPQuery(Format&& sql, Args&&... args) { return AsyncQuery(Trinity::StringFormat(std::forward<Format>(sql), std::forward<Args>(args)...).c_str()); } @@ -205,7 +206,7 @@ class DatabaseWorkerPool //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed. //! The return value is then processed in ProcessQueryCallback methods. //! Statement must be prepared with CONNECTION_ASYNC flag. - PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt); + QueryCallback AsyncQuery(PreparedStatement* stmt); //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture //! return object as soon as the query is executed. diff --git a/src/server/database/Database/QueryCallback.cpp b/src/server/database/Database/QueryCallback.cpp index f88ad7a6e47..8850f87b20a 100644 --- a/src/server/database/Database/QueryCallback.cpp +++ b/src/server/database/Database/QueryCallback.cpp @@ -18,115 +18,186 @@ #include "QueryCallback.h" template<typename T> +inline void Construct(T& t) +{ + new (&t) T(); +} + +template<typename T> inline void Destroy(T& t) { t.~T(); } template<typename T> -void DestroyActiveMember(T& obj) +void ConstructActiveMember(T* obj) +{ + if (!obj->_isPrepared) + Construct(obj->_string); + else + Construct(obj->_prepared); +} + +template<typename T> +void DestroyActiveMember(T* obj) { - if (!obj._isPrepared) - Destroy(obj._string); + if (!obj->_isPrepared) + Destroy(obj->_string); else - Destroy(obj._prepared); + Destroy(obj->_prepared); } template<typename T> -void MoveFrom(T& to, T&& from) +void MoveFrom(T* to, T&& from) { - DestroyActiveMember(to); - to._isPrepared = from._isPrepared; - if (!to._isPrepared) - to._string = std::move(from._string); + ASSERT(to->_isPrepared == from._isPrepared); + + if (!to->_isPrepared) + to->_string = std::move(from._string); else - to._prepared = std::move(from._prepared); + to->_prepared = std::move(from._prepared); } -struct QueryCallbackNew::QueryCallbackData +struct QueryCallback::QueryCallbackData { public: - friend class QueryCallbackNew; + friend class QueryCallback; - QueryCallbackData(std::function<void(QueryResult)>&& callback) : _string(std::move(callback)), _isPrepared(false) { } - QueryCallbackData(std::function<void(PreparedQueryResult)>&& callback) : _prepared(std::move(callback)), _isPrepared(true) { } - QueryCallbackData(QueryCallbackData&& right) { MoveFrom(*this, std::move(right)); } - QueryCallbackData& operator=(QueryCallbackData&& right) { MoveFrom(*this, std::move(right)); return *this; } - ~QueryCallbackData() { DestroyActiveMember(*this); } + QueryCallbackData(std::function<void(QueryCallback&, QueryResult)>&& callback) : _string(std::move(callback)), _isPrepared(false) { TC_LOG_ERROR(LOGGER_ROOT, "QueryCallbackData [%p]: Constructing from string query", (void*)this); } + QueryCallbackData(std::function<void(QueryCallback&, PreparedQueryResult)>&& callback) : _prepared(std::move(callback)), _isPrepared(true) { TC_LOG_ERROR(LOGGER_ROOT, "QueryCallbackData [%p]: Constructing from prepared query", (void*)this); } + QueryCallbackData(QueryCallbackData&& right) + { + _isPrepared = right._isPrepared; + ConstructActiveMember(this); + MoveFrom(this, std::move(right)); + } + QueryCallbackData& operator=(QueryCallbackData&& right) + { + if (this != &right) + { + if (_isPrepared != right._isPrepared) + { + DestroyActiveMember(this); + _isPrepared = right._isPrepared; + ConstructActiveMember(this); + } + MoveFrom(this, std::move(right)); + } + return *this; + } + ~QueryCallbackData() { DestroyActiveMember(this); } private: QueryCallbackData(QueryCallbackData const&) = delete; QueryCallbackData& operator=(QueryCallbackData const&) = delete; - template<typename T> friend void MoveFrom(T& to, T&& from); - template<typename T> friend void DestroyActiveMember(T& obj); + template<typename T> friend void ConstructActiveMember(T* obj); + template<typename T> friend void DestroyActiveMember(T* obj); + template<typename T> friend void MoveFrom(T* to, T&& from); union { - std::function<void(QueryResult)> _string; - std::function<void(PreparedQueryResult)> _prepared; + std::function<void(QueryCallback&, QueryResult)> _string; + std::function<void(QueryCallback&, PreparedQueryResult)> _prepared; }; bool _isPrepared; }; -QueryCallbackNew::QueryCallbackNew(std::future<QueryResult>&& result) : _string(std::move(result)), _isPrepared(false) +QueryCallback::QueryCallback(std::future<QueryResult>&& result) : _string(std::move(result)), _isPrepared(false) { } -QueryCallbackNew::QueryCallbackNew(std::future<PreparedQueryResult>&& result) : _prepared(std::move(result)), _isPrepared(true) +QueryCallback::QueryCallback(std::future<PreparedQueryResult>&& result) : _prepared(std::move(result)), _isPrepared(true) { } -QueryCallbackNew::QueryCallbackNew(QueryCallbackNew&& right) +QueryCallback::QueryCallback(QueryCallback&& right) { - MoveFrom(*this, std::move(right)); + _isPrepared = right._isPrepared; + ConstructActiveMember(this); + MoveFrom(this, std::move(right)); _callbacks = std::move(right._callbacks); } -QueryCallbackNew& QueryCallbackNew::operator=(QueryCallbackNew&& right) +QueryCallback& QueryCallback::operator=(QueryCallback&& right) { - MoveFrom(*this, std::move(right)); - _callbacks = std::move(right._callbacks); + if (this != &right) + { + if (_isPrepared != right._isPrepared) + { + DestroyActiveMember(this); + _isPrepared = right._isPrepared; + ConstructActiveMember(this); + } + MoveFrom(this, std::move(right)); + _callbacks = std::move(right._callbacks); + } return *this; } -QueryCallbackNew::~QueryCallbackNew() +QueryCallback::~QueryCallback() { - DestroyActiveMember(*this); + DestroyActiveMember(this); } -QueryCallbackNew&& QueryCallbackNew::WithCallback(std::function<void(QueryResult)>&& callback) +QueryCallback&& QueryCallback::WithCallback(std::function<void(QueryResult)>&& callback) +{ + return WithChainingCallback([callback](QueryCallback& /*this*/, QueryResult result) { callback(std::move(result)); }); +} + +QueryCallback&& QueryCallback::WithPreparedCallback(std::function<void(PreparedQueryResult)>&& callback) +{ + return WithChainingPreparedCallback([callback](QueryCallback& /*this*/, PreparedQueryResult result) { callback(std::move(result)); }); +} + +QueryCallback&& QueryCallback::WithChainingCallback(std::function<void(QueryCallback&, QueryResult)>&& callback) { ASSERT(!_callbacks.empty() || !_isPrepared, "Attempted to set callback function for string query on a prepared async query"); _callbacks.emplace(std::move(callback)); return std::move(*this); } -QueryCallbackNew&& QueryCallbackNew::WithPreparedCallback(std::function<void(PreparedQueryResult)>&& callback) +QueryCallback&& QueryCallback::WithChainingPreparedCallback(std::function<void(QueryCallback&, PreparedQueryResult)>&& callback) { ASSERT(!_callbacks.empty() || _isPrepared, "Attempted to set callback function for prepared query on a string async query"); _callbacks.emplace(std::move(callback)); return std::move(*this); } -QueryCallbackNew::Status QueryCallbackNew::InvokeIfReady() +void QueryCallback::SetNextQuery(QueryCallback&& next) +{ + MoveFrom(this, std::move(next)); +} + +QueryCallback::Status QueryCallback::InvokeIfReady() { QueryCallbackData& callback = _callbacks.front(); + auto checkStateAndReturnCompletion = [this]() + { + _callbacks.pop(); + bool hasNext = !_isPrepared ? _string.valid() : _prepared.valid(); + if (_callbacks.empty()) + { + ASSERT(!hasNext); + return Completed; + } + + // abort chain + if (!hasNext) + return Completed; + + ASSERT(_isPrepared == _callbacks.front()._isPrepared); + return NextStep; + }; + if (!_isPrepared) { if (_string.valid() && _string.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - std::function<void(QueryResult)> cb(std::move(callback._string)); - cb(_string.get()); - _callbacks.pop(); - if (_callbacks.empty()) - { - ASSERT(!_isPrepared ? !_string.valid() : !_prepared.valid()); - return Completed; - } - - ASSERT(_isPrepared == _callbacks.front()._isPrepared); - return NextStep; + std::future<QueryResult> f(std::move(_string)); + std::function<void(QueryCallback&, QueryResult)> cb(std::move(callback._string)); + cb(*this, f.get()); + return checkStateAndReturnCompletion(); } } else @@ -134,17 +205,9 @@ QueryCallbackNew::Status QueryCallbackNew::InvokeIfReady() if (_prepared.valid() && _prepared.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { std::future<PreparedQueryResult> f(std::move(_prepared)); - std::function<void(PreparedQueryResult)> cb(std::move(callback._prepared)); - cb(_prepared.get()); - _callbacks.pop(); - if (_callbacks.empty()) - { - ASSERT(!_isPrepared ? !_string.valid() : !_prepared.valid()); - return Completed; - } - - ASSERT(_isPrepared == _callbacks.front()._isPrepared); - return NextStep; + std::function<void(QueryCallback&, PreparedQueryResult)> cb(std::move(callback._prepared)); + cb(*this, f.get()); + return checkStateAndReturnCompletion(); } } diff --git a/src/server/database/Database/QueryCallback.h b/src/server/database/Database/QueryCallback.h index 6c4a8a81b4e..f9c93000da7 100644 --- a/src/server/database/Database/QueryCallback.h +++ b/src/server/database/Database/QueryCallback.h @@ -18,27 +18,26 @@ #ifndef _QUERY_CALLBACK_H #define _QUERY_CALLBACK_H -#include <future> #include "QueryResult.h" +#include <future> -typedef std::future<QueryResult> QueryResultFuture; -typedef std::promise<QueryResult> QueryResultPromise; -typedef std::future<PreparedQueryResult> PreparedQueryResultFuture; -typedef std::promise<PreparedQueryResult> PreparedQueryResultPromise; - -#define CALLBACK_STAGE_INVALID uint8(-1) - -class TC_DATABASE_API QueryCallbackNew +class TC_DATABASE_API QueryCallback { public: - explicit QueryCallbackNew(std::future<QueryResult>&& result); - explicit QueryCallbackNew(std::future<PreparedQueryResult>&& result); - QueryCallbackNew(QueryCallbackNew&& right); - QueryCallbackNew& operator=(QueryCallbackNew&& right); - ~QueryCallbackNew(); + explicit QueryCallback(std::future<QueryResult>&& result); + explicit QueryCallback(std::future<PreparedQueryResult>&& result); + QueryCallback(QueryCallback&& right); + QueryCallback& operator=(QueryCallback&& right); + ~QueryCallback(); + + QueryCallback&& WithCallback(std::function<void(QueryResult)>&& callback); + QueryCallback&& WithPreparedCallback(std::function<void(PreparedQueryResult)>&& callback); + + QueryCallback&& WithChainingCallback(std::function<void(QueryCallback&, QueryResult)>&& callback); + QueryCallback&& WithChainingPreparedCallback(std::function<void(QueryCallback&, PreparedQueryResult)>&& callback); - QueryCallbackNew&& WithCallback(std::function<void(QueryResult)>&& callback); - QueryCallbackNew&& WithPreparedCallback(std::function<void(PreparedQueryResult)>&& callback); + // Moves std::future from next to this object + void SetNextQuery(QueryCallback&& next); enum Status { @@ -50,11 +49,12 @@ public: Status InvokeIfReady(); private: - QueryCallbackNew(QueryCallbackNew const& right) = delete; - QueryCallbackNew& operator=(QueryCallbackNew const& right) = delete; + QueryCallback(QueryCallback const& right) = delete; + QueryCallback& operator=(QueryCallback const& right) = delete; - template<typename T> friend void MoveFrom(T& to, T&& from); - template<typename T> friend void DestroyActiveMember(T& obj); + template<typename T> friend void ConstructActiveMember(T* obj); + template<typename T> friend void DestroyActiveMember(T* obj); + template<typename T> friend void MoveFrom(T* to, T&& from); union { @@ -67,182 +67,4 @@ private: std::queue<QueryCallbackData, std::list<QueryCallbackData>> _callbacks; }; -template <typename Result, typename ParamType, bool chain = false> -class QueryCallback -{ - public: - QueryCallback() : _param(), _stage(chain ? 0 : CALLBACK_STAGE_INVALID) { } - - //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery - void SetFutureResult(std::future<Result> value) - { - _result = std::move(value); - } - - std::future<Result>& GetFutureResult() - { - return _result; - } - - bool IsReady() - { - return _result.valid() && _result.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - } - - void GetResult(Result& res) - { - res = _result.get(); - } - - void FreeResult() - { - // Nothing to do here, the constructor of std::future will take care of the cleanup - } - - void SetParam(ParamType value) - { - _param = value; - } - - ParamType GetParam() - { - return _param; - } - - //! Resets the stage of the callback chain - void ResetStage() - { - if (!chain) - return; - - _stage = 0; - } - - //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly - void NextStage() - { - if (!chain) - return; - - ++_stage; - } - - //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) - uint8 GetStage() - { - return _stage; - } - - //! Resets all underlying variables (param, result and stage) - void Reset() - { - SetParam(ParamType()); - FreeResult(); - ResetStage(); - } - - private: - std::future<Result> _result; - ParamType _param; - uint8 _stage; - - QueryCallback(QueryCallback const& right) = delete; - QueryCallback& operator=(QueryCallback const& right) = delete; -}; - -template <typename Result, typename ParamType1, typename ParamType2, bool chain = false> -class QueryCallback_2 -{ - public: - QueryCallback_2() : _stage(chain ? 0 : CALLBACK_STAGE_INVALID) { } - - //! The parameter of this function should be a resultset returned from either .AsyncQuery or .AsyncPQuery - void SetFutureResult(std::future<Result> value) - { - _result = std::move(value); - } - - std::future<Result>& GetFutureResult() - { - return _result; - } - - bool IsReady() - { - return _result.valid() && _result.wait_for(std::chrono::seconds(0)) == std::future_status::ready; - } - - void GetResult(Result& res) - { - res = _result.get(); - } - - void FreeResult() - { - // Nothing to do here, the constructor of std::future will take care of the cleanup - } - - void SetFirstParam(ParamType1 value) - { - _param_1 = value; - } - - void SetSecondParam(ParamType2 value) - { - _param_2 = value; - } - - ParamType1 GetFirstParam() - { - return _param_1; - } - - ParamType2 GetSecondParam() - { - return _param_2; - } - - //! Resets the stage of the callback chain - void ResetStage() - { - if (!chain) - return; - - _stage = 0; - } - - //! Advances the callback chain to the next stage, so upper level code can act on its results accordingly - void NextStage() - { - if (!chain) - return; - - ++_stage; - } - - //! Returns the callback stage (or CALLBACK_STAGE_INVALID if invalid) - uint8 GetStage() - { - return _stage; - } - - //! Resets all underlying variables (param, result and stage) - void Reset() - { - SetFirstParam(NULL); - SetSecondParam(NULL); - FreeResult(); - ResetStage(); - } - - private: - std::future<Result> _result; - ParamType1 _param_1; - ParamType2 _param_2; - uint8 _stage; - - QueryCallback_2(QueryCallback_2 const& right) = delete; - QueryCallback_2& operator=(QueryCallback_2 const& right) = delete; -}; - #endif // _QUERY_CALLBACK_H diff --git a/src/server/database/Database/QueryCallbackProcessor.cpp b/src/server/database/Database/QueryCallbackProcessor.cpp index 0eaa1c2352c..9f8b8fc4ad9 100644 --- a/src/server/database/Database/QueryCallbackProcessor.cpp +++ b/src/server/database/Database/QueryCallbackProcessor.cpp @@ -27,15 +27,22 @@ QueryCallbackProcessor::~QueryCallbackProcessor() { } -void QueryCallbackProcessor::AddQuery(QueryCallbackNew&& query) +void QueryCallbackProcessor::AddQuery(QueryCallback&& query) { _callbacks.emplace_back(std::move(query)); } void QueryCallbackProcessor::ProcessReadyQueries() { - _callbacks.erase(std::remove_if(_callbacks.begin(), _callbacks.end(), [](QueryCallbackNew& callback) + if (_callbacks.empty()) + return; + + std::vector<QueryCallback> updateCallbacks{ std::move(_callbacks) }; + + updateCallbacks.erase(std::remove_if(updateCallbacks.begin(), updateCallbacks.end(), [](QueryCallback& callback) { - return callback.InvokeIfReady() == QueryCallbackNew::Completed; - }), _callbacks.end()); + return callback.InvokeIfReady() == QueryCallback::Completed; + }), updateCallbacks.end()); + + _callbacks.insert(_callbacks.end(), std::make_move_iterator(updateCallbacks.begin()), std::make_move_iterator(updateCallbacks.end())); } diff --git a/src/server/database/Database/QueryCallbackProcessor.h b/src/server/database/Database/QueryCallbackProcessor.h index 08ae3d15693..0b06fd8fc27 100644 --- a/src/server/database/Database/QueryCallbackProcessor.h +++ b/src/server/database/Database/QueryCallbackProcessor.h @@ -21,7 +21,7 @@ #include "Define.h" #include <vector> -class QueryCallbackNew; +class QueryCallback; class TC_DATABASE_API QueryCallbackProcessor { @@ -29,14 +29,14 @@ public: QueryCallbackProcessor(); ~QueryCallbackProcessor(); - void AddQuery(QueryCallbackNew&& query); + void AddQuery(QueryCallback&& query); void ProcessReadyQueries(); private: QueryCallbackProcessor(QueryCallbackProcessor const&) = delete; QueryCallbackProcessor& operator=(QueryCallbackProcessor const&) = delete; - std::vector<QueryCallbackNew> _callbacks; + std::vector<QueryCallback> _callbacks; }; #endif // QueryCallbackProcessor_h__ |