diff options
author | Machiavelli <none@none> | 2010-12-04 21:50:36 +0100 |
---|---|---|
committer | Machiavelli <none@none> | 2010-12-04 21:50:36 +0100 |
commit | f5d6319d4ddd35a9be9ea886aa119465360e397e (patch) | |
tree | 5bd50f42122443fd93e81d40e07adbc2f589a21b | |
parent | b8bf37264b4a947e66fe29b29eb9b60ebe3d6f3c (diff) |
Core/DBLayer:
- Make the mysql connectionpool shared for async and syncrhonous connections.
- Allow configurable amount of connections for the pool
- Allow configurable amount of delaythreads
Note that delaythreads now only represent in-core threads. Whenever they execute a task they will pick a free connection from the pool instead of using their previously unique assigned connection.
The purpose of this design change is better distribution of SQL requests (no bottlenecks paired with idling) among available resources.
This also prevents a ¨memory waste¨ of preparing async prepared statements on synchronous connections (that were never called) - and vice versa. Now, connections aren´t explicitly async or synchronous, they serve both purposes.
Use at own risk, might cause instabilities.
Don´t forget to update your config files and clear your cmake cache.
--HG--
branch : trunk
-rwxr-xr-x | src/server/authserver/Main.cpp | 10 | ||||
-rw-r--r-- | src/server/authserver/authserver.conf.dist | 10 | ||||
-rwxr-xr-x | src/server/shared/Database/DatabaseWorker.cpp | 67 | ||||
-rwxr-xr-x | src/server/shared/Database/DatabaseWorker.h | 52 | ||||
-rwxr-xr-x | src/server/shared/Database/DatabaseWorkerPool.h | 115 | ||||
-rwxr-xr-x | src/server/shared/Database/MySQLConnection.cpp | 9 | ||||
-rwxr-xr-x | src/server/shared/Database/MySQLConnection.h | 7 | ||||
-rwxr-xr-x | src/server/shared/Database/Transaction.h | 2 | ||||
-rwxr-xr-x | src/server/worldserver/Master.cpp | 14 | ||||
-rw-r--r-- | src/server/worldserver/worldserver.conf.dist | 21 |
10 files changed, 130 insertions, 177 deletions
diff --git a/src/server/authserver/Main.cpp b/src/server/authserver/Main.cpp index 7f7d2e39561..454cc83fc1b 100755 --- a/src/server/authserver/Main.cpp +++ b/src/server/authserver/Main.cpp @@ -344,15 +344,15 @@ bool StartDB() worker_threads = 1; } - uint8 synch_threads = sConfig.GetIntDefault("LoginDatabase.SynchThreads", 1); - if (synch_threads < 1 || synch_threads > 32) + uint8 connections = sConfig.GetIntDefault("LoginDatabase.Connections", 2); + if (connections < 1 || connections > 32) { - sLog.outError("Improper value specified for LoginDatabase.SynchThreads, defaulting to 1."); - synch_threads = 1; + sLog.outError("Improper value specified for LoginDatabase.Connections, defaulting to 2."); + connections = 2; } /// NOTE: While authserver is singlethreaded you should keep synch_threads == 1. Increasing it is just silly since only 1 will be used ever. - if (!LoginDatabase.Open(dbstring.c_str(), worker_threads, synch_threads)) + if (!LoginDatabase.Open(dbstring.c_str(), worker_threads, connections)) { sLog.outError("Cannot connect to database"); return false; diff --git a/src/server/authserver/authserver.conf.dist b/src/server/authserver/authserver.conf.dist index 9c14178a0a0..6fed0efe0ed 100644 --- a/src/server/authserver/authserver.conf.dist +++ b/src/server/authserver/authserver.conf.dist @@ -224,11 +224,17 @@ LoginDatabaseInfo = "127.0.0.1;3306;trinity;trinity;auth" # # LoginDatabase.WorkerThreads # Description: The amount of worker threads spawned to handle asynchronous (delayed) MySQL -# statements. Each worker thread is mirrored with its own connection to the -# MySQL server and their own thread on the MySQL server. +# statements. # Default: 1 - (LoginDatabase.WorkerThreads) LoginDatabase.WorkerThreads = 1 # +# LoginDatabase.Connections +# Description: The amount of MySQL connections spawned. +# Default: 1 + +LoginDatabase.Connections = 1 + +# ################################################################################################### diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp deleted file mode 100755 index 3739d81210e..00000000000 --- a/src/server/shared/Database/DatabaseWorker.cpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (C) 2008-2010 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "DatabaseEnv.h" -#include "DatabaseWorker.h" -#include "SQLOperation.h" -#include "MySQLConnection.h" -#include "MySQLThreading.h" - -DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) : -m_queue(new_queue), -m_conn(con) -{ - /// Assign thread to task - activate(); -} - -int DatabaseWorker::svc() -{ - if (!m_queue) - return -1; - - SQLOperation *request = NULL; - while (1) - { - request = (SQLOperation*)(m_queue->dequeue()); - if (!request) - break; - - request->SetConnection(m_conn); - request->call(); - - delete request; - } - - m_conn->Close(); - return 0; -} - -int DatabaseWorker::activate() -{ - /* THR_DETACHED: - Create an asynchronous thread. The exit status of the thread would not be available to any other threads. - The thread resources are reclaimed by the operating system whenever the thread is terminated. */ - - /* THR_NEW_LWP: - Create an explicit kernel-level thread (as opposed to a user-level thread). */ - - ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1); - return 0; //^ - Spawn one thread to handle this task. - // However more of these tasks may be activated - // See DatabaseWorkerPool ctor. -}
\ No newline at end of file diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h index 8972555fbe1..f51d014bb8d 100755 --- a/src/server/shared/Database/DatabaseWorker.h +++ b/src/server/shared/Database/DatabaseWorker.h @@ -23,20 +23,66 @@ class MySQLConnection; +template <typename T> class DatabaseWorker : public ACE_Task_Base { public: - DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con); + DatabaseWorker(ACE_Activation_Queue* new_queue, T* t) : + m_queue(new_queue), + m_pool(t) + { + /// Assign thread to task + activate(); + } + ///- Inherited from ACE_Task_Base - int svc(); - int activate(); + int svc() + { + if (!m_queue) + return -1; + + SQLOperation *request = NULL; + while (1) + { + request = (SQLOperation*)(m_queue->dequeue()); + if (!request) + break; + + MySQLConnection* con = m_pool->GetFreeConnection(); + request->SetConnection(con); + request->call(); + con->Unlock(); + + delete request; + } + + m_conn->Close(); + return 0; + } + + int activate() + { + /* THR_DETACHED: + Create an asynchronous thread. The exit status of the thread would not be available to any other threads. + The thread resources are reclaimed by the operating system whenever the thread is terminated. */ + + /* THR_NEW_LWP: + Create an explicit kernel-level thread (as opposed to a user-level thread). */ + + ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1); + return 0; //^ - Spawn one thread to handle this task. + // However more of these tasks may be activated + // See DatabaseWorkerPool ctor. + } + int wait() { return ACE_Task_Base::wait(); } private: DatabaseWorker() : ACE_Task_Base() {} ACE_Activation_Queue* m_queue; MySQLConnection* m_conn; + T* m_pool; // Databasepool we operate on }; #endif diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index 9b912dd3854..5cfb8af2155 100755 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -51,6 +51,8 @@ class PingOperation : public SQLOperation template <class T> class DatabaseWorkerPool { + template <class Y> friend class DatabaseWorker; + private: typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt; @@ -59,8 +61,6 @@ class DatabaseWorkerPool m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)), m_connections(0) { - m_connections.resize(IDX_SIZE); - mysql_library_init(-1, NULL, NULL); WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); } @@ -71,32 +71,25 @@ class DatabaseWorkerPool mysql_library_end(); } - bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads) + bool Open(const std::string& infoString, uint8 connections, uint8 delaythreads) { m_connectionInfo = MySQLConnectionInfo(infoString); - sLog.outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", m_connectionInfo.database.c_str(), async_threads, synch_threads); - - /// Open asynchronous connections (delayed operations) - m_connections[IDX_ASYNC].resize(async_threads); - for (uint8 i = 0; i < async_threads; ++i) - { - T* t = new T(m_queue, m_connectionInfo); - t->Open(); - m_connections[IDX_ASYNC][i] = t; - ++m_connectionCount; - } + sLog.outSQLDriver("Opening databasepool '%s'. Delaythreads: %u, connections: %u", m_connectionInfo.database.c_str(), delaythreads, connections); - /// Open synchronous connections (direct, blocking operations) - m_connections[IDX_SYNCH].resize(synch_threads); - for (uint8 i = 0; i < synch_threads; ++i) + m_connections.resize(connections); + for (uint8 i = 0; i < connections; ++i) { T* t = new T(m_connectionInfo); t->Open(); - m_connections[IDX_SYNCH][i] = t; + m_connections[i] = t; ++m_connectionCount; } + m_delaythreads.resize(delaythreads); + for (uint8 i = 0; i < delaythreads; ++i) + m_delaythreads[i] = new DatabaseWorker<ThisClass>(m_queue, this); + sLog.outSQLDriver("Databasepool opened succesfuly. %u connections running.", (uint32)m_connectionCount.value()); return true; } @@ -107,25 +100,21 @@ class DatabaseWorkerPool /// Shuts down delaythreads for this connection pool. m_queue->queue()->deactivate(); - for (uint8 i = 0; i < m_connections[IDX_ASYNC].size(); ++i) - { - /// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx - /// Now we just wait until m_queue gives the signal to the worker threads to stop - T* t = m_connections[IDX_ASYNC][i]; - t->m_worker->wait(); // t->Close(); is called from worker thread - --m_connectionCount; - } - sLog.outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", m_connectionInfo.database.c_str()); + for (uint8 i = 0; i < m_delaythreads.size(); ++i) + m_delaythreads[i]->wait(); - /// Shut down the synchronous connections - for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i) + /// Shut down the connections + for (uint8 i = 0; i < m_connections.size(); ++i) { - T* t = m_connections[IDX_SYNCH][i]; - //while (1) - // if (t->LockIfReady()) -- For some reason deadlocks us - t->Close(); - --m_connectionCount; + T* t = m_connections[i]; + while (1) + if (t->LockIfReady()) + { + t->Close(); + --m_connectionCount; + break; + } } sLog.outSQLDriver("All connections on databasepool %s closed.", m_connectionInfo.database.c_str()); @@ -312,22 +301,31 @@ class DatabaseWorkerPool void KeepAlive() { - /// Ping synchronous connections - for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i) + /// Ping connections + for (uint8 i = 0; i < m_connections.size(); ++i) { - T* t = m_connections[IDX_SYNCH][i]; + T* t = m_connections[i]; if (t->LockIfReady()) { t->Ping(); t->Unlock(); } } - - /// Assuming all worker threads are free, every worker thread will receive 1 ping operation request - /// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter - /// as the sole purpose is to prevent connections from idling. - for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i) - Enqueue(new PingOperation); + } + + protected: + T* GetFreeConnection() + { + uint8 i = 0; + for (;;) /// Block forever until a connection is free + { + T* t = m_connections[++i % m_connectionCount.value()]; + if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks + return t; + } + + // This will be called when Celine Dion learns to sing + return NULL; } private: @@ -347,33 +345,14 @@ class DatabaseWorkerPool m_queue->enqueue(op); } - T* GetFreeConnection() - { - uint8 i = 0; - size_t num_cons = m_connections[IDX_SYNCH].size(); - for (;;) /// Block forever until a connection is free - { - T* t = m_connections[IDX_SYNCH][++i % num_cons ]; - if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks - return t; - } - - // This will be called when Celine Dion learns to sing - return NULL; - } - private: - enum - { - IDX_ASYNC, - IDX_SYNCH, - IDX_SIZE, - }; - - ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads. - std::vector< std::vector<T*> > m_connections; - AtomicUInt m_connectionCount; //! Counter of MySQL connections; + typedef DatabaseWorkerPool<T> ThisClass; + + ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads. + std::vector<T*> m_connections; + AtomicUInt m_connectionCount; //! Counter of MySQL connections; MySQLConnectionInfo m_connectionInfo; + std::vector<DatabaseWorker<ThisClass>*> m_delaythreads; //! Delaythreads (templatized) }; #endif diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp index 20c614b5333..de05ee311db 100755 --- a/src/server/shared/Database/MySQLConnection.cpp +++ b/src/server/shared/Database/MySQLConnection.cpp @@ -34,20 +34,11 @@ MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) : m_queue(NULL), -m_worker(NULL), m_Mysql(NULL), m_connectionInfo(connInfo) { } -MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) : -m_queue(queue), -m_Mysql(NULL), -m_connectionInfo(connInfo) -{ - m_worker = new DatabaseWorker(m_queue, this); -} - MySQLConnection::~MySQLConnection() { ASSERT (m_Mysql); /// MySQL context must be present at this point diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h index 56a670d07c1..a72b5b2b54b 100755 --- a/src/server/shared/Database/MySQLConnection.h +++ b/src/server/shared/Database/MySQLConnection.h @@ -22,7 +22,7 @@ #ifndef _MYSQLCONNECTION_H #define _MYSQLCONNECTION_H -class DatabaseWorker; +template <class T> class DatabaseWorker; class PreparedStatement; class MySQLPreparedStatement; class PingOperation; @@ -56,11 +56,11 @@ struct MySQLConnectionInfo class MySQLConnection { template <class T> friend class DatabaseWorkerPool; + template <class T> friend class DatabaseWorker; friend class PingOperation; public: - MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchroneous connections. - MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchroneous connections. + MySQLConnection(MySQLConnectionInfo& connInfo); ~MySQLConnection(); virtual bool Open(); @@ -102,7 +102,6 @@ class MySQLConnection private: ACE_Activation_Queue* m_queue; //! Queue shared with other asynchroneous connections. - DatabaseWorker* m_worker; //! Core worker task. MYSQL * m_Mysql; //! MySQL Handle. MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging) ACE_Thread_Mutex m_Mutex; diff --git a/src/server/shared/Database/Transaction.h b/src/server/shared/Database/Transaction.h index 8bdb11541e7..e8200f9810a 100755 --- a/src/server/shared/Database/Transaction.h +++ b/src/server/shared/Database/Transaction.h @@ -49,7 +49,7 @@ typedef ACE_Refcounted_Auto_Ptr<Transaction, ACE_Null_Mutex> SQLTransaction; class TransactionTask : public SQLOperation { template <class T> friend class DatabaseWorkerPool; - friend class DatabaseWorker; + template <class T> friend class DatabaseWorker; public: TransactionTask(SQLTransaction trans) : m_trans(trans) {} ; diff --git a/src/server/worldserver/Master.cpp b/src/server/worldserver/Master.cpp index ecfbdbc7d8e..4dede995597 100755 --- a/src/server/worldserver/Master.cpp +++ b/src/server/worldserver/Master.cpp @@ -419,7 +419,7 @@ bool Master::_StartDB() { sLog.SetLogDB(false); std::string dbstring; - uint8 async_threads, synch_threads; + uint8 async_threads, connections; dbstring = sConfig.GetStringDefault("WorldDatabaseInfo", ""); if (dbstring.empty()) @@ -436,10 +436,10 @@ bool Master::_StartDB() return false; } - synch_threads = sConfig.GetIntDefault("WorldDatabase.SynchThreads", 1); + connections = sConfig.GetIntDefault("WorldDatabase.Connections", 2); ///- Initialise the world database - if (!WorldDatabase.Open(dbstring, async_threads, synch_threads)) + if (!WorldDatabase.Open(dbstring, async_threads, connections)) { sLog.outError("Cannot connect to world database %s", dbstring.c_str()); return false; @@ -460,10 +460,10 @@ bool Master::_StartDB() return false; } - synch_threads = sConfig.GetIntDefault("CharacterDatabase.SynchThreads", 2); + connections = sConfig.GetIntDefault("CharacterDatabase.Connections", 2); ///- Initialise the Character database - if (!CharacterDatabase.Open(dbstring, async_threads, synch_threads)) + if (!CharacterDatabase.Open(dbstring, async_threads, connections)) { sLog.outError("Cannot connect to Character database %s", dbstring.c_str()); return false; @@ -485,10 +485,10 @@ bool Master::_StartDB() return false; } - synch_threads = sConfig.GetIntDefault("LoginDatabase.SynchThreads", 1); + connections = sConfig.GetIntDefault("LoginDatabase.Connections", 2); ///- Initialise the login database - if (!LoginDatabase.Open(dbstring, async_threads, synch_threads)) + if (!LoginDatabase.Open(dbstring, async_threads, connections)) { sLog.outError("Cannot connect to login database %s", dbstring.c_str()); return false; diff --git a/src/server/worldserver/worldserver.conf.dist b/src/server/worldserver/worldserver.conf.dist index a74fd7c0b71..6815f27d00b 100644 --- a/src/server/worldserver/worldserver.conf.dist +++ b/src/server/worldserver/worldserver.conf.dist @@ -94,8 +94,7 @@ CharacterDatabaseInfo = "127.0.0.1;3306;trinity;trinity;characters" # WorldDatabase.WorkerThreads # CharacterDatabase.WorkerThreads # Description: The amount of worker threads spawned to handle asynchronous (delayed) MySQL -# statements. Each worker thread is mirrored with its own connection to the -# MySQL server and their own thread on the MySQL server. +# statements. # Default: 1 - (LoginDatabase.WorkerThreads) # 1 - (WorldDatabase.WorkerThreads) # 1 - (CharacterDatabase.WorkerThreads) @@ -105,17 +104,17 @@ WorldDatabase.WorkerThreads = 1 CharacterDatabase.WorkerThreads = 1 # -# LoginDatabase.SynchThreads -# WorldDatabase.SynchThreads -# CharacterDatabase.SynchThreads -# Description: The amount of MySQL connections spawned to handle. -# Default: 1 - (LoginDatabase.WorkerThreads) -# 1 - (WorldDatabase.WorkerThreads) +# LoginDatabase.Connections +# WorldDatabase.Connections +# CharacterDatabase.Connections +# Description: The amount of MySQL connections spawned. +# Default: 1 - (LoginDatabase.Connections) +# 1 - (WorldDatabase.Connections) # 2 - (CharacterDatabase.WorkerThreads) -LoginDatabase.SynchThreads = 1 -WorldDatabase.SynchThreads = 1 -CharacterDatabase.SynchThreads = 2 +LoginDatabase.Connections = 2 +WorldDatabase.Connections = 2 +CharacterDatabase.Connections = 2 # # MaxPingTime |