diff options
Diffstat (limited to 'src/server/shared/Database')
| -rw-r--r-- | src/server/shared/Database/DatabaseEnv.h | 16 | ||||
| -rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.cpp | 197 | ||||
| -rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.h | 259 | ||||
| -rw-r--r-- | src/server/shared/Database/MySQLConnection.h | 4 | ||||
| -rw-r--r-- | src/server/shared/Database/SQLStorage.cpp | 2 | ||||
| -rw-r--r-- | src/server/shared/Database/Transaction.h | 4 |
6 files changed, 258 insertions, 224 deletions
diff --git a/src/server/shared/Database/DatabaseEnv.h b/src/server/shared/Database/DatabaseEnv.h index 53b7dee5521..d35948b7ec7 100644 --- a/src/server/shared/Database/DatabaseEnv.h +++ b/src/server/shared/Database/DatabaseEnv.h @@ -18,7 +18,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#if !defined(DATABASEENV_H) +#ifndef DATABASEENV_H #define DATABASEENV_H #include "Common.h" @@ -28,22 +28,22 @@ #include "Field.h" #include "QueryResult.h" -#include "DatabaseWorkerPool.h" #include "MySQLThreading.h" #include "Transaction.h" -typedef DatabaseWorkerPool DatabaseType; - +#define MAX_QUERY_LEN 32*1024 #define _LIKE_ "LIKE" #define _TABLE_SIM_ "`" #define _CONCAT3_(A,B,C) "CONCAT( " A " , " B " , " C " )" #define _OFFSET_ "LIMIT %d,1" -extern DatabaseType WorldDatabase; -extern DatabaseType CharacterDatabase; -extern DatabaseType LoginDatabase; +#include "Implementation/LoginDatabase.h" +#include "Implementation/CharacterDatabase.h" +#include "Implementation/WorldDatabase.h" -#define MAX_QUERY_LEN 32*1024 +extern WorldDatabaseWorkerPool WorldDatabase; +extern CharacterDatabaseWorkerPool CharacterDatabase; +extern LoginDatabaseWorkerPool LoginDatabase; #endif diff --git a/src/server/shared/Database/DatabaseWorkerPool.cpp b/src/server/shared/Database/DatabaseWorkerPool.cpp index a704829545a..93861354a6b 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.cpp +++ b/src/server/shared/Database/DatabaseWorkerPool.cpp @@ -22,233 +22,62 @@ #include "MySQLConnection.h" #include "SQLOperation.h" -DatabaseWorkerPool::DatabaseWorkerPool() : -m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)), -m_connections(0) -{ - m_infoString = ""; - - mysql_library_init(-1, NULL, NULL); - WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); -} +DatabaseWorkerPool:: +DatabaseWorkerPool:: -DatabaseWorkerPool::~DatabaseWorkerPool() -{ - mysql_library_end(); -} -bool DatabaseWorkerPool::Open(const std::string& infoString, uint8 num_threads) -{ - sLog.outSQLDriver("Creating bundled/master MySQL connection."); - m_bundle_conn = new MySQLConnection(); - m_bundle_conn->Open(infoString); - ++m_connections; - - m_async_connections.resize(num_threads); - - /// Open the Async pool - for (uint8 i = 0; i < num_threads; i++) - { - m_async_connections[i] = new MySQLConnection(m_queue); - m_async_connections[i]->Open(infoString); - ++m_connections; - sLog.outSQLDriver("Async database thread pool opened. Worker thread count: %u", num_threads); - } - - m_infoString = infoString; - return true; -} void DatabaseWorkerPool::Close() -{ - sLog.outSQLDriver("Closing down %u connections on this DatabaseWorkerPool", (uint32)m_connections.value()); - /// Shuts down worker threads for this connection pool. - m_queue->queue()->deactivate(); - - for (uint8 i = 0; i < m_async_connections.size(); i++) - { - m_async_connections[i]->m_worker->wait(); - --m_connections; - } - - delete m_bundle_conn; - m_bundle_conn = NULL; - --m_connections; - sLog.outSQLDriver("Closed bundled connection."); - - //- MySQL::Thread_End() should be called manually from the aborting calling threads - sLog.outSQLDriver("Waiting for %u synchroneous database threads to exit.", (uint32)m_connections.value()); - while (!m_sync_connections.empty()) - { - } - sLog.outSQLDriver("Synchroneous database threads exited succesfuly."); -} + /*! This function creates a new MySQL connection for every MapUpdate thread and every unbundled task. */ void DatabaseWorkerPool::Init_MySQL_Connection() -{ - MySQLConnection* conn = new MySQLConnection(); - conn->Open(m_infoString); - - { - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - ConnectionMap::const_iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); - #ifdef _DEBUG - if (itr != m_sync_connections.end()) - sLog.outSQLDriver("Thread ["UI64FMTD"] already started a MySQL connection", (uint64)ACE_Based::Thread::currentId()); - #endif - m_sync_connections[ACE_Based::Thread::current()] = conn; - } - - sLog.outSQLDriver("Core thread with ID ["UI64FMTD"] initializing MySQL connection.", - (uint64)ACE_Based::Thread::currentId()); - - ++m_connections; -} + void DatabaseWorkerPool::End_MySQL_Connection() -{ - MySQLConnection* conn; - { - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - ConnectionMap::iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); - #ifdef _DEBUG - if (itr == m_sync_connections.end()) - sLog.outSQLDriver("Thread ["UI64FMTD"] already shut down their MySQL connection.", (uint64)ACE_Based::Thread::currentId()); - #endif - conn = itr->second; - m_sync_connections.erase(itr); - } - delete conn; - conn = NULL; - --m_connections; -} + void DatabaseWorkerPool::Execute(const char* sql) -{ - if (!sql) - return; - BasicStatementTask* task = new BasicStatementTask(sql); - Enqueue(task); -} void DatabaseWorkerPool::PExecute(const char* sql, ...) { - if (!sql) - return; - - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - Execute(szQuery); + } void DatabaseWorkerPool::DirectExecute(const char* sql) { - if (sql) - GetConnection()->Execute(sql); + } void DatabaseWorkerPool::DirectPExecute(const char* sql, ...) { - if (!sql) - return; - - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - return DirectExecute(szQuery); + } QueryResult_AutoPtr DatabaseWorkerPool::Query(const char* sql) -{ - return GetConnection()->Query(sql); -} -QueryResult_AutoPtr DatabaseWorkerPool::PQuery(const char* sql, ...) -{ - if (!sql) - return QueryResult_AutoPtr(NULL); - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); +QueryResult_AutoPtr DatabaseWorkerPool::PQuery(const char* sql, ...) - return Query(szQuery); -} SQLTransaction DatabaseWorkerPool::BeginTransaction() { - return SQLTransaction(new Transaction); + } void DatabaseWorkerPool::CommitTransaction(SQLTransaction transaction) -{ - #ifdef _DEBUG - if (transaction->GetSize() == 0) - { - sLog.outSQLDriver("Transaction contains 0 queries. Not executing."); - return; - } - if (transaction->GetSize() == 1) - { - sLog.outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); - } - #endif - Enqueue(new TransactionTask(transaction)); -} + ACE_Future<QueryResult_AutoPtr> DatabaseWorkerPool::AsyncQuery(const char* sql) -{ - QueryResultFuture res; - BasicStatementTask* task = new BasicStatementTask(sql, res); - Enqueue(task); - return res; //! Fool compiler, has no use yet -} + ACE_Future<QueryResult_AutoPtr> DatabaseWorkerPool::AsyncPQuery(const char* sql, ...) -{ - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - return AsyncQuery(szQuery); -} QueryResultHolderFuture DatabaseWorkerPool::DelayQueryHolder(SQLQueryHolder* holder) -{ - QueryResultHolderFuture res; - SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res); - Enqueue(task); - return res; //! Fool compiler, has no use yet -} MySQLConnection* DatabaseWorkerPool::GetConnection() -{ - MySQLConnection* conn; - ConnectionMap::const_iterator itr; - { - /*! MapUpdate + unbundled threads */ - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - itr = m_sync_connections.find(ACE_Based::Thread::current()); - if (itr != m_sync_connections.end()) - conn = itr->second; - } - /*! Bundled threads */ - conn = m_bundle_conn; - ASSERT (conn); - return conn; -} + diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index 8736609473a..c4875ce4a9e 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -24,11 +24,12 @@ #include <ace/Atomic_Op_T.h> #include <ace/Thread_Mutex.h> +#include "Log.h" +#include "DatabaseEnv.h" #include "SQLOperation.h" -#include "QueryResult.h" #include "Callback.h" #include "MySQLConnection.h" -#include "Transaction.h" +#include "DatabaseWorker.h" enum MySQLThreadBundle { @@ -40,30 +41,220 @@ enum MySQLThreadBundle MYSQL_BUNDLE_ALL = MYSQL_BUNDLE_CLI | MYSQL_BUNDLE_RA | MYSQL_BUNDLE_RAR | MYSQL_BUNDLE_WORLD, }; +template <class T> class DatabaseWorkerPool { public: - DatabaseWorkerPool(); - ~DatabaseWorkerPool(); - - bool Open(const std::string& infoString, uint8 num_threads); - void Close(); - - void Init_MySQL_Connection(); - void End_MySQL_Connection(); - - void Execute(const char* sql); - void PExecute(const char* sql, ...); - void DirectExecute(const char* sql); - void DirectPExecute(const char* sql, ...); - QueryResult_AutoPtr Query(const char* sql); - QueryResult_AutoPtr PQuery(const char* sql, ...); - ACE_Future<QueryResult_AutoPtr> AsyncQuery(const char* sql); - ACE_Future<QueryResult_AutoPtr> AsyncPQuery(const char* sql, ...); - QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder); + DatabaseWorkerPool() : + m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)), + m_connections(0) + { + m_infoString = ""; + + mysql_library_init(-1, NULL, NULL); + WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); + } + + ~DatabaseWorkerPool() + { + mysql_library_end(); + } + + bool Open(const std::string& infoString, uint8 num_threads) + { + sLog.outSQLDriver("Creating bundled/master MySQL connection."); + m_bundle_conn = new T(); + m_bundle_conn->Open(infoString); + ++m_connections; + + m_async_connections.resize(num_threads); + + /// Open the Async pool + for (uint8 i = 0; i < num_threads; i++) + { + m_async_connections[i] = new T(m_queue); + m_async_connections[i]->Open(infoString); + ++m_connections; + sLog.outSQLDriver("Async database thread pool opened. Worker thread count: %u", num_threads); + } + + m_infoString = infoString; + return true; + } + + void Close() + { + sLog.outSQLDriver("Closing down %u connections on this DatabaseWorkerPool", (uint32)m_connections.value()); + /// Shuts down worker threads for this connection pool. + m_queue->queue()->deactivate(); + + for (uint8 i = 0; i < m_async_connections.size(); i++) + { + m_async_connections[i]->m_worker->wait(); + --m_connections; + } + + delete m_bundle_conn; + m_bundle_conn = NULL; + --m_connections; + sLog.outSQLDriver("Closed bundled connection."); + + //- MySQL::Thread_End() should be called manually from the aborting calling threads + sLog.outSQLDriver("Waiting for %u synchroneous database threads to exit.", (uint32)m_connections.value()); + while (!m_sync_connections.empty()) + { + } + sLog.outSQLDriver("Synchroneous database threads exited succesfuly."); + } + + void Init_MySQL_Connection() + { + T* conn = new T(); + conn->Open(m_infoString); + + { + ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); + ConnectionMap::const_iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); + #ifdef _DEBUG + if (itr != m_sync_connections.end()) + sLog.outSQLDriver("Thread ["UI64FMTD"] already started a MySQL connection", (uint64)ACE_Based::Thread::currentId()); + #endif + m_sync_connections[ACE_Based::Thread::current()] = conn; + } + + sLog.outSQLDriver("Core thread with ID ["UI64FMTD"] initializing MySQL connection.", + (uint64)ACE_Based::Thread::currentId()); + + ++m_connections; + } + + void End_MySQL_Connection() + { + T* conn; + { + ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); + ConnectionMap::iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); + #ifdef _DEBUG + if (itr == m_sync_connections.end()) + sLog.outSQLDriver("Thread ["UI64FMTD"] already shut down their MySQL connection.", (uint64)ACE_Based::Thread::currentId()); + #endif + conn = itr->second; + m_sync_connections.erase(itr); + } + delete conn; + conn = NULL; + --m_connections; + } + + void Execute(const char* sql) + { + if (!sql) + return; + + BasicStatementTask* task = new BasicStatementTask(sql); + Enqueue(task); + } + + void PExecute(const char* sql, ...) + { + if (!sql) + return; + + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + Execute(szQuery); + } + + void DirectExecute(const char* sql) + { + if (sql) + GetConnection()->Execute(sql); + } + + void DirectPExecute(const char* sql, ...) + { + if (!sql) + return; + + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + return DirectExecute(szQuery); + } + + QueryResult_AutoPtr Query(const char* sql) + { + return GetConnection()->Query(sql); + } + + QueryResult_AutoPtr PQuery(const char* sql, ...) + { + if (!sql) + return QueryResult_AutoPtr(NULL); + + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + return Query(szQuery); + } + + ACE_Future<QueryResult_AutoPtr> AsyncQuery(const char* sql) + { + QueryResultFuture res; + BasicStatementTask* task = new BasicStatementTask(sql, res); + Enqueue(task); + return res; //! Actual return value has no use yet + } + + ACE_Future<QueryResult_AutoPtr> AsyncPQuery(const char* sql, ...) + { + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + return AsyncQuery(szQuery); + } + + QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) + { + QueryResultHolderFuture res; + SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res); + Enqueue(task); + return res; //! Fool compiler, has no use yet + } - SQLTransaction BeginTransaction(); - void CommitTransaction(SQLTransaction transaction); + SQLTransaction BeginTransaction() + { + return SQLTransaction(new Transaction); + } + + void CommitTransaction(SQLTransaction transaction) + { + #ifdef TRINITY_DEBUG + if (transaction->GetSize() == 0) + { + sLog.outSQLDriver("Transaction contains 0 queries. Not executing."); + return; + } + if (transaction->GetSize() == 1) + { + sLog.outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); + } + #endif + Enqueue(new TransactionTask(transaction)); + } void escape_string(std::string& str) { @@ -89,20 +280,34 @@ class DatabaseWorkerPool m_queue->enqueue(op); } - MySQLConnection* GetConnection(); + T* GetConnection() + { + T* conn; + ConnectionMap::const_iterator itr; + { + /*! MapUpdate + unbundled threads */ + ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); + itr = m_sync_connections.find(ACE_Based::Thread::current()); + if (itr != m_sync_connections.end()) + conn = itr->second; + } + /*! Bundled threads */ + conn = m_bundle_conn; + ASSERT (conn); + return conn; + } private: - typedef UNORDERED_MAP<ACE_Based::Thread*, MySQLConnection*> ConnectionMap; - typedef UNORDERED_MAP<ACE_Based::Thread*, TransactionTask*> TransactionQueues; + typedef UNORDERED_MAP<ACE_Based::Thread*, T*> ConnectionMap; typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt; private: ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads. ACE_Thread_Mutex m_queue_mtx; //! For thread safe enqueues of delayed statements. - std::vector<MySQLConnection*> m_async_connections; + std::vector<T*> m_async_connections; ConnectionMap m_sync_connections; //! Holds a mysql connection+thread per mapUpdate thread and unbundled runnnables. ACE_Thread_Mutex m_connectionMap_mtx; //! For thread safe access to the synchroneous connection map - MySQLConnection* m_bundle_conn; //! Bundled connection (see Database.ThreadBundleMask config) + T* m_bundle_conn; //! Bundled connection (see Database.ThreadBundleMask config) AtomicUInt m_connections; //! Counter of MySQL connections; std::string m_infoString; //! Infostring that is passed on to child connections. }; diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h index 08ceaa2860c..5e68e9d7516 100644 --- a/src/server/shared/Database/MySQLConnection.h +++ b/src/server/shared/Database/MySQLConnection.h @@ -23,14 +23,14 @@ class DatabaseWorker; class MySQLConnection { - friend class DatabaseWorkerPool; + template <class T> friend class DatabaseWorkerPool; public: MySQLConnection(); //! Constructor for synchroneous connections. MySQLConnection(ACE_Activation_Queue* queue); //! Constructor for asynchroneous connections. ~MySQLConnection(); - bool Open(const std::string& infoString); //! Connection details. + virtual bool Open(const std::string& infoString); //! Connection details. public: bool Execute(const char* sql); diff --git a/src/server/shared/Database/SQLStorage.cpp b/src/server/shared/Database/SQLStorage.cpp index 26aecb80fa3..f71b90b300e 100644 --- a/src/server/shared/Database/SQLStorage.cpp +++ b/src/server/shared/Database/SQLStorage.cpp @@ -21,8 +21,6 @@ #include "SQLStorage.h" #include "SQLStorageImpl.h" -extern DatabaseType WorldDatabase; - const char CreatureInfosrcfmt[]="iiiiiiiiiisssiiiiiiifffiffiifiiiiiiiiiiffiiiiiiiiiiiiiiiiiiiiiiiisiifffliiiiiiiliiisi"; const char CreatureInfodstfmt[]="iiiiiiiiiisssibbiiiifffiffiifiiiiiiiiiiffiiiiiiiiiiiiiiiiiiiiiiiisiifffliiiiiiiliiiii"; const char CreatureDataAddonInfofmt[]="iiiiiis"; diff --git a/src/server/shared/Database/Transaction.h b/src/server/shared/Database/Transaction.h index e94ca053e32..58c87b61270 100644 --- a/src/server/shared/Database/Transaction.h +++ b/src/server/shared/Database/Transaction.h @@ -19,6 +19,8 @@ #ifndef _TRANSACTION_H #define _TRANSACTION_H +#include "SQLOperation.h" + /*! Transactions, high level class. */ class Transaction { @@ -43,7 +45,7 @@ typedef ACE_Refcounted_Auto_Ptr<Transaction, ACE_Null_Mutex> SQLTransaction; /*! Low level class*/ class TransactionTask : public SQLOperation { - friend class DatabaseWorkerPool; + template <class T> friend class DatabaseWorkerPool; friend class DatabaseWorker; public: |
