aboutsummaryrefslogtreecommitdiff
path: root/src/game/WorldSocket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/game/WorldSocket.cpp')
-rw-r--r--src/game/WorldSocket.cpp130
1 files changed, 99 insertions, 31 deletions
diff --git a/src/game/WorldSocket.cpp b/src/game/WorldSocket.cpp
index 1f77e1111e5..1f2835e5663 100644
--- a/src/game/WorldSocket.cpp
+++ b/src/game/WorldSocket.cpp
@@ -63,14 +63,8 @@ struct ServerPktHeader
if(isLargePacket())
{
sLog.outDebug("initializing large server to client packet. Size: %u, cmd: %u", size, cmd);
- header= new uint8[5];
header[headerIndex++] = 0x80|(0xFF &(size>>16));
}
- else
- {
- header= new uint8[4];
- }
-
header[headerIndex++] = 0xFF &(size>>8);
header[headerIndex++] = 0xFF &size;
@@ -78,11 +72,6 @@ struct ServerPktHeader
header[headerIndex++] = 0xFF & (cmd>>8);
}
- ~ServerPktHeader()
- {
- delete[] header;
- }
-
uint8 getHeaderLength()
{
// cmd = 2 bytes, size= 2||3bytes
@@ -95,7 +84,7 @@ struct ServerPktHeader
}
const uint32 size;
- uint8 *header;
+ uint8 header[5];
};
struct ClientPktHeader
@@ -124,6 +113,9 @@ m_OverSpeedPings (0),
m_LastPingTime (ACE_Time_Value::zero)
{
reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
+
+ msg_queue()->high_water_mark(8*1024*1024);
+ msg_queue()->low_water_mark(8*1024*1024);
}
WorldSocket::~WorldSocket (void)
@@ -137,10 +129,6 @@ WorldSocket::~WorldSocket (void)
closing_ = true;
peer ().close ();
-
- WorldPacket* pct;
- while (m_PacketQueue.dequeue_head (pct) == 0)
- delete pct;
}
bool WorldSocket::IsClosed (void) const
@@ -200,18 +188,35 @@ int WorldSocket::SendPacket (const WorldPacket& pct)
sWorldLog.Log ("\n\n");
}
- if (iSendPacket (pct) == -1)
+ ServerPktHeader header(pct.size()+2, pct.GetOpcode());
+ m_Crypt.EncryptSend ( header.header, header.getHeaderLength());
+
+ if (m_OutBuffer->space () >= pct.size () + header.getHeaderLength() && msg_queue()->is_empty())
{
- WorldPacket* npct;
+ // Put the packet on the buffer.
+ if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1)
+ ACE_ASSERT (false);
- ACE_NEW_RETURN (npct, WorldPacket (pct), -1);
+ if (!pct.empty ())
+ if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1)
+ ACE_ASSERT (false);
+ }
+ else
+ {
+ // Enqueue the packet.
+ ACE_Message_Block* mb;
- // NOTE maybe check of the size of the queue can be good ?
- // to make it bounded instead of unbounded
- if (m_PacketQueue.enqueue_tail (npct) == -1)
+ ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size () + header.getHeaderLength()), -1);
+
+ mb->copy((char*) header.header, header.getHeaderLength());
+
+ if (!pct.empty ())
+ mb->copy((const char*)pct.contents(), pct.size ());
+
+ if(msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
{
- delete npct;
- sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed");
+ sLog.outError("WorldSocket::SendPacket enqueue_tail");
+ mb->release();
return -1;
}
}
@@ -336,7 +341,7 @@ int WorldSocket::handle_output (ACE_HANDLE)
const size_t send_len = m_OutBuffer->length ();
if (send_len == 0)
- return cancel_wakeup_output (Guard);
+ return handle_output_queue (Guard);
#ifdef MSG_NOSIGNAL
ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
@@ -366,15 +371,73 @@ int WorldSocket::handle_output (ACE_HANDLE)
{
m_OutBuffer->reset ();
- if (!iFlushPacketQueue ())
- return cancel_wakeup_output (Guard);
- else
- return schedule_wakeup_output (Guard);
+ return handle_output_queue (Guard);
}
ACE_NOTREACHED (return 0);
}
+int WorldSocket::handle_output_queue (GuardType& g)
+{
+ if(msg_queue()->is_empty())
+ return cancel_wakeup_output(g);
+
+ ACE_Message_Block *mblk;
+
+ if(msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
+ {
+ sLog.outError("WorldSocket::handle_output_queue dequeue_head");
+ return -1;
+ }
+
+ const size_t send_len = mblk->length ();
+
+#ifdef MSG_NOSIGNAL
+ ssize_t n = peer ().send (mblk->rd_ptr (), send_len, MSG_NOSIGNAL);
+#else
+ ssize_t n = peer ().send (mblk->rd_ptr (), send_len);
+#endif // MSG_NOSIGNAL
+
+ if (n == 0)
+ {
+ mblk->release();
+
+ return -1;
+ }
+ else if (n == -1)
+ {
+ if (errno == EWOULDBLOCK || errno == EAGAIN)
+ {
+ msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero);
+ return schedule_wakeup_output (g);
+ }
+
+ mblk->release();
+ return -1;
+ }
+ else if (n < send_len) //now n > 0
+ {
+ mblk->rd_ptr (static_cast<size_t> (n));
+
+ if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1)
+ {
+ sLog.outError("WorldSocket::handle_output_queue enqueue_head");
+ mblk->release();
+ return -1;
+ }
+
+ return schedule_wakeup_output (g);
+ }
+ else //now n == send_len
+ {
+ mblk->release();
+
+ return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK;
+ }
+
+ ACE_NOTREACHED(return -1);
+}
+
int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask)
{
// Critical section
@@ -402,10 +465,15 @@ int WorldSocket::Update (void)
if (closing_)
return -1;
- if (m_OutActive || m_OutBuffer->length () == 0)
+ if (m_OutActive || (m_OutBuffer->length () == 0 && msg_queue()->is_empty()))
return 0;
- return handle_output (get_handle ());
+ int ret;
+ do
+ ret = handle_output (get_handle ());
+ while( ret > 0 );
+
+ return ret;
}
int WorldSocket::handle_input_header (void)