aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorleak <leak@bitmx.net>2014-06-30 18:37:23 +0200
committerleak <leak@bitmx.net>2014-06-30 18:37:23 +0200
commitd39a013b6b979a5158bf86c37a197cb902b2c2f9 (patch)
treed39b71809197052500bdbc7399276aa15dd96ee8
parent9588c1d92b20573c2bd214b44a15e41fd8cf35b4 (diff)
Replaced ACE_Task_Base based DatabaseWorker with PCQ
Note: Not exactly sure how shutdown should be handled, currently the queue clears itself out before shutting down This might need to be changed if the queue should be fully processed before being deleted
-rw-r--r--src/server/shared/Database/DatabaseWorker.cpp44
-rw-r--r--src/server/shared/Database/DatabaseWorker.h21
-rw-r--r--src/server/shared/Database/DatabaseWorkerPool.h20
-rw-r--r--src/server/shared/Database/Implementation/CharacterDatabase.h2
-rw-r--r--src/server/shared/Database/Implementation/LoginDatabase.h2
-rw-r--r--src/server/shared/Database/Implementation/WorldDatabase.h2
-rw-r--r--src/server/shared/Database/MySQLConnection.cpp13
-rw-r--r--src/server/shared/Database/MySQLConnection.h5
-rw-r--r--src/server/shared/Database/SQLOperation.h5
-rw-r--r--src/server/shared/Threading/ProducerConsumerQueue.h11
10 files changed, 68 insertions, 57 deletions
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp
index 3581f8e0211..3944c008652 100644
--- a/src/server/shared/Database/DatabaseWorker.cpp
+++ b/src/server/shared/Database/DatabaseWorker.cpp
@@ -20,32 +20,42 @@
#include "SQLOperation.h"
#include "MySQLConnection.h"
#include "MySQLThreading.h"
+#include "ProducerConsumerQueue.h"
-DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) :
-m_queue(new_queue),
-m_conn(con)
+DatabaseWorker::DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection)
{
- /// Assign thread to task
- activate();
+ _connection = connection;
+ _queue = newQueue;
+ _cancelationToken = false;
+ _workerThread = std::thread(&DatabaseWorker::WorkerThread, this);
}
-int DatabaseWorker::svc()
+DatabaseWorker::~DatabaseWorker()
{
- if (!m_queue)
- return -1;
+ _cancelationToken = true;
+
+ _queue->Cancel();
+
+ _workerThread.join();
+}
+
+void DatabaseWorker::WorkerThread()
+{
+ if (!_queue)
+ return;
- SQLOperation *request = NULL;
while (1)
{
- request = (SQLOperation*)(m_queue->dequeue());
- if (!request)
- break;
+ SQLOperation* operation = nullptr;
- request->SetConnection(m_conn);
- request->call();
+ _queue->WaitAndPop(operation);
- delete request;
- }
+ if (_cancelationToken)
+ return;
- return 0;
+ operation->SetConnection(_connection);
+ operation->call();
+
+ delete operation;
+ }
}
diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h
index dc883dd3428..9b45318e39f 100644
--- a/src/server/shared/Database/DatabaseWorker.h
+++ b/src/server/shared/Database/DatabaseWorker.h
@@ -19,23 +19,24 @@
#define _WORKERTHREAD_H
#include "Define.h"
-#include <ace/Task.h>
-#include <ace/Activation_Queue.h>
+#include "ProducerConsumerQueue.h"
class MySQLConnection;
-class DatabaseWorker : protected ACE_Task_Base
+class DatabaseWorker
{
public:
- DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con);
-
- ///- Inherited from ACE_Task_Base
- int svc();
- int wait() { return ACE_Task_Base::wait(); }
+ DatabaseWorker(ProducerConsumerQueue<SQLOperation*>* newQueue, MySQLConnection* connection);
+ ~DatabaseWorker();
private:
- ACE_Activation_Queue* m_queue;
- MySQLConnection* m_conn;
+ ProducerConsumerQueue<SQLOperation*>* _queue;
+ MySQLConnection* _connection;
+
+ void WorkerThread();
+ std::thread _workerThread;
+
+ std::atomic_bool _cancelationToken;
DatabaseWorker(DatabaseWorker const& right) = delete;
DatabaseWorker& operator=(DatabaseWorker const& right) = delete;
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h
index 078b97762b5..e56dcc329cd 100644
--- a/src/server/shared/Database/DatabaseWorkerPool.h
+++ b/src/server/shared/Database/DatabaseWorkerPool.h
@@ -52,7 +52,7 @@ class DatabaseWorkerPool
DatabaseWorkerPool() : _connectionInfo(NULL)
{
_messageQueue = new ACE_Message_Queue<ACE_SYNCH>(8 * 1024 * 1024, 8 * 1024 * 1024);
- _queue = new ACE_Activation_Queue(_messageQueue);
+ _queue = new ProducerConsumerQueue<SQLOperation*>();
memset(_connectionCount, 0, sizeof(_connectionCount));
_connections.resize(IDX_SIZE);
@@ -107,16 +107,10 @@ class DatabaseWorkerPool
{
TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName());
- //! Shuts down delaythreads for this connection pool by underlying deactivate().
- //! The next dequeue attempt in the worker thread tasks will result in an error,
- //! ultimately ending the worker thread task.
- _queue->queue()->close();
-
for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i)
{
T* t = _connections[IDX_ASYNC][i];
DatabaseWorker* worker = t->m_worker;
- worker->wait(); //! Block until no more threads are running this task.
delete worker;
t->Close(); //! Closes the actualy MySQL connection.
}
@@ -488,7 +482,7 @@ class DatabaseWorkerPool
void Enqueue(SQLOperation* op)
{
- _queue->enqueue(op);
+ _queue->Push(op);
}
//! Gets a free connection in the synchronous connection pool.
@@ -523,11 +517,11 @@ class DatabaseWorkerPool
IDX_SIZE
};
- ACE_Message_Queue<ACE_SYNCH>* _messageQueue; //! Message Queue used by ACE_Activation_Queue
- ACE_Activation_Queue* _queue; //! Queue shared by async worker threads.
- std::vector< std::vector<T*> > _connections;
- uint32 _connectionCount[2]; //! Counter of MySQL connections;
- MySQLConnectionInfo* _connectionInfo;
+ ACE_Message_Queue<ACE_SYNCH>* _messageQueue; //! Message Queue used by ACE_Activation_Queue
+ ProducerConsumerQueue<SQLOperation*>* _queue; //! Queue shared by async worker threads.
+ std::vector< std::vector<T*> > _connections;
+ uint32 _connectionCount[2]; //! Counter of MySQL connections;
+ MySQLConnectionInfo* _connectionInfo;
};
#endif
diff --git a/src/server/shared/Database/Implementation/CharacterDatabase.h b/src/server/shared/Database/Implementation/CharacterDatabase.h
index 98d7fe231f1..61167681b0b 100644
--- a/src/server/shared/Database/Implementation/CharacterDatabase.h
+++ b/src/server/shared/Database/Implementation/CharacterDatabase.h
@@ -26,7 +26,7 @@ class CharacterDatabaseConnection : public MySQLConnection
public:
//- Constructors for sync and async connections
CharacterDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { }
- CharacterDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
+ CharacterDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
//- Loads database type specific prepared statements
void DoPrepareStatements();
diff --git a/src/server/shared/Database/Implementation/LoginDatabase.h b/src/server/shared/Database/Implementation/LoginDatabase.h
index 604e9d39551..7fa2ff49324 100644
--- a/src/server/shared/Database/Implementation/LoginDatabase.h
+++ b/src/server/shared/Database/Implementation/LoginDatabase.h
@@ -26,7 +26,7 @@ class LoginDatabaseConnection : public MySQLConnection
public:
//- Constructors for sync and async connections
LoginDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { }
- LoginDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
+ LoginDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
//- Loads database type specific prepared statements
void DoPrepareStatements();
diff --git a/src/server/shared/Database/Implementation/WorldDatabase.h b/src/server/shared/Database/Implementation/WorldDatabase.h
index a815373a1c6..c8c38d8a629 100644
--- a/src/server/shared/Database/Implementation/WorldDatabase.h
+++ b/src/server/shared/Database/Implementation/WorldDatabase.h
@@ -26,7 +26,7 @@ class WorldDatabaseConnection : public MySQLConnection
public:
//- Constructors for sync and async connections
WorldDatabaseConnection(MySQLConnectionInfo& connInfo) : MySQLConnection(connInfo) { }
- WorldDatabaseConnection(ACE_Activation_Queue* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
+ WorldDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
//- Loads database type specific prepared statements
void DoPrepareStatements();
diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp
index 0d2d97f7f28..8b24f508331 100644
--- a/src/server/shared/Database/MySQLConnection.cpp
+++ b/src/server/shared/Database/MySQLConnection.cpp
@@ -33,8 +33,7 @@
#include "DatabaseWorker.h"
#include "Timer.h"
#include "Log.h"
-
-#include <ace/OS_NS_unistd.h>
+#include "ProducerConsumerQueue.h"
MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) :
m_reconnecting(false),
@@ -45,7 +44,7 @@ m_Mysql(NULL),
m_connectionInfo(connInfo),
m_connectionFlags(CONNECTION_SYNCH) { }
-MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) :
+MySQLConnection::MySQLConnection(ProducerConsumerQueue<SQLOperation*>* queue, MySQLConnectionInfo& connInfo) :
m_reconnecting(false),
m_prepareError(false),
m_queue(queue),
@@ -502,8 +501,8 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo)
}
uint32 lErrno = mysql_errno(GetHandle()); // It's possible this attempted reconnect throws 2006 at us. To prevent crazy recursive calls, sleep here.
- ACE_OS::sleep(3); // Sleep 3 seconds
- return _HandleMySQLErrno(lErrno); // Call self (recursive)
+ std::this_thread::sleep_for(std::chrono::seconds(3)); // Sleep 3 seconds
+ return _HandleMySQLErrno(lErrno); // Call self (recursive)
}
case ER_LOCK_DEADLOCK:
@@ -517,12 +516,12 @@ bool MySQLConnection::_HandleMySQLErrno(uint32 errNo)
case ER_BAD_FIELD_ERROR:
case ER_NO_SUCH_TABLE:
TC_LOG_ERROR("sql.sql", "Your database structure is not up to date. Please make sure you've executed all queries in the sql/updates folders.");
- ACE_OS::sleep(10);
+ std::this_thread::sleep_for(std::chrono::seconds(10));
std::abort();
return false;
case ER_PARSE_ERROR:
TC_LOG_ERROR("sql.sql", "Error while parsing SQL. Core fix required.");
- ACE_OS::sleep(10);
+ std::this_thread::sleep_for(std::chrono::seconds(10));
std::abort();
return false;
default:
diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h
index 512df7c16c7..3b7efeb5846 100644
--- a/src/server/shared/Database/MySQLConnection.h
+++ b/src/server/shared/Database/MySQLConnection.h
@@ -20,6 +20,7 @@
#include "DatabaseWorkerPool.h"
#include "Transaction.h"
#include "Util.h"
+#include "ProducerConsumerQueue.h"
#ifndef _MYSQLCONNECTION_H
#define _MYSQLCONNECTION_H
@@ -70,7 +71,7 @@ class MySQLConnection
public:
MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchronous connections.
- MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections.
+ MySQLConnection(ProducerConsumerQueue<SQLOperation*>* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchronous connections.
virtual ~MySQLConnection();
virtual bool Open();
@@ -125,7 +126,7 @@ class MySQLConnection
bool _HandleMySQLErrno(uint32 errNo);
private:
- ACE_Activation_Queue* m_queue; //! Queue shared with other asynchronous connections.
+ ProducerConsumerQueue<SQLOperation*>* m_queue; //! Queue shared with other asynchronous connections.
DatabaseWorker* m_worker; //! Core worker task.
MYSQL * m_Mysql; //! MySQL Handle.
MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging)
diff --git a/src/server/shared/Database/SQLOperation.h b/src/server/shared/Database/SQLOperation.h
index 6f933a051e3..64fc64e2c2e 100644
--- a/src/server/shared/Database/SQLOperation.h
+++ b/src/server/shared/Database/SQLOperation.h
@@ -18,9 +18,6 @@
#ifndef _SQLOPERATION_H
#define _SQLOPERATION_H
-#include <ace/Method_Request.h>
-#include <ace/Activation_Queue.h>
-
#include "QueryResult.h"
//- Forward declare (don't include header to prevent circular includes)
@@ -56,7 +53,7 @@ union SQLResultSetUnion
class MySQLConnection;
-class SQLOperation : public ACE_Method_Request
+class SQLOperation
{
public:
SQLOperation(): m_conn(NULL) { }
diff --git a/src/server/shared/Threading/ProducerConsumerQueue.h b/src/server/shared/Threading/ProducerConsumerQueue.h
index 961cb9f9c82..41bff445c2e 100644
--- a/src/server/shared/Threading/ProducerConsumerQueue.h
+++ b/src/server/shared/Threading/ProducerConsumerQueue.h
@@ -21,6 +21,7 @@
#include <condition_variable>
#include <mutex>
#include <queue>
+#include <atomic>
template <typename T>
class ProducerConsumerQueue
@@ -29,9 +30,12 @@ private:
std::mutex _queueLock;
std::queue<T> _queue;
std::condition_variable _condition;
+ std::atomic<bool> _shutdown;
public:
+ ProducerConsumerQueue<T>() : _shutdown(false) { }
+
void Push(const T& value)
{
_queueLock.lock();
@@ -68,7 +72,10 @@ public:
{
std::unique_lock<std::mutex> lock(_queueLock);
- _condition.wait(lock, [this](){ return !_queue.empty(); });
+ while (_queue.empty() && !_shutdown)
+ {
+ _condition.wait(lock);
+ }
if (_queue.empty())
return;
@@ -91,6 +98,8 @@ public:
_queue.pop();
}
+ _shutdown = true;
+
_queueLock.unlock();
_condition.notify_all();