diff options
-rw-r--r-- | src/server/shared/Database/DatabaseWorker.cpp | 44 | ||||
-rw-r--r-- | src/server/shared/Database/DatabaseWorker.h | 21 | ||||
-rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.h | 20 | ||||
-rw-r--r-- | src/server/shared/Database/Implementation/CharacterDatabase.h | 2 | ||||
-rw-r--r-- | src/server/shared/Database/Implementation/LoginDatabase.h | 2 | ||||
-rw-r--r-- | src/server/shared/Database/Implementation/WorldDatabase.h | 2 | ||||
-rw-r--r-- | src/server/shared/Database/MySQLConnection.cpp | 13 | ||||
-rw-r--r-- | src/server/shared/Database/MySQLConnection.h | 5 | ||||
-rw-r--r-- | src/server/shared/Database/SQLOperation.h | 5 | ||||
-rw-r--r-- | src/server/shared/Threading/ProducerConsumerQueue.h | 11 |
10 files changed, 68 insertions, 57 deletions
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp index 3581f8e0211..3944c008652 100644 --- a/src/server/shared/Database/DatabaseWorker.cpp +++ b/src/server/shared/Database/DatabaseWorker.cpp @@ -20,32 +20,42 @@ #include "SQLOperation.h" #include "MySQLConnection.h" #include "MySQLThreading.h" +#include "ProducerConsumerQueue.h" -DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) : -m_queue(new_queue), -m_conn(con) +DatabaseWorker::DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection) { - /// Assign thread to task - activate(); + _connection = connection; + _queue = newQueue; + _cancelationToken = false; + _workerThread = std::thread(&DatabaseWorker::WorkerThread, this); } -int DatabaseWorker::svc() +DatabaseWorker::~DatabaseWorker() { - if (!m_queue) - return -1; + _cancelationToken = true; + + _queue->Cancel(); + + _workerThread.join(); +} + +void DatabaseWorker::WorkerThread() +{ + if (!_queue) + return; - SQLOperation *request = NULL; while (1) { - request = (SQLOperation*)(m_queue->dequeue()); - if (!request) - break; + SQLOperation* operation = nullptr; - request->SetConnection(m_conn); - request->call(); + _queue->WaitAndPop(operation); - delete request; - } + if (_cancelationToken) + return; - return 0; + operation->SetConnection(_connection); + operation->call(); + + delete operation; + } } diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h index dc883dd3428..9b45318e39f 100644 --- a/src/server/shared/Database/DatabaseWorker.h +++ b/src/server/shared/Database/DatabaseWorker.h @@ -19,23 +19,24 @@ #define _WORKERTHREAD_H #include "Define.h" -#include <ace/Task.h> -#include <ace/Activation_Queue.h> +#include "ProducerConsumerQueue.h" class MySQLConnection; -class DatabaseWorker : protected ACE_Task_Base +class DatabaseWorker { public: - DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con); - - ///- Inherited from ACE_Task_Base - int svc(); - int wait() { return ACE_Task_Base::wait(); } + DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection); + ~DatabaseWorker(); private: - ACE_Activation_Queue* m_queue; - MySQLConnection* m_conn; + ProducerConsumerQueue<SQLOperation*>* _queue; + MySQLConnection* _connection; + + void WorkerThread(); + std::thread _workerThread; + + std::atomic_bool _cancelationToken; DatabaseWorker(DatabaseWorker const& right) = delete; DatabaseWorker& operator=(DatabaseWorker const& right) = delete; diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index 078b97762b5..e56dcc329cd 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -52,7 +52,7 @@ class DatabaseWorkerPool DatabaseWorkerPool() : _connectionInfo(NULL) { _messageQueue = new ACE_Message_Queue<ACE_SYNCH>(8 * 1024 * 1024, 8 * 1024 * 1024); - _queue = new ACE_Activation_Queue(_messageQueue); + _queue = new ProducerConsumerQueue<SQLOperation*>(); memset(_connectionCount, 0, sizeof(_connectionCount)); _connections.resize(IDX_SIZE); @@ -107,16 +107,10 @@ class DatabaseWorkerPool { TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName()); - //! Shuts down delaythreads for this connection pool by underlying deactivate(). - //! The next dequeue attempt in the worker thread tasks will result in an error, - //! ultimately ending the worker thread task. - _queue->queue()->close(); - for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) { T* t = _connections[IDX_ASYNC][i]; DatabaseWorker* worker = t->m_worker; - worker->wait(); //! Block until no more threads are running this task. delete worker; t->Close(); //! Closes the actualy MySQL connection. } @@ -488,7 +482,7 @@ class DatabaseWorkerPool void Enqueue(SQLOperation* op) { - _queue->enqueue(op); + _queue->Push(op); } //! Gets a free connection in the synchronous connection pool. @@ -523,11 +517,11 @@ class DatabaseWorkerPool IDX_SIZE }; - ACE_Message_Queue<ACE_SYNCH>* _messageQueue; //! Message Queue used by ACE_Activation_Queue - ACE_Activation_Queue* _queue; //! Queue shared by async worker threads. - std::vector< std::vector<T*> > _connections; - uint32 _connectionCount[2]; //! Counter of MySQL connections; - MySQLConnectionInfo* _connectionInfo; + ACE_Message_Queue<ACE_SYNCH>* _messageQueue; //! Message Queue used by ACE_Activation_Queue + ProducerConsumerQueue<SQLOperation*>* _queue; //! Queue shared by async worker threads. + std::vector< std::vector<T*> > _connections; + uint32 _connectionCount[2]; //! Counter of MySQL connections; + MySQLConnectionInfo* _connectionInfo; }; #endif diff --git a/src/server/shared/Database/Implementation/CharacterDatabase.h b/src/server/shared/Database/Implementation/CharacterDatabase.h index 98d7fe231f1..61167681b0b 100644 --- a/src/server/shared/Database/Implementation/CharacterDatabase.h +++ b/src/server/shared/Database/Implementation/CharacterDatabase.h @@ -26,7 +26,7 @@ class CharacterDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections CharacterDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - CharacterDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + CharacterDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/Implementation/LoginDatabase.h b/src/server/shared/Database/Implementation/LoginDatabase.h index 604e9d39551..7fa2ff49324 100644 --- a/src/server/shared/Database/Implementation/LoginDatabase.h +++ b/src/server/shared/Database/Implementation/LoginDatabase.h @@ -26,7 +26,7 @@ class LoginDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections LoginDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - LoginDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + LoginDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/Implementation/WorldDatabase.h b/src/server/shared/Database/Implementation/WorldDatabase.h index a815373a1c6..c8c38d8a629 100644 --- a/src/server/shared/Database/Implementation/WorldDatabase.h +++ b/src/server/shared/Database/Implementation/WorldDatabase.h @@ -26,7 +26,7 @@ class WorldDatabaseConnection : public MySQLConnection public: //- Constructors for sync and async connections WorldDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { } - WorldDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } + WorldDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { } //- Loads database type specific prepared statements void DoPrepareStatements(); diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp index 0d2d97f7f28..8b24f508331 100644 --- a/src/server/shared/Database/MySQLConnection.cpp +++ b/src/server/shared/Database/MySQLConnection.cpp @@ -33,8 +33,7 @@ #include "DatabaseWorker.h" #include "Timer.h" #include "Log.h" - -#include <ace/OS_NS_unistd.h> +#include "ProducerConsumerQueue.h" MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) : m_reconnecting(false), @@ -45,7 +44,7 @@ m_Mysql(NULL), m_connectionInfo(connInfo), m_connectionFlags(CONNECTION_SYNCH) { } -MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) : +MySQLConnection::MySQLConnection(ProducerConsumerQueue<SQLOperation*>* queue, MySQLConnectionInfo& connInfo) : m_reconnecting(false), m_prepareError(false), m_queue(queue), @@ -502,8 +501,8 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) } uint32 lErrno = mysql_errno(GetHandle()); // It's possible this attempted reconnect throws 2006 at us. To prevent crazy recursive calls, sleep here. - ACE_OS::sleep(3); // Sleep 3 seconds - return _HandleMySQLErrno(lErrno); // Call self (recursive) + std::this_thread::sleep_for(std::chrono::seconds(3)); // Sleep 3 seconds + return _HandleMySQLErrno(lErrno); // Call self (recursive) } case ER_LOCK_DEADLOCK: @@ -517,12 +516,12 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo) case ER_BAD_FIELD_ERROR: case ER_NO_SUCH_TABLE: TC_LOG_ERROR("sql.sql", "Your database structure is not up to date. Please make sure you've executed all queries in the sql/updates folders."); - ACE_OS::sleep(10); + std::this_thread::sleep_for(std::chrono::seconds(10)); std::abort(); return false; case ER_PARSE_ERROR: TC_LOG_ERROR("sql.sql", "Error while parsing SQL. Core fix required."); - ACE_OS::sleep(10); + std::this_thread::sleep_for(std::chrono::seconds(10)); std::abort(); return false; default: diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h index 512df7c16c7..3b7efeb5846 100644 --- a/src/server/shared/Database/MySQLConnection.h +++ b/src/server/shared/Database/MySQLConnection.h @@ -20,6 +20,7 @@ #include "DatabaseWorkerPool.h" #include "Transaction.h" #include "Util.h" +#include "ProducerConsumerQueue.h" #ifndef _MYSQLCONNECTION_H #define _MYSQLCONNECTION_H @@ -70,7 +71,7 @@ class MySQLConnection public: MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchronous connections. - MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections. + MySQLConnection(ProducerConsumerQueue<SQLOperation*>* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections. virtual ~MySQLConnection(); virtual bool Open(); @@ -125,7 +126,7 @@ class MySQLConnection bool _HandleMySQLErrno(uint32 errNo); private: - ACE_Activation_Queue* m_queue; //! Queue shared with other asynchronous connections. + ProducerConsumerQueue<SQLOperation*>* m_queue; //! Queue shared with other asynchronous connections. DatabaseWorker* m_worker; //! Core worker task. MYSQL * m_Mysql; //! MySQL Handle. MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging) diff --git a/src/server/shared/Database/SQLOperation.h b/src/server/shared/Database/SQLOperation.h index 6f933a051e3..64fc64e2c2e 100644 --- a/src/server/shared/Database/SQLOperation.h +++ b/src/server/shared/Database/SQLOperation.h @@ -18,9 +18,6 @@ #ifndef _SQLOPERATION_H #define _SQLOPERATION_H -#include <ace/Method_Request.h> -#include <ace/Activation_Queue.h> - #include "QueryResult.h" //- Forward declare (don't include header to prevent circular includes) @@ -56,7 +53,7 @@ union SQLResultSetUnion class MySQLConnection; -class SQLOperation : public ACE_Method_Request +class SQLOperation { public: SQLOperation(): m_conn(NULL) { } diff --git a/src/server/shared/Threading/ProducerConsumerQueue.h b/src/server/shared/Threading/ProducerConsumerQueue.h index 961cb9f9c82..41bff445c2e 100644 --- a/src/server/shared/Threading/ProducerConsumerQueue.h +++ b/src/server/shared/Threading/ProducerConsumerQueue.h @@ -21,6 +21,7 @@ #include <condition_variable> #include <mutex> #include <queue> +#include <atomic> template <typename T> class ProducerConsumerQueue @@ -29,9 +30,12 @@ private: std::mutex _queueLock; std::queue<T> _queue; std::condition_variable _condition; + std::atomic<bool> _shutdown; public: + ProducerConsumerQueue<T>() : _shutdown(false) { } + void Push(const T& value) { _queueLock.lock(); @@ -68,7 +72,10 @@ public: { std::unique_lock<std::mutex> lock(_queueLock); - _condition.wait(lock, [this](){ return !_queue.empty(); }); + while (_queue.empty() && !_shutdown) + { + _condition.wait(lock); + } if (_queue.empty()) return; @@ -91,6 +98,8 @@ public: _queue.pop(); } + _shutdown = true; + _queueLock.unlock(); _condition.notify_all(); |