diff options
author | leak <leak@bitmx.net> | 2014-06-30 18:37:23 +0200 |
---|---|---|
committer | leak <leak@bitmx.net> | 2014-06-30 18:37:23 +0200 |
commit | d39a013b6b979a5158bf86c37a197cb902b2c2f9 (patch) | |
tree | d39b71809197052500bdbc7399276aa15dd96ee8 /src | |
parent | 9588c1d92b20573c2bd214b44a15e41fd8cf35b4 (diff) |
Replaced ACE_Task_Base based DatabaseWorker with PCQ
Note: Not exactly sure how shutdown should be handled, currently the queue clears itself out before shutting down
This might need to be changed if the queue should be fully processed before being deleted
Diffstat (limited to 'src')
-rw-r--r-- | src/server/shared/Database/DatabaseWorker.cpp | 44 | ||||
-rw-r--r-- | src/server/shared/Database/DatabaseWorker.h | 21 | ||||
-rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.h | 20 | ||||
-rw-r--r-- | src/server/shared/Database/Implementation/CharacterDatabase.h | 2 | ||||
-rw-r--r-- | src/server/shared/Database/Implementation/LoginDatabase.h | 2 | ||||
-rw-r--r-- | src/server/shared/Database/Implementation/WorldDatabase.h | 2 | ||||
-rw-r--r-- | src/server/shared/Database/MySQLConnection.cpp | 13 | ||||
-rw-r--r-- | src/server/shared/Database/MySQLConnection.h | 5 | ||||
-rw-r--r-- | src/server/shared/Database/SQLOperation.h | 5 | ||||
-rw-r--r-- | src/server/shared/Threading/ProducerConsumerQueue.h | 11 |
10 files changed, 68 insertions, 57 deletions
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp index 3581f8e0211..3944c008652 100644 --- a/src/server/shared/Database/DatabaseWorker.cpp +++ b/src/server/shared/Database/DatabaseWorker.cpp @@ -20,32 +20,42 @@ #include "SQLOperation.h" #include "MySQLConnection.h" #include "MySQLThreading.h" +#include "ProducerConsumerQueue.h" -DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) : -m_queue(new_queue), -m_conn(con) +DatabaseWorker::DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection) { - /// Assign thread to task - activate(); + _connection = connection; + _queue = newQueue; + _cancelationToken = false; + _workerThread = std::thread(&DatabaseWorker::WorkerThread, this); } -int DatabaseWorker::svc() +DatabaseWorker::~DatabaseWorker() { - if (!m_queue) - return -1; + _cancelationToken = true; + + _queue->Cancel(); + + _workerThread.join(); +} + +void DatabaseWorker::WorkerThread() +{ + if (!_queue) + return; - SQLOperation *request = NULL; while (1) { - request = (SQLOperation*)(m_queue->dequeue()); - if (!request) - break; + SQLOperation* operation = nullptr; - request->SetConnection(m_conn); - request->call(); + _queue->WaitAndPop(operation); - delete request; - } + if (_cancelationToken) + return; - return 0; + operation->SetConnection(_connection); + operation->call(); + + delete operation; + } } diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h index dc883dd3428..9b45318e39f 100644 --- a/src/server/shared/Database/DatabaseWorker.h +++ b/src/server/shared/Database/DatabaseWorker.h @@ -19,23 +19,24 @@ #define _WORKERTHREAD_H #include "Define.h" -#include <ace/Task.h> -#include <ace/Activation_Queue.h> +#include "ProducerConsumerQueue.h" class MySQLConnection; -class DatabaseWorker : protected ACE_Task_Base +class DatabaseWorker { public: - DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con); - - ///- Inherited from ACE_Task_Base - int svc(); - int wait() { return ACE_Task_Base::wait(); } + DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection); + ~DatabaseWorker(); private: - ACE_Activation_Queue* m_queue; - MySQLConnection* m_conn; + ProducerConsumerQueue<SQLOperation*>* _queue; + MySQLConnection* _connection; + + void WorkerThread(); + std::thread _workerThread; + + std::atomic_bool _cancelationToken; DatabaseWorker(DatabaseWorker const& right) = delete; DatabaseWorker& operator=(DatabaseWorker const& right) = delete; diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index 078b97762b5..e56dcc329cd 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -52,7 +52,7 @@ class DatabaseWorkerPool DatabaseWorkerPool() : _connectionInfo(NULL) { _messageQueue = new ACE_Message_Queue<ACE_SYNCH>(8 * 1024 * 1024, 8 * 1024 * 1024); - _queue = new ACE_Activation_Queue(_messageQueue); + _queue = new ProducerConsumerQueue<SQLOperation*>(); memset(_connectionCount, 0, sizeof(_connectionCount)); _connections.resize(IDX_SIZE); @@ -107,16 +107,10 @@ class DatabaseWorkerPool { TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); - //! Shuts down delaythreads for this connection pool by underlying deactivate(). - //! The next dequeue attempt in the worker thread tasks will result in an error, - //! ultimately ending the worker thread task. - _queue->queue()->close(); - for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) { T* t = _connections[IDX_ASYNC][i]; DatabaseWorker* worker = t->m_worker; - worker->wait(); //! Block until no more threads are running this task. delete worker; t->Close(); //! Closes the actualy MySQL connection. } @@ -488,7 +482,7 @@ class DatabaseWorkerPool void Enqueue(SQLOperation* op) { - _queue->enqueue(op); + _queue->Push(op); } //! Gets a free connection in the synchronous connection pool. @@ -523,11 +517,11 @@ class DatabaseWorkerPool IDX_SIZE }; - ACE_Message_Queue<ACE_SYNCH>* _messageQueue; //! Message Queue used by ACE_Activation_Queue - ACE_Activation_Queue* _queue; //! Queue shared by async worker threads. - std::vector< std::vector<T*> > _connections; - uint32 _connectionCount[2]; //! Counter of MySQL connections; - MySQLConnectionInfo* _connectionInfo; + ACE_Message_Queue<ACE_SYNCH>* _messageQueue; //! Message Queue used by ACE_Activation_Queue + ProducerConsumerQueue<SQLOperation*>* _queue; //! Queue shared by async worker threads. + std::vector< std::vector<T*> > _connections; + uint32 _connectionCount[2]; //! Counter of MySQL connections; + MySQLConnectionInfo* _connectionInfo; }; #endif diff --git a/src/server/shared/Database/Implementation/CharacterDatabase.h b/src/server/shared/Database/Implementation/CharacterDatabase.h index 98d7fe231f1..61167681b0b 100644 --- a/src/server/shared/Database/Implementation/CharacterDatabase.h +++ b/src/server/shared/Database/Implementation/CharacterDatabase.h @@ -26,7 +26,7 @@ class CharacterDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections CharacterDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - CharacterDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + CharacterDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/Implementation/LoginDatabase.h b/src/server/shared/Database/Implementation/LoginDatabase.h index 604e9d39551..7fa2ff49324 100644 --- a/src/server/shared/Database/Implementation/LoginDatabase.h +++ b/src/server/shared/Database/Implementation/LoginDatabase.h @@ -26,7 +26,7 @@ class LoginDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections LoginDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - LoginDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + LoginDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/Implementation/WorldDatabase.h b/src/server/shared/Database/Implementation/WorldDatabase.h index a815373a1c6..c8c38d8a629 100644 --- a/src/server/shared/Database/Implementation/WorldDatabase.h +++ b/src/server/shared/Database/Implementation/WorldDatabase.h @@ -26,7 +26,7 @@ class WorldDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections WorldDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - WorldDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + WorldDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp index 0d2d97f7f28..8b24f508331 100644 --- a/src/server/shared/Database/MySQLConnection.cpp +++ b/src/server/shared/Database/MySQLConnection.cpp @@ -33,8 +33,7 @@ #include "DatabaseWorker.h" #include "Timer.h" #include "Log.h" - -#include <ace/OS_NS_unistd.h> +#include "ProducerConsumerQueue.h" MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) : m_reconnecting(false), @@ -45,7 +44,7 @@ m_Mysql(NULL), m_connectionInfo(connInfo), m_connectionFlags(CONNECTION_SYNCH) { } -MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) : +MySQLConnection::MySQLConnection(ProducerConsumerQueue<SQLOperation*>* queue, MySQLConnectionInfo& connInfo) : m_reconnecting(false), m_prepareError(false), m_queue(queue), @@ -502,8 +501,8 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) } uint32 lErrno = mysql_errno(GetHandle()); // It's possible this attempted reconnect throws 2006 at us. To prevent crazy recursive calls, sleep here. - ACE_OS::sleep(3); // Sleep 3 seconds - return _HandleMySQLErrno(lErrno); // Call self (recursive) + std::this_thread::sleep_for(std::chrono::seconds(3)); // Sleep 3 seconds + return _HandleMySQLErrno(lErrno); // Call self (recursive) } case ER_LOCK_DEADLOCK: @@ -517,12 +516,12 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) case ER_BAD_FIELD_ERROR: case ER_NO_SUCH_TABLE: TC_LOG_ERROR("sql.sql", "Your database structure is not up to date. Please make sure you've executed all queries in the sql/updates folders."); - ACE_OS::sleep(10); + std::this_thread::sleep_for(std::chrono::seconds(10)); std::abort(); return false; case ER_PARSE_ERROR: TC_LOG_ERROR("sql.sql", "Error while parsing SQL. Core fix required."); - ACE_OS::sleep(10); + std::this_thread::sleep_for(std::chrono::seconds(10)); std::abort(); return false; default: diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h index 512df7c16c7..3b7efeb5846 100644 --- a/src/server/shared/Database/MySQLConnection.h +++ b/src/server/shared/Database/MySQLConnection.h @@ -20,6 +20,7 @@ #include "DatabaseWorkerPool.h" #include "Transaction.h" #include "Util.h" +#include "ProducerConsumerQueue.h" #ifndef _MYSQLCONNECTION_H #define _MYSQLCONNECTION_H @@ -70,7 +71,7 @@ class MySQLConnection public: MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchronous connections. - MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections. + MySQLConnection(ProducerConsumerQueue<SQLOperation*>* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections. virtual ~MySQLConnection(); virtual bool Open(); @@ -125,7 +126,7 @@ class MySQLConnection bool _HandleMySQLErrno(uint32 errNo); private: - ACE_Activation_Queue* m_queue; //! Queue shared with other asynchronous connections. + ProducerConsumerQueue<SQLOperation*>* m_queue; //! Queue shared with other asynchronous connections. DatabaseWorker* m_worker; //! Core worker task. MYSQL * m_Mysql; //! MySQL Handle. MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging) diff --git a/src/server/shared/Database/SQLOperation.h b/src/server/shared/Database/SQLOperation.h index 6f933a051e3..64fc64e2c2e 100644 --- a/src/server/shared/Database/SQLOperation.h +++ b/src/server/shared/Database/SQLOperation.h @@ -18,9 +18,6 @@ #ifndef _SQLOPERATION_H #define _SQLOPERATION_H -#include <ace/Method_Request.h> -#include <ace/Activation_Queue.h> - #include "QueryResult.h" //- Forward declare (don't include header to prevent circular includes) @@ -56,7 +53,7 @@ union SQLResultSetUnion class MySQLConnection; -class SQLOperation : public ACE_Method_Request +class SQLOperation { public: SQLOperation(): m_conn(NULL) { } diff --git a/src/server/shared/Threading/ProducerConsumerQueue.h b/src/server/shared/Threading/ProducerConsumerQueue.h index 961cb9f9c82..41bff445c2e 100644 --- a/src/server/shared/Threading/ProducerConsumerQueue.h +++ b/src/server/shared/Threading/ProducerConsumerQueue.h @@ -21,6 +21,7 @@ #include <condition_variable> #include <mutex> #include <queue> +#include <atomic> template <typename T> class ProducerConsumerQueue @@ -29,9 +30,12 @@ private: std::mutex _queueLock; std::queue<T> _queue; std::condition_variable _condition; + std::atomic<bool> _shutdown; public: + ProducerConsumerQueue<T>() : _shutdown(false) { } + void Push(const T& value) { _queueLock.lock(); @@ -68,7 +72,10 @@ public: { std::unique_lock<std::mutex> lock(_queueLock); - _condition.wait(lock, [this](){ return !_queue.empty(); }); + while (_queue.empty() && !_shutdown) + { + _condition.wait(lock); + } if (_queue.empty()) return; @@ -91,6 +98,8 @@ public: _queue.pop(); } + _shutdown = true; + _queueLock.unlock(); _condition.notify_all(); |