diff options
Diffstat (limited to 'src/game/WorldSocket.cpp')
-rw-r--r-- | src/game/WorldSocket.cpp | 131 |
1 files changed, 27 insertions, 104 deletions
diff --git a/src/game/WorldSocket.cpp b/src/game/WorldSocket.cpp index c9c36f7651d..9c2c0c7c9f1 100644 --- a/src/game/WorldSocket.cpp +++ b/src/game/WorldSocket.cpp @@ -112,9 +112,6 @@ m_OverSpeedPings (0), m_LastPingTime (ACE_Time_Value::zero) { reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); - - msg_queue()->high_water_mark(8*1024*1024); - msg_queue()->low_water_mark(8*1024*1024); } WorldSocket::~WorldSocket (void) @@ -128,6 +125,10 @@ WorldSocket::~WorldSocket (void) closing_ = true; peer ().close (); + + WorldPacket* pct; + while (m_PacketQueue.dequeue_head (pct) == 0) + delete pct; } bool WorldSocket::IsClosed (void) const @@ -186,35 +187,18 @@ int WorldSocket::SendPacket (const WorldPacket& pct) sWorldLog.outLog ("\n"); } - ServerPktHeader header(pct.size()+2, pct.GetOpcode()); - m_Crypt.EncryptSend ((uint8*)header.header, header.getHeaderLength()); - - if (m_OutBuffer->space () >= pct.size () + header.getHeaderLength() && msg_queue()->is_empty()) - { - // Put the packet on the buffer. - if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) - ACE_ASSERT (false); - - if (!pct.empty ()) - if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1) - ACE_ASSERT (false); - } - else + if (iSendPacket (pct) == -1) { - // Enqueue the packet. - ACE_Message_Block* mb; - - ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size () + header.getHeaderLength()), -1); - - mb->copy((char*) header.header, header.getHeaderLength()); + WorldPacket* npct; - if (!pct.empty ()) - mb->copy((const char*)pct.contents(), pct.size ()); + ACE_NEW_RETURN (npct, WorldPacket (pct), -1); - if(msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1) + // NOTE maybe check of the size of the queue can be good ? + // to make it bounded instead of unbounded + if (m_PacketQueue.enqueue_tail (npct) == -1) { - sLog.outError("WorldSocket::SendPacket enqueue_tail"); - mb->release(); + delete npct; + sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed"); return -1; } } @@ -339,7 +323,7 @@ int WorldSocket::handle_output (ACE_HANDLE) const size_t send_len = m_OutBuffer->length (); if (send_len == 0) - return handle_output_queue (Guard); + return cancel_wakeup_output (Guard); #ifdef MSG_NOSIGNAL ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL); @@ -369,73 +353,15 @@ int WorldSocket::handle_output (ACE_HANDLE) { m_OutBuffer->reset (); - return handle_output_queue (Guard); + if (!iFlushPacketQueue ()) + return cancel_wakeup_output (Guard); + else + return schedule_wakeup_output (Guard); } ACE_NOTREACHED (return 0); } -int WorldSocket::handle_output_queue (GuardType& g) -{ - if(msg_queue()->is_empty()) - return cancel_wakeup_output(g); - - ACE_Message_Block *mblk; - - if(msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) - { - sLog.outError("WorldSocket::handle_output_queue dequeue_head"); - return -1; - } - - const size_t send_len = mblk->length (); - -#ifdef MSG_NOSIGNAL - ssize_t n = peer ().send (mblk->rd_ptr (), send_len, MSG_NOSIGNAL); -#else - ssize_t n = peer ().send (mblk->rd_ptr (), send_len); -#endif // MSG_NOSIGNAL - - if (n == 0) - { - mblk->release(); - - return -1; - } - else if (n == -1) - { - if (errno == EWOULDBLOCK || errno == EAGAIN) - { - msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero); - return schedule_wakeup_output (g); - } - - mblk->release(); - return -1; - } - else if (n < send_len) //now n > 0 - { - mblk->rd_ptr (static_cast<size_t> (n)); - - if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1) - { - sLog.outError("WorldSocket::handle_output_queue enqueue_head"); - mblk->release(); - return -1; - } - - return schedule_wakeup_output (g); - } - else //now n == send_len - { - mblk->release(); - - return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK; - } - - ACE_NOTREACHED(return -1); -} - int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask) { // Critical section @@ -463,15 +389,10 @@ int WorldSocket::Update (void) if (closing_) return -1; - if (m_OutActive || (m_OutBuffer->length () == 0 && msg_queue()->is_empty())) + if (m_OutActive || m_OutBuffer->length () == 0) return 0; - int ret; - do - ret = handle_output (get_handle ()); - while( ret > 0 ); - - return ret; + return handle_output (get_handle ()); } int WorldSocket::handle_input_header (void) @@ -487,7 +408,8 @@ int WorldSocket::handle_input_header (void) EndianConvertReverse(header.size); EndianConvert(header.cmd); - if ((header.size < 4) || (header.size > 10240) || (header.cmd > 10240)) + if ((header.size < 4) || (header.size > 10240) || + (header.cmd < 0) || (header.cmd > 10240) ) { sLog.outError ("WorldSocket::handle_input_header: client sent malformed packet size = %d , cmd = %d", header.size, header.cmd); @@ -693,6 +615,7 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct) sWorldLog.outLog ("\n"); } + sWorldLog.outLog ("\n"); } // like one switch ;) @@ -1081,7 +1004,7 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) return SendPacket (packet); } -/*int WorldSocket::iSendPacket (const WorldPacket& pct) +int WorldSocket::iSendPacket (const WorldPacket& pct) { ServerPktHeader header(pct.size()+2, pct.GetOpcode()); if (m_OutBuffer->space () < pct.size () + header.getHeaderLength()) @@ -1089,10 +1012,10 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) errno = ENOBUFS; return -1; } - - + + m_Crypt.EncryptSend ( header.header, header.getHeaderLength()); - + if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) ACE_ASSERT (false); @@ -1129,5 +1052,5 @@ bool WorldSocket::iFlushPacketQueue () } return haveone; -}*/ +} |