Core/Networking: Rewrite networking threading model

Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking
Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet
This commit is contained in:
Shauren
2016-02-19 19:23:04 +01:00
parent 06ec1b8fe8
commit 97a79af470
16 changed files with 325 additions and 210 deletions

View File

@@ -20,34 +20,39 @@
#include "Log.h"
#include <boost/asio.hpp>
#include <functional>
using boost::asio::ip::tcp;
class AsyncAcceptor
{
public:
typedef void(*ManagerAcceptHandler)(tcp::socket&& newSocket);
typedef void(*AcceptCallback)(tcp::socket&& newSocket, uint32 threadIndex);
AsyncAcceptor(boost::asio::io_service& ioService, std::string const& bindIp, uint16 port) :
_acceptor(ioService, tcp::endpoint(boost::asio::ip::address::from_string(bindIp), port)),
_socket(ioService), _closed(false)
_socket(ioService), _closed(false), _socketFactory(std::bind(&AsyncAcceptor::DefeaultSocketFactory, this))
{
}
template <class T>
template<class T>
void AsyncAccept();
void AsyncAcceptManaged(ManagerAcceptHandler mgrHandler)
template<AcceptCallback acceptCallback>
void AsyncAcceptWithCallback()
{
_acceptor.async_accept(_socket, [this, mgrHandler](boost::system::error_code error)
tcp::socket* socket;
uint32 threadIndex;
std::tie(socket, threadIndex) = _socketFactory();
_acceptor.async_accept(*socket, [this, socket, threadIndex](boost::system::error_code error)
{
if (!error)
{
try
{
_socket.non_blocking(true);
socket->non_blocking(true);
mgrHandler(std::move(_socket));
acceptCallback(std::move(*socket), threadIndex);
}
catch (boost::system::system_error const& err)
{
@@ -56,7 +61,7 @@ public:
}
if (!_closed)
AsyncAcceptManaged(mgrHandler);
this->AsyncAcceptWithCallback<acceptCallback>();
});
}
@@ -69,10 +74,15 @@ public:
_acceptor.close(err);
}
void SetSocketFactory(std::function<std::pair<tcp::socket*, uint32>()> func) { _socketFactory = func; }
private:
std::pair<tcp::socket*, uint32> DefeaultSocketFactory() { return std::make_pair(&_socket, 0); }
tcp::acceptor _acceptor;
tcp::socket _socket;
std::atomic<bool> _closed;
std::function<std::pair<tcp::socket*, uint32>()> _socketFactory;
};
template<class T>

View File

@@ -105,7 +105,7 @@ public:
return std::move(_storage);
}
MessageBuffer& operator=(MessageBuffer& right)
MessageBuffer& operator=(MessageBuffer const& right)
{
if (this != &right)
{

View File

@@ -22,6 +22,8 @@
#include "Errors.h"
#include "Log.h"
#include "Timer.h"
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <atomic>
#include <chrono>
#include <memory>
@@ -29,11 +31,14 @@
#include <set>
#include <thread>
using boost::asio::ip::tcp;
template<class SocketType>
class NetworkThread
{
public:
NetworkThread() : _connections(0), _stopped(false), _thread(nullptr)
NetworkThread() : _connections(0), _stopped(false), _thread(nullptr),
_acceptSocket(_io_service), _updateTimer(_io_service)
{
}
@@ -50,6 +55,7 @@ public:
void Stop()
{
_stopped = true;
_io_service.stop();
}
bool Start()
@@ -80,10 +86,12 @@ public:
std::lock_guard<std::mutex> lock(_newSocketsLock);
++_connections;
_newSockets.insert(sock);
_newSockets.push_back(sock);
SocketAdded(sock);
}
tcp::socket* GetSocketForAccept() { return &_acceptSocket; }
protected:
virtual void SocketAdded(std::shared_ptr<SocketType> /*sock*/) { }
virtual void SocketRemoved(std::shared_ptr<SocketType> /*sock*/) { }
@@ -95,16 +103,15 @@ protected:
if (_newSockets.empty())
return;
for (typename SocketSet::const_iterator i = _newSockets.begin(); i != _newSockets.end(); ++i)
for (std::shared_ptr<SocketType> sock : _newSockets)
{
if (!(*i)->IsOpen())
if (!sock->IsOpen())
{
SocketRemoved(*i);
SocketRemoved(sock);
--_connections;
}
else
_Sockets.insert(*i);
_sockets.push_back(sock);
}
_newSockets.clear();
@@ -114,55 +121,58 @@ protected:
{
TC_LOG_DEBUG("misc", "Network Thread Starting");
typename SocketSet::iterator i, t;
uint32 sleepTime = 10;
uint32 tickStart = 0, diff = 0;
while (!_stopped)
{
std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime));
tickStart = getMSTime();
AddNewSockets();
for (i = _Sockets.begin(); i != _Sockets.end();)
{
if (!(*i)->Update())
{
if ((*i)->IsOpen())
(*i)->CloseSocket();
SocketRemoved(*i);
--_connections;
_Sockets.erase(i++);
}
else
++i;
}
diff = GetMSTimeDiffToNow(tickStart);
sleepTime = diff > 10 ? 0 : 10 - diff;
}
_updateTimer.expires_from_now(boost::posix_time::milliseconds(10));
_updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this));
_io_service.run();
TC_LOG_DEBUG("misc", "Network Thread exits");
_newSockets.clear();
_Sockets.clear();
_sockets.clear();
}
void Update()
{
if (_stopped)
return;
_updateTimer.expires_from_now(boost::posix_time::milliseconds(10));
_updateTimer.async_wait(std::bind(&NetworkThread<SocketType>::Update, this));
AddNewSockets();
_sockets.erase(std::remove_if(_sockets.begin(), _sockets.end(), [this](std::shared_ptr<SocketType> sock)
{
if (!sock->Update())
{
if (sock->IsOpen())
sock->CloseSocket();
SocketRemoved(sock);
--_connections;
return true;
}
return false;
}), _sockets.end());
}
private:
typedef std::set<std::shared_ptr<SocketType> > SocketSet;
typedef std::vector<std::shared_ptr<SocketType>> SocketContainer;
std::atomic<int32> _connections;
std::atomic<bool> _stopped;
std::thread* _thread;
SocketSet _Sockets;
SocketContainer _sockets;
std::mutex _newSocketsLock;
SocketSet _newSockets;
SocketContainer _newSockets;
boost::asio::io_service _io_service;
tcp::socket _acceptSocket;
boost::asio::deadline_timer _updateTimer;
};
#endif // NetworkThread_h__

View File

@@ -21,15 +21,11 @@
#include "MessageBuffer.h"
#include "Log.h"
#include <atomic>
#include <vector>
#include <mutex>
#include <queue>
#include <memory>
#include <functional>
#include <type_traits>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/read.hpp>
using boost::asio::ip::tcp;
@@ -63,14 +59,10 @@ public:
return false;
#ifndef TC_SOCKET_USE_IOCP
std::unique_lock<std::mutex> guard(_writeLock);
if (!guard)
if (_isWritingAsync || _writeQueue.empty())
return true;
if (_isWritingAsync || (!_writeBuffer.GetActiveSize() && _writeQueue.empty()))
return true;
for (; WriteHandler(guard);)
for (; HandleQueue();)
;
#endif
@@ -98,14 +90,12 @@ public:
std::bind(&Socket<T>::ReadHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
}
void QueuePacket(MessageBuffer&& buffer, std::unique_lock<std::mutex>& guard)
void QueuePacket(MessageBuffer&& buffer)
{
_writeQueue.push(std::move(buffer));
#ifdef TC_SOCKET_USE_IOCP
AsyncProcessQueue(guard);
#else
(void)guard;
AsyncProcessQueue();
#endif
}
@@ -135,7 +125,7 @@ protected:
virtual void ReadHandler() = 0;
bool AsyncProcessQueue(std::unique_lock<std::mutex>&)
bool AsyncProcessQueue()
{
if (_isWritingAsync)
return false;
@@ -157,19 +147,12 @@ protected:
void SetNoDelay(bool enable)
{
boost::system::error_code err;
_socket.set_option(boost::asio::ip::tcp::no_delay(enable), err);
_socket.set_option(tcp::no_delay(enable), err);
if (err)
TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for %s - %d (%s)",
GetRemoteIpAddress().to_string().c_str(), err.value(), err.message().c_str());
}
std::mutex _writeLock;
std::queue<MessageBuffer> _writeQueue;
#ifndef TC_SOCKET_USE_IOCP
MessageBuffer _writeBuffer;
#endif
boost::asio::io_service& io_service() { return _socket.get_io_service(); }
private:
void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes)
@@ -190,15 +173,13 @@ private:
{
if (!error)
{
std::unique_lock<std::mutex> deleteGuard(_writeLock);
_isWritingAsync = false;
_writeQueue.front().ReadCompleted(transferedBytes);
if (!_writeQueue.front().GetActiveSize())
_writeQueue.pop();
if (!_writeQueue.empty())
AsyncProcessQueue(deleteGuard);
AsyncProcessQueue();
else if (_closing)
CloseSocket();
}
@@ -210,48 +191,15 @@ private:
void WriteHandlerWrapper(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/)
{
std::unique_lock<std::mutex> guard(_writeLock);
_isWritingAsync = false;
WriteHandler(guard);
HandleQueue();
}
bool WriteHandler(std::unique_lock<std::mutex>& guard)
bool HandleQueue()
{
if (!IsOpen())
return false;
std::size_t bytesToSend = _writeBuffer.GetActiveSize();
if (bytesToSend == 0)
return HandleQueue(guard);
boost::system::error_code error;
std::size_t bytesWritten = _socket.write_some(boost::asio::buffer(_writeBuffer.GetReadPointer(), bytesToSend), error);
if (error)
{
if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
return AsyncProcessQueue(guard);
return false;
}
else if (bytesWritten == 0)
return false;
else if (bytesWritten < bytesToSend)
{
_writeBuffer.ReadCompleted(bytesWritten);
_writeBuffer.Normalize();
return AsyncProcessQueue(guard);
}
// now bytesWritten == bytesToSend
_writeBuffer.Reset();
return HandleQueue(guard);
}
bool HandleQueue(std::unique_lock<std::mutex>& guard)
{
if (_writeQueue.empty())
return false;
@@ -265,7 +213,7 @@ private:
if (error)
{
if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
return AsyncProcessQueue(guard);
return AsyncProcessQueue();
_writeQueue.pop();
return false;
@@ -278,7 +226,7 @@ private:
else if (bytesSent < bytesToSend) // now n > 0
{
queuedMessage.ReadCompleted(bytesSent);
return AsyncProcessQueue(guard);
return AsyncProcessQueue();
}
_writeQueue.pop();
@@ -293,6 +241,7 @@ private:
uint16 _remotePort;
MessageBuffer _readBuffer;
std::queue<MessageBuffer> _writeQueue;
std::atomic<bool> _closed;
std::atomic<bool> _closing;

View File

@@ -90,20 +90,14 @@ public:
_threads[i].Wait();
}
virtual void OnSocketOpen(tcp::socket&& sock)
virtual void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex)
{
size_t min = 0;
for (int32 i = 1; i < _threadCount; ++i)
if (_threads[i].GetConnectionCount() < _threads[min].GetConnectionCount())
min = i;
try
{
std::shared_ptr<SocketType> newSocket = std::make_shared<SocketType>(std::move(sock));
newSocket->Start();
_threads[min].AddSocket(newSocket);
_threads[threadIndex].AddSocket(newSocket);
}
catch (boost::system::system_error const& err)
{
@@ -113,6 +107,23 @@ public:
int32 GetNetworkThreadCount() const { return _threadCount; }
uint32 SelectThreadWithMinConnections() const
{
uint32 min = 0;
for (int32 i = 1; i < _threadCount; ++i)
if (_threads[i].GetConnectionCount() < _threads[min].GetConnectionCount())
min = i;
return min;
}
std::pair<tcp::socket*, uint32> GetSocketForAccept()
{
uint32 threadIndex = SelectThreadWithMinConnections();
return std::make_pair(_threads[threadIndex].GetSocketForAccept(), threadIndex);
}
protected:
SocketMgr() : _acceptor(nullptr), _threads(nullptr), _threadCount(1)
{