diff options
| author | Shauren <shauren.trinity@gmail.com> | 2014-09-09 19:19:25 +0200 |
|---|---|---|
| committer | Shauren <shauren.trinity@gmail.com> | 2014-09-09 19:19:25 +0200 |
| commit | e0ce4528c5ffd43f651f88821723311541e9e461 (patch) | |
| tree | c5f3906d17114120b2ea2b382cea39db07137d18 /src/server/game | |
| parent | a2ba49afa428ed9297f98bf8a5e00f6f7a6f4c3a (diff) | |
Core/NetworkIO: Use reactor style sending on linux to reduce locking overhead
Diffstat (limited to 'src/server/game')
| -rw-r--r-- | src/server/game/Server/WorldSession.cpp | 2 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocket.cpp | 122 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocket.h | 67 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocketMgr.cpp | 115 | ||||
| -rw-r--r-- | src/server/game/Server/WorldSocketMgr.h | 66 |
5 files changed, 282 insertions, 90 deletions
diff --git a/src/server/game/Server/WorldSession.cpp b/src/server/game/Server/WorldSession.cpp index 445e42a7f08..321bc707879 100644 --- a/src/server/game/Server/WorldSession.cpp +++ b/src/server/game/Server/WorldSession.cpp @@ -227,7 +227,7 @@ void WorldSession::SendPacket(WorldPacket* packet) sScriptMgr->OnPacketSend(this, *packet); - m_Socket->AsyncWrite(*packet); + m_Socket->SendPacket(*packet); } /// Add an incoming packet to the queue diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp index ca8e2cd5a34..f8673e5d5b7 100644 --- a/src/server/game/Server/WorldSocket.cpp +++ b/src/server/game/Server/WorldSocket.cpp @@ -28,14 +28,14 @@ using boost::asio::ip::tcp; WorldSocket::WorldSocket(tcp::socket&& socket) - : Socket(std::move(socket), sizeof(ClientPktHeader)), _authSeed(rand32()), _OverSpeedPings(0), _worldSession(nullptr) + : Socket(std::move(socket)), _authSeed(rand32()), _OverSpeedPings(0), _worldSession(nullptr) { + _headerBuffer.Resize(sizeof(ClientPktHeader)); } void WorldSocket::Start() { - sScriptMgr->OnSocketOpen(shared_from_this()); - AsyncReadHeader(); + AsyncRead(); HandleSendAuthSession(); } @@ -53,14 +53,69 @@ void WorldSocket::HandleSendAuthSession() seed2.SetRand(16 * 8); packet.append(seed2.AsByteArray(16).get(), 16); // new encryption seeds - AsyncWrite(packet); + SendPacket(packet); } -void WorldSocket::ReadHeaderHandler() +void WorldSocket::ReadHandler() { - _authCrypt.DecryptRecv(GetHeaderBuffer(), sizeof(ClientPktHeader)); + if (!IsOpen()) + return; + + MessageBuffer& packet = GetReadBuffer(); + while (packet.GetActiveSize() > 0) + { + if (_headerBuffer.GetRemainingSpace() > 0) + { + // need to receive the header + std::size_t readHeaderSize = std::min(packet.GetActiveSize(), _headerBuffer.GetRemainingSpace()); + _headerBuffer.Write(packet.GetReadPointer(), readHeaderSize); + packet.ReadCompleted(readHeaderSize); + + if (_headerBuffer.GetRemainingSpace() > 0) + { + // Couldn't receive the whole header this time. + ASSERT(packet.GetActiveSize() == 0); + break; + } + + // We just received nice new header + if (!ReadHeaderHandler()) + return; + } + + // We have full read header, now check the data payload + if (_packetBuffer.GetRemainingSpace() > 0) + { + // need more data in the payload + std::size_t readDataSize = std::min(packet.GetActiveSize(), _packetBuffer.GetRemainingSpace()); + _packetBuffer.Write(packet.GetReadPointer(), readDataSize); + packet.ReadCompleted(readDataSize); + + if (_packetBuffer.GetRemainingSpace() > 0) + { + // Couldn't receive the whole data this time. + ASSERT(packet.GetActiveSize() == 0); + break; + } + } - ClientPktHeader* header = reinterpret_cast<ClientPktHeader*>(GetHeaderBuffer()); + // just received fresh new payload + if (!ReadDataHandler()) + return; + + _headerBuffer.Reset(); + } + + AsyncRead(); +} + +bool WorldSocket::ReadHeaderHandler() +{ + ASSERT(_headerBuffer.GetActiveSize() == sizeof(ClientPktHeader)); + + _authCrypt.DecryptRecv(_headerBuffer.GetReadPointer(), sizeof(ClientPktHeader)); + + ClientPktHeader* header = reinterpret_cast<ClientPktHeader*>(_headerBuffer.GetReadPointer()); EndianConvertReverse(header->size); EndianConvert(header->cmd); @@ -74,24 +129,26 @@ void WorldSocket::ReadHeaderHandler() } else TC_LOG_ERROR("network", "WorldSocket::ReadHeaderHandler(): client %s sent malformed packet (size: %hu, cmd: %u)", - GetRemoteIpAddress().to_string().c_str(), header->size, header->cmd); + GetRemoteIpAddress().to_string().c_str(), header->size, header->cmd); CloseSocket(); - return; + return false; } - AsyncReadData(header->size - sizeof(header->cmd)); + header->size -= sizeof(header->cmd); + _packetBuffer.Resize(header->size); + return true; } -void WorldSocket::ReadDataHandler() +bool WorldSocket::ReadDataHandler() { - ClientPktHeader* header = reinterpret_cast<ClientPktHeader*>(GetHeaderBuffer()); + ClientPktHeader* header = reinterpret_cast<ClientPktHeader*>(_headerBuffer.GetReadPointer()); uint16 opcode = uint16(header->cmd); std::string opcodeName = GetOpcodeNameForLogging(opcode); - WorldPacket packet(opcode, MoveData()); + WorldPacket packet(opcode, std::move(_packetBuffer)); if (sPacketLog->CanLogPacket()) sPacketLog->LogPacket(packet, CLIENT_TO_SERVER, GetRemoteIpAddress(), GetRemotePort()); @@ -122,7 +179,7 @@ void WorldSocket::ReadDataHandler() { TC_LOG_ERROR("network.opcode", "ProcessIncoming: Client not authed opcode = %u", uint32(opcode)); CloseSocket(); - return; + return false; } // Our Idle timer will reset on any non PING opcodes. @@ -135,10 +192,10 @@ void WorldSocket::ReadDataHandler() } } - AsyncReadHeader(); + return true; } -void WorldSocket::AsyncWrite(WorldPacket& packet) +void WorldSocket::SendPacket(WorldPacket& packet) { if (!IsOpen()) return; @@ -150,15 +207,27 @@ void WorldSocket::AsyncWrite(WorldPacket& packet) ServerPktHeader header(packet.size() + 2, packet.GetOpcode()); - std::lock_guard<std::mutex> guard(_writeLock); + std::unique_lock<std::mutex> guard(_writeLock); - bool needsWriteStart = _writeQueue.empty(); _authCrypt.EncryptSend(header.header, header.getHeaderLength()); - _writeQueue.emplace(header, packet); +#ifndef BOOST_ASIO_HAS_IOCP + if (_writeQueue.empty() && _writeBuffer.GetRemainingSpace() >= header.getHeaderLength() + packet.size()) + { + _writeBuffer.Write(header.header, header.getHeaderLength()); + if (!packet.empty()) + _writeBuffer.Write(packet.contents(), packet.size()); + } + else +#endif + { + MessageBuffer buffer(header.getHeaderLength() + packet.size()); + buffer.Write(header.header, header.getHeaderLength()); + if (!packet.empty()) + buffer.Write(packet.contents(), packet.size()); - if (needsWriteStart) - AsyncWrite(_writeQueue.front()); + QueuePacket(std::move(buffer), guard); + } } void WorldSocket::HandleAuthSession(WorldPacket& recvPacket) @@ -410,7 +479,7 @@ void WorldSocket::SendAuthResponseError(uint8 code) WorldPacket packet(SMSG_AUTH_RESPONSE, 1); packet << uint8(code); - AsyncWrite(packet); + SendPacket(packet); } void WorldSocket::HandlePing(WorldPacket& recvPacket) @@ -471,12 +540,5 @@ void WorldSocket::HandlePing(WorldPacket& recvPacket) WorldPacket packet(SMSG_PONG, 4); packet << ping; - return AsyncWrite(packet); -} - -void WorldSocket::CloseSocket() -{ - sScriptMgr->OnSocketClose(shared_from_this()); - - Socket::CloseSocket(); + return SendPacket(packet); } diff --git a/src/server/game/Server/WorldSocket.h b/src/server/game/Server/WorldSocket.h index 0667c1b090a..d301e239340 100644 --- a/src/server/game/Server/WorldSocket.h +++ b/src/server/game/Server/WorldSocket.h @@ -19,16 +19,6 @@ #ifndef __WORLDSOCKET_H__ #define __WORLDSOCKET_H__ -// Forward declare buffer function here - Socket.h must know about it -struct WorldPacketBuffer; -namespace boost -{ - namespace asio - { - WorldPacketBuffer const& buffer(WorldPacketBuffer const& buf); - } -} - #include "Common.h" #include "AuthCrypt.h" #include "ServerPktHeader.h" @@ -54,50 +44,8 @@ struct ClientPktHeader #pragma pack(pop) -struct WorldPacketBuffer +class WorldSocket : public Socket<WorldSocket> { - typedef boost::asio::const_buffer value_type; - - typedef boost::asio::const_buffer const* const_iterator; - - WorldPacketBuffer(ServerPktHeader header, WorldPacket const& packet) : _header(header), _packet(packet) - { - _buffers[0] = boost::asio::const_buffer(_header.header, _header.getHeaderLength()); - if (!_packet.empty()) - _buffers[1] = boost::asio::const_buffer(_packet.contents(), _packet.size()); - } - - const_iterator begin() const - { - return _buffers; - } - - const_iterator end() const - { - return _buffers + (_packet.empty() ? 1 : 2); - } - -private: - boost::asio::const_buffer _buffers[2]; - ServerPktHeader _header; - WorldPacket _packet; -}; - -namespace boost -{ - namespace asio - { - inline WorldPacketBuffer const& buffer(WorldPacketBuffer const& buf) - { - return buf; - } - } -} - -class WorldSocket : public Socket<WorldSocket, WorldPacketBuffer> -{ - typedef Socket<WorldSocket, WorldPacketBuffer> Base; - public: WorldSocket(tcp::socket&& socket); @@ -106,14 +54,12 @@ public: void Start() override; - void CloseSocket() override; - - using Base::AsyncWrite; - void AsyncWrite(WorldPacket& packet); + void SendPacket(WorldPacket& packet); protected: - void ReadHeaderHandler() override; - void ReadDataHandler() override; + void ReadHandler() override; + bool ReadHeaderHandler(); + bool ReadDataHandler(); private: void HandleSendAuthSession(); @@ -129,6 +75,9 @@ private: uint32 _OverSpeedPings; WorldSession* _worldSession; + + MessageBuffer _headerBuffer; + MessageBuffer _packetBuffer; }; #endif diff --git a/src/server/game/Server/WorldSocketMgr.cpp b/src/server/game/Server/WorldSocketMgr.cpp new file mode 100644 index 00000000000..21f62fa265c --- /dev/null +++ b/src/server/game/Server/WorldSocketMgr.cpp @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * Copyright (C) 2005-2008 MaNGOS <http://getmangos.com/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "Config.h" +#include "NetworkThread.h" +#include "ScriptMgr.h" +#include "WorldSocket.h" +#include "WorldSocketMgr.h" +#include <boost/system/error_code.hpp> + +static void OnSocketAccept(tcp::socket&& sock) +{ + sWorldSocketMgr.OnSocketOpen(std::forward<tcp::socket>(sock)); +} + +class WorldSocketThread : public NetworkThread<WorldSocket> +{ +public: + void SocketAdded(std::shared_ptr<WorldSocket> sock) override + { + sScriptMgr->OnSocketOpen(sock); + } + + void SocketRemoved(std::shared_ptr<WorldSocket> sock) override + { + sScriptMgr->OnSocketClose(sock); + } +}; + +WorldSocketMgr::WorldSocketMgr() : BaseSocketMgr(), _socketSendBufferSize(-1), m_SockOutUBuff(65536), _tcpNoDelay(true) +{ +} + +bool WorldSocketMgr::StartNetwork(boost::asio::io_service& service, std::string const& bindIp, uint16 port) +{ + _tcpNoDelay = sConfigMgr->GetBoolDefault("Network.TcpNodelay", true); + + TC_LOG_DEBUG("misc", "Max allowed socket connections %d", boost::asio::socket_base::max_connections); + + // -1 means use default + _socketSendBufferSize = sConfigMgr->GetIntDefault("Network.OutKBuff", -1); + + m_SockOutUBuff = sConfigMgr->GetIntDefault("Network.OutUBuff", 65536); + + if (m_SockOutUBuff <= 0) + { + TC_LOG_ERROR("misc", "Network.OutUBuff is wrong in your config file"); + return false; + } + + BaseSocketMgr::StartNetwork(service, bindIp, port); + + _acceptor->AsyncAcceptManaged(&OnSocketAccept); + + sScriptMgr->OnNetworkStart(); + return true; +} + +void WorldSocketMgr::StopNetwork() +{ + BaseSocketMgr::StopNetwork(); + + sScriptMgr->OnNetworkStop(); +} + +void WorldSocketMgr::OnSocketOpen(tcp::socket&& sock) +{ + // set some options here + if (_socketSendBufferSize >= 0) + { + boost::system::error_code err; + sock.set_option(boost::asio::socket_base::send_buffer_size(_socketSendBufferSize), err); + if (err && err != boost::system::errc::not_supported) + { + TC_LOG_ERROR("misc", "WorldSocketMgr::OnSocketOpen sock.set_option(boost::asio::socket_base::send_buffer_size) err = %s", err.message().c_str()); + return; + } + } + + // Set TCP_NODELAY. + if (_tcpNoDelay) + { + boost::system::error_code err; + sock.set_option(boost::asio::ip::tcp::no_delay(true), err); + if (err) + { + TC_LOG_ERROR("misc", "WorldSocketMgr::OnSocketOpen sock.set_option(boost::asio::ip::tcp::no_delay) err = %s", err.message().c_str()); + return; + } + } + + //sock->m_OutBufferSize = static_cast<size_t> (m_SockOutUBuff); + + BaseSocketMgr::OnSocketOpen(std::forward<tcp::socket>(sock)); +} + +NetworkThread<WorldSocket>* WorldSocketMgr::CreateThreads() const +{ + return new WorldSocketThread[GetNetworkThreadCount()]; +} diff --git a/src/server/game/Server/WorldSocketMgr.h b/src/server/game/Server/WorldSocketMgr.h new file mode 100644 index 00000000000..92a28d0c135 --- /dev/null +++ b/src/server/game/Server/WorldSocketMgr.h @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2008-2013 TrinityCore <http://www.trinitycore.org/> + * Copyright (C) 2005-2009 MaNGOS <http://getmangos.com/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +/** \addtogroup u2w User to World Communication + * @{ + * \file WorldSocketMgr.h + * \author Derex <derex101@gmail.com> + */ + +#ifndef __WORLDSOCKETMGR_H +#define __WORLDSOCKETMGR_H + +#include "SocketMgr.h" + +class WorldSocket; + +/// Manages all sockets connected to peers and network threads +class WorldSocketMgr : public SocketMgr<WorldSocket> +{ + typedef SocketMgr<WorldSocket> BaseSocketMgr; + +public: + static WorldSocketMgr& Instance() + { + static WorldSocketMgr instance; + return instance; + } + + /// Start network, listen at address:port . + bool StartNetwork(boost::asio::io_service& service, std::string const& bindIp, uint16 port) override; + + /// Stops all network threads, It will wait for all running threads . + void StopNetwork() override; + + void OnSocketOpen(tcp::socket&& sock) override; + +protected: + WorldSocketMgr(); + + NetworkThread<WorldSocket>* CreateThreads() const override; + +private: + int32 _socketSendBufferSize; + int32 m_SockOutUBuff; + bool _tcpNoDelay; +}; + +#define sWorldSocketMgr WorldSocketMgr::Instance() + +#endif +/// @} |
