diff options
author | megamage <none@none> | 2009-01-04 16:37:14 -0600 |
---|---|---|
committer | megamage <none@none> | 2009-01-04 16:37:14 -0600 |
commit | 2d319dd5b41c12f50250c006cca66f0316b90dc2 (patch) | |
tree | 4f8a547fc3a2a567dae63a61bf8b020e8a8126fe /src/game/WorldSocket.cpp | |
parent | eb5a7b02eef6fe13684dfe14faf048bbce1d0f83 (diff) |
*Mangos: replaced dynamic by static allocation in server packets header. Added command to test large packets. By arrai.
*Mangos: [7022] Added support for packets > 64 kb. By derex.
--HG--
branch : trunk
Diffstat (limited to 'src/game/WorldSocket.cpp')
-rw-r--r-- | src/game/WorldSocket.cpp | 130 |
1 files changed, 99 insertions, 31 deletions
diff --git a/src/game/WorldSocket.cpp b/src/game/WorldSocket.cpp index 1f77e1111e5..1f2835e5663 100644 --- a/src/game/WorldSocket.cpp +++ b/src/game/WorldSocket.cpp @@ -63,14 +63,8 @@ struct ServerPktHeader if(isLargePacket()) { sLog.outDebug("initializing large server to client packet. Size: %u, cmd: %u", size, cmd); - header= new uint8[5]; header[headerIndex++] = 0x80|(0xFF &(size>>16)); } - else - { - header= new uint8[4]; - } - header[headerIndex++] = 0xFF &(size>>8); header[headerIndex++] = 0xFF &size; @@ -78,11 +72,6 @@ struct ServerPktHeader header[headerIndex++] = 0xFF & (cmd>>8); } - ~ServerPktHeader() - { - delete[] header; - } - uint8 getHeaderLength() { // cmd = 2 bytes, size= 2||3bytes @@ -95,7 +84,7 @@ struct ServerPktHeader } const uint32 size; - uint8 *header; + uint8 header[5]; }; struct ClientPktHeader @@ -124,6 +113,9 @@ m_OverSpeedPings (0), m_LastPingTime (ACE_Time_Value::zero) { reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); + + msg_queue()->high_water_mark(8*1024*1024); + msg_queue()->low_water_mark(8*1024*1024); } WorldSocket::~WorldSocket (void) @@ -137,10 +129,6 @@ WorldSocket::~WorldSocket (void) closing_ = true; peer ().close (); - - WorldPacket* pct; - while (m_PacketQueue.dequeue_head (pct) == 0) - delete pct; } bool WorldSocket::IsClosed (void) const @@ -200,18 +188,35 @@ int WorldSocket::SendPacket (const WorldPacket& pct) sWorldLog.Log ("\n\n"); } - if (iSendPacket (pct) == -1) + ServerPktHeader header(pct.size()+2, pct.GetOpcode()); + m_Crypt.EncryptSend ( header.header, header.getHeaderLength()); + + if (m_OutBuffer->space () >= pct.size () + header.getHeaderLength() && msg_queue()->is_empty()) { - WorldPacket* npct; + // Put the packet on the buffer. + if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) + ACE_ASSERT (false); - ACE_NEW_RETURN (npct, WorldPacket (pct), -1); + if (!pct.empty ()) + if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1) + ACE_ASSERT (false); + } + else + { + // Enqueue the packet. + ACE_Message_Block* mb; - // NOTE maybe check of the size of the queue can be good ? - // to make it bounded instead of unbounded - if (m_PacketQueue.enqueue_tail (npct) == -1) + ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size () + header.getHeaderLength()), -1); + + mb->copy((char*) header.header, header.getHeaderLength()); + + if (!pct.empty ()) + mb->copy((const char*)pct.contents(), pct.size ()); + + if(msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1) { - delete npct; - sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed"); + sLog.outError("WorldSocket::SendPacket enqueue_tail"); + mb->release(); return -1; } } @@ -336,7 +341,7 @@ int WorldSocket::handle_output (ACE_HANDLE) const size_t send_len = m_OutBuffer->length (); if (send_len == 0) - return cancel_wakeup_output (Guard); + return handle_output_queue (Guard); #ifdef MSG_NOSIGNAL ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL); @@ -366,15 +371,73 @@ int WorldSocket::handle_output (ACE_HANDLE) { m_OutBuffer->reset (); - if (!iFlushPacketQueue ()) - return cancel_wakeup_output (Guard); - else - return schedule_wakeup_output (Guard); + return handle_output_queue (Guard); } ACE_NOTREACHED (return 0); } +int WorldSocket::handle_output_queue (GuardType& g) +{ + if(msg_queue()->is_empty()) + return cancel_wakeup_output(g); + + ACE_Message_Block *mblk; + + if(msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) + { + sLog.outError("WorldSocket::handle_output_queue dequeue_head"); + return -1; + } + + const size_t send_len = mblk->length (); + +#ifdef MSG_NOSIGNAL + ssize_t n = peer ().send (mblk->rd_ptr (), send_len, MSG_NOSIGNAL); +#else + ssize_t n = peer ().send (mblk->rd_ptr (), send_len); +#endif // MSG_NOSIGNAL + + if (n == 0) + { + mblk->release(); + + return -1; + } + else if (n == -1) + { + if (errno == EWOULDBLOCK || errno == EAGAIN) + { + msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero); + return schedule_wakeup_output (g); + } + + mblk->release(); + return -1; + } + else if (n < send_len) //now n > 0 + { + mblk->rd_ptr (static_cast<size_t> (n)); + + if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1) + { + sLog.outError("WorldSocket::handle_output_queue enqueue_head"); + mblk->release(); + return -1; + } + + return schedule_wakeup_output (g); + } + else //now n == send_len + { + mblk->release(); + + return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK; + } + + ACE_NOTREACHED(return -1); +} + int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask) { // Critical section @@ -402,10 +465,15 @@ int WorldSocket::Update (void) if (closing_) return -1; - if (m_OutActive || m_OutBuffer->length () == 0) + if (m_OutActive || (m_OutBuffer->length () == 0 && msg_queue()->is_empty())) return 0; - return handle_output (get_handle ()); + int ret; + do + ret = handle_output (get_handle ()); + while( ret > 0 ); + + return ret; } int WorldSocket::handle_input_header (void) |