Core/DB Layer: - Fix "Thread #1 unlocked a not-locked lock at 0x6D56E90" helgrind error (thanks to Aokromes for logs). Cause was unlocking MySQL connection on shutdown, but concurrent access at this point is not required.

- Remove redundant locking in PingOperation. Since these are delayed to async threads with their own MySQL connection, no concurrent access here either.
- Codestyle cleanup
- Documentation refining
This commit is contained in:
Machiavelli
2012-03-27 11:49:16 +02:00
parent 13e071195d
commit 88d81d27cf
2 changed files with 92 additions and 82 deletions

View File

@@ -33,17 +33,11 @@
class PingOperation : public SQLOperation
{
/// Operation for idle delaythreads
//! Operation for idle delaythreads
bool Execute()
{
if (m_conn->LockIfReady())
{
m_conn->Ping();
m_conn->Unlock();
return true;
}
return false;
m_conn->Ping();
return true;
}
};
@@ -53,10 +47,10 @@ class DatabaseWorkerPool
public:
/* Activity state */
DatabaseWorkerPool() :
m_queue(new ACE_Activation_Queue())
_queue(new ACE_Activation_Queue())
{
memset(m_connectionCount, 0, sizeof(m_connectionCount));
m_connections.resize(IDX_SIZE);
memset(_connectionCount, 0, sizeof(_connectionCount));
_connections.resize(IDX_SIZE);
WPFatal (mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
}
@@ -68,66 +62,72 @@ class DatabaseWorkerPool
bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
{
bool res = true;
m_connectionInfo = MySQLConnectionInfo(infoString);
_connectionInfo = MySQLConnectionInfo(infoString);
sLog->outSQLDriver("Opening databasepool '%s'. Async threads: %u, synch threads: %u", m_connectionInfo.database.c_str(), async_threads, synch_threads);
sLog->outSQLDriver("Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
GetDatabaseName(), async_threads, synch_threads);
/// Open asynchronous connections (delayed operations)
m_connections[IDX_ASYNC].resize(async_threads);
//! Open asynchronous connections (delayed operations)
_connections[IDX_ASYNC].resize(async_threads);
for (uint8 i = 0; i < async_threads; ++i)
{
T* t = new T(m_queue, m_connectionInfo);
T* t = new T(_queue, _connectionInfo);
res &= t->Open();
m_connections[IDX_ASYNC][i] = t;
++m_connectionCount[IDX_ASYNC];
_connections[IDX_ASYNC][i] = t;
++_connectionCount[IDX_ASYNC];
}
/// Open synchronous connections (direct, blocking operations)
m_connections[IDX_SYNCH].resize(synch_threads);
//! Open synchronous connections (direct, blocking operations)
_connections[IDX_SYNCH].resize(synch_threads);
for (uint8 i = 0; i < synch_threads; ++i)
{
T* t = new T(m_connectionInfo);
T* t = new T(_connectionInfo);
res &= t->Open();
m_connections[IDX_SYNCH][i] = t;
++m_connectionCount[IDX_SYNCH];
_connections[IDX_SYNCH][i] = t;
++_connectionCount[IDX_SYNCH];
}
sLog->outSQLDriver("Databasepool opened successfully. %u total connections running.", (m_connectionCount[IDX_SYNCH] + m_connectionCount[IDX_ASYNC]));
if (res)
sLog->outSQLDriver("DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(),
(_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
else
sLog->outError("DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile "
"for specific errors.", GetDatabaseName());
return res;
}
void Close()
{
sLog->outSQLDriver("Closing down databasepool '%s'.", m_connectionInfo.database.c_str());
sLog->outSQLDriver("Closing down DatabasePool '%s'.", GetDatabaseName());
/// Shuts down delaythreads for this connection pool by underlying deactivate()
m_queue->queue()->close();
//! Shuts down delaythreads for this connection pool by underlying deactivate().
//! The next dequeue attempt in the worker thread tasks will result in an error,
//! ultimately ending the worker thread task.
_queue->queue()->close();
for (uint8 i = 0; i < m_connectionCount[IDX_ASYNC]; ++i)
for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i)
{
/// TODO: Better way. probably should flip a boolean and check it on low level code before doing anything on the mysql ctx
/// Now we just wait until m_queue gives the signal to the worker threads to stop
T* t = m_connections[IDX_ASYNC][i];
T* t = _connections[IDX_ASYNC][i];
DatabaseWorker* worker = t->m_worker;
worker->wait();
worker->wait(); //! Block until no more threads are running this task.
delete worker;
t->Close();
t->Close(); //! Closes the actualy MySQL connection.
}
sLog->outSQLDriver("Asynchronous connections on databasepool '%s' terminated. Proceeding with synchronous connections.", m_connectionInfo.database.c_str());
sLog->outSQLDriver("Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.",
GetDatabaseName());
/// Shut down the synchronous connections
for (uint8 i = 0; i < m_connectionCount[IDX_SYNCH]; ++i)
{
T* t = m_connections[IDX_SYNCH][i];
//while (1)
// if (t->LockIfReady()) -- For some reason deadlocks us
t->Close();
}
//! Shut down the synchronous connections
//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
//! should only be called after any other thread tasks in the core have exited,
//! meaning there can be no concurrent access at this point.
for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
_connections[IDX_SYNCH][i]->Close();
delete m_queue;
//! Deletes the ACE_Activation_Queue object and its underlying ACE_Message_Queue
delete _queue;
sLog->outSQLDriver("All connections on databasepool %s closed.", m_connectionInfo.database.c_str());
sLog->outSQLDriver("All connections on DatabasePool '%s' closed.", GetDatabaseName());
}
/**
@@ -344,20 +344,22 @@ class DatabaseWorkerPool
//! were appended to the transaction will be respected during execution.
void CommitTransaction(SQLTransaction transaction)
{
if (sLog->GetSQLDriverQueryLogging())
#ifdef TRINITY_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
switch (transaction->GetSize())
{
case 0:
sLog->outSQLDriver("Transaction contains 0 queries. Not executing.");
return;
case 1:
sLog->outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
case 0:
sLog->outSQLDriver("Transaction contains 0 queries. Not executing.");
return;
case 1:
sLog->outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
Enqueue(new TransactionTask(transaction));
}
@@ -373,9 +375,11 @@ class DatabaseWorkerPool
return;
}
//! Handle MySQL Errno 1213 without extending deadlock to the core itself
//! TODO: More elegant way
if (con->GetLastError() == 1213)
{
uint8 loopBreaker = 5; // Handle MySQL Errno 1213 without extending deadlock to the core itself
uint8 loopBreaker = 5;
for (uint8 i = 0; i < loopBreaker; ++i)
{
if (con->ExecuteTransaction(transaction))
@@ -383,7 +387,7 @@ class DatabaseWorkerPool
}
}
// Clean up now.
//! Clean up now.
transaction->Cleanup();
con->Unlock();
@@ -414,6 +418,7 @@ class DatabaseWorkerPool
*/
//! Automanaged (internally) pointer to a prepared statement object for usage in upper level code.
//! Pointer is deleted in this->Query(PreparedStatement*) or PreparedStatementTask::~PreparedStatementTask.
//! This object is not tied to the prepared statement on the MySQL context yet until execution.
PreparedStatement* GetPreparedStatement(uint32 index)
{
@@ -435,10 +440,10 @@ class DatabaseWorkerPool
//! Keeps all our MySQL connections alive, prevent the server from disconnecting us.
void KeepAlive()
{
/// Ping synchronous connections
for (uint8 i = 0; i < m_connectionCount[IDX_SYNCH]; ++i)
//! Ping synchronous connections
for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
{
T* t = m_connections[IDX_SYNCH][i];
T* t = _connections[IDX_SYNCH][i];
if (t->LockIfReady())
{
t->Ping();
@@ -446,10 +451,10 @@ class DatabaseWorkerPool
}
}
/// Assuming all worker threads are free, every worker thread will receive 1 ping operation request
/// If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
/// as the sole purpose is to prevent connections from idling.
for (size_t i = 0; i < m_connections[IDX_ASYNC].size(); ++i)
//! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
//! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
//! as the sole purpose is to prevent connections from idling.
for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i)
Enqueue(new PingOperation);
}
@@ -459,41 +464,50 @@ class DatabaseWorkerPool
if (!to || !from || !length)
return 0;
return mysql_real_escape_string(m_connections[IDX_SYNCH][0]->GetHandle(), to, from, length);
return mysql_real_escape_string(_connections[IDX_SYNCH][0]->GetHandle(), to, from, length);
}
void Enqueue(SQLOperation* op)
{
m_queue->enqueue(op);
_queue->enqueue(op);
}
//! Gets a free connection in the synchronous connection pool.
//! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
T* GetFreeConnection()
{
uint8 i = 0;
size_t num_cons = m_connectionCount[IDX_SYNCH];
for (;;) /// Block forever until a connection is free
size_t num_cons = _connectionCount[IDX_SYNCH];
//! Block forever until a connection is free
for (;;)
{
T* t = m_connections[IDX_SYNCH][++i % num_cons ];
if (t->LockIfReady()) /// Must be matched with t->Unlock() or you will get deadlocks
T* t = _connections[IDX_SYNCH][++i % num_cons];
//! Must be matched with t->Unlock() or you will get deadlocks
if (t->LockIfReady())
return t;
}
// This will be called when Celine Dion learns to sing
//! This will be called when Celine Dion learns to sing
return NULL;
}
char const* GetDatabaseName() const
{
return _connectionInfo.database.c_str();
}
private:
enum
enum _internalIndex
{
IDX_ASYNC,
IDX_SYNCH,
IDX_SIZE,
};
ACE_Activation_Queue* m_queue; //! Queue shared by async worker threads.
std::vector< std::vector<T*> > m_connections;
uint32 m_connectionCount[2]; //! Counter of MySQL connections;
MySQLConnectionInfo m_connectionInfo;
ACE_Activation_Queue* _queue; //! Queue shared by async worker threads.
std::vector< std::vector<T*> > _connections;
uint32 _connectionCount[2]; //! Counter of MySQL connections;
MySQLConnectionInfo _connectionInfo;
};
#endif

View File

@@ -58,17 +58,13 @@ MySQLConnection::~MySQLConnection()
{
ASSERT (m_Mysql); /// MySQL context must be present at this point
sLog->outSQLDriver("MySQLConnection::~MySQLConnection()");
for (size_t i = 0; i < m_stmts.size(); ++i)
delete m_stmts[i];
for (PreparedStatementMap::const_iterator itr = m_queries.begin(); itr != m_queries.end(); ++itr)
{
free((void *)m_queries[itr->first].first);
}
mysql_close(m_Mysql);
Unlock(); /// Unlock while we die, how ironic
}
void MySQLConnection::Close()