aboutsummaryrefslogtreecommitdiff
path: root/src/server/game/Server
diff options
context:
space:
mode:
authorShauren <shauren.trinity@gmail.com>2016-02-20 13:08:03 +0100
committerShauren <shauren.trinity@gmail.com>2016-02-20 13:08:03 +0100
commitb2e03a744813a17bf0c01c9ef010e65cac078420 (patch)
tree6513a0674f2a07c89b05b089e8d52e8accde68cb /src/server/game/Server
parentd4184065b6d51b250ab08f35e88868cef631ed4b (diff)
Core/Networking: Rewrite networking threading model
Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet (cherry picked from commit 97a79af4701621ec04b88c8b548dbc35d120e99e)
Diffstat (limited to 'src/server/game/Server')
-rw-r--r--src/server/game/Server/WorldSocket.cpp98
-rw-r--r--src/server/game/Server/WorldSocket.h4
-rw-r--r--src/server/game/Server/WorldSocketMgr.cpp12
-rw-r--r--src/server/game/Server/WorldSocketMgr.h2
4 files changed, 68 insertions, 48 deletions
diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp
index a2d357cbc4d..36029113055 100644
--- a/src/server/game/Server/WorldSocket.cpp
+++ b/src/server/game/Server/WorldSocket.cpp
@@ -25,6 +25,17 @@
#include <memory>
+class EncryptablePacket : public WorldPacket
+{
+public:
+ EncryptablePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt) { }
+
+ bool NeedsEncryption() const { return _encrypt; }
+
+private:
+ bool _encrypt;
+};
+
using boost::asio::ip::tcp;
WorldSocket::WorldSocket(tcp::socket&& socket)
@@ -40,11 +51,8 @@ void WorldSocket::Start()
stmt->setString(0, ip_address);
stmt->setUInt32(1, inet_addr(ip_address.c_str()));
- {
- std::lock_guard<std::mutex> guard(_queryLock);
- _queryCallback = io_service().wrap(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1));
- _queryFuture = LoginDatabase.AsyncQuery(stmt);
- }
+ _queryCallback = std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1);
+ _queryFuture = LoginDatabase.AsyncQuery(stmt);
}
void WorldSocket::CheckIpCallback(PreparedQueryResult result)
@@ -78,17 +86,50 @@ void WorldSocket::CheckIpCallback(PreparedQueryResult result)
bool WorldSocket::Update()
{
+ EncryptablePacket* queued;
+ MessageBuffer buffer;
+ while (_bufferQueue.Dequeue(queued))
+ {
+ ServerPktHeader header(queued->size() + 2, queued->GetOpcode());
+ if (queued->NeedsEncryption())
+ _authCrypt.EncryptSend(header.header, header.getHeaderLength());
+
+ if (buffer.GetRemainingSpace() < queued->size() + header.getHeaderLength())
+ {
+ QueuePacket(std::move(buffer));
+ buffer.Resize(4096);
+ }
+
+ if (buffer.GetRemainingSpace() >= queued->size() + header.getHeaderLength())
+ {
+ buffer.Write(header.header, header.getHeaderLength());
+ if (!queued->empty())
+ buffer.Write(queued->contents(), queued->size());
+ }
+ else // single packet larger than 4096 bytes
+ {
+ MessageBuffer packetBuffer(queued->size() + header.getHeaderLength());
+ packetBuffer.Write(header.header, header.getHeaderLength());
+ if (!queued->empty())
+ packetBuffer.Write(queued->contents(), queued->size());
+
+ QueuePacket(std::move(packetBuffer));
+ }
+
+ delete queued;
+ }
+
+ if (buffer.GetActiveSize() > 0)
+ QueuePacket(std::move(buffer));
+
if (!BaseSocket::Update())
return false;
+ if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
- std::lock_guard<std::mutex> guard(_queryLock);
- if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
- {
- auto callback = std::move(_queryCallback);
- _queryCallback = nullptr;
- callback(_queryFuture.get());
- }
+ auto callback = _queryCallback;
+ _queryCallback = nullptr;
+ callback(_queryFuture.get());
}
return true;
@@ -351,29 +392,7 @@ void WorldSocket::SendPacket(WorldPacket const& packet)
if (sPacketLog->CanLogPacket())
sPacketLog->LogPacket(packet, SERVER_TO_CLIENT, GetRemoteIpAddress(), GetRemotePort());
- ServerPktHeader header(packet.size() + 2, packet.GetOpcode());
-
- std::unique_lock<std::mutex> guard(_writeLock);
-
- _authCrypt.EncryptSend(header.header, header.getHeaderLength());
-
-#ifndef TC_SOCKET_USE_IOCP
- if (_writeQueue.empty() && _writeBuffer.GetRemainingSpace() >= header.getHeaderLength() + packet.size())
- {
- _writeBuffer.Write(header.header, header.getHeaderLength());
- if (!packet.empty())
- _writeBuffer.Write(packet.contents(), packet.size());
- }
- else
-#endif
- {
- MessageBuffer buffer(header.getHeaderLength() + packet.size());
- buffer.Write(header.header, header.getHeaderLength());
- if (!packet.empty())
- buffer.Write(packet.contents(), packet.size());
-
- QueuePacket(std::move(buffer), guard);
- }
+ _bufferQueue.Enqueue(new EncryptablePacket(packet, _authCrypt.IsInitialized()));
}
void WorldSocket::HandleAuthSession(WorldPacket& recvPacket)
@@ -398,11 +417,8 @@ void WorldSocket::HandleAuthSession(WorldPacket& recvPacket)
stmt->setInt32(0, int32(realmID));
stmt->setString(1, authSession->Account);
- {
- std::lock_guard<std::mutex> guard(_queryLock);
- _queryCallback = io_service().wrap(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1));
- _queryFuture = LoginDatabase.AsyncQuery(stmt);
- }
+ _queryCallback = std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1);
+ _queryFuture = LoginDatabase.AsyncQuery(stmt);
}
void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<AuthSession> authSession, PreparedQueryResult result)
@@ -559,7 +575,7 @@ void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<AuthSession> authSes
if (wardenActive)
_worldSession->InitWarden(&account.SessionKey, account.OS);
- _queryCallback = io_service().wrap(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1));
+ _queryCallback = std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1);
_queryFuture = _worldSession->LoadPermissionsAsync();
AsyncRead();
}
diff --git a/src/server/game/Server/WorldSocket.h b/src/server/game/Server/WorldSocket.h
index 9e5b35992a6..08a5b185cf1 100644
--- a/src/server/game/Server/WorldSocket.h
+++ b/src/server/game/Server/WorldSocket.h
@@ -26,11 +26,13 @@
#include "Util.h"
#include "WorldPacket.h"
#include "WorldSession.h"
+#include "MPSCQueue.h"
#include <chrono>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/buffer.hpp>
using boost::asio::ip::tcp;
+class EncryptablePacket;
#pragma pack(push, 1)
@@ -104,8 +106,8 @@ private:
MessageBuffer _headerBuffer;
MessageBuffer _packetBuffer;
+ MPSCQueue<EncryptablePacket> _bufferQueue;
- std::mutex _queryLock;
PreparedQueryResultFuture _queryFuture;
std::function<void(PreparedQueryResult&&)> _queryCallback;
std::string _ipCountry;
diff --git a/src/server/game/Server/WorldSocketMgr.cpp b/src/server/game/Server/WorldSocketMgr.cpp
index 529396b3966..e8f8c59f4af 100644
--- a/src/server/game/Server/WorldSocketMgr.cpp
+++ b/src/server/game/Server/WorldSocketMgr.cpp
@@ -24,9 +24,9 @@
#include <boost/system/error_code.hpp>
-static void OnSocketAccept(tcp::socket&& sock)
+static void OnSocketAccept(tcp::socket&& sock, uint32 threadIndex)
{
- sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock));
+ sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex);
}
class WorldSocketThread : public NetworkThread<WorldSocket>
@@ -67,7 +67,9 @@ bool WorldSocketMgr::StartNetwork(boost::asio::io_service& service, std::string
BaseSocketMgr::StartNetwork(service, bindIp, port);
- _acceptor->AsyncAcceptManaged(&OnSocketAccept);
+ _acceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this));
+
+ _acceptor->AsyncAcceptWithCallback<&OnSocketAccept>();
sScriptMgr->OnNetworkStart();
return true;
@@ -80,7 +82,7 @@ void WorldSocketMgr::StopNetwork()
sScriptMgr->OnNetworkStop();
}
-void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock)
+void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock, uint32 threadIndex)
{
// set some options here
if (_socketSendBufferSize >= 0)
@@ -108,7 +110,7 @@ void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock)
//sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff);
- BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock));
+ BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex);
}
NetworkThread<WorldSocket>* WorldSocketMgr::CreateThreads() const
diff --git a/src/server/game/Server/WorldSocketMgr.h b/src/server/game/Server/WorldSocketMgr.h
index 92a28d0c135..38e2e7abb69 100644
--- a/src/server/game/Server/WorldSocketMgr.h
+++ b/src/server/game/Server/WorldSocketMgr.h
@@ -47,7 +47,7 @@ public:
/// Stops all network threads, It will wait for all running threads .
void StopNetwork() override;
- void OnSocketOpen(tcp::socket&& sock) override;
+ void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex) override;
protected:
WorldSocketMgr();