/*
* Copyright (C)
*
* Copyright (C) 2008-2016 TrinityCore
* Copyright (C) 2005-2009 MaNGOS
*/
#ifndef _DATABASEWORKERPOOL_H
#define _DATABASEWORKERPOOL_H
#include
#include "Common.h"
#include "Callback.h"
#include "MySQLConnection.h"
#include "Transaction.h"
#include "DatabaseWorker.h"
#include "PreparedStatement.h"
#include "Log.h"
#include "QueryResult.h"
#include "QueryHolder.h"
#include "AdhocStatement.h"
#define MIN_MYSQL_SERVER_VERSION 50100u
#define MIN_MYSQL_CLIENT_VERSION 50100u
class PingOperation : public SQLOperation
{
//! Operation for idle delaythreads
bool Execute()
{
m_conn->Ping();
return true;
}
};
template
class DatabaseWorkerPool
{
public:
/* Activity state */
DatabaseWorkerPool() :
_mqueue(new ACE_Message_Queue(2*1024*1024, 2*1024*1024)),
_queue(new ACE_Activation_Queue(_mqueue))
{
memset(_connectionCount, 0, sizeof(_connectionCount));
_connections.resize(IDX_SIZE);
WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "AzerothCore does not support MySQL versions below 5.1");
}
~DatabaseWorkerPool()
{
}
bool Open(const std::string& infoString, uint8 async_threads, uint8 synch_threads)
{
bool res = true;
_connectionInfo = MySQLConnectionInfo(infoString);
sLog->outSQLDriver("Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
GetDatabaseName(), async_threads, synch_threads);
//! Open asynchronous connections (delayed operations)
_connections[IDX_ASYNC].resize(async_threads);
for (uint8 i = 0; i < async_threads; ++i)
{
T* t = new T(_queue, _connectionInfo);
res &= t->Open();
if (res) // only check mysql version if connection is valid
WPFatal(mysql_get_server_version(t->GetHandle()) >= MIN_MYSQL_SERVER_VERSION, "AzerothCore does not support MySQL versions below 5.1");
_connections[IDX_ASYNC][i] = t;
++_connectionCount[IDX_ASYNC];
}
//! Open synchronous connections (direct, blocking operations)
_connections[IDX_SYNCH].resize(synch_threads);
for (uint8 i = 0; i < synch_threads; ++i)
{
T* t = new T(_connectionInfo);
res &= t->Open();
_connections[IDX_SYNCH][i] = t;
++_connectionCount[IDX_SYNCH];
}
if (res)
sLog->outSQLDriver("DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(),
(_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
else
sLog->outError("DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile "
"for specific errors.", GetDatabaseName());
return res;
}
void Close()
{
sLog->outSQLDriver("Closing down DatabasePool '%s'.", GetDatabaseName());
//! Shuts down delaythreads for this connection pool by underlying deactivate().
//! The next dequeue attempt in the worker thread tasks will result in an error,
//! ultimately ending the worker thread task.
_queue->queue()->close();
for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i)
{
T* t = _connections[IDX_ASYNC][i];
DatabaseWorker* worker = t->m_worker;
worker->wait(); //! Block until no more threads are running this task.
delete worker;
t->Close(); //! Closes the actualy MySQL connection.
}
sLog->outSQLDriver("Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.",
GetDatabaseName());
//! Shut down the synchronous connections
//! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
//! should only be called after any other thread tasks in the core have exited,
//! meaning there can be no concurrent access at this point.
for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
_connections[IDX_SYNCH][i]->Close();
//! Deletes the ACE_Activation_Queue object and its underlying ACE_Message_Queue
delete _queue;
delete _mqueue;
sLog->outSQLDriver("All connections on DatabasePool '%s' closed.", GetDatabaseName());
}
/**
Delayed one-way statement methods.
*/
//! Enqueues a one-way SQL operation in string format that will be executed asynchronously.
//! This method should only be used for queries that are only executed once, e.g during startup.
void Execute(const char* sql)
{
if (!sql)
return;
BasicStatementTask* task = new BasicStatementTask(sql);
Enqueue(task);
}
//! Enqueues a one-way SQL operation in string format -with variable args- that will be executed asynchronously.
//! This method should only be used for queries that are only executed once, e.g during startup.
void PExecute(const char* sql, ...)
{
if (!sql)
return;
va_list ap;
char szQuery[MAX_QUERY_LEN];
va_start(ap, sql);
vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
va_end(ap);
Execute(szQuery);
}
//! Enqueues a one-way SQL operation in prepared statement format that will be executed asynchronously.
//! Statement must be prepared with CONNECTION_ASYNC flag.
void Execute(PreparedStatement* stmt)
{
PreparedStatementTask* task = new PreparedStatementTask(stmt);
Enqueue(task);
}
/**
Direct synchronous one-way statement methods.
*/
//! Directly executes a one-way SQL operation in string format, that will block the calling thread until finished.
//! This method should only be used for queries that are only executed once, e.g during startup.
void DirectExecute(const char* sql)
{
if (!sql)
return;
T* t = GetFreeConnection();
t->Execute(sql);
t->Unlock();
}
//! Directly executes a one-way SQL operation in string format -with variable args-, that will block the calling thread until finished.
//! This method should only be used for queries that are only executed once, e.g during startup.
void DirectPExecute(const char* sql, ...)
{
if (!sql)
return;
va_list ap;
char szQuery[MAX_QUERY_LEN];
va_start(ap, sql);
vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
va_end(ap);
return DirectExecute(szQuery);
}
//! Directly executes a one-way SQL operation in prepared statement format, that will block the calling thread until finished.
//! Statement must be prepared with the CONNECTION_SYNCH flag.
void DirectExecute(PreparedStatement* stmt)
{
T* t = GetFreeConnection();
t->Execute(stmt);
t->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
}
/**
Synchronous query (with resultset) methods.
*/
//! Directly executes an SQL query in string format that will block the calling thread until finished.
//! Returns reference counted auto pointer, no need for manual memory management in upper level code.
QueryResult Query(const char* sql, T* conn = NULL)
{
if (!conn)
conn = GetFreeConnection();
ResultSet* result = conn->Query(sql);
conn->Unlock();
if (!result || !result->GetRowCount())
{
delete result;
return QueryResult(NULL);
}
result->NextRow();
return QueryResult(result);
}
//! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
//! Returns reference counted auto pointer, no need for manual memory management in upper level code.
QueryResult PQuery(const char* sql, T* conn, ...)
{
if (!sql)
return QueryResult(NULL);
va_list ap;
char szQuery[MAX_QUERY_LEN];
va_start(ap, conn);
vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
va_end(ap);
return Query(szQuery, conn);
}
//! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
//! Returns reference counted auto pointer, no need for manual memory management in upper level code.
QueryResult PQuery(const char* sql, ...)
{
if (!sql)
return QueryResult(NULL);
va_list ap;
char szQuery[MAX_QUERY_LEN];
va_start(ap, sql);
vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
va_end(ap);
return Query(szQuery);
}
//! Directly executes an SQL query in prepared format that will block the calling thread until finished.
//! Returns reference counted auto pointer, no need for manual memory management in upper level code.
//! Statement must be prepared with CONNECTION_SYNCH flag.
PreparedQueryResult Query(PreparedStatement* stmt)
{
T* t = GetFreeConnection();
PreparedResultSet* ret = t->Query(stmt);
t->Unlock();
//! Delete proxy-class. Not needed anymore
delete stmt;
if (!ret || !ret->GetRowCount())
{
delete ret;
return PreparedQueryResult(NULL);
}
return PreparedQueryResult(ret);
}
/**
Asynchronous query (with resultset) methods.
*/
//! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
QueryResultFuture AsyncQuery(const char* sql)
{
QueryResultFuture res;
BasicStatementTask* task = new BasicStatementTask(sql, res);
Enqueue(task);
return res; //! Actual return value has no use yet
}
//! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
QueryResultFuture AsyncPQuery(const char* sql, ...)
{
va_list ap;
char szQuery[MAX_QUERY_LEN];
va_start(ap, sql);
vsnprintf(szQuery, MAX_QUERY_LEN, sql, ap);
va_end(ap);
return AsyncQuery(szQuery);
}
//! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
//! Statement must be prepared with CONNECTION_ASYNC flag.
PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt)
{
PreparedQueryResultFuture res;
PreparedStatementTask* task = new PreparedStatementTask(stmt, res);
Enqueue(task);
return res;
}
//! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture
//! return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
//! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag.
QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder)
{
QueryResultHolderFuture res;
SQLQueryHolderTask* task = new SQLQueryHolderTask(holder, res);
Enqueue(task);
return res; //! Fool compiler, has no use yet
}
/**
Transaction context methods.
*/
//! Begins an automanaged transaction pointer that will automatically rollback if not commited. (Autocommit=0)
SQLTransaction BeginTransaction()
{
return SQLTransaction(new Transaction);
}
//! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
void CommitTransaction(SQLTransaction transaction)
{
#ifdef TRINITY_DEBUG
//! Only analyze transaction weaknesses in Debug mode.
//! Ideally we catch the faults in Debug mode and then correct them,
//! so there's no need to waste these CPU cycles in Release mode.
switch (transaction->GetSize())
{
case 0:
sLog->outSQLDriver("Transaction contains 0 queries. Not executing.");
return;
case 1:
sLog->outSQLDriver("Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
break;
default:
break;
}
#endif // TRINITY_DEBUG
Enqueue(new TransactionTask(transaction));
}
//! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
void DirectCommitTransaction(SQLTransaction& transaction)
{
T* con = GetFreeConnection();
if (con->ExecuteTransaction(transaction))
{
con->Unlock(); // OK, operation succesful
return;
}
//! Handle MySQL Errno 1213 without extending deadlock to the core itself
//! TODO: More elegant way
if (con->GetLastError() == 1213)
{
uint8 loopBreaker = 5;
for (uint8 i = 0; i < loopBreaker; ++i)
{
if (con->ExecuteTransaction(transaction))
break;
}
}
//! Clean up now.
transaction->Cleanup();
con->Unlock();
}
//! Method used to execute prepared statements in a diverse context.
//! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
void ExecuteOrAppend(SQLTransaction& trans, PreparedStatement* stmt)
{
if (trans.null())
Execute(stmt);
else
trans->Append(stmt);
}
//! Method used to execute ad-hoc statements in a diverse context.
//! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
void ExecuteOrAppend(SQLTransaction& trans, const char* sql)
{
if (trans.null())
Execute(sql);
else
trans->Append(sql);
}
/**
Other
*/
//! Automanaged (internally) pointer to a prepared statement object for usage in upper level code.
//! Pointer is deleted in this->DirectExecute(PreparedStatement*), this->Query(PreparedStatement*) or PreparedStatementTask::~PreparedStatementTask.
//! This object is not tied to the prepared statement on the MySQL context yet until execution.
PreparedStatement* GetPreparedStatement(uint32 index)
{
return new PreparedStatement(index);
}
//! Apply escape string'ing for current collation. (utf8)
void EscapeString(std::string& str)
{
if (str.empty())
return;
char* buf = new char[str.size()*2+1];
EscapeString(buf, str.c_str(), str.size());
str = buf;
delete[] buf;
}
//! Keeps all our MySQL connections alive, prevent the server from disconnecting us.
void KeepAlive()
{
//! Ping synchronous connections
for (uint8 i = 0; i < _connectionCount[IDX_SYNCH]; ++i)
{
T* t = _connections[IDX_SYNCH][i];
if (t->LockIfReady())
{
t->Ping();
t->Unlock();
}
}
//! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
//! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
//! as the sole purpose is to prevent connections from idling.
for (size_t i = 0; i < _connections[IDX_ASYNC].size(); ++i)
Enqueue(new PingOperation);
}
char const* GetDatabaseName() const
{
return _connectionInfo.database.c_str();
}
private:
unsigned long EscapeString(char *to, const char *from, unsigned long length)
{
if (!to || !from || !length)
return 0;
return mysql_real_escape_string(_connections[IDX_SYNCH][0]->GetHandle(), to, from, length);
}
void Enqueue(SQLOperation* op)
{
_queue->enqueue(op);
}
//! Gets a free connection in the synchronous connection pool.
//! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
T* GetFreeConnection()
{
uint8 i = 0;
size_t num_cons = _connectionCount[IDX_SYNCH];
T* t = NULL;
//! Block forever until a connection is free
for (;;)
{
t = _connections[IDX_SYNCH][++i % num_cons];
//! Must be matched with t->Unlock() or you will get deadlocks
if (t->LockIfReady())
break;
}
return t;
}
private:
enum _internalIndex
{
IDX_ASYNC,
IDX_SYNCH,
IDX_SIZE
};
ACE_Message_Queue* _mqueue;
ACE_Activation_Queue* _queue; //! Queue shared by async worker threads.
std::vector< std::vector > _connections;
uint32 _connectionCount[2]; //! Counter of MySQL connections;
MySQLConnectionInfo _connectionInfo;
};
#endif