aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNaios <naios-dev@live.de>2016-03-03 01:31:51 +0100
committerNaios <naios-dev@live.de>2016-03-03 01:51:49 +0100
commit31c8f9a7ed9acee9a02be690bfb70a820f9c53e8 (patch)
tree0876b8533f042bec336d360370e0e3a5edbed21d
parent6a4a65521dbfbf3b270f385554b89c70728d1214 (diff)
Core/Database: Move DatabaseWorkerPool into it's own translation unit
(cherry picked from commit 09fa0ab46a487ea85d475ac0b3495724c170fe1a)
-rw-r--r--src/server/database/Database/DatabaseWorkerPool.cpp323
-rw-r--r--src/server/database/Database/DatabaseWorkerPool.h277
2 files changed, 339 insertions, 261 deletions
diff --git a/src/server/database/Database/DatabaseWorkerPool.cpp b/src/server/database/Database/DatabaseWorkerPool.cpp
new file mode 100644
index 00000000000..6c352bb11b8
--- /dev/null
+++ b/src/server/database/Database/DatabaseWorkerPool.cpp
@@ -0,0 +1,323 @@
+/*
+ * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License as published by the
+ * Free Software Foundation; either version 2 of the License, or (at your
+ * option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+ * more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "DatabaseWorkerPool.h"
+#include "DatabaseEnv.h"
+
+#define MIN_MYSQL_SERVER_VERSION 50100u
+#define MIN_MYSQL_CLIENT_VERSION 50100u
+
+template <class T>
+DatabaseWorkerPool<T>::DatabaseWorkerPool()
+ : _queue(new ProducerConsumerQueue<SQLOperation*>()),
+ _async_threads(0), _synch_threads(0)
+{
+ WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
+ WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below 5.1");
+ WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s) does not match the version used to compile TrinityCore (%s).",
+ mysql_get_client_info(), MYSQL_SERVER_VERSION);
+}
+
+template <class T>
+void DatabaseWorkerPool<T>::SetConnectionInfo(std::string const& infoString,
+ uint8 const asyncThreads, uint8 const synchThreads)
+{
+ _connectionInfo = Trinity::make_unique<MySQLConnectionInfo>(infoString);
+
+ _async_threads = asyncThreads;
+ _synch_threads = synchThreads;
+}
+
+template <class T>
+uint32 DatabaseWorkerPool<T>::Open()
+{
+ WPFatal(_connectionInfo.get(), "Connection info was not set!");
+
+ TC_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. "
+ "Asynchronous connections: %u, synchronous connections: %u.",
+ GetDatabaseName(), _async_threads, _synch_threads);
+
+ uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
+
+ if (error)
+ return error;
+
+ error = OpenConnections(IDX_SYNCH, _synch_threads);
+
+ if (!error)
+ {
+ TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. " SZFMTD
+ " total connections running.", GetDatabaseName(),
+ (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
+ }
+
+ return error;
+}
+
+template <class T>
+void DatabaseWorkerPool<T>::Close()
+{
+ TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName());
+
+ //! Closes the actualy MySQL connection.
+ _connections[IDX_ASYNC].clear();
+
+ TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. "
+ "Proceeding with synchronous connections.",
+ GetDatabaseName());
+
+ //! Shut down the synchronous connections
+ //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
+ //! should only be called after any other thread tasks in the core have exited,
+ //! meaning there can be no concurrent access at this point.
+ _connections[IDX_SYNCH].clear();
+
+ TC_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName());
+}
+
+template <class T>
+bool DatabaseWorkerPool<T>::PrepareStatements()
+{
+ for (auto& connections : _connections)
+ for (auto& connection : connections)
+ {
+ connection->LockIfReady();
+ if (!connection->PrepareStatements())
+ {
+ connection->Unlock();
+ Close();
+ return false;
+ }
+ else
+ connection->Unlock();
+ }
+
+ return true;
+}
+
+template <class T>
+QueryResult DatabaseWorkerPool<T>::Query(const char* sql, T* connection /*= nullptr*/)
+{
+ if (!connection)
+ connection = GetFreeConnection();
+
+ ResultSet* result = connection->Query(sql);
+ connection->Unlock();
+ if (!result || !result->GetRowCount() || !result->NextRow())
+ {
+ delete result;
+ return QueryResult(NULL);
+ }
+
+ return QueryResult(result);
+}
+
+template <class T>
+PreparedQueryResult DatabaseWorkerPool<T>::Query(PreparedStatement* stmt)
+{
+ auto connection = GetFreeConnection();
+ PreparedResultSet* ret = connection->Query(stmt);
+ connection->Unlock();
+
+ //! Delete proxy-class. Not needed anymore
+ delete stmt;
+
+ if (!ret || !ret->GetRowCount())
+ {
+ delete ret;
+ return PreparedQueryResult(NULL);
+ }
+
+ return PreparedQueryResult(ret);
+}
+
+template <class T>
+QueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(const char* sql)
+{
+ BasicStatementTask* task = new BasicStatementTask(sql, true);
+ // Store future result before enqueueing - task might get already processed and deleted before returning from this method
+ QueryResultFuture result = task->GetFuture();
+ Enqueue(task);
+ return result;
+}
+
+template <class T>
+PreparedQueryResultFuture DatabaseWorkerPool<T>::AsyncQuery(PreparedStatement* stmt)
+{
+ PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
+ // Store future result before enqueueing - task might get already processed and deleted before returning from this method
+ PreparedQueryResultFuture result = task->GetFuture();
+ Enqueue(task);
+ return result;
+}
+
+template <class T>
+QueryResultHolderFuture DatabaseWorkerPool<T>::DelayQueryHolder(SQLQueryHolder* holder)
+{
+ SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
+ // Store future result before enqueueing - task might get already processed and deleted before returning from this method
+ QueryResultHolderFuture result = task->GetFuture();
+ Enqueue(task);
+ return result;
+}
+
+template <class T>
+void DatabaseWorkerPool<T>::CommitTransaction(SQLTransaction transaction)
+{
+#ifdef TRINITY_DEBUG
+ //! Only analyze transaction weaknesses in Debug mode.
+ //! Ideally we catch the faults in Debug mode and then correct them,
+ //! so there's no need to waste these CPU cycles in Release mode.
+ switch (transaction->GetSize())
+ {
+ case 0:
+ TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
+ return;
+ case 1:
+ TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
+ break;
+ default:
+ break;
+ }
+#endif // TRINITY_DEBUG
+
+ Enqueue(new TransactionTask(transaction));
+}
+
+template <class T>
+void DatabaseWorkerPool<T>::DirectCommitTransaction(SQLTransaction& transaction)
+{
+ T* connection = GetFreeConnection();
+ int errorCode = connection->ExecuteTransaction(transaction);
+ if (!errorCode)
+ {
+ connection->Unlock(); // OK, operation succesful
+ return;
+ }
+
+ //! Handle MySQL Errno 1213 without extending deadlock to the core itself
+ /// @todo More elegant way
+ if (errorCode == ER_LOCK_DEADLOCK)
+ {
+ uint8 loopBreaker = 5;
+ for (uint8 i = 0; i < loopBreaker; ++i)
+ {
+ if (!connection->ExecuteTransaction(transaction))
+ break;
+ }
+ }
+
+ //! Clean up now.
+ transaction->Cleanup();
+
+ connection->Unlock();
+}
+
+template <class T>
+void DatabaseWorkerPool<T>::EscapeString(std::string& str)
+{
+ if (str.empty())
+ return;
+
+ char* buf = new char[str.size() * 2 + 1];
+ EscapeString(buf, str.c_str(), str.size());
+ str = buf;
+ delete[] buf;
+}
+
+template <class T>
+void DatabaseWorkerPool<T>::KeepAlive()
+{
+ //! Ping synchronous connections
+ for (auto& connection : _connections[IDX_SYNCH])
+ {
+ if (connection->LockIfReady())
+ {
+ connection->Ping();
+ connection->Unlock();
+ }
+ }
+
+ //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
+ //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
+ //! as the sole purpose is to prevent connections from idling.
+ auto const count = _connections[IDX_ASYNC].size();
+ for (uint8 i = 0; i < count; ++i)
+ Enqueue(new PingOperation);
+}
+
+template <class T>
+uint32 DatabaseWorkerPool<T>::OpenConnections(InternalIndex type, uint8 numConnections)
+{
+ for (uint8 i = 0; i < numConnections; ++i)
+ {
+ // Create the connection
+ auto connection = [&] {
+ switch (type)
+ {
+ case IDX_ASYNC:
+ return Trinity::make_unique<T>(_queue.get(), *_connectionInfo);
+ case IDX_SYNCH:
+ return Trinity::make_unique<T>(*_connectionInfo);
+ default:
+ ABORT();
+ }
+ }();
+
+ if (uint32 error = connection->Open())
+ {
+ // Failed to open a connection or invalid version, abort and cleanup
+ _connections[type].clear();
+ return error;
+ }
+ else if (mysql_get_server_version(connection->GetHandle()) < MIN_MYSQL_SERVER_VERSION)
+ {
+ TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1");
+ return 1;
+ }
+ else
+ {
+ _connections[type].push_back(std::move(connection));
+ }
+ }
+
+ // Everything is fine
+ return 0;
+}
+
+template <class T>
+T* DatabaseWorkerPool<T>::GetFreeConnection()
+{
+ uint8 i = 0;
+ auto const num_cons = _connections[IDX_SYNCH].size();
+ T* connection = nullptr;
+ //! Block forever until a connection is free
+ for (;;)
+ {
+ connection = _connections[IDX_SYNCH][++i % num_cons].get();
+ //! Must be matched with t->Unlock() or you will get deadlocks
+ if (connection->LockIfReady())
+ break;
+ }
+
+ return connection;
+}
+
+template class DatabaseWorkerPool<LoginDatabaseConnection>;
+template class DatabaseWorkerPool<WorldDatabaseConnection>;
+template class DatabaseWorkerPool<CharacterDatabaseConnection>;
+template class DatabaseWorkerPool<HotfixDatabaseConnection>;
diff --git a/src/server/database/Database/DatabaseWorkerPool.h b/src/server/database/Database/DatabaseWorkerPool.h
index c2b73437455..d883366237f 100644
--- a/src/server/database/Database/DatabaseWorkerPool.h
+++ b/src/server/database/Database/DatabaseWorkerPool.h
@@ -34,9 +34,6 @@
#include <memory>
#include <array>
-#define MIN_MYSQL_SERVER_VERSION 50100u
-#define MIN_MYSQL_CLIENT_VERSION 50100u
-
class PingOperation : public SQLOperation
{
//! Operation for idle delaythreads
@@ -60,89 +57,21 @@ class DatabaseWorkerPool
public:
/* Activity state */
- DatabaseWorkerPool() : _queue(new ProducerConsumerQueue<SQLOperation*>()),
- _async_threads(0), _synch_threads(0)
- {
- WPFatal(mysql_thread_safe(), "Used MySQL library isn't thread-safe.");
- WPFatal(mysql_get_client_version() >= MIN_MYSQL_CLIENT_VERSION, "TrinityCore does not support MySQL versions below 5.1");
- WPFatal(mysql_get_client_version() == MYSQL_VERSION_ID, "Used MySQL library version (%s) does not match the version used to compile TrinityCore (%s).",
- mysql_get_client_info(), MYSQL_SERVER_VERSION);
- }
+ DatabaseWorkerPool();
~DatabaseWorkerPool()
{
_queue->Cancel();
}
- void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads)
- {
- _connectionInfo = Trinity::make_unique<MySQLConnectionInfo>(infoString);
-
- _async_threads = asyncThreads;
- _synch_threads = synchThreads;
- }
-
- uint32 Open()
- {
- WPFatal(_connectionInfo.get(), "Connection info was not set!");
-
- TC_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
- GetDatabaseName(), _async_threads, _synch_threads);
-
- uint32 error = OpenConnections(IDX_ASYNC, _async_threads);
-
- if (error)
- return error;
-
- error = OpenConnections(IDX_SYNCH, _synch_threads);
+ void SetConnectionInfo(std::string const& infoString, uint8 const asyncThreads, uint8 const synchThreads);
- if (!error)
- {
- TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.",
- GetDatabaseName(), (_connections[IDX_SYNCH].size() + _connections[IDX_ASYNC].size()));
- }
-
- return error;
- }
-
- void Close()
- {
- TC_LOG_INFO("sql.driver", "Closing down DatabasePool '%s'.", GetDatabaseName());
+ uint32 Open();
- //! Closes the actualy MySQL connection.
- _connections[IDX_ASYNC].clear();
-
- TC_LOG_INFO("sql.driver", "Asynchronous connections on DatabasePool '%s' terminated. Proceeding with synchronous connections.",
- GetDatabaseName());
-
- //! Shut down the synchronous connections
- //! There's no need for locking the connection, because DatabaseWorkerPool<>::Close
- //! should only be called after any other thread tasks in the core have exited,
- //! meaning there can be no concurrent access at this point.
- _connections[IDX_SYNCH].clear();
-
- TC_LOG_INFO("sql.driver", "All connections on DatabasePool '%s' closed.", GetDatabaseName());
- }
+ void Close();
//! Prepares all prepared statements
- bool PrepareStatements()
- {
- for (auto& connections : _connections)
- for (auto& connection : connections)
- {
- connection->LockIfReady();
- if (!connection->PrepareStatements())
- {
- connection->Unlock();
- Close();
- return false;
- }
- else
- connection->Unlock();
- }
-
- return true;
- }
+ bool PrepareStatements();
inline MySQLConnectionInfo const* GetConnectionInfo() const
{
@@ -228,21 +157,7 @@ class DatabaseWorkerPool
//! Directly executes an SQL query in string format that will block the calling thread until finished.
//! Returns reference counted auto pointer, no need for manual memory management in upper level code.
- QueryResult Query(const char* sql, T* connection = nullptr)
- {
- if (!connection)
- connection = GetFreeConnection();
-
- ResultSet* result = connection->Query(sql);
- connection->Unlock();
- if (!result || !result->GetRowCount() || !result->NextRow())
- {
- delete result;
- return QueryResult(NULL);
- }
-
- return QueryResult(result);
- }
+ QueryResult Query(const char* sql, T* connection = nullptr);
//! Directly executes an SQL query in string format -with variable args- that will block the calling thread until finished.
//! Returns reference counted auto pointer, no need for manual memory management in upper level code.
@@ -269,23 +184,7 @@ class DatabaseWorkerPool
//! Directly executes an SQL query in prepared format that will block the calling thread until finished.
//! Returns reference counted auto pointer, no need for manual memory management in upper level code.
//! Statement must be prepared with CONNECTION_SYNCH flag.
- PreparedQueryResult Query(PreparedStatement* stmt)
- {
- auto connection = GetFreeConnection();
- PreparedResultSet* ret = connection->Query(stmt);
- connection->Unlock();
-
- //! Delete proxy-class. Not needed anymore
- delete stmt;
-
- if (!ret || !ret->GetRowCount())
- {
- delete ret;
- return PreparedQueryResult(NULL);
- }
-
- return PreparedQueryResult(ret);
- }
+ PreparedQueryResult Query(PreparedStatement* stmt);
/**
Asynchronous query (with resultset) methods.
@@ -293,14 +192,7 @@ class DatabaseWorkerPool
//! Enqueues a query in string format that will set the value of the QueryResultFuture return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
- QueryResultFuture AsyncQuery(const char* sql)
- {
- BasicStatementTask* task = new BasicStatementTask(sql, true);
- // Store future result before enqueueing - task might get already processed and deleted before returning from this method
- QueryResultFuture result = task->GetFuture();
- Enqueue(task);
- return result;
- }
+ QueryResultFuture AsyncQuery(const char* sql);
//! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
@@ -313,27 +205,13 @@ class DatabaseWorkerPool
//! Enqueues a query in prepared format that will set the value of the PreparedQueryResultFuture return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
//! Statement must be prepared with CONNECTION_ASYNC flag.
- PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt)
- {
- PreparedStatementTask* task = new PreparedStatementTask(stmt, true);
- // Store future result before enqueueing - task might get already processed and deleted before returning from this method
- PreparedQueryResultFuture result = task->GetFuture();
- Enqueue(task);
- return result;
- }
+ PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt);
//! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture
//! return object as soon as the query is executed.
//! The return value is then processed in ProcessQueryCallback methods.
//! Any prepared statements added to this holder need to be prepared with the CONNECTION_ASYNC flag.
- QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder)
- {
- SQLQueryHolderTask* task = new SQLQueryHolderTask(holder);
- // Store future result before enqueueing - task might get already processed and deleted before returning from this method
- QueryResultHolderFuture result = task->GetFuture();
- Enqueue(task);
- return result;
- }
+ QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder);
/**
Transaction context methods.
@@ -347,57 +225,11 @@ class DatabaseWorkerPool
//! Enqueues a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
- void CommitTransaction(SQLTransaction transaction)
- {
- #ifdef TRINITY_DEBUG
- //! Only analyze transaction weaknesses in Debug mode.
- //! Ideally we catch the faults in Debug mode and then correct them,
- //! so there's no need to waste these CPU cycles in Release mode.
- switch (transaction->GetSize())
- {
- case 0:
- TC_LOG_DEBUG("sql.driver", "Transaction contains 0 queries. Not executing.");
- return;
- case 1:
- TC_LOG_DEBUG("sql.driver", "Warning: Transaction only holds 1 query, consider removing Transaction context in code.");
- break;
- default:
- break;
- }
- #endif // TRINITY_DEBUG
-
- Enqueue(new TransactionTask(transaction));
- }
+ void CommitTransaction(SQLTransaction transaction);
//! Directly executes a collection of one-way SQL operations (can be both adhoc and prepared). The order in which these operations
//! were appended to the transaction will be respected during execution.
- void DirectCommitTransaction(SQLTransaction& transaction)
- {
- T* connection = GetFreeConnection();
- int errorCode = connection->ExecuteTransaction(transaction);
- if (!errorCode)
- {
- connection->Unlock(); // OK, operation succesful
- return;
- }
-
- //! Handle MySQL Errno 1213 without extending deadlock to the core itself
- /// @todo More elegant way
- if (errorCode == ER_LOCK_DEADLOCK)
- {
- uint8 loopBreaker = 5;
- for (uint8 i = 0; i < loopBreaker; ++i)
- {
- if (!connection->ExecuteTransaction(transaction))
- break;
- }
- }
-
- //! Clean up now.
- transaction->Cleanup();
-
- connection->Unlock();
- }
+ void DirectCommitTransaction(SQLTransaction& transaction);
//! Method used to execute prepared statements in a diverse context.
//! Will be wrapped in a transaction if valid object is present, otherwise executed standalone.
@@ -434,75 +266,13 @@ class DatabaseWorkerPool
}
//! Apply escape string'ing for current collation. (utf8)
- void EscapeString(std::string& str)
- {
- if (str.empty())
- return;
-
- char* buf = new char[str.size() * 2 + 1];
- EscapeString(buf, str.c_str(), uint32(str.length()));
- str = buf;
- delete[] buf;
- }
+ void EscapeString(std::string& str);
//! Keeps all our MySQL connections alive, prevent the server from disconnecting us.
- void KeepAlive()
- {
- //! Ping synchronous connections
- for (auto& connection : _connections[IDX_SYNCH])
- {
- if (connection->LockIfReady())
- {
- connection->Ping();
- connection->Unlock();
- }
- }
-
- //! Assuming all worker threads are free, every worker thread will receive 1 ping operation request
- //! If one or more worker threads are busy, the ping operations will not be split evenly, but this doesn't matter
- //! as the sole purpose is to prevent connections from idling.
- for (auto& connection : _connections[IDX_ASYNC])
- Enqueue(new PingOperation);
- }
+ void KeepAlive();
private:
- uint32 OpenConnections(InternalIndex type, uint8 numConnections)
- {
- for (uint8 i = 0; i < numConnections; ++i)
- {
- // Create the connection
- auto connection = [&] {
- switch (type)
- {
- case IDX_ASYNC:
- return Trinity::make_unique<T>(_queue.get(), *_connectionInfo);
- case IDX_SYNCH:
- return Trinity::make_unique<T>(*_connectionInfo);
- default:
- ABORT();
- }
- }();
-
- if (uint32 error = connection->Open())
- {
- // Failed to open a connection or invalid version, abort and cleanup
- _connections[type].clear();
- return error;
- }
- else if (mysql_get_server_version(connection->GetHandle()) < MIN_MYSQL_SERVER_VERSION)
- {
- TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1");
- return 1;
- }
- else
- {
- _connections[type].push_back(std::move(connection));
- }
- }
-
- // Everything is fine
- return 0;
- }
+ uint32 OpenConnections(InternalIndex type, uint8 numConnections);
unsigned long EscapeString(char *to, const char *from, unsigned long length)
{
@@ -520,22 +290,7 @@ class DatabaseWorkerPool
//! Gets a free connection in the synchronous connection pool.
//! Caller MUST call t->Unlock() after touching the MySQL context to prevent deadlocks.
- T* GetFreeConnection()
- {
- uint8 i = 0;
- auto const num_cons = _connections[IDX_SYNCH].size();
- T* connection = nullptr;
- //! Block forever until a connection is free
- for (;;)
- {
- connection = _connections[IDX_SYNCH][++i % num_cons].get();
- //! Must be matched with t->Unlock() or you will get deadlocks
- if (connection->LockIfReady())
- break;
- }
-
- return connection;
- }
+ T* GetFreeConnection();
char const* GetDatabaseName() const
{