mirror of
https://github.com/TrinityCore/TrinityCore.git
synced 2026-01-28 21:02:14 +01:00
Merge branch 'master' of https://github.com/TrinityCore/TrinityCore into 4.3.4
Conflicts: src/server/authserver/Main.cpp src/server/authserver/Server/AuthSession.cpp src/server/authserver/Server/AuthSession.h src/server/game/Server/WorldSocket.cpp src/server/game/Server/WorldSocket.h src/server/scripts/World/npcs_special.cpp
This commit is contained in:
@@ -57,14 +57,14 @@ m_connectionFlags(CONNECTION_ASYNC)
|
||||
|
||||
MySQLConnection::~MySQLConnection()
|
||||
{
|
||||
if (m_worker)
|
||||
delete m_worker;
|
||||
|
||||
for (size_t i = 0; i < m_stmts.size(); ++i)
|
||||
delete m_stmts[i];
|
||||
|
||||
if (m_Mysql)
|
||||
mysql_close(m_Mysql);
|
||||
|
||||
if (m_worker)
|
||||
delete m_worker;
|
||||
}
|
||||
|
||||
void MySQLConnection::Close()
|
||||
|
||||
@@ -23,51 +23,68 @@
|
||||
|
||||
using boost::asio::ip::tcp;
|
||||
|
||||
template <class T>
|
||||
class AsyncAcceptor
|
||||
{
|
||||
public:
|
||||
AsyncAcceptor(boost::asio::io_service& ioService, std::string bindIp, int port) :
|
||||
typedef void(*ManagerAcceptHandler)(tcp::socket&& newSocket);
|
||||
|
||||
AsyncAcceptor(boost::asio::io_service& ioService, std::string const& bindIp, uint16 port) :
|
||||
_acceptor(ioService, tcp::endpoint(boost::asio::ip::address::from_string(bindIp), port)),
|
||||
_socket(ioService)
|
||||
{
|
||||
AsyncAccept();
|
||||
};
|
||||
}
|
||||
|
||||
AsyncAcceptor(boost::asio::io_service& ioService, std::string bindIp, int port, bool tcpNoDelay) :
|
||||
_acceptor(ioService, tcp::endpoint(boost::asio::ip::address::from_string(bindIp), port)),
|
||||
_socket(ioService)
|
||||
template <class T>
|
||||
void AsyncAccept();
|
||||
|
||||
void AsyncAcceptManaged(ManagerAcceptHandler mgrHandler)
|
||||
{
|
||||
_acceptor.set_option(boost::asio::ip::tcp::no_delay(tcpNoDelay));
|
||||
|
||||
AsyncAccept();
|
||||
};
|
||||
|
||||
private:
|
||||
void AsyncAccept()
|
||||
{
|
||||
_acceptor.async_accept(_socket, [this](boost::system::error_code error)
|
||||
_acceptor.async_accept(_socket, [this, mgrHandler](boost::system::error_code error)
|
||||
{
|
||||
if (!error)
|
||||
{
|
||||
try
|
||||
{
|
||||
// this-> is required here to fix an segmentation fault in gcc 4.7.2 - reason is lambdas in a templated class
|
||||
std::make_shared<T>(std::move(this->_socket))->Start();
|
||||
_socket.non_blocking(true);
|
||||
|
||||
mgrHandler(std::move(_socket));
|
||||
}
|
||||
catch (boost::system::system_error const& err)
|
||||
{
|
||||
TC_LOG_INFO("network", "Failed to retrieve client's remote address %s", err.what());
|
||||
TC_LOG_INFO("network", "Failed to initialize client's socket %s", err.what());
|
||||
}
|
||||
}
|
||||
|
||||
// lets slap some more this-> on this so we can fix this bug with gcc 4.7.2 throwing internals in yo face
|
||||
this->AsyncAccept();
|
||||
AsyncAcceptManaged(mgrHandler);
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
tcp::acceptor _acceptor;
|
||||
tcp::socket _socket;
|
||||
};
|
||||
|
||||
template<class T>
|
||||
void AsyncAcceptor::AsyncAccept()
|
||||
{
|
||||
_acceptor.async_accept(_socket, [this](boost::system::error_code error)
|
||||
{
|
||||
if (!error)
|
||||
{
|
||||
try
|
||||
{
|
||||
// this-> is required here to fix an segmentation fault in gcc 4.7.2 - reason is lambdas in a templated class
|
||||
std::make_shared<T>(std::move(this->_socket))->Start();
|
||||
}
|
||||
catch (boost::system::system_error const& err)
|
||||
{
|
||||
TC_LOG_INFO("network", "Failed to retrieve client's remote address %s", err.what());
|
||||
}
|
||||
}
|
||||
|
||||
// lets slap some more this-> on this so we can fix this bug with gcc 4.7.2 throwing internals in yo face
|
||||
this->AsyncAccept<T>();
|
||||
});
|
||||
}
|
||||
|
||||
#endif /* __ASYNCACCEPT_H_ */
|
||||
|
||||
@@ -26,42 +26,74 @@ class MessageBuffer
|
||||
typedef std::vector<uint8>::size_type size_type;
|
||||
|
||||
public:
|
||||
MessageBuffer() : _wpos(0), _storage() { }
|
||||
MessageBuffer() : _wpos(0), _rpos(0), _storage()
|
||||
{
|
||||
_storage.resize(4096);
|
||||
}
|
||||
|
||||
MessageBuffer(MessageBuffer const& right) : _wpos(right._wpos), _storage(right._storage) { }
|
||||
explicit MessageBuffer(std::size_t initialSize) : _wpos(0), _rpos(0), _storage()
|
||||
{
|
||||
_storage.resize(initialSize);
|
||||
}
|
||||
|
||||
MessageBuffer(MessageBuffer&& right) : _wpos(right._wpos), _storage(right.Move()) { }
|
||||
MessageBuffer(MessageBuffer const& right) : _wpos(right._wpos), _rpos(right._rpos), _storage(right._storage)
|
||||
{
|
||||
}
|
||||
|
||||
MessageBuffer(MessageBuffer&& right) : _wpos(right._wpos), _rpos(right._rpos), _storage(right.Move()) { }
|
||||
|
||||
void Reset()
|
||||
{
|
||||
_storage.clear();
|
||||
_wpos = 0;
|
||||
_rpos = 0;
|
||||
}
|
||||
|
||||
bool IsMessageReady() const { return _wpos == _storage.size(); }
|
||||
|
||||
size_type GetSize() const { return _storage.size(); }
|
||||
|
||||
size_type GetReadyDataSize() const { return _wpos; }
|
||||
|
||||
size_type GetMissingSize() const { return _storage.size() - _wpos; }
|
||||
|
||||
uint8* Data() { return _storage.data(); }
|
||||
|
||||
void Grow(size_type bytes)
|
||||
void Resize(size_type bytes)
|
||||
{
|
||||
_storage.resize(_storage.size() + bytes);
|
||||
_storage.resize(bytes);
|
||||
}
|
||||
|
||||
uint8* GetBasePointer() { return _storage.data(); }
|
||||
|
||||
uint8* GetReadPointer() { return &_storage[_rpos]; }
|
||||
|
||||
uint8* GetWritePointer() { return &_storage[_wpos]; }
|
||||
|
||||
void ReadCompleted(size_type bytes) { _rpos += bytes; }
|
||||
|
||||
void WriteCompleted(size_type bytes) { _wpos += bytes; }
|
||||
|
||||
void ResetWritePointer() { _wpos = 0; }
|
||||
size_type GetActiveSize() const { return _wpos - _rpos; }
|
||||
|
||||
size_type GetRemainingSpace() const { return _storage.size() - _wpos; }
|
||||
|
||||
size_type GetBufferSize() const { return _storage.size(); }
|
||||
|
||||
// Discards inactive data
|
||||
void Normalize()
|
||||
{
|
||||
if (_rpos)
|
||||
{
|
||||
if (_rpos != _wpos)
|
||||
memmove(GetBasePointer(), GetReadPointer(), GetActiveSize());
|
||||
_wpos -= _rpos;
|
||||
_rpos = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void Write(void* data, std::size_t size)
|
||||
{
|
||||
if (size)
|
||||
{
|
||||
memcpy(GetWritePointer(), data, size);
|
||||
WriteCompleted(size);
|
||||
}
|
||||
}
|
||||
|
||||
std::vector<uint8>&& Move()
|
||||
{
|
||||
_wpos = 0;
|
||||
_rpos = 0;
|
||||
return std::move(_storage);
|
||||
}
|
||||
|
||||
@@ -70,6 +102,7 @@ public:
|
||||
if (this != &right)
|
||||
{
|
||||
_wpos = right._wpos;
|
||||
_rpos = right._rpos;
|
||||
_storage = right._storage;
|
||||
}
|
||||
|
||||
@@ -81,6 +114,7 @@ public:
|
||||
if (this != &right)
|
||||
{
|
||||
_wpos = right._wpos;
|
||||
_rpos = right._rpos;
|
||||
_storage = right.Move();
|
||||
}
|
||||
|
||||
@@ -89,6 +123,7 @@ public:
|
||||
|
||||
private:
|
||||
size_type _wpos;
|
||||
size_type _rpos;
|
||||
std::vector<uint8> _storage;
|
||||
};
|
||||
|
||||
|
||||
166
src/server/shared/Networking/NetworkThread.h
Normal file
166
src/server/shared/Networking/NetworkThread.h
Normal file
@@ -0,0 +1,166 @@
|
||||
/*
|
||||
* Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/>
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License as published by the
|
||||
* Free Software Foundation; either version 2 of the License, or (at your
|
||||
* option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
* more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along
|
||||
* with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef NetworkThread_h__
|
||||
#define NetworkThread_h__
|
||||
|
||||
#include "Define.h"
|
||||
#include "Errors.h"
|
||||
#include "Log.h"
|
||||
#include "Timer.h"
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <thread>
|
||||
|
||||
template<class SocketType>
|
||||
class NetworkThread
|
||||
{
|
||||
public:
|
||||
NetworkThread() : _connections(0), _stopped(false), _thread(nullptr)
|
||||
{
|
||||
}
|
||||
|
||||
virtual ~NetworkThread()
|
||||
{
|
||||
Stop();
|
||||
if (_thread)
|
||||
{
|
||||
Wait();
|
||||
delete _thread;
|
||||
}
|
||||
}
|
||||
|
||||
void Stop()
|
||||
{
|
||||
_stopped = true;
|
||||
}
|
||||
|
||||
bool Start()
|
||||
{
|
||||
if (_thread)
|
||||
return false;
|
||||
|
||||
_thread = new std::thread(&NetworkThread::Run, this);
|
||||
return true;
|
||||
}
|
||||
|
||||
void Wait()
|
||||
{
|
||||
ASSERT(_thread);
|
||||
|
||||
_thread->join();
|
||||
delete _thread;
|
||||
_thread = nullptr;
|
||||
}
|
||||
|
||||
int32 GetConnectionCount() const
|
||||
{
|
||||
return _connections;
|
||||
}
|
||||
|
||||
virtual void AddSocket(std::shared_ptr<SocketType> sock)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_newSocketsLock);
|
||||
|
||||
++_connections;
|
||||
_newSockets.insert(sock);
|
||||
SocketAdded(sock);
|
||||
}
|
||||
|
||||
protected:
|
||||
virtual void SocketAdded(std::shared_ptr<SocketType> /*sock*/) { }
|
||||
virtual void SocketRemoved(std::shared_ptr<SocketType> /*sock*/) { }
|
||||
|
||||
void AddNewSockets()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(_newSocketsLock);
|
||||
|
||||
if (_newSockets.empty())
|
||||
return;
|
||||
|
||||
for (typename SocketSet::const_iterator i = _newSockets.begin(); i != _newSockets.end(); ++i)
|
||||
{
|
||||
if (!(*i)->IsOpen())
|
||||
{
|
||||
SocketRemoved(*i);
|
||||
|
||||
--_connections;
|
||||
}
|
||||
else
|
||||
_Sockets.insert(*i);
|
||||
}
|
||||
|
||||
_newSockets.clear();
|
||||
}
|
||||
|
||||
void Run()
|
||||
{
|
||||
TC_LOG_DEBUG("misc", "Network Thread Starting");
|
||||
|
||||
typename SocketSet::iterator i, t;
|
||||
|
||||
uint32 sleepTime = 10;
|
||||
uint32 tickStart = 0, diff = 0;
|
||||
while (!_stopped)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime));
|
||||
|
||||
tickStart = getMSTime();
|
||||
|
||||
AddNewSockets();
|
||||
|
||||
for (i = _Sockets.begin(); i != _Sockets.end();)
|
||||
{
|
||||
if (!(*i)->Update())
|
||||
{
|
||||
if ((*i)->IsOpen())
|
||||
(*i)->CloseSocket();
|
||||
|
||||
SocketRemoved(*i);
|
||||
|
||||
--_connections;
|
||||
_Sockets.erase(i++);
|
||||
}
|
||||
else
|
||||
++i;
|
||||
}
|
||||
|
||||
diff = GetMSTimeDiffToNow(tickStart);
|
||||
sleepTime = diff > 10 ? 0 : 10 - diff;
|
||||
}
|
||||
|
||||
TC_LOG_DEBUG("misc", "Network Thread exits");
|
||||
}
|
||||
|
||||
private:
|
||||
typedef std::set<std::shared_ptr<SocketType> > SocketSet;
|
||||
|
||||
std::atomic<int32> _connections;
|
||||
std::atomic<bool> _stopped;
|
||||
|
||||
std::thread* _thread;
|
||||
|
||||
SocketSet _Sockets;
|
||||
|
||||
std::mutex _newSocketsLock;
|
||||
SocketSet _newSockets;
|
||||
};
|
||||
|
||||
#endif // NetworkThread_h__
|
||||
@@ -35,32 +35,40 @@ using boost::asio::ip::tcp;
|
||||
|
||||
#define READ_BLOCK_SIZE 4096
|
||||
|
||||
template<class T, class PacketType>
|
||||
template<class T>
|
||||
class Socket : public std::enable_shared_from_this<T>
|
||||
{
|
||||
typedef typename std::conditional<std::is_pointer<PacketType>::value, PacketType, PacketType const&>::type WritePacketType;
|
||||
|
||||
public:
|
||||
Socket(tcp::socket&& socket, std::size_t headerSize) : _socket(std::move(socket)), _remoteAddress(_socket.remote_endpoint().address()),
|
||||
_remotePort(_socket.remote_endpoint().port()), _readHeaderBuffer(), _readDataBuffer(), _closed(false), _closing(false)
|
||||
explicit Socket(tcp::socket&& socket) : _socket(std::move(socket)), _remoteAddress(_socket.remote_endpoint().address()),
|
||||
_remotePort(_socket.remote_endpoint().port()), _readBuffer(), _closed(false), _closing(false), _isWritingAsync(false)
|
||||
{
|
||||
_readHeaderBuffer.Grow(headerSize);
|
||||
_readBuffer.Resize(READ_BLOCK_SIZE);
|
||||
}
|
||||
|
||||
virtual ~Socket()
|
||||
{
|
||||
boost::system::error_code error;
|
||||
_socket.close(error);
|
||||
|
||||
while (!_writeQueue.empty())
|
||||
{
|
||||
DeletePacket(_writeQueue.front());
|
||||
_writeQueue.pop();
|
||||
}
|
||||
}
|
||||
|
||||
virtual void Start() = 0;
|
||||
|
||||
virtual bool Update()
|
||||
{
|
||||
if (!IsOpen())
|
||||
return false;
|
||||
|
||||
#ifndef BOOST_ASIO_HAS_IOCP
|
||||
if (_isWritingAsync || (!_writeBuffer.GetActiveSize() && _writeQueue.empty()))
|
||||
return true;
|
||||
|
||||
for (; WriteHandler(boost::system::error_code(), 0);)
|
||||
;
|
||||
#endif
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
boost::asio::ip::address GetRemoteIpAddress() const
|
||||
{
|
||||
return _remoteAddress;
|
||||
@@ -71,31 +79,14 @@ public:
|
||||
return _remotePort;
|
||||
}
|
||||
|
||||
void AsyncReadHeader()
|
||||
void AsyncRead()
|
||||
{
|
||||
if (!IsOpen())
|
||||
return;
|
||||
|
||||
_readHeaderBuffer.ResetWritePointer();
|
||||
_readDataBuffer.Reset();
|
||||
|
||||
AsyncReadMissingHeaderData();
|
||||
}
|
||||
|
||||
void AsyncReadData(std::size_t size)
|
||||
{
|
||||
if (!IsOpen())
|
||||
return;
|
||||
|
||||
if (!size)
|
||||
{
|
||||
// if this is a packet with 0 length body just invoke handler directly
|
||||
ReadDataHandler();
|
||||
return;
|
||||
}
|
||||
|
||||
_readDataBuffer.Grow(size);
|
||||
AsyncReadMissingData();
|
||||
_readBuffer.Normalize();
|
||||
_socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), READ_BLOCK_SIZE),
|
||||
std::bind(&Socket<T>::ReadHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
|
||||
void ReadData(std::size_t size)
|
||||
@@ -105,13 +96,11 @@ public:
|
||||
|
||||
boost::system::error_code error;
|
||||
|
||||
_readDataBuffer.Grow(size);
|
||||
std::size_t bytesRead = boost::asio::read(_socket, boost::asio::buffer(_readBuffer.GetWritePointer(), size), error);
|
||||
|
||||
std::size_t bytesRead = boost::asio::read(_socket, boost::asio::buffer(_readDataBuffer.GetWritePointer(), size), error);
|
||||
_readBuffer.WriteCompleted(bytesRead);
|
||||
|
||||
_readDataBuffer.WriteCompleted(bytesRead);
|
||||
|
||||
if (error || !_readDataBuffer.IsMessageReady())
|
||||
if (error || bytesRead != size)
|
||||
{
|
||||
TC_LOG_DEBUG("network", "Socket::ReadData: %s errored with: %i (%s)", GetRemoteIpAddress().to_string().c_str(), error.value(),
|
||||
error.message().c_str());
|
||||
@@ -120,15 +109,19 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void AsyncWrite(WritePacketType data)
|
||||
void QueuePacket(MessageBuffer&& buffer, std::unique_lock<std::mutex>& guard)
|
||||
{
|
||||
boost::asio::async_write(_socket, boost::asio::buffer(data), std::bind(&Socket<T, PacketType>::WriteHandler, this->shared_from_this(),
|
||||
std::placeholders::_1, std::placeholders::_2));
|
||||
|
||||
_writeQueue.push(std::move(buffer));
|
||||
|
||||
#ifdef BOOST_ASIO_HAS_IOCP
|
||||
AsyncProcessQueue(guard);
|
||||
#endif
|
||||
}
|
||||
|
||||
bool IsOpen() const { return !_closed && !_closing; }
|
||||
|
||||
virtual void CloseSocket()
|
||||
void CloseSocket()
|
||||
{
|
||||
if (_closed.exchange(true))
|
||||
return;
|
||||
@@ -143,39 +136,37 @@ public:
|
||||
/// Marks the socket for closing after write buffer becomes empty
|
||||
void DelayedCloseSocket() { _closing = true; }
|
||||
|
||||
virtual bool IsHeaderReady() const { return _readHeaderBuffer.IsMessageReady(); }
|
||||
virtual bool IsDataReady() const { return _readDataBuffer.IsMessageReady(); }
|
||||
|
||||
uint8* GetHeaderBuffer() { return _readHeaderBuffer.Data(); }
|
||||
uint8* GetDataBuffer() { return _readDataBuffer.Data(); }
|
||||
|
||||
size_t GetHeaderSize() const { return _readHeaderBuffer.GetReadyDataSize(); }
|
||||
size_t GetDataSize() const { return _readDataBuffer.GetReadyDataSize(); }
|
||||
|
||||
MessageBuffer&& MoveHeader() { return std::move(_readHeaderBuffer); }
|
||||
MessageBuffer&& MoveData() { return std::move(_readDataBuffer); }
|
||||
MessageBuffer& GetReadBuffer() { return _readBuffer; }
|
||||
|
||||
protected:
|
||||
virtual void ReadHeaderHandler() = 0;
|
||||
virtual void ReadDataHandler() = 0;
|
||||
virtual void ReadHandler() = 0;
|
||||
|
||||
bool AsyncProcessQueue(std::unique_lock<std::mutex>&)
|
||||
{
|
||||
if (_isWritingAsync)
|
||||
return true;
|
||||
|
||||
_isWritingAsync = true;
|
||||
|
||||
#ifdef BOOST_ASIO_HAS_IOCP
|
||||
MessageBuffer& buffer = _writeQueue.front();
|
||||
_socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()), std::bind(&Socket<T>::WriteHandler,
|
||||
this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
|
||||
#else
|
||||
_socket.async_write_some(boost::asio::null_buffers(), std::bind(&Socket<T>::WriteHandler, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
|
||||
#endif
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
std::mutex _writeLock;
|
||||
std::queue<PacketType> _writeQueue;
|
||||
std::queue<MessageBuffer> _writeQueue;
|
||||
#ifndef BOOST_ASIO_HAS_IOCP
|
||||
MessageBuffer _writeBuffer;
|
||||
#endif
|
||||
|
||||
private:
|
||||
void AsyncReadMissingHeaderData()
|
||||
{
|
||||
_socket.async_read_some(boost::asio::buffer(_readHeaderBuffer.GetWritePointer(), std::min<std::size_t>(READ_BLOCK_SIZE, _readHeaderBuffer.GetMissingSize())),
|
||||
std::bind(&Socket<T, PacketType>::ReadHeaderHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
|
||||
void AsyncReadMissingData()
|
||||
{
|
||||
_socket.async_read_some(boost::asio::buffer(_readDataBuffer.GetWritePointer(), std::min<std::size_t>(READ_BLOCK_SIZE, _readDataBuffer.GetMissingSize())),
|
||||
std::bind(&Socket<T, PacketType>::ReadDataHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
|
||||
}
|
||||
|
||||
void ReadHeaderHandlerInternal(boost::system::error_code error, size_t transferredBytes)
|
||||
void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes)
|
||||
{
|
||||
if (error)
|
||||
{
|
||||
@@ -183,47 +174,25 @@ private:
|
||||
return;
|
||||
}
|
||||
|
||||
_readHeaderBuffer.WriteCompleted(transferredBytes);
|
||||
if (!IsHeaderReady())
|
||||
{
|
||||
// incomplete, read more
|
||||
AsyncReadMissingHeaderData();
|
||||
return;
|
||||
}
|
||||
|
||||
ReadHeaderHandler();
|
||||
_readBuffer.WriteCompleted(transferredBytes);
|
||||
ReadHandler();
|
||||
}
|
||||
|
||||
void ReadDataHandlerInternal(boost::system::error_code error, size_t transferredBytes)
|
||||
{
|
||||
if (error)
|
||||
{
|
||||
CloseSocket();
|
||||
return;
|
||||
}
|
||||
#ifdef BOOST_ASIO_HAS_IOCP
|
||||
|
||||
_readDataBuffer.WriteCompleted(transferredBytes);
|
||||
if (!IsDataReady())
|
||||
{
|
||||
// incomplete, read more
|
||||
AsyncReadMissingData();
|
||||
return;
|
||||
}
|
||||
|
||||
ReadDataHandler();
|
||||
}
|
||||
|
||||
void WriteHandler(boost::system::error_code error, size_t /*transferedBytes*/)
|
||||
void WriteHandler(boost::system::error_code error, std::size_t transferedBytes)
|
||||
{
|
||||
if (!error)
|
||||
{
|
||||
std::lock_guard<std::mutex> deleteGuard(_writeLock);
|
||||
std::unique_lock<std::mutex> deleteGuard(_writeLock);
|
||||
|
||||
DeletePacket(_writeQueue.front());
|
||||
_writeQueue.pop();
|
||||
_isWritingAsync = false;
|
||||
_writeQueue.front().ReadCompleted(transferedBytes);
|
||||
if (!_writeQueue.front().GetActiveSize())
|
||||
_writeQueue.pop();
|
||||
|
||||
if (!_writeQueue.empty())
|
||||
AsyncWrite(_writeQueue.front());
|
||||
AsyncProcessQueue(deleteGuard);
|
||||
else if (_closing)
|
||||
CloseSocket();
|
||||
}
|
||||
@@ -231,22 +200,104 @@ private:
|
||||
CloseSocket();
|
||||
}
|
||||
|
||||
template<typename Q = PacketType>
|
||||
typename std::enable_if<std::is_pointer<Q>::value>::type DeletePacket(PacketType& packet) { delete packet; }
|
||||
#else
|
||||
|
||||
template<typename Q = PacketType>
|
||||
typename std::enable_if<!std::is_pointer<Q>::value>::type DeletePacket(PacketType const& /*packet*/) { }
|
||||
bool WriteHandler(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/)
|
||||
{
|
||||
std::unique_lock<std::mutex> guard(_writeLock, std::try_to_lock);
|
||||
if (!guard)
|
||||
return false;
|
||||
|
||||
if (!IsOpen())
|
||||
return false;
|
||||
|
||||
std::size_t bytesToSend = _writeBuffer.GetActiveSize();
|
||||
|
||||
if (bytesToSend == 0)
|
||||
return HandleQueue(guard);
|
||||
|
||||
boost::system::error_code error;
|
||||
std::size_t bytesWritten = _socket.write_some(boost::asio::buffer(_writeBuffer.GetReadPointer(), bytesToSend), error);
|
||||
|
||||
if (error)
|
||||
{
|
||||
if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
|
||||
return AsyncProcessQueue(guard);
|
||||
|
||||
return false;
|
||||
}
|
||||
else if (bytesWritten == 0)
|
||||
return false;
|
||||
else if (bytesWritten < bytesToSend) //now n > 0
|
||||
{
|
||||
_writeBuffer.ReadCompleted(bytesWritten);
|
||||
_writeBuffer.Normalize();
|
||||
return AsyncProcessQueue(guard);
|
||||
}
|
||||
|
||||
// now bytesWritten == bytesToSend
|
||||
_writeBuffer.Reset();
|
||||
|
||||
return HandleQueue(guard);
|
||||
}
|
||||
|
||||
bool HandleQueue(std::unique_lock<std::mutex>& guard)
|
||||
{
|
||||
if (_writeQueue.empty())
|
||||
{
|
||||
_isWritingAsync = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
MessageBuffer& queuedMessage = _writeQueue.front();
|
||||
|
||||
std::size_t bytesToSend = queuedMessage.GetActiveSize();
|
||||
|
||||
boost::system::error_code error;
|
||||
std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
|
||||
|
||||
if (error)
|
||||
{
|
||||
if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
|
||||
return AsyncProcessQueue(guard);
|
||||
|
||||
_writeQueue.pop();
|
||||
return false;
|
||||
}
|
||||
else if (bytesSent == 0)
|
||||
{
|
||||
_writeQueue.pop();
|
||||
return false;
|
||||
}
|
||||
else if (bytesSent < bytesToSend) // now n > 0
|
||||
{
|
||||
queuedMessage.ReadCompleted(bytesSent);
|
||||
return AsyncProcessQueue(guard);
|
||||
}
|
||||
|
||||
_writeQueue.pop();
|
||||
if (_writeQueue.empty())
|
||||
{
|
||||
_isWritingAsync = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
tcp::socket _socket;
|
||||
|
||||
boost::asio::ip::address _remoteAddress;
|
||||
uint16 _remotePort;
|
||||
|
||||
MessageBuffer _readHeaderBuffer;
|
||||
MessageBuffer _readDataBuffer;
|
||||
MessageBuffer _readBuffer;
|
||||
|
||||
std::atomic<bool> _closed;
|
||||
std::atomic<bool> _closing;
|
||||
|
||||
bool _isWritingAsync;
|
||||
};
|
||||
|
||||
#endif // __SOCKET_H__
|
||||
|
||||
111
src/server/shared/Networking/SocketMgr.h
Normal file
111
src/server/shared/Networking/SocketMgr.h
Normal file
@@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/>
|
||||
*
|
||||
* This program is free software; you can redistribute it and/or modify it
|
||||
* under the terms of the GNU General Public License as published by the
|
||||
* Free Software Foundation; either version 2 of the License, or (at your
|
||||
* option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
* more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License along
|
||||
* with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef SocketMgr_h__
|
||||
#define SocketMgr_h__
|
||||
|
||||
#include "AsyncAcceptor.h"
|
||||
#include "Config.h"
|
||||
#include "Errors.h"
|
||||
#include "NetworkThread.h"
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <memory>
|
||||
|
||||
using boost::asio::ip::tcp;
|
||||
|
||||
template<class SocketType>
|
||||
class SocketMgr
|
||||
{
|
||||
public:
|
||||
virtual ~SocketMgr()
|
||||
{
|
||||
delete[] _threads;
|
||||
}
|
||||
|
||||
virtual bool StartNetwork(boost::asio::io_service& service, std::string const& bindIp, uint16 port)
|
||||
{
|
||||
_threadCount = sConfigMgr->GetIntDefault("Network.Threads", 1);
|
||||
|
||||
if (_threadCount <= 0)
|
||||
{
|
||||
TC_LOG_ERROR("misc", "Network.Threads is wrong in your config file");
|
||||
return false;
|
||||
}
|
||||
|
||||
_acceptor = new AsyncAcceptor(service, bindIp, port);
|
||||
_threads = CreateThreads();
|
||||
|
||||
ASSERT(_threads);
|
||||
|
||||
for (int32 i = 0; i < _threadCount; ++i)
|
||||
_threads[i].Start();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
virtual void StopNetwork()
|
||||
{
|
||||
if (_threadCount != 0)
|
||||
for (int32 i = 0; i < _threadCount; ++i)
|
||||
_threads[i].Stop();
|
||||
|
||||
Wait();
|
||||
}
|
||||
|
||||
void Wait()
|
||||
{
|
||||
if (_threadCount != 0)
|
||||
for (int32 i = 0; i < _threadCount; ++i)
|
||||
_threads[i].Wait();
|
||||
}
|
||||
|
||||
virtual void OnSocketOpen(tcp::socket&& sock)
|
||||
{
|
||||
size_t min = 0;
|
||||
|
||||
for (int32 i = 1; i < _threadCount; ++i)
|
||||
if (_threads[i].GetConnectionCount() < _threads[min].GetConnectionCount())
|
||||
min = i;
|
||||
|
||||
try
|
||||
{
|
||||
std::shared_ptr<SocketType> newSocket = std::make_shared<SocketType>(std::move(sock));
|
||||
newSocket->Start();
|
||||
|
||||
_threads[min].AddSocket(newSocket);
|
||||
}
|
||||
catch (boost::system::system_error const& err)
|
||||
{
|
||||
TC_LOG_INFO("network", "Failed to retrieve client's remote address %s", err.what());
|
||||
}
|
||||
}
|
||||
|
||||
int32 GetNetworkThreadCount() const { return _threadCount; }
|
||||
|
||||
protected:
|
||||
SocketMgr() : _threads(nullptr), _threadCount(1)
|
||||
{
|
||||
}
|
||||
|
||||
virtual NetworkThread<SocketType>* CreateThreads() const = 0;
|
||||
|
||||
AsyncAcceptor* _acceptor;
|
||||
NetworkThread<SocketType>* _threads;
|
||||
int32 _threadCount;
|
||||
};
|
||||
|
||||
#endif // SocketMgr_h__
|
||||
Reference in New Issue
Block a user