diff options
-rw-r--r-- | src/game/WorldSocket.cpp | 131 | ||||
-rw-r--r-- | src/game/WorldSocket.h | 20 |
2 files changed, 44 insertions, 107 deletions
diff --git a/src/game/WorldSocket.cpp b/src/game/WorldSocket.cpp index c9c36f7651d..9c2c0c7c9f1 100644 --- a/src/game/WorldSocket.cpp +++ b/src/game/WorldSocket.cpp @@ -112,9 +112,6 @@ m_OverSpeedPings (0), m_LastPingTime (ACE_Time_Value::zero) { reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); - - msg_queue()->high_water_mark(8*1024*1024); - msg_queue()->low_water_mark(8*1024*1024); } WorldSocket::~WorldSocket (void) @@ -128,6 +125,10 @@ WorldSocket::~WorldSocket (void) closing_ = true; peer ().close (); + + WorldPacket* pct; + while (m_PacketQueue.dequeue_head (pct) == 0) + delete pct; } bool WorldSocket::IsClosed (void) const @@ -186,35 +187,18 @@ int WorldSocket::SendPacket (const WorldPacket& pct) sWorldLog.outLog ("\n"); } - ServerPktHeader header(pct.size()+2, pct.GetOpcode()); - m_Crypt.EncryptSend ((uint8*)header.header, header.getHeaderLength()); - - if (m_OutBuffer->space () >= pct.size () + header.getHeaderLength() && msg_queue()->is_empty()) - { - // Put the packet on the buffer. - if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) - ACE_ASSERT (false); - - if (!pct.empty ()) - if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1) - ACE_ASSERT (false); - } - else + if (iSendPacket (pct) == -1) { - // Enqueue the packet. - ACE_Message_Block* mb; - - ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size () + header.getHeaderLength()), -1); - - mb->copy((char*) header.header, header.getHeaderLength()); + WorldPacket* npct; - if (!pct.empty ()) - mb->copy((const char*)pct.contents(), pct.size ()); + ACE_NEW_RETURN (npct, WorldPacket (pct), -1); - if(msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1) + // NOTE maybe check of the size of the queue can be good ? + // to make it bounded instead of unbounded + if (m_PacketQueue.enqueue_tail (npct) == -1) { - sLog.outError("WorldSocket::SendPacket enqueue_tail"); - mb->release(); + delete npct; + sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed"); return -1; } } @@ -339,7 +323,7 @@ int WorldSocket::handle_output (ACE_HANDLE) const size_t send_len = m_OutBuffer->length (); if (send_len == 0) - return handle_output_queue (Guard); + return cancel_wakeup_output (Guard); #ifdef MSG_NOSIGNAL ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL); @@ -369,73 +353,15 @@ int WorldSocket::handle_output (ACE_HANDLE) { m_OutBuffer->reset (); - return handle_output_queue (Guard); + if (!iFlushPacketQueue ()) + return cancel_wakeup_output (Guard); + else + return schedule_wakeup_output (Guard); } ACE_NOTREACHED (return 0); } -int WorldSocket::handle_output_queue (GuardType& g) -{ - if(msg_queue()->is_empty()) - return cancel_wakeup_output(g); - - ACE_Message_Block *mblk; - - if(msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) - { - sLog.outError("WorldSocket::handle_output_queue dequeue_head"); - return -1; - } - - const size_t send_len = mblk->length (); - -#ifdef MSG_NOSIGNAL - ssize_t n = peer ().send (mblk->rd_ptr (), send_len, MSG_NOSIGNAL); -#else - ssize_t n = peer ().send (mblk->rd_ptr (), send_len); -#endif // MSG_NOSIGNAL - - if (n == 0) - { - mblk->release(); - - return -1; - } - else if (n == -1) - { - if (errno == EWOULDBLOCK || errno == EAGAIN) - { - msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero); - return schedule_wakeup_output (g); - } - - mblk->release(); - return -1; - } - else if (n < send_len) //now n > 0 - { - mblk->rd_ptr (static_cast<size_t> (n)); - - if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1) - { - sLog.outError("WorldSocket::handle_output_queue enqueue_head"); - mblk->release(); - return -1; - } - - return schedule_wakeup_output (g); - } - else //now n == send_len - { - mblk->release(); - - return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK; - } - - ACE_NOTREACHED(return -1); -} - int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask) { // Critical section @@ -463,15 +389,10 @@ int WorldSocket::Update (void) if (closing_) return -1; - if (m_OutActive || (m_OutBuffer->length () == 0 && msg_queue()->is_empty())) + if (m_OutActive || m_OutBuffer->length () == 0) return 0; - int ret; - do - ret = handle_output (get_handle ()); - while( ret > 0 ); - - return ret; + return handle_output (get_handle ()); } int WorldSocket::handle_input_header (void) @@ -487,7 +408,8 @@ int WorldSocket::handle_input_header (void) EndianConvertReverse(header.size); EndianConvert(header.cmd); - if ((header.size < 4) || (header.size > 10240) || (header.cmd > 10240)) + if ((header.size < 4) || (header.size > 10240) || + (header.cmd < 0) || (header.cmd > 10240) ) { sLog.outError ("WorldSocket::handle_input_header: client sent malformed packet size = %d , cmd = %d", header.size, header.cmd); @@ -693,6 +615,7 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct) sWorldLog.outLog ("\n"); } + sWorldLog.outLog ("\n"); } // like one switch ;) @@ -1081,7 +1004,7 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) return SendPacket (packet); } -/*int WorldSocket::iSendPacket (const WorldPacket& pct) +int WorldSocket::iSendPacket (const WorldPacket& pct) { ServerPktHeader header(pct.size()+2, pct.GetOpcode()); if (m_OutBuffer->space () < pct.size () + header.getHeaderLength()) @@ -1089,10 +1012,10 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) errno = ENOBUFS; return -1; } - - + + m_Crypt.EncryptSend ( header.header, header.getHeaderLength()); - + if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) ACE_ASSERT (false); @@ -1129,5 +1052,5 @@ bool WorldSocket::iFlushPacketQueue () } return haveone; -}*/ +} diff --git a/src/game/WorldSocket.h b/src/game/WorldSocket.h index 67e15adf148..94f57d8d636 100644 --- a/src/game/WorldSocket.h +++ b/src/game/WorldSocket.h @@ -103,6 +103,9 @@ class WorldSocket : protected WorldHandler typedef ACE_Thread_Mutex LockType; typedef ACE_Guard<LockType> GuardType; + /// Queue for storing packets for which there is no space. + typedef ACE_Unbounded_Queue< WorldPacket* > PacketQueueT; + /// Check if socket is closed. bool IsClosed (void) const; @@ -158,9 +161,6 @@ class WorldSocket : protected WorldHandler int cancel_wakeup_output (GuardType& g); int schedule_wakeup_output (GuardType& g); - /// Drain the queue if its not empty. - int handle_output_queue (GuardType& g); - /// process one incoming packet. /// @param new_pct received packet ,note that you need to delete it. int ProcessIncoming (WorldPacket* new_pct); @@ -171,6 +171,16 @@ class WorldSocket : protected WorldHandler /// Called by ProcessIncoming() on CMSG_PING. int HandlePing (WorldPacket& recvPacket); + /// Try to write WorldPacket to m_OutBuffer ,return -1 if no space + /// Need to be called with m_OutBufferLock lock held + int iSendPacket (const WorldPacket& pct); + + /// Flush m_PacketQueue if there are packets in it + /// Need to be called with m_OutBufferLock lock held + /// @return true if it wrote to the buffer ( AKA you need + /// to mark the socket for output ). + bool iFlushPacketQueue (); + private: /// Time in which the last ping was received ACE_Time_Value m_LastPingTime; @@ -210,6 +220,10 @@ class WorldSocket : protected WorldHandler /// Size of the m_OutBuffer. size_t m_OutBufferSize; + /// Here are stored packets for which there was no space on m_OutBuffer, + /// this allows not-to kick player if its buffer is overflowed. + PacketQueueT m_PacketQueue; + /// True if the socket is registered with the reactor for output bool m_OutActive; |