From 97a79af4701621ec04b88c8b548dbc35d120e99e Mon Sep 17 00:00:00 2001 From: Shauren Date: Fri, 19 Feb 2016 19:23:04 +0100 Subject: Core/Networking: Rewrite networking threading model Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet --- src/server/bnetserver/Server/Session.cpp | 38 +++++++++++++++++++------ src/server/bnetserver/Server/Session.h | 8 ++++++ src/server/bnetserver/Server/SessionManager.cpp | 7 +++-- src/server/bnetserver/Server/SessionManager.h | 2 +- 4 files changed, 43 insertions(+), 12 deletions(-) (limited to 'src/server/bnetserver') diff --git a/src/server/bnetserver/Server/Session.cpp b/src/server/bnetserver/Server/Session.cpp index a5ede8d6524..4d54562501f 100644 --- a/src/server/bnetserver/Server/Session.cpp +++ b/src/server/bnetserver/Server/Session.cpp @@ -654,12 +654,37 @@ void Battlenet::Session::CheckIpCallback(PreparedQueryResult result) bool Battlenet::Session::Update() { + EncryptableBuffer* queued; + MessageBuffer buffer((std::size_t(BufferSizes::Read))); + while (_bufferQueue.Dequeue(queued)) + { + std::size_t packetSize = queued->Buffer.GetActiveSize(); + if (queued->Encrypt) + _crypt.EncryptSend(queued->Buffer.GetReadPointer(), packetSize); + + if (buffer.GetRemainingSpace() < packetSize) + { + QueuePacket(std::move(buffer)); + buffer.Resize(std::size_t(BufferSizes::Read)); + } + + if (buffer.GetRemainingSpace() >= packetSize) + buffer.Write(queued->Buffer.GetReadPointer(), packetSize); + else // single packet larger than 16384 bytes - client will reject. + QueuePacket(std::move(queued->Buffer)); + + delete queued; + } + + if (buffer.GetActiveSize() > 0) + QueuePacket(std::move(buffer)); + if (!BattlenetSocket::Update()) return false; if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - auto callback = std::move(_queryCallback); + auto callback = _queryCallback; _queryCallback = nullptr; callback(_queryFuture.get()); } @@ -679,15 +704,12 @@ void Battlenet::Session::AsyncWrite(ServerPacket* packet) packet->Write(); - MessageBuffer buffer; - buffer.Write(packet->GetData(), packet->GetSize()); + EncryptableBuffer* buffer = new EncryptableBuffer(); + buffer->Buffer.Write(packet->GetData(), packet->GetSize()); + buffer->Encrypt = _crypt.IsInitialized(); delete packet; - std::unique_lock guard(_writeLock); - - _crypt.EncryptSend(buffer.GetReadPointer(), buffer.GetActiveSize()); - - QueuePacket(std::move(buffer), guard); + _bufferQueue.Enqueue(buffer); } inline void ReplaceResponse(Battlenet::ServerPacket** oldResponse, Battlenet::ServerPacket* newResponse) diff --git a/src/server/bnetserver/Server/Session.h b/src/server/bnetserver/Server/Session.h index 75c30096417..2443d694a80 100644 --- a/src/server/bnetserver/Server/Session.h +++ b/src/server/bnetserver/Server/Session.h @@ -23,6 +23,7 @@ #include "Socket.h" #include "BigNumber.h" #include "Callback.h" +#include "MPSCQueue.h" #include #include @@ -174,6 +175,13 @@ namespace Battlenet std::queue _modulesWaitingForData; + struct EncryptableBuffer + { + MessageBuffer Buffer; + bool Encrypt; + }; + + MPSCQueue _bufferQueue; PacketCrypt _crypt; bool _authed; bool _subscribedToRealmListUpdates; diff --git a/src/server/bnetserver/Server/SessionManager.cpp b/src/server/bnetserver/Server/SessionManager.cpp index 8201f4869b4..c53214495d4 100644 --- a/src/server/bnetserver/Server/SessionManager.cpp +++ b/src/server/bnetserver/Server/SessionManager.cpp @@ -22,7 +22,8 @@ bool Battlenet::SessionManager::StartNetwork(boost::asio::io_service& service, s if (!BaseSocketMgr::StartNetwork(service, bindIp, port)) return false; - _acceptor->AsyncAcceptManaged(&OnSocketAccept); + _acceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this)); + _acceptor->AsyncAcceptWithCallback<&OnSocketAccept>(); return true; } @@ -31,9 +32,9 @@ NetworkThread* Battlenet::SessionManager::CreateThreads() co return new NetworkThread[GetNetworkThreadCount()]; } -void Battlenet::SessionManager::OnSocketAccept(tcp::socket&& sock) +void Battlenet::SessionManager::OnSocketAccept(tcp::socket&& sock, uint32 threadIndex) { - sSessionMgr.OnSocketOpen(std::forward(sock)); + sSessionMgr.OnSocketOpen(std::forward(sock), threadIndex); } void Battlenet::SessionManager::AddSession(Session* session) diff --git a/src/server/bnetserver/Server/SessionManager.h b/src/server/bnetserver/Server/SessionManager.h index fe262b29f4e..5cf0b199f15 100644 --- a/src/server/bnetserver/Server/SessionManager.h +++ b/src/server/bnetserver/Server/SessionManager.h @@ -75,7 +75,7 @@ namespace Battlenet NetworkThread* CreateThreads() const override; private: - static void OnSocketAccept(tcp::socket&& sock); + static void OnSocketAccept(tcp::socket&& sock, uint32 threadIndex); SessionMap _sessions; SessionByAccountMap _sessionsByAccountId; -- cgit v1.2.3