aboutsummaryrefslogtreecommitdiff
path: root/src/server/shared
diff options
context:
space:
mode:
Diffstat (limited to 'src/server/shared')
-rw-r--r--src/server/shared/Common.h47
-rw-r--r--src/server/shared/Database/DatabaseWorker.cpp2
-rw-r--r--src/server/shared/Database/DatabaseWorkerPool.h99
-rw-r--r--src/server/shared/Database/Implementation/CharacterDatabase.h2
-rw-r--r--src/server/shared/Database/Implementation/LoginDatabase.h2
-rw-r--r--src/server/shared/Database/Implementation/WorldDatabase.h2
-rw-r--r--src/server/shared/Database/MySQLConnection.cpp10
-rw-r--r--src/server/shared/Database/PreparedStatement.h2
-rw-r--r--src/server/shared/Database/QueryHolder.h2
-rw-r--r--src/server/shared/Database/QueryResult.cpp2
-rw-r--r--src/server/shared/Database/Transaction.h2
-rw-r--r--src/server/shared/Define.h3
-rw-r--r--src/server/shared/Dynamic/LinkedList.h2
-rw-r--r--src/server/shared/Logging/AppenderConsole.h2
-rw-r--r--src/server/shared/Logging/AppenderDB.h2
-rw-r--r--src/server/shared/Logging/AppenderFile.h2
-rw-r--r--src/server/shared/Networking/AsyncAcceptor.h12
-rw-r--r--src/server/shared/Networking/MessageBuffer.h93
-rw-r--r--src/server/shared/Networking/Socket.h182
-rw-r--r--src/server/shared/Packets/ByteBuffer.cpp11
-rw-r--r--src/server/shared/Packets/ByteBuffer.h14
-rw-r--r--src/server/shared/Threading/ProcessPriority.h2
-rw-r--r--src/server/shared/Threading/ProducerConsumerQueue.h4
-rw-r--r--src/server/shared/Utilities/Util.cpp3
24 files changed, 351 insertions, 153 deletions
diff --git a/src/server/shared/Common.h b/src/server/shared/Common.h
index e74e707a942..e9e9d4d58c0 100644
--- a/src/server/shared/Common.h
+++ b/src/server/shared/Common.h
@@ -19,46 +19,6 @@
#ifndef TRINITYCORE_COMMON_H
#define TRINITYCORE_COMMON_H
-// config.h needs to be included 1st
-/// @todo this thingy looks like hack, but its not, need to
-// make separate header however, because It makes mess here.
-#ifdef HAVE_CONFIG_H
-// Remove Some things that we will define
-// This is in case including another config.h
-// before trinity config.h
-#ifdef PACKAGE
-#undef PACKAGE
-#endif //PACKAGE
-#ifdef PACKAGE_BUGREPORT
-#undef PACKAGE_BUGREPORT
-#endif //PACKAGE_BUGREPORT
-#ifdef PACKAGE_NAME
-#undef PACKAGE_NAME
-#endif //PACKAGE_NAME
-#ifdef PACKAGE_STRING
-#undef PACKAGE_STRING
-#endif //PACKAGE_STRING
-#ifdef PACKAGE_TARNAME
-#undef PACKAGE_TARNAME
-#endif //PACKAGE_TARNAME
-#ifdef PACKAGE_VERSION
-#undef PACKAGE_VERSION
-#endif //PACKAGE_VERSION
-#ifdef VERSION
-#undef VERSION
-#endif //VERSION
-
-# include "Config.h"
-
-#undef PACKAGE
-#undef PACKAGE_BUGREPORT
-#undef PACKAGE_NAME
-#undef PACKAGE_STRING
-#undef PACKAGE_TARNAME
-#undef PACKAGE_VERSION
-#undef VERSION
-#endif //HAVE_CONFIG_H
-
#include "Define.h"
#include <unordered_map>
@@ -85,6 +45,13 @@
#if PLATFORM == PLATFORM_WINDOWS
# include <ws2tcpip.h>
+
+# if defined(__INTEL_COMPILER)
+# if !defined(BOOST_ASIO_HAS_MOVE)
+# define BOOST_ASIO_HAS_MOVE
+# endif // !defined(BOOST_ASIO_HAS_MOVE)
+# endif // if defined(__INTEL_COMPILER)
+
#else
# include <sys/types.h>
# include <sys/ioctl.h>
diff --git a/src/server/shared/Database/DatabaseWorker.cpp b/src/server/shared/Database/DatabaseWorker.cpp
index ca48ebdd811..e130429c8d0 100644
--- a/src/server/shared/Database/DatabaseWorker.cpp
+++ b/src/server/shared/Database/DatabaseWorker.cpp
@@ -50,7 +50,7 @@ void DatabaseWorker::WorkerThread()
_queue->WaitAndPop(operation);
- if (_cancelationToken)
+ if (_cancelationToken || !operation)
return;
operation->SetConnection(_connection);
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h
index 18797b6b12a..5548e44c925 100644
--- a/src/server/shared/Database/DatabaseWorkerPool.h
+++ b/src/server/shared/Database/DatabaseWorkerPool.h
@@ -35,7 +35,7 @@
class PingOperation : public SQLOperation
{
//! Operation for idle delaythreads
- bool Execute()
+ bool Execute() override
{
m_conn->Ping();
return true;
@@ -45,6 +45,14 @@ class PingOperation : public SQLOperation
template <class T>
class DatabaseWorkerPool
{
+ private:
+ enum InternalIndex
+ {
+ IDX_ASYNC,
+ IDX_SYNCH,
+ IDX_SIZE
+ };
+
public:
/* Activity state */
DatabaseWorkerPool() : _connectionInfo(NULL)
@@ -74,34 +82,17 @@ class DatabaseWorkerPool
TC_LOG_INFO("sql.driver", "Opening DatabasePool '%s'. Asynchronous connections: %u, synchronous connections: %u.",
GetDatabaseName(), async_threads, synch_threads);
- //! Open asynchronous connections (delayed operations)
- _connections[IDX_ASYNC].resize(async_threads);
- for (uint8 i = 0; i < async_threads; ++i)
- {
- T* t = new T(_queue, *_connectionInfo);
- res &= t->Open();
- if (res) // only check mysql version if connection is valid
- WPFatal(mysql_get_server_version(t->GetHandle()) >= MIN_MYSQL_SERVER_VERSION, "TrinityCore does not support MySQL versions below 5.1");
- _connections[IDX_ASYNC][i] = t;
- ++_connectionCount[IDX_ASYNC];
- }
+ res = OpenConnections(IDX_ASYNC, async_threads);
- //! Open synchronous connections (direct, blocking operations)
- _connections[IDX_SYNCH].resize(synch_threads);
- for (uint8 i = 0; i < synch_threads; ++i)
- {
- T* t = new T(*_connectionInfo);
- res &= t->Open();
- _connections[IDX_SYNCH][i] = t;
- ++_connectionCount[IDX_SYNCH];
- }
+ if (!res)
+ return res;
+
+ res = OpenConnections(IDX_SYNCH, synch_threads);
if (res)
TC_LOG_INFO("sql.driver", "DatabasePool '%s' opened successfully. %u total connections running.", GetDatabaseName(),
(_connectionCount[IDX_SYNCH] + _connectionCount[IDX_ASYNC]));
- else
- TC_LOG_ERROR("sql.driver", "DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile "
- "for specific errors. Read wiki at http://collab.kpsn.org/display/tc/TrinityCore+Home", GetDatabaseName());
+
return res;
}
@@ -112,8 +103,6 @@ class DatabaseWorkerPool
for (uint8 i = 0; i < _connectionCount[IDX_ASYNC]; ++i)
{
T* t = _connections[IDX_ASYNC][i];
- DatabaseWorker* worker = t->m_worker;
- delete worker;
t->Close(); //! Closes the actualy MySQL connection.
}
@@ -442,7 +431,7 @@ class DatabaseWorkerPool
if (str.empty())
return;
- char* buf = new char[str.size()*2+1];
+ char* buf = new char[str.size() * 2 + 1];
EscapeString(buf, str.c_str(), str.size());
str = buf;
delete[] buf;
@@ -470,6 +459,54 @@ class DatabaseWorkerPool
}
private:
+ bool OpenConnections(InternalIndex type, uint8 numConnections)
+ {
+ _connections[type].resize(numConnections);
+ for (uint8 i = 0; i < numConnections; ++i)
+ {
+ T* t;
+
+ if (type == IDX_ASYNC)
+ t = new T(_queue, *_connectionInfo);
+ else if (type == IDX_SYNCH)
+ t = new T(*_connectionInfo);
+ else
+ ASSERT(false);
+
+ _connections[type][i] = t;
+ ++_connectionCount[type];
+
+ bool res = t->Open();
+
+ if (res)
+ {
+ if (mysql_get_server_version(t->GetHandle()) < MIN_MYSQL_SERVER_VERSION)
+ {
+ TC_LOG_ERROR("sql.driver", "TrinityCore does not support MySQL versions below 5.1");
+ res = false;
+ }
+ }
+
+ // Failed to open a connection or invalid version, abort and cleanup
+ if (!res)
+ {
+ TC_LOG_ERROR("sql.driver", "DatabasePool %s NOT opened. There were errors opening the MySQL connections. Check your SQLDriverLogFile "
+ "for specific errors. Read wiki at http://collab.kpsn.org/display/tc/TrinityCore+Home", GetDatabaseName());
+
+ while (_connectionCount[type] != 0)
+ {
+ T* t = _connections[type][i--];
+ delete t;
+ --_connectionCount[type];
+ }
+
+ return false;
+ }
+ }
+
+ return true;
+ }
+
unsigned long EscapeString(char *to, const char *from, unsigned long length)
{
if (!to || !from || !length)
@@ -507,14 +544,6 @@ class DatabaseWorkerPool
return _connectionInfo->database.c_str();
}
- private:
- enum _internalIndex
- {
- IDX_ASYNC,
- IDX_SYNCH,
- IDX_SIZE
- };
-
ProducerConsumerQueue<SQLOperation*>* _queue; //! Queue shared by async worker threads.
std::vector< std::vector<T*> > _connections;
uint32 _connectionCount[2]; //! Counter of MySQL connections;
diff --git a/src/server/shared/Database/Implementation/CharacterDatabase.h b/src/server/shared/Database/Implementation/CharacterDatabase.h
index 73eac6e30d6..5accf57f132 100644
--- a/src/server/shared/Database/Implementation/CharacterDatabase.h
+++ b/src/server/shared/Database/Implementation/CharacterDatabase.h
@@ -29,7 +29,7 @@ class CharacterDatabaseConnection : public MySQLConnection
CharacterDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
//- Loads database type specific prepared statements
- void DoPrepareStatements();
+ void DoPrepareStatements() override;
};
typedef DatabaseWorkerPool<CharacterDatabaseConnection> CharacterDatabaseWorkerPool;
diff --git a/src/server/shared/Database/Implementation/LoginDatabase.h b/src/server/shared/Database/Implementation/LoginDatabase.h
index 830fd625931..6fc452808de 100644
--- a/src/server/shared/Database/Implementation/LoginDatabase.h
+++ b/src/server/shared/Database/Implementation/LoginDatabase.h
@@ -29,7 +29,7 @@ class LoginDatabaseConnection : public MySQLConnection
LoginDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
//- Loads database type specific prepared statements
- void DoPrepareStatements();
+ void DoPrepareStatements() override;
};
typedef DatabaseWorkerPool<LoginDatabaseConnection> LoginDatabaseWorkerPool;
diff --git a/src/server/shared/Database/Implementation/WorldDatabase.h b/src/server/shared/Database/Implementation/WorldDatabase.h
index c8c38d8a629..625dfc68ce6 100644
--- a/src/server/shared/Database/Implementation/WorldDatabase.h
+++ b/src/server/shared/Database/Implementation/WorldDatabase.h
@@ -29,7 +29,7 @@ class WorldDatabaseConnection : public MySQLConnection
WorldDatabaseConnection(ProducerConsumerQueue<SQLOperation*>* q, MySQLConnectionInfo& connInfo) : MySQLConnection(q, connInfo) { }
//- Loads database type specific prepared statements
- void DoPrepareStatements();
+ void DoPrepareStatements() override;
};
typedef DatabaseWorkerPool<WorldDatabaseConnection> WorldDatabaseWorkerPool;
diff --git a/src/server/shared/Database/MySQLConnection.cpp b/src/server/shared/Database/MySQLConnection.cpp
index 8b24f508331..4e46ff0e3a1 100644
--- a/src/server/shared/Database/MySQLConnection.cpp
+++ b/src/server/shared/Database/MySQLConnection.cpp
@@ -57,12 +57,14 @@ m_connectionFlags(CONNECTION_ASYNC)
MySQLConnection::~MySQLConnection()
{
- ASSERT (m_Mysql); /// MySQL context must be present at this point
-
for (size_t i = 0; i < m_stmts.size(); ++i)
delete m_stmts[i];
- mysql_close(m_Mysql);
+ if (m_Mysql)
+ mysql_close(m_Mysql);
+
+ if (m_worker)
+ delete m_worker;
}
void MySQLConnection::Close()
@@ -112,7 +114,7 @@ bool MySQLConnection::Open()
else // generic case
{
port = atoi(m_connectionInfo.port_or_socket.c_str());
- unix_socket = 0;
+ unix_socket = nullptr;
}
#endif
diff --git a/src/server/shared/Database/PreparedStatement.h b/src/server/shared/Database/PreparedStatement.h
index 16f7a9141d3..5af52cde016 100644
--- a/src/server/shared/Database/PreparedStatement.h
+++ b/src/server/shared/Database/PreparedStatement.h
@@ -163,7 +163,7 @@ class PreparedStatementTask : public SQLOperation
PreparedStatementTask(PreparedStatement* stmt, bool async = false);
~PreparedStatementTask();
- bool Execute();
+ bool Execute() override;
PreparedQueryResultFuture GetFuture() { return m_result->get_future(); }
protected:
diff --git a/src/server/shared/Database/QueryHolder.h b/src/server/shared/Database/QueryHolder.h
index 37e23ecd653..273980c3ac5 100644
--- a/src/server/shared/Database/QueryHolder.h
+++ b/src/server/shared/Database/QueryHolder.h
@@ -52,7 +52,7 @@ class SQLQueryHolderTask : public SQLOperation
SQLQueryHolderTask(SQLQueryHolder* holder)
: m_holder(holder) { };
- bool Execute();
+ bool Execute() override;
QueryResultHolderFuture GetFuture() { return m_result.get_future(); }
};
diff --git a/src/server/shared/Database/QueryResult.cpp b/src/server/shared/Database/QueryResult.cpp
index 06b09d43168..a7b8ec2b107 100644
--- a/src/server/shared/Database/QueryResult.cpp
+++ b/src/server/shared/Database/QueryResult.cpp
@@ -124,7 +124,7 @@ m_length(NULL)
*m_rBind[fIndex].length);
break;
default:
- m_rows[uint32(m_rowPosition)][fIndex].SetByteValue(0,
+ m_rows[uint32(m_rowPosition)][fIndex].SetByteValue(nullptr,
m_rBind[fIndex].buffer_length,
m_rBind[fIndex].buffer_type,
*m_rBind[fIndex].length);
diff --git a/src/server/shared/Database/Transaction.h b/src/server/shared/Database/Transaction.h
index c7cbbbbe712..3822c2c82c1 100644
--- a/src/server/shared/Database/Transaction.h
+++ b/src/server/shared/Database/Transaction.h
@@ -63,7 +63,7 @@ class TransactionTask : public SQLOperation
~TransactionTask(){ };
protected:
- bool Execute();
+ bool Execute() override;
SQLTransaction m_trans;
};
diff --git a/src/server/shared/Define.h b/src/server/shared/Define.h
index 2a04b8cdfad..8350815243c 100644
--- a/src/server/shared/Define.h
+++ b/src/server/shared/Define.h
@@ -28,6 +28,9 @@
# if !defined(__STDC_CONSTANT_MACROS)
# define __STDC_CONSTANT_MACROS
# endif
+# if !defined(_GLIBCXX_USE_NANOSLEEP)
+# define _GLIBCXX_USE_NANOSLEEP
+# endif
#endif
#include <cstddef>
diff --git a/src/server/shared/Dynamic/LinkedList.h b/src/server/shared/Dynamic/LinkedList.h
index 402aaf3d40c..87bbeaac380 100644
--- a/src/server/shared/Dynamic/LinkedList.h
+++ b/src/server/shared/Dynamic/LinkedList.h
@@ -150,7 +150,7 @@ class LinkedListHead
typedef _Ty& reference;
typedef _Ty const & const_reference;
- Iterator() : _Ptr(0)
+ Iterator() : _Ptr(nullptr)
{ // construct with null node pointer
}
diff --git a/src/server/shared/Logging/AppenderConsole.h b/src/server/shared/Logging/AppenderConsole.h
index b8f15b4fa0f..5b66f86650d 100644
--- a/src/server/shared/Logging/AppenderConsole.h
+++ b/src/server/shared/Logging/AppenderConsole.h
@@ -51,7 +51,7 @@ class AppenderConsole: public Appender
private:
void SetColor(bool stdout_stream, ColorTypes color);
void ResetColor(bool stdout_stream);
- void _write(LogMessage const& message);
+ void _write(LogMessage const& message) override;
bool _colored;
ColorTypes _colors[MaxLogLevels];
};
diff --git a/src/server/shared/Logging/AppenderDB.h b/src/server/shared/Logging/AppenderDB.h
index 660992261fd..b86252d0d67 100644
--- a/src/server/shared/Logging/AppenderDB.h
+++ b/src/server/shared/Logging/AppenderDB.h
@@ -31,7 +31,7 @@ class AppenderDB: public Appender
private:
uint32 realmId;
bool enabled;
- void _write(LogMessage const& message);
+ void _write(LogMessage const& message) override;
};
#endif
diff --git a/src/server/shared/Logging/AppenderFile.h b/src/server/shared/Logging/AppenderFile.h
index a600c92d152..37ba2769e19 100644
--- a/src/server/shared/Logging/AppenderFile.h
+++ b/src/server/shared/Logging/AppenderFile.h
@@ -30,7 +30,7 @@ class AppenderFile: public Appender
private:
void CloseFile();
- void _write(LogMessage const& message);
+ void _write(LogMessage const& message) override;
FILE* logfile;
std::string filename;
std::string logDir;
diff --git a/src/server/shared/Networking/AsyncAcceptor.h b/src/server/shared/Networking/AsyncAcceptor.h
index d056731bb79..64665c2b198 100644
--- a/src/server/shared/Networking/AsyncAcceptor.h
+++ b/src/server/shared/Networking/AsyncAcceptor.h
@@ -18,6 +18,7 @@
#ifndef __ASYNCACCEPT_H_
#define __ASYNCACCEPT_H_
+#include "Log.h"
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
@@ -49,8 +50,15 @@ private:
{
if (!error)
{
- // this-> is required here to fix an segmentation fault in gcc 4.7.2 - reason is lambdas in a templated class
- std::make_shared<T>(std::move(this->_socket))->Start();
+ try
+ {
+ // this-> is required here to fix an segmentation fault in gcc 4.7.2 - reason is lambdas in a templated class
+ std::make_shared<T>(std::move(this->_socket))->Start();
+ }
+ catch (boost::system::system_error const& err)
+ {
+ TC_LOG_INFO("network", "Failed to retrieve client's remote address %s", err.what());
+ }
}
// lets slap some more this-> on this so we can fix this bug with gcc 4.7.2 throwing internals in yo face
diff --git a/src/server/shared/Networking/MessageBuffer.h b/src/server/shared/Networking/MessageBuffer.h
new file mode 100644
index 00000000000..547b39130a4
--- /dev/null
+++ b/src/server/shared/Networking/MessageBuffer.h
@@ -0,0 +1,93 @@
+/*
+* Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/>
+*
+* This program is free software; you can redistribute it and/or modify it
+* under the terms of the GNU General Public License as published by the
+* Free Software Foundation; either version 2 of the License, or (at your
+* option) any later version.
+*
+* This program is distributed in the hope that it will be useful, but WITHOUT
+* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
+* more details.
+*
+* You should have received a copy of the GNU General Public License along
+* with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __MESSAGEBUFFER_H_
+#define __MESSAGEBUFFER_H_
+
+#include "Define.h"
+#include <vector>
+
+class MessageBuffer
+{
+ typedef std::vector<uint8>::size_type size_type;
+
+public:
+ MessageBuffer() : _wpos(0), _storage() { }
+
+ MessageBuffer(MessageBuffer const& right) : _wpos(right._wpos), _storage(right._storage) { }
+
+ MessageBuffer(MessageBuffer&& right) : _wpos(right._wpos), _storage(right.Move()) { }
+
+ void Reset()
+ {
+ _storage.clear();
+ _wpos = 0;
+ }
+
+ bool IsMessageReady() const { return _wpos == _storage.size(); }
+
+ size_type GetMissingSize() const { return _storage.size() - _wpos; }
+
+ uint8* Data() { return _storage.data(); }
+
+ void Grow(size_type bytes)
+ {
+ _storage.resize(_storage.size() + bytes);
+ }
+
+ uint8* GetWritePointer() { return &_storage[_wpos]; }
+
+ void WriteCompleted(size_type bytes) { _wpos += bytes; }
+
+ void ResetWritePointer() { _wpos = 0; }
+
+ size_type GetSize() const { return _storage.size(); }
+
+ std::vector<uint8>&& Move()
+ {
+ _wpos = 0;
+ return std::move(_storage);
+ }
+
+ MessageBuffer& operator=(MessageBuffer& right)
+ {
+ if (this != &right)
+ {
+ _wpos = right._wpos;
+ _storage = right._storage;
+ }
+
+ return *this;
+ }
+
+ MessageBuffer& operator=(MessageBuffer&& right)
+ {
+ if (this != &right)
+ {
+ _wpos = right._wpos;
+ _storage = right.Move();
+ }
+
+ return *this;
+ }
+
+private:
+ size_type _wpos;
+ std::vector<uint8> _storage;
+};
+
+#endif /* __MESSAGEBUFFER_H_ */
diff --git a/src/server/shared/Networking/Socket.h b/src/server/shared/Networking/Socket.h
index 38d88e3592c..dd638c059a5 100644
--- a/src/server/shared/Networking/Socket.h
+++ b/src/server/shared/Networking/Socket.h
@@ -18,8 +18,9 @@
#ifndef __SOCKET_H__
#define __SOCKET_H__
-#include "Define.h"
+#include "MessageBuffer.h"
#include "Log.h"
+#include <atomic>
#include <vector>
#include <mutex>
#include <queue>
@@ -28,68 +29,92 @@
#include <type_traits>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/write.hpp>
+#include <boost/asio/read.hpp>
using boost::asio::ip::tcp;
+#define READ_BLOCK_SIZE 4096
+
template<class T, class PacketType>
class Socket : public std::enable_shared_from_this<T>
{
typedef typename std::conditional<std::is_pointer<PacketType>::value, PacketType, PacketType const&>::type WritePacketType;
public:
- Socket(tcp::socket&& socket, std::size_t headerSize) : _socket(std::move(socket)), _headerSize(headerSize) { }
-
- virtual void Start() = 0;
+ Socket(tcp::socket&& socket, std::size_t headerSize) : _socket(std::move(socket)), _remoteAddress(_socket.remote_endpoint().address()),
+ _remotePort(_socket.remote_endpoint().port()), _readHeaderBuffer(), _readDataBuffer(), _closed(false)
+ {
+ _readHeaderBuffer.Grow(headerSize);
+ }
- boost::asio::ip::address GetRemoteIpAddress() const
+ virtual ~Socket()
{
boost::system::error_code error;
- auto ep = _socket.remote_endpoint(error);
+ _socket.close(error);
- if (error)
+ while (!_writeQueue.empty())
{
- TC_LOG_DEBUG("network", "Socket::GetRemoteIpAddress: errored with: %i (%s)", error.value(), error.message().c_str());
- return boost::asio::ip::address();
+ DeletePacket(_writeQueue.front());
+ _writeQueue.pop();
}
- else
- return ep.address();
}
- uint16 GetRemotePort() const
+ virtual void Start() = 0;
+
+ boost::asio::ip::address GetRemoteIpAddress() const
{
- boost::system::error_code error;
- auto ep = _socket.remote_endpoint(error);
+ return _remoteAddress;
+ }
- if (error)
- {
- TC_LOG_DEBUG("network", "Socket::GetRemotePort: errored with: %i (%s)", error.value(), error.message().c_str());
- return 0;
- }
- else
- return ep.port();
+ uint16 GetRemotePort() const
+ {
+ return _remotePort;
}
void AsyncReadHeader()
{
- _socket.async_read_some(boost::asio::buffer(_readBuffer, _headerSize), std::bind(&Socket<T, PacketType>::ReadHeaderHandlerInternal, this->shared_from_this(),
- std::placeholders::_1, std::placeholders::_2));
+ if (!IsOpen())
+ return;
+
+ _readHeaderBuffer.ResetWritePointer();
+ _readDataBuffer.Reset();
+
+ AsyncReadMissingHeaderData();
}
- void AsyncReadData(std::size_t size, std::size_t bufferOffset)
+ void AsyncReadData(std::size_t size)
{
- _socket.async_read_some(boost::asio::buffer(&_readBuffer[bufferOffset], size), std::bind(&Socket<T, PacketType>::ReadDataHandlerInternal, this->shared_from_this(),
- std::placeholders::_1, std::placeholders::_2));
+ if (!IsOpen())
+ return;
+
+ if (!size)
+ {
+ // if this is a packet with 0 length body just invoke handler directly
+ ReadDataHandler();
+ return;
+ }
+
+ _readDataBuffer.Grow(size);
+ AsyncReadMissingData();
}
- void ReadData(std::size_t size, std::size_t bufferOffset)
+ void ReadData(std::size_t size)
{
+ if (!IsOpen())
+ return;
+
boost::system::error_code error;
- _socket.read_some(boost::asio::buffer(&_readBuffer[bufferOffset], size), error);
+ _readDataBuffer.Grow(size);
- if (error)
+ std::size_t bytesRead = boost::asio::read(_socket, boost::asio::buffer(_readDataBuffer.GetWritePointer(), size), error);
+
+ _readDataBuffer.WriteCompleted(bytesRead);
+
+ if (error || !_readDataBuffer.IsMessageReady())
{
- TC_LOG_DEBUG("network", "Socket::ReadData: %s errored with: %i (%s)", GetRemoteIpAddress().to_string().c_str(), error.value(), error.message().c_str());
+ TC_LOG_DEBUG("network", "Socket::ReadData: %s errored with: %i (%s)", GetRemoteIpAddress().to_string().c_str(), error.value(),
+ error.message().c_str());
CloseSocket();
}
@@ -97,32 +122,93 @@ public:
void AsyncWrite(WritePacketType data)
{
- boost::asio::async_write(_socket, boost::asio::buffer(data), std::bind(&Socket<T, PacketType>::WriteHandler, this->shared_from_this(), std::placeholders::_1,
- std::placeholders::_2));
+ boost::asio::async_write(_socket, boost::asio::buffer(data), std::bind(&Socket<T, PacketType>::WriteHandler, this->shared_from_this(),
+ std::placeholders::_1, std::placeholders::_2));
}
- bool IsOpen() const { return _socket.is_open(); }
- void CloseSocket()
+ bool IsOpen() const { return !_closed; }
+
+ virtual void CloseSocket()
{
- boost::system::error_code error;
- _socket.close(error);
- if (error)
- TC_LOG_DEBUG("network", "Socket::CloseSocket: %s errored when closing socket: %i (%s)", GetRemoteIpAddress().to_string().c_str(),
- error.value(), error.message().c_str());
+ if (_closed.exchange(true))
+ return;
+
+ boost::system::error_code shutdownError;
+ _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError);
+ if (shutdownError)
+ TC_LOG_DEBUG("network", "Socket::CloseSocket: %s errored when shutting down socket: %i (%s)", GetRemoteIpAddress().to_string().c_str(),
+ shutdownError.value(), shutdownError.message().c_str());
}
- uint8* GetReadBuffer() { return _readBuffer; }
+ virtual bool IsHeaderReady() const { return _readHeaderBuffer.IsMessageReady(); }
+ virtual bool IsDataReady() const { return _readDataBuffer.IsMessageReady(); }
+
+ uint8* GetHeaderBuffer() { return _readHeaderBuffer.Data(); }
+ uint8* GetDataBuffer() { return _readDataBuffer.Data(); }
+
+ size_t GetHeaderSize() const { return _readHeaderBuffer.GetSize(); }
+ size_t GetDataSize() const { return _readDataBuffer.GetSize(); }
+
+ MessageBuffer&& MoveHeader() { return std::move(_readHeaderBuffer); }
+ MessageBuffer&& MoveData() { return std::move(_readDataBuffer); }
protected:
- virtual void ReadHeaderHandler(boost::system::error_code error, size_t transferedBytes) = 0;
- virtual void ReadDataHandler(boost::system::error_code error, size_t transferedBytes) = 0;
+ virtual void ReadHeaderHandler() = 0;
+ virtual void ReadDataHandler() = 0;
std::mutex _writeLock;
std::queue<PacketType> _writeQueue;
private:
- void ReadHeaderHandlerInternal(boost::system::error_code error, size_t transferedBytes) { ReadHeaderHandler(error, transferedBytes); }
- void ReadDataHandlerInternal(boost::system::error_code error, size_t transferedBytes) { ReadDataHandler(error, transferedBytes); }
+ void AsyncReadMissingHeaderData()
+ {
+ _socket.async_read_some(boost::asio::buffer(_readHeaderBuffer.GetWritePointer(), std::min<std::size_t>(READ_BLOCK_SIZE, _readHeaderBuffer.GetMissingSize())),
+ std::bind(&Socket<T, PacketType>::ReadHeaderHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
+ }
+
+ void AsyncReadMissingData()
+ {
+ _socket.async_read_some(boost::asio::buffer(_readDataBuffer.GetWritePointer(), std::min<std::size_t>(READ_BLOCK_SIZE, _readDataBuffer.GetMissingSize())),
+ std::bind(&Socket<T, PacketType>::ReadDataHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
+ }
+
+ void ReadHeaderHandlerInternal(boost::system::error_code error, size_t transferredBytes)
+ {
+ if (error)
+ {
+ CloseSocket();
+ return;
+ }
+
+ _readHeaderBuffer.WriteCompleted(transferredBytes);
+ if (!IsHeaderReady())
+ {
+ // incomplete, read more
+ AsyncReadMissingHeaderData();
+ return;
+ }
+
+ ReadHeaderHandler();
+ }
+
+ void ReadDataHandlerInternal(boost::system::error_code error, size_t transferredBytes)
+ {
+ if (error)
+ {
+ CloseSocket();
+ return;
+ }
+
+ _readDataBuffer.WriteCompleted(transferredBytes);
+ if (!IsDataReady())
+ {
+ // incomplete, read more
+ AsyncReadMissingData();
+ return;
+ }
+
+ ReadDataHandler();
+ }
void WriteHandler(boost::system::error_code error, size_t /*transferedBytes*/)
{
@@ -148,9 +234,13 @@ private:
tcp::socket _socket;
- uint8 _readBuffer[4096];
+ boost::asio::ip::address _remoteAddress;
+ uint16 _remotePort;
+
+ MessageBuffer _readHeaderBuffer;
+ MessageBuffer _readDataBuffer;
- std::size_t _headerSize;
+ std::atomic<bool> _closed;
};
#endif // __SOCKET_H__
diff --git a/src/server/shared/Packets/ByteBuffer.cpp b/src/server/shared/Packets/ByteBuffer.cpp
index 0a911492f85..3785d1c29fa 100644
--- a/src/server/shared/Packets/ByteBuffer.cpp
+++ b/src/server/shared/Packets/ByteBuffer.cpp
@@ -17,11 +17,16 @@
*/
#include "ByteBuffer.h"
+#include "MessageBuffer.h"
#include "Common.h"
#include "Log.h"
#include <sstream>
+ByteBuffer::ByteBuffer(MessageBuffer&& buffer) : _rpos(0), _wpos(0), _storage(buffer.Move())
+{
+}
+
ByteBufferPositionException::ByteBufferPositionException(bool add, size_t pos,
size_t size, size_t valueSize)
{
@@ -69,8 +74,8 @@ void ByteBuffer::textlike() const
o << "STORAGE_SIZE: " << size();
for (uint32 i = 0; i < size(); ++i)
{
- char buf[1];
- snprintf(buf, 1, "%c", read<uint8>(i));
+ char buf[2];
+ snprintf(buf, 2, "%c", read<uint8>(i));
o << buf;
}
o << " ";
@@ -90,7 +95,7 @@ void ByteBuffer::hexlike() const
for (uint32 i = 0; i < size(); ++i)
{
char buf[3];
- snprintf(buf, 1, "%2X ", read<uint8>(i));
+ snprintf(buf, 3, "%2X ", read<uint8>(i));
if ((i == (j * 8)) && ((i != (k * 16))))
{
o << "| ";
diff --git a/src/server/shared/Packets/ByteBuffer.h b/src/server/shared/Packets/ByteBuffer.h
index 520c1a85fc2..35b23610143 100644
--- a/src/server/shared/Packets/ByteBuffer.h
+++ b/src/server/shared/Packets/ByteBuffer.h
@@ -34,13 +34,15 @@
#include <math.h>
#include <boost/asio/buffer.hpp>
+class MessageBuffer;
+
// Root of ByteBuffer exception hierarchy
class ByteBufferException : public std::exception
{
public:
~ByteBufferException() throw() { }
- char const* what() const throw() { return msg_.c_str(); }
+ char const* what() const throw() override { return msg_.c_str(); }
protected:
std::string & message() throw() { return msg_; }
@@ -82,14 +84,12 @@ class ByteBuffer
}
ByteBuffer(ByteBuffer&& buf) : _rpos(buf._rpos), _wpos(buf._wpos),
- _bitpos(buf._bitpos), _curbitval(buf._curbitval), _storage(std::move(buf._storage))
- {
- }
+ _bitpos(buf._bitpos), _curbitval(buf._curbitval), _storage(std::move(buf._storage)) { }
ByteBuffer(ByteBuffer const& right) : _rpos(right._rpos), _wpos(right._wpos),
- _bitpos(right._bitpos), _curbitval(right._curbitval), _storage(right._storage)
- {
- }
+ _bitpos(right._bitpos), _curbitval(right._curbitval), _storage(right._storage) { }
+
+ ByteBuffer(MessageBuffer&& buffer);
ByteBuffer& operator=(ByteBuffer const& right)
{
diff --git a/src/server/shared/Threading/ProcessPriority.h b/src/server/shared/Threading/ProcessPriority.h
index 06a5622fb9d..23238c94ace 100644
--- a/src/server/shared/Threading/ProcessPriority.h
+++ b/src/server/shared/Threading/ProcessPriority.h
@@ -32,7 +32,7 @@ void SetProcessPriority(const std::string logChannel)
#if PLATFORM_APPLE
(void)logChannel;
#endif
-
+
#if defined(_WIN32) || defined(__linux__)
///- Handle affinity for multiple processors and process priority
diff --git a/src/server/shared/Threading/ProducerConsumerQueue.h b/src/server/shared/Threading/ProducerConsumerQueue.h
index 1fee1d0685f..accb0aebb11 100644
--- a/src/server/shared/Threading/ProducerConsumerQueue.h
+++ b/src/server/shared/Threading/ProducerConsumerQueue.h
@@ -58,7 +58,7 @@ public:
{
std::lock_guard<std::mutex> lock(_queueLock);
- if (_queue.empty())
+ if (_queue.empty() || _shutdown)
return false;
value = _queue.front();
@@ -77,7 +77,7 @@ public:
_condition.wait(lock);
}
- if (_queue.empty())
+ if (_queue.empty() || _shutdown)
return;
value = _queue.front();
diff --git a/src/server/shared/Utilities/Util.cpp b/src/server/shared/Utilities/Util.cpp
index a5279eae03f..0bdc1fb714c 100644
--- a/src/server/shared/Utilities/Util.cpp
+++ b/src/server/shared/Utilities/Util.cpp
@@ -18,13 +18,14 @@
#include "Util.h"
#include "Common.h"
+#include "CompilerDefs.h"
#include "utf8.h"
#include "SFMT.h"
#include "Errors.h" // for ASSERT
#include <stdarg.h>
#include <boost/thread/tss.hpp>
-#if PLATFORM == PLATFORM_UNIX
+#if COMPILER == COMPILER_GNU
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>