diff options
| author | Shauren <shauren.trinity@gmail.com> | 2016-02-19 19:23:04 +0100 |
|---|---|---|
| committer | Shauren <shauren.trinity@gmail.com> | 2016-02-19 19:23:04 +0100 |
| commit | 97a79af4701621ec04b88c8b548dbc35d120e99e (patch) | |
| tree | c3e7f3f2f7f5ee41565bf16ea884cf55aa75c911 /src/server/game/Server | |
| parent | 06ec1b8fe8dfe9bb8a225ed57a053eb546d386ad (diff) | |
Core/Networking: Rewrite networking threading model
Each network thread has its own io_service - this means that all operations on a given socket except queueing packets run from a single thread, removing the need for locking
Sending packets now writes to a lockfree intermediate queue directly, encryption is applied in network thread if it was required at the time of sending the packet
Diffstat (limited to 'src/server/game/Server')
| -rw-r--r-- | src/server/game/Server/WorldSocket.cpp | 105 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocket.h | 6 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocketMgr.cpp | 15 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocketMgr.h | 2 |
4 files changed, 73 insertions, 55 deletions
diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp index 8f8c9d89502..acec25a7363 100644 --- a/src/server/game/Server/WorldSocket.cpp +++ b/src/server/game/Server/WorldSocket.cpp @@ -38,6 +38,17 @@ struct CompressedWorldPacket uint32 CompressedAdler; }; +class EncryptablePacket : public WorldPacket +{ +public: + EncryptablePacket(WorldPacket const& packet, bool encrypt) : WorldPacket(packet), _encrypt(encrypt) { } + + bool NeedsEncryption() const { return _encrypt; } + +private: + bool _encrypt; +}; + #pragma pack(pop) using boost::asio::ip::tcp; @@ -76,11 +87,8 @@ void WorldSocket::Start() stmt->setString(0, ip_address); stmt->setUInt32(1, inet_addr(ip_address.c_str())); - { - std::lock_guard<std::mutex> guard(_queryLock); - _queryCallback = io_service().wrap(std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1)); - _queryFuture = LoginDatabase.AsyncQuery(stmt); - } + _queryCallback = std::bind(&WorldSocket::CheckIpCallback, this, std::placeholders::_1); + _queryFuture = LoginDatabase.AsyncQuery(stmt); } void WorldSocket::CheckIpCallback(PreparedQueryResult result) @@ -116,23 +124,50 @@ void WorldSocket::CheckIpCallback(PreparedQueryResult result) initializer.Write(&header, sizeof(header.Setup.Size)); initializer.Write(ServerConnectionInitialize.c_str(), ServerConnectionInitialize.length()); - std::unique_lock<std::mutex> guard(_writeLock); - QueuePacket(std::move(initializer), guard); + // - io_service.run thread, safe. + QueuePacket(std::move(initializer)); } bool WorldSocket::Update() { + EncryptablePacket* queued; + MessageBuffer buffer; + while (_bufferQueue.Dequeue(queued)) + { + uint32 sizeOfHeader = SizeOfServerHeader[queued->NeedsEncryption()]; + uint32 packetSize = queued->size(); + if (packetSize > MinSizeForCompression && queued->NeedsEncryption()) + packetSize = compressBound(packetSize) + sizeof(CompressedWorldPacket); + + if (buffer.GetRemainingSpace() < packetSize + sizeOfHeader) + { + QueuePacket(std::move(buffer)); + buffer.Resize(4096); + } + + if (buffer.GetRemainingSpace() >= packetSize + sizeOfHeader) + WritePacketToBuffer(*queued, buffer); + else // single packet larger than 4096 bytes + { + MessageBuffer packetBuffer(packetSize + sizeOfHeader); + WritePacketToBuffer(*queued, packetBuffer); + QueuePacket(std::move(packetBuffer)); + } + + delete queued; + } + + if (buffer.GetActiveSize() > 0) + QueuePacket(std::move(buffer)); + if (!BaseSocket::Update()) return false; + if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) { - std::lock_guard<std::mutex> guard(_queryLock); - if (_queryFuture.valid() && _queryFuture.wait_for(std::chrono::seconds(0)) == std::future_status::ready) - { - auto callback = std::move(_queryCallback); - _queryCallback = nullptr; - callback(_queryFuture.get()); - } + auto callback = _queryCallback; + _queryCallback = nullptr; + callback(_queryFuture.get()); } return true; @@ -428,29 +463,13 @@ void WorldSocket::SendPacket(WorldPacket const& packet) if (sPacketLog->CanLogPacket()) sPacketLog->LogPacket(packet, SERVER_TO_CLIENT, GetRemoteIpAddress(), GetRemotePort(), GetConnectionType()); - uint32 packetSize = packet.size(); - uint32 sizeOfHeader = SizeOfServerHeader[_authCrypt.IsInitialized()]; - if (packetSize > MinSizeForCompression && _authCrypt.IsInitialized()) - packetSize = compressBound(packetSize) + sizeof(CompressedWorldPacket); - - std::unique_lock<std::mutex> guard(_writeLock); - -#ifndef TC_SOCKET_USE_IOCP - if (_writeQueue.empty() && _writeBuffer.GetRemainingSpace() >= sizeOfHeader + packetSize) - WritePacketToBuffer(packet, _writeBuffer); - else -#endif - { - MessageBuffer buffer(sizeOfHeader + packetSize); - WritePacketToBuffer(packet, buffer); - QueuePacket(std::move(buffer), guard); - } + _bufferQueue.Enqueue(new EncryptablePacket(packet, _authCrypt.IsInitialized())); } -void WorldSocket::WritePacketToBuffer(WorldPacket const& packet, MessageBuffer& buffer) +void WorldSocket::WritePacketToBuffer(EncryptablePacket const& packet, MessageBuffer& buffer) { ServerPktHeader header; - uint32 sizeOfHeader = SizeOfServerHeader[_authCrypt.IsInitialized()]; + uint32 sizeOfHeader = SizeOfServerHeader[packet.NeedsEncryption()]; uint32 opcode = packet.GetOpcode(); uint32 packetSize = packet.size(); @@ -458,7 +477,7 @@ void WorldSocket::WritePacketToBuffer(WorldPacket const& packet, MessageBuffer& uint8* headerPos = buffer.GetWritePointer(); buffer.WriteCompleted(sizeOfHeader); - if (packetSize > MinSizeForCompression && _authCrypt.IsInitialized()) + if (packetSize > MinSizeForCompression && packet.NeedsEncryption()) { CompressedWorldPacket cmp; cmp.UncompressedSize = packetSize + 4; @@ -481,7 +500,7 @@ void WorldSocket::WritePacketToBuffer(WorldPacket const& packet, MessageBuffer& else if (!packet.empty()) buffer.Write(packet.contents(), packet.size()); - if (_authCrypt.IsInitialized()) + if (packet.NeedsEncryption()) { header.Normal.Size = packetSize; header.Normal.Command = opcode; @@ -598,11 +617,8 @@ void WorldSocket::HandleAuthSession(std::shared_ptr<WorldPackets::Auth::AuthSess stmt->setInt32(0, int32(realm.Id.Realm)); stmt->setString(1, authSession->Account); - { - std::lock_guard<std::mutex> guard(_queryLock); - _queryCallback = io_service().wrap(std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1)); - _queryFuture = LoginDatabase.AsyncQuery(stmt); - } + _queryCallback = std::bind(&WorldSocket::HandleAuthSessionCallback, this, authSession, std::placeholders::_1); + _queryFuture = LoginDatabase.AsyncQuery(stmt); } void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthSession> authSession, PreparedQueryResult result) @@ -768,7 +784,7 @@ void WorldSocket::HandleAuthSessionCallback(std::shared_ptr<WorldPackets::Auth:: if (wardenActive) _worldSession->InitWarden(&account.Game.SessionKey, account.BattleNet.OS); - _queryCallback = io_service().wrap(std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1)); + _queryCallback = std::bind(&WorldSocket::LoadSessionPermissionsCallback, this, std::placeholders::_1); _queryFuture = _worldSession->LoadPermissionsAsync(); AsyncRead(); } @@ -801,11 +817,8 @@ void WorldSocket::HandleAuthContinuedSession(std::shared_ptr<WorldPackets::Auth: PreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_ACCOUNT_INFO_CONTINUED_SESSION); stmt->setUInt32(0, accountId); - { - std::lock_guard<std::mutex> guard(_queryLock); - _queryCallback = io_service().wrap(std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1)); - _queryFuture = LoginDatabase.AsyncQuery(stmt); - } + _queryCallback = std::bind(&WorldSocket::HandleAuthContinuedSessionCallback, this, authSession, std::placeholders::_1); + _queryFuture = LoginDatabase.AsyncQuery(stmt); } void WorldSocket::HandleAuthContinuedSessionCallback(std::shared_ptr<WorldPackets::Auth::AuthContinuedSession> authSession, PreparedQueryResult result) diff --git a/src/server/game/Server/WorldSocket.h b/src/server/game/Server/WorldSocket.h index d6d29fb2826..205494ca4ea 100644 --- a/src/server/game/Server/WorldSocket.h +++ b/src/server/game/Server/WorldSocket.h @@ -26,11 +26,13 @@ #include "Util.h" #include "WorldPacket.h" #include "WorldSession.h" +#include "MPSCQueue.h" #include <chrono> #include <boost/asio/ip/tcp.hpp> using boost::asio::ip::tcp; struct z_stream_s; +class EncryptablePacket; namespace WorldPackets { @@ -111,7 +113,7 @@ private: void LogOpcodeText(OpcodeClient opcode, std::unique_lock<std::mutex> const& guard) const; /// sends and logs network.opcode without accessing WorldSession void SendPacketAndLogOpcode(WorldPacket const& packet); - void WritePacketToBuffer(WorldPacket const& packet, MessageBuffer& buffer); + void WritePacketToBuffer(EncryptablePacket const& packet, MessageBuffer& buffer); uint32 CompressPacket(uint8* buffer, WorldPacket const& packet); void HandleSendAuthSession(); @@ -142,12 +144,12 @@ private: MessageBuffer _headerBuffer; MessageBuffer _packetBuffer; + MPSCQueue<EncryptablePacket> _bufferQueue; z_stream_s* _compressionStream; bool _initialized; - std::mutex _queryLock; PreparedQueryResultFuture _queryFuture; std::function<void(PreparedQueryResult&&)> _queryCallback; std::string _ipCountry; diff --git a/src/server/game/Server/WorldSocketMgr.cpp b/src/server/game/Server/WorldSocketMgr.cpp index 937483e1179..94c5a8f6979 100644 --- a/src/server/game/Server/WorldSocketMgr.cpp +++ b/src/server/game/Server/WorldSocketMgr.cpp @@ -24,9 +24,9 @@ #include <boost/system/error_code.hpp> -static void OnSocketAccept(tcp::socket&& sock) +static void OnSocketAccept(tcp::socket&& sock, uint32 threadIndex) { - sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock)); + sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex); } class WorldSocketThread : public NetworkThread<WorldSocket> @@ -73,8 +73,11 @@ bool WorldSocketMgr::StartNetwork(boost::asio::io_service& service, std::string BaseSocketMgr::StartNetwork(service, bindIp, port); _instanceAcceptor = new AsyncAcceptor(service, bindIp, uint16(sWorld->getIntConfig(CONFIG_PORT_INSTANCE))); - _acceptor->AsyncAcceptManaged(&OnSocketAccept); - _instanceAcceptor->AsyncAcceptManaged(&OnSocketAccept); + _acceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this)); + _instanceAcceptor->SetSocketFactory(std::bind(&BaseSocketMgr::GetSocketForAccept, this)); + + _acceptor->AsyncAcceptWithCallback<&OnSocketAccept>(); + _instanceAcceptor->AsyncAcceptWithCallback<&OnSocketAccept>(); sScriptMgr->OnNetworkStart(); return true; @@ -87,7 +90,7 @@ void WorldSocketMgr::StopNetwork() sScriptMgr->OnNetworkStop(); } -void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock) +void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock, uint32 threadIndex) { // set some options here if (_socketSendBufferSize >= 0) @@ -115,7 +118,7 @@ void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock) //sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff); - BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock)); + BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock), threadIndex); } NetworkThread<WorldSocket>* WorldSocketMgr::CreateThreads() const diff --git a/src/server/game/Server/WorldSocketMgr.h b/src/server/game/Server/WorldSocketMgr.h index d4bf4115deb..2079b62d14f 100644 --- a/src/server/game/Server/WorldSocketMgr.h +++ b/src/server/game/Server/WorldSocketMgr.h @@ -49,7 +49,7 @@ public: /// Stops all network threads, It will wait for all running threads . void StopNetwork() override; - void OnSocketOpen(tcp::socket&& sock) override; + void OnSocketOpen(tcp::socket&& sock, uint32 threadIndex) override; protected: WorldSocketMgr(); |
