Core/DBLayer:

- Make the mysql connectionpool shared for async and syncrhonous connections.
- Allow configurable amount of connections for the pool
- Allow configurable amount of delaythreads
Note that delaythreads now only represent in-core threads. Whenever they execute a task they will pick a free connection from the pool instead of using their previously unique assigned connection.
The purpose of this design change is better distribution of SQL requests (no bottlenecks paired with idling) among available resources.
This also prevents a ¨memory waste¨ of preparing async prepared statements on synchronous connections (that were never called) - and vice versa. Now, connections aren´t explicitly async or synchronous, they serve both purposes.

Use at own risk, might cause instabilities.
Don´t forget to update your config files and clear your cmake cache.

--HG--
branch : trunk
This commit is contained in:
Machiavelli
2010-12-04 21:50:36 +01:00
parent b8bf37264b
commit f5d6319d4d
10 changed files with 129 additions and 176 deletions

View File

@@ -344,15 +344,15 @@ bool StartDB()
worker_threads = 1;
}
uint8 synch_threads = sConfig.GetIntDefault("LoginDatabase.SynchThreads", 1);
if (synch_threads < 1 || synch_threads > 32)
uint8 connections = sConfig.GetIntDefault("LoginDatabase.Connections", 2);
if (connections < 1 || connections > 32)
{
sLog.outError("Improper value specified for LoginDatabase.SynchThreads, defaulting to 1.");
synch_threads = 1;
sLog.outError("Improper value specified for LoginDatabase.Connections, defaulting to 2.");
connections = 2;
}
/// NOTE: While authserver is singlethreaded you should keep synch_threads == 1. Increasing it is just silly since only 1 will be used ever.
if (!LoginDatabase.Open(dbstring.c_str(), worker_threads, synch_threads))
if (!LoginDatabase.Open(dbstring.c_str(), worker_threads, connections))
{
sLog.outError("Cannot connect to database");
return false;

View File

@@ -224,11 +224,17 @@ LoginDatabaseInfo = "127.0.0.1;3306;trinity;trinity;auth"
#
# LoginDatabase.WorkerThreads
# Description: The amount of worker threads spawned to handle asynchronous (delayed) MySQL
# statements. Each worker thread is mirrored with its own connection to the
# MySQL server and their own thread on the MySQL server.
# statements.
# Default: 1 - (LoginDatabase.WorkerThreads)
LoginDatabase.WorkerThreads = 1
#
# LoginDatabase.Connections
# Description: The amount of MySQL connections spawned.
# Default: 1
LoginDatabase.Connections = 1
#
###################################################################################################

View File

@@ -1,67 +0,0 @@
/*
* Copyright (C) 2008-2010 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "DatabaseEnv.h"
#include "DatabaseWorker.h"
#include "SQLOperation.h"
#include "MySQLConnection.h"
#include "MySQLThreading.h"
DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) :
m_queue(new_queue),
m_conn(con)
{
/// Assign thread to task
activate();
}
int DatabaseWorker::svc()
{
if (!m_queue)
return -1;
SQLOperation *request = NULL;
while (1)
{
request = (SQLOperation*)(m_queue->dequeue());
if (!request)
break;
request->SetConnection(m_conn);
request->call();
delete request;
}
m_conn->Close();
return 0;
}
int DatabaseWorker::activate()
{
/* THR_DETACHED:
Create an asynchronous thread. The exit status of the thread would not be available to any other threads.
The thread resources are reclaimed by the operating system whenever the thread is terminated. */
/* THR_NEW_LWP:
Create an explicit kernel-level thread (as opposed to a user-level thread). */
ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1);
return 0; //^ - Spawn one thread to handle this task.
// However more of these tasks may be activated
// See DatabaseWorkerPool ctor.
}

View File

@@ -23,20 +23,66 @@
class MySQLConnection;
template <typename T>
class DatabaseWorker : public ACE_Task_Base
{
public:
DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con);
DatabaseWorker(ACE_Activation_Queue* new_queue, T* t) :
m_queue(new_queue),
m_pool(t)
{
/// Assign thread to task
activate();
}
///- Inherited from ACE_Task_Base
int svc();
int activate();
int svc()
{
if (!m_queue)
return -1;
SQLOperation *request = NULL;
while (1)
{
request = (SQLOperation*)(m_queue->dequeue());
if (!request)
break;
MySQLConnection* con = m_pool->GetFreeConnection();
request->SetConnection(con);
request->call();
con->Unlock();
delete request;
}
m_conn->Close();
return 0;
}
int activate()
{
/* THR_DETACHED:
Create an asynchronous thread. The exit status of the thread would not be available to any other threads.
The thread resources are reclaimed by the operating system whenever the thread is terminated. */
/* THR_NEW_LWP:
Create an explicit kernel-level thread (as opposed to a user-level thread). */
ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1);
return 0; //^ - Spawn one thread to handle this task.
// However more of these tasks may be activated
// See DatabaseWorkerPool ctor.
}
int wait() { return ACE_Task_Base::wait(); }
private:
DatabaseWorker() : ACE_Task_Base() {}
ACE_Activation_Queue* m_queue;
MySQLConnection* m_conn;
T* m_pool; // Databasepool we operate on
};
#endif

View File

@@ -51,6 +51,8 @@ class PingOperation : public SQLOperation
template <class T>
class DatabaseWorkerPool
{
template <class Y> friend class DatabaseWorker;
private:
typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt;
@@ -59,8 +61,6 @@ class DatabaseWorkerPool
m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)),
m_connections(0)
{
m_connections.resize(IDX_SIZE);
mysql_library_init(-1, NULL, NULL);
WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
}
@@ -71,32 +71,25 @@ class DatabaseWorkerPool
mysql_library_end();
}
bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
bool Open(const std::string& infoString, uint8 connections, uint8 delaythreads)
{
m_connectionInfo = MySQLConnectionInfo(infoString);
sLog.outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", m_connectionInfo.database.c_str(), async_threads, synch_threads);
sLog.outSQLDriver("Opening databasepool '%s'. Delaythreads: %u, connections: %u", m_connectionInfo.database.c_str(), delaythreads, connections);
/// Open asynchronous connections (delayed operations)
m_connections[IDX_ASYNC].resize(async_threads);
for (uint8 i = 0; i < async_threads; ++i)
{
T* t = new T(m_queue, m_connectionInfo);
t->Open();
m_connections[IDX_ASYNC][i] = t;
++m_connectionCount;
}
/// Open synchronous connections (direct, blocking operations)
m_connections[IDX_SYNCH].resize(synch_threads);
for (uint8 i = 0; i < synch_threads; ++i)
m_connections.resize(connections);
for (uint8 i = 0; i < connections; ++i)
{
T* t = new T(m_connectionInfo);
t->Open();
m_connections[IDX_SYNCH][i] = t;
m_connections[i] = t;
++m_connectionCount;
}
m_delaythreads.resize(delaythreads);
for (uint8 i = 0; i < delaythreads; ++i)
m_delaythreads[i] = new DatabaseWorker<ThisClass>(m_queue, this);
sLog.outSQLDriver("Databasepool opened succesfuly. %u connections running.", (uint32)m_connectionCount.value());
return true;
}
@@ -107,25 +100,21 @@ class DatabaseWorkerPool
/// Shuts down delaythreads for this connection pool.
m_queue->queue()->deactivate();
for (uint8 i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
{
/// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx
/// Now we just wait until m_queue gives the signal to the worker threads to stop
T* t = m_connections[IDX_ASYNC][i];
t->m_worker->wait(); // t->Close(); is called from worker thread
--m_connectionCount;
}
sLog.outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", m_connectionInfo.database.c_str());
for (uint8 i = 0; i < m_delaythreads.size(); ++i)
m_delaythreads[i]->wait();
/// Shut down the synchronous connections
for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
/// Shut down the connections
for (uint8 i = 0; i < m_connections.size(); ++i)
{
T* t = m_connections[IDX_SYNCH][i];
//while (1)
// if (t->LockIfReady()) -- For some reason deadlocks us
t->Close();
--m_connectionCount;
T* t = m_connections[i];
while (1)
if (t->LockIfReady())
{
t->Close();
--m_connectionCount;
break;
}
}
sLog.outSQLDriver("All connections on databasepool %s closed.", m_connectionInfo.database.c_str());
@@ -312,22 +301,31 @@ class DatabaseWorkerPool
void KeepAlive()
{
/// Ping synchronous connections
for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
/// Ping connections
for (uint8 i = 0; i < m_connections.size(); ++i)
{
T* t = m_connections[IDX_SYNCH][i];
T* t = m_connections[i];
if (t->LockIfReady())
{
t->Ping();
t->Unlock();
}
}
/// Assuming all worker threads are free, every worker thread will receive 1 ping operation request
/// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
/// as the sole purpose is to prevent connections from idling.
for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
Enqueue(new PingOperation);
}
protected:
T* GetFreeConnection()
{
uint8 i = 0;
for (;;) /// Block forever until a connection is free
{
T* t = m_connections[++i % m_connectionCount.value()];
if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
return t;
}
// This will be called when Celine Dion learns to sing
return NULL;
}
private:
@@ -347,33 +345,14 @@ class DatabaseWorkerPool
m_queue->enqueue(op);
}
T* GetFreeConnection()
{
uint8 i = 0;
size_t num_cons = m_connections[IDX_SYNCH].size();
for (;;) /// Block forever until a connection is free
{
T* t = m_connections[IDX_SYNCH][++i % num_cons ];
if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
return t;
}
// This will be called when Celine Dion learns to sing
return NULL;
}
private:
enum
{
IDX_ASYNC,
IDX_SYNCH,
IDX_SIZE,
};
typedef DatabaseWorkerPool<T> ThisClass;
ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
std::vector< std::vector<T*> > m_connections;
AtomicUInt m_connectionCount; //! Counter of MySQL connections;
ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
std::vector<T*> m_connections;
AtomicUInt m_connectionCount; //! Counter of MySQL connections;
MySQLConnectionInfo m_connectionInfo;
std::vector<DatabaseWorker<ThisClass>*> m_delaythreads; //! Delaythreads (templatized)
};
#endif

View File

@@ -34,20 +34,11 @@
MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) :
m_queue(NULL),
m_worker(NULL),
m_Mysql(NULL),
m_connectionInfo(connInfo)
{
}
MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) :
m_queue(queue),
m_Mysql(NULL),
m_connectionInfo(connInfo)
{
m_worker = new DatabaseWorker(m_queue, this);
}
MySQLConnection::~MySQLConnection()
{
ASSERT (m_Mysql); /// MySQL context must be present at this point

View File

@@ -22,7 +22,7 @@
#ifndef _MYSQLCONNECTION_H
#define _MYSQLCONNECTION_H
class DatabaseWorker;
template <class T> class DatabaseWorker;
class PreparedStatement;
class MySQLPreparedStatement;
class PingOperation;
@@ -56,11 +56,11 @@ struct MySQLConnectionInfo
class MySQLConnection
{
template <class T> friend class DatabaseWorkerPool;
template <class T> friend class DatabaseWorker;
friend class PingOperation;
public:
MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchroneous connections.
MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchroneous connections.
MySQLConnection(MySQLConnectionInfo& connInfo);
~MySQLConnection();
virtual bool Open();
@@ -102,7 +102,6 @@ class MySQLConnection
private:
ACE_Activation_Queue* m_queue; //! Queue shared with other asynchroneous connections.
DatabaseWorker* m_worker; //! Core worker task.
MYSQL * m_Mysql; //! MySQL Handle.
MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging)
ACE_Thread_Mutex m_Mutex;

View File

@@ -49,7 +49,7 @@ typedef ACE_Refcounted_Auto_Ptr<Transaction, ACE_Null_Mutex> SQLTransaction;
class TransactionTask : public SQLOperation
{
template <class T> friend class DatabaseWorkerPool;
friend class DatabaseWorker;
template <class T> friend class DatabaseWorker;
public:
TransactionTask(SQLTransaction trans) : m_trans(trans) {} ;

View File

@@ -419,7 +419,7 @@ bool Master::_StartDB()
{
sLog.SetLogDB(false);
std::string dbstring;
uint8 async_threads, synch_threads;
uint8 async_threads, connections;
dbstring = sConfig.GetStringDefault("WorldDatabaseInfo", "");
if (dbstring.empty())
@@ -436,10 +436,10 @@ bool Master::_StartDB()
return false;
}
synch_threads = sConfig.GetIntDefault("WorldDatabase.SynchThreads", 1);
connections = sConfig.GetIntDefault("WorldDatabase.Connections", 2);
///- Initialise the world database
if (!WorldDatabase.Open(dbstring, async_threads, synch_threads))
if (!WorldDatabase.Open(dbstring, async_threads, connections))
{
sLog.outError("Cannot connect to world database %s", dbstring.c_str());
return false;
@@ -460,10 +460,10 @@ bool Master::_StartDB()
return false;
}
synch_threads = sConfig.GetIntDefault("CharacterDatabase.SynchThreads", 2);
connections = sConfig.GetIntDefault("CharacterDatabase.Connections", 2);
///- Initialise the Character database
if (!CharacterDatabase.Open(dbstring, async_threads, synch_threads))
if (!CharacterDatabase.Open(dbstring, async_threads, connections))
{
sLog.outError("Cannot connect to Character database %s", dbstring.c_str());
return false;
@@ -485,10 +485,10 @@ bool Master::_StartDB()
return false;
}
synch_threads = sConfig.GetIntDefault("LoginDatabase.SynchThreads", 1);
connections = sConfig.GetIntDefault("LoginDatabase.Connections", 2);
///- Initialise the login database
if (!LoginDatabase.Open(dbstring, async_threads, synch_threads))
if (!LoginDatabase.Open(dbstring, async_threads, connections))
{
sLog.outError("Cannot connect to login database %s", dbstring.c_str());
return false;

View File

@@ -94,8 +94,7 @@ CharacterDatabaseInfo = "127.0.0.1;3306;trinity;trinity;characters"
# WorldDatabase.WorkerThreads
# CharacterDatabase.WorkerThreads
# Description: The amount of worker threads spawned to handle asynchronous (delayed) MySQL
# statements. Each worker thread is mirrored with its own connection to the
# MySQL server and their own thread on the MySQL server.
# statements.
# Default: 1 - (LoginDatabase.WorkerThreads)
# 1 - (WorldDatabase.WorkerThreads)
# 1 - (CharacterDatabase.WorkerThreads)
@@ -105,17 +104,17 @@ WorldDatabase.WorkerThreads = 1
CharacterDatabase.WorkerThreads = 1
#
# LoginDatabase.SynchThreads
# WorldDatabase.SynchThreads
# CharacterDatabase.SynchThreads
# Description: The amount of MySQL connections spawned to handle.
# Default: 1 - (LoginDatabase.WorkerThreads)
# 1 - (WorldDatabase.WorkerThreads)
# LoginDatabase.Connections
# WorldDatabase.Connections
# CharacterDatabase.Connections
# Description: The amount of MySQL connections spawned.
# Default: 1 - (LoginDatabase.Connections)
# 1 - (WorldDatabase.Connections)
# 2 - (CharacterDatabase.WorkerThreads)
LoginDatabase.SynchThreads = 1
WorldDatabase.SynchThreads = 1
CharacterDatabase.SynchThreads = 2
LoginDatabase.Connections = 2
WorldDatabase.Connections = 2
CharacterDatabase.Connections = 2
#
# MaxPingTime