aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMachiavelli <none@none>2010-12-13 09:18:49 +0100
committerMachiavelli <none@none>2010-12-13 09:18:49 +0100
commitea29d87dccc50d985e90c013c128c34a0b8299f7 (patch)
tree40e2ebedc048e4194a65faf9aa0b34e80af0a197
parentf5d6319d4ddd35a9be9ea886aa119465360e397e (diff)
Backed out changeset: 8326a2411148
--HG-- branch : trunk
-rwxr-xr-xsrc/server/authserver/Main.cpp10
-rw-r--r--src/server/authserver/authserver.conf.dist10
-rw-r--r--src/server/shared/Database/DatabaseWorker.cpp67
-rwxr-xr-xsrc/server/shared/Database/DatabaseWorker.h52
-rwxr-xr-xsrc/server/shared/Database/DatabaseWorkerPool.h115
-rwxr-xr-xsrc/server/shared/Database/MySQLConnection.cpp9
-rwxr-xr-xsrc/server/shared/Database/MySQLConnection.h7
-rwxr-xr-xsrc/server/shared/Database/Transaction.h2
-rwxr-xr-xsrc/server/worldserver/Master.cpp14
-rw-r--r--src/server/worldserver/worldserver.conf.dist21
10 files changed, 177 insertions, 130 deletions
diff --git a/src/server/authserver/Main.cpp b/src/server/authserver/Main.cpp
index 454cc83fc1b..7f7d2e39561 100755
--- a/src/server/authserver/Main.cpp
+++ b/src/server/authserver/Main.cpp
@@ -344,15 +344,15 @@ bool StartDB()
worker_threads = 1;
}
- uint8 connections = sConfig.GetIntDefault("LoginDatabase.Connections", 2);
- if (connections < 1 || connections > 32)
+ uint8 synch_threads = sConfig.GetIntDefault("LoginDatabase.SynchThreads", 1);
+ if (synch_threads < 1 || synch_threads > 32)
{
- sLog.outError("Improper value specified for LoginDatabase.Connections, defaulting to 2.");
- connections = 2;
+ sLog.outError("Improper value specified for LoginDatabase.SynchThreads, defaulting to 1.");
+ synch_threads = 1;
}
/// NOTE: While authserver is singlethreaded you should keep synch_threads == 1. Increasing it is just silly since only 1 will be used ever.
- if (!LoginDatabase.Open(dbstring.c_str(), worker_threads, connections))
+ if (!LoginDatabase.Open(dbstring.c_str(), worker_threads, synch_threads))
{
sLog.outError("Cannot connect to database");
return false;
diff --git a/src/server/authserver/authserver.conf.dist b/src/server/authserver/authserver.conf.dist
index 6fed0efe0ed..9c14178a0a0 100644
--- a/src/server/authserver/authserver.conf.dist
+++ b/src/server/authserver/authserver.conf.dist
@@ -224,17 +224,11 @@ LoginDatabaseInfo = "127.0.0.1;3306;trinity;trinity;auth"
#
# LoginDatabase.WorkerThreads
# Description: The amount of worker threads spawned to handle asynchronous (delayed) MySQL
-# statements.
+# statements. Each worker thread is mirrored with its own connection to the
+# MySQL server and their own thread on the MySQL server.
# Default: 1 - (LoginDatabase.WorkerThreads)
LoginDatabase.WorkerThreads = 1
#
-# LoginDatabase.Connections
-# Description: The amount of MySQL connections spawned.
-# Default: 1
-
-LoginDatabase.Connections = 1
-
-#
###################################################################################################
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp
new file mode 100644
index 00000000000..3739d81210e
--- /dev/null
+++ b/src/server/shared/Database/DatabaseWorker.cpp
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2008-2010 TrinityCore <http://www.trinitycore.org/>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "DatabaseEnv.h"
+#include "DatabaseWorker.h"
+#include "SQLOperation.h"
+#include "MySQLConnection.h"
+#include "MySQLThreading.h"
+
+DatabaseWorker::DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con) :
+m_queue(new_queue),
+m_conn(con)
+{
+ /// Assign thread to task
+ activate();
+}
+
+int DatabaseWorker::svc()
+{
+ if (!m_queue)
+ return -1;
+
+ SQLOperation *request = NULL;
+ while (1)
+ {
+ request = (SQLOperation*)(m_queue->dequeue());
+ if (!request)
+ break;
+
+ request->SetConnection(m_conn);
+ request->call();
+
+ delete request;
+ }
+
+ m_conn->Close();
+ return 0;
+}
+
+int DatabaseWorker::activate()
+{
+ /* THR_DETACHED:
+ Create an asynchronous thread. The exit status of the thread would not be available to any other threads.
+ The thread resources are reclaimed by the operating system whenever the thread is terminated. */
+
+ /* THR_NEW_LWP:
+ Create an explicit kernel-level thread (as opposed to a user-level thread). */
+
+ ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1);
+ return 0; //^ - Spawn one thread to handle this task.
+ // However more of these tasks may be activated
+ // See DatabaseWorkerPool ctor.
+} \ No newline at end of file
diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h
index f51d014bb8d..8972555fbe1 100755
--- a/src/server/shared/Database/DatabaseWorker.h
+++ b/src/server/shared/Database/DatabaseWorker.h
@@ -23,66 +23,20 @@
class MySQLConnection;
-template <typename T>
class DatabaseWorker : public ACE_Task_Base
{
public:
- DatabaseWorker(ACE_Activation_Queue* new_queue, T* t) :
- m_queue(new_queue),
- m_pool(t)
- {
- /// Assign thread to task
- activate();
- }
-
+ DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con);
///- Inherited from ACE_Task_Base
- int svc()
- {
- if (!m_queue)
- return -1;
-
- SQLOperation *request = NULL;
- while (1)
- {
- request = (SQLOperation*)(m_queue->dequeue());
- if (!request)
- break;
-
- MySQLConnection* con = m_pool->GetFreeConnection();
- request->SetConnection(con);
- request->call();
- con->Unlock();
-
- delete request;
- }
-
- m_conn->Close();
- return 0;
- }
-
- int activate()
- {
- /* THR_DETACHED:
- Create an asynchronous thread. The exit status of the thread would not be available to any other threads.
- The thread resources are reclaimed by the operating system whenever the thread is terminated. */
-
- /* THR_NEW_LWP:
- Create an explicit kernel-level thread (as opposed to a user-level thread). */
-
- ACE_Task_Base::activate(THR_NEW_LWP | THR_DETACHED, 1);
- return 0; //^ - Spawn one thread to handle this task.
- // However more of these tasks may be activated
- // See DatabaseWorkerPool ctor.
- }
-
+ int svc();
+ int activate();
int wait() { return ACE_Task_Base::wait(); }
private:
DatabaseWorker() : ACE_Task_Base() {}
ACE_Activation_Queue* m_queue;
MySQLConnection* m_conn;
- T* m_pool; // Databasepool we operate on
};
#endif
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h
index 5cfb8af2155..9b912dd3854 100755
--- a/src/server/shared/Database/DatabaseWorkerPool.h
+++ b/src/server/shared/Database/DatabaseWorkerPool.h
@@ -51,8 +51,6 @@ class PingOperation : public SQLOperation
template <class T>
class DatabaseWorkerPool
{
- template <class Y> friend class DatabaseWorker;
-
private:
typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt;
@@ -61,6 +59,8 @@ class DatabaseWorkerPool
m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)),
m_connections(0)
{
+ m_connections.resize(IDX_SIZE);
+
mysql_library_init(-1, NULL, NULL);
WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
}
@@ -71,24 +71,31 @@ class DatabaseWorkerPool
mysql_library_end();
}
- bool Open(const std::string& infoString, uint8 connections, uint8 delaythreads)
+ bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
{
m_connectionInfo = MySQLConnectionInfo(infoString);
- sLog.outSQLDriver("Opening databasepool '%s'. Delaythreads: %u, connections: %u", m_connectionInfo.database.c_str(), delaythreads, connections);
+ sLog.outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", m_connectionInfo.database.c_str(), async_threads, synch_threads);
- m_connections.resize(connections);
- for (uint8 i = 0; i < connections; ++i)
+ /// Open asynchronous connections (delayed operations)
+ m_connections[IDX_ASYNC].resize(async_threads);
+ for (uint8 i = 0; i < async_threads; ++i)
{
- T* t = new T(m_connectionInfo);
+ T* t = new T(m_queue, m_connectionInfo);
t->Open();
- m_connections[i] = t;
+ m_connections[IDX_ASYNC][i] = t;
++m_connectionCount;
}
- m_delaythreads.resize(delaythreads);
- for (uint8 i = 0; i < delaythreads; ++i)
- m_delaythreads[i] = new DatabaseWorker<ThisClass>(m_queue, this);
+ /// Open synchronous connections (direct, blocking operations)
+ m_connections[IDX_SYNCH].resize(synch_threads);
+ for (uint8 i = 0; i < synch_threads; ++i)
+ {
+ T* t = new T(m_connectionInfo);
+ t->Open();
+ m_connections[IDX_SYNCH][i] = t;
+ ++m_connectionCount;
+ }
sLog.outSQLDriver("Databasepool opened succesfuly. %u connections running.", (uint32)m_connectionCount.value());
return true;
@@ -100,21 +107,25 @@ class DatabaseWorkerPool
/// Shuts down delaythreads for this connection pool.
m_queue->queue()->deactivate();
+ for (uint8 i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
+ {
+ /// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx
+ /// Now we just wait until m_queue gives the signal to the worker threads to stop
+ T* t = m_connections[IDX_ASYNC][i];
+ t->m_worker->wait(); // t->Close(); is called from worker thread
+ --m_connectionCount;
+ }
- for (uint8 i = 0; i < m_delaythreads.size(); ++i)
- m_delaythreads[i]->wait();
+ sLog.outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", m_connectionInfo.database.c_str());
- /// Shut down the connections
- for (uint8 i = 0; i < m_connections.size(); ++i)
+ /// Shut down the synchronous connections
+ for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
{
- T* t = m_connections[i];
- while (1)
- if (t->LockIfReady())
- {
- t->Close();
- --m_connectionCount;
- break;
- }
+ T* t = m_connections[IDX_SYNCH][i];
+ //while (1)
+ // if (t->LockIfReady()) -- For some reason deadlocks us
+ t->Close();
+ --m_connectionCount;
}
sLog.outSQLDriver("All connections on databasepool %s closed.", m_connectionInfo.database.c_str());
@@ -301,31 +312,22 @@ class DatabaseWorkerPool
void KeepAlive()
{
- /// Ping connections
- for (uint8 i = 0; i < m_connections.size(); ++i)
+ /// Ping synchronous connections
+ for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
{
- T* t = m_connections[i];
+ T* t = m_connections[IDX_SYNCH][i];
if (t->LockIfReady())
{
t->Ping();
t->Unlock();
}
}
- }
-
- protected:
- T* GetFreeConnection()
- {
- uint8 i = 0;
- for (;;) /// Block forever until a connection is free
- {
- T* t = m_connections[++i % m_connectionCount.value()];
- if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
- return t;
- }
-
- // This will be called when Celine Dion learns to sing
- return NULL;
+
+ /// Assuming all worker threads are free, every worker thread will receive 1 ping operation request
+ /// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
+ /// as the sole purpose is to prevent connections from idling.
+ for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
+ Enqueue(new PingOperation);
}
private:
@@ -345,14 +347,33 @@ class DatabaseWorkerPool
m_queue->enqueue(op);
}
- private:
- typedef DatabaseWorkerPool<T> ThisClass;
+ T* GetFreeConnection()
+ {
+ uint8 i = 0;
+ size_t num_cons = m_connections[IDX_SYNCH].size();
+ for (;;) /// Block forever until a connection is free
+ {
+ T* t = m_connections[IDX_SYNCH][++i % num_cons ];
+ if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
+ return t;
+ }
- ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
- std::vector<T*> m_connections;
- AtomicUInt m_connectionCount; //! Counter of MySQL connections;
+ // This will be called when Celine Dion learns to sing
+ return NULL;
+ }
+
+ private:
+ enum
+ {
+ IDX_ASYNC,
+ IDX_SYNCH,
+ IDX_SIZE,
+ };
+
+ ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
+ std::vector< std::vector<T*> > m_connections;
+ AtomicUInt m_connectionCount; //! Counter of MySQL connections;
MySQLConnectionInfo m_connectionInfo;
- std::vector<DatabaseWorker<ThisClass>*> m_delaythreads; //! Delaythreads (templatized)
};
#endif
diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp
index de05ee311db..20c614b5333 100755
--- a/src/server/shared/Database/MySQLConnection.cpp
+++ b/src/server/shared/Database/MySQLConnection.cpp
@@ -34,11 +34,20 @@
MySQLConnection::MySQLConnection(MySQLConnectionInfo& connInfo) :
m_queue(NULL),
+m_worker(NULL),
m_Mysql(NULL),
m_connectionInfo(connInfo)
{
}
+MySQLConnection::MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo) :
+m_queue(queue),
+m_Mysql(NULL),
+m_connectionInfo(connInfo)
+{
+ m_worker = new DatabaseWorker(m_queue, this);
+}
+
MySQLConnection::~MySQLConnection()
{
ASSERT (m_Mysql); /// MySQL context must be present at this point
diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h
index a72b5b2b54b..56a670d07c1 100755
--- a/src/server/shared/Database/MySQLConnection.h
+++ b/src/server/shared/Database/MySQLConnection.h
@@ -22,7 +22,7 @@
#ifndef _MYSQLCONNECTION_H
#define _MYSQLCONNECTION_H
-template <class T> class DatabaseWorker;
+class DatabaseWorker;
class PreparedStatement;
class MySQLPreparedStatement;
class PingOperation;
@@ -56,11 +56,11 @@ struct MySQLConnectionInfo
class MySQLConnection
{
template <class T> friend class DatabaseWorkerPool;
- template <class T> friend class DatabaseWorker;
friend class PingOperation;
public:
- MySQLConnection(MySQLConnectionInfo& connInfo);
+ MySQLConnection(MySQLConnectionInfo& connInfo); //! Constructor for synchroneous connections.
+ MySQLConnection(ACE_Activation_Queue* queue, MySQLConnectionInfo& connInfo); //! Constructor for asynchroneous connections.
~MySQLConnection();
virtual bool Open();
@@ -102,6 +102,7 @@ class MySQLConnection
private:
ACE_Activation_Queue* m_queue; //! Queue shared with other asynchroneous connections.
+ DatabaseWorker* m_worker; //! Core worker task.
MYSQL * m_Mysql; //! MySQL Handle.
MySQLConnectionInfo& m_connectionInfo; //! Connection info (used for logging)
ACE_Thread_Mutex m_Mutex;
diff --git a/src/server/shared/Database/Transaction.h b/src/server/shared/Database/Transaction.h
index e8200f9810a..8bdb11541e7 100755
--- a/src/server/shared/Database/Transaction.h
+++ b/src/server/shared/Database/Transaction.h
@@ -49,7 +49,7 @@ typedef ACE_Refcounted_Auto_Ptr<Transaction, ACE_Null_Mutex> SQLTransaction;
class TransactionTask : public SQLOperation
{
template <class T> friend class DatabaseWorkerPool;
- template <class T> friend class DatabaseWorker;
+ friend class DatabaseWorker;
public:
TransactionTask(SQLTransaction trans) : m_trans(trans) {} ;
diff --git a/src/server/worldserver/Master.cpp b/src/server/worldserver/Master.cpp
index 4dede995597..ecfbdbc7d8e 100755
--- a/src/server/worldserver/Master.cpp
+++ b/src/server/worldserver/Master.cpp
@@ -419,7 +419,7 @@ bool Master::_StartDB()
{
sLog.SetLogDB(false);
std::string dbstring;
- uint8 async_threads, connections;
+ uint8 async_threads, synch_threads;
dbstring = sConfig.GetStringDefault("WorldDatabaseInfo", "");
if (dbstring.empty())
@@ -436,10 +436,10 @@ bool Master::_StartDB()
return false;
}
- connections = sConfig.GetIntDefault("WorldDatabase.Connections", 2);
+ synch_threads = sConfig.GetIntDefault("WorldDatabase.SynchThreads", 1);
///- Initialise the world database
- if (!WorldDatabase.Open(dbstring, async_threads, connections))
+ if (!WorldDatabase.Open(dbstring, async_threads, synch_threads))
{
sLog.outError("Cannot connect to world database %s", dbstring.c_str());
return false;
@@ -460,10 +460,10 @@ bool Master::_StartDB()
return false;
}
- connections = sConfig.GetIntDefault("CharacterDatabase.Connections", 2);
+ synch_threads = sConfig.GetIntDefault("CharacterDatabase.SynchThreads", 2);
///- Initialise the Character database
- if (!CharacterDatabase.Open(dbstring, async_threads, connections))
+ if (!CharacterDatabase.Open(dbstring, async_threads, synch_threads))
{
sLog.outError("Cannot connect to Character database %s", dbstring.c_str());
return false;
@@ -485,10 +485,10 @@ bool Master::_StartDB()
return false;
}
- connections = sConfig.GetIntDefault("LoginDatabase.Connections", 2);
+ synch_threads = sConfig.GetIntDefault("LoginDatabase.SynchThreads", 1);
///- Initialise the login database
- if (!LoginDatabase.Open(dbstring, async_threads, connections))
+ if (!LoginDatabase.Open(dbstring, async_threads, synch_threads))
{
sLog.outError("Cannot connect to login database %s", dbstring.c_str());
return false;
diff --git a/src/server/worldserver/worldserver.conf.dist b/src/server/worldserver/worldserver.conf.dist
index 6815f27d00b..a74fd7c0b71 100644
--- a/src/server/worldserver/worldserver.conf.dist
+++ b/src/server/worldserver/worldserver.conf.dist
@@ -94,7 +94,8 @@ CharacterDatabaseInfo = "127.0.0.1;3306;trinity;trinity;characters"
# WorldDatabase.WorkerThreads
# CharacterDatabase.WorkerThreads
# Description: The amount of worker threads spawned to handle asynchronous (delayed) MySQL
-# statements.
+# statements. Each worker thread is mirrored with its own connection to the
+# MySQL server and their own thread on the MySQL server.
# Default: 1 - (LoginDatabase.WorkerThreads)
# 1 - (WorldDatabase.WorkerThreads)
# 1 - (CharacterDatabase.WorkerThreads)
@@ -104,17 +105,17 @@ WorldDatabase.WorkerThreads = 1
CharacterDatabase.WorkerThreads = 1
#
-# LoginDatabase.Connections
-# WorldDatabase.Connections
-# CharacterDatabase.Connections
-# Description: The amount of MySQL connections spawned.
-# Default: 1 - (LoginDatabase.Connections)
-# 1 - (WorldDatabase.Connections)
+# LoginDatabase.SynchThreads
+# WorldDatabase.SynchThreads
+# CharacterDatabase.SynchThreads
+# Description: The amount of MySQL connections spawned to handle.
+# Default: 1 - (LoginDatabase.WorkerThreads)
+# 1 - (WorldDatabase.WorkerThreads)
# 2 - (CharacterDatabase.WorkerThreads)
-LoginDatabase.Connections = 2
-WorldDatabase.Connections = 2
-CharacterDatabase.Connections = 2
+LoginDatabase.SynchThreads = 1
+WorldDatabase.SynchThreads = 1
+CharacterDatabase.SynchThreads = 2
#
# MaxPingTime