diff options
| author | Shauren <shauren.trinity@gmail.com> | 2016-02-20 13:08:03 +0100 |
|---|---|---|
| committer | Shauren <shauren.trinity@gmail.com> | 2016-02-20 13:08:03 +0100 |
| commit | b2e03a744813a17bf0c01c9ef010e65cac078420 (patch) | |
| tree | 6513a0674f2a07c89b05b089e8d52e8accde68cb /src/server/game/Server | |
| parent | d4184065b6d51b250ab08f35e88868cef631ed4b (diff) | |
Core/Networking: Rewrite networking threading model
Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking
Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet
(cherry picked from commit 97a79af4701621ec04b88c8b548dbc35d120e99e)
Diffstat (limited to 'src/server/game/Server')
| -rw-r--r-- | src/server/game/Server/WorldSocket.cpp | 98 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocket.h | 4 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocketMgr.cpp | 12 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocketMgr.h | 2 |
4 files changed, 68 insertions, 48 deletions
diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp index a2d357cbc4d..36029113055 100644 --- a/src/server/game/Server/WorldSocket.cpp +++ b/src/server/game/Server/WorldSocket.cpp @@ -25,6 +25,17 @@ #include <memory> +class EncryptablePacket : public WorldPacket +{ +public: + EncryptablePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt) { } + + bool NeedsEncryption() const { return _encrypt; } + +private: + bool _encrypt; +}; + using boost::asio::ip::tcp; WorldSocket::WorldSocket(tcp::socket&& socket) @@ -40,11 +51,8 @@ void WorldSocket::Start() stmt->setString(0, ip_address); stmt->setUInt32(1, inet_addr(ip_address.c_str())); - { - std::lock_guard<std::mutex> guard(_queryLock); - _queryCallback = io_service().wrap(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1)); - _queryFuture = LoginDatabase.AsyncQuery(stmt); - } + _queryCallback = std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1); + _queryFuture = LoginDatabase.AsyncQuery(stmt); } void WorldSocket::CheckIpCallback(PreparedQueryResult result) @@ -78,17 +86,50 @@ void WorldSocket::CheckIpCallback(PreparedQueryResult result) bool WorldSocket::Update() { + EncryptablePacket* queued; + MessageBuffer buffer; + while (_bufferQueue.Dequeue(queued)) + { + ServerPktHeader header(queued->size() + 2, queued->GetOpcode()); + if (queued->NeedsEncryption()) + _authCrypt.EncryptSend(header.header, header.getHeaderLength()); + + if (buffer.GetRemainingSpace() < queued->size() + header.getHeaderLength()) + { + QueuePacket(std::move(buffer)); + buffer.Resize(4096); + } + + if (buffer.GetRemainingSpace() >= queued->size() + header.getHeaderLength()) + { + buffer.Write(header.header, header.getHeaderLength()); + if (!queued->empty()) + buffer.Write(queued->contents(), queued->size()); + } + else // single packet larger than 4096 bytes + { + MessageBuffer packetBuffer(queued->size() + header.getHeaderLength()); + packetBuffer.Write(header.header, header.getHeaderLength()); + if (!queued->empty()) + packetBuffer.Write(queued->contents(), queued->size()); + + QueuePacket(std::move(packetBuffer)); + } + + delete queued; + } + + if (buffer.GetActiveSize() > 0) + QueuePacket(std::move(buffer)); + if (!BaseSocket::Update()) return false; + if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - std::lock_guard<std::mutex> guard(_queryLock); - if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) - { - auto callback = std::move(_queryCallback); - _queryCallback = nullptr; - callback(_queryFuture.get()); - } + auto callback = _queryCallback; + _queryCallback = nullptr; + callback(_queryFuture.get()); } return true; @@ -351,29 +392,7 @@ void WorldSocket::SendPacket(WorldPacket const& packet) if (sPacketLog->CanLogPacket()) sPacketLog->LogPacket(packet, SERVER_TO_CLIENT, GetRemoteIpAddress(), GetRemotePort()); - ServerPktHeader header(packet.size() + 2, packet.GetOpcode()); - - std::unique_lock<std::mutex> guard(_writeLock); - - _authCrypt.EncryptSend(header.header, header.getHeaderLength()); - -#ifndef TC_SOCKET_USE_IOCP - if (_writeQueue.empty() && _writeBuffer.GetRemainingSpace() >= header.getHeaderLength() + packet.size()) - { - _writeBuffer.Write(header.header, header.getHeaderLength()); - if (!packet.empty()) - _writeBuffer.Write(packet.contents(), packet.size()); - } - else -#endif - { - MessageBuffer buffer(header.getHeaderLength() + packet.size()); - buffer.Write(header.header, header.getHeaderLength()); - if (!packet.empty()) - buffer.Write(packet.contents(), packet.size()); - - QueuePacket(std::move(buffer), guard); - } + _bufferQueue.Enqueue(new EncryptablePacket(packet, _authCrypt.IsInitialized())); } void WorldSocket::HandleAuthSession(WorldPacket& recvPacket) @@ -398,11 +417,8 @@ void WorldSocket::HandleAuthSession(WorldPacket& recvPacket) stmt->setInt32(0, int32(realmID)); stmt->setString(1, authSession->Account); - { - std::lock_guard<std::mutex> guard(_queryLock); - _queryCallback = io_service().wrap(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1)); - _queryFuture = LoginDatabase.AsyncQuery(stmt); - } + _queryCallback = std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1); + _queryFuture = LoginDatabase.AsyncQuery(stmt); } void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<AuthSession> authSession, PreparedQueryResult result) @@ -559,7 +575,7 @@ void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<AuthSession> authSes if (wardenActive) _worldSession->InitWarden(&account.SessionKey, account.OS); - _queryCallback = io_service().wrap(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1)); + _queryCallback = std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1); _queryFuture = _worldSession->LoadPermissionsAsync(); AsyncRead(); } diff --git a/src/server/game/Server/WorldSocket.h b/src/server/game/Server/WorldSocket.h index 9e5b35992a6..08a5b185cf1 100644 --- a/src/server/game/Server/WorldSocket.h +++ b/src/server/game/Server/WorldSocket.h @@ -26,11 +26,13 @@ #include "Util.h" #include "WorldPacket.h" #include "WorldSession.h" +#include "MPSCQueue.h" #include <chrono> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/buffer.hpp> using boost::asio::ip::tcp; +class EncryptablePacket; #pragma pack(push, 1) @@ -104,8 +106,8 @@ private: MessageBuffer _headerBuffer; MessageBuffer _packetBuffer; + MPSCQueue<EncryptablePacket> _bufferQueue; - std::mutex _queryLock; PreparedQueryResultFuture _queryFuture; std::function<void(PreparedQueryResult&&)> _queryCallback; std::string _ipCountry; diff --git a/src/server/game/Server/WorldSocketMgr.cpp b/src/server/game/Server/WorldSocketMgr.cpp index 529396b3966..e8f8c59f4af 100644 --- a/src/server/game/Server/WorldSocketMgr.cpp +++ b/src/server/game/Server/WorldSocketMgr.cpp @@ -24,9 +24,9 @@ #include <boost/system/error_code.hpp> -static void OnSocketAccept(tcp::socket&& sock) +static void OnSocketAccept(tcp::socket&& sock, uint32 threadIndex) { - sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock)); + sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex); } class WorldSocketThread : public NetworkThread<WorldSocket> @@ -67,7 +67,9 @@ bool WorldSocketMgr::StartNetwork(boost::asio::io_service& service, std::string BaseSocketMgr::StartNetwork(service, bindIp, port); - _acceptor->AsyncAcceptManaged(&OnSocketAccept); + _acceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this)); + + _acceptor->AsyncAcceptWithCallback<&OnSocketAccept>(); sScriptMgr->OnNetworkStart(); return true; @@ -80,7 +82,7 @@ void WorldSocketMgr::StopNetwork() sScriptMgr->OnNetworkStop(); } -void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock) +void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock, uint32 threadIndex) { // set some options here if (_socketSendBufferSize >= 0) @@ -108,7 +110,7 @@ void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock) //sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff); - BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock)); + BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex); } NetworkThread<WorldSocket>* WorldSocketMgr::CreateThreads() const diff --git a/src/server/game/Server/WorldSocketMgr.h b/src/server/game/Server/WorldSocketMgr.h index 92a28d0c135..38e2e7abb69 100644 --- a/src/server/game/Server/WorldSocketMgr.h +++ b/src/server/game/Server/WorldSocketMgr.h @@ -47,7 +47,7 @@ public: /// Stops all network threads, It will wait for all running threads . void StopNetwork() override; - void OnSocketOpen(tcp::socket&& sock) override; + void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex) override; protected: WorldSocketMgr(); |
