diff options
-rw-r--r-- | src/game/WorldSocket.cpp | 452 | ||||
-rw-r--r-- | src/game/WorldSocket.h | 26 | ||||
-rw-r--r-- | src/game/WorldSocketMgr.cpp | 110 | ||||
-rw-r--r-- | src/game/WorldSocketMgr.h | 14 |
4 files changed, 277 insertions, 325 deletions
diff --git a/src/game/WorldSocket.cpp b/src/game/WorldSocket.cpp index 7dcb6056fbe..98cfe84a379 100644 --- a/src/game/WorldSocket.cpp +++ b/src/game/WorldSocket.cpp @@ -100,19 +100,22 @@ struct ClientPktHeader #endif WorldSocket::WorldSocket (void) : -WorldHandler (), -m_Session (0), -m_RecvWPct (0), -m_RecvPct (), -m_Header (sizeof (ClientPktHeader)), -m_OutBuffer (0), -m_OutBufferSize (65536), -m_OutActive (false), -m_Seed (static_cast<uint32> (rand32 ())), -m_OverSpeedPings (0), -m_LastPingTime (ACE_Time_Value::zero) +WorldHandler(), +m_Session(0), +m_RecvWPct(0), +m_RecvPct(), +m_Header(sizeof (ClientPktHeader)), +m_OutBuffer(0), +m_OutBufferSize(65536), +m_OutActive(false), +m_Seed(static_cast<uint32> (rand32())), +m_OverSpeedPings(0), +m_LastPingTime(ACE_Time_Value::zero) { - reference_counting_policy ().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); + reference_counting_policy().value (ACE_Event_Handler::Reference_Counting_Policy::ENABLED); + + msg_queue()->high_water_mark(8*1024*1024); + msg_queue()->low_water_mark(8*1024*1024); } WorldSocket::~WorldSocket (void) @@ -121,15 +124,11 @@ WorldSocket::~WorldSocket (void) delete m_RecvWPct; if (m_OutBuffer) - m_OutBuffer->release (); + m_OutBuffer->release(); closing_ = true; - peer ().close (); - - WorldPacket* pct; - while (m_PacketQueue.dequeue_head (pct) == 0) - delete pct; + peer().close(); } bool WorldSocket::IsClosed (void) const @@ -146,7 +145,7 @@ void WorldSocket::CloseSocket (void) return; closing_ = true; - peer ().close_writer (); + peer().close_writer(); } { @@ -169,46 +168,56 @@ int WorldSocket::SendPacket (const WorldPacket& pct) return -1; // Dump outgoing packet. - if (sWorldLog.LogWorld ()) + if (sWorldLog.LogWorld()) { sWorldLog.outTimestampLog ("SERVER:\nSOCKET: %u\nLENGTH: %u\nOPCODE: %s (0x%.4X)\nDATA:\n", - (uint32) get_handle (), - pct.size (), - LookupOpcodeName (pct.GetOpcode ()), - pct.GetOpcode ()); + (uint32) get_handle(), + pct.size(), + LookupOpcodeName (pct.GetOpcode()), + pct.GetOpcode()); uint32 p = 0; - while (p < pct.size ()) + while (p < pct.size()) { - for (uint32 j = 0; j < 16 && p < pct.size (); j++) - sWorldLog.outLog ("%.2X ", const_cast<WorldPacket&>(pct)[p++]); + for (uint32 j = 0; j < 16 && p < pct.size(); j++) + sWorldLog.outLog("%.2X ", const_cast<WorldPacket&>(pct)[p++]); - sWorldLog.outLog ("\n"); + sWorldLog.outLog("\n"); } - sWorldLog.outLog ("\n"); + sWorldLog.outLog("\n"); } - // don't try to send the packet if there are packets on the queue - if (!m_PacketQueue.is_empty() || iSendPacket(pct) == -1) + ServerPktHeader header(pct.size()+2, pct.GetOpcode()); + m_Crypt.EncryptSend ((uint8*)header.header, header.getHeaderLength()); + + if (m_OutBuffer->space() >= pct.size() + header.getHeaderLength() && msg_queue()->is_empty()) { - WorldPacket* npct; + // Put the packet on the buffer. + if (m_OutBuffer->copy((char*) header.header, header.getHeaderLength()) == -1) + ACE_ASSERT (false); - ACE_NEW_RETURN (npct, WorldPacket (pct), -1); + if (!pct.empty()) + if (m_OutBuffer->copy((char*) pct.contents(), pct.size()) == -1) + ACE_ASSERT (false); + } + else + { + // Enqueue the packet. + ACE_Message_Block* mb; + + ACE_NEW_RETURN(mb, ACE_Message_Block(pct.size() + header.getHeaderLength()), -1); + + mb->copy((char*) header.header, header.getHeaderLength()); - // NOTE maybe check of the size of the queue can be good ? - // to make it bounded instead of unbounded - if (m_PacketQueue.enqueue_tail (npct) == -1) + if (!pct.empty()) + mb->copy((const char*)pct.contents(), pct.size()); + + if (msg_queue()->enqueue_tail(mb,(ACE_Time_Value*)&ACE_Time_Value::zero) == -1) { - delete npct; - sLog.outError ("WorldSocket::SendPacket: m_PacketQueue.enqueue_tail failed"); + sLog.outError("WorldSocket::SendPacket enqueue_tail failed"); + mb->release(); return -1; } - - // iSendPacket may fail if the packet is larger than the buffer (even in the buffer is empty) - // So we must try to flush so the packet may be sent partially - // don't cancel_wakeup_output here - if (iFlushPacketQueue()) - return schedule_wakeup_output(Guard); } return 0; @@ -216,12 +225,12 @@ int WorldSocket::SendPacket (const WorldPacket& pct) long WorldSocket::AddReference (void) { - return static_cast<long> (add_reference ()); + return static_cast<long> (add_reference()); } long WorldSocket::RemoveReference (void) { - return static_cast<long> (remove_reference ()); + return static_cast<long> (remove_reference()); } int WorldSocket::open (void *a) @@ -237,7 +246,7 @@ int WorldSocket::open (void *a) m_OutActive = true; // Hook for the manager. - if (sWorldSocketMgr->OnSocketOpen (this) == -1) + if (sWorldSocketMgr->OnSocketOpen(this) == -1) return -1; // Allocate the buffer. @@ -246,14 +255,14 @@ int WorldSocket::open (void *a) // Store peer address. ACE_INET_Addr remote_addr; - if (peer ().get_remote_addr (remote_addr) == -1) + if (peer().get_remote_addr(remote_addr) == -1) { - sLog.outError ("WorldSocket::open: peer ().get_remote_addr errno = %s", ACE_OS::strerror (errno)); + sLog.outError ("WorldSocket::open: peer().get_remote_addr errno = %s", ACE_OS::strerror (errno)); return -1; } - m_Address = remote_addr.get_host_addr (); - + m_Address = remote_addr.get_host_addr(); + // Send startup packet. WorldPacket packet (SMSG_AUTH_CHALLENGE, 24); packet << uint32(1); // 1...31 @@ -267,29 +276,29 @@ int WorldSocket::open (void *a) seed2.SetRand(16 * 8); packet.append(seed2.AsByteArray(16), 16); // new encryption seeds - if (SendPacket (packet) == -1) + if (SendPacket(packet) == -1) return -1; - + // Register with ACE Reactor - if (reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK) == -1) + if (reactor()->register_handler(this, ACE_Event_Handler::READ_MASK | ACE_Event_Handler::WRITE_MASK) == -1) { sLog.outError ("WorldSocket::open: unable to register client handler errno = %s", ACE_OS::strerror (errno)); return -1; } - + // reactor takes care of the socket from now on - remove_reference (); + remove_reference(); return 0; } int WorldSocket::close (int) { - shutdown (); + shutdown(); closing_ = true; - remove_reference (); + remove_reference(); return 0; } @@ -299,24 +308,24 @@ int WorldSocket::handle_input (ACE_HANDLE) if (closing_) return -1; - switch (handle_input_missing_data ()) + switch (handle_input_missing_data()) { case -1 : { if ((errno == EWOULDBLOCK) || (errno == EAGAIN)) { - return Update (); // interesting line ,isn't it ? + return Update(); // interesting line ,isn't it ? } - DEBUG_LOG ("WorldSocket::handle_input: Peer error closing connection errno = %s", ACE_OS::strerror (errno)); + DEBUG_LOG("WorldSocket::handle_input: Peer error closing connection errno = %s", ACE_OS::strerror (errno)); errno = ECONNRESET; return -1; } case 0: { - DEBUG_LOG ("WorldSocket::handle_input: Peer has closed connection"); + DEBUG_LOG("WorldSocket::handle_input: Peer has closed connection"); errno = ECONNRESET; return -1; @@ -324,7 +333,7 @@ int WorldSocket::handle_input (ACE_HANDLE) case 1: return 1; default: - return Update (); // another interesting line ;) + return Update(); // another interesting line ;) } ACE_NOTREACHED(return -1); @@ -340,17 +349,12 @@ int WorldSocket::handle_output (ACE_HANDLE) size_t send_len = m_OutBuffer->length(); if (send_len == 0) - { - if (!iFlushPacketQueue()) - return cancel_wakeup_output(Guard); - - send_len = m_OutBuffer->length (); - } + return handle_output_queue(Guard); #ifdef MSG_NOSIGNAL - ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len, MSG_NOSIGNAL); + ssize_t n = peer().send (m_OutBuffer->rd_ptr(), send_len, MSG_NOSIGNAL); #else - ssize_t n = peer ().send (m_OutBuffer->rd_ptr (), send_len); + ssize_t n = peer().send (m_OutBuffer->rd_ptr(), send_len); #endif // MSG_NOSIGNAL if (n == 0) @@ -362,28 +366,86 @@ int WorldSocket::handle_output (ACE_HANDLE) return -1; } - else if (n < send_len) //now n > 0 + else if (n < (ssize_t)send_len) //now n > 0 { m_OutBuffer->rd_ptr (static_cast<size_t> (n)); // move the data to the base of the buffer - m_OutBuffer->crunch (); + m_OutBuffer->crunch(); return schedule_wakeup_output (Guard); } else //now n == send_len { - m_OutBuffer->reset (); + m_OutBuffer->reset(); - if (!iFlushPacketQueue ()) - return cancel_wakeup_output (Guard); - else - return schedule_wakeup_output (Guard); + return handle_output_queue (Guard); } ACE_NOTREACHED (return 0); } +int WorldSocket::handle_output_queue (GuardType& g) +{ + if (msg_queue()->is_empty()) + return cancel_wakeup_output(g); + + ACE_Message_Block *mblk; + + if (msg_queue()->dequeue_head(mblk, (ACE_Time_Value*)&ACE_Time_Value::zero) == -1) + { + sLog.outError("WorldSocket::handle_output_queue dequeue_head"); + return -1; + } + + const size_t send_len = mblk->length(); + +#ifdef MSG_NOSIGNAL + ssize_t n = peer().send (mblk->rd_ptr(), send_len, MSG_NOSIGNAL); +#else + ssize_t n = peer().send (mblk->rd_ptr(), send_len); +#endif // MSG_NOSIGNAL + + if (n == 0) + { + mblk->release(); + + return -1; + } + else if (n == -1) + { + if (errno == EWOULDBLOCK || errno == EAGAIN) + { + msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero); + return schedule_wakeup_output (g); + } + + mblk->release(); + return -1; + } + else if (n < (ssize_t)send_len) //now n > 0 + { + mblk->rd_ptr(static_cast<size_t> (n)); + + if (msg_queue()->enqueue_head(mblk, (ACE_Time_Value*) &ACE_Time_Value::zero) == -1) + { + sLog.outError("WorldSocket::handle_output_queue enqueue_head"); + mblk->release(); + return -1; + } + + return schedule_wakeup_output (g); + } + else //now n == send_len + { + mblk->release(); + + return msg_queue()->is_empty() ? cancel_wakeup_output(g) : ACE_Event_Handler::WRITE_MASK; + } + + ACE_NOTREACHED(return -1); +} + int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask) { // Critical section @@ -393,7 +455,7 @@ int WorldSocket::handle_close (ACE_HANDLE h, ACE_Reactor_Mask) closing_ = true; if (h == ACE_INVALID_HANDLE) - peer ().close_writer (); + peer().close_writer(); } // Critical section @@ -411,21 +473,26 @@ int WorldSocket::Update (void) if (closing_) return -1; - if (m_OutActive || m_OutBuffer->length () == 0) + if (m_OutActive || (m_OutBuffer->length() == 0 && msg_queue()->is_empty())) return 0; - return handle_output (get_handle ()); + int ret; + do + ret = handle_output (get_handle()); + while (ret > 0); + + return ret; } int WorldSocket::handle_input_header (void) { ACE_ASSERT (m_RecvWPct == NULL); - ACE_ASSERT (m_Header.length () == sizeof (ClientPktHeader)); + ACE_ASSERT (m_Header.length() == sizeof(ClientPktHeader)); - m_Crypt.DecryptRecv ((uint8*) m_Header.rd_ptr (), sizeof (ClientPktHeader)); + m_Crypt.DecryptRecv ((uint8*) m_Header.rd_ptr(), sizeof(ClientPktHeader)); - ClientPktHeader& header = *((ClientPktHeader*) m_Header.rd_ptr ()); + ClientPktHeader& header = *((ClientPktHeader*) m_Header.rd_ptr()); EndianConvertReverse(header.size); EndianConvert(header.cmd); @@ -446,7 +513,7 @@ int WorldSocket::handle_input_header (void) if (header.size > 0) { m_RecvWPct->resize (header.size); - m_RecvPct.base ((char*) m_RecvWPct->contents (), m_RecvWPct->size ()); + m_RecvPct.base ((char*) m_RecvWPct->contents(), m_RecvWPct->size()); } else { @@ -461,17 +528,17 @@ int WorldSocket::handle_input_payload (void) // set errno properly here on error !!! // now have a header and payload - ACE_ASSERT (m_RecvPct.space () == 0); - ACE_ASSERT (m_Header.space () == 0); + ACE_ASSERT (m_RecvPct.space() == 0); + ACE_ASSERT (m_Header.space() == 0); ACE_ASSERT (m_RecvWPct != NULL); const int ret = ProcessIncoming (m_RecvWPct); m_RecvPct.base (NULL, 0); - m_RecvPct.reset (); + m_RecvPct.reset(); m_RecvWPct = NULL; - m_Header.reset (); + m_Header.reset(); if (ret == -1) errno = EINVAL; @@ -495,9 +562,9 @@ int WorldSocket::handle_input_missing_data (void) ACE_Message_Block::DONT_DELETE, 0); - const size_t recv_size = message_block.space (); + const size_t recv_size = message_block.space(); - const ssize_t n = peer ().recv (message_block.wr_ptr (), + const ssize_t n = peer().recv (message_block.wr_ptr(), recv_size); if (n <= 0) @@ -505,25 +572,25 @@ int WorldSocket::handle_input_missing_data (void) message_block.wr_ptr (n); - while (message_block.length () > 0) + while (message_block.length() > 0) { - if (m_Header.space () > 0) + if (m_Header.space() > 0) { //need to receive the header - const size_t to_header = (message_block.length () > m_Header.space () ? m_Header.space () : message_block.length ()); - m_Header.copy (message_block.rd_ptr (), to_header); + const size_t to_header = (message_block.length() > m_Header.space() ? m_Header.space() : message_block.length()); + m_Header.copy (message_block.rd_ptr(), to_header); message_block.rd_ptr (to_header); - if (m_Header.space () > 0) + if (m_Header.space() > 0) { // Couldn't receive the whole header this time. - ACE_ASSERT (message_block.length () == 0); + ACE_ASSERT (message_block.length() == 0); errno = EWOULDBLOCK; return -1; } // We just received nice new header - if (handle_input_header () == -1) + if (handle_input_header() == -1) { ACE_ASSERT ((errno != EWOULDBLOCK) && (errno != EAGAIN)); return -1; @@ -541,24 +608,24 @@ int WorldSocket::handle_input_missing_data (void) } // We have full read header, now check the data payload - if (m_RecvPct.space () > 0) + if (m_RecvPct.space() > 0) { //need more data in the payload - const size_t to_data = (message_block.length () > m_RecvPct.space () ? m_RecvPct.space () : message_block.length ()); - m_RecvPct.copy (message_block.rd_ptr (), to_data); + const size_t to_data = (message_block.length() > m_RecvPct.space() ? m_RecvPct.space() : message_block.length()); + m_RecvPct.copy (message_block.rd_ptr(), to_data); message_block.rd_ptr (to_data); - if (m_RecvPct.space () > 0) + if (m_RecvPct.space() > 0) { // Couldn't receive the whole data this time. - ACE_ASSERT (message_block.length () == 0); + ACE_ASSERT (message_block.length() == 0); errno = EWOULDBLOCK; return -1; } } //just received fresh new payload - if (handle_input_payload () == -1) + if (handle_input_payload() == -1) { ACE_ASSERT ((errno != EWOULDBLOCK) && (errno != EAGAIN)); return -1; @@ -575,9 +642,9 @@ int WorldSocket::cancel_wakeup_output (GuardType& g) m_OutActive = false; - g.release (); + g.release(); - if (reactor ()->cancel_wakeup + if (reactor()->cancel_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) { // would be good to store errno from reactor with errno guard @@ -595,9 +662,9 @@ int WorldSocket::schedule_wakeup_output (GuardType& g) m_OutActive = true; - g.release (); + g.release(); - if (reactor ()->schedule_wakeup + if (reactor()->schedule_wakeup (this, ACE_Event_Handler::WRITE_MASK) == -1) { sLog.outError ("WorldSocket::schedule_wakeup_output"); @@ -614,24 +681,24 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct) // manage memory ;) ACE_Auto_Ptr<WorldPacket> aptr (new_pct); - const ACE_UINT16 opcode = new_pct->GetOpcode (); + const ACE_UINT16 opcode = new_pct->GetOpcode(); if (closing_) return -1; // Dump received packet. - if (sWorldLog.LogWorld ()) + if (sWorldLog.LogWorld()) { sWorldLog.outTimestampLog ("CLIENT:\nSOCKET: %u\nLENGTH: %u\nOPCODE: %s (0x%.4X)\nDATA:\n", - (uint32) get_handle (), - new_pct->size (), - LookupOpcodeName (new_pct->GetOpcode ()), - new_pct->GetOpcode ()); + (uint32) get_handle(), + new_pct->size(), + LookupOpcodeName (new_pct->GetOpcode()), + new_pct->GetOpcode()); uint32 p = 0; - while (p < new_pct->size ()) + while (p < new_pct->size()) { - for (uint32 j = 0; j < 16 && p < new_pct->size (); j++) + for (uint32 j = 0; j < 16 && p < new_pct->size(); j++) sWorldLog.outLog ("%.2X ", (*new_pct)[p++]); sWorldLog.outLog ("\n"); @@ -653,7 +720,7 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct) return HandleAuthSession (*new_pct); case CMSG_KEEP_ALIVE: - DEBUG_LOG ("CMSG_KEEP_ALIVE ,size: %d", new_pct->size ()); + DEBUG_LOG ("CMSG_KEEP_ALIVE ,size: %d", new_pct->size()); return 0; default: @@ -677,7 +744,7 @@ int WorldSocket::ProcessIncoming (WorldPacket* new_pct) m_Session->UpdateTimeOutTime(false); // OK ,give the packet to WorldSession - aptr.release (); + aptr.release(); // WARNINIG here we call it with locks held. // Its possible to cause deadlock if QueuePacket calls back m_Session->QueuePacket (new_pct); @@ -747,7 +814,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) DEBUG_LOG ("WorldSocket::HandleAuthSession: client %u, unk2 %u, account %s, unk3 %u, clientseed %u", BuiltNumberClient, unk2, - account.c_str (), + account.c_str(), unk3, clientSeed); @@ -769,7 +836,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) "locale " //8 "FROM account " "WHERE username = '%s'", - safe_account.c_str ()); + safe_account.c_str()); // Stop if the account is not found if (!result) @@ -783,7 +850,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) return -1; } - Field* fields = result->Fetch (); + Field* fields = result->Fetch(); uint8 expansion = fields[6].GetUInt8(); uint32 world_expansion = sWorld.getConfig(CONFIG_EXPANSION); @@ -795,10 +862,10 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) g.SetDword (7); v.SetHexStr(fields[4].GetString()); - s.SetHexStr (fields[5].GetString ()); + s.SetHexStr (fields[5].GetString()); - const char* sStr = s.AsHexStr (); //Must be freed by OPENSSL_free() - const char* vStr = v.AsHexStr (); //Must be freed by OPENSSL_free() + const char* sStr = s.AsHexStr(); //Must be freed by OPENSSL_free() + const char* vStr = v.AsHexStr(); //Must be freed by OPENSSL_free() DEBUG_LOG ("WorldSocket::HandleAuthSession: (s,v) check s: %s v: %s", sStr, @@ -808,9 +875,9 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) OPENSSL_free ((void*) vStr); ///- Re-check ip locking (same check as in realmd). - if (fields[3].GetUInt8 () == 1) // if ip is locked + if (fields[3].GetUInt8() == 1) // if ip is locked { - if (strcmp (fields[2].GetString (), GetRemoteAddress ().c_str ())) + if (strcmp (fields[2].GetString(), GetRemoteAddress().c_str())) { packet.Initialize (SMSG_AUTH_RESPONSE, 1); packet << uint8 (AUTH_FAILED); @@ -821,17 +888,17 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) } } - id = fields[0].GetUInt32 (); + id = fields[0].GetUInt32(); /* if (security > SEC_ADMINISTRATOR) // prevent invalid security settings in DB security = SEC_ADMINISTRATOR; */ - K.SetHexStr (fields[1].GetString ()); + K.SetHexStr (fields[1].GetString()); - time_t mutetime = time_t (fields[7].GetUInt64 ()); + time_t mutetime = time_t (fields[7].GetUInt64()); - locale = LocaleConstant (fields[8].GetUInt8 ()); + locale = LocaleConstant (fields[8].GetUInt8()); if (locale >= MAX_LOCALE) locale = LOCALE_enUS; @@ -849,7 +916,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) security = 0; else { - fields = result->Fetch (); + fields = result->Fetch(); security = fields[1].GetInt32(); } @@ -872,7 +939,7 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) // Check locked state for server sWorld.UpdateAllowedSecurity(); - AccountTypes allowedAccountType = sWorld.GetPlayerSecurityLimit (); + AccountTypes allowedAccountType = sWorld.GetPlayerSecurityLimit(); sLog.outDebug("Allowed Level: %u Player Level %u", allowedAccountType, AccountTypes(security)); if (allowedAccountType > SEC_PLAYER && AccountTypes(security) < allowedAccountType) { @@ -896,9 +963,9 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) sha.UpdateData ((uint8 *) & clientSeed, 4); sha.UpdateData ((uint8 *) & seed, 4); sha.UpdateBigNumbers (&K, NULL); - sha.Finalize (); + sha.Finalize(); - if (memcmp (sha.GetDigest (), digest, 20)) + if (memcmp (sha.GetDigest(), digest, 20)) { packet.Initialize (SMSG_AUTH_RESPONSE, 1); packet << uint8 (AUTH_FAILED); @@ -909,11 +976,11 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) return -1; } - std::string address = GetRemoteAddress (); + std::string address = GetRemoteAddress(); DEBUG_LOG ("WorldSocket::HandleAuthSession: Client '%s' authenticated successfully from %s.", - account.c_str (), - address.c_str ()); + account.c_str(), + address.c_str()); // Update the last_ip in the database // No SQL injection, username escaped. @@ -922,8 +989,8 @@ int WorldSocket::HandleAuthSession (WorldPacket& recvPacket) LoginDatabase.PExecute ("UPDATE account " "SET last_ip = '%s' " "WHERE username = '%s'", - address.c_str (), - safe_account.c_str ()); + address.c_str(), + safe_account.c_str()); // NOTE ATM the socket is single-threaded, have this in mind ... ACE_NEW_RETURN (m_Session, WorldSession (id, this, AccountTypes(security), expansion, mutetime, locale), -1); @@ -952,10 +1019,10 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) recvPacket >> latency; if (m_LastPingTime == ACE_Time_Value::zero) - m_LastPingTime = ACE_OS::gettimeofday (); // for 1st ping + m_LastPingTime = ACE_OS::gettimeofday(); // for 1st ping else { - ACE_Time_Value cur_time = ACE_OS::gettimeofday (); + ACE_Time_Value cur_time = ACE_OS::gettimeofday(); ACE_Time_Value diff_time (cur_time); diff_time -= m_LastPingTime; m_LastPingTime = cur_time; @@ -970,11 +1037,11 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) { ACE_GUARD_RETURN (LockType, Guard, m_SessionLock, -1); - if (m_Session && m_Session->GetSecurity () == SEC_PLAYER) + if (m_Session && m_Session->GetSecurity() == SEC_PLAYER) { sLog.outError ("WorldSocket::HandlePing: Player kicked for " "over-speed pings address = %s", - GetRemoteAddress ().c_str ()); + GetRemoteAddress().c_str()); return -1; } @@ -995,7 +1062,7 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) sLog.outError ("WorldSocket::HandlePing: peer sent CMSG_PING, " "but is not authenticated or got recently kicked," " address = %s", - GetRemoteAddress ().c_str ()); + GetRemoteAddress().c_str()); return -1; } } @@ -1004,100 +1071,3 @@ int WorldSocket::HandlePing (WorldPacket& recvPacket) packet << ping; return SendPacket (packet); } - -int WorldSocket::iSendPacket (const WorldPacket& pct) -{ - ServerPktHeader header(pct.size()+2, pct.GetOpcode()); - if (m_OutBuffer->space() < pct.size () + header.getHeaderLength()) - { - errno = ENOBUFS; - return -1; - } - - m_Crypt.EncryptSend (header.header, header.getHeaderLength()); - - if (m_OutBuffer->copy ((char*) header.header, header.getHeaderLength()) == -1) - ACE_ASSERT (false); - - if (!pct.empty ()) - if (m_OutBuffer->copy ((char*) pct.contents (), pct.size ()) == -1) - ACE_ASSERT (false); - - return 0; -} - -/// Return 1 if some bytes written and packet completed -/// Return 0 if no byte written, but still remaining data -/// Return -1 if some bytes written and still remaining data -int WorldSocket::iSendPartialPacket(WorldPacket& pct) -{ - size_t remainingLen = pct.size() - pct.rpos(); - - if (pct.rpos() == 0) // nothing sent yet => send header - { - ServerPktHeader header(pct.size()+2, pct.GetOpcode()); - - // check if there is at least enough space for the header - if (m_OutBuffer->space () < header.getHeaderLength()) - return 0; // nothing written, still remaining data => 0 - - m_Crypt.EncryptSend(header.header, header.getHeaderLength()); - - if (m_OutBuffer->copy((char*) header.header, header.getHeaderLength()) == -1) - return 0; // no space left. This should not happend. Just return 0 as nothing written and still remaining data - - if (remainingLen == 0) - return 1; // header written, nothing else => 1 - } - - if ((m_OutBuffer->space()) < remainingLen) { - size_t len = m_OutBuffer->space(); - - if (m_OutBuffer->copy((char*) (pct.contents() + pct.rpos()), len) == -1) - ACE_ASSERT (false); - - pct.read_skip(len); - return -1; // packet will be pushed back on the queue - } - - if (m_OutBuffer->copy((char*) (pct.contents() + pct.rpos()), remainingLen) == -1) - ACE_ASSERT (false); - - return 1; // some byte written and packet completed -} - -bool WorldSocket::iFlushPacketQueue() -{ - WorldPacket *pct; - bool haveone = false; - - while (m_PacketQueue.dequeue_head(pct) == 0) - { - int result = iSendPartialPacket(*pct); - - if (result != 0) - { - // some bytes were written - haveone = true; - } - - if (result <= 0) - { - if (m_PacketQueue.enqueue_head(pct) == -1) - { - delete pct; - sLog.outError ("WorldSocket::iFlushPacketQueue m_PacketQueue->enqueue_head"); - return false; - } - - break; - } - else - { - // packet completed - delete pct; - } - } - - return haveone; -} diff --git a/src/game/WorldSocket.h b/src/game/WorldSocket.h index 17de5c19c36..70654274215 100644 --- a/src/game/WorldSocket.h +++ b/src/game/WorldSocket.h @@ -73,7 +73,7 @@ typedef ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH> WorldHandler; * sending packets from "producer" threads is minimal, * and doing a lot of writes with small size is tolerated. * - * The calls to Update () method are managed by WorldSocketMgr + * The calls to Update() method are managed by WorldSocketMgr * and ReactorRunnable. * * For input ,the class uses one 1024 bytes buffer on stack @@ -103,9 +103,6 @@ class WorldSocket : protected WorldHandler typedef ACE_Thread_Mutex LockType; typedef ACE_Guard<LockType> GuardType; - /// Queue for storing packets for which there is no space. - typedef ACE_Unbounded_Queue< WorldPacket* > PacketQueueT; - /// Check if socket is closed. bool IsClosed (void) const; @@ -160,6 +157,9 @@ class WorldSocket : protected WorldHandler /// @param g the guard is for m_OutBufferLock, the function will release it int cancel_wakeup_output (GuardType& g); int schedule_wakeup_output (GuardType& g); + + /// Drain the queue if its not empty. + int handle_output_queue (GuardType& g); /// process one incoming packet. /// @param new_pct received packet ,note that you need to delete it. @@ -171,20 +171,6 @@ class WorldSocket : protected WorldHandler /// Called by ProcessIncoming() on CMSG_PING. int HandlePing (WorldPacket& recvPacket); - /// Try to write WorldPacket to m_OutBuffer ,return -1 if no space - /// Need to be called with m_OutBufferLock lock held - int iSendPacket (const WorldPacket& pct); - - /// Try to write WorldPacket to m_OutBuffer even partially, - /// Need to be called with m_OutBufferLock lock held - int iSendPartialPacket(WorldPacket& pct); - - /// Flush m_PacketQueue if there are packets in it - /// Need to be called with m_OutBufferLock lock held - /// @return true if it wrote to the buffer (AKA you need - /// to mark the socket for output). - bool iFlushPacketQueue (); - private: /// Time in which the last ping was received ACE_Time_Value m_LastPingTime; @@ -224,10 +210,6 @@ class WorldSocket : protected WorldHandler /// Size of the m_OutBuffer. size_t m_OutBufferSize; - /// Here are stored packets for which there was no space on m_OutBuffer, - /// this allows not-to kick player if its buffer is overflowed. - PacketQueueT m_PacketQueue; - /// True if the socket is registered with the reactor for output bool m_OutActive; diff --git a/src/game/WorldSocketMgr.cpp b/src/game/WorldSocketMgr.cpp index 23a123ae990..c23d08e6f78 100644 --- a/src/game/WorldSocketMgr.cpp +++ b/src/game/WorldSocketMgr.cpp @@ -55,23 +55,23 @@ class ReactorRunnable : protected ACE_Task_Base { public: - ReactorRunnable () : - m_ThreadId (-1), - m_Connections (0), - m_Reactor (0) + ReactorRunnable() : + m_ThreadId(-1), + m_Connections(0), + m_Reactor(0) { ACE_Reactor_Impl* imp = 0; #if defined (ACE_HAS_EVENT_POLL) || defined (ACE_HAS_DEV_POLL) - imp = new ACE_Dev_Poll_Reactor (); + imp = new ACE_Dev_Poll_Reactor(); imp->max_notify_iterations (128); imp->restart (1); #else - imp = new ACE_TP_Reactor (); + imp = new ACE_TP_Reactor(); imp->max_notify_iterations (128); #endif @@ -79,33 +79,33 @@ class ReactorRunnable : protected ACE_Task_Base m_Reactor = new ACE_Reactor (imp, 1); } - virtual ~ReactorRunnable () + virtual ~ReactorRunnable() { - Stop (); - Wait (); + Stop(); + Wait(); if (m_Reactor) delete m_Reactor; } - void Stop () + void Stop() { - m_Reactor->end_reactor_event_loop (); + m_Reactor->end_reactor_event_loop(); } - int Start () + int Start() { if (m_ThreadId != -1) return -1; - return (m_ThreadId = activate ()); + return (m_ThreadId = activate()); } - void Wait () { ACE_Task_Base::wait (); } + void Wait() { ACE_Task_Base::wait(); } - long Connections () + long Connections() { - return static_cast<long> (m_Connections.value ()); + return static_cast<long> (m_Connections.value()); } int AddSocket (WorldSocket* sock) @@ -120,37 +120,37 @@ class ReactorRunnable : protected ACE_Task_Base return 0; } - ACE_Reactor* GetReactor () + ACE_Reactor* GetReactor() { return m_Reactor; } protected: - void AddNewSockets () + void AddNewSockets() { ACE_GUARD (ACE_Thread_Mutex, Guard, m_NewSockets_Lock); - if (m_NewSockets.empty ()) + if (m_NewSockets.empty()) return; - for (SocketSet::const_iterator i = m_NewSockets.begin (); i != m_NewSockets.end (); ++i) + for (SocketSet::const_iterator i = m_NewSockets.begin(); i != m_NewSockets.end(); ++i) { WorldSocket* sock = (*i); - if (sock->IsClosed ()) + if (sock->IsClosed()) { - sock->RemoveReference (); + sock->RemoveReference(); --m_Connections; } else m_Sockets.insert (sock); } - m_NewSockets.clear (); + m_NewSockets.clear(); } - virtual int svc () + virtual int svc() { DEBUG_LOG ("Network Thread Starting"); @@ -160,7 +160,7 @@ class ReactorRunnable : protected ACE_Task_Base SocketSet::iterator i, t; - while (!m_Reactor->reactor_event_loop_done ()) + while (!m_Reactor->reactor_event_loop_done()) { // dont be too smart to move this outside the loop // the run_reactor_event_loop will modify interval @@ -169,16 +169,16 @@ class ReactorRunnable : protected ACE_Task_Base if (m_Reactor->run_reactor_event_loop (interval) == -1) break; - AddNewSockets (); + AddNewSockets(); - for (i = m_Sockets.begin (); i != m_Sockets.end ();) + for (i = m_Sockets.begin(); i != m_Sockets.end();) { - if ((*i)->Update () == -1) + if ((*i)->Update() == -1) { t = i; ++i; - (*t)->CloseSocket (); - (*t)->RemoveReference (); + (*t)->CloseSocket(); + (*t)->RemoveReference(); --m_Connections; m_Sockets.erase (t); } @@ -208,17 +208,17 @@ class ReactorRunnable : protected ACE_Task_Base ACE_Thread_Mutex m_NewSockets_Lock; }; -WorldSocketMgr::WorldSocketMgr () : - m_NetThreadsCount (0), - m_NetThreads (0), - m_SockOutKBuff (-1), - m_SockOutUBuff (65536), - m_UseNoDelay (true), +WorldSocketMgr::WorldSocketMgr() : + m_NetThreadsCount(0), + m_NetThreads(0), + m_SockOutKBuff(-1), + m_SockOutUBuff(65536), + m_UseNoDelay(true), m_Acceptor (0) { } -WorldSocketMgr::~WorldSocketMgr () +WorldSocketMgr::~WorldSocketMgr() { if (m_NetThreads) delete [] m_NetThreads; @@ -244,8 +244,8 @@ WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address) m_NetThreads = new ReactorRunnable[m_NetThreadsCount]; - sLog.outBasic ("Max allowed socket connections %d",ACE::max_handles ()); - + sLog.outBasic ("Max allowed socket connections %d", ACE::max_handles()); + // -1 means use default m_SockOutKBuff = sConfig.GetIntDefault ("Network.OutKBuff", -1); @@ -262,14 +262,14 @@ WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address) ACE_INET_Addr listen_addr (port, address); - if (acc->open (listen_addr, m_NetThreads[0].GetReactor (), ACE_NONBLOCK) == -1) + if (acc->open(listen_addr, m_NetThreads[0].GetReactor(), ACE_NONBLOCK) == -1) { sLog.outError ("Failed to open acceptor ,check if the port is free"); return -1; } for (size_t i = 0; i < m_NetThreadsCount; ++i) - m_NetThreads[i].Start (); + m_NetThreads[i].Start(); return 0; } @@ -277,42 +277,42 @@ WorldSocketMgr::StartReactiveIO (ACE_UINT16 port, const char* address) int WorldSocketMgr::StartNetwork (ACE_UINT16 port, const char* address) { - if (!sLog.IsOutDebug ()) - ACE_Log_Msg::instance ()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS); + if (!sLog.IsOutDebug()) + ACE_Log_Msg::instance()->priority_mask (LM_ERROR, ACE_Log_Msg::PROCESS); - if (StartReactiveIO (port, address) == -1) + if (StartReactiveIO(port, address) == -1) return -1; return 0; } void -WorldSocketMgr::StopNetwork () +WorldSocketMgr::StopNetwork() { if (m_Acceptor) { WorldSocket::Acceptor* acc = dynamic_cast<WorldSocket::Acceptor*> (m_Acceptor); if (acc) - acc->close (); + acc->close(); } if (m_NetThreadsCount != 0) { for (size_t i = 0; i < m_NetThreadsCount; ++i) - m_NetThreads[i].Stop (); + m_NetThreads[i].Stop(); } - Wait (); + Wait(); } void -WorldSocketMgr::Wait () +WorldSocketMgr::Wait() { if (m_NetThreadsCount != 0) { for (size_t i = 0; i < m_NetThreadsCount; ++i) - m_NetThreads[i].Wait (); + m_NetThreads[i].Wait(); } } @@ -322,7 +322,7 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock) // set some options here if (m_SockOutKBuff >= 0) { - if (sock->peer ().set_option (SOL_SOCKET, + if (sock->peer().set_option (SOL_SOCKET, SO_SNDBUF, (void*) & m_SockOutKBuff, sizeof (int)) == -1 && errno != ENOTSUP) @@ -337,12 +337,12 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock) // Set TCP_NODELAY. if (m_UseNoDelay) { - if (sock->peer ().set_option (ACE_IPPROTO_TCP, + if (sock->peer().set_option (ACE_IPPROTO_TCP, TCP_NODELAY, (void*)&ndoption, sizeof (int)) == -1) { - sLog.outError ("WorldSocketMgr::OnSocketOpen: peer ().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno)); + sLog.outError ("WorldSocketMgr::OnSocketOpen: peer().set_option TCP_NODELAY errno = %s", ACE_OS::strerror (errno)); return -1; } } @@ -355,14 +355,14 @@ WorldSocketMgr::OnSocketOpen (WorldSocket* sock) ACE_ASSERT (m_NetThreadsCount >= 1); for (size_t i = 1; i < m_NetThreadsCount; ++i) - if (m_NetThreads[i].Connections () < m_NetThreads[min].Connections ()) + if (m_NetThreads[i].Connections() < m_NetThreads[min].Connections()) min = i; return m_NetThreads[min].AddSocket (sock); } WorldSocketMgr* -WorldSocketMgr::Instance () +WorldSocketMgr::Instance() { return ACE_Singleton<WorldSocketMgr,ACE_Thread_Mutex>::instance(); } diff --git a/src/game/WorldSocketMgr.h b/src/game/WorldSocketMgr.h index bea11159dd6..11345304962 100644 --- a/src/game/WorldSocketMgr.h +++ b/src/game/WorldSocketMgr.h @@ -46,22 +46,22 @@ public: int StartNetwork (ACE_UINT16 port, const char* address); /// Stops all network threads, It will wait for all running threads . - void StopNetwork (); + void StopNetwork(); /// Wait untill all network threads have "joined" . - void Wait (); + void Wait(); /// Make this class singleton . - static WorldSocketMgr* Instance (); - + static WorldSocketMgr* Instance(); + private: int OnSocketOpen(WorldSocket* sock); int StartReactiveIO(ACE_UINT16 port, const char* address); private: - WorldSocketMgr (); - virtual ~WorldSocketMgr (); + WorldSocketMgr(); + virtual ~WorldSocketMgr(); ReactorRunnable* m_NetThreads; size_t m_NetThreadsCount; @@ -73,7 +73,7 @@ private: ACE_Event_Handler* m_Acceptor; }; -#define sWorldSocketMgr WorldSocketMgr::Instance () +#define sWorldSocketMgr WorldSocketMgr::Instance() #endif /// @} |