diff options
| author | Machiavelli <none@none> | 2010-09-27 00:20:56 +0200 |
|---|---|---|
| committer | Machiavelli <none@none> | 2010-09-27 00:20:56 +0200 |
| commit | a9e9a2c8848c22e4a3e3b7bab0caeca25d9ea408 (patch) | |
| tree | a7c4960796d0a9a42cb1e0252d4a75c4436d1f01 /src/server/shared | |
| parent | 894b2081b3837575bd44c71ea4ebc76008b5b5e3 (diff) | |
Core/DBLayer:
- DB Threading model update
* Get rid of ThreadBundleMask and bundled connection
* Implement configurable amount of Synch threads for databasepools
* Use modulus based algorithm to check for free synchronous connections instead of previous ¨get connection by thread key or bundlemask¨ feature
* Locks on mysql context objects are now managed outside the mysql query methods
Fixes issue #4058
Fixes issue #4059
Introduces a ton of more issues. Use at own risk. You were warned. Really.
Don´t forget to update your worldserver.conf
--HG--
branch : trunk
Diffstat (limited to 'src/server/shared')
| -rw-r--r-- | src/server/shared/Database/DatabaseWorker.cpp | 5 | ||||
| -rw-r--r-- | src/server/shared/Database/DatabaseWorker.h | 2 | ||||
| -rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.h | 227 | ||||
| -rw-r--r-- | src/server/shared/Database/MySQLConnection.cpp | 22 | ||||
| -rw-r--r-- | src/server/shared/Database/MySQLConnection.h | 16 |
5 files changed, 136 insertions, 136 deletions
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp index 0fd7287f23a..9d903ac6db4 100644 --- a/src/server/shared/Database/DatabaseWorker.cpp +++ b/src/server/shared/Database/DatabaseWorker.cpp @@ -35,8 +35,6 @@ int DatabaseWorker::svc() if (!m_queue) return -1; - MySQL::Thread_Init(); - SQLOperation *request = NULL; while (1) { @@ -50,8 +48,7 @@ int DatabaseWorker::svc() delete request; } - MySQL::Thread_End(); - delete m_conn; + m_conn->Close(); return 0; } diff --git a/src/server/shared/Database/DatabaseWorker.h b/src/server/shared/Database/DatabaseWorker.h index cb224620020..5bb3a64eba3 100644 --- a/src/server/shared/Database/DatabaseWorker.h +++ b/src/server/shared/Database/DatabaseWorker.h @@ -24,7 +24,7 @@ class MySQLConnection; -class DatabaseWorker : protected ACE_Task_Base +class DatabaseWorker : public ACE_Task_Base { public: DatabaseWorker(ACE_Activation_Queue* new_queue, MySQLConnection* con); diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index 9a7134eb2ce..164fce8cb5f 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -33,23 +33,20 @@ #include "QueryHolder.h" #include "AdhocStatement.h" -enum MySQLThreadBundle -{ - MYSQL_BUNDLE_NONE = 0x00, //- Each task will run their own MySQL connection - MYSQL_BUNDLE_UNUSED = 0x01, //- Temp unused - MYSQL_BUNDLE_RA = 0x02, //- Remote admin thread - MYSQL_BUNDLE_RAR = 0x04, //- Reactor runnable thread - MYSQL_BUNDLE_WORLD = 0x08, //- WorldRunnable - MYSQL_BUNDLE_ALL = MYSQL_BUNDLE_RA | MYSQL_BUNDLE_RAR | MYSQL_BUNDLE_WORLD, -}; - class PingOperation : public SQLOperation { /// Operation for idle delaythreads bool Execute() { - m_conn->Ping(); - return true; + for (;;) + if (m_conn->LockIfReady()) + { + m_conn->Ping(); + m_conn->Unlock(); + return true; + } + + return false; } }; @@ -57,7 +54,6 @@ template <class T> class DatabaseWorkerPool { private: - typedef UNORDERED_MAP<ACE_Based::Thread*, MySQLConnection*> ConnectionMap; typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt; public: @@ -66,6 +62,7 @@ class DatabaseWorkerPool m_connections(0) { m_infoString = ""; + m_connections.resize(IDX_SIZE); mysql_library_init(-1, NULL, NULL); WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); @@ -73,97 +70,69 @@ class DatabaseWorkerPool ~DatabaseWorkerPool() { + sLog.outSQLDriver("~DatabaseWorkerPool for '%s'.", "missingname"); mysql_library_end(); } - bool Open(const std::string& infoString, uint8 num_threads, MySQLThreadBundle mask) + bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads) { - //- Only created bundled connection if configured - m_bundle_conn = NULL; - if (mask != MYSQL_BUNDLE_NONE) + sLog.outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", "nonameyet", async_threads, synch_threads); + + /// Open asynchronous connections (delayed operations) + m_connections[IDX_ASYNC].resize(async_threads); + for (uint8 i = 0; i < async_threads; ++i) { - sLog.outSQLDriver("Creating bundled/master MySQL connection."); - m_bundle_conn = new T(); - m_bundle_conn->Open(infoString); - ++m_connections; + T* t = new T(m_queue); + t->Open(infoString); + m_connections[IDX_ASYNC][i] = t; + ++m_connectionCount; } - m_async_connections.resize(num_threads); - - /// Open the Async pool - for (uint8 i = 0; i < num_threads; i++) + /// Open synchronous connections (direct, blocking operations) + m_connections[IDX_SYNCH].resize(synch_threads); + for (uint8 i = 0; i < synch_threads; ++i) { - m_async_connections[i] = new T(m_queue); - m_async_connections[i]->Open(infoString); - ++m_connections; - sLog.outSQLDriver("Async database thread pool opened. Worker thread count: %u", num_threads); + T* t = new T(); + t->Open(infoString); + m_connections[IDX_SYNCH][i] = t; + ++m_connectionCount; } + /// TODO: Connection details in a struct m_infoString = infoString; + + sLog.outSQLDriver("Databasepool opened succesfuly. %u connections running.", (uint32)m_connectionCount.value()); return true; } void Close() { - sLog.outSQLDriver("Closing down %u connections on this DatabaseWorkerPool", (uint32)m_connections.value()); - /// Shuts down worker threads for this connection pool. - m_queue->queue()->deactivate(); - - for (uint8 i = 0; i < m_async_connections.size(); i++) - { - m_async_connections[i]->m_worker->wait(); - --m_connections; - } - - if (m_bundleMask != MYSQL_BUNDLE_NONE) - { - delete m_bundle_conn; - m_bundle_conn = NULL; - --m_connections; - sLog.outSQLDriver("Closed bundled connection."); - } - - //- MySQL::Thread_End() should be called manually from the aborting calling threads - } - - void Init_MySQL_Connection() - { - T* conn = new T(); - conn->Open(m_infoString); + sLog.outSQLDriver("Closing down databasepool '%s'.", "missingname"); - // no idea why it doesn't accept sLog here + /// Shuts down delaythreads for this connection pool. + m_queue->queue()->deactivate(); + for (uint8 i = 0; i < m_connections[IDX_ASYNC].size(); ++i) { - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - ConnectionMap::const_iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); - #ifdef _DEBUG - if (itr != m_sync_connections.end()) - ACE_Singleton<Log, ACE_Thread_Mutex>::instance()->outSQLDriver("Thread ["UI64FMTD"] already started a MySQL connection", (uint64)ACE_Based::Thread::currentId()); - #endif - m_sync_connections[ACE_Based::Thread::current()] = conn; + /// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx + /// Now we just wait until m_queue gives the signal to the worker threads to stop + T* t = m_connections[IDX_ASYNC][i]; + t->m_worker->wait(); // t->Close(); is called from worker thread + --m_connectionCount; } - ACE_Singleton<Log, ACE_Thread_Mutex>::instance()->outSQLDriver("Core thread with ID ["UI64FMTD"] initializing MySQL connection.", - (uint64)ACE_Based::Thread::currentId()); + sLog.outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", "missingname"); - ++m_connections; - } - - void End_MySQL_Connection() - { - MySQLConnection* conn; + /// Shut down the synchronous connections + for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i) { - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - ConnectionMap::iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); - #ifdef _DEBUG - if (itr == m_sync_connections.end()) - sLog.outSQLDriver("Thread ["UI64FMTD"] already shut down their MySQL connection.", (uint64)ACE_Based::Thread::currentId()); - #endif - conn = itr->second; - m_sync_connections.erase(itr); + T* t = m_connections[IDX_SYNCH][i]; + //while (1) + // if (t->LockIfReady()) -- For some reason deadlocks us + t->Close(); + --m_connectionCount; } - delete conn; - conn = NULL; - --m_connections; + + sLog.outSQLDriver("All connections on databasepool 'missingname' closed."); } void Execute(const char* sql) @@ -191,8 +160,12 @@ class DatabaseWorkerPool void DirectExecute(const char* sql) { - if (sql) - GetConnection()->Execute(sql); + if (!sql) + return; + + T* t = GetFreeConnection(); + t->Execute(sql); + t->Unlock(); } void DirectPExecute(const char* sql, ...) @@ -209,9 +182,13 @@ class DatabaseWorkerPool return DirectExecute(szQuery); } - QueryResult Query(const char* sql) + QueryResult Query(const char* sql, MySQLConnection* conn = NULL) { - ResultSet* result = GetConnection()->Query(sql); + if (!conn) + conn = GetFreeConnection(); + + ResultSet* result = conn->Query(sql); + conn->Unlock(); if (!result || !result->GetRowCount()) return QueryResult(NULL); @@ -219,6 +196,20 @@ class DatabaseWorkerPool return QueryResult(result); } + QueryResult PQuery(const char* sql, MySQLConnection* conn, ...) + { + if (!sql) + return QueryResult(NULL); + + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + return Query(szQuery, conn); + } + QueryResult PQuery(const char* sql, ...) { if (!sql) @@ -303,11 +294,12 @@ class DatabaseWorkerPool delete[] buf; } - MySQLThreadBundle GetBundleMask() { return m_bundleMask; } - PreparedQueryResult Query(PreparedStatement* stmt) { - PreparedResultSet* ret = GetConnection()->Query(stmt); + T* t = GetFreeConnection(); + PreparedResultSet* ret = t->Query(stmt); + t->Unlock(); + if (!ret || !ret->GetRowCount()) return PreparedQueryResult(NULL); @@ -316,22 +308,14 @@ class DatabaseWorkerPool void KeepAlive() { - ConnectionMap::const_iterator itr; - { - /*! MapUpdate + unbundled threads */ - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - itr = m_sync_connections.find(ACE_Based::Thread::current()); - if (itr != m_sync_connections.end()) - itr->second->Ping(); - } - - if (m_bundle_conn) - m_bundle_conn->Ping(); + /// Ping syncrhonous connections + for (uint8 i = 0; i < m_connections[IDX_SYNCH].size(); ++i) + m_connections[IDX_SYNCH][i]->Ping(); /// Assuming all worker threads are free, every worker thread will receive 1 ping operation request /// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter /// as the sole purpose is to prevent connections from idling. - for (size_t i = 0; i < m_async_connections.size(); ++i) + for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i) Enqueue(new PingOperation); } @@ -340,7 +324,11 @@ class DatabaseWorkerPool { if (!to || !from || !length) return 0; - return (mysql_real_escape_string(GetConnection()->GetHandle(), to, from, length)); + + T* t = GetFreeConnection(); + unsigned long ret = mysql_real_escape_string(t->GetHandle(), to, from, length); + t->Unlock(); + return ret; } void Enqueue(SQLOperation* op) @@ -348,32 +336,33 @@ class DatabaseWorkerPool m_queue->enqueue(op); } - MySQLConnection* GetConnection() + T* GetFreeConnection() { - MySQLConnection* conn; - ConnectionMap::const_iterator itr; + uint8 i = 0; + size_t num_cons = m_connections[IDX_SYNCH].size(); + for (;;) /// Block forever until a connection is free { - /*! MapUpdate + unbundled threads */ - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - itr = m_sync_connections.find(ACE_Based::Thread::current()); - if (itr != m_sync_connections.end()) - return itr->second; + T* t = m_connections[IDX_SYNCH][++i % num_cons ]; + if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks + return t; } - /*! Bundled threads */ - conn = m_bundle_conn; - ASSERT (conn); - return conn; + + // This will be called when Celine Dion learns to sing + return NULL; } private: + enum + { + IDX_ASYNC, + IDX_SYNCH, + IDX_SIZE, + }; + ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads. - std::vector<T*> m_async_connections; - ConnectionMap m_sync_connections; //! Holds a mysql connection+thread per mapUpdate thread and unbundled runnnables. - ACE_Thread_Mutex m_connectionMap_mtx; //! For thread safe access to the synchroneous connection map - T* m_bundle_conn; //! Bundled connection (see Database.ThreadBundleMask config) - AtomicUInt m_connections; //! Counter of MySQL connections; + std::vector< std::vector<T*> > m_connections; + AtomicUInt m_connectionCount; //! Counter of MySQL connections; std::string m_infoString; //! Infostring that is passed on to child connections. - MySQLThreadBundle m_bundleMask; //! Our configured bundle mask (see enum) }; #endif diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp index f3fa2f352da..83d95268e1a 100644 --- a/src/server/shared/Database/MySQLConnection.cpp +++ b/src/server/shared/Database/MySQLConnection.cpp @@ -49,11 +49,20 @@ m_Mysql(NULL) MySQLConnection::~MySQLConnection() { + ASSERT (m_Mysql); /// MySQL context must be present at this point + + sLog.outSQLDriver("MySQLConnection::~MySQLConnection()"); for (size_t i = 0; i < m_stmts.size(); ++i) delete m_stmts[i]; - MySQL::Thread_End(); mysql_close(m_Mysql); + Unlock(); /// Unlock while we die, how ironic +} + +void MySQLConnection::Close() +{ + /// Only close us if we're not operating + delete this; } bool MySQLConnection::Open(const std::string& infoString) @@ -163,9 +172,6 @@ bool MySQLConnection::Execute(const char* sql) return false; { - // guarded block for thread-safe mySQL request - ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex); - #ifdef SQLQUERY_LOG uint32 _s = getMSTime(); #endif @@ -193,9 +199,6 @@ bool MySQLConnection::Execute(PreparedStatement* stmt) uint32 index = stmt->m_index; { - // guarded block for thread-safe mySQL request - ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex); - MySQLPreparedStatement* m_mStmt = GetPreparedStatement(index); ASSERT(m_mStmt); // Can only be null if preparation failed, server side error or bad query m_mStmt->m_stmt = stmt; // Cross reference them for debug output @@ -238,9 +241,6 @@ bool MySQLConnection::_Query(PreparedStatement* stmt, MYSQL_RES **pResult, MYSQL uint32 index = stmt->m_index; { - // guarded block for thread-safe mySQL request - ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex); - MySQLPreparedStatement* m_mStmt = GetPreparedStatement(index); ASSERT(m_mStmt); // Can only be null if preparation failed, server side error or bad query m_mStmt->m_stmt = stmt; // Cross reference them for debug output @@ -304,8 +304,6 @@ bool MySQLConnection::_Query(const char *sql, MYSQL_RES **pResult, MYSQL_FIELD * return false; { - // guarded block for thread-safe mySQL request - ACE_Guard<ACE_Thread_Mutex> query_connection_guard(m_Mutex); #ifdef SQLQUERY_LOG uint32 _s = getMSTime(); #endif diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h index 93812421c8a..f83d22218dd 100644 --- a/src/server/shared/Database/MySQLConnection.h +++ b/src/server/shared/Database/MySQLConnection.h @@ -25,10 +25,12 @@ class DatabaseWorker; class PreparedStatement; class MySQLPreparedStatement; +class PingOperation; class MySQLConnection { template <class T> friend class DatabaseWorkerPool; + friend class PingOperation; public: MySQLConnection(); //! Constructor for synchroneous connections. @@ -36,6 +38,7 @@ class MySQLConnection ~MySQLConnection(); virtual bool Open(const std::string& infoString); //! Connection details. + void Close(); public: bool Execute(const char* sql); @@ -58,6 +61,19 @@ class MySQLConnection void PrepareStatement(uint32 index, const char* sql); std::vector<MySQLPreparedStatement*> m_stmts; //! PreparedStatements storage + bool LockIfReady() + { + /// Tries to acquire lock. If lock is acquired by another thread + /// the calling parent will just try another connection + return m_Mutex.tryacquire() != -1; + } + + void Unlock() + { + /// Called by parent databasepool. Will let other threads access this connection + m_Mutex.release(); + } + private: ACE_Activation_Queue* m_queue; //! Queue shared with other asynchroneous connections. DatabaseWorker* m_worker; //! Core worker task. |
