diff options
Diffstat (limited to 'src/server/shared')
| -rw-r--r-- | src/server/shared/Database/DatabaseWorkerPool.h | 12 | ||||
| -rw-r--r-- | src/server/shared/Database/Implementation/WorldDatabase.cpp | 4 | ||||
| -rw-r--r-- | src/server/shared/Networking/Socket.h | 109 |
3 files changed, 120 insertions, 5 deletions
diff --git a/src/server/shared/Database/DatabaseWorkerPool.h b/src/server/shared/Database/DatabaseWorkerPool.h index a1ee6407fea..18797b6b12a 100644 --- a/src/server/shared/Database/DatabaseWorkerPool.h +++ b/src/server/shared/Database/DatabaseWorkerPool.h @@ -297,8 +297,10 @@ class DatabaseWorkerPool QueryResultFuture AsyncQuery(const char* sql) { BasicStatementTask* task = new BasicStatementTask(sql, true); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + QueryResultFuture result = task->GetFuture(); Enqueue(task); - return task->GetFuture(); //! Actual return value has no use yet + return result; } //! Enqueues a query in string format -with variable args- that will set the value of the QueryResultFuture return object as soon as the query is executed. @@ -320,8 +322,10 @@ class DatabaseWorkerPool PreparedQueryResultFuture AsyncQuery(PreparedStatement* stmt) { PreparedStatementTask* task = new PreparedStatementTask(stmt, true); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + PreparedQueryResultFuture result = task->GetFuture(); Enqueue(task); - return task->GetFuture(); + return result; } //! Enqueues a vector of SQL operations (can be both adhoc and prepared) that will set the value of the QueryResultHolderFuture @@ -331,8 +335,10 @@ class DatabaseWorkerPool QueryResultHolderFuture DelayQueryHolder(SQLQueryHolder* holder) { SQLQueryHolderTask* task = new SQLQueryHolderTask(holder); + // Store future result before enqueueing - task might get already processed and deleted before returning from this method + QueryResultHolderFuture result = task->GetFuture(); Enqueue(task); - return task->GetFuture(); + return result; } /** diff --git a/src/server/shared/Database/Implementation/WorldDatabase.cpp b/src/server/shared/Database/Implementation/WorldDatabase.cpp index 800f9eafb60..53f3b5519cd 100644 --- a/src/server/shared/Database/Implementation/WorldDatabase.cpp +++ b/src/server/shared/Database/Implementation/WorldDatabase.cpp @@ -51,12 +51,12 @@ void WorldDatabaseConnection::DoPrepareStatements() PrepareStatement(WORLD_UPD_WAYPOINT_DATA_WPGUID, "UPDATE waypoint_data SET wpguid = ? WHERE id = ? and point = ?", CONNECTION_ASYNC); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_MAX_ID, "SELECT MAX(id) FROM waypoint_data", CONNECTION_SYNCH); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_MAX_POINT, "SELECT MAX(point) FROM waypoint_data WHERE id = ?", CONNECTION_SYNCH); - PrepareStatement(WORLD_SEL_WAYPOINT_DATA_BY_ID, "SELECT point, position_x, position_y, position_z, orientation, move_flag, delay, action, action_chance FROM waypoint_data WHERE id = ? ORDER BY point", CONNECTION_SYNCH); + PrepareStatement(WORLD_SEL_WAYPOINT_DATA_BY_ID, "SELECT point, position_x, position_y, position_z, orientation, move_type, delay, action, action_chance FROM waypoint_data WHERE id = ? ORDER BY point", CONNECTION_SYNCH); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_POS_BY_ID, "SELECT point, position_x, position_y, position_z FROM waypoint_data WHERE id = ?", CONNECTION_SYNCH); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_POS_FIRST_BY_ID, "SELECT position_x, position_y, position_z FROM waypoint_data WHERE point = 1 AND id = ?", CONNECTION_SYNCH); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_POS_LAST_BY_ID, "SELECT position_x, position_y, position_z, orientation FROM waypoint_data WHERE id = ? ORDER BY point DESC LIMIT 1", CONNECTION_SYNCH); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_BY_WPGUID, "SELECT id, point FROM waypoint_data WHERE wpguid = ?", CONNECTION_SYNCH); - PrepareStatement(WORLD_SEL_WAYPOINT_DATA_ALL_BY_WPGUID, "SELECT id, point, delay, move_flag, action, action_chance FROM waypoint_data WHERE wpguid = ?", CONNECTION_SYNCH); + PrepareStatement(WORLD_SEL_WAYPOINT_DATA_ALL_BY_WPGUID, "SELECT id, point, delay, move_type, action, action_chance FROM waypoint_data WHERE wpguid = ?", CONNECTION_SYNCH); PrepareStatement(WORLD_UPD_WAYPOINT_DATA_ALL_WPGUID, "UPDATE waypoint_data SET wpguid = 0", CONNECTION_ASYNC); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_BY_POS, "SELECT id, point FROM waypoint_data WHERE (abs(position_x - ?) <= ?) and (abs(position_y - ?) <= ?) and (abs(position_z - ?) <= ?)", CONNECTION_SYNCH); PrepareStatement(WORLD_SEL_WAYPOINT_DATA_WPGUID_BY_ID, "SELECT wpguid FROM waypoint_data WHERE id = ? and wpguid <> 0", CONNECTION_SYNCH); diff --git a/src/server/shared/Networking/Socket.h b/src/server/shared/Networking/Socket.h new file mode 100644 index 00000000000..676418c27a7 --- /dev/null +++ b/src/server/shared/Networking/Socket.h @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * Copyright (C) 2005-2009 MaNGOS <http://getmangos.com/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __SOCKET_H__ +#define __SOCKET_H__ + +#include "Define.h" +#include "Log.h" +#include <vector> +#include <mutex> +#include <queue> +#include <memory> +#include <functional> +#include <boost/asio/ip/tcp.hpp> +#include <boost/asio/write.hpp> + +using boost::asio::ip::tcp; + +template<class T> +class Socket : public std::enable_shared_from_this<T> +{ +public: + Socket(tcp::socket&& socket, std::size_t headerSize) : _socket(std::move(socket)), _headerSize(headerSize) { } + + virtual void Start() = 0; + + boost::asio::ip::address GetRemoteIpAddress() const { return _socket.remote_endpoint().address(); }; + uint16 GetRemotePort() const { return _socket.remote_endpoint().port(); } + + void AsyncReadHeader() + { + _socket.async_read_some(boost::asio::buffer(_readBuffer, _headerSize), std::bind(&Socket<T>::ReadHeaderHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } + + void AsyncReadData(std::size_t size, std::size_t bufferOffset) + { + _socket.async_read_some(boost::asio::buffer(&_readBuffer[bufferOffset], size), std::bind(&Socket<T>::ReadDataHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } + + void ReadData(std::size_t size, std::size_t bufferOffset) + { + _socket.read_some(boost::asio::buffer(&_readBuffer[bufferOffset], size)); + } + + void AsyncWrite(std::vector<uint8> const& data) + { + boost::asio::async_write(_socket, boost::asio::buffer(data), std::bind(&Socket<T>::WriteHandler, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2)); + } + + bool IsOpen() const { return _socket.is_open(); } + void CloseSocket() + { + boost::system::error_code socketError; + _socket.close(socketError); + if (socketError) + TC_LOG_DEBUG("network", "Socket::CloseSocket: %s errored when closing socket: %i (%s)", GetRemoteIpAddress().to_string().c_str(), socketError.value(), socketError.message().c_str()); + } + + uint8* GetReadBuffer() { return _readBuffer; } + +protected: + virtual void ReadHeaderHandler(boost::system::error_code error, size_t transferedBytes) = 0; + virtual void ReadDataHandler(boost::system::error_code error, size_t transferedBytes) = 0; + + std::mutex _writeLock; + std::queue<std::vector<uint8> > _writeQueue; + +private: + void ReadHeaderHandlerInternal(boost::system::error_code error, size_t transferedBytes) { ReadHeaderHandler(error, transferedBytes); } + void ReadDataHandlerInternal(boost::system::error_code error, size_t transferedBytes) { ReadDataHandler(error, transferedBytes); } + + void WriteHandler(boost::system::error_code error, size_t /*transferedBytes*/) + { + if (!error) + { + std::lock_guard<std::mutex> deleteGuard(_writeLock); + + _writeQueue.pop(); + + if (!_writeQueue.empty()) + AsyncWrite(_writeQueue.front()); + } + else + CloseSocket(); + } + + tcp::socket _socket; + + uint8 _readBuffer[4096]; + + std::size_t _headerSize; +}; + +#endif // __SOCKET_H__ |
