diff options
Diffstat (limited to 'src')
-rwxr-xr-x | src/server/shared/Database/DatabaseWorkerPool.h | 170 | ||||
-rwxr-xr-x | src/server/shared/Database/MySQLConnection.cpp | 4 |
2 files changed, 92 insertions, 82 deletions
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index aef952fab55..5fe31006bb2 100755 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -33,17 +33,11 @@ class PingOperation : public SQLOperation { - /// Operation for idle delaythreads + //! Operation for idle delaythreads bool Execute() { - if (m_conn->LockIfReady()) - { - m_conn->Ping(); - m_conn->Unlock(); - return true; - } - - return false; + m_conn->Ping(); + return true; } }; @@ -53,10 +47,10 @@ class DatabaseWorkerPool public: /* Activity state */ DatabaseWorkerPool() : - m_queue(new ACE_Activation_Queue()) + _queue(new ACE_Activation_Queue()) { - memset(m_connectionCount, 0, sizeof(m_connectionCount)); - m_connections.resize(IDX_SIZE); + memset(_connectionCount, 0, sizeof(_connectionCount)); + _connections.resize(IDX_SIZE); WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe."); } @@ -68,66 +62,72 @@ class DatabaseWorkerPool bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads) { bool res = true; - m_connectionInfo = MySQLConnectionInfo(infoString); + _connectionInfo = MySQLConnectionInfo(infoString); - sLog->outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", m_connectionInfo.database.c_str(), async_threads, synch_threads); + sLog->outSQLDriver("Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.", + GetDatabaseName(), async_threads, synch_threads); - /// Open asynchronous connections (delayed operations) - m_connections[IDX_ASYNC].resize(async_threads); + //! Open asynchronous connections (delayed operations) + _connections[IDX_ASYNC].resize(async_threads); for (uint8 i = 0; i < async_threads; ++i) { - T* t = new T(m_queue, m_connectionInfo); + T* t = new T(_queue, _connectionInfo); res &= t->Open(); - m_connections[IDX_ASYNC][i] = t; - ++m_connectionCount[IDX_ASYNC]; + _connections[IDX_ASYNC][i] = t; + ++_connectionCount[IDX_ASYNC]; } - /// Open synchronous connections (direct, blocking operations) - m_connections[IDX_SYNCH].resize(synch_threads); + //! Open synchronous connections (direct, blocking operations) + _connections[IDX_SYNCH].resize(synch_threads); for (uint8 i = 0; i < synch_threads; ++i) { - T* t = new T(m_connectionInfo); + T* t = new T(_connectionInfo); res &= t->Open(); - m_connections[IDX_SYNCH][i] = t; - ++m_connectionCount[IDX_SYNCH]; + _connections[IDX_SYNCH][i] = t; + ++_connectionCount[IDX_SYNCH]; } - sLog->outSQLDriver("Databasepool opened successfully. %u total connections running.", (m_connectionCount[IDX_SYNCH] + m_connectionCount[IDX_ASYNC])); + if (res) + sLog->outSQLDriver("DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(), + (_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC])); + else + sLog->outError("DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile " + "for specific errors.", GetDatabaseName()); return res; } void Close() { - sLog->outSQLDriver("Closing down databasepool '%s'.", m_connectionInfo.database.c_str()); + sLog->outSQLDriver("Closing down DatabasePool '%s'.", GetDatabaseName()); - /// Shuts down delaythreads for this connection pool by underlying deactivate() - m_queue->queue()->close(); + //! Shuts down delaythreads for this connection pool by underlying deactivate(). + //! The next dequeue attempt in the worker thread tasks will result in an error, + //! ultimately ending the worker thread task. + _queue->queue()->close(); - for (uint8 i = 0; i < m_connectionCount[IDX_ASYNC]; ++i) + for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i) { - /// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx - /// Now we just wait until m_queue gives the signal to the worker threads to stop - T* t = m_connections[IDX_ASYNC][i]; + T* t = _connections[IDX_ASYNC][i]; DatabaseWorker* worker = t->m_worker; - worker->wait(); + worker->wait(); //! Block until no more threads are running this task. delete worker; - t->Close(); + t->Close(); //! Closes the actualy MySQL connection. } - sLog->outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", m_connectionInfo.database.c_str()); + sLog->outSQLDriver("Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.", + GetDatabaseName()); - /// Shut down the synchronous connections - for (uint8 i = 0; i < m_connectionCount[IDX_SYNCH]; ++i) - { - T* t = m_connections[IDX_SYNCH][i]; - //while (1) - // if (t->LockIfReady()) -- For some reason deadlocks us - t->Close(); - } + //! Shut down the synchronous connections + //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close + //! should only be called after any other thread tasks in the core have exited, + //! meaning there can be no concurrent access at this point. + for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) + _connections[IDX_SYNCH][i]->Close(); - delete m_queue; + //! Deletes the ACE_Activation_Queue object and its underlying ACE_Message_Queue + delete _queue; - sLog->outSQLDriver("All connections on databasepool %s closed.", m_connectionInfo.database.c_str()); + sLog->outSQLDriver("All connections on DatabasePool '%s' closed.", GetDatabaseName()); } /** @@ -344,20 +344,22 @@ class DatabaseWorkerPool //! were appended to the transaction will be respected during execution. void CommitTransaction(SQLTransaction transaction) { - if (sLog->GetSQLDriverQueryLogging()) + #ifdef TRINITY_DEBUG + //! Only analyze transaction weaknesses in Debug mode. + //! Ideally we catch the faults in Debug mode and then correct them, + //! so there's no need to waste these CPU cycles in Release mode. + switch (transaction->GetSize()) { - switch (transaction->GetSize()) - { - case 0: - sLog->outSQLDriver("Transaction contains 0 queries. Not executing."); - return; - case 1: - sLog->outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); - break; - default: - break; - } + case 0: + sLog->outSQLDriver("Transaction contains 0 queries. Not executing."); + return; + case 1: + sLog->outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code."); + break; + default: + break; } + #endif // TRINITY_DEBUG Enqueue(new TransactionTask(transaction)); } @@ -373,9 +375,11 @@ class DatabaseWorkerPool return; } + //! Handle MySQL Errno 1213 without extending deadlock to the core itself + //! TODO: More elegant way if (con->GetLastError() == 1213) { - uint8 loopBreaker = 5; // Handle MySQL Errno 1213 without extending deadlock to the core itself + uint8 loopBreaker = 5; for (uint8 i = 0; i < loopBreaker; ++i) { if (con->ExecuteTransaction(transaction)) @@ -383,7 +387,7 @@ class DatabaseWorkerPool } } - // Clean up now. + //! Clean up now. transaction->Cleanup(); con->Unlock(); @@ -414,6 +418,7 @@ class DatabaseWorkerPool */ //! Automanaged (internally) pointer to a prepared statement object for usage in upper level code. + //! Pointer is deleted in this->Query(PreparedStatement*) or PreparedStatementTask::~PreparedStatementTask. //! This object is not tied to the prepared statement on the MySQL context yet until execution. PreparedStatement* GetPreparedStatement(uint32 index) { @@ -435,10 +440,10 @@ class DatabaseWorkerPool //! Keeps all our MySQL connections alive, prevent the server from disconnecting us. void KeepAlive() { - /// Ping synchronous connections - for (uint8 i = 0; i < m_connectionCount[IDX_SYNCH]; ++i) + //! Ping synchronous connections + for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i) { - T* t = m_connections[IDX_SYNCH][i]; + T* t = _connections[IDX_SYNCH][i]; if (t->LockIfReady()) { t->Ping(); @@ -446,10 +451,10 @@ class DatabaseWorkerPool } } - /// Assuming all worker threads are free, every worker thread will receive 1 ping operation request - /// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter - /// as the sole purpose is to prevent connections from idling. - for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i) + //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request + //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter + //! as the sole purpose is to prevent connections from idling. + for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i) Enqueue(new PingOperation); } @@ -459,41 +464,50 @@ class DatabaseWorkerPool if (!to || !from || !length) return 0; - return mysql_real_escape_string(m_connections[IDX_SYNCH][0]->GetHandle(), to, from, length); + return mysql_real_escape_string(_connections[IDX_SYNCH][0]->GetHandle(), to, from, length); } void Enqueue(SQLOperation* op) { - m_queue->enqueue(op); + _queue->enqueue(op); } + //! Gets a free connection in the synchronous connection pool. + //! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks. T* GetFreeConnection() { uint8 i = 0; - size_t num_cons = m_connectionCount[IDX_SYNCH]; - for (;;) /// Block forever until a connection is free + size_t num_cons = _connectionCount[IDX_SYNCH]; + //! Block forever until a connection is free + for (;;) { - T* t = m_connections[IDX_SYNCH][++i % num_cons ]; - if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks + T* t = _connections[IDX_SYNCH][++i % num_cons]; + //! Must be matched with t->Unlock() or you will get deadlocks + if (t->LockIfReady()) return t; } - // This will be called when Celine Dion learns to sing + //! This will be called when Celine Dion learns to sing return NULL; } + char const* GetDatabaseName() const + { + return _connectionInfo.database.c_str(); + } + private: - enum + enum _internalIndex { IDX_ASYNC, IDX_SYNCH, IDX_SIZE, }; - ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads. - std::vector< std::vector<T*> > m_connections; - uint32 m_connectionCount[2]; //! Counter of MySQL connections; - MySQLConnectionInfo m_connectionInfo; + ACE_Activation_Queue* _queue; //! Queue shared by async worker threads. + std::vector< std::vector<T*> > _connections; + uint32 _connectionCount[2]; //! Counter of MySQL connections; + MySQLConnectionInfo _connectionInfo; }; #endif diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp index f686db4c199..7fb4a4f7025 100755 --- a/src/server/shared/Database/MySQLConnection.cpp +++ b/src/server/shared/Database/MySQLConnection.cpp @@ -58,17 +58,13 @@ MySQLConnection::~MySQLConnection() { ASSERT (m_Mysql); /// MySQL context must be present at this point - sLog->outSQLDriver("MySQLConnection::~MySQLConnection()"); for (size_t i = 0; i < m_stmts.size(); ++i) delete m_stmts[i]; for (PreparedStatementMap::const_iterator itr = m_queries.begin(); itr != m_queries.end(); ++itr) - { free((void *)m_queries[itr->first].first); - } mysql_close(m_Mysql); - Unlock(); /// Unlock while we die, how ironic } void MySQLConnection::Close() |