aboutsummaryrefslogtreecommitdiff
path: root/src/server/game/Server
diff options
context:
space:
mode:
authorShauren <shauren.trinity@gmail.com>2016-02-19 19:23:04 +0100
committerShauren <shauren.trinity@gmail.com>2016-02-19 19:23:04 +0100
commit97a79af4701621ec04b88c8b548dbc35d120e99e (patch)
treec3e7f3f2f7f5ee41565bf16ea884cf55aa75c911 /src/server/game/Server
parent06ec1b8fe8dfe9bb8a225ed57a053eb546d386ad (diff)
Core/Networking: Rewrite networking threading model
Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet
Diffstat (limited to 'src/server/game/Server')
-rw-r--r--src/server/game/Server/WorldSocket.cpp105
-rw-r--r--src/server/game/Server/WorldSocket.h6
-rw-r--r--src/server/game/Server/WorldSocketMgr.cpp15
-rw-r--r--src/server/game/Server/WorldSocketMgr.h2
4 files changed, 73 insertions, 55 deletions
diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp
index 8f8c9d89502..acec25a7363 100644
--- a/src/server/game/Server/WorldSocket.cpp
+++ b/src/server/game/Server/WorldSocket.cpp
@@ -38,6 +38,17 @@ struct CompressedWorldPacket
uint32 CompressedAdler;
};
+class EncryptablePacket : public WorldPacket
+{
+public:
+ EncryptablePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt) { }
+
+ bool NeedsEncryption() const { return _encrypt; }
+
+private:
+ bool _encrypt;
+};
+
#pragma pack(pop)
using boost::asio::ip::tcp;
@@ -76,11 +87,8 @@ void WorldSocket::Start()
stmt->setString(0, ip_address);
stmt->setUInt32(1, inet_addr(ip_address.c_str()));
- {
- std::lock_guard<std::mutex> guard(_queryLock);
- _queryCallback = io_service().wrap(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1));
- _queryFuture = LoginDatabase.AsyncQuery(stmt);
- }
+ _queryCallback = std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1);
+ _queryFuture = LoginDatabase.AsyncQuery(stmt);
}
void WorldSocket::CheckIpCallback(PreparedQueryResult result)
@@ -116,23 +124,50 @@ void WorldSocket::CheckIpCallback(PreparedQueryResult result)
initializer.Write(&header, sizeof(header.Setup.Size));
initializer.Write(ServerConnectionInitialize.c_str(), ServerConnectionInitialize.length());
- std::unique_lock<std::mutex> guard(_writeLock);
- QueuePacket(std::move(initializer), guard);
+ // - io_service.run thread, safe.
+ QueuePacket(std::move(initializer));
}
bool WorldSocket::Update()
{
+ EncryptablePacket* queued;
+ MessageBuffer buffer;
+ while (_bufferQueue.Dequeue(queued))
+ {
+ uint32 sizeOfHeader = SizeOfServerHeader[queued->NeedsEncryption()];
+ uint32 packetSize = queued->size();
+ if (packetSize > MinSizeForCompression && queued->NeedsEncryption())
+ packetSize = compressBound(packetSize) + sizeof(CompressedWorldPacket);
+
+ if (buffer.GetRemainingSpace() < packetSize + sizeOfHeader)
+ {
+ QueuePacket(std::move(buffer));
+ buffer.Resize(4096);
+ }
+
+ if (buffer.GetRemainingSpace() >= packetSize + sizeOfHeader)
+ WritePacketToBuffer(*queued, buffer);
+ else // single packet larger than 4096 bytes
+ {
+ MessageBuffer packetBuffer(packetSize + sizeOfHeader);
+ WritePacketToBuffer(*queued, packetBuffer);
+ QueuePacket(std::move(packetBuffer));
+ }
+
+ delete queued;
+ }
+
+ if (buffer.GetActiveSize() > 0)
+ QueuePacket(std::move(buffer));
+
if (!BaseSocket::Update())
return false;
+ if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
{
- std::lock_guard<std::mutex> guard(_queryLock);
- if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready)
- {
- auto callback = std::move(_queryCallback);
- _queryCallback = nullptr;
- callback(_queryFuture.get());
- }
+ auto callback = _queryCallback;
+ _queryCallback = nullptr;
+ callback(_queryFuture.get());
}
return true;
@@ -428,29 +463,13 @@ void WorldSocket::SendPacket(WorldPacket const& packet)
if (sPacketLog->CanLogPacket())
sPacketLog->LogPacket(packet, SERVER_TO_CLIENT, GetRemoteIpAddress(), GetRemotePort(), GetConnectionType());
- uint32 packetSize = packet.size();
- uint32 sizeOfHeader = SizeOfServerHeader[_authCrypt.IsInitialized()];
- if (packetSize > MinSizeForCompression && _authCrypt.IsInitialized())
- packetSize = compressBound(packetSize) + sizeof(CompressedWorldPacket);
-
- std::unique_lock<std::mutex> guard(_writeLock);
-
-#ifndef TC_SOCKET_USE_IOCP
- if (_writeQueue.empty() && _writeBuffer.GetRemainingSpace() >= sizeOfHeader + packetSize)
- WritePacketToBuffer(packet, _writeBuffer);
- else
-#endif
- {
- MessageBuffer buffer(sizeOfHeader + packetSize);
- WritePacketToBuffer(packet, buffer);
- QueuePacket(std::move(buffer), guard);
- }
+ _bufferQueue.Enqueue(new EncryptablePacket(packet, _authCrypt.IsInitialized()));
}
-void WorldSocket::WritePacketToBuffer(WorldPacket const& packet, MessageBuffer& buffer)
+void WorldSocket::WritePacketToBuffer(EncryptablePacket const& packet, MessageBuffer& buffer)
{
ServerPktHeader header;
- uint32 sizeOfHeader = SizeOfServerHeader[_authCrypt.IsInitialized()];
+ uint32 sizeOfHeader = SizeOfServerHeader[packet.NeedsEncryption()];
uint32 opcode = packet.GetOpcode();
uint32 packetSize = packet.size();
@@ -458,7 +477,7 @@ void WorldSocket::WritePacketToBuffer(WorldPacket const& packet, MessageBuffer&
uint8* headerPos = buffer.GetWritePointer();
buffer.WriteCompleted(sizeOfHeader);
- if (packetSize > MinSizeForCompression && _authCrypt.IsInitialized())
+ if (packetSize > MinSizeForCompression && packet.NeedsEncryption())
{
CompressedWorldPacket cmp;
cmp.UncompressedSize = packetSize + 4;
@@ -481,7 +500,7 @@ void WorldSocket::WritePacketToBuffer(WorldPacket const& packet, MessageBuffer&
else if (!packet.empty())
buffer.Write(packet.contents(), packet.size());
- if (_authCrypt.IsInitialized())
+ if (packet.NeedsEncryption())
{
header.Normal.Size = packetSize;
header.Normal.Command = opcode;
@@ -598,11 +617,8 @@ void WorldSocket::HandleAuthSession(std::shared_ptr<WorldPackets::Auth::AuthSess
stmt->setInt32(0, int32(realm.Id.Realm));
stmt->setString(1, authSession->Account);
- {
- std::lock_guard<std::mutex> guard(_queryLock);
- _queryCallback = io_service().wrap(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1));
- _queryFuture = LoginDatabase.AsyncQuery(stmt);
- }
+ _queryCallback = std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1);
+ _queryFuture = LoginDatabase.AsyncQuery(stmt);
}
void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthSession> authSession, PreparedQueryResult result)
@@ -768,7 +784,7 @@ void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth::
if (wardenActive)
_worldSession->InitWarden(&account.Game.SessionKey, account.BattleNet.OS);
- _queryCallback = io_service().wrap(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1));
+ _queryCallback = std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1);
_queryFuture = _worldSession->LoadPermissionsAsync();
AsyncRead();
}
@@ -801,11 +817,8 @@ void WorldSocket::HandleAuthContinuedSession(std::shared_ptr<WorldPackets::Auth:
PreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_ACCOUNT_INFO_CONTINUED_SESSION);
stmt->setUInt32(0, accountId);
- {
- std::lock_guard<std::mutex> guard(_queryLock);
- _queryCallback = io_service().wrap(std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1));
- _queryFuture = LoginDatabase.AsyncQuery(stmt);
- }
+ _queryCallback = std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1);
+ _queryFuture = LoginDatabase.AsyncQuery(stmt);
}
void WorldSocket::HandleAuthContinuedSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthContinuedSession> authSession, PreparedQueryResult result)
diff --git a/src/server/game/Server/WorldSocket.h b/src/server/game/Server/WorldSocket.h
index d6d29fb2826..205494ca4ea 100644
--- a/src/server/game/Server/WorldSocket.h
+++ b/src/server/game/Server/WorldSocket.h
@@ -26,11 +26,13 @@
#include "Util.h"
#include "WorldPacket.h"
#include "WorldSession.h"
+#include "MPSCQueue.h"
#include <chrono>
#include <boost/asio/ip/tcp.hpp>
using boost::asio::ip::tcp;
struct z_stream_s;
+class EncryptablePacket;
namespace WorldPackets
{
@@ -111,7 +113,7 @@ private:
void LogOpcodeText(OpcodeClient opcode, std::unique_lock<std::mutex> const& guard) const;
/// sends and logs network.opcode without accessing WorldSession
void SendPacketAndLogOpcode(WorldPacket const& packet);
- void WritePacketToBuffer(WorldPacket const& packet, MessageBuffer& buffer);
+ void WritePacketToBuffer(EncryptablePacket const& packet, MessageBuffer& buffer);
uint32 CompressPacket(uint8* buffer, WorldPacket const& packet);
void HandleSendAuthSession();
@@ -142,12 +144,12 @@ private:
MessageBuffer _headerBuffer;
MessageBuffer _packetBuffer;
+ MPSCQueue<EncryptablePacket> _bufferQueue;
z_stream_s* _compressionStream;
bool _initialized;
- std::mutex _queryLock;
PreparedQueryResultFuture _queryFuture;
std::function<void(PreparedQueryResult&&)> _queryCallback;
std::string _ipCountry;
diff --git a/src/server/game/Server/WorldSocketMgr.cpp b/src/server/game/Server/WorldSocketMgr.cpp
index 937483e1179..94c5a8f6979 100644
--- a/src/server/game/Server/WorldSocketMgr.cpp
+++ b/src/server/game/Server/WorldSocketMgr.cpp
@@ -24,9 +24,9 @@
#include <boost/system/error_code.hpp>
-static void OnSocketAccept(tcp::socket&& sock)
+static void OnSocketAccept(tcp::socket&& sock, uint32 threadIndex)
{
- sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock));
+ sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex);
}
class WorldSocketThread : public NetworkThread<WorldSocket>
@@ -73,8 +73,11 @@ bool WorldSocketMgr::StartNetwork(boost::asio::io_service& service, std::string
BaseSocketMgr::StartNetwork(service, bindIp, port);
_instanceAcceptor = new AsyncAcceptor(service, bindIp, uint16(sWorld->getIntConfig(CONFIG_PORT_INSTANCE)));
- _acceptor->AsyncAcceptManaged(&OnSocketAccept);
- _instanceAcceptor->AsyncAcceptManaged(&OnSocketAccept);
+ _acceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this));
+ _instanceAcceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this));
+
+ _acceptor->AsyncAcceptWithCallback<&OnSocketAccept>();
+ _instanceAcceptor->AsyncAcceptWithCallback<&OnSocketAccept>();
sScriptMgr->OnNetworkStart();
return true;
@@ -87,7 +90,7 @@ void WorldSocketMgr::StopNetwork()
sScriptMgr->OnNetworkStop();
}
-void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock)
+void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock, uint32 threadIndex)
{
// set some options here
if (_socketSendBufferSize >= 0)
@@ -115,7 +118,7 @@ void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock)
//sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff);
- BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock));
+ BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex);
}
NetworkThread<WorldSocket>* WorldSocketMgr::CreateThreads() const
diff --git a/src/server/game/Server/WorldSocketMgr.h b/src/server/game/Server/WorldSocketMgr.h
index d4bf4115deb..2079b62d14f 100644
--- a/src/server/game/Server/WorldSocketMgr.h
+++ b/src/server/game/Server/WorldSocketMgr.h
@@ -49,7 +49,7 @@ public:
/// Stops all network threads, It will wait for all running threads .
void StopNetwork() override;
- void OnSocketOpen(tcp::socket&& sock) override;
+ void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex) override;
protected:
WorldSocketMgr();