aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/game/WorldSocket.cpp131
-rw-r--r--src/game/WorldSocket.h20
2 files changed, 44 insertions, 107 deletions
diff --git a/src/game/WorldSocket.cpp b/src/game/WorldSocket.cpp
index c9c36f7651d..9c2c0c7c9f1 100644
--- a/src/game/WorldSocket.cpp
+++ b/src/game/WorldSocket.cpp
@@ -112,9 +112,6 @@ m_OverSpeedPings (0),
m_LastPingTime (ACE_Time_Value::zero)
{
reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
-
- msg_queue()->high_water_mark(8*1024*1024);
- msg_queue()->low_water_mark(8*1024*1024);
}
WorldSocket::~WorldSocket (void)
@@ -128,6 +125,10 @@ WorldSocket::~WorldSocket (void)
closing_ = true;
peer ().close ();
+
+ WorldPacket* pct;
+ while (m_PacketQueue.dequeue_head (pct) == 0)
+ delete pct;
}
bool WorldSocket::IsClosed (void) const
@@ -186,35 +187,18 @@ int WorldSocket::SendPacket (const WorldPacket& pct)
sWorldLog.outLog ("\n");
}
- ServerPktHeader header(pct.size()+2, pct.GetOpcode());
- m_Crypt.EncryptSend ((uint8*)header.header, header.getHeaderLength());
-
- if (m_OutBuffer->space () >= pct.size () + header.getHeaderLength() && msg_queue()->is_empty())
- {
- // Put the packet on the buffer.
- if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1)
- ACE_ASSERT (false);
-
- if (!pct.empty ())
- if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1)
- ACE_ASSERT (false);
- }
- else
+ if (iSendPacket (pct) == -1)
{
- // Enqueue the packet.
- ACE_Message_Block* mb;
-
- ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size () + header.getHeaderLength()), -1);
-
- mb->copy((char*) header.header, header.getHeaderLength());
+ WorldPacket* npct;
- if (!pct.empty ())
- mb->copy((const char*)pct.contents(), pct.size ());
+ ACE_NEW_RETURN (npct, WorldPacket (pct), -1);
- if(msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
+ // NOTE maybe check of the size of the queue can be good ?
+ // to make it bounded instead of unbounded
+ if (m_PacketQueue.enqueue_tail (npct) == -1)
{
- sLog.outError("WorldSocket::SendPacket enqueue_tail");
- mb->release();
+ delete npct;
+ sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed");
return -1;
}
}
@@ -339,7 +323,7 @@ int WorldSocket::handle_output (ACE_HANDLE)
const size_t send_len = m_OutBuffer->length ();
if (send_len == 0)
- return handle_output_queue (Guard);
+ return cancel_wakeup_output (Guard);
#ifdef MSG_NOSIGNAL
ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
@@ -369,73 +353,15 @@ int WorldSocket::handle_output (ACE_HANDLE)
{
m_OutBuffer->reset ();
- return handle_output_queue (Guard);
+ if (!iFlushPacketQueue ())
+ return cancel_wakeup_output (Guard);
+ else
+ return schedule_wakeup_output (Guard);
}
ACE_NOTREACHED (return 0);
}
-int WorldSocket::handle_output_queue (GuardType& g)
-{
- if(msg_queue()->is_empty())
- return cancel_wakeup_output(g);
-
- ACE_Message_Block *mblk;
-
- if(msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
- {
- sLog.outError("WorldSocket::handle_output_queue dequeue_head");
- return -1;
- }
-
- const size_t send_len = mblk->length ();
-
-#ifdef MSG_NOSIGNAL
- ssize_t n = peer ().send (mblk->rd_ptr (), send_len, MSG_NOSIGNAL);
-#else
- ssize_t n = peer ().send (mblk->rd_ptr (), send_len);
-#endif // MSG_NOSIGNAL
-
- if (n == 0)
- {
- mblk->release();
-
- return -1;
- }
- else if (n == -1)
- {
- if (errno == EWOULDBLOCK || errno == EAGAIN)
- {
- msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero);
- return schedule_wakeup_output (g);
- }
-
- mblk->release();
- return -1;
- }
- else if (n < send_len) //now n > 0
- {
- mblk->rd_ptr (static_cast<size_t> (n));
-
- if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1)
- {
- sLog.outError("WorldSocket::handle_output_queue enqueue_head");
- mblk->release();
- return -1;
- }
-
- return schedule_wakeup_output (g);
- }
- else //now n == send_len
- {
- mblk->release();
-
- return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK;
- }
-
- ACE_NOTREACHED(return -1);
-}
-
int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask)
{
// Critical section
@@ -463,15 +389,10 @@ int WorldSocket::Update (void)
if (closing_)
return -1;
- if (m_OutActive || (m_OutBuffer->length () == 0 && msg_queue()->is_empty()))
+ if (m_OutActive || m_OutBuffer->length () == 0)
return 0;
- int ret;
- do
- ret = handle_output (get_handle ());
- while( ret > 0 );
-
- return ret;
+ return handle_output (get_handle ());
}
int WorldSocket::handle_input_header (void)
@@ -487,7 +408,8 @@ int WorldSocket::handle_input_header (void)
EndianConvertReverse(header.size);
EndianConvert(header.cmd);
- if ((header.size < 4) || (header.size > 10240) || (header.cmd > 10240))
+ if ((header.size < 4) || (header.size > 10240) ||
+ (header.cmd < 0) || (header.cmd > 10240) )
{
sLog.outError ("WorldSocket::handle_input_header: client sent malformed packet size = %d , cmd = %d",
header.size, header.cmd);
@@ -693,6 +615,7 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct)
sWorldLog.outLog ("\n");
}
+ sWorldLog.outLog ("\n");
}
// like one switch ;)
@@ -1081,7 +1004,7 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket)
return SendPacket (packet);
}
-/*int WorldSocket::iSendPacket (const WorldPacket& pct)
+int WorldSocket::iSendPacket (const WorldPacket& pct)
{
ServerPktHeader header(pct.size()+2, pct.GetOpcode());
if (m_OutBuffer->space () < pct.size () + header.getHeaderLength())
@@ -1089,10 +1012,10 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket)
errno = ENOBUFS;
return -1;
}
-
-
+
+
m_Crypt.EncryptSend ( header.header, header.getHeaderLength());
-
+
if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1)
ACE_ASSERT (false);
@@ -1129,5 +1052,5 @@ bool WorldSocket::iFlushPacketQueue ()
}
return haveone;
-}*/
+}
diff --git a/src/game/WorldSocket.h b/src/game/WorldSocket.h
index 67e15adf148..94f57d8d636 100644
--- a/src/game/WorldSocket.h
+++ b/src/game/WorldSocket.h
@@ -103,6 +103,9 @@ class WorldSocket : protected WorldHandler
typedef ACE_Thread_Mutex LockType;
typedef ACE_Guard<LockType> GuardType;
+ /// Queue for storing packets for which there is no space.
+ typedef ACE_Unbounded_Queue< WorldPacket* > PacketQueueT;
+
/// Check if socket is closed.
bool IsClosed (void) const;
@@ -158,9 +161,6 @@ class WorldSocket : protected WorldHandler
int cancel_wakeup_output (GuardType& g);
int schedule_wakeup_output (GuardType& g);
- /// Drain the queue if its not empty.
- int handle_output_queue (GuardType& g);
-
/// process one incoming packet.
/// @param new_pct received packet ,note that you need to delete it.
int ProcessIncoming (WorldPacket* new_pct);
@@ -171,6 +171,16 @@ class WorldSocket : protected WorldHandler
/// Called by ProcessIncoming() on CMSG_PING.
int HandlePing (WorldPacket& recvPacket);
+ /// Try to write WorldPacket to m_OutBuffer ,return -1 if no space
+ /// Need to be called with m_OutBufferLock lock held
+ int iSendPacket (const WorldPacket& pct);
+
+ /// Flush m_PacketQueue if there are packets in it
+ /// Need to be called with m_OutBufferLock lock held
+ /// @return true if it wrote to the buffer ( AKA you need
+ /// to mark the socket for output ).
+ bool iFlushPacketQueue ();
+
private:
/// Time in which the last ping was received
ACE_Time_Value m_LastPingTime;
@@ -210,6 +220,10 @@ class WorldSocket : protected WorldHandler
/// Size of the m_OutBuffer.
size_t m_OutBufferSize;
+ /// Here are stored packets for which there was no space on m_OutBuffer,
+ /// this allows not-to kick player if its buffer is overflowed.
+ PacketQueueT m_PacketQueue;
+
/// True if the socket is registered with the reactor for output
bool m_OutActive;