From 97a79af4701621ec04b88c8b548dbc35d120e99e Mon Sep 17 00:00:00 2001 From: Shauren Date: Fri, 19 Feb 2016 19:23:04 +0100 Subject: Core/Networking: Rewrite networking threading model Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet --- src/server/shared/Networking/AsyncAcceptor.h | 26 ++++++--- src/server/shared/Networking/MessageBuffer.h | 2 +- src/server/shared/Networking/NetworkThread.h | 86 ++++++++++++++++------------ src/server/shared/Networking/Socket.h | 75 ++++-------------------- src/server/shared/Networking/SocketMgr.h | 27 ++++++--- 5 files changed, 98 insertions(+), 118 deletions(-) (limited to 'src/server/shared/Networking') diff --git a/src/server/shared/Networking/AsyncAcceptor.h b/src/server/shared/Networking/AsyncAcceptor.h index 2fa1e448ff8..d21801a64ac 100644 --- a/src/server/shared/Networking/AsyncAcceptor.h +++ b/src/server/shared/Networking/AsyncAcceptor.h @@ -20,34 +20,39 @@ #include "Log.h" #include +#include using boost::asio::ip::tcp; class AsyncAcceptor { public: - typedef void(*ManagerAcceptHandler)(tcp::socket&& newSocket); + typedef void(*AcceptCallback)(tcp::socket&& newSocket, uint32 threadIndex); AsyncAcceptor(boost::asio::io_service& ioService, std::string const& bindIp, uint16 port) : _acceptor(ioService, tcp::endpoint(boost::asio::ip::address::from_string(bindIp), port)), - _socket(ioService), _closed(false) + _socket(ioService), _closed(false), _socketFactory(std::bind(&AsyncAcceptor::DefeaultSocketFactory, this)) { } - template + template void AsyncAccept(); - void AsyncAcceptManaged(ManagerAcceptHandler mgrHandler) + template + void AsyncAcceptWithCallback() { - _acceptor.async_accept(_socket, [this, mgrHandler](boost::system::error_code error) + tcp::socket* socket; + uint32 threadIndex; + std::tie(socket, threadIndex) = _socketFactory(); + _acceptor.async_accept(*socket, [this, socket, threadIndex](boost::system::error_code error) { if (!error) { try { - _socket.non_blocking(true); + socket->non_blocking(true); - mgrHandler(std::move(_socket)); + acceptCallback(std::move(*socket), threadIndex); } catch (boost::system::system_error const& err) { @@ -56,7 +61,7 @@ public: } if (!_closed) - AsyncAcceptManaged(mgrHandler); + this->AsyncAcceptWithCallback(); }); } @@ -69,10 +74,15 @@ public: _acceptor.close(err); } + void SetSocketFactory(std::function()> func) { _socketFactory = func; } + private: + std::pair DefeaultSocketFactory() { return std::make_pair(&_socket, 0); } + tcp::acceptor _acceptor; tcp::socket _socket; std::atomic _closed; + std::function()> _socketFactory; }; template diff --git a/src/server/shared/Networking/MessageBuffer.h b/src/server/shared/Networking/MessageBuffer.h index a6ed9b31e8f..42b65be8398 100644 --- a/src/server/shared/Networking/MessageBuffer.h +++ b/src/server/shared/Networking/MessageBuffer.h @@ -105,7 +105,7 @@ public: return std::move(_storage); } - MessageBuffer& operator=(MessageBuffer& right) + MessageBuffer& operator=(MessageBuffer const& right) { if (this != &right) { diff --git a/src/server/shared/Networking/NetworkThread.h b/src/server/shared/Networking/NetworkThread.h index e183209e989..5eb2fcb2f6a 100644 --- a/src/server/shared/Networking/NetworkThread.h +++ b/src/server/shared/Networking/NetworkThread.h @@ -22,6 +22,8 @@ #include "Errors.h" #include "Log.h" #include "Timer.h" +#include +#include #include #include #include @@ -29,11 +31,14 @@ #include #include +using boost::asio::ip::tcp; + template class NetworkThread { public: - NetworkThread() : _connections(0), _stopped(false), _thread(nullptr) + NetworkThread() : _connections(0), _stopped(false), _thread(nullptr), + _acceptSocket(_io_service), _updateTimer(_io_service) { } @@ -50,6 +55,7 @@ public: void Stop() { _stopped = true; + _io_service.stop(); } bool Start() @@ -80,10 +86,12 @@ public: std::lock_guard lock(_newSocketsLock); ++_connections; - _newSockets.insert(sock); + _newSockets.push_back(sock); SocketAdded(sock); } + tcp::socket* GetSocketForAccept() { return &_acceptSocket; } + protected: virtual void SocketAdded(std::shared_ptr /*sock*/) { } virtual void SocketRemoved(std::shared_ptr /*sock*/) { } @@ -95,16 +103,15 @@ protected: if (_newSockets.empty()) return; - for (typename SocketSet::const_iterator i = _newSockets.begin(); i != _newSockets.end(); ++i) + for (std::shared_ptr sock : _newSockets) { - if (!(*i)->IsOpen()) + if (!sock->IsOpen()) { - SocketRemoved(*i); - + SocketRemoved(sock); --_connections; } else - _Sockets.insert(*i); + _sockets.push_back(sock); } _newSockets.clear(); @@ -114,55 +121,58 @@ protected: { TC_LOG_DEBUG("misc", "Network Thread Starting"); - typename SocketSet::iterator i, t; + _updateTimer.expires_from_now(boost::posix_time::milliseconds(10)); + _updateTimer.async_wait(std::bind(&NetworkThread::Update, this)); + _io_service.run(); - uint32 sleepTime = 10; - uint32 tickStart = 0, diff = 0; - while (!_stopped) - { - std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + TC_LOG_DEBUG("misc", "Network Thread exits"); + _newSockets.clear(); + _sockets.clear(); + } + + void Update() + { + if (_stopped) + return; - tickStart = getMSTime(); + _updateTimer.expires_from_now(boost::posix_time::milliseconds(10)); + _updateTimer.async_wait(std::bind(&NetworkThread::Update, this)); - AddNewSockets(); + AddNewSockets(); - for (i = _Sockets.begin(); i != _Sockets.end();) + _sockets.erase(std::remove_if(_sockets.begin(), _sockets.end(), [this](std::shared_ptr sock) + { + if (!sock->Update()) { - if (!(*i)->Update()) - { - if ((*i)->IsOpen()) - (*i)->CloseSocket(); - - SocketRemoved(*i); - - --_connections; - _Sockets.erase(i++); - } - else - ++i; - } + if (sock->IsOpen()) + sock->CloseSocket(); - diff = GetMSTimeDiffToNow(tickStart); - sleepTime = diff > 10 ? 0 : 10 - diff; - } + SocketRemoved(sock); - TC_LOG_DEBUG("misc", "Network Thread exits"); - _newSockets.clear(); - _Sockets.clear(); + --_connections; + return true; + } + + return false; + }), _sockets.end()); } private: - typedef std::set > SocketSet; + typedef std::vector> SocketContainer; std::atomic _connections; std::atomic _stopped; std::thread* _thread; - SocketSet _Sockets; + SocketContainer _sockets; std::mutex _newSocketsLock; - SocketSet _newSockets; + SocketContainer _newSockets; + + boost::asio::io_service _io_service; + tcp::socket _acceptSocket; + boost::asio::deadline_timer _updateTimer; }; #endif // NetworkThread_h__ diff --git a/src/server/shared/Networking/Socket.h b/src/server/shared/Networking/Socket.h index 34ee50eb84e..07f427652aa 100644 --- a/src/server/shared/Networking/Socket.h +++ b/src/server/shared/Networking/Socket.h @@ -21,15 +21,11 @@ #include "MessageBuffer.h" #include "Log.h" #include -#include -#include #include #include #include #include #include -#include -#include using boost::asio::ip::tcp; @@ -63,14 +59,10 @@ public: return false; #ifndef TC_SOCKET_USE_IOCP - std::unique_lock guard(_writeLock); - if (!guard) + if (_isWritingAsync || _writeQueue.empty()) return true; - if (_isWritingAsync || (!_writeBuffer.GetActiveSize() && _writeQueue.empty())) - return true; - - for (; WriteHandler(guard);) + for (; HandleQueue();) ; #endif @@ -98,14 +90,12 @@ public: std::bind(&Socket::ReadHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); } - void QueuePacket(MessageBuffer&& buffer, std::unique_lock& guard) + void QueuePacket(MessageBuffer&& buffer) { _writeQueue.push(std::move(buffer)); #ifdef TC_SOCKET_USE_IOCP - AsyncProcessQueue(guard); -#else - (void)guard; + AsyncProcessQueue(); #endif } @@ -135,7 +125,7 @@ protected: virtual void ReadHandler() = 0; - bool AsyncProcessQueue(std::unique_lock&) + bool AsyncProcessQueue() { if (_isWritingAsync) return false; @@ -157,19 +147,12 @@ protected: void SetNoDelay(bool enable) { boost::system::error_code err; - _socket.set_option(boost::asio::ip::tcp::no_delay(enable), err); + _socket.set_option(tcp::no_delay(enable), err); if (err) TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for %s - %d (%s)", GetRemoteIpAddress().to_string().c_str(), err.value(), err.message().c_str()); } - std::mutex _writeLock; - std::queue _writeQueue; -#ifndef TC_SOCKET_USE_IOCP - MessageBuffer _writeBuffer; -#endif - - boost::asio::io_service& io_service() { return _socket.get_io_service(); } private: void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes) @@ -190,15 +173,13 @@ private: { if (!error) { - std::unique_lock deleteGuard(_writeLock); - _isWritingAsync = false; _writeQueue.front().ReadCompleted(transferedBytes); if (!_writeQueue.front().GetActiveSize()) _writeQueue.pop(); if (!_writeQueue.empty()) - AsyncProcessQueue(deleteGuard); + AsyncProcessQueue(); else if (_closing) CloseSocket(); } @@ -210,48 +191,15 @@ private: void WriteHandlerWrapper(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/) { - std::unique_lock guard(_writeLock); _isWritingAsync = false; - WriteHandler(guard); + HandleQueue(); } - bool WriteHandler(std::unique_lock& guard) + bool HandleQueue() { if (!IsOpen()) return false; - std::size_t bytesToSend = _writeBuffer.GetActiveSize(); - - if (bytesToSend == 0) - return HandleQueue(guard); - - boost::system::error_code error; - std::size_t bytesWritten = _socket.write_some(boost::asio::buffer(_writeBuffer.GetReadPointer(), bytesToSend), error); - - if (error) - { - if (error == boost::asio::error::would_block || error == boost::asio::error::try_again) - return AsyncProcessQueue(guard); - - return false; - } - else if (bytesWritten == 0) - return false; - else if (bytesWritten < bytesToSend) - { - _writeBuffer.ReadCompleted(bytesWritten); - _writeBuffer.Normalize(); - return AsyncProcessQueue(guard); - } - - // now bytesWritten == bytesToSend - _writeBuffer.Reset(); - - return HandleQueue(guard); - } - - bool HandleQueue(std::unique_lock& guard) - { if (_writeQueue.empty()) return false; @@ -265,7 +213,7 @@ private: if (error) { if (error == boost::asio::error::would_block || error == boost::asio::error::try_again) - return AsyncProcessQueue(guard); + return AsyncProcessQueue(); _writeQueue.pop(); return false; @@ -278,7 +226,7 @@ private: else if (bytesSent < bytesToSend) // now n > 0 { queuedMessage.ReadCompleted(bytesSent); - return AsyncProcessQueue(guard); + return AsyncProcessQueue(); } _writeQueue.pop(); @@ -293,6 +241,7 @@ private: uint16 _remotePort; MessageBuffer _readBuffer; + std::queue _writeQueue; std::atomic _closed; std::atomic _closing; diff --git a/src/server/shared/Networking/SocketMgr.h b/src/server/shared/Networking/SocketMgr.h index 4037c85baa1..b14aac4ca47 100644 --- a/src/server/shared/Networking/SocketMgr.h +++ b/src/server/shared/Networking/SocketMgr.h @@ -90,20 +90,14 @@ public: _threads[i].Wait(); } - virtual void OnSocketOpen(tcp::socket&& sock) + virtual void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex) { - size_t min = 0; - - for (int32 i = 1; i < _threadCount; ++i) - if (_threads[i].GetConnectionCount() < _threads[min].GetConnectionCount()) - min = i; - try { std::shared_ptr newSocket = std::make_shared(std::move(sock)); newSocket->Start(); - _threads[min].AddSocket(newSocket); + _threads[threadIndex].AddSocket(newSocket); } catch (boost::system::system_error const& err) { @@ -113,6 +107,23 @@ public: int32 GetNetworkThreadCount() const { return _threadCount; } + uint32 SelectThreadWithMinConnections() const + { + uint32 min = 0; + + for (int32 i = 1; i < _threadCount; ++i) + if (_threads[i].GetConnectionCount() < _threads[min].GetConnectionCount()) + min = i; + + return min; + } + + std::pair GetSocketForAccept() + { + uint32 threadIndex = SelectThreadWithMinConnections(); + return std::make_pair(_threads[threadIndex].GetSocketForAccept(), threadIndex); + } + protected: SocketMgr() : _acceptor(nullptr), _threads(nullptr), _threadCount(1) { -- cgit v1.2.3