diff options
| author | Shauren <shauren.trinity@gmail.com> | 2016-02-19 19:23:04 +0100 |
|---|---|---|
| committer | Shauren <shauren.trinity@gmail.com> | 2016-02-19 19:23:04 +0100 |
| commit | 97a79af4701621ec04b88c8b548dbc35d120e99e (patch) | |
| tree | c3e7f3f2f7f5ee41565bf16ea884cf55aa75c911 /src/server/bnetserver/Server | |
| parent | 06ec1b8fe8dfe9bb8a225ed57a053eb546d386ad (diff) | |
Core/Networking: Rewrite networking threading model
Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking
Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet
Diffstat (limited to 'src/server/bnetserver/Server')
| -rw-r--r-- | src/server/bnetserver/Server/Session.cpp | 38 | ||||
| -rw-r--r-- | src/server/bnetserver/Server/Session.h | 8 | ||||
| -rw-r--r-- | src/server/bnetserver/Server/SessionManager.cpp | 7 | ||||
| -rw-r--r-- | src/server/bnetserver/Server/SessionManager.h | 2 |
4 files changed, 43 insertions, 12 deletions
diff --git a/src/server/bnetserver/Server/Session.cpp b/src/server/bnetserver/Server/Session.cpp index a5ede8d6524..4d54562501f 100644 --- a/src/server/bnetserver/Server/Session.cpp +++ b/src/server/bnetserver/Server/Session.cpp @@ -654,12 +654,37 @@ void Battlenet::Session::CheckIpCallback(PreparedQueryResult result) bool Battlenet::Session::Update() { + EncryptableBuffer* queued; + MessageBuffer buffer((std::size_t(BufferSizes::Read))); + while (_bufferQueue.Dequeue(queued)) + { + std::size_t packetSize = queued->Buffer.GetActiveSize(); + if (queued->Encrypt) + _crypt.EncryptSend(queued->Buffer.GetReadPointer(), packetSize); + + if (buffer.GetRemainingSpace() < packetSize) + { + QueuePacket(std::move(buffer)); + buffer.Resize(std::size_t(BufferSizes::Read)); + } + + if (buffer.GetRemainingSpace() >= packetSize) + buffer.Write(queued->Buffer.GetReadPointer(), packetSize); + else // single packet larger than 16384 bytes - client will reject. + QueuePacket(std::move(queued->Buffer)); + + delete queued; + } + + if (buffer.GetActiveSize() > 0) + QueuePacket(std::move(buffer)); + if (!BattlenetSocket::Update()) return false; if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - auto callback = std::move(_queryCallback); + auto callback = _queryCallback; _queryCallback = nullptr; callback(_queryFuture.get()); } @@ -679,15 +704,12 @@ void Battlenet::Session::AsyncWrite(ServerPacket* packet) packet->Write(); - MessageBuffer buffer; - buffer.Write(packet->GetData(), packet->GetSize()); + EncryptableBuffer* buffer = new EncryptableBuffer(); + buffer->Buffer.Write(packet->GetData(), packet->GetSize()); + buffer->Encrypt = _crypt.IsInitialized(); delete packet; - std::unique_lock<std::mutex> guard(_writeLock); - - _crypt.EncryptSend(buffer.GetReadPointer(), buffer.GetActiveSize()); - - QueuePacket(std::move(buffer), guard); + _bufferQueue.Enqueue(buffer); } inline void ReplaceResponse(Battlenet::ServerPacket** oldResponse, Battlenet::ServerPacket* newResponse) diff --git a/src/server/bnetserver/Server/Session.h b/src/server/bnetserver/Server/Session.h index 75c30096417..2443d694a80 100644 --- a/src/server/bnetserver/Server/Session.h +++ b/src/server/bnetserver/Server/Session.h @@ -23,6 +23,7 @@ #include "Socket.h" #include "BigNumber.h" #include "Callback.h" +#include "MPSCQueue.h" #include <memory> #include <boost/asio/ip/tcp.hpp> @@ -174,6 +175,13 @@ namespace Battlenet std::queue<ModuleType> _modulesWaitingForData; + struct EncryptableBuffer + { + MessageBuffer Buffer; + bool Encrypt; + }; + + MPSCQueue<EncryptableBuffer> _bufferQueue; PacketCrypt _crypt; bool _authed; bool _subscribedToRealmListUpdates; diff --git a/src/server/bnetserver/Server/SessionManager.cpp b/src/server/bnetserver/Server/SessionManager.cpp index 8201f4869b4..c53214495d4 100644 --- a/src/server/bnetserver/Server/SessionManager.cpp +++ b/src/server/bnetserver/Server/SessionManager.cpp @@ -22,7 +22,8 @@ bool Battlenet::SessionManager::StartNetwork(boost::asio::io_service& service, s if (!BaseSocketMgr::StartNetwork(service, bindIp, port)) return false; - _acceptor->AsyncAcceptManaged(&OnSocketAccept); + _acceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this)); + _acceptor->AsyncAcceptWithCallback<&OnSocketAccept>(); return true; } @@ -31,9 +32,9 @@ NetworkThread<Battlenet::Session>* Battlenet::SessionManager::CreateThreads() co return new NetworkThread<Session>[GetNetworkThreadCount()]; } -void Battlenet::SessionManager::OnSocketAccept(tcp::socket&& sock) +void Battlenet::SessionManager::OnSocketAccept(tcp::socket&& sock, uint32 threadIndex) { - sSessionMgr.OnSocketOpen(std::forward<tcp::socket>(sock)); + sSessionMgr.OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex); } void Battlenet::SessionManager::AddSession(Session* session) diff --git a/src/server/bnetserver/Server/SessionManager.h b/src/server/bnetserver/Server/SessionManager.h index fe262b29f4e..5cf0b199f15 100644 --- a/src/server/bnetserver/Server/SessionManager.h +++ b/src/server/bnetserver/Server/SessionManager.h @@ -75,7 +75,7 @@ namespace Battlenet NetworkThread<Session>* CreateThreads() const override; private: - static void OnSocketAccept(tcp::socket&& sock); + static void OnSocketAccept(tcp::socket&& sock, uint32 threadIndex); SessionMap _sessions; SessionByAccountMap _sessionsByAccountId; |
