diff options
author | Machiavelli <none@none> | 2010-09-02 17:47:50 +0200 |
---|---|---|
committer | Machiavelli <none@none> | 2010-09-02 17:47:50 +0200 |
commit | 0117af4c37324eac2c70bc249899c19b8dcb7b80 (patch) | |
tree | e47c44854b8bca2bb5037bff0d8a16f53c1f1b8b /src | |
parent | 39f901d5660136f69c6252f4a21153d9f5d0732b (diff) |
Core/DBLayer:
- Implement deriviate classes of MySQLConnection for every database type (world, realm, characters)
- Make DatabaseWorkerPool templatized and use the above mentioned classes as parameter
- Implementation of the new types in code
(This is in preparation of prepared statement interface)
--HG--
branch : trunk
Diffstat (limited to 'src')
-rw-r--r-- | src/server/authserver/Main.cpp | 2 | ||||
-rw-r--r-- | src/server/authserver/Realms/RealmList.cpp | 3 | ||||
-rw-r--r-- | src/server/authserver/Server/AuthSocket.cpp | 2 | ||||
-rw-r--r-- | src/server/game/AI/EventAI/CreatureEventAIMgr.cpp | 2 | ||||
-rw-r--r-- | src/server/game/Addons/AddonMgr.cpp | 4 | ||||
-rw-r--r-- | src/server/game/Globals/ObjectMgr.cpp | 4 | ||||
-rw-r--r-- | src/server/game/Globals/ObjectMgr.h | 6 | ||||
-rw-r--r-- | src/server/game/Instances/InstanceSaveMgr.cpp | 18 | ||||
-rw-r--r-- | src/server/game/Instances/InstanceSaveMgr.h | 2 | ||||
-rw-r--r-- | src/server/game/Scripting/ScriptSystem.cpp | 4 | ||||
-rw-r--r-- | src/server/shared/Database/DatabaseEnv.h | 16 | ||||
-rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.cpp | 197 | ||||
-rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.h | 259 | ||||
-rw-r--r-- | src/server/shared/Database/MySQLConnection.h | 4 | ||||
-rw-r--r-- | src/server/shared/Database/SQLStorage.cpp | 2 | ||||
-rw-r--r-- | src/server/shared/Database/Transaction.h | 4 | ||||
-rw-r--r-- | src/server/worldserver/Main.cpp | 6 |
17 files changed, 280 insertions, 255 deletions
diff --git a/src/server/authserver/Main.cpp b/src/server/authserver/Main.cpp index d0698b10592..1e140e4e1c0 100644 --- a/src/server/authserver/Main.cpp +++ b/src/server/authserver/Main.cpp @@ -63,7 +63,7 @@ bool StartDB(); bool stopEvent = false; ///< Setting it to true stops the server -DatabaseType LoginDatabase; ///< Accessor to the realm server database +LoginDatabaseWorkerPool LoginDatabase; ///< Accessor to the realm server database /// Handle realmd's termination signals class RealmdSignalHandler : public Trinity::SignalHandler diff --git a/src/server/authserver/Realms/RealmList.cpp b/src/server/authserver/Realms/RealmList.cpp index 1e989c36c5d..63cadf7cb22 100644 --- a/src/server/authserver/Realms/RealmList.cpp +++ b/src/server/authserver/Realms/RealmList.cpp @@ -26,9 +26,6 @@ #include "RealmList.h" #include "Database/DatabaseEnv.h" - -extern DatabaseType LoginDatabase; - RealmList::RealmList() : m_UpdateInterval(0), m_NextUpdateTime(time(NULL)) { } diff --git a/src/server/authserver/Server/AuthSocket.cpp b/src/server/authserver/Server/AuthSocket.cpp index 4fbdc53c420..24cca80fcea 100644 --- a/src/server/authserver/Server/AuthSocket.cpp +++ b/src/server/authserver/Server/AuthSocket.cpp @@ -34,8 +34,6 @@ #include "SHA1.h" //#include "Util.h" -- for commented utf8ToUpperOnlyLatin -extern DatabaseType LoginDatabase; - #define ChunkSize 2048 enum eAuthCmd diff --git a/src/server/game/AI/EventAI/CreatureEventAIMgr.cpp b/src/server/game/AI/EventAI/CreatureEventAIMgr.cpp index f8b9628d53b..6bdbdbe1b06 100644 --- a/src/server/game/AI/EventAI/CreatureEventAIMgr.cpp +++ b/src/server/game/AI/EventAI/CreatureEventAIMgr.cpp @@ -34,7 +34,7 @@ void CreatureEventAIMgr::LoadCreatureEventAI_Texts() m_CreatureEventAI_TextMap.clear(); // Load EventAI Text - sObjectMgr.LoadTrinityStrings(WorldDatabase,"creature_ai_texts",MIN_CREATURE_AI_TEXT_STRING_ID,MAX_CREATURE_AI_TEXT_STRING_ID); + sObjectMgr.LoadTrinityStrings("creature_ai_texts",MIN_CREATURE_AI_TEXT_STRING_ID,MAX_CREATURE_AI_TEXT_STRING_ID); // Gather Additional data from EventAI Texts QueryResult_AutoPtr result = WorldDatabase.Query("SELECT entry, sound, type, language, emote FROM creature_ai_texts"); diff --git a/src/server/game/Addons/AddonMgr.cpp b/src/server/game/Addons/AddonMgr.cpp index 830d439bfdd..2acf31d47f4 100644 --- a/src/server/game/Addons/AddonMgr.cpp +++ b/src/server/game/Addons/AddonMgr.cpp @@ -18,8 +18,6 @@ */ #include "DatabaseEnv.h" - - #include "AddonMgr.h" #include "ObjectAccessor.h" #include "Player.h" @@ -27,8 +25,6 @@ #include "SHA1.h" #include "ProgressBar.h" -extern DatabaseType LoginDatabase; - AddonMgr::AddonMgr() { } diff --git a/src/server/game/Globals/ObjectMgr.cpp b/src/server/game/Globals/ObjectMgr.cpp index 0fed45aad13..a44c5197eec 100644 --- a/src/server/game/Globals/ObjectMgr.cpp +++ b/src/server/game/Globals/ObjectMgr.cpp @@ -7680,7 +7680,7 @@ void ObjectMgr::LoadGameObjectForQuests() sLog.outString(">> Loaded %u GameObjects for quests", count); } -bool ObjectMgr::LoadTrinityStrings(DatabaseType& db, char const* table, int32 min_value, int32 max_value) +bool ObjectMgr::LoadTrinityStrings(char const* table, int32 min_value, int32 max_value) { int32 start_value = min_value; int32 end_value = max_value; @@ -7716,7 +7716,7 @@ bool ObjectMgr::LoadTrinityStrings(DatabaseType& db, char const* table, int32 mi ++itr; } - QueryResult_AutoPtr result = db.PQuery("SELECT entry,content_default,content_loc1,content_loc2,content_loc3,content_loc4,content_loc5,content_loc6,content_loc7,content_loc8 FROM %s",table); + QueryResult_AutoPtr result = WorldDatabase.PQuery("SELECT entry,content_default,content_loc1,content_loc2,content_loc3,content_loc4,content_loc5,content_loc6,content_loc7,content_loc8 FROM %s",table); if (!result) { diff --git a/src/server/game/Globals/ObjectMgr.h b/src/server/game/Globals/ObjectMgr.h index 389b017b959..515289502ad 100644 --- a/src/server/game/Globals/ObjectMgr.h +++ b/src/server/game/Globals/ObjectMgr.h @@ -625,8 +625,8 @@ class ObjectMgr void LoadSpellScriptNames(); void ValidateSpellScripts(); - bool LoadTrinityStrings(DatabaseType& db, char const* table, int32 min_value, int32 max_value); - bool LoadTrinityStrings() { return LoadTrinityStrings(WorldDatabase,"trinity_string",MIN_TRINITY_STRING_ID,MAX_TRINITY_STRING_ID); } + bool LoadTrinityStrings(char const* table, int32 min_value, int32 max_value); + bool LoadTrinityStrings() { return LoadTrinityStrings("trinity_string",MIN_TRINITY_STRING_ID,MAX_TRINITY_STRING_ID); } void LoadDbScriptStrings(); void LoadCreatureClassLevelStats(); void LoadCreatureLocales(); @@ -1104,7 +1104,7 @@ class ObjectMgr #define sObjectMgr (*ACE_Singleton<ObjectMgr, ACE_Null_Mutex>::instance()) // scripting access functions - bool LoadTrinityStrings(DatabaseType& db, char const* table,int32 start_value = MAX_CREATURE_AI_TEXT_STRING_ID, int32 end_value = std::numeric_limits<int32>::min()); + bool LoadTrinityStrings(char const* table,int32 start_value = MAX_CREATURE_AI_TEXT_STRING_ID, int32 end_value = std::numeric_limits<int32>::min()); uint32 GetAreaTriggerScriptId(uint32 trigger_id); uint32 GetScriptId(const char *name); ObjectMgr::ScriptNameMap& GetScriptNames(); diff --git a/src/server/game/Instances/InstanceSaveMgr.cpp b/src/server/game/Instances/InstanceSaveMgr.cpp index 09149f1b281..d6a4af533f5 100644 --- a/src/server/game/Instances/InstanceSaveMgr.cpp +++ b/src/server/game/Instances/InstanceSaveMgr.cpp @@ -219,7 +219,7 @@ bool InstanceSave::UnloadIfEmpty() return true; } -void InstanceSaveManager::_DelHelper(DatabaseType &db, const char *fields, const char *table, const char *queryTail,...) +void InstanceSaveManager::_DelHelper(const char *fields, const char *table, const char *queryTail,...) { Tokens fieldTokens = StrSplit(fields, ", "); ASSERT(fieldTokens.size() != 0); @@ -230,7 +230,7 @@ void InstanceSaveManager::_DelHelper(DatabaseType &db, const char *fields, const vsnprintf(szQueryTail, MAX_QUERY_LEN, queryTail, ap); va_end(ap); - QueryResult_AutoPtr result = db.PQuery("SELECT %s FROM %s %s", fields, table, szQueryTail); + QueryResult_AutoPtr result = CharacterDatabase.PQuery("SELECT %s FROM %s %s", fields, table, szQueryTail); if (result) { do @@ -243,7 +243,7 @@ void InstanceSaveManager::_DelHelper(DatabaseType &db, const char *fields, const db.escape_string(fieldValue); ss << (i != 0 ? " AND " : "") << fieldTokens[i] << " = '" << fieldValue << "'"; } - db.PExecute("DELETE FROM %s WHERE %s", table, ss.str().c_str()); + CharacterDatabase.PExecute("DELETE FROM %s WHERE %s", table, ss.str().c_str()); } while (result->NextRow()); } } @@ -257,15 +257,15 @@ void InstanceSaveManager::CleanupInstances() sInstanceSaveMgr.LoadResetTimes(); // clean character/group - instance binds with invalid group/characters - _DelHelper(CharacterDatabase, "character_instance.guid, instance", "character_instance", "LEFT JOIN characters ON character_instance.guid = characters.guid WHERE characters.guid IS NULL"); - _DelHelper(CharacterDatabase, "group_instance.guid, instance", "group_instance", "LEFT JOIN groups ON group_instance.guid = groups.guid LEFT JOIN characters ON groups.leaderGuid = characters.guid WHERE characters.guid IS NULL OR groups.guid IS NULL"); + _DelHelper("character_instance.guid, instance", "character_instance", "LEFT JOIN characters ON character_instance.guid = characters.guid WHERE characters.guid IS NULL"); + _DelHelper("group_instance.guid, instance", "group_instance", "LEFT JOIN groups ON group_instance.guid = groups.guid LEFT JOIN characters ON groups.leaderGuid = characters.guid WHERE characters.guid IS NULL OR groups.guid IS NULL"); // clean instances that do not have any players or groups bound to them - _DelHelper(CharacterDatabase, "id, map, difficulty", "instance", "LEFT JOIN character_instance ON character_instance.instance = id LEFT JOIN group_instance ON group_instance.instance = id WHERE character_instance.instance IS NULL AND group_instance.instance IS NULL"); + _DelHelper("id, map, difficulty", "instance", "LEFT JOIN character_instance ON character_instance.instance = id LEFT JOIN group_instance ON group_instance.instance = id WHERE character_instance.instance IS NULL AND group_instance.instance IS NULL"); // clean invalid instance references in other tables - _DelHelper(CharacterDatabase, "character_instance.guid, instance", "character_instance", "LEFT JOIN instance ON character_instance.instance = instance.id WHERE instance.id IS NULL"); - _DelHelper(CharacterDatabase, "guid, instance", "group_instance", "LEFT JOIN instance ON group_instance.instance = instance.id WHERE instance.id IS NULL"); + _DelHelper("character_instance.guid, instance", "character_instance", "LEFT JOIN instance ON character_instance.instance = instance.id WHERE instance.id IS NULL"); + _DelHelper("guid, instance", "group_instance", "LEFT JOIN instance ON group_instance.instance = instance.id WHERE instance.id IS NULL"); // creature_respawn and gameobject_respawn are in another database // first, obtain total instance set @@ -477,7 +477,7 @@ void InstanceSaveManager::LoadResetTimes() // clean expired instances, references to them will be deleted in CleanupInstances // must be done before calculating new reset times - _DelHelper(CharacterDatabase, "id, map, instance.difficulty", "instance", "LEFT JOIN instance_reset ON mapid = map AND instance.difficulty = instance_reset.difficulty WHERE (instance.resettime < '"UI64FMTD"' AND instance.resettime > '0') OR (NOT instance_reset.resettime IS NULL AND instance_reset.resettime < '"UI64FMTD"')", (uint64)now, (uint64)now); + _DelHelper("id, map, instance.difficulty", "instance", "LEFT JOIN instance_reset ON mapid = map AND instance.difficulty = instance_reset.difficulty WHERE (instance.resettime < '"UI64FMTD"' AND instance.resettime > '0') OR (NOT instance_reset.resettime IS NULL AND instance_reset.resettime < '"UI64FMTD"')", (uint64)now, (uint64)now); // calculate new global reset times for expired instances and those that have never been reset yet // add the global reset times to the priority queue diff --git a/src/server/game/Instances/InstanceSaveMgr.h b/src/server/game/Instances/InstanceSaveMgr.h index 9e375a50673..6583ce31215 100644 --- a/src/server/game/Instances/InstanceSaveMgr.h +++ b/src/server/game/Instances/InstanceSaveMgr.h @@ -181,7 +181,7 @@ class InstanceSaveManager void _ResetOrWarnAll(uint32 mapid, Difficulty difficulty, bool warn, uint32 timeleft); void _ResetInstance(uint32 mapid, uint32 instanceId); void _ResetSave(InstanceSaveHashMap::iterator &itr); - void _DelHelper(DatabaseType &db, const char *fields, const char *table, const char *queryTail,...); + void _DelHelper(const char *fields, const char *table, const char *queryTail,...); // used during global instance resets bool lock_instLists; // fast lookup by instance id diff --git a/src/server/game/Scripting/ScriptSystem.cpp b/src/server/game/Scripting/ScriptSystem.cpp index d642faa3f3f..f2c83d199b4 100644 --- a/src/server/game/Scripting/ScriptSystem.cpp +++ b/src/server/game/Scripting/ScriptSystem.cpp @@ -56,7 +56,7 @@ void SystemMgr::LoadVersion() void SystemMgr::LoadScriptTexts() { sLog.outString("TSCR: Loading Script Texts..."); - LoadTrinityStrings(WorldDatabase,"script_texts",TEXT_SOURCE_RANGE,1+(TEXT_SOURCE_RANGE*2)); + LoadTrinityStrings("script_texts",TEXT_SOURCE_RANGE,1+(TEXT_SOURCE_RANGE*2)); QueryResult_AutoPtr Result = WorldDatabase.Query("SELECT entry, sound, type, language, emote FROM script_texts"); @@ -122,7 +122,7 @@ void SystemMgr::LoadScriptTexts() void SystemMgr::LoadScriptTextsCustom() { sLog.outString("TSCR: Loading Custom Texts..."); - LoadTrinityStrings(WorldDatabase,"custom_texts",TEXT_SOURCE_RANGE*2,1+(TEXT_SOURCE_RANGE*3)); + LoadTrinityStrings("custom_texts",TEXT_SOURCE_RANGE*2,1+(TEXT_SOURCE_RANGE*3)); QueryResult_AutoPtr Result = WorldDatabase.Query("SELECT entry, sound, type, language, emote FROM custom_texts"); diff --git a/src/server/shared/Database/DatabaseEnv.h b/src/server/shared/Database/DatabaseEnv.h index 53b7dee5521..d35948b7ec7 100644 --- a/src/server/shared/Database/DatabaseEnv.h +++ b/src/server/shared/Database/DatabaseEnv.h @@ -18,7 +18,7 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -#if !defined(DATABASEENV_H) +#ifndef DATABASEENV_H #define DATABASEENV_H #include "Common.h" @@ -28,22 +28,22 @@ #include "Field.h" #include "QueryResult.h" -#include "DatabaseWorkerPool.h" #include "MySQLThreading.h" #include "Transaction.h" -typedef DatabaseWorkerPool DatabaseType; - +#define MAX_QUERY_LEN 32*1024 #define _LIKE_ "LIKE" #define _TABLE_SIM_ "`" #define _CONCAT3_(A,B,C) "CONCAT( " A " , " B " , " C " )" #define _OFFSET_ "LIMIT %d,1" -extern DatabaseType WorldDatabase; -extern DatabaseType CharacterDatabase; -extern DatabaseType LoginDatabase; +#include "Implementation/LoginDatabase.h" +#include "Implementation/CharacterDatabase.h" +#include "Implementation/WorldDatabase.h" -#define MAX_QUERY_LEN 32*1024 +extern WorldDatabaseWorkerPool WorldDatabase; +extern CharacterDatabaseWorkerPool CharacterDatabase; +extern LoginDatabaseWorkerPool LoginDatabase; #endif diff --git a/src/server/shared/Database/DatabaseWorkerPool.cpp b/src/server/shared/Database/DatabaseWorkerPool.cpp index a704829545a..93861354a6b 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.cpp +++ b/src/server/shared/Database/DatabaseWorkerPool.cpp @@ -22,233 +22,62 @@ #include "MySQLConnection.h" #include "SQLOperation.h" -DatabaseWorkerPool::DatabaseWorkerPool() : -m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)), -m_connections(0) -{ - m_infoString = ""; - - mysql_library_init(-1, NULL, NULL); - WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); -} +DatabaseWorkerPool:: +DatabaseWorkerPool:: -DatabaseWorkerPool::~DatabaseWorkerPool() -{ - mysql_library_end(); -} -bool DatabaseWorkerPool::Open(const std::string& infoString, uint8 num_threads) -{ - sLog.outSQLDriver("Creating bundled/master MySQL connection."); - m_bundle_conn = new MySQLConnection(); - m_bundle_conn->Open(infoString); - ++m_connections; - - m_async_connections.resize(num_threads); - - /// Open the Async pool - for (uint8 i = 0; i < num_threads; i++) - { - m_async_connections[i] = new MySQLConnection(m_queue); - m_async_connections[i]->Open(infoString); - ++m_connections; - sLog.outSQLDriver("Async database thread pool opened. Worker thread count: %u", num_threads); - } - - m_infoString = infoString; - return true; -} void DatabaseWorkerPool::Close() -{ - sLog.outSQLDriver("Closing down %u connections on this DatabaseWorkerPool", (uint32)m_connections.value()); - /// Shuts down worker threads for this connection pool. - m_queue->queue()->deactivate(); - - for (uint8 i = 0; i < m_async_connections.size(); i++) - { - m_async_connections[i]->m_worker->wait(); - --m_connections; - } - - delete m_bundle_conn; - m_bundle_conn = NULL; - --m_connections; - sLog.outSQLDriver("Closed bundled connection."); - - //- MySQL::Thread_End() should be called manually from the aborting calling threads - sLog.outSQLDriver("Waiting for %u synchroneous database threads to exit.", (uint32)m_connections.value()); - while (!m_sync_connections.empty()) - { - } - sLog.outSQLDriver("Synchroneous database threads exited succesfuly."); -} + /*! This function creates a new MySQL connection for every MapUpdate thread and every unbundled task. */ void DatabaseWorkerPool::Init_MySQL_Connection() -{ - MySQLConnection* conn = new MySQLConnection(); - conn->Open(m_infoString); - - { - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - ConnectionMap::const_iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); - #ifdef _DEBUG - if (itr != m_sync_connections.end()) - sLog.outSQLDriver("Thread ["UI64FMTD"] already started a MySQL connection", (uint64)ACE_Based::Thread::currentId()); - #endif - m_sync_connections[ACE_Based::Thread::current()] = conn; - } - - sLog.outSQLDriver("Core thread with ID ["UI64FMTD"] initializing MySQL connection.", - (uint64)ACE_Based::Thread::currentId()); - - ++m_connections; -} + void DatabaseWorkerPool::End_MySQL_Connection() -{ - MySQLConnection* conn; - { - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - ConnectionMap::iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); - #ifdef _DEBUG - if (itr == m_sync_connections.end()) - sLog.outSQLDriver("Thread ["UI64FMTD"] already shut down their MySQL connection.", (uint64)ACE_Based::Thread::currentId()); - #endif - conn = itr->second; - m_sync_connections.erase(itr); - } - delete conn; - conn = NULL; - --m_connections; -} + void DatabaseWorkerPool::Execute(const char* sql) -{ - if (!sql) - return; - BasicStatementTask* task = new BasicStatementTask(sql); - Enqueue(task); -} void DatabaseWorkerPool::PExecute(const char* sql, ...) { - if (!sql) - return; - - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - Execute(szQuery); + } void DatabaseWorkerPool::DirectExecute(const char* sql) { - if (sql) - GetConnection()->Execute(sql); + } void DatabaseWorkerPool::DirectPExecute(const char* sql, ...) { - if (!sql) - return; - - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - - return DirectExecute(szQuery); + } QueryResult_AutoPtr DatabaseWorkerPool::Query(const char* sql) -{ - return GetConnection()->Query(sql); -} -QueryResult_AutoPtr DatabaseWorkerPool::PQuery(const char* sql, ...) -{ - if (!sql) - return QueryResult_AutoPtr(NULL); - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); +QueryResult_AutoPtr DatabaseWorkerPool::PQuery(const char* sql, ...) - return Query(szQuery); -} SQLTransaction DatabaseWorkerPool::BeginTransaction() { - return SQLTransaction(new Transaction); + } void DatabaseWorkerPool::CommitTransaction(SQLTransaction transaction) -{ - #ifdef _DEBUG - if (transaction->GetSize() == 0) - { - sLog.outSQLDriver("Transaction contains 0 queries. Not executing."); - return; - } - if (transaction->GetSize() == 1) - { - sLog.outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); - } - #endif - Enqueue(new TransactionTask(transaction)); -} + ACE_Future<QueryResult_AutoPtr> DatabaseWorkerPool::AsyncQuery(const char* sql) -{ - QueryResultFuture res; - BasicStatementTask* task = new BasicStatementTask(sql, res); - Enqueue(task); - return res; //! Fool compiler, has no use yet -} + ACE_Future<QueryResult_AutoPtr> DatabaseWorkerPool::AsyncPQuery(const char* sql, ...) -{ - va_list ap; - char szQuery[MAX_QUERY_LEN]; - va_start(ap, sql); - vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); - va_end(ap); - return AsyncQuery(szQuery); -} QueryResultHolderFuture DatabaseWorkerPool::DelayQueryHolder(SQLQueryHolder* holder) -{ - QueryResultHolderFuture res; - SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res); - Enqueue(task); - return res; //! Fool compiler, has no use yet -} MySQLConnection* DatabaseWorkerPool::GetConnection() -{ - MySQLConnection* conn; - ConnectionMap::const_iterator itr; - { - /*! MapUpdate + unbundled threads */ - ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); - itr = m_sync_connections.find(ACE_Based::Thread::current()); - if (itr != m_sync_connections.end()) - conn = itr->second; - } - /*! Bundled threads */ - conn = m_bundle_conn; - ASSERT (conn); - return conn; -} + diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index 8736609473a..c4875ce4a9e 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -24,11 +24,12 @@ #include <ace/Atomic_Op_T.h> #include <ace/Thread_Mutex.h> +#include "Log.h" +#include "DatabaseEnv.h" #include "SQLOperation.h" -#include "QueryResult.h" #include "Callback.h" #include "MySQLConnection.h" -#include "Transaction.h" +#include "DatabaseWorker.h" enum MySQLThreadBundle { @@ -40,30 +41,220 @@ enum MySQLThreadBundle MYSQL_BUNDLE_ALL = MYSQL_BUNDLE_CLI | MYSQL_BUNDLE_RA | MYSQL_BUNDLE_RAR | MYSQL_BUNDLE_WORLD, }; +template <class T> class DatabaseWorkerPool { public: - DatabaseWorkerPool(); - ~DatabaseWorkerPool(); - - bool Open(const std::string& infoString, uint8 num_threads); - void Close(); - - void Init_MySQL_Connection(); - void End_MySQL_Connection(); - - void Execute(const char* sql); - void PExecute(const char* sql, ...); - void DirectExecute(const char* sql); - void DirectPExecute(const char* sql, ...); - QueryResult_AutoPtr Query(const char* sql); - QueryResult_AutoPtr PQuery(const char* sql, ...); - ACE_Future<QueryResult_AutoPtr> AsyncQuery(const char* sql); - ACE_Future<QueryResult_AutoPtr> AsyncPQuery(const char* sql, ...); - QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder); + DatabaseWorkerPool() : + m_queue(new ACE_Activation_Queue(new ACE_Message_Queue<ACE_MT_SYNCH>)), + m_connections(0) + { + m_infoString = ""; + + mysql_library_init(-1, NULL, NULL); + WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); + } + + ~DatabaseWorkerPool() + { + mysql_library_end(); + } + + bool Open(const std::string& infoString, uint8 num_threads) + { + sLog.outSQLDriver("Creating bundled/master MySQL connection."); + m_bundle_conn = new T(); + m_bundle_conn->Open(infoString); + ++m_connections; + + m_async_connections.resize(num_threads); + + /// Open the Async pool + for (uint8 i = 0; i < num_threads; i++) + { + m_async_connections[i] = new T(m_queue); + m_async_connections[i]->Open(infoString); + ++m_connections; + sLog.outSQLDriver("Async database thread pool opened. Worker thread count: %u", num_threads); + } + + m_infoString = infoString; + return true; + } + + void Close() + { + sLog.outSQLDriver("Closing down %u connections on this DatabaseWorkerPool", (uint32)m_connections.value()); + /// Shuts down worker threads for this connection pool. + m_queue->queue()->deactivate(); + + for (uint8 i = 0; i < m_async_connections.size(); i++) + { + m_async_connections[i]->m_worker->wait(); + --m_connections; + } + + delete m_bundle_conn; + m_bundle_conn = NULL; + --m_connections; + sLog.outSQLDriver("Closed bundled connection."); + + //- MySQL::Thread_End() should be called manually from the aborting calling threads + sLog.outSQLDriver("Waiting for %u synchroneous database threads to exit.", (uint32)m_connections.value()); + while (!m_sync_connections.empty()) + { + } + sLog.outSQLDriver("Synchroneous database threads exited succesfuly."); + } + + void Init_MySQL_Connection() + { + T* conn = new T(); + conn->Open(m_infoString); + + { + ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); + ConnectionMap::const_iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); + #ifdef _DEBUG + if (itr != m_sync_connections.end()) + sLog.outSQLDriver("Thread ["UI64FMTD"] already started a MySQL connection", (uint64)ACE_Based::Thread::currentId()); + #endif + m_sync_connections[ACE_Based::Thread::current()] = conn; + } + + sLog.outSQLDriver("Core thread with ID ["UI64FMTD"] initializing MySQL connection.", + (uint64)ACE_Based::Thread::currentId()); + + ++m_connections; + } + + void End_MySQL_Connection() + { + T* conn; + { + ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); + ConnectionMap::iterator itr = m_sync_connections.find(ACE_Based::Thread::current()); + #ifdef _DEBUG + if (itr == m_sync_connections.end()) + sLog.outSQLDriver("Thread ["UI64FMTD"] already shut down their MySQL connection.", (uint64)ACE_Based::Thread::currentId()); + #endif + conn = itr->second; + m_sync_connections.erase(itr); + } + delete conn; + conn = NULL; + --m_connections; + } + + void Execute(const char* sql) + { + if (!sql) + return; + + BasicStatementTask* task = new BasicStatementTask(sql); + Enqueue(task); + } + + void PExecute(const char* sql, ...) + { + if (!sql) + return; + + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + Execute(szQuery); + } + + void DirectExecute(const char* sql) + { + if (sql) + GetConnection()->Execute(sql); + } + + void DirectPExecute(const char* sql, ...) + { + if (!sql) + return; + + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + return DirectExecute(szQuery); + } + + QueryResult_AutoPtr Query(const char* sql) + { + return GetConnection()->Query(sql); + } + + QueryResult_AutoPtr PQuery(const char* sql, ...) + { + if (!sql) + return QueryResult_AutoPtr(NULL); + + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + return Query(szQuery); + } + + ACE_Future<QueryResult_AutoPtr> AsyncQuery(const char* sql) + { + QueryResultFuture res; + BasicStatementTask* task = new BasicStatementTask(sql, res); + Enqueue(task); + return res; //! Actual return value has no use yet + } + + ACE_Future<QueryResult_AutoPtr> AsyncPQuery(const char* sql, ...) + { + va_list ap; + char szQuery[MAX_QUERY_LEN]; + va_start(ap, sql); + vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap); + va_end(ap); + + return AsyncQuery(szQuery); + } + + QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) + { + QueryResultHolderFuture res; + SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res); + Enqueue(task); + return res; //! Fool compiler, has no use yet + } - SQLTransaction BeginTransaction(); - void CommitTransaction(SQLTransaction transaction); + SQLTransaction BeginTransaction() + { + return SQLTransaction(new Transaction); + } + + void CommitTransaction(SQLTransaction transaction) + { + #ifdef TRINITY_DEBUG + if (transaction->GetSize() == 0) + { + sLog.outSQLDriver("Transaction contains 0 queries. Not executing."); + return; + } + if (transaction->GetSize() == 1) + { + sLog.outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); + } + #endif + Enqueue(new TransactionTask(transaction)); + } void escape_string(std::string& str) { @@ -89,20 +280,34 @@ class DatabaseWorkerPool m_queue->enqueue(op); } - MySQLConnection* GetConnection(); + T* GetConnection() + { + T* conn; + ConnectionMap::const_iterator itr; + { + /*! MapUpdate + unbundled threads */ + ACE_Guard<ACE_Thread_Mutex> guard(m_connectionMap_mtx); + itr = m_sync_connections.find(ACE_Based::Thread::current()); + if (itr != m_sync_connections.end()) + conn = itr->second; + } + /*! Bundled threads */ + conn = m_bundle_conn; + ASSERT (conn); + return conn; + } private: - typedef UNORDERED_MAP<ACE_Based::Thread*, MySQLConnection*> ConnectionMap; - typedef UNORDERED_MAP<ACE_Based::Thread*, TransactionTask*> TransactionQueues; + typedef UNORDERED_MAP<ACE_Based::Thread*, T*> ConnectionMap; typedef ACE_Atomic_Op<ACE_SYNCH_MUTEX, uint32> AtomicUInt; private: ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads. ACE_Thread_Mutex m_queue_mtx; //! For thread safe enqueues of delayed statements. - std::vector<MySQLConnection*> m_async_connections; + std::vector<T*> m_async_connections; ConnectionMap m_sync_connections; //! Holds a mysql connection+thread per mapUpdate thread and unbundled runnnables. ACE_Thread_Mutex m_connectionMap_mtx; //! For thread safe access to the synchroneous connection map - MySQLConnection* m_bundle_conn; //! Bundled connection (see Database.ThreadBundleMask config) + T* m_bundle_conn; //! Bundled connection (see Database.ThreadBundleMask config) AtomicUInt m_connections; //! Counter of MySQL connections; std::string m_infoString; //! Infostring that is passed on to child connections. }; diff --git a/src/server/shared/Database/MySQLConnection.h b/src/server/shared/Database/MySQLConnection.h index 08ceaa2860c..5e68e9d7516 100644 --- a/src/server/shared/Database/MySQLConnection.h +++ b/src/server/shared/Database/MySQLConnection.h @@ -23,14 +23,14 @@ class DatabaseWorker; class MySQLConnection { - friend class DatabaseWorkerPool; + template <class T> friend class DatabaseWorkerPool; public: MySQLConnection(); //! Constructor for synchroneous connections. MySQLConnection(ACE_Activation_Queue* queue); //! Constructor for asynchroneous connections. ~MySQLConnection(); - bool Open(const std::string& infoString); //! Connection details. + virtual bool Open(const std::string& infoString); //! Connection details. public: bool Execute(const char* sql); diff --git a/src/server/shared/Database/SQLStorage.cpp b/src/server/shared/Database/SQLStorage.cpp index 26aecb80fa3..f71b90b300e 100644 --- a/src/server/shared/Database/SQLStorage.cpp +++ b/src/server/shared/Database/SQLStorage.cpp @@ -21,8 +21,6 @@ #include "SQLStorage.h" #include "SQLStorageImpl.h" -extern DatabaseType WorldDatabase; - const char CreatureInfosrcfmt[]="iiiiiiiiiisssiiiiiiifffiffiifiiiiiiiiiiffiiiiiiiiiiiiiiiiiiiiiiiisiifffliiiiiiiliiisi"; const char CreatureInfodstfmt[]="iiiiiiiiiisssibbiiiifffiffiifiiiiiiiiiiffiiiiiiiiiiiiiiiiiiiiiiiisiifffliiiiiiiliiiii"; const char CreatureDataAddonInfofmt[]="iiiiiis"; diff --git a/src/server/shared/Database/Transaction.h b/src/server/shared/Database/Transaction.h index e94ca053e32..58c87b61270 100644 --- a/src/server/shared/Database/Transaction.h +++ b/src/server/shared/Database/Transaction.h @@ -19,6 +19,8 @@ #ifndef _TRANSACTION_H #define _TRANSACTION_H +#include "SQLOperation.h" + /*! Transactions, high level class. */ class Transaction { @@ -43,7 +45,7 @@ typedef ACE_Refcounted_Auto_Ptr<Transaction, ACE_Null_Mutex> SQLTransaction; /*! Low level class*/ class TransactionTask : public SQLOperation { - friend class DatabaseWorkerPool; + template <class T> friend class DatabaseWorkerPool; friend class DatabaseWorker; public: diff --git a/src/server/worldserver/Main.cpp b/src/server/worldserver/Main.cpp index fd6ecdda8fa..16ca0d9f98e 100644 --- a/src/server/worldserver/Main.cpp +++ b/src/server/worldserver/Main.cpp @@ -51,9 +51,9 @@ char serviceDescription[] = "Massive Network Game Object Server"; int m_ServiceStatus = -1; #endif -DatabaseType WorldDatabase; ///< Accessor to the world database -DatabaseType CharacterDatabase; ///< Accessor to the character database -DatabaseType LoginDatabase; ///< Accessor to the realm/login database +WorldDatabaseWorkerPool WorldDatabase; ///< Accessor to the world database +CharacterDatabaseWorkerPool CharacterDatabase; ///< Accessor to the character database +LoginDatabaseWorkerPool LoginDatabase; ///< Accessor to the realm/login database uint32 realmID; ///< Id of the realm |