diff options
| -rwxr-xr-x | src/server/shared/Database/DatabaseWorkerPool.h | 117 | ||||
| -rwxr-xr-x | src/server/shared/Database/PreparedStatement.cpp | 23 | ||||
| -rwxr-xr-x | src/server/shared/Database/PreparedStatement.h | 6 | ||||
| -rwxr-xr-x | src/server/shared/Database/Transaction.h | 2 | ||||
| -rwxr-xr-x | src/server/shared/Threading/Callback.h | 1 |
5 files changed, 120 insertions, 29 deletions
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index af33bfbaa4b..0ab86708fb8 100755 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -51,6 +51,7 @@ template <class T> class DatabaseWorkerPool { public: + /* Activity state */ DatabaseWorkerPool() : m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)) { @@ -132,6 +133,11 @@ class DatabaseWorkerPool sLog.outSQLDriver("All connections on databasepool %s closed.", m_connectionInfo.database.c_str()); } + /** + Delayed one-way statement methods. + */ + + //! Enqueues a one-way SQL operation in string format that will be executed asynchronously. void Execute(const char* sql) { if (!sql) @@ -141,6 +147,7 @@ class DatabaseWorkerPool Enqueue(task); } + //! Enqueues a one-way SQL operation in string format -with variable args- that will be executed asynchronously. void PExecute(const char* sql, ...) { if (!sql) @@ -155,6 +162,18 @@ class DatabaseWorkerPool Execute(szQuery); } + //! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously. + void Execute(PreparedStatement* stmt) + { + PreparedStatementTask* task = new PreparedStatementTask(stmt); + Enqueue(task); + } + + /** + Direct syncrhonous one-way statement methods. + */ + + //! Directly executes a one-way SQL operation in string format, that will block the calling thread until finished. void DirectExecute(const char* sql) { if (!sql) @@ -165,6 +184,7 @@ class DatabaseWorkerPool t->Unlock(); } + //! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished. void DirectPExecute(const char* sql, ...) { if (!sql) @@ -179,6 +199,20 @@ class DatabaseWorkerPool return DirectExecute(szQuery); } + //! Directly executes a one-way SQL operation in prepared statement format, that will block the calling thread until finished. + void DirectExecute(PreparedStatement* stmt) + { + T* t = GetFreeConnection(); + t->Execute(stmt); + t->Unlock(); + } + + /** + Syncrhonous query (with resultset) methods. + */ + + //! Directly executes an SQL query in string format that will block the calling thread until finished. + //! Returns reference counted auto pointer, no need for manual memory management in upper level code. QueryResult Query(const char* sql, MySQLConnection* conn = NULL) { if (!conn) @@ -193,6 +227,8 @@ class DatabaseWorkerPool return QueryResult(result); } + //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished. + //! Returns reference counted auto pointer, no need for manual memory management in upper level code. QueryResult PQuery(const char* sql, MySQLConnection* conn, ...) { if (!sql) @@ -207,6 +243,8 @@ class DatabaseWorkerPool return Query(szQuery, conn); } + //! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished. + //! Returns reference counted auto pointer, no need for manual memory management in upper level code. QueryResult PQuery(const char* sql, ...) { if (!sql) @@ -221,7 +259,27 @@ class DatabaseWorkerPool return Query(szQuery); } - ACE_Future<QueryResult> AsyncQuery(const char* sql) + //! Directly executes an SQL query in prepared format that will block the calling thread until finished. + //! Returns reference counted auto pointer, no need for manual memory management in upper level code. + PreparedQueryResult Query(PreparedStatement* stmt) + { + T* t = GetFreeConnection(); + PreparedResultSet* ret = t->Query(stmt); + t->Unlock(); + + if (!ret || !ret->GetRowCount()) + return PreparedQueryResult(NULL); + + return PreparedQueryResult(ret); + } + + /** + Asynchronous query (with resultset) methods. + */ + + //! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed. + //! The return value is then processed in ProcessQueryCallback methods. + QueryResultFuture AsyncQuery(const char* sql) { QueryResultFuture res; BasicStatementTask* task = new BasicStatementTask(sql, res); @@ -229,7 +287,9 @@ class DatabaseWorkerPool return res; //! Actual return value has no use yet } - ACE_Future<QueryResult> AsyncPQuery(const char* sql, ...) + //! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed. + //! The return value is then processed in ProcessQueryCallback methods. + QueryResultFuture AsyncPQuery(const char* sql, ...) { va_list ap; char szQuery[MAX_QUERY_LEN]; @@ -240,6 +300,19 @@ class DatabaseWorkerPool return AsyncQuery(szQuery); } + //! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed. + //! The return value is then processed in ProcessQueryCallback methods. + PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt) + { + PreparedQueryResultFuture res; + PreparedStatementTask* task = new PreparedStatementTask(stmt, res); + Enqueue(task); + return res; + } + + //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture + //! return object as soon as the query is executed. + //! The return value is then processed in ProcessQueryCallback methods. QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) { QueryResultHolderFuture res; @@ -248,11 +321,18 @@ class DatabaseWorkerPool return res; //! Fool compiler, has no use yet } + /** + Transaction context methods. + */ + + //! Begins an automanaged transaction pointer that will automatically rollback if not commited. (Autocommit=0) SQLTransaction BeginTransaction() { return SQLTransaction(new Transaction); } + //! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations + //! were appended to the transaction will be respected during execution. void CommitTransaction(SQLTransaction transaction) { if (sLog.GetSQLDriverQueryLogging()) @@ -293,24 +373,18 @@ class DatabaseWorkerPool trans->Append(sql); } + /** + Other + */ + + //! Automanaged (internally) pointer to a prepared statement object for usage in upper level code. + //! This object is not tied to the prepared statement on the MySQL context yet until execution. PreparedStatement* GetPreparedStatement(uint32 index) { return new PreparedStatement(index); } - void Execute(PreparedStatement* stmt) - { - PreparedStatementTask* task = new PreparedStatementTask(stmt); - Enqueue(task); - } - - void DirectExecute(PreparedStatement* stmt) - { - T* t = GetFreeConnection(); - t->Execute(stmt); - t->Unlock(); - } - + //! Apply escape string'ing for current collation. (utf8) void escape_string(std::string& str) { if (str.empty()) @@ -322,18 +396,7 @@ class DatabaseWorkerPool delete[] buf; } - PreparedQueryResult Query(PreparedStatement* stmt) - { - T* t = GetFreeConnection(); - PreparedResultSet* ret = t->Query(stmt); - t->Unlock(); - - if (!ret || !ret->GetRowCount()) - return PreparedQueryResult(NULL); - - return PreparedQueryResult(ret); - } - + //! Keeps all our MySQL connections alive, prevent the server from disconnecting us. void KeepAlive() { /// Ping synchronous connections diff --git a/src/server/shared/Database/PreparedStatement.cpp b/src/server/shared/Database/PreparedStatement.cpp index b95fb485079..95d0f89ba67 100755 --- a/src/server/shared/Database/PreparedStatement.cpp +++ b/src/server/shared/Database/PreparedStatement.cpp @@ -332,10 +332,19 @@ void MySQLPreparedStatement::setValue(MYSQL_BIND* param, enum_field_types type, //- Execution PreparedStatementTask::PreparedStatementTask(PreparedStatement* stmt) : -m_stmt(stmt) +m_stmt(stmt), +m_has_result(false) { } +PreparedStatementTask::PreparedStatementTask(PreparedStatement* stmt, PreparedQueryResultFuture result) : +m_stmt(stmt), +m_has_result(true), +m_result(result) +{ +} + + PreparedStatementTask::~PreparedStatementTask() { delete m_stmt; @@ -343,5 +352,17 @@ PreparedStatementTask::~PreparedStatementTask() bool PreparedStatementTask::Execute() { + if (m_has_result) + { + PreparedResultSet* result = m_conn->Query(m_stmt); + if (!result || !result->GetRowCount()) + { + m_result.set(PreparedQueryResult(NULL)); + return false; + } + m_result.set(PreparedQueryResult(result)); + return true; + } + return m_conn->Execute(m_stmt); } diff --git a/src/server/shared/Database/PreparedStatement.h b/src/server/shared/Database/PreparedStatement.h index 184c3a1a8da..3a0cd9f0b55 100755 --- a/src/server/shared/Database/PreparedStatement.h +++ b/src/server/shared/Database/PreparedStatement.h @@ -19,6 +19,7 @@ #define _PREPAREDSTATEMENT_H #include "SQLOperation.h" +#include <ace/Future.h> //- Union for data buffer (upper-level bind -> queue -> lower-level bind) union PreparedStatementDataUnion @@ -138,16 +139,21 @@ class MySQLPreparedStatement MYSQL_BIND* m_bind; }; +typedef ACE_Future<PreparedQueryResult> PreparedQueryResultFuture; + //- Lower-level class, enqueuable operation class PreparedStatementTask : public SQLOperation { public: PreparedStatementTask(PreparedStatement* stmt); + PreparedStatementTask(PreparedStatement* stmt, PreparedQueryResultFuture result); ~PreparedStatementTask(); bool Execute(); protected: PreparedStatement* m_stmt; + bool m_has_result; + PreparedQueryResultFuture m_result; }; #endif
\ No newline at end of file diff --git a/src/server/shared/Database/Transaction.h b/src/server/shared/Database/Transaction.h index 96acef0b01c..a182341f5d7 100755 --- a/src/server/shared/Database/Transaction.h +++ b/src/server/shared/Database/Transaction.h @@ -35,7 +35,7 @@ class Transaction void Append(const char* sql); void PAppend(const char* sql, ...); - size_t GetSize() { return m_queries.size(); } + size_t GetSize() const { return m_queries.size(); } protected: void Cleanup(); diff --git a/src/server/shared/Threading/Callback.h b/src/server/shared/Threading/Callback.h index 0847417e9e7..b8f1d88442b 100755 --- a/src/server/shared/Threading/Callback.h +++ b/src/server/shared/Threading/Callback.h @@ -23,6 +23,7 @@ #include "QueryResult.h" typedef ACE_Future<QueryResult> QueryResultFuture; +typedef ACE_Future<PreparedQueryResult> PreparedQueryResultFuture; /*! A simple template using ACE_Future to manage callbacks from the thread and object that issued the request. <ParamType> is variable type of parameter that is used as parameter |
