aboutsummaryrefslogtreecommitdiff
path: root/src/server/shared
diff options
context:
space:
mode:
authorMachiavelli <none@none>2010-09-27 00:20:56 +0200
committerMachiavelli <none@none>2010-09-27 00:20:56 +0200
commita9e9a2c8848c22e4a3e3b7bab0caeca25d9ea408 (patch)
treea7c4960796d0a9a42cb1e0252d4a75c4436d1f01 /src/server/shared
parent894b2081b3837575bd44c71ea4ebc76008b5b5e3 (diff)
Core/DBLayer:
- DB Threading model update * Get rid of ThreadBundleMask and bundled connection * Implement configurable amount of Synch threads for databasepools * Use modulus based algorithm to check for free synchronous connections instead of previous ¨get connection by thread key or bundlemask¨ feature * Locks on mysql context objects are now managed outside the mysql query methods Fixes issue #4058 Fixes issue #4059 Introduces a ton of more issues. Use at own risk. You were warned. Really. Don´t forget to update your worldserver.conf --HG-- branch : trunk
Diffstat (limited to 'src/server/shared')
-rw-r--r--src/server/shared/Database/DatabaseWorker.cpp5
-rw-r--r--src/server/shared/Database/DatabaseWorker.h2
-rw-r--r--src/server/shared/Database/DatabaseWorkerPool.h227
-rw-r--r--src/server/shared/Database/MySQLConnection.cpp22
-rw-r--r--src/server/shared/Database/MySQLConnection.h16
5 files changed, 136 insertions, 136 deletions
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp
index 0fd7287f23a..9d903ac6db4 100644
--- a/src/server/shared/Database/DatabaseWorker.cpp
+++ b/src/server/shared/Database/DatabaseWorker.cpp
@@ -35,8 +35,6 @@ int DatabaseWorker::svc()
if (!m_queue)
return -1;
- MySQL::Thread_Init();
-
SQLOperation *request = NULL;
while (1)
{
@@ -50,8 +48,7 @@ int DatabaseWorker::svc()
delete request;
}
- MySQL::Thread_End();
- delete m_conn;
+ m_conn->Close();
return 0;
}
diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h
index cb224620020..5bb3a64eba3 100644
--- a/src/server/shared/Database/DatabaseWorker.h
+++ b/src/server/shared/Database/DatabaseWorker.h
@@ -24,7 +24,7 @@
class MySQLConnection;
-class DatabaseWorker : protected ACE_Task_Base
+class DatabaseWorker : public ACE_Task_Base
{
public:
DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con);
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h
index 9a7134eb2ce..164fce8cb5f 100644
--- a/src/server/shared/Database/DatabaseWorkerPool.h
+++ b/src/server/shared/Database/DatabaseWorkerPool.h
@@ -33,23 +33,20 @@
#include "QueryHolder.h"
#include "AdhocStatement.h"
-enum MySQLThreadBundle
-{
- MYSQL_BUNDLE_NONE = 0x00, //- Each task will run their own MySQL connection
- MYSQL_BUNDLE_UNUSED = 0x01, //- Temp unused
- MYSQL_BUNDLE_RA = 0x02, //- Remote admin thread
- MYSQL_BUNDLE_RAR = 0x04, //- Reactor runnable thread
- MYSQL_BUNDLE_WORLD = 0x08, //- WorldRunnable
- MYSQL_BUNDLE_ALL = MYSQL_BUNDLE_RA | MYSQL_BUNDLE_RAR | MYSQL_BUNDLE_WORLD,
-};
-
class PingOperation : public SQLOperation
{
/// Operation for idle delaythreads
bool Execute()
{
- m_conn->Ping();
- return true;
+ for (;;)
+ if (m_conn->LockIfReady())
+ {
+ m_conn->Ping();
+ m_conn->Unlock();
+ return true;
+ }
+
+ return false;
}
};
@@ -57,7 +54,6 @@ template <class T>
class DatabaseWorkerPool
{
private:
- typedef UNORDERED_MAP<ACE_Based::Thread*, MySQLConnection*> ConnectionMap;
typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt;
public:
@@ -66,6 +62,7 @@ class DatabaseWorkerPool
m_connections(0)
{
m_infoString = "";
+ m_connections.resize(IDX_SIZE);
mysql_library_init(-1, NULL, NULL);
WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
@@ -73,97 +70,69 @@ class DatabaseWorkerPool
~DatabaseWorkerPool()
{
+ sLog.outSQLDriver("~DatabaseWorkerPool for '%s'.", "missingname");
mysql_library_end();
}
- bool Open(const std::string& infoString, uint8 num_threads, MySQLThreadBundle mask)
+ bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
{
- //- Only created bundled connection if configured
- m_bundle_conn = NULL;
- if (mask != MYSQL_BUNDLE_NONE)
+ sLog.outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", "nonameyet", async_threads, synch_threads);
+
+ /// Open asynchronous connections (delayed operations)
+ m_connections[IDX_ASYNC].resize(async_threads);
+ for (uint8 i = 0; i < async_threads; ++i)
{
- sLog.outSQLDriver("Creating bundled/master MySQL connection.");
- m_bundle_conn = new T();
- m_bundle_conn->Open(infoString);
- ++m_connections;
+ T* t = new T(m_queue);
+ t->Open(infoString);
+ m_connections[IDX_ASYNC][i] = t;
+ ++m_connectionCount;
}
- m_async_connections.resize(num_threads);
-
- /// Open the Async pool
- for (uint8 i = 0; i < num_threads; i++)
+ /// Open synchronous connections (direct, blocking operations)
+ m_connections[IDX_SYNCH].resize(synch_threads);
+ for (uint8 i = 0; i < synch_threads; ++i)
{
- m_async_connections[i] = new T(m_queue);
- m_async_connections[i]->Open(infoString);
- ++m_connections;
- sLog.outSQLDriver("Async database thread pool opened. Worker thread count: %u", num_threads);
+ T* t = new T();
+ t->Open(infoString);
+ m_connections[IDX_SYNCH][i] = t;
+ ++m_connectionCount;
}
+ /// TODO: Connection details in a struct
m_infoString = infoString;
+
+ sLog.outSQLDriver("Databasepool opened succesfuly. %u connections running.", (uint32)m_connectionCount.value());
return true;
}
void Close()
{
- sLog.outSQLDriver("Closing down %u connections on this DatabaseWorkerPool", (uint32)m_connections.value());
- /// Shuts down worker threads for this connection pool.
- m_queue->queue()->deactivate();
-
- for (uint8 i = 0; i < m_async_connections.size(); i++)
- {
- m_async_connections[i]->m_worker->wait();
- --m_connections;
- }
-
- if (m_bundleMask != MYSQL_BUNDLE_NONE)
- {
- delete m_bundle_conn;
- m_bundle_conn = NULL;
- --m_connections;
- sLog.outSQLDriver("Closed bundled connection.");
- }
-
- //- MySQL::Thread_End() should be called manually from the aborting calling threads
- }
-
- void Init_MySQL_Connection()
- {
- T* conn = new T();
- conn->Open(m_infoString);
+ sLog.outSQLDriver("Closing down databasepool '%s'.", "missingname");
- // no idea why it doesn't accept sLog here
+ /// Shuts down delaythreads for this connection pool.
+ m_queue->queue()->deactivate();
+ for (uint8 i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
{
- ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx);
- ConnectionMap::const_iterator itr = m_sync_connections.find(ACE_Based::Thread::current());
- #ifdef _DEBUG
- if (itr != m_sync_connections.end())
- ACE_Singleton<Log, ACE_Thread_Mutex>::instance()->outSQLDriver("Thread ["UI64FMTD"] already started a MySQL connection", (uint64)ACE_Based::Thread::currentId());
- #endif
- m_sync_connections[ACE_Based::Thread::current()] = conn;
+ /// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx
+ /// Now we just wait until m_queue gives the signal to the worker threads to stop
+ T* t = m_connections[IDX_ASYNC][i];
+ t->m_worker->wait(); // t->Close(); is called from worker thread
+ --m_connectionCount;
}
- ACE_Singleton<Log, ACE_Thread_Mutex>::instance()->outSQLDriver("Core thread with ID ["UI64FMTD"] initializing MySQL connection.",
- (uint64)ACE_Based::Thread::currentId());
+ sLog.outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", "missingname");
- ++m_connections;
- }
-
- void End_MySQL_Connection()
- {
- MySQLConnection* conn;
+ /// Shut down the synchronous connections
+ for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
{
- ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx);
- ConnectionMap::iterator itr = m_sync_connections.find(ACE_Based::Thread::current());
- #ifdef _DEBUG
- if (itr == m_sync_connections.end())
- sLog.outSQLDriver("Thread ["UI64FMTD"] already shut down their MySQL connection.", (uint64)ACE_Based::Thread::currentId());
- #endif
- conn = itr->second;
- m_sync_connections.erase(itr);
+ T* t = m_connections[IDX_SYNCH][i];
+ //while (1)
+ // if (t->LockIfReady()) -- For some reason deadlocks us
+ t->Close();
+ --m_connectionCount;
}
- delete conn;
- conn = NULL;
- --m_connections;
+
+ sLog.outSQLDriver("All connections on databasepool 'missingname' closed.");
}
void Execute(const char* sql)
@@ -191,8 +160,12 @@ class DatabaseWorkerPool
void DirectExecute(const char* sql)
{
- if (sql)
- GetConnection()->Execute(sql);
+ if (!sql)
+ return;
+
+ T* t = GetFreeConnection();
+ t->Execute(sql);
+ t->Unlock();
}
void DirectPExecute(const char* sql, ...)
@@ -209,9 +182,13 @@ class DatabaseWorkerPool
return DirectExecute(szQuery);
}
- QueryResult Query(const char* sql)
+ QueryResult Query(const char* sql, MySQLConnection* conn = NULL)
{
- ResultSet* result = GetConnection()->Query(sql);
+ if (!conn)
+ conn = GetFreeConnection();
+
+ ResultSet* result = conn->Query(sql);
+ conn->Unlock();
if (!result || !result->GetRowCount())
return QueryResult(NULL);
@@ -219,6 +196,20 @@ class DatabaseWorkerPool
return QueryResult(result);
}
+ QueryResult PQuery(const char* sql, MySQLConnection* conn, ...)
+ {
+ if (!sql)
+ return QueryResult(NULL);
+
+ va_list ap;
+ char szQuery[MAX_QUERY_LEN];
+ va_start(ap, sql);
+ vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
+ va_end(ap);
+
+ return Query(szQuery, conn);
+ }
+
QueryResult PQuery(const char* sql, ...)
{
if (!sql)
@@ -303,11 +294,12 @@ class DatabaseWorkerPool
delete[] buf;
}
- MySQLThreadBundle GetBundleMask() { return m_bundleMask; }
-
PreparedQueryResult Query(PreparedStatement* stmt)
{
- PreparedResultSet* ret = GetConnection()->Query(stmt);
+ T* t = GetFreeConnection();
+ PreparedResultSet* ret = t->Query(stmt);
+ t->Unlock();
+
if (!ret || !ret->GetRowCount())
return PreparedQueryResult(NULL);
@@ -316,22 +308,14 @@ class DatabaseWorkerPool
void KeepAlive()
{
- ConnectionMap::const_iterator itr;
- {
- /*! MapUpdate + unbundled threads */
- ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx);
- itr = m_sync_connections.find(ACE_Based::Thread::current());
- if (itr != m_sync_connections.end())
- itr->second->Ping();
- }
-
- if (m_bundle_conn)
- m_bundle_conn->Ping();
+ /// Ping syncrhonous connections
+ for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i)
+ m_connections[IDX_SYNCH][i]->Ping();
/// Assuming all worker threads are free, every worker thread will receive 1 ping operation request
/// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
/// as the sole purpose is to prevent connections from idling.
- for (size_t i = 0; i < m_async_connections.size(); ++i)
+ for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
Enqueue(new PingOperation);
}
@@ -340,7 +324,11 @@ class DatabaseWorkerPool
{
if (!to || !from || !length)
return 0;
- return (mysql_real_escape_string(GetConnection()->GetHandle(), to, from, length));
+
+ T* t = GetFreeConnection();
+ unsigned long ret = mysql_real_escape_string(t->GetHandle(), to, from, length);
+ t->Unlock();
+ return ret;
}
void Enqueue(SQLOperation* op)
@@ -348,32 +336,33 @@ class DatabaseWorkerPool
m_queue->enqueue(op);
}
- MySQLConnection* GetConnection()
+ T* GetFreeConnection()
{
- MySQLConnection* conn;
- ConnectionMap::const_iterator itr;
+ uint8 i = 0;
+ size_t num_cons = m_connections[IDX_SYNCH].size();
+ for (;;) /// Block forever until a connection is free
{
- /*! MapUpdate + unbundled threads */
- ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx);
- itr = m_sync_connections.find(ACE_Based::Thread::current());
- if (itr != m_sync_connections.end())
- return itr->second;
+ T* t = m_connections[IDX_SYNCH][++i % num_cons ];
+ if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
+ return t;
}
- /*! Bundled threads */
- conn = m_bundle_conn;
- ASSERT (conn);
- return conn;
+
+ // This will be called when Celine Dion learns to sing
+ return NULL;
}
private:
+ enum
+ {
+ IDX_ASYNC,
+ IDX_SYNCH,
+ IDX_SIZE,
+ };
+
ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
- std::vector<T*> m_async_connections;
- ConnectionMap m_sync_connections; //! Holds a mysql connection+thread per mapUpdate thread and unbundled runnnables.
- ACE_Thread_Mutex m_connectionMap_mtx; //! For thread safe access to the synchroneous connection map
- T* m_bundle_conn; //! Bundled connection (see Database.ThreadBundleMask config)
- AtomicUInt m_connections; //! Counter of MySQL connections;
+ std::vector< std::vector<T*> > m_connections;
+ AtomicUInt m_connectionCount; //! Counter of MySQL connections;
std::string m_infoString; //! Infostring that is passed on to child connections.
- MySQLThreadBundle m_bundleMask; //! Our configured bundle mask (see enum)
};
#endif
diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp
index f3fa2f352da..83d95268e1a 100644
--- a/src/server/shared/Database/MySQLConnection.cpp
+++ b/src/server/shared/Database/MySQLConnection.cpp
@@ -49,11 +49,20 @@ m_Mysql(NULL)
MySQLConnection::~MySQLConnection()
{
+ ASSERT (m_Mysql); /// MySQL context must be present at this point
+
+ sLog.outSQLDriver("MySQLConnection::~MySQLConnection()");
for (size_t i = 0; i < m_stmts.size(); ++i)
delete m_stmts[i];
- MySQL::Thread_End();
mysql_close(m_Mysql);
+ Unlock(); /// Unlock while we die, how ironic
+}
+
+void MySQLConnection::Close()
+{
+ /// Only close us if we're not operating
+ delete this;
}
bool MySQLConnection::Open(const std::string& infoString)
@@ -163,9 +172,6 @@ bool MySQLConnection::Execute(const char* sql)
return false;
{
- // guarded block for thread-safe mySQL request
- ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex);
-
#ifdef SQLQUERY_LOG
uint32 _s = getMSTime();
#endif
@@ -193,9 +199,6 @@ bool MySQLConnection::Execute(PreparedStatement* stmt)
uint32 index = stmt->m_index;
{
- // guarded block for thread-safe mySQL request
- ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex);
-
MySQLPreparedStatement* m_mStmt = GetPreparedStatement(index);
ASSERT(m_mStmt); // Can only be null if preparation failed, server side error or bad query
m_mStmt->m_stmt = stmt; // Cross reference them for debug output
@@ -238,9 +241,6 @@ bool MySQLConnection::_Query(PreparedStatement* stmt, MYSQL_RES **pResult, MYSQL
uint32 index = stmt->m_index;
{
- // guarded block for thread-safe mySQL request
- ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex);
-
MySQLPreparedStatement* m_mStmt = GetPreparedStatement(index);
ASSERT(m_mStmt); // Can only be null if preparation failed, server side error or bad query
m_mStmt->m_stmt = stmt; // Cross reference them for debug output
@@ -304,8 +304,6 @@ bool MySQLConnection::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD *
return false;
{
- // guarded block for thread-safe mySQL request
- ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex);
#ifdef SQLQUERY_LOG
uint32 _s = getMSTime();
#endif
diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h
index 93812421c8a..f83d22218dd 100644
--- a/src/server/shared/Database/MySQLConnection.h
+++ b/src/server/shared/Database/MySQLConnection.h
@@ -25,10 +25,12 @@
class DatabaseWorker;
class PreparedStatement;
class MySQLPreparedStatement;
+class PingOperation;
class MySQLConnection
{
template <class T> friend class DatabaseWorkerPool;
+ friend class PingOperation;
public:
MySQLConnection(); //! Constructor for synchroneous connections.
@@ -36,6 +38,7 @@ class MySQLConnection
~MySQLConnection();
virtual bool Open(const std::string& infoString); //! Connection details.
+ void Close();
public:
bool Execute(const char* sql);
@@ -58,6 +61,19 @@ class MySQLConnection
void PrepareStatement(uint32 index, const char* sql);
std::vector<MySQLPreparedStatement*> m_stmts; //! PreparedStatements storage
+ bool LockIfReady()
+ {
+ /// Tries to acquire lock. If lock is acquired by another thread
+ /// the calling parent will just try another connection
+ return m_Mutex.tryacquire() != -1;
+ }
+
+ void Unlock()
+ {
+ /// Called by parent databasepool. Will let other threads access this connection
+ m_Mutex.release();
+ }
+
private:
ACE_Activation_Queue* m_queue; //! Queue shared with other asynchroneous connections.
DatabaseWorker* m_worker; //! Core worker task.