mirror of
https://github.com/TrinityCore/TrinityCore.git
synced 2026-01-19 00:48:56 +01:00
Replace some WorldSocket code with Derex´s code for MaNGOS to support packets > 64kb.
This get rid of the ¨queue¨ that only supported 1 packet and implement it properly. Meaning no ¨internal¨ packet loss obstructing the client/server I/O. Apply Trinity code style where applicable. --HG-- branch : trunk
This commit is contained in:
@@ -100,19 +100,22 @@ struct ClientPktHeader
|
||||
#endif
|
||||
|
||||
WorldSocket::WorldSocket (void) :
|
||||
WorldHandler (),
|
||||
m_Session (0),
|
||||
m_RecvWPct (0),
|
||||
m_RecvPct (),
|
||||
m_Header (sizeof (ClientPktHeader)),
|
||||
m_OutBuffer (0),
|
||||
m_OutBufferSize (65536),
|
||||
m_OutActive (false),
|
||||
m_Seed (static_cast<uint32> (rand32 ())),
|
||||
m_OverSpeedPings (0),
|
||||
m_LastPingTime (ACE_Time_Value::zero)
|
||||
WorldHandler(),
|
||||
m_Session(0),
|
||||
m_RecvWPct(0),
|
||||
m_RecvPct(),
|
||||
m_Header(sizeof (ClientPktHeader)),
|
||||
m_OutBuffer(0),
|
||||
m_OutBufferSize(65536),
|
||||
m_OutActive(false),
|
||||
m_Seed(static_cast<uint32> (rand32())),
|
||||
m_OverSpeedPings(0),
|
||||
m_LastPingTime(ACE_Time_Value::zero)
|
||||
{
|
||||
reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
|
||||
reference_counting_policy().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
|
||||
|
||||
msg_queue()->high_water_mark(8*1024*1024);
|
||||
msg_queue()->low_water_mark(8*1024*1024);
|
||||
}
|
||||
|
||||
WorldSocket::~WorldSocket (void)
|
||||
@@ -121,15 +124,11 @@ WorldSocket::~WorldSocket (void)
|
||||
delete m_RecvWPct;
|
||||
|
||||
if (m_OutBuffer)
|
||||
m_OutBuffer->release ();
|
||||
m_OutBuffer->release();
|
||||
|
||||
closing_ = true;
|
||||
|
||||
peer ().close ();
|
||||
|
||||
WorldPacket* pct;
|
||||
while (m_PacketQueue.dequeue_head (pct) == 0)
|
||||
delete pct;
|
||||
peer().close();
|
||||
}
|
||||
|
||||
bool WorldSocket::IsClosed (void) const
|
||||
@@ -146,7 +145,7 @@ void WorldSocket::CloseSocket (void)
|
||||
return;
|
||||
|
||||
closing_ = true;
|
||||
peer ().close_writer ();
|
||||
peer().close_writer();
|
||||
}
|
||||
|
||||
{
|
||||
@@ -169,46 +168,56 @@ int WorldSocket::SendPacket (const WorldPacket& pct)
|
||||
return -1;
|
||||
|
||||
// Dump outgoing packet.
|
||||
if (sWorldLog.LogWorld ())
|
||||
if (sWorldLog.LogWorld())
|
||||
{
|
||||
sWorldLog.outTimestampLog ("SERVER:\nSOCKET: %u\nLENGTH: %u\nOPCODE: %s (0x%.4X)\nDATA:\n",
|
||||
(uint32) get_handle (),
|
||||
pct.size (),
|
||||
LookupOpcodeName (pct.GetOpcode ()),
|
||||
pct.GetOpcode ());
|
||||
(uint32) get_handle(),
|
||||
pct.size(),
|
||||
LookupOpcodeName (pct.GetOpcode()),
|
||||
pct.GetOpcode());
|
||||
|
||||
uint32 p = 0;
|
||||
while (p < pct.size ())
|
||||
while (p < pct.size())
|
||||
{
|
||||
for (uint32 j = 0; j < 16 && p < pct.size (); j++)
|
||||
sWorldLog.outLog ("%.2X ", const_cast<WorldPacket&>(pct)[p++]);
|
||||
for (uint32 j = 0; j < 16 && p < pct.size(); j++)
|
||||
sWorldLog.outLog("%.2X ", const_cast<WorldPacket&>(pct)[p++]);
|
||||
|
||||
sWorldLog.outLog ("\n");
|
||||
sWorldLog.outLog("\n");
|
||||
}
|
||||
sWorldLog.outLog ("\n");
|
||||
sWorldLog.outLog("\n");
|
||||
}
|
||||
|
||||
// don't try to send the packet if there are packets on the queue
|
||||
if (!m_PacketQueue.is_empty() || iSendPacket(pct) == -1)
|
||||
ServerPktHeader header(pct.size()+2, pct.GetOpcode());
|
||||
m_Crypt.EncryptSend ((uint8*)header.header, header.getHeaderLength());
|
||||
|
||||
if (m_OutBuffer->space() >= pct.size() + header.getHeaderLength() && msg_queue()->is_empty())
|
||||
{
|
||||
WorldPacket* npct;
|
||||
// Put the packet on the buffer.
|
||||
if (m_OutBuffer->copy((char*) header.header, header.getHeaderLength()) == -1)
|
||||
ACE_ASSERT (false);
|
||||
|
||||
ACE_NEW_RETURN (npct, WorldPacket (pct), -1);
|
||||
if (!pct.empty())
|
||||
if (m_OutBuffer->copy((char*) pct.contents(), pct.size()) == -1)
|
||||
ACE_ASSERT (false);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Enqueue the packet.
|
||||
ACE_Message_Block* mb;
|
||||
|
||||
// NOTE maybe check of the size of the queue can be good ?
|
||||
// to make it bounded instead of unbounded
|
||||
if (m_PacketQueue.enqueue_tail (npct) == -1)
|
||||
ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size() + header.getHeaderLength()), -1);
|
||||
|
||||
mb->copy((char*) header.header, header.getHeaderLength());
|
||||
|
||||
if (!pct.empty())
|
||||
mb->copy((const char*)pct.contents(), pct.size());
|
||||
|
||||
if (msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
|
||||
{
|
||||
delete npct;
|
||||
sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed");
|
||||
sLog.outError("WorldSocket::SendPacket enqueue_tail failed");
|
||||
mb->release();
|
||||
return -1;
|
||||
}
|
||||
|
||||
// iSendPacket may fail if the packet is larger than the buffer (even in the buffer is empty)
|
||||
// So we must try to flush so the packet may be sent partially
|
||||
// don't cancel_wakeup_output here
|
||||
if (iFlushPacketQueue())
|
||||
return schedule_wakeup_output(Guard);
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -216,12 +225,12 @@ int WorldSocket::SendPacket (const WorldPacket& pct)
|
||||
|
||||
long WorldSocket::AddReference (void)
|
||||
{
|
||||
return static_cast<long> (add_reference ());
|
||||
return static_cast<long> (add_reference());
|
||||
}
|
||||
|
||||
long WorldSocket::RemoveReference (void)
|
||||
{
|
||||
return static_cast<long> (remove_reference ());
|
||||
return static_cast<long> (remove_reference());
|
||||
}
|
||||
|
||||
int WorldSocket::open (void *a)
|
||||
@@ -237,7 +246,7 @@ int WorldSocket::open (void *a)
|
||||
m_OutActive = true;
|
||||
|
||||
// Hook for the manager.
|
||||
if (sWorldSocketMgr->OnSocketOpen (this) == -1)
|
||||
if (sWorldSocketMgr->OnSocketOpen(this) == -1)
|
||||
return -1;
|
||||
|
||||
// Allocate the buffer.
|
||||
@@ -246,14 +255,14 @@ int WorldSocket::open (void *a)
|
||||
// Store peer address.
|
||||
ACE_INET_Addr remote_addr;
|
||||
|
||||
if (peer ().get_remote_addr (remote_addr) == -1)
|
||||
if (peer().get_remote_addr(remote_addr) == -1)
|
||||
{
|
||||
sLog.outError ("WorldSocket::open: peer ().get_remote_addr errno = %s", ACE_OS::strerror (errno));
|
||||
sLog.outError ("WorldSocket::open: peer().get_remote_addr errno = %s", ACE_OS::strerror (errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
m_Address = remote_addr.get_host_addr ();
|
||||
|
||||
m_Address = remote_addr.get_host_addr();
|
||||
|
||||
// Send startup packet.
|
||||
WorldPacket packet (SMSG_AUTH_CHALLENGE, 24);
|
||||
packet << uint32(1); // 1...31
|
||||
@@ -267,29 +276,29 @@ int WorldSocket::open (void *a)
|
||||
seed2.SetRand(16 * 8);
|
||||
packet.append(seed2.AsByteArray(16), 16); // new encryption seeds
|
||||
|
||||
if (SendPacket (packet) == -1)
|
||||
if (SendPacket(packet) == -1)
|
||||
return -1;
|
||||
|
||||
|
||||
// Register with ACE Reactor
|
||||
if (reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK) == -1)
|
||||
if (reactor()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK) == -1)
|
||||
{
|
||||
sLog.outError ("WorldSocket::open: unable to register client handler errno = %s", ACE_OS::strerror (errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
// reactor takes care of the socket from now on
|
||||
remove_reference ();
|
||||
remove_reference();
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int WorldSocket::close (int)
|
||||
{
|
||||
shutdown ();
|
||||
shutdown();
|
||||
|
||||
closing_ = true;
|
||||
|
||||
remove_reference ();
|
||||
remove_reference();
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -299,24 +308,24 @@ int WorldSocket::handle_input (ACE_HANDLE)
|
||||
if (closing_)
|
||||
return -1;
|
||||
|
||||
switch (handle_input_missing_data ())
|
||||
switch (handle_input_missing_data())
|
||||
{
|
||||
case -1 :
|
||||
{
|
||||
if ((errno == EWOULDBLOCK) ||
|
||||
(errno == EAGAIN))
|
||||
{
|
||||
return Update (); // interesting line ,isn't it ?
|
||||
return Update(); // interesting line ,isn't it ?
|
||||
}
|
||||
|
||||
DEBUG_LOG ("WorldSocket::handle_input: Peer error closing connection errno = %s", ACE_OS::strerror (errno));
|
||||
DEBUG_LOG("WorldSocket::handle_input: Peer error closing connection errno = %s", ACE_OS::strerror (errno));
|
||||
|
||||
errno = ECONNRESET;
|
||||
return -1;
|
||||
}
|
||||
case 0:
|
||||
{
|
||||
DEBUG_LOG ("WorldSocket::handle_input: Peer has closed connection");
|
||||
DEBUG_LOG("WorldSocket::handle_input: Peer has closed connection");
|
||||
|
||||
errno = ECONNRESET;
|
||||
return -1;
|
||||
@@ -324,7 +333,7 @@ int WorldSocket::handle_input (ACE_HANDLE)
|
||||
case 1:
|
||||
return 1;
|
||||
default:
|
||||
return Update (); // another interesting line ;)
|
||||
return Update(); // another interesting line ;)
|
||||
}
|
||||
|
||||
ACE_NOTREACHED(return -1);
|
||||
@@ -340,17 +349,12 @@ int WorldSocket::handle_output (ACE_HANDLE)
|
||||
size_t send_len = m_OutBuffer->length();
|
||||
|
||||
if (send_len == 0)
|
||||
{
|
||||
if (!iFlushPacketQueue())
|
||||
return cancel_wakeup_output(Guard);
|
||||
|
||||
send_len = m_OutBuffer->length ();
|
||||
}
|
||||
return handle_output_queue(Guard);
|
||||
|
||||
#ifdef MSG_NOSIGNAL
|
||||
ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL);
|
||||
ssize_t n = peer().send (m_OutBuffer->rd_ptr(), send_len, MSG_NOSIGNAL);
|
||||
#else
|
||||
ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len);
|
||||
ssize_t n = peer().send (m_OutBuffer->rd_ptr(), send_len);
|
||||
#endif // MSG_NOSIGNAL
|
||||
|
||||
if (n == 0)
|
||||
@@ -362,28 +366,86 @@ int WorldSocket::handle_output (ACE_HANDLE)
|
||||
|
||||
return -1;
|
||||
}
|
||||
else if (n < send_len) //now n > 0
|
||||
else if (n < (ssize_t)send_len) //now n > 0
|
||||
{
|
||||
m_OutBuffer->rd_ptr (static_cast<size_t> (n));
|
||||
|
||||
// move the data to the base of the buffer
|
||||
m_OutBuffer->crunch ();
|
||||
m_OutBuffer->crunch();
|
||||
|
||||
return schedule_wakeup_output (Guard);
|
||||
}
|
||||
else //now n == send_len
|
||||
{
|
||||
m_OutBuffer->reset ();
|
||||
m_OutBuffer->reset();
|
||||
|
||||
if (!iFlushPacketQueue ())
|
||||
return cancel_wakeup_output (Guard);
|
||||
else
|
||||
return schedule_wakeup_output (Guard);
|
||||
return handle_output_queue (Guard);
|
||||
}
|
||||
|
||||
ACE_NOTREACHED (return 0);
|
||||
}
|
||||
|
||||
int WorldSocket::handle_output_queue (GuardType& g)
|
||||
{
|
||||
if (msg_queue()->is_empty())
|
||||
return cancel_wakeup_output(g);
|
||||
|
||||
ACE_Message_Block *mblk;
|
||||
|
||||
if (msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1)
|
||||
{
|
||||
sLog.outError("WorldSocket::handle_output_queue dequeue_head");
|
||||
return -1;
|
||||
}
|
||||
|
||||
const size_t send_len = mblk->length();
|
||||
|
||||
#ifdef MSG_NOSIGNAL
|
||||
ssize_t n = peer().send (mblk->rd_ptr(), send_len, MSG_NOSIGNAL);
|
||||
#else
|
||||
ssize_t n = peer().send (mblk->rd_ptr(), send_len);
|
||||
#endif // MSG_NOSIGNAL
|
||||
|
||||
if (n == 0)
|
||||
{
|
||||
mblk->release();
|
||||
|
||||
return -1;
|
||||
}
|
||||
else if (n == -1)
|
||||
{
|
||||
if (errno == EWOULDBLOCK || errno == EAGAIN)
|
||||
{
|
||||
msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero);
|
||||
return schedule_wakeup_output (g);
|
||||
}
|
||||
|
||||
mblk->release();
|
||||
return -1;
|
||||
}
|
||||
else if (n < (ssize_t)send_len) //now n > 0
|
||||
{
|
||||
mblk->rd_ptr(static_cast<size_t> (n));
|
||||
|
||||
if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1)
|
||||
{
|
||||
sLog.outError("WorldSocket::handle_output_queue enqueue_head");
|
||||
mblk->release();
|
||||
return -1;
|
||||
}
|
||||
|
||||
return schedule_wakeup_output (g);
|
||||
}
|
||||
else //now n == send_len
|
||||
{
|
||||
mblk->release();
|
||||
|
||||
return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK;
|
||||
}
|
||||
|
||||
ACE_NOTREACHED(return -1);
|
||||
}
|
||||
|
||||
int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask)
|
||||
{
|
||||
// Critical section
|
||||
@@ -393,7 +455,7 @@ int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask)
|
||||
closing_ = true;
|
||||
|
||||
if (h == ACE_INVALID_HANDLE)
|
||||
peer ().close_writer ();
|
||||
peer().close_writer();
|
||||
}
|
||||
|
||||
// Critical section
|
||||
@@ -411,21 +473,26 @@ int WorldSocket::Update (void)
|
||||
if (closing_)
|
||||
return -1;
|
||||
|
||||
if (m_OutActive || m_OutBuffer->length () == 0)
|
||||
if (m_OutActive || (m_OutBuffer->length() == 0 && msg_queue()->is_empty()))
|
||||
return 0;
|
||||
|
||||
return handle_output (get_handle ());
|
||||
int ret;
|
||||
do
|
||||
ret = handle_output (get_handle());
|
||||
while (ret > 0);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
int WorldSocket::handle_input_header (void)
|
||||
{
|
||||
ACE_ASSERT (m_RecvWPct == NULL);
|
||||
|
||||
ACE_ASSERT (m_Header.length () == sizeof (ClientPktHeader));
|
||||
ACE_ASSERT (m_Header.length() == sizeof(ClientPktHeader));
|
||||
|
||||
m_Crypt.DecryptRecv ((uint8*) m_Header.rd_ptr (), sizeof (ClientPktHeader));
|
||||
m_Crypt.DecryptRecv ((uint8*) m_Header.rd_ptr(), sizeof(ClientPktHeader));
|
||||
|
||||
ClientPktHeader& header = *((ClientPktHeader*) m_Header.rd_ptr ());
|
||||
ClientPktHeader& header = *((ClientPktHeader*) m_Header.rd_ptr());
|
||||
|
||||
EndianConvertReverse(header.size);
|
||||
EndianConvert(header.cmd);
|
||||
@@ -446,7 +513,7 @@ int WorldSocket::handle_input_header (void)
|
||||
if (header.size > 0)
|
||||
{
|
||||
m_RecvWPct->resize (header.size);
|
||||
m_RecvPct.base ((char*) m_RecvWPct->contents (), m_RecvWPct->size ());
|
||||
m_RecvPct.base ((char*) m_RecvWPct->contents(), m_RecvWPct->size());
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -461,17 +528,17 @@ int WorldSocket::handle_input_payload (void)
|
||||
// set errno properly here on error !!!
|
||||
// now have a header and payload
|
||||
|
||||
ACE_ASSERT (m_RecvPct.space () == 0);
|
||||
ACE_ASSERT (m_Header.space () == 0);
|
||||
ACE_ASSERT (m_RecvPct.space() == 0);
|
||||
ACE_ASSERT (m_Header.space() == 0);
|
||||
ACE_ASSERT (m_RecvWPct != NULL);
|
||||
|
||||
const int ret = ProcessIncoming (m_RecvWPct);
|
||||
|
||||
m_RecvPct.base (NULL, 0);
|
||||
m_RecvPct.reset ();
|
||||
m_RecvPct.reset();
|
||||
m_RecvWPct = NULL;
|
||||
|
||||
m_Header.reset ();
|
||||
m_Header.reset();
|
||||
|
||||
if (ret == -1)
|
||||
errno = EINVAL;
|
||||
@@ -495,9 +562,9 @@ int WorldSocket::handle_input_missing_data (void)
|
||||
ACE_Message_Block::DONT_DELETE,
|
||||
0);
|
||||
|
||||
const size_t recv_size = message_block.space ();
|
||||
const size_t recv_size = message_block.space();
|
||||
|
||||
const ssize_t n = peer ().recv (message_block.wr_ptr (),
|
||||
const ssize_t n = peer().recv (message_block.wr_ptr(),
|
||||
recv_size);
|
||||
|
||||
if (n <= 0)
|
||||
@@ -505,25 +572,25 @@ int WorldSocket::handle_input_missing_data (void)
|
||||
|
||||
message_block.wr_ptr (n);
|
||||
|
||||
while (message_block.length () > 0)
|
||||
while (message_block.length() > 0)
|
||||
{
|
||||
if (m_Header.space () > 0)
|
||||
if (m_Header.space() > 0)
|
||||
{
|
||||
//need to receive the header
|
||||
const size_t to_header = (message_block.length () > m_Header.space () ? m_Header.space () : message_block.length ());
|
||||
m_Header.copy (message_block.rd_ptr (), to_header);
|
||||
const size_t to_header = (message_block.length() > m_Header.space() ? m_Header.space() : message_block.length());
|
||||
m_Header.copy (message_block.rd_ptr(), to_header);
|
||||
message_block.rd_ptr (to_header);
|
||||
|
||||
if (m_Header.space () > 0)
|
||||
if (m_Header.space() > 0)
|
||||
{
|
||||
// Couldn't receive the whole header this time.
|
||||
ACE_ASSERT (message_block.length () == 0);
|
||||
ACE_ASSERT (message_block.length() == 0);
|
||||
errno = EWOULDBLOCK;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// We just received nice new header
|
||||
if (handle_input_header () == -1)
|
||||
if (handle_input_header() == -1)
|
||||
{
|
||||
ACE_ASSERT ((errno != EWOULDBLOCK) && (errno != EAGAIN));
|
||||
return -1;
|
||||
@@ -541,24 +608,24 @@ int WorldSocket::handle_input_missing_data (void)
|
||||
}
|
||||
|
||||
// We have full read header, now check the data payload
|
||||
if (m_RecvPct.space () > 0)
|
||||
if (m_RecvPct.space() > 0)
|
||||
{
|
||||
//need more data in the payload
|
||||
const size_t to_data = (message_block.length () > m_RecvPct.space () ? m_RecvPct.space () : message_block.length ());
|
||||
m_RecvPct.copy (message_block.rd_ptr (), to_data);
|
||||
const size_t to_data = (message_block.length() > m_RecvPct.space() ? m_RecvPct.space() : message_block.length());
|
||||
m_RecvPct.copy (message_block.rd_ptr(), to_data);
|
||||
message_block.rd_ptr (to_data);
|
||||
|
||||
if (m_RecvPct.space () > 0)
|
||||
if (m_RecvPct.space() > 0)
|
||||
{
|
||||
// Couldn't receive the whole data this time.
|
||||
ACE_ASSERT (message_block.length () == 0);
|
||||
ACE_ASSERT (message_block.length() == 0);
|
||||
errno = EWOULDBLOCK;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
//just received fresh new payload
|
||||
if (handle_input_payload () == -1)
|
||||
if (handle_input_payload() == -1)
|
||||
{
|
||||
ACE_ASSERT ((errno != EWOULDBLOCK) && (errno != EAGAIN));
|
||||
return -1;
|
||||
@@ -575,9 +642,9 @@ int WorldSocket::cancel_wakeup_output (GuardType& g)
|
||||
|
||||
m_OutActive = false;
|
||||
|
||||
g.release ();
|
||||
g.release();
|
||||
|
||||
if (reactor ()->cancel_wakeup
|
||||
if (reactor()->cancel_wakeup
|
||||
(this, ACE_Event_Handler::WRITE_MASK) == -1)
|
||||
{
|
||||
// would be good to store errno from reactor with errno guard
|
||||
@@ -595,9 +662,9 @@ int WorldSocket::schedule_wakeup_output (GuardType& g)
|
||||
|
||||
m_OutActive = true;
|
||||
|
||||
g.release ();
|
||||
g.release();
|
||||
|
||||
if (reactor ()->schedule_wakeup
|
||||
if (reactor()->schedule_wakeup
|
||||
(this, ACE_Event_Handler::WRITE_MASK) == -1)
|
||||
{
|
||||
sLog.outError ("WorldSocket::schedule_wakeup_output");
|
||||
@@ -614,24 +681,24 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct)
|
||||
// manage memory ;)
|
||||
ACE_Auto_Ptr<WorldPacket> aptr (new_pct);
|
||||
|
||||
const ACE_UINT16 opcode = new_pct->GetOpcode ();
|
||||
const ACE_UINT16 opcode = new_pct->GetOpcode();
|
||||
|
||||
if (closing_)
|
||||
return -1;
|
||||
|
||||
// Dump received packet.
|
||||
if (sWorldLog.LogWorld ())
|
||||
if (sWorldLog.LogWorld())
|
||||
{
|
||||
sWorldLog.outTimestampLog ("CLIENT:\nSOCKET: %u\nLENGTH: %u\nOPCODE: %s (0x%.4X)\nDATA:\n",
|
||||
(uint32) get_handle (),
|
||||
new_pct->size (),
|
||||
LookupOpcodeName (new_pct->GetOpcode ()),
|
||||
new_pct->GetOpcode ());
|
||||
(uint32) get_handle(),
|
||||
new_pct->size(),
|
||||
LookupOpcodeName (new_pct->GetOpcode()),
|
||||
new_pct->GetOpcode());
|
||||
|
||||
uint32 p = 0;
|
||||
while (p < new_pct->size ())
|
||||
while (p < new_pct->size())
|
||||
{
|
||||
for (uint32 j = 0; j < 16 && p < new_pct->size (); j++)
|
||||
for (uint32 j = 0; j < 16 && p < new_pct->size(); j++)
|
||||
sWorldLog.outLog ("%.2X ", (*new_pct)[p++]);
|
||||
|
||||
sWorldLog.outLog ("\n");
|
||||
@@ -653,7 +720,7 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct)
|
||||
|
||||
return HandleAuthSession (*new_pct);
|
||||
case CMSG_KEEP_ALIVE:
|
||||
DEBUG_LOG ("CMSG_KEEP_ALIVE ,size: %d", new_pct->size ());
|
||||
DEBUG_LOG ("CMSG_KEEP_ALIVE ,size: %d", new_pct->size());
|
||||
|
||||
return 0;
|
||||
default:
|
||||
@@ -677,7 +744,7 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct)
|
||||
m_Session->UpdateTimeOutTime(false);
|
||||
|
||||
// OK ,give the packet to WorldSession
|
||||
aptr.release ();
|
||||
aptr.release();
|
||||
// WARNINIG here we call it with locks held.
|
||||
// Its possible to cause deadlock if QueuePacket calls back
|
||||
m_Session->QueuePacket (new_pct);
|
||||
@@ -747,7 +814,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
DEBUG_LOG ("WorldSocket::HandleAuthSession: client %u, unk2 %u, account %s, unk3 %u, clientseed %u",
|
||||
BuiltNumberClient,
|
||||
unk2,
|
||||
account.c_str (),
|
||||
account.c_str(),
|
||||
unk3,
|
||||
clientSeed);
|
||||
|
||||
@@ -769,7 +836,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
"locale " //8
|
||||
"FROM account "
|
||||
"WHERE username = '%s'",
|
||||
safe_account.c_str ());
|
||||
safe_account.c_str());
|
||||
|
||||
// Stop if the account is not found
|
||||
if (!result)
|
||||
@@ -783,7 +850,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
return -1;
|
||||
}
|
||||
|
||||
Field* fields = result->Fetch ();
|
||||
Field* fields = result->Fetch();
|
||||
|
||||
uint8 expansion = fields[6].GetUInt8();
|
||||
uint32 world_expansion = sWorld.getConfig(CONFIG_EXPANSION);
|
||||
@@ -795,10 +862,10 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
g.SetDword (7);
|
||||
|
||||
v.SetHexStr(fields[4].GetString());
|
||||
s.SetHexStr (fields[5].GetString ());
|
||||
s.SetHexStr (fields[5].GetString());
|
||||
|
||||
const char* sStr = s.AsHexStr (); //Must be freed by OPENSSL_free()
|
||||
const char* vStr = v.AsHexStr (); //Must be freed by OPENSSL_free()
|
||||
const char* sStr = s.AsHexStr(); //Must be freed by OPENSSL_free()
|
||||
const char* vStr = v.AsHexStr(); //Must be freed by OPENSSL_free()
|
||||
|
||||
DEBUG_LOG ("WorldSocket::HandleAuthSession: (s,v) check s: %s v: %s",
|
||||
sStr,
|
||||
@@ -808,9 +875,9 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
OPENSSL_free ((void*) vStr);
|
||||
|
||||
///- Re-check ip locking (same check as in realmd).
|
||||
if (fields[3].GetUInt8 () == 1) // if ip is locked
|
||||
if (fields[3].GetUInt8() == 1) // if ip is locked
|
||||
{
|
||||
if (strcmp (fields[2].GetString (), GetRemoteAddress ().c_str ()))
|
||||
if (strcmp (fields[2].GetString(), GetRemoteAddress().c_str()))
|
||||
{
|
||||
packet.Initialize (SMSG_AUTH_RESPONSE, 1);
|
||||
packet << uint8 (AUTH_FAILED);
|
||||
@@ -821,17 +888,17 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
}
|
||||
}
|
||||
|
||||
id = fields[0].GetUInt32 ();
|
||||
id = fields[0].GetUInt32();
|
||||
/*
|
||||
if (security > SEC_ADMINISTRATOR) // prevent invalid security settings in DB
|
||||
security = SEC_ADMINISTRATOR;
|
||||
*/
|
||||
|
||||
K.SetHexStr (fields[1].GetString ());
|
||||
K.SetHexStr (fields[1].GetString());
|
||||
|
||||
time_t mutetime = time_t (fields[7].GetUInt64 ());
|
||||
time_t mutetime = time_t (fields[7].GetUInt64());
|
||||
|
||||
locale = LocaleConstant (fields[8].GetUInt8 ());
|
||||
locale = LocaleConstant (fields[8].GetUInt8());
|
||||
if (locale >= MAX_LOCALE)
|
||||
locale = LOCALE_enUS;
|
||||
|
||||
@@ -849,7 +916,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
security = 0;
|
||||
else
|
||||
{
|
||||
fields = result->Fetch ();
|
||||
fields = result->Fetch();
|
||||
security = fields[1].GetInt32();
|
||||
}
|
||||
|
||||
@@ -872,7 +939,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
|
||||
// Check locked state for server
|
||||
sWorld.UpdateAllowedSecurity();
|
||||
AccountTypes allowedAccountType = sWorld.GetPlayerSecurityLimit ();
|
||||
AccountTypes allowedAccountType = sWorld.GetPlayerSecurityLimit();
|
||||
sLog.outDebug("Allowed Level: %u Player Level %u", allowedAccountType, AccountTypes(security));
|
||||
if (allowedAccountType > SEC_PLAYER && AccountTypes(security) < allowedAccountType)
|
||||
{
|
||||
@@ -896,9 +963,9 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
sha.UpdateData ((uint8 *) & clientSeed, 4);
|
||||
sha.UpdateData ((uint8 *) & seed, 4);
|
||||
sha.UpdateBigNumbers (&K, NULL);
|
||||
sha.Finalize ();
|
||||
sha.Finalize();
|
||||
|
||||
if (memcmp (sha.GetDigest (), digest, 20))
|
||||
if (memcmp (sha.GetDigest(), digest, 20))
|
||||
{
|
||||
packet.Initialize (SMSG_AUTH_RESPONSE, 1);
|
||||
packet << uint8 (AUTH_FAILED);
|
||||
@@ -909,11 +976,11 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
return -1;
|
||||
}
|
||||
|
||||
std::string address = GetRemoteAddress ();
|
||||
std::string address = GetRemoteAddress();
|
||||
|
||||
DEBUG_LOG ("WorldSocket::HandleAuthSession: Client '%s' authenticated successfully from %s.",
|
||||
account.c_str (),
|
||||
address.c_str ());
|
||||
account.c_str(),
|
||||
address.c_str());
|
||||
|
||||
// Update the last_ip in the database
|
||||
// No SQL injection, username escaped.
|
||||
@@ -922,8 +989,8 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket)
|
||||
LoginDatabase.PExecute ("UPDATE account "
|
||||
"SET last_ip = '%s' "
|
||||
"WHERE username = '%s'",
|
||||
address.c_str (),
|
||||
safe_account.c_str ());
|
||||
address.c_str(),
|
||||
safe_account.c_str());
|
||||
|
||||
// NOTE ATM the socket is single-threaded, have this in mind ...
|
||||
ACE_NEW_RETURN (m_Session, WorldSession (id, this, AccountTypes(security), expansion, mutetime, locale), -1);
|
||||
@@ -952,10 +1019,10 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket)
|
||||
recvPacket >> latency;
|
||||
|
||||
if (m_LastPingTime == ACE_Time_Value::zero)
|
||||
m_LastPingTime = ACE_OS::gettimeofday (); // for 1st ping
|
||||
m_LastPingTime = ACE_OS::gettimeofday(); // for 1st ping
|
||||
else
|
||||
{
|
||||
ACE_Time_Value cur_time = ACE_OS::gettimeofday ();
|
||||
ACE_Time_Value cur_time = ACE_OS::gettimeofday();
|
||||
ACE_Time_Value diff_time (cur_time);
|
||||
diff_time -= m_LastPingTime;
|
||||
m_LastPingTime = cur_time;
|
||||
@@ -970,11 +1037,11 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket)
|
||||
{
|
||||
ACE_GUARD_RETURN (LockType, Guard, m_SessionLock, -1);
|
||||
|
||||
if (m_Session && m_Session->GetSecurity () == SEC_PLAYER)
|
||||
if (m_Session && m_Session->GetSecurity() == SEC_PLAYER)
|
||||
{
|
||||
sLog.outError ("WorldSocket::HandlePing: Player kicked for "
|
||||
"over-speed pings address = %s",
|
||||
GetRemoteAddress ().c_str ());
|
||||
GetRemoteAddress().c_str());
|
||||
|
||||
return -1;
|
||||
}
|
||||
@@ -995,7 +1062,7 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket)
|
||||
sLog.outError ("WorldSocket::HandlePing: peer sent CMSG_PING, "
|
||||
"but is not authenticated or got recently kicked,"
|
||||
" address = %s",
|
||||
GetRemoteAddress ().c_str ());
|
||||
GetRemoteAddress().c_str());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
@@ -1004,100 +1071,3 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket)
|
||||
packet << ping;
|
||||
return SendPacket (packet);
|
||||
}
|
||||
|
||||
int WorldSocket::iSendPacket (const WorldPacket& pct)
|
||||
{
|
||||
ServerPktHeader header(pct.size()+2, pct.GetOpcode());
|
||||
if (m_OutBuffer->space() < pct.size () + header.getHeaderLength())
|
||||
{
|
||||
errno = ENOBUFS;
|
||||
return -1;
|
||||
}
|
||||
|
||||
m_Crypt.EncryptSend (header.header, header.getHeaderLength());
|
||||
|
||||
if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1)
|
||||
ACE_ASSERT (false);
|
||||
|
||||
if (!pct.empty ())
|
||||
if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1)
|
||||
ACE_ASSERT (false);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/// Return 1 if some bytes written and packet completed
|
||||
/// Return 0 if no byte written, but still remaining data
|
||||
/// Return -1 if some bytes written and still remaining data
|
||||
int WorldSocket::iSendPartialPacket(WorldPacket& pct)
|
||||
{
|
||||
size_t remainingLen = pct.size() - pct.rpos();
|
||||
|
||||
if (pct.rpos() == 0) // nothing sent yet => send header
|
||||
{
|
||||
ServerPktHeader header(pct.size()+2, pct.GetOpcode());
|
||||
|
||||
// check if there is at least enough space for the header
|
||||
if (m_OutBuffer->space () < header.getHeaderLength())
|
||||
return 0; // nothing written, still remaining data => 0
|
||||
|
||||
m_Crypt.EncryptSend(header.header, header.getHeaderLength());
|
||||
|
||||
if (m_OutBuffer->copy((char*) header.header, header.getHeaderLength()) == -1)
|
||||
return 0; // no space left. This should not happend. Just return 0 as nothing written and still remaining data
|
||||
|
||||
if (remainingLen == 0)
|
||||
return 1; // header written, nothing else => 1
|
||||
}
|
||||
|
||||
if ((m_OutBuffer->space()) < remainingLen) {
|
||||
size_t len = m_OutBuffer->space();
|
||||
|
||||
if (m_OutBuffer->copy((char*) (pct.contents() + pct.rpos()), len) == -1)
|
||||
ACE_ASSERT (false);
|
||||
|
||||
pct.read_skip(len);
|
||||
return -1; // packet will be pushed back on the queue
|
||||
}
|
||||
|
||||
if (m_OutBuffer->copy((char*) (pct.contents() + pct.rpos()), remainingLen) == -1)
|
||||
ACE_ASSERT (false);
|
||||
|
||||
return 1; // some byte written and packet completed
|
||||
}
|
||||
|
||||
bool WorldSocket::iFlushPacketQueue()
|
||||
{
|
||||
WorldPacket *pct;
|
||||
bool haveone = false;
|
||||
|
||||
while (m_PacketQueue.dequeue_head(pct) == 0)
|
||||
{
|
||||
int result = iSendPartialPacket(*pct);
|
||||
|
||||
if (result != 0)
|
||||
{
|
||||
// some bytes were written
|
||||
haveone = true;
|
||||
}
|
||||
|
||||
if (result <= 0)
|
||||
{
|
||||
if (m_PacketQueue.enqueue_head(pct) == -1)
|
||||
{
|
||||
delete pct;
|
||||
sLog.outError ("WorldSocket::iFlushPacketQueue m_PacketQueue->enqueue_head");
|
||||
return false;
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
// packet completed
|
||||
delete pct;
|
||||
}
|
||||
}
|
||||
|
||||
return haveone;
|
||||
}
|
||||
|
||||
@@ -73,7 +73,7 @@ typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> WorldHandler;
|
||||
* sending packets from "producer" threads is minimal,
|
||||
* and doing a lot of writes with small size is tolerated.
|
||||
*
|
||||
* The calls to Update () method are managed by WorldSocketMgr
|
||||
* The calls to Update() method are managed by WorldSocketMgr
|
||||
* and ReactorRunnable.
|
||||
*
|
||||
* For input ,the class uses one 1024 bytes buffer on stack
|
||||
@@ -103,9 +103,6 @@ class WorldSocket : protected WorldHandler
|
||||
typedef ACE_Thread_Mutex LockType;
|
||||
typedef ACE_Guard<LockType> GuardType;
|
||||
|
||||
/// Queue for storing packets for which there is no space.
|
||||
typedef ACE_Unbounded_Queue< WorldPacket* > PacketQueueT;
|
||||
|
||||
/// Check if socket is closed.
|
||||
bool IsClosed (void) const;
|
||||
|
||||
@@ -160,6 +157,9 @@ class WorldSocket : protected WorldHandler
|
||||
/// @param g the guard is for m_OutBufferLock, the function will release it
|
||||
int cancel_wakeup_output (GuardType& g);
|
||||
int schedule_wakeup_output (GuardType& g);
|
||||
|
||||
/// Drain the queue if its not empty.
|
||||
int handle_output_queue (GuardType& g);
|
||||
|
||||
/// process one incoming packet.
|
||||
/// @param new_pct received packet ,note that you need to delete it.
|
||||
@@ -171,20 +171,6 @@ class WorldSocket : protected WorldHandler
|
||||
/// Called by ProcessIncoming() on CMSG_PING.
|
||||
int HandlePing (WorldPacket& recvPacket);
|
||||
|
||||
/// Try to write WorldPacket to m_OutBuffer ,return -1 if no space
|
||||
/// Need to be called with m_OutBufferLock lock held
|
||||
int iSendPacket (const WorldPacket& pct);
|
||||
|
||||
/// Try to write WorldPacket to m_OutBuffer even partially,
|
||||
/// Need to be called with m_OutBufferLock lock held
|
||||
int iSendPartialPacket(WorldPacket& pct);
|
||||
|
||||
/// Flush m_PacketQueue if there are packets in it
|
||||
/// Need to be called with m_OutBufferLock lock held
|
||||
/// @return true if it wrote to the buffer (AKA you need
|
||||
/// to mark the socket for output).
|
||||
bool iFlushPacketQueue ();
|
||||
|
||||
private:
|
||||
/// Time in which the last ping was received
|
||||
ACE_Time_Value m_LastPingTime;
|
||||
@@ -224,10 +210,6 @@ class WorldSocket : protected WorldHandler
|
||||
/// Size of the m_OutBuffer.
|
||||
size_t m_OutBufferSize;
|
||||
|
||||
/// Here are stored packets for which there was no space on m_OutBuffer,
|
||||
/// this allows not-to kick player if its buffer is overflowed.
|
||||
PacketQueueT m_PacketQueue;
|
||||
|
||||
/// True if the socket is registered with the reactor for output
|
||||
bool m_OutActive;
|
||||
|
||||
|
||||
@@ -55,23 +55,23 @@ class ReactorRunnable : protected ACE_Task_Base
|
||||
{
|
||||
public:
|
||||
|
||||
ReactorRunnable () :
|
||||
m_ThreadId (-1),
|
||||
m_Connections (0),
|
||||
m_Reactor (0)
|
||||
ReactorRunnable() :
|
||||
m_ThreadId(-1),
|
||||
m_Connections(0),
|
||||
m_Reactor(0)
|
||||
{
|
||||
ACE_Reactor_Impl* imp = 0;
|
||||
|
||||
#if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL)
|
||||
|
||||
imp = new ACE_Dev_Poll_Reactor ();
|
||||
imp = new ACE_Dev_Poll_Reactor();
|
||||
|
||||
imp->max_notify_iterations (128);
|
||||
imp->restart (1);
|
||||
|
||||
#else
|
||||
|
||||
imp = new ACE_TP_Reactor ();
|
||||
imp = new ACE_TP_Reactor();
|
||||
imp->max_notify_iterations (128);
|
||||
|
||||
#endif
|
||||
@@ -79,33 +79,33 @@ class ReactorRunnable : protected ACE_Task_Base
|
||||
m_Reactor = new ACE_Reactor (imp, 1);
|
||||
}
|
||||
|
||||
virtual ~ReactorRunnable ()
|
||||
virtual ~ReactorRunnable()
|
||||
{
|
||||
Stop ();
|
||||
Wait ();
|
||||
Stop();
|
||||
Wait();
|
||||
|
||||
if (m_Reactor)
|
||||
delete m_Reactor;
|
||||
}
|
||||
|
||||
void Stop ()
|
||||
void Stop()
|
||||
{
|
||||
m_Reactor->end_reactor_event_loop ();
|
||||
m_Reactor->end_reactor_event_loop();
|
||||
}
|
||||
|
||||
int Start ()
|
||||
int Start()
|
||||
{
|
||||
if (m_ThreadId != -1)
|
||||
return -1;
|
||||
|
||||
return (m_ThreadId = activate ());
|
||||
return (m_ThreadId = activate());
|
||||
}
|
||||
|
||||
void Wait () { ACE_Task_Base::wait (); }
|
||||
void Wait() { ACE_Task_Base::wait(); }
|
||||
|
||||
long Connections ()
|
||||
long Connections()
|
||||
{
|
||||
return static_cast<long> (m_Connections.value ());
|
||||
return static_cast<long> (m_Connections.value());
|
||||
}
|
||||
|
||||
int AddSocket (WorldSocket* sock)
|
||||
@@ -120,37 +120,37 @@ class ReactorRunnable : protected ACE_Task_Base
|
||||
return 0;
|
||||
}
|
||||
|
||||
ACE_Reactor* GetReactor ()
|
||||
ACE_Reactor* GetReactor()
|
||||
{
|
||||
return m_Reactor;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
||||
void AddNewSockets ()
|
||||
void AddNewSockets()
|
||||
{
|
||||
ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock);
|
||||
|
||||
if (m_NewSockets.empty ())
|
||||
if (m_NewSockets.empty())
|
||||
return;
|
||||
|
||||
for (SocketSet::const_iterator i = m_NewSockets.begin (); i != m_NewSockets.end (); ++i)
|
||||
for (SocketSet::const_iterator i = m_NewSockets.begin(); i != m_NewSockets.end(); ++i)
|
||||
{
|
||||
WorldSocket* sock = (*i);
|
||||
|
||||
if (sock->IsClosed ())
|
||||
if (sock->IsClosed())
|
||||
{
|
||||
sock->RemoveReference ();
|
||||
sock->RemoveReference();
|
||||
--m_Connections;
|
||||
}
|
||||
else
|
||||
m_Sockets.insert (sock);
|
||||
}
|
||||
|
||||
m_NewSockets.clear ();
|
||||
m_NewSockets.clear();
|
||||
}
|
||||
|
||||
virtual int svc ()
|
||||
virtual int svc()
|
||||
{
|
||||
DEBUG_LOG ("Network Thread Starting");
|
||||
|
||||
@@ -160,7 +160,7 @@ class ReactorRunnable : protected ACE_Task_Base
|
||||
|
||||
SocketSet::iterator i, t;
|
||||
|
||||
while (!m_Reactor->reactor_event_loop_done ())
|
||||
while (!m_Reactor->reactor_event_loop_done())
|
||||
{
|
||||
// dont be too smart to move this outside the loop
|
||||
// the run_reactor_event_loop will modify interval
|
||||
@@ -169,16 +169,16 @@ class ReactorRunnable : protected ACE_Task_Base
|
||||
if (m_Reactor->run_reactor_event_loop (interval) == -1)
|
||||
break;
|
||||
|
||||
AddNewSockets ();
|
||||
AddNewSockets();
|
||||
|
||||
for (i = m_Sockets.begin (); i != m_Sockets.end ();)
|
||||
for (i = m_Sockets.begin(); i != m_Sockets.end();)
|
||||
{
|
||||
if ((*i)->Update () == -1)
|
||||
if ((*i)->Update() == -1)
|
||||
{
|
||||
t = i;
|
||||
++i;
|
||||
(*t)->CloseSocket ();
|
||||
(*t)->RemoveReference ();
|
||||
(*t)->CloseSocket();
|
||||
(*t)->RemoveReference();
|
||||
--m_Connections;
|
||||
m_Sockets.erase (t);
|
||||
}
|
||||
@@ -208,17 +208,17 @@ class ReactorRunnable : protected ACE_Task_Base
|
||||
ACE_Thread_Mutex m_NewSockets_Lock;
|
||||
};
|
||||
|
||||
WorldSocketMgr::WorldSocketMgr () :
|
||||
m_NetThreadsCount (0),
|
||||
m_NetThreads (0),
|
||||
m_SockOutKBuff (-1),
|
||||
m_SockOutUBuff (65536),
|
||||
m_UseNoDelay (true),
|
||||
WorldSocketMgr::WorldSocketMgr() :
|
||||
m_NetThreadsCount(0),
|
||||
m_NetThreads(0),
|
||||
m_SockOutKBuff(-1),
|
||||
m_SockOutUBuff(65536),
|
||||
m_UseNoDelay(true),
|
||||
m_Acceptor (0)
|
||||
{
|
||||
}
|
||||
|
||||
WorldSocketMgr::~WorldSocketMgr ()
|
||||
WorldSocketMgr::~WorldSocketMgr()
|
||||
{
|
||||
if (m_NetThreads)
|
||||
delete [] m_NetThreads;
|
||||
@@ -244,8 +244,8 @@ WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
|
||||
|
||||
m_NetThreads = new ReactorRunnable[m_NetThreadsCount];
|
||||
|
||||
sLog.outBasic ("Max allowed socket connections %d",ACE::max_handles ());
|
||||
|
||||
sLog.outBasic ("Max allowed socket connections %d", ACE::max_handles());
|
||||
|
||||
// -1 means use default
|
||||
m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1);
|
||||
|
||||
@@ -262,14 +262,14 @@ WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
|
||||
|
||||
ACE_INET_Addr listen_addr (port, address);
|
||||
|
||||
if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1)
|
||||
if (acc->open(listen_addr, m_NetThreads[0].GetReactor(), ACE_NONBLOCK) == -1)
|
||||
{
|
||||
sLog.outError ("Failed to open acceptor ,check if the port is free");
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (size_t i = 0; i < m_NetThreadsCount; ++i)
|
||||
m_NetThreads[i].Start ();
|
||||
m_NetThreads[i].Start();
|
||||
|
||||
return 0;
|
||||
}
|
||||
@@ -277,42 +277,42 @@ WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address)
|
||||
int
|
||||
WorldSocketMgr::StartNetwork (ACE_UINT16 port, const char* address)
|
||||
{
|
||||
if (!sLog.IsOutDebug ())
|
||||
ACE_Log_Msg::instance ()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS);
|
||||
if (!sLog.IsOutDebug())
|
||||
ACE_Log_Msg::instance()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS);
|
||||
|
||||
if (StartReactiveIO (port, address) == -1)
|
||||
if (StartReactiveIO(port, address) == -1)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
WorldSocketMgr::StopNetwork ()
|
||||
WorldSocketMgr::StopNetwork()
|
||||
{
|
||||
if (m_Acceptor)
|
||||
{
|
||||
WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*> (m_Acceptor);
|
||||
|
||||
if (acc)
|
||||
acc->close ();
|
||||
acc->close();
|
||||
}
|
||||
|
||||
if (m_NetThreadsCount != 0)
|
||||
{
|
||||
for (size_t i = 0; i < m_NetThreadsCount; ++i)
|
||||
m_NetThreads[i].Stop ();
|
||||
m_NetThreads[i].Stop();
|
||||
}
|
||||
|
||||
Wait ();
|
||||
Wait();
|
||||
}
|
||||
|
||||
void
|
||||
WorldSocketMgr::Wait ()
|
||||
WorldSocketMgr::Wait()
|
||||
{
|
||||
if (m_NetThreadsCount != 0)
|
||||
{
|
||||
for (size_t i = 0; i < m_NetThreadsCount; ++i)
|
||||
m_NetThreads[i].Wait ();
|
||||
m_NetThreads[i].Wait();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -322,7 +322,7 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
|
||||
// set some options here
|
||||
if (m_SockOutKBuff >= 0)
|
||||
{
|
||||
if (sock->peer ().set_option (SOL_SOCKET,
|
||||
if (sock->peer().set_option (SOL_SOCKET,
|
||||
SO_SNDBUF,
|
||||
(void*) & m_SockOutKBuff,
|
||||
sizeof (int)) == -1 && errno != ENOTSUP)
|
||||
@@ -337,12 +337,12 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
|
||||
// Set TCP_NODELAY.
|
||||
if (m_UseNoDelay)
|
||||
{
|
||||
if (sock->peer ().set_option (ACE_IPPROTO_TCP,
|
||||
if (sock->peer().set_option (ACE_IPPROTO_TCP,
|
||||
TCP_NODELAY,
|
||||
(void*)&ndoption,
|
||||
sizeof (int)) == -1)
|
||||
{
|
||||
sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno));
|
||||
sLog.outError ("WorldSocketMgr::OnSocketOpen: peer().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
@@ -355,14 +355,14 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock)
|
||||
ACE_ASSERT (m_NetThreadsCount >= 1);
|
||||
|
||||
for (size_t i = 1; i < m_NetThreadsCount; ++i)
|
||||
if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ())
|
||||
if (m_NetThreads[i].Connections() < m_NetThreads[min].Connections())
|
||||
min = i;
|
||||
|
||||
return m_NetThreads[min].AddSocket (sock);
|
||||
}
|
||||
|
||||
WorldSocketMgr*
|
||||
WorldSocketMgr::Instance ()
|
||||
WorldSocketMgr::Instance()
|
||||
{
|
||||
return ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>::instance();
|
||||
}
|
||||
|
||||
@@ -46,22 +46,22 @@ public:
|
||||
int StartNetwork (ACE_UINT16 port, const char* address);
|
||||
|
||||
/// Stops all network threads, It will wait for all running threads .
|
||||
void StopNetwork ();
|
||||
void StopNetwork();
|
||||
|
||||
/// Wait untill all network threads have "joined" .
|
||||
void Wait ();
|
||||
void Wait();
|
||||
|
||||
/// Make this class singleton .
|
||||
static WorldSocketMgr* Instance ();
|
||||
|
||||
static WorldSocketMgr* Instance();
|
||||
|
||||
private:
|
||||
int OnSocketOpen(WorldSocket* sock);
|
||||
|
||||
int StartReactiveIO(ACE_UINT16 port, const char* address);
|
||||
|
||||
private:
|
||||
WorldSocketMgr ();
|
||||
virtual ~WorldSocketMgr ();
|
||||
WorldSocketMgr();
|
||||
virtual ~WorldSocketMgr();
|
||||
|
||||
ReactorRunnable* m_NetThreads;
|
||||
size_t m_NetThreadsCount;
|
||||
@@ -73,7 +73,7 @@ private:
|
||||
ACE_Event_Handler* m_Acceptor;
|
||||
};
|
||||
|
||||
#define sWorldSocketMgr WorldSocketMgr::Instance ()
|
||||
#define sWorldSocketMgr WorldSocketMgr::Instance()
|
||||
|
||||
#endif
|
||||
/// @}
|
||||
|
||||
Reference in New Issue
Block a user