aboutsummaryrefslogtreecommitdiff
path: root/src/server/shared/Database
diff options
context:
space:
mode:
authorMachiavelli <none@none>2010-12-04 21:50:36 +0100
committerMachiavelli <none@none>2010-12-04 21:50:36 +0100
commitf5d6319d4ddd35a9be9ea886aa119465360e397e (patch)
tree5bd50f42122443fd93e81d40e07adbc2f589a21b /src/server/shared/Database
parentb8bf37264b4a947e66fe29b29eb9b60ebe3d6f3c (diff)
Core/DBLayer:
- Make the mysql connectionpool shared for async and syncrhonous connections. - Allow configurable amount of connections for the pool - Allow configurable amount of delaythreads Note that delaythreads now only represent in-core threads. Whenever they execute a task they will pick a free connection from the pool instead of using their previously unique assigned connection. The purpose of this design change is better distribution of SQL requests (no bottlenecks paired with idling) among available resources. This also prevents a ¨memory waste¨ of preparing async prepared statements on synchronous connections (that were never called) - and vice versa. Now, connections aren´t explicitly async or synchronous, they serve both purposes. Use at own risk, might cause instabilities. Don´t forget to update your config files and clear your cmake cache. --HG-- branch : trunk
Diffstat (limited to 'src/server/shared/Database')
-rwxr-xr-xsrc/server/shared/Database/DatabaseWorker.cpp67
-rwxr-xr-xsrc/server/shared/Database/DatabaseWorker.h52
-rwxr-xr-xsrc/server/shared/Database/DatabaseWorkerPool.h115
-rwxr-xr-xsrc/server/shared/Database/MySQLConnection.cpp9
-rwxr-xr-xsrc/server/shared/Database/MySQLConnection.h7
-rwxr-xr-xsrc/server/shared/Database/Transaction.h2
6 files changed, 100 insertions, 152 deletions
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp
deleted file mode 100755
index 3739d81210e..00000000000
--- a/src/server/shared/Database/DatabaseWorker.cpp
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2008-2010 TrinityCore <http://www.trinitycore.org/>
- *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License as published by the
- * Free Software Foundation; either version 2 of the License, or (at your
- * option) any later version.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "DatabaseEnv.h"
-#include "DatabaseWorker.h"
-#include "SQLOperation.h"
-#include "MySQLConnection.h"
-#include "MySQLThreading.h"
-
-DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) :
-m_queue(new_queue),
-m_conn(con)
-{
- /// Assign thread to task
- activate();
-}
-
-int DatabaseWorker::svc()
-{
- if (!m_queue)
- return -1;
-
- SQLOperation *request = NULL;
- while (1)
- {
- request = (SQLOperation*)(m_queue->dequeue());
- if (!request)
- break;
-
- request->SetConnection(m_conn);
- request->call();
-
- delete request;
- }
-
- m_conn->Close();
- return 0;
-}
-
-int DatabaseWorker::activate()
-{
- /* THR_DETACHED:
- Create an asynchronous thread. The exit status of the thread would not be available to any other threads.
- The thread resources are reclaimed by the operating system whenever the thread is terminated. */
-
- /* THR_NEW_LWP:
- Create an explicit kernel-level thread (as opposed to a user-level thread). */
-
- ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1);
- return 0; //^ - Spawn one thread to handle this task.
- // However more of these tasks may be activated
- // See DatabaseWorkerPool ctor.
-} \ No newline at end of file
diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h
index 8972555fbe1..f51d014bb8d 100755
--- a/src/server/shared/Database/DatabaseWorker.h
+++ b/src/server/shared/Database/DatabaseWorker.h
@@ -23,20 +23,66 @@
class MySQLConnection;
+template <typename T>
class DatabaseWorker : public ACE_Task_Base
{
public:
- DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con);
+ DatabaseWorker(ACE_Activation_Queue* new_queue, T* t) :
+ m_queue(new_queue),
+ m_pool(t)
+ {
+ /// Assign thread to task
+ activate();
+ }
+
///- Inherited from ACE_Task_Base
- int svc();
- int activate();
+ int svc()
+ {
+ if (!m_queue)
+ return -1;
+
+ SQLOperation *request = NULL;
+ while (1)
+ {
+ request = (SQLOperation*)(m_queue->dequeue());
+ if (!request)
+ break;
+
+ MySQLConnection* con = m_pool->GetFreeConnection();
+ request->SetConnection(con);
+ request->call();
+ con->Unlock();
+
+ delete request;
+ }
+
+ m_conn->Close();
+ return 0;
+ }
+
+ int activate()
+ {
+ /* THR_DETACHED:
+ Create an asynchronous thread. The exit status of the thread would not be available to any other threads.
+ The thread resources are reclaimed by the operating system whenever the thread is terminated. */
+
+ /* THR_NEW_LWP:
+ Create an explicit kernel-level thread (as opposed to a user-level thread). */
+
+ ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1);
+ return 0; //^ - Spawn one thread to handle this task.
+ // However more of these tasks may be activated
+ // See DatabaseWorkerPool ctor.
+ }
+
int wait() { return ACE_Task_Base::wait(); }
private:
DatabaseWorker() : ACE_Task_Base() {}
ACE_Activation_Queue* m_queue;
MySQLConnection* m_conn;
+ T* m_pool; // Databasepool we operate on
};
#endif
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h
index 9b912dd3854..5cfb8af2155 100755
--- a/src/server/shared/Database/DatabaseWorkerPool.h
+++ b/src/server/shared/Database/DatabaseWorkerPool.h
@@ -51,6 +51,8 @@ class PingOperation : public SQLOperation
template <class T>
class DatabaseWorkerPool
{
+ template <class Y> friend class DatabaseWorker;
+
private:
typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt;
@@ -59,8 +61,6 @@ class DatabaseWorkerPool
m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)),
m_connections(0)
{
- m_connections.resize(IDX_SIZE);
-
mysql_library_init(-1, NULL, NULL);
WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
}
@@ -71,32 +71,25 @@ class DatabaseWorkerPool
mysql_library_end();
}
- bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
+ bool Open(const std::string& infoString, uint8 connections, uint8 delaythreads)
{
m_connectionInfo = MySQLConnectionInfo(infoString);
- sLog.outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", m_connectionInfo.database.c_str(), async_threads, synch_threads);
-
- /// Open asynchronous connections (delayed operations)
- m_connections[IDX_ASYNC].resize(async_threads);
- for (uint8 i = 0; i < async_threads; ++i)
- {
- T* t = new T(m_queue, m_connectionInfo);
- t->Open();
- m_connections[IDX_ASYNC][i] = t;
- ++m_connectionCount;
- }
+ sLog.outSQLDriver("Opening databasepool '%s'. Delaythreads: %u, connections: %u", m_connectionInfo.database.c_str(), delaythreads, connections);
- /// Open synchronous connections (direct, blocking operations)
- m_connections[IDX_SYNCH].resize(synch_threads);
- for (uint8 i = 0; i < synch_threads; ++i)
+ m_connections.resize(connections);
+ for (uint8 i = 0; i < connections; ++i)
{
T* t = new T(m_connectionInfo);
t->Open();
- m_connections[IDX_SYNCH][i] = t;
+ m_connections[i] = t;
++m_connectionCount;
}
+ m_delaythreads.resize(delaythreads);
+ for (uint8 i = 0; i < delaythreads; ++i)
+ m_delaythreads[i] = new DatabaseWorker<ThisClass>(m_queue, this);
+
sLog.outSQLDriver("Databasepool opened succesfuly. %u connections running.", (uint32)m_connectionCount.value());
return true;
}
@@ -107,25 +100,21 @@ class DatabaseWorkerPool
/// Shuts down delaythreads for this connection pool.
m_queue->queue()->deactivate();
- for (uint8 i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
- {
- /// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx
- /// Now we just wait until m_queue gives the signal to the worker threads to stop
- T* t = m_connections[IDX_ASYNC][i];
- t->m_worker->wait(); // t->Close(); is called from worker thread
- --m_connectionCount;
- }
- sLog.outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", m_connectionInfo.database.c_str());
+ for (uint8 i = 0; i < m_delaythreads.size(); ++i)
+ m_delaythreads[i]->wait();
- /// Shut down the synchronous connections
- for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
+ /// Shut down the connections
+ for (uint8 i = 0; i < m_connections.size(); ++i)
{
- T* t = m_connections[IDX_SYNCH][i];
- //while (1)
- // if (t->LockIfReady()) -- For some reason deadlocks us
- t->Close();
- --m_connectionCount;
+ T* t = m_connections[i];
+ while (1)
+ if (t->LockIfReady())
+ {
+ t->Close();
+ --m_connectionCount;
+ break;
+ }
}
sLog.outSQLDriver("All connections on databasepool %s closed.", m_connectionInfo.database.c_str());
@@ -312,22 +301,31 @@ class DatabaseWorkerPool
void KeepAlive()
{
- /// Ping synchronous connections
- for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
+ /// Ping connections
+ for (uint8 i = 0; i < m_connections.size(); ++i)
{
- T* t = m_connections[IDX_SYNCH][i];
+ T* t = m_connections[i];
if (t->LockIfReady())
{
t->Ping();
t->Unlock();
}
}
-
- /// Assuming all worker threads are free, every worker thread will receive 1 ping operation request
- /// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
- /// as the sole purpose is to prevent connections from idling.
- for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
- Enqueue(new PingOperation);
+ }
+
+ protected:
+ T* GetFreeConnection()
+ {
+ uint8 i = 0;
+ for (;;) /// Block forever until a connection is free
+ {
+ T* t = m_connections[++i % m_connectionCount.value()];
+ if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
+ return t;
+ }
+
+ // This will be called when Celine Dion learns to sing
+ return NULL;
}
private:
@@ -347,33 +345,14 @@ class DatabaseWorkerPool
m_queue->enqueue(op);
}
- T* GetFreeConnection()
- {
- uint8 i = 0;
- size_t num_cons = m_connections[IDX_SYNCH].size();
- for (;;) /// Block forever until a connection is free
- {
- T* t = m_connections[IDX_SYNCH][++i % num_cons ];
- if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
- return t;
- }
-
- // This will be called when Celine Dion learns to sing
- return NULL;
- }
-
private:
- enum
- {
- IDX_ASYNC,
- IDX_SYNCH,
- IDX_SIZE,
- };
-
- ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
- std::vector< std::vector<T*> > m_connections;
- AtomicUInt m_connectionCount; //! Counter of MySQL connections;
+ typedef DatabaseWorkerPool<T> ThisClass;
+
+ ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
+ std::vector<T*> m_connections;
+ AtomicUInt m_connectionCount; //! Counter of MySQL connections;
MySQLConnectionInfo m_connectionInfo;
+ std::vector<DatabaseWorker<ThisClass>*> m_delaythreads; //! Delaythreads (templatized)
};
#endif
diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp
index 20c614b5333..de05ee311db 100755
--- a/src/server/shared/Database/MySQLConnection.cpp
+++ b/src/server/shared/Database/MySQLConnection.cpp
@@ -34,20 +34,11 @@
MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) :
m_queue(NULL),
-m_worker(NULL),
m_Mysql(NULL),
m_connectionInfo(connInfo)
{
}
-MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) :
-m_queue(queue),
-m_Mysql(NULL),
-m_connectionInfo(connInfo)
-{
- m_worker = new DatabaseWorker(m_queue, this);
-}
-
MySQLConnection::~MySQLConnection()
{
ASSERT (m_Mysql); /// MySQL context must be present at this point
diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h
index 56a670d07c1..a72b5b2b54b 100755
--- a/src/server/shared/Database/MySQLConnection.h
+++ b/src/server/shared/Database/MySQLConnection.h
@@ -22,7 +22,7 @@
#ifndef _MYSQLCONNECTION_H
#define _MYSQLCONNECTION_H
-class DatabaseWorker;
+template <class T> class DatabaseWorker;
class PreparedStatement;
class MySQLPreparedStatement;
class PingOperation;
@@ -56,11 +56,11 @@ struct MySQLConnectionInfo
class MySQLConnection
{
template <class T> friend class DatabaseWorkerPool;
+ template <class T> friend class DatabaseWorker;
friend class PingOperation;
public:
- MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchroneous connections.
- MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchroneous connections.
+ MySQLConnection(MySQLConnectionInfo& connInfo);
~MySQLConnection();
virtual bool Open();
@@ -102,7 +102,6 @@ class MySQLConnection
private:
ACE_Activation_Queue* m_queue; //! Queue shared with other asynchroneous connections.
- DatabaseWorker* m_worker; //! Core worker task.
MYSQL * m_Mysql; //! MySQL Handle.
MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging)
ACE_Thread_Mutex m_Mutex;
diff --git a/src/server/shared/Database/Transaction.h b/src/server/shared/Database/Transaction.h
index 8bdb11541e7..e8200f9810a 100755
--- a/src/server/shared/Database/Transaction.h
+++ b/src/server/shared/Database/Transaction.h
@@ -49,7 +49,7 @@ typedef ACE_Refcounted_Auto_Ptr<Transaction, ACE_Null_Mutex> SQLTransaction;
class TransactionTask : public SQLOperation
{
template <class T> friend class DatabaseWorkerPool;
- friend class DatabaseWorker;
+ template <class T> friend class DatabaseWorker;
public:
TransactionTask(SQLTransaction trans) : m_trans(trans) {} ;