diff options
72 files changed, 4922 insertions, 68 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e91c7da4ec..905d92996bf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -56,6 +56,7 @@ set(OPENSSL_EXPECTED_VERSION 1.0.0) find_package(PCHSupport) find_package(OpenSSL REQUIRED) find_package(Threads REQUIRED) +find_package(ZMQ REQUIRED) include(ConfigureBoost) find_package(MySQL REQUIRED) diff --git a/cmake/macros/FindZMQ.cmake b/cmake/macros/FindZMQ.cmake new file mode 100644 index 00000000000..6039dd56e2c --- /dev/null +++ b/cmake/macros/FindZMQ.cmake @@ -0,0 +1,75 @@ +# +# Find the ZMQ includes and library +# + +# This module defines +# ZMQ_INCLUDE_DIR, where to find zmq.h +# ZMQ_LIBRARY, the library needed to use ZMQ +# ZMQ_FOUND, if false, you cannot build anything that requires ZMQ. + +set(ZMQ_FOUND 0) + +if (PLATFORM EQUAL 64) + set(ZMQ_REGISTRY_PATH + "[HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\ZeroMQ (x64);DisplayIcon]" + ) +else() + set(ZMQ_REGISTRY_PATH + "[HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\ZeroMQ;DisplayIcon]" + "[HKEY_LOCAL_MACHINE\\SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\ZeroMQ;DisplayIcon]" + ) +endif() + +find_path(ZMQ_INCLUDE_DIR + NAMES + zmq.h + HINTS + "${ZMQ_REGISTRY_PATH}/include" + PATHS + /usr/include + /usr/local/include +) + +if (MSVC) + # Read registry key holding version + if (PLATFORM EQUAL 64) + get_filename_component(ZMQ_NAME "[HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\ZeroMQ (x64);DisplayVersion]" NAME) + else() + get_filename_component(ZMQ_NAME "[HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\ZeroMQ;DisplayVersion]" NAME) + if (${ZMQ_NAME} MATCHES "registry") # if key was not found, the string "registry" is returned + get_filename_component(ZMQ_NAME "[HKEY_LOCAL_MACHINE\\SOFTWARE\\Wow6432Node\\Microsoft\\Windows\\CurrentVersion\\Uninstall\\ZeroMQ;DisplayVersion]" NAME) + endif() + endif() + + # Replace dots with underscores + string(REGEX REPLACE "\\." "_" ZMQ_NAME ${ZMQ_NAME}) + + # Get Visual studio version number + string(REGEX REPLACE "Visual Studio ([0-9]+).*" "\\1" ZMQ_VS_VERSION ${CMAKE_GENERATOR}) + + # Format ZMQ library file name + set(ZMQ_LIBRARY_NAME "libzmq-v${ZMQ_VS_VERSION}0-mt-${ZMQ_NAME}") +endif() + +find_library(ZMQ_LIBRARY + NAMES + zmq + ${ZMQ_LIBRARY_NAME} + HINTS + "${ZMQ_REGISTRY_PATH}/lib" + PATHS + /lib + /usr/lib + /usr/local/lib +) + +if (ZMQ_INCLUDE_DIR AND ZMQ_LIBRARY) + set(ZMQ_FOUND 1) + message(STATUS "Found ZMQ library: ${ZMQ_LIBRARY}") + message(STATUS "Found ZMQ headers: ${ZMQ_INCLUDE_DIR}") +else() + message(FATAL_ERROR "Could not find ZMQ libraries/headers! Please install ZMQ with libraries and headers") +endif() + +# show the ZMQ_INCLUDE_DIR and ZMQ_LIBRARY variables only in the advanced view +mark_as_advanced(ZMQ_INCLUDE_DIR ZMQ_LIBRARY ZMQ_FOUND) diff --git a/dep/CMakeLists.txt b/dep/CMakeLists.txt index 8ae2e7ac6d7..e304171560b 100644 --- a/dep/CMakeLists.txt +++ b/dep/CMakeLists.txt @@ -36,6 +36,7 @@ endif() if(SERVERS) add_subdirectory(gsoap) + add_subdirectory(zmqpp) endif() if(TOOLS) diff --git a/dep/PackageList.txt b/dep/PackageList.txt index 6c17867a929..aac1e5eb7b1 100644 --- a/dep/PackageList.txt +++ b/dep/PackageList.txt @@ -39,3 +39,7 @@ recastnavigation (Recast is state of the art navigation mesh construction toolse StormLib (a pack of modules, written in C++, which are able to read and also to write files from/to the MPQ archives) http://www.zezula.net/en/mpq/stormlib.html Version: 8.04 + +zmqpp (C++ binding for 0mq/zmq is a 'high-level' library that hides most of the c-style interface core 0mq provides.) + https://github.com/zeromq/zmqpp + Version: 3.2.0 17e9f6afa98f56ecac1e3f3eecbfc112357a6732 diff --git a/dep/zmqpp/CMakeLists.txt b/dep/zmqpp/CMakeLists.txt new file mode 100644 index 00000000000..6b6bd35b6e7 --- /dev/null +++ b/dep/zmqpp/CMakeLists.txt @@ -0,0 +1,31 @@ +# Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> +# +# This file is free software; as a special exception the author gives +# unlimited permission to copy and/or distribute it, with or without +# modifications, as long as this notice is preserved. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + +file(GLOB_RECURSE sources_zmqpp zmqpp/*.cpp zmqpp/*.hpp zmqpp/*.h) + +set(zmqpp_STAT_SRCS + ${sources_zmqpp} +) + +include_directories(${ZMQ_INCLUDE_DIR}) + +add_library(zmqpp STATIC + ${zmqpp_STAT_SRCS} +) + +if (WIN32) + add_definitions(-DBUILD_VERSION=\\"3.2.0\\") +else() + add_definitions(-DBUILD_VERSION='"3.2.0"') +endif() + +add_definitions(-DBUILD_VERSION_MAJOR=3) +add_definitions(-DBUILD_VERSION_MINOR=2) +add_definitions(-DBUILD_VERSION_REVISION=0) diff --git a/dep/zmqpp/zmqpp/compatibility.hpp b/dep/zmqpp/zmqpp/compatibility.hpp new file mode 100644 index 00000000000..103b2c82ebd --- /dev/null +++ b/dep/zmqpp/zmqpp/compatibility.hpp @@ -0,0 +1,97 @@ +/** + * \file + * + * \date 10 Sep 2011 + * \author ron + * \author Ben Gray (\@benjamg) + * + * A fair number of C++0x (or more accurately C++11) features are used in this + * library and as this project is used where I work on older compilers this + * file was created to help. + * + * C++ features and their workaround where not supported: + * \li lambda functions - disabled, these are only used in the test anyway. + * \li typesafe enums - replaced with enum where comparisons needed. + * \li nullptr - defined to null. + * + * As of the port to version 3.1 (libzmqpp version 1.1.0) this file will also + * be used to maintain compatablity with multiple versions of 0mq + */ + +#ifndef ZMQPP_COMPATIBILITY_HPP_ +#define ZMQPP_COMPATIBILITY_HPP_ + +#include <zmq.h> +#include <cstdint> + +// Currently we require at least 0mq version 2.2.x +#define ZMQPP_REQUIRED_ZMQ_MAJOR 2 +#define ZMQPP_REQUIRED_ZMQ_MINOR 2 + +#if (ZMQ_VERSION_MAJOR < ZMQPP_REQUIRED_ZMQ_MAJOR) || ((ZMQ_VERSION_MAJOR == ZMQPP_REQUIRED_ZMQ_MAJOR) && (ZMQ_VERSION_MINOR < ZMQPP_REQUIRED_ZMQ_MINOR)) +#error zmqpp requires a later version of 0mq +#endif + +// Experimental feature support +#if (ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR == 0) +#define ZMQ_EXPERIMENTAL_LABELS +#endif + +// Deal with older versions of gcc +#if defined(__GNUC__) && !defined(__clang__) +#if __GNUC__ == 4 + +// Deal with older gcc not supporting C++0x typesafe enum class name {} comparison +#if __GNUC_MINOR__ < 4 +#define ZMQPP_COMPARABLE_ENUM enum +#endif + +#if __GNUC_MINOR__ == 4 +#if __GNUC_PATCHLEVEL__ < 1 +#undef ZMQPP_COMPARABLE_ENUM +#define ZMQPP_COMPARABLE_ENUM enum +#endif // if __GNUC_PATCHLEVEL__ < 1 +#endif // if __GNUC_MINOR__ == 4 + +// Deal with older gcc not supporting C++0x lambda function +#if __GNUC_MINOR__ < 5 +#define ZMQPP_IGNORE_LAMBDA_FUNCTION_TESTS +#define ZMQPP_EXPLICITLY_DELETED +#endif // if __GNUC_MINOR__ < 5 + +// Deal with older gcc not supporting C++0x nullptr +#if __GNUC_MINOR__ < 6 +#define nullptr NULL +#define NOEXCEPT +#endif // if __GNUC_MINOR__ < 6 + +#endif // if __GNUC_ == 4 +#endif // if defined(__GNUC__) && !defined(__clang__) + +#if defined(_MSC_VER) +#define NOEXCEPT throw() +#if _MSC_VER < 1800 +#define ZMQPP_EXPLICITLY_DELETED +#endif // if _MSC_VER < 1800 +#if _MSC_VER < 1600 +#define nullptr NULL +#define ZMQPP_IGNORE_LAMBDA_FUNCTION_TESTS +#define ZMQPP_COMPARABLE_ENUM enum +#endif // if _MSC_VER < 1600 +#endif // if defined(_MSC_VER) + +// Generic state, assume a modern compiler +#ifndef ZMQPP_COMPARABLE_ENUM +#define ZMQPP_COMPARABLE_ENUM enum class +#endif + +#ifndef ZMQPP_EXPLICITLY_DELETED +#define ZMQPP_EXPLICITLY_DELETED = delete +#endif + +#ifndef NOEXCEPT +#define NOEXCEPT noexcept +#endif + +#endif /* ZMQPP_COMPATIBILITY_HPP_ */ + diff --git a/dep/zmqpp/zmqpp/context.cpp b/dep/zmqpp/zmqpp/context.cpp new file mode 100644 index 00000000000..32c657199dc --- /dev/null +++ b/dep/zmqpp/zmqpp/context.cpp @@ -0,0 +1,54 @@ +/** + * \file + * + * \date 9 Aug 2011 + * \author Ben Gray (\@benjamg) + */ + +#include "context.hpp" + +namespace zmqpp +{ + +void context::terminate() +{ + int result; + do + { +#if (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2)) + result = zmq_term(_context); +#else + result = zmq_ctx_destroy(_context); +#endif + } while (result != 0 && zmq_errno() == EINTR); + if (result != 0) { throw zmq_internal_exception(); } + _context = nullptr; +} + +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) +void context::set(context_option const option, int const value) +{ + if (nullptr == _context) { throw invalid_instance("context is invalid"); } + + if (0 != zmq_ctx_set(_context, static_cast<int>(option), value)) + { + throw zmq_internal_exception(); + } +} + +int context::get(context_option const option) +{ + if (nullptr == _context) { throw invalid_instance("context is invalid"); } + + int result = zmq_ctx_get(_context, static_cast<int>(option)); + + if (result < 0) + { + throw zmq_internal_exception(); + } + + return result; +} +#endif + +} diff --git a/dep/zmqpp/zmqpp/context.hpp b/dep/zmqpp/zmqpp/context.hpp new file mode 100644 index 00000000000..3ffaf791440 --- /dev/null +++ b/dep/zmqpp/zmqpp/context.hpp @@ -0,0 +1,184 @@ +/** + * \file + * + * \date 9 Aug 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_CONTEXT_HPP_ +#define ZMQPP_CONTEXT_HPP_ + +#include <cassert> + +#include <zmq.h> + +#include "compatibility.hpp" +#include "exception.hpp" +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) +#include "context_options.hpp" +#endif + +namespace zmqpp +{ + +/*! + * The context class represents internal zmq context and io threads. + * + * By default the context class will create one thread, however this can be + * overridden in the constructor. + * + * The context class is the only object that can be considered thread safe. + * + * All sockets using endpoints other than inproc require the context to have + * at least one thread. + * + * This class is c++0x move supporting and cannot be copied. + */ +class context +{ +public: + +#if (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2)) + /*! + * Initialise the 0mq context. + * + * If only inproc is used then the context may be created with zero threads. + * Any inproc endpoint using sockets must be created using the same context. + * + * The context is thread safe an may be used anywhere in your application, + * however there is no requirement (other than inproc restrictions) for you + * to do this. + * + * \param threads an integer argument for the number of required threads. Defaults to 1. + */ + context(int const& threads = 1) +#else + /*! + * Initialise the 0mq context. + * + * The context is thread safe an may be used anywhere in your application, + * however there is no requirement (other than inproc restrictions) for you + * to do this. + */ + context() +#endif + : _context(nullptr) + { +#if (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2)) + _context = zmq_init(threads); +#else + _context = zmq_ctx_new(); +#endif + + if (nullptr == _context) + { + throw zmq_internal_exception(); + } + } + + /*! + * Closes the 0mq context. + * + * Any blocking calls other than a socket close will return with an error. + * + * If there are open sockets will block while zmq internal buffers are + * processed up to a limit specified by that sockets linger option. + */ + ~context() NOEXCEPT + { + if (nullptr != _context) + { + terminate(); + } + } + + /*! + * Move supporting constructor. + * + * Allows zero-copy move semantics to be used with this class. + * + * \param source a rvalue instance of the object who's internals we wish to steal. + */ + context(context&& source) NOEXCEPT + : _context(source._context) + { + source._context = nullptr; + } + + /*! + * Move supporting operator. + * + * Allows zero-copy move semantics to be used with this class. + * + * \param source an rvalue instance of the context who's internals we wish to steal. + */ + context& operator=(context&& source) NOEXCEPT + { + std::swap( _context, source._context ); + return *this; + } + + /*! + * Terminate the current context. + * + * Any blocking calls other than a socket close will return with an error. + * + * If there are open sockets will block while zmq internal buffers are + * processed up to a limit specified by that sockets linger option. + */ + void terminate(); + +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) + /*! + * Set the value of an option in the underlaying zmq context. + * + * \param option a valid ::context_option + * \param value to set the option to + */ + void set(context_option const option, int const value); + + /*! + * Get a context option from the underlaying zmq context. + * + * \param option a valid ::context_option + * \return context option value + */ + int get(context_option const option); +#endif + + /*! + * Validity checking of the context + * + * Checks if the underlying 0mq context for this instance is valid. + * + * Contexts should always be valid unless people are doing 'fun' things with + * std::move. + * + * \return boolean true if the object is valid. + */ + operator bool() const NOEXCEPT + { + return nullptr != _context; + } + + /*! + * Access to the raw 0mq context + * + * \return void pointer to the underlying 0mq context. + */ + operator void*() const NOEXCEPT + { + return _context; + } + +private: + void* _context; + + // No copy - private and not implemented + context(context const&) ZMQPP_EXPLICITLY_DELETED; + context& operator=(context const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED; +}; + +} + +#endif /* ZMQPP_CONTEXT_HPP_ */ diff --git a/dep/zmqpp/zmqpp/context_options.hpp b/dep/zmqpp/zmqpp/context_options.hpp new file mode 100644 index 00000000000..b2e2cf4805f --- /dev/null +++ b/dep/zmqpp/zmqpp/context_options.hpp @@ -0,0 +1,26 @@ +/** + * \file + * + * \date 3 Jul 2013 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_CONTEXT_OPTIONS_HPP_ +#define ZMQPP_CONTEXT_OPTIONS_HPP_ + +namespace zmqpp +{ + +/** \todo Expand the information on the options to make it actually useful. */ +/*! + * \brief possible Context options in zmq + */ + +enum class context_option { + io_threads = ZMQ_IO_THREADS, /*!< I/O thread count */ + max_sockets = ZMQ_MAX_SOCKETS, /*!< Maximum supported sockets */ +}; + +} + +#endif /* ZMQPP_CONTEXT_OPTIONS_HPP_ */ diff --git a/dep/zmqpp/zmqpp/exception.hpp b/dep/zmqpp/zmqpp/exception.hpp new file mode 100644 index 00000000000..a0b234769ce --- /dev/null +++ b/dep/zmqpp/zmqpp/exception.hpp @@ -0,0 +1,87 @@ +/** + * \file + * + * \date 9 Aug 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_EXCEPTION_HPP_ +#define ZMQPP_EXCEPTION_HPP_ + +#include <stdexcept> +#include <string> + +#include <zmq.h> + +namespace zmqpp +{ + +/** \todo Have a larger variety of exceptions with better state debug information */ + +/*! + * Represents the base zmqpp exception. + * + * All zmqpp runtime exceptions are children of this class. + * The class itself does not provide any special access fields but it only + * for convince when catching exceptions. + * + * The class extends std::runtime_error. + * + */ +class exception : public std::runtime_error +{ +public: + /*! + * Standard exception constructor. + * + * \param message a string representing the error message. + */ + exception(std::string const& message) + : std::runtime_error(message) + { } +}; + +/*! + * Represents an attempt to use an invalid object. + * + * Objects may be invalid initially or after a shutdown or close. + */ +class invalid_instance : public exception +{ +public: + invalid_instance(std::string const& message) + : exception(message) + { } +}; + +/*! + * Represents internal zmq errors. + * + * Any error response from the zmq bindings will be wrapped in this error. + * + * The class provides access to the zmq error number via zmq_error(). + */ +class zmq_internal_exception : public exception +{ +public: + /*! + * Uses the zmq functions to pull out error messages and numbers. + */ + zmq_internal_exception() + : exception(zmq_strerror(zmq_errno())) + , _error(zmq_errno()) + { } + + /*! + * Retrieve the zmq error number associated with this exception. + * \return zmq error number + */ + int zmq_error() const { return _error; } + +private: + int _error; +}; + +} + +#endif /* ZMQPP_EXCEPTION_HPP_ */ diff --git a/dep/zmqpp/zmqpp/frame.cpp b/dep/zmqpp/zmqpp/frame.cpp new file mode 100644 index 00000000000..4c512ae1010 --- /dev/null +++ b/dep/zmqpp/zmqpp/frame.cpp @@ -0,0 +1,95 @@ +/** + * \file + * + * \date 8 Jan 2014 + * \author Ben Gray (\@benjamg) + */ + +#include <cassert> +#include <cstring> + +#include "exception.hpp" +#include "frame.hpp" + +namespace zmqpp { + +frame::frame() + : _sent( false ) +{ + if( 0 != zmq_msg_init( &_msg ) ) + { + throw zmq_internal_exception(); + } +} + +frame::frame(size_t const size) + : _sent( false ) +{ + if( 0 != zmq_msg_init_size( &_msg, size ) ) + { + throw zmq_internal_exception(); + } +} + +frame::frame(void const* part, size_t const size) + : _sent( false ) +{ + if( 0 != zmq_msg_init_size( &_msg, size ) ) + { + throw zmq_internal_exception(); + } + + void* msg_data = zmq_msg_data( &_msg ); + memcpy( msg_data, part, size ); +} + +frame::frame(void* part, size_t const size, zmq_free_fn *ffn, void *hint) + : _sent( false ) +{ + if( 0 != zmq_msg_init_data( &_msg, part, size, ffn, hint ) ) + { + throw zmq_internal_exception(); + } +} + +frame::~frame() +{ +#ifndef NDEBUG // unused assert variable in release + int result = zmq_msg_close( &_msg ); + assert(0 == result); +#else + zmq_msg_close( &_msg ); +#endif // NDEBUG +} + +frame::frame(frame&& other) + : _sent( other._sent ) +{ + zmq_msg_init( &_msg ); + zmq_msg_move( &_msg, &other._msg ); + other._sent = false; +} + +frame& frame::operator=(frame&& other) +{ + zmq_msg_init( &_msg ); + zmq_msg_move( &_msg, &other._msg ); + std::swap( _sent, other._sent ); + + return *this; +} + +frame frame::copy() const +{ + frame other( size() ); + other._sent = _sent; + + if( 0 != zmq_msg_copy( &other._msg, const_cast<zmq_msg_t*>(&_msg) ) ) + { + throw zmq_internal_exception(); + } + + return other; +} + +} // namespace zmqpp diff --git a/dep/zmqpp/zmqpp/frame.hpp b/dep/zmqpp/zmqpp/frame.hpp new file mode 100644 index 00000000000..c9e4b9b7d82 --- /dev/null +++ b/dep/zmqpp/zmqpp/frame.hpp @@ -0,0 +1,58 @@ +/** + * \file + * + * \date 8 Jan 2014 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_MESSAGE_FRAME_HPP_ +#define ZMQPP_MESSAGE_FRAME_HPP_ + +#include <zmq.h> + +#include "compatibility.hpp" + +namespace zmqpp { + +/*! + * \brief an internal frame wrapper for a single zmq message + * + * This frame wrapper consists of a zmq message and meta data it is used + * by the zmqpp message class to keep track of parts in the internal + * queue. It is unlikely you need to use this class. + */ +class frame +{ +public: + frame(); + frame(size_t const size); + frame(void const* part, size_t const size); + frame(void* part, size_t const size, zmq_free_fn *ffn, void *hint); + + ~frame(); + + bool is_sent() const { return _sent; } + void const* data() const { return zmq_msg_data( const_cast<zmq_msg_t*>(&_msg) ); } + size_t size() const { return zmq_msg_size( const_cast<zmq_msg_t*>(&_msg) ); } + + void mark_sent() { _sent = true; } + zmq_msg_t& msg() { return _msg; } + + // Move operators + frame(frame&& other); + frame& operator=(frame&& other); + + frame copy() const; + +private: + bool _sent; + zmq_msg_t _msg; + + // Disable implicit copy support, code must request a copy to clone + frame(frame const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED; + frame& operator=(frame const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED; +}; + +} // namespace zmqpp + +#endif /* ZMQPP_MESSAGE_FRAME_HPP_ */ diff --git a/dep/zmqpp/zmqpp/inet.hpp b/dep/zmqpp/zmqpp/inet.hpp new file mode 100644 index 00000000000..5245aa4143c --- /dev/null +++ b/dep/zmqpp/zmqpp/inet.hpp @@ -0,0 +1,171 @@ +/** + * \file + * + * \date 10 Aug 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_INET_HPP_ +#define ZMQPP_INET_HPP_ + +/** \todo cross-platform version of including headers. */ +// We get htons and htonl from here +#ifdef _WIN32 +#include <WinSock2.h> +#else +#include <netinet/in.h> +#endif + +#include "compatibility.hpp" + +namespace zmqpp +{ + +/*! + * \brief Possible byte order types. + * + * An enumeration of all the known order types, all two of them. + * There is also an entry for unknown which is just used as a default. + */ +ZMQPP_COMPARABLE_ENUM order { + big_endian, /*!< byte order is big endian */ + little_endian /*!< byte order is little endian */ +}; + +/*! + * Common code for the 64bit versions of htons/htons and ntohs/ntohl + * + * As htons and ntohs (or htonl and ntohs) always just do the same thing, ie + * swap bytes if the host order differs from network order or otherwise don't + * do anything, it seemed silly to type the code twice. + * + * \note This code assumes network order is always big endian. Which it is. + * \note The host endian is only checked once and afterwards assumed to remain + * the same. + * + * \param value_to_check unsigned 64 bit integer to swap + * \return swapped (or not) unsigned 64 bit integer + */ +inline uint64_t swap_if_needed(uint64_t const value_to_check) +{ + static order host_order = (htonl(42) == 42) ? order::big_endian : order::little_endian; + + if (order::big_endian == host_order) + { + return value_to_check; + } + + union { + uint64_t integer; + uint8_t bytes[8]; + } value { value_to_check }; + + std::swap(value.bytes[0], value.bytes[7]); + std::swap(value.bytes[1], value.bytes[6]); + std::swap(value.bytes[2], value.bytes[5]); + std::swap(value.bytes[3], value.bytes[4]); + + return value.integer; +} + +/*! + * 64 bit version of the htons/htonl + * + * I've used the name htonll to try and keep with the htonl naming scheme. + * + * \param hostlonglong unsigned 64 bit host order integer + * \return unsigned 64 bit network order integer + */ +inline uint64_t htonll(uint64_t const hostlonglong) +{ + return zmqpp::swap_if_needed(hostlonglong); +} + +/*! + * 64 bit version of the ntohs/ntohl + * + * I've used the name htonll to try and keep with the htonl naming scheme. + * + * \param networklonglong unsigned 64 bit network order integer + * \return unsigned 64 bit host order integer + */ +inline uint64_t ntohll(uint64_t const networklonglong) +{ + return zmqpp::swap_if_needed(networklonglong); +} + +/*! + * floating point version of the htons/htonl + * + * \param value host order floating point + * \returns network order floating point + */ +inline float htonf(float value) +{ + assert(sizeof(float) == sizeof(uint32_t)); + + uint32_t temp; + memcpy(&temp, &value, sizeof(uint32_t)); + temp = htonl( temp ); + memcpy(&value, &temp, sizeof(uint32_t)); + + return value; +} + +/*! + * floating point version of the ntohs/ntohl + * + * \param value network order float + * \returns host order float + */ +inline float ntohf(float value) +{ + assert(sizeof(float) == sizeof(uint32_t)); + + uint32_t temp; + memcpy(&temp, &value, sizeof(uint32_t)); + temp = ntohl( temp ); + memcpy(&value, &temp, sizeof(uint32_t)); + + return value; +} + +/*! + * double precision floating point version of the htons/htonl + * + * \param value host order double precision floating point + * \returns network order double precision floating point + */ +inline double htond(double value) +{ + assert(sizeof(double) == sizeof(uint64_t)); + + uint64_t temp; + memcpy(&temp, &value, sizeof(uint64_t)); + temp = zmqpp::htonll(temp); + memcpy(&value, &temp, sizeof(uint64_t)); + + return value; +} + +/*! + * double precision floating point version of the ntohs/ntohl + * + * \param value network order double precision floating point + * \returns host order double precision floating point + */ +inline double ntohd(double value) +{ + assert(sizeof(double) == sizeof(uint64_t)); + + uint64_t temp; + memcpy(&temp, &value, sizeof(uint64_t)); + temp = zmqpp::ntohll(temp); + memcpy(&value, &temp, sizeof(uint64_t)); + + return value; +} + +} + +#endif /* INET_HPP_ */ diff --git a/dep/zmqpp/zmqpp/message.cpp b/dep/zmqpp/zmqpp/message.cpp new file mode 100644 index 00000000000..58587307364 --- /dev/null +++ b/dep/zmqpp/zmqpp/message.cpp @@ -0,0 +1,454 @@ +/* + * Created on: 9 Aug 2011 + * Author: Ben Gray (@benjamg) + */ + +#include <cassert> +#include <cstring> + +#include "exception.hpp" +#include "inet.hpp" +#include "message.hpp" + +namespace zmqpp +{ + +/*! + * \brief internal construct + * \internal handles bubbling callback from zmq c style to the c++ functor provided + */ +struct callback_releaser +{ + message::release_function func; +}; + +message::message() + : _parts() + , _read_cursor(0) +{ +} + +message::~message() +{ + _parts.clear(); +} + +size_t message::parts() const +{ + return _parts.size(); +} + +/* + * The two const_casts in size and raw_data are a little bit hacky + * but neither of these methods called this way actually modify data + * so accurately represent the intent of these calls. + */ + +size_t message::size(size_t const part /* = 0 */) const +{ + if(part >= _parts.size()) + { + throw exception("attempting to request a message part outside the valid range"); + } + + return _parts[part].size(); +} + +void const* message::raw_data(size_t const part /* = 0 */) const +{ + if(part >= _parts.size()) + { + throw exception("attempting to request a message part outside the valid range"); + } + + return _parts[part].data(); +} + +zmq_msg_t& message::raw_msg(size_t const part /* = 0 */) +{ + if(part >= _parts.size()) + { + throw exception("attempting to request a message part outside the valid range"); + } + + return _parts[part].msg(); +} + +zmq_msg_t& message::raw_new_msg() +{ + _parts.push_back( frame() ); + + return _parts.back().msg(); +} + +zmq_msg_t& message::raw_new_msg(size_t const reserve_data_size) +{ + _parts.push_back( frame(reserve_data_size) ); + + return _parts.back().msg(); +} + +std::string message::get(size_t const part /* = 0 */) const +{ + return std::string(static_cast<char const*>(raw_data(part)), size(part)); +} + + +// Move operators will take ownership of message parts without copying +void message::move(void* part, size_t const size, release_function const& release) +{ + callback_releaser* hint = new callback_releaser(); + hint->func = release; + + _parts.push_back( frame( part, size, &message::release_callback, hint ) ); +} + +// Stream reader style +void message::reset_read_cursor() +{ + _read_cursor = 0; +} + +void message::get(int8_t& integer, size_t const part) const +{ + assert(sizeof(int8_t) == size(part)); + + int8_t const* byte = static_cast<int8_t const*>(raw_data(part)); + integer = *byte; +} + +void message::get(int16_t& integer, size_t const part) const +{ + assert(sizeof(int16_t) == size(part)); + + uint16_t const* network_order = static_cast<uint16_t const*>(raw_data(part)); + integer = static_cast<int16_t>(ntohs(*network_order)); +} + +void message::get(int32_t& integer, size_t const part) const +{ + assert(sizeof(int32_t) == size(part)); + + uint32_t const* network_order = static_cast<uint32_t const*>(raw_data(part)); + integer = static_cast<int32_t>(htonl(*network_order)); +} + +void message::get(int64_t& integer, size_t const part) const +{ + assert(sizeof(int64_t) == size(part)); + + uint64_t const* network_order = static_cast<uint64_t const*>(raw_data(part)); + integer = static_cast<int64_t>(zmqpp::htonll(*network_order)); +} + +void message::get(uint8_t& unsigned_integer, size_t const part) const +{ + assert(sizeof(uint8_t) == size(part)); + + uint8_t const* byte = static_cast<uint8_t const*>(raw_data(part)); + unsigned_integer = *byte; +} + +void message::get(uint16_t& unsigned_integer, size_t const part) const +{ + assert(sizeof(uint16_t) == size(part)); + + uint16_t const* network_order = static_cast<uint16_t const*>(raw_data(part)); + unsigned_integer = ntohs(*network_order); +} + +void message::get(uint32_t& unsigned_integer, size_t const part) const +{ + assert(sizeof(uint32_t) == size(part)); + + uint32_t const* network_order = static_cast<uint32_t const*>(raw_data(part)); + unsigned_integer = ntohl(*network_order); +} + +void message::get(uint64_t& unsigned_integer, size_t const part) const +{ + assert(sizeof(uint64_t) == size(part)); + + uint64_t const* network_order = static_cast<uint64_t const*>(raw_data(part)); + unsigned_integer = zmqpp::ntohll(*network_order); +} + +void message::get(float& floating_point, size_t const part) const +{ + assert(sizeof(float) == size(part)); + + float const* network_order = static_cast<float const*>(raw_data(part)); + floating_point = zmqpp::ntohf(*network_order); +} + +void message::get(double& double_precision, size_t const part) const +{ + assert(sizeof(double) == size(part)); + + double const* network_order = static_cast<double const*>(raw_data(part)); + double_precision = zmqpp::ntohd(*network_order); +} + +void message::get(bool& boolean, size_t const part) const +{ + assert(sizeof(uint8_t) == size(part)); + + uint8_t const* byte = static_cast<uint8_t const*>(raw_data(part)); + boolean = (*byte != 0); +} + +void message::get(std::string& string, size_t const part) const +{ + string.assign( get(part) ); +} + + +// Stream writer style - these all use copy styles +message& message::operator<<(int8_t const integer) +{ + add(reinterpret_cast<void const*>(&integer), sizeof(int8_t)); + return *this; +} + +message& message::operator<<(int16_t const integer) +{ + uint16_t network_order = htons(static_cast<uint16_t>(integer)); + add(reinterpret_cast<void const*>(&network_order), sizeof(uint16_t)); + + return *this; +} + +message& message::operator<<(int32_t const integer) +{ + uint32_t network_order = htonl(static_cast<uint32_t>(integer)); + add(reinterpret_cast<void const*>(&network_order), sizeof(uint32_t)); + + return *this; +} + +message& message::operator<<(int64_t const integer) +{ + uint64_t network_order = zmqpp::htonll(static_cast<uint64_t>(integer)); + add(reinterpret_cast<void const*>(&network_order), sizeof(uint64_t)); + + return *this; +} + + +message& message::operator<<(uint8_t const unsigned_integer) +{ + add(reinterpret_cast<void const*>(&unsigned_integer), sizeof(uint8_t)); + return *this; +} + +message& message::operator<<(uint16_t const unsigned_integer) +{ + uint16_t network_order = htons(unsigned_integer); + add(reinterpret_cast<void const*>(&network_order), sizeof(uint16_t)); + + return *this; +} + +message& message::operator<<(uint32_t const unsigned_integer) +{ + uint32_t network_order = htonl(unsigned_integer); + add(reinterpret_cast<void const*>(&network_order), sizeof(uint32_t)); + + return *this; +} + +message& message::operator<<(uint64_t const unsigned_integer) +{ + uint64_t network_order = zmqpp::htonll(unsigned_integer); + add(reinterpret_cast<void const*>(&network_order), sizeof(uint64_t)); + + return *this; +} + +message& message::operator<<(float const floating_point) +{ + assert(sizeof(float) == 4); + + float network_order = zmqpp::htonf(floating_point); + add(&network_order, sizeof(float)); + + return *this; +} + +message& message::operator<<(double const double_precision) +{ + assert(sizeof(double) == 8); + + double network_order = zmqpp::htond(double_precision); + add(&network_order, sizeof(double)); + + return *this; +} + +message& message::operator<<(bool const boolean) +{ + uint8_t byte = (boolean) ? 1 : 0; + add(reinterpret_cast<void const*>(&byte), sizeof(uint8_t)); + + return *this; +} + +message& message::operator<<(char const* c_string) +{ + add(reinterpret_cast<void const*>(c_string), strlen(c_string)); + return *this; +} + +message& message::operator<<(std::string const& string) +{ + add(reinterpret_cast<void const*>(string.data()), string.size()); + return *this; +} + +void message::push_front(void const* part, size_t const size) +{ + _parts.emplace( _parts.begin(), part, size ); +} + +void message::push_front(int8_t const integer) +{ + push_front(&integer, sizeof(int8_t)); +} + +void message::push_front(int16_t const integer) +{ + uint16_t network_order = htons(static_cast<uint16_t>(integer)); + push_front(&network_order, sizeof(uint16_t)); +} + +void message::push_front(int32_t const integer) +{ + uint32_t network_order = htonl(static_cast<uint32_t>(integer)); + push_front(&network_order, sizeof(uint32_t)); +} + +void message::push_front(int64_t const integer) +{ + uint64_t network_order = zmqpp::htonll(static_cast<uint64_t>(integer)); + push_front(&network_order, sizeof(uint64_t)); +} + + +void message::push_front(uint8_t const unsigned_integer) +{ + push_front(&unsigned_integer, sizeof(uint8_t)); +} + +void message::push_front(uint16_t const unsigned_integer) +{ + uint16_t network_order = htons(unsigned_integer); + push_front(&network_order, sizeof(uint16_t)); +} + +void message::push_front(uint32_t const unsigned_integer) +{ + uint32_t network_order = htonl(unsigned_integer); + push_front(&network_order, sizeof(uint32_t)); +} + +void message::push_front(uint64_t const unsigned_integer) +{ + uint64_t network_order = zmqpp::htonll(unsigned_integer); + push_front(&network_order, sizeof(uint64_t)); +} + +void message::push_front(float const floating_point) +{ + assert(sizeof(float) == 4); + + float network_order = zmqpp::htonf(floating_point); + push_front(&network_order, sizeof(float)); +} + +void message::push_front(double const double_precision) +{ + assert(sizeof(double) == 8); + + double network_order = zmqpp::htond(double_precision); + push_front(&network_order, sizeof(double)); +} + +void message::push_front(bool const boolean) +{ + uint8_t byte = (boolean) ? 1 : 0; + push_front(&byte, sizeof(uint8_t)); +} + +void message::push_front(char const* c_string) +{ + push_front(c_string, strlen(c_string)); +} + +void message::push_front(std::string const& string) +{ + push_front(string.data(), string.size()); +} + +void message::pop_front() +{ + _parts.erase( _parts.begin() ); +} + +void message::pop_back() +{ + _parts.pop_back(); +} + +message::message(message&& source) NOEXCEPT + : _parts() + , _read_cursor(0) +{ + std::swap(_parts, source._parts); +} + +message& message::operator=(message&& source) NOEXCEPT +{ + std::swap(_parts, source._parts); + return *this; +} + +message message::copy() const +{ + message msg; + msg.copy(*this); + return msg; +} + +void message::copy(message const& source) +{ + _parts.resize( source._parts.size() ); + for(size_t i = 0; i < source._parts.size(); ++i) + { + _parts[i] = source._parts[i].copy(); + } + + // we don't need a copy of the releasers as we did data copies of the internal data, + //_releasers = source._releasers; + //_strings = source._strings +} + +// Used for internal tracking +void message::sent(size_t const part) +{ + // sanity check + assert(!_parts[part].is_sent()); + _parts[part].mark_sent(); +} + +// Note that these releasers are not thread safe, the only safety is provided by +// the socket class taking ownership so no updates can happen while zmq does it's thing +// If used in a custom class this has to be dealt with. +void message::release_callback(void* data, void* hint) +{ + callback_releaser* releaser = static_cast<callback_releaser*>(hint); + releaser->func(data); + + delete releaser; +} + +} diff --git a/dep/zmqpp/zmqpp/message.hpp b/dep/zmqpp/zmqpp/message.hpp new file mode 100644 index 00000000000..c232e032ee0 --- /dev/null +++ b/dep/zmqpp/zmqpp/message.hpp @@ -0,0 +1,253 @@ +/** + * \file + * + * \date 9 Aug 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_MESSAGE_HPP_ +#define ZMQPP_MESSAGE_HPP_ + +#include <functional> +#include <string> +#include <unordered_map> +#include <vector> +#include <utility> + +#include <zmq.h> + +#include "compatibility.hpp" +#include "frame.hpp" + +namespace zmqpp +{ + +/*! + * \brief a zmq message with optional multipart support + * + * A zmq message is made up of one or more parts which are sent together to + * the target endpoints. zmq guarantees either the whole message or none + * of the message will be delivered. + */ +class message +{ +public: + /*! + * \brief callback to release user allocated data. + * + * The release function will be called on any void* moved part. + * It must be thread safe to the extent that the callback may occur on + * one of the context threads. + * + * The function called will be passed a single variable which is the + * pointer to the memory allocated. + */ + typedef std::function<void (void*)> release_function; + + message(); + ~message(); + + template <typename T, typename ...Args> + message(T const &part, Args &&...args) + : message() + { + add(part, std::forward<Args>(args)...); + } + + size_t parts() const; + size_t size(size_t const part) const; + std::string get(size_t const part) const; + + void get(int8_t& integer, size_t const part) const; + void get(int16_t& integer, size_t const part) const; + void get(int32_t& integer, size_t const part) const; + void get(int64_t& integer, size_t const part) const; + + void get(uint8_t& unsigned_integer, size_t const part) const; + void get(uint16_t& unsigned_integer, size_t const part) const; + void get(uint32_t& unsigned_integer, size_t const part) const; + void get(uint64_t& unsigned_integer, size_t const part) const; + + void get(float& floating_point, size_t const part) const; + void get(double& double_precision, size_t const part) const; + void get(bool& boolean, size_t const part) const; + + void get(std::string& string, size_t const part) const; + + // Warn: If a pointer type is requested the message (well zmq) still 'owns' + // the data and will release it when the message object is freed. + template<typename Type> + Type get(size_t const part) + { + Type value; + get(value, part); + return value; + } + + template<int part=0, typename T, typename ...Args> + void extract(T &nextpart, Args &...args) + { + assert(part < parts()); + get(nextpart,part); + extract<part+1>(args...); + } + + template<int part=0, typename T> + void extract(T &nextpart) + { + assert(part < parts()); + get(nextpart,part); + } + + // Raw get data operations, useful with data structures more than anything else + // Warn: The message (well zmq) still 'owns' the data and will release it + // when the message object is freed. + template<typename Type> + void get(Type*& value, size_t const part) const + { + value = static_cast<Type*>(raw_data(part)); + } + + // Warn: The message (well zmq) still 'owns' the data and will release it + // when the message object is freed. + template<typename Type> + void get(Type** value, size_t const part) const + { + *value = static_cast<Type*>(raw_data(part)); + } + + // Move operators will take ownership of message parts without copying + void move(void* part, size_t const size, release_function const& release); + + // Raw move data operation, useful with data structures more than anything else + template<typename Object> + void move(Object *part) + { + move(part, sizeof(Object), &deleter_callback<Object>); + } + + // Copy operators will take copies of any data + template<typename Type> + void add(Type *part, size_t const size) + { + _parts.push_back( frame( part, size ) ); + } + + + template<typename Type, typename ...Args> + void add(Type const& part, Args &&...args) + { + *this << part; + add(std::forward<Args>(args)...); + } + + template<typename Type> + void add(Type const part) + { + *this << part; + } + + // Stream reader style + void reset_read_cursor(); + + template<typename Type> + message& operator>>(Type& value) + { + get(value, _read_cursor++); + return *this; + } + + // Stream writer style - these all use copy styles + message& operator<<(int8_t const integer); + message& operator<<(int16_t const integer); + message& operator<<(int32_t const integer); + message& operator<<(int64_t const integer); + + message& operator<<(uint8_t const unsigned_integer); + message& operator<<(uint16_t const unsigned_integer); + message& operator<<(uint32_t const unsigned_integer); + message& operator<<(uint64_t const unsigned_integer); + + message& operator<<(float const floating_point); + message& operator<<(double const double_precision); + message& operator<<(bool const boolean); + + message& operator<<(char const* c_string); + message& operator<<(std::string const& string); + + // Queue manipulation + void push_front(void const* part, size_t const size); + + // TODO: unify conversion of types with the stream operators + void push_front(int8_t const integer); + void push_front(int16_t const integer); + void push_front(int32_t const integer); + void push_front(int64_t const integer); + + void push_front(uint8_t const unsigned_integer); + void push_front(uint16_t const unsigned_integer); + void push_front(uint32_t const unsigned_integer); + void push_front(uint64_t const unsigned_integer); + + void push_front(float const floating_point); + void push_front(double const double_precision); + void push_front(bool const boolean); + + void push_front(char const* c_string); + void push_front(std::string const& string); + + void pop_front(); + + void push_back(void const* part, size_t const size) + { + add( part, size ); + } + + template<typename Type> + void push_back(Type const part) + { + *this << part; + } + + void pop_back(); + + void remove(size_t const part); + + // Move supporting + message(message&& source) NOEXCEPT; + message& operator=(message&& source) NOEXCEPT; + + // Copy support + message copy() const; + void copy(message const& source); + + // Used for internal tracking + void sent(size_t const part); + + // Access to raw zmq details + void const* raw_data(size_t const part = 0) const; + zmq_msg_t& raw_msg(size_t const part = 0); + zmq_msg_t& raw_new_msg(); + zmq_msg_t& raw_new_msg(size_t const reserve_data_size); + +private: + typedef std::vector<frame> parts_type; + parts_type _parts; + size_t _read_cursor; + + // Disable implicit copy support, code must request a copy to clone + message(message const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED; + message& operator=(message const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED; + + static void release_callback(void* data, void* hint); + + template<typename Object> + static void deleter_callback(void* data) + { + delete static_cast<Object*>(data); + } +}; + +} + +#endif /* ZMQPP_MESSAGE_HPP_ */ diff --git a/dep/zmqpp/zmqpp/poller.cpp b/dep/zmqpp/zmqpp/poller.cpp new file mode 100644 index 00000000000..a6340c9bd61 --- /dev/null +++ b/dep/zmqpp/zmqpp/poller.cpp @@ -0,0 +1,182 @@ +/* + * Created on: 16 Aug 2011 + * Author: Ben Gray (@benjamg) + */ + +#include "exception.hpp" +#include "socket.hpp" +#include "poller.hpp" + +#include <zmq.h> + +namespace zmqpp +{ + +const long poller::wait_forever = -1; +const short poller::poll_none = 0; +const short poller::poll_in = ZMQ_POLLIN; +const short poller::poll_out = ZMQ_POLLOUT; +const short poller::poll_error = ZMQ_POLLERR; + +poller::poller() + : _items() + , _index() + , _fdindex() +{ + +} + +poller::~poller() +{ + _items.clear(); + _index.clear(); + _fdindex.clear(); +} + +void poller::add(socket& socket, short const event /* = POLL_IN */) +{ + zmq_pollitem_t item { socket, 0, event, 0 }; + + size_t index = _items.size(); + _items.push_back(item); + _index[socket] = index; +} + +void poller::add(int const descriptor, short const event /* = POLL_IN */) +{ + zmq_pollitem_t item { nullptr, descriptor, event, 0 }; + + size_t index = _items.size(); + _items.push_back(item); + _fdindex[descriptor] = index; +} + +bool poller::has(socket_t const& socket) +{ + return _index.find(socket) != _index.end(); +} + +bool poller::has(int const descriptor) +{ + return _fdindex.find(descriptor) != _fdindex.end(); +} + +void poller::reindex(size_t const index) +{ + if ( nullptr != _items[index].socket ) + { + auto found = _index.find( _items[index].socket ); + if (_index.end() == found) { throw exception("unable to reindex socket in poller"); } + found->second = index; + } + else + { + auto found = _fdindex.find( _items[index].fd ); + if (_fdindex.end() == found) { throw exception("unable to reindex file descriptor in poller"); } + found->second = index; + } +} + +void poller::remove(socket_t const& socket) +{ + auto found = _index.find(socket); + if (_index.end() == found) { return; } + + if ( _items.size() - 1 == found->second ) + { + _items.pop_back(); + _index.erase(found); + return; + } + + std::swap(_items[found->second], _items.back()); + _items.pop_back(); + + auto index = found->second; + _index.erase(found); + + reindex( index ); +} + +void poller::remove(int const descriptor) +{ + auto found = _fdindex.find(descriptor); + if (_fdindex.end() == found) { return; } + + if ( _items.size() - 1 == found->second ) + { + _items.pop_back(); + _fdindex.erase(found); + return; + } + + std::swap(_items[found->second], _items.back()); + _items.pop_back(); + + auto index = found->second; + _fdindex.erase(found); + + reindex( index ); +} + +void poller::check_for(socket const& socket, short const event) +{ + auto found = _index.find(socket); + if (_index.end() == found) + { + throw exception("this socket is not represented within this poller"); + } + + _items[found->second].events = event; +} + +void poller::check_for(int const descriptor, short const event) +{ + auto found = _fdindex.find(descriptor); + if (_fdindex.end() == found) + { + throw exception("this socket is not represented within this poller"); + } + + _items[found->second].events = event; +} + +bool poller::poll(long timeout /* = WAIT_FOREVER */) +{ + int result = zmq_poll(_items.data(), _items.size(), timeout); + if (result < 0) + { + if(EINTR == zmq_errno()) + { + return false; + } + + throw zmq_internal_exception(); + } + + return (result > 0); +} + +short poller::events(socket const& socket) const +{ + auto found = _index.find(socket); + if (_index.end() == found) + { + throw exception("this socket is not represented within this poller"); + } + + return _items[found->second].revents; +} + +short poller::events(int const descriptor) const +{ + auto found = _fdindex.find(descriptor); + if (_fdindex.end() == found) + { + throw exception("this file descriptor is not represented within this poller"); + } + + return _items[found->second].revents; +} + +} diff --git a/dep/zmqpp/zmqpp/poller.hpp b/dep/zmqpp/zmqpp/poller.hpp new file mode 100644 index 00000000000..a19063a091d --- /dev/null +++ b/dep/zmqpp/zmqpp/poller.hpp @@ -0,0 +1,186 @@ +/** + * \file + * + * \date 9 Aug 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_POLLER_HPP_ +#define ZMQPP_POLLER_HPP_ + +#include <unordered_map> +#include <vector> + +#include "compatibility.hpp" + +namespace zmqpp +{ + +class socket; +typedef socket socket_t; + +/*! + * Polling wrapper. + * + * Allows access to polling for any number of sockets or file descriptors. + */ +class poller +{ +public: + static const long wait_forever; /*!< Block forever flag, default setting. */ + + static const short poll_none; /*!< No polling flags set. */ + static const short poll_in; /*!< Monitor inbound flag. */ + static const short poll_out; /*!< Monitor output flag. */ + static const short poll_error; /*!< Monitor error flag.\n Only for file descriptors. */ + + /*! + * Construct an empty polling model. + */ + poller(); + + /*! + * Cleanup poller. + * + * Any sockets will need to be closed separately. + */ + ~poller(); + + /*! + * Add a socket to the polling model and set which events to monitor. + * + * \param socket the socket to monitor. + * \param event the event flags to monitor on the socket. + */ + void add(socket_t& socket, short const event = poll_in); + + /*! + * Add a file descriptor to the polling model and set which events to monitor. + * + * \param descriptor the file descriptor to monitor. + * \param event the event flags to monitor. + */ + void add(int const descriptor, short const event = poll_in | poll_error); + + /*! + * Check if we are monitoring a given socket with this poller. + * + * \param socket the socket to check. + * \return true if it is there. + */ + bool has(socket_t const& socket); + + /*! + * Check if we are monitoring a given file descriptor with this poller. + * + * \param descriptor the file descriptor to check. + * \return true if it is there. + */ + bool has(int const descriptor); + + /*! + * Stop monitoring a socket. + * + * \param socket the socket to stop monitoring. + */ + void remove(socket_t const& socket); + + /*! + * Stop monitoring a file descriptor. + * + * \param descriptor the file descriptor to stop monitoring. + */ + void remove(int const descriptor); + + /*! + * Update the monitored event flags for a given socket. + * + * \param socket the socket to update event flags. + * \param event the event flags to monitor on the socket. + */ + void check_for(socket_t const& socket, short const event); + + /*! + * Update the monitored event flags for a given file descriptor. + * + * \param descriptor the file descriptor to update event flags. + * \param event the event flags to monitor on the socket. + */ + void check_for(int const descriptor, short const event); + + /*! + * Poll for monitored events. + * + * By default this method will block forever or until at least one of the monitored + * sockets or file descriptors has events. + * + * If a timeout is set and was reached then this function returns false. + * + * \param timeout milliseconds to timeout. + * \return true if there is an event.. + */ + bool poll(long timeout = wait_forever); + + /*! + * Get the event flags triggered for a socket. + * + * \param socket the socket to get triggered event flags for. + * \return the event flags. + */ + short events(socket_t const& socket) const; + + /*! + * Get the event flags triggered for a file descriptor. + * + * \param descriptor the file descriptor to get triggered event flags for. + * \return the event flags. + */ + short events(int const descriptor) const; + + /*! + * Check either a file descriptor or socket for input events. + * + * Templated helper method that calls through to event and checks for a given flag + * + * \param watchable either a file descriptor or socket known to the poller. + * \return true if there is input. + */ + template<typename Watched> + bool has_input(Watched const& watchable) const { return events(watchable) & poll_in; } + + /*! + * Check either a file descriptor or socket for output events. + * + * Templated helper method that calls through to event and checks for a given flag + * + * \param watchable either a file descriptor or socket known to the poller. + * \return true if there is output. + */ + template<typename Watched> + bool has_output(Watched const& watchable) const { return events(watchable) & poll_out; } + + /*! + * Check a file descriptor. + * + * Templated helper method that calls through to event and checks for a given flag + * + * Technically this template works for sockets as well but the error flag is never set for + * sockets so I have no idea why someone would call it. + * + * \param watchable a file descriptor know to the poller. + * \return true if there is an error. + */ + template<typename Watched> + bool has_error(Watched const& watchable) const { return events(watchable) & poll_error; } + +private: + std::vector<zmq_pollitem_t> _items; + std::unordered_map<void *, size_t> _index; + std::unordered_map<int, size_t> _fdindex; + + void reindex(size_t const index); +}; + +} + +#endif /* ZMQPP_POLLER_HPP_ */ diff --git a/dep/zmqpp/zmqpp/socket.cpp b/dep/zmqpp/zmqpp/socket.cpp new file mode 100644 index 00000000000..0c6a795966c --- /dev/null +++ b/dep/zmqpp/zmqpp/socket.cpp @@ -0,0 +1,758 @@ +/* + * Created on: 9 Aug 2011 + * Author: Ben Gray (@benjamg) + */ + +#include <array> +#include <cassert> +#include <cstring> +#include <functional> + +#include "context.hpp" +#include "exception.hpp" +#include "message.hpp" +#include "socket.hpp" + +namespace zmqpp +{ + +const int socket::normal = 0; +#if (ZMQ_VERSION_MAJOR == 2) +const int socket::dont_wait = ZMQ_NOBLOCK; +#else +const int socket::dont_wait = ZMQ_DONTWAIT; +#endif +const int socket::send_more = ZMQ_SNDMORE; +#ifdef ZMQ_EXPERIMENTAL_LABELS +const int socket::send_label = ZMQ_SNDLABEL; +#endif + +const int max_socket_option_buffer_size = 256; +const int max_stream_buffer_size = 4096; + +socket::socket(const context& context, socket_type const type) + : _socket(nullptr) + , _type(type) + , _recv_buffer() +{ + _socket = zmq_socket(context, static_cast<int>(type)); + if(nullptr == _socket) + { + throw zmq_internal_exception(); + } + + zmq_msg_init(&_recv_buffer); +} + +socket::~socket() +{ + zmq_msg_close(&_recv_buffer); + + if (nullptr != _socket) + { + +#ifndef NDEBUG // unused assert variable in release + int result = zmq_close(_socket); + assert(0 == result); +#else + zmq_close(_socket); +#endif // NDEBUG + + _socket = nullptr; + } +} + +void socket::bind(endpoint_t const& endpoint) +{ + int result = zmq_bind(_socket, endpoint.c_str()); + + if (0 != result) + { + throw zmq_internal_exception(); + } +} + +void socket::unbind(endpoint_t const& endpoint) +{ + int result = zmq_unbind(_socket, endpoint.c_str()); + + if (0 != result) + { + throw zmq_internal_exception(); + } +} + +void socket::connect(endpoint_t const& endpoint) +{ + int result = zmq_connect(_socket, endpoint.c_str()); + + if (0 != result) + { + throw zmq_internal_exception(); + } +} + +void socket::disconnect(endpoint_t const& endpoint) +{ + int result = zmq_disconnect(_socket, endpoint.c_str()); + + if (0 != result) + { + throw zmq_internal_exception(); + } +} + +void socket::close() +{ + int result = zmq_close(_socket); + + if (0 != result) + { + throw zmq_internal_exception(); + } + + _socket = nullptr; +} + +bool socket::send(message& message, bool const dont_block /* = false */) +{ + size_t parts = message.parts(); + if (parts == 0) + { + throw std::invalid_argument("sending requires messages have at least one part"); + } + + bool dont_wait = dont_block; + for(size_t i = 0; i < parts; ++i) + { + int flag = socket::normal; + if(dont_wait) { flag |= socket::dont_wait; } + if(i < (parts - 1)) { flag |= socket::send_more; } + +#if (ZMQ_VERSION_MAJOR == 2) + int result = zmq_send( _socket, &message.raw_msg(i), flag ); +#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2)) + int result = zmq_sendmsg( _socket, &message.raw_msg(i), flag ); +#else + int result = zmq_msg_send( &message.raw_msg(i), _socket, flag ); +#endif + + if (result < 0) + { + // the zmq framework should not block if the first part is accepted + // so we should only ever get this error on the first part + if((0 == i) && (EAGAIN == zmq_errno())) + { + return false; + } + + if(EINTR == zmq_errno()) + { + if (0 == message.parts()) + { + return false; + } + + // If we have an interrupt but it's not on the first part then we + // know we can safely send out the rest of the message as we can + // enforce that it won't wait on a blocking action + dont_wait = true; + continue; + } + + // sanity checking + assert(EAGAIN != zmq_errno()); + + throw zmq_internal_exception(); + } + + message.sent(i); + } + + // Leave message reference in a stable state + zmqpp::message local; + std::swap(local, message); + return true; +} + +bool socket::receive(message& message, bool const dont_block /* = false */) +{ + if (message.parts() > 0) + { + // swap and discard old message + zmqpp::message local; + std::swap(local, message); + } + + int flags = (dont_block) ? socket::dont_wait : socket::normal; + bool more = true; + + while(more) + { +#if (ZMQ_VERSION_MAJOR == 2) + int result = zmq_recv( _socket, &_recv_buffer, flags ); +#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2)) + int result = zmq_recvmsg( _socket, &_recv_buffer, flags ); +#else + int result = zmq_msg_recv( &_recv_buffer, _socket, flags ); +#endif + + if(result < 0) + { + if ((0 == message.parts()) && (EAGAIN == zmq_errno())) + { + return false; + } + + if(EINTR == zmq_errno()) + { + if (0 == message.parts()) + { + return false; + } + + // If we have an interrupt but it's not on the first part then we + // know we can safely pull out the rest of the message as it will + // not be blocking + continue; + } + + assert(EAGAIN != zmq_errno()); + + throw zmq_internal_exception(); + } + + zmq_msg_t& dest = message.raw_new_msg(); + zmq_msg_move(&dest, &_recv_buffer); + + get(socket_option::receive_more, more); + } + + return true; +} + + +bool socket::send(std::string const& string, int const flags /* = NORMAL */) +{ + return send_raw(string.data(), string.size(), flags); +} + +bool socket::receive(std::string& string, int const flags /* = NORMAL */) +{ +#if (ZMQ_VERSION_MAJOR == 2) + int result = zmq_recv( _socket, &_recv_buffer, flags ); +#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2)) + int result = zmq_recvmsg( _socket, &_recv_buffer, flags ); +#else + int result = zmq_msg_recv( &_recv_buffer, _socket, flags ); +#endif + + if(result >= 0) + { + string.reserve(zmq_msg_size(&_recv_buffer)); + string.assign(static_cast<char*>(zmq_msg_data(&_recv_buffer)), zmq_msg_size(&_recv_buffer)); + + return true; + } + + if (EAGAIN == zmq_errno() || EINTR == zmq_errno()) + { + return false; + } + + throw zmq_internal_exception(); +} + + +bool socket::send_raw(char const* buffer, int const length, int const flags /* = NORMAL */) +{ +#if (ZMQ_VERSION_MAJOR == 2) + zmq_msg_t msg; + int result = zmq_msg_init_size(&msg, length); + if (result != 0) + { + zmq_internal_exception(); + } + + memcpy(zmq_msg_data(&msg), buffer, length); + result = zmq_send(_socket, &msg, flags); +#else + int result = zmq_send(_socket, buffer, length, flags); +#endif + if(result >= 0) + { + return true; + } + +#if (ZMQ_VERSION_MAJOR == 2) + // only actually need to close this on error + zmq_msg_close(&msg); +#endif + + if (EAGAIN == zmq_errno() || EINTR == zmq_errno()) + { + return false; + } + + throw zmq_internal_exception(); +} + +bool socket::receive_raw(char* buffer, int& length, int const flags /* = NORMAL */) +{ +#if (ZMQ_VERSION_MAJOR == 2) + int result = zmq_recv( _socket, &_recv_buffer, flags ); +#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2)) + int result = zmq_recvmsg( _socket, &_recv_buffer, flags ); +#else + int result = zmq_msg_recv( &_recv_buffer, _socket, flags ); +#endif + + if(result >= 0) + { + length = zmq_msg_size(&_recv_buffer); + memcpy(buffer, zmq_msg_data(&_recv_buffer), length); + + return true; + } + + if (EAGAIN == zmq_errno() || EINTR == zmq_errno()) + { + return false; + } + + throw zmq_internal_exception(); +} + + +// Helper +void socket::subscribe(std::string const& topic) +{ + set(socket_option::subscribe, topic); +} + +void socket::unsubscribe(std::string const& topic) +{ + set(socket_option::unsubscribe, topic); +} + +bool socket::has_more_parts() const +{ + return get<bool>(socket_option::receive_more); +} + + +// Set socket options for different types of option +void socket::set(socket_option const option, int const value) +{ + switch(option) + { + // unsigned 64bit Integers +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::high_water_mark: + case socket_option::send_buffer_size: + case socket_option::receive_buffer_size: +#endif + case socket_option::affinity: + if (value < 0) { throw exception("attempting to set an unsigned 64 bit integer option with a negative integer"); } + set(option, static_cast<uint64_t>(value)); + break; + + // 64bit Integers +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::rate: + case socket_option::recovery_interval: + case socket_option::recovery_interval_seconds: + case socket_option::swap_size: +#else + case socket_option::max_messsage_size: +#endif + set(option, static_cast<int64_t>(value)); + break; + + // Boolean +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1)) + case socket_option::ipv4_only: +#endif +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::multicast_loopback: +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) +#if (ZMQ_VERSION_MINOR == 2) + case socket_option::delay_attach_on_connect: +#else + case socket_option::immediate: +#endif + case socket_option::router_mandatory: + case socket_option::xpub_verbose: +#endif + if (value == 0) { set(option, false); } + else if (value == 1) { set(option, true); } + else { throw exception("attempting to set a boolean option with a non 0 or 1 integer"); } + break; + + // Default or Boolean +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) + case socket_option::tcp_keepalive: + if (value < -1 || value > 1) { throw exception("attempting to set a default or boolean option with a non -1, 0 or 1 integer"); } + if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value))) + { + throw zmq_internal_exception(); + } + break; +#endif + + // Integers that require +ve numbers +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::reconnect_interval_max: +#else + case socket_option::reconnect_interval_max: + case socket_option::send_buffer_size: + case socket_option::recovery_interval: + case socket_option::receive_buffer_size: + case socket_option::send_high_water_mark: + case socket_option::receive_high_water_mark: + case socket_option::multicast_hops: + case socket_option::rate: +#endif + case socket_option::backlog: + if (value < 0) { throw exception("attempting to set a positive only integer option with a negative integer"); } + // Integers + case socket_option::reconnect_interval: + case socket_option::linger: + case socket_option::receive_timeout: + case socket_option::send_timeout: +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) + case socket_option::tcp_keepalive_idle: + case socket_option::tcp_keepalive_count: + case socket_option::tcp_keepalive_interval: +#endif + if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value))) + { + throw zmq_internal_exception(); + } + break; + default: + throw exception("attempting to set a non signed integer option with a signed integer value"); + } +} + +void socket::set(socket_option const option, bool const value) +{ + switch(option) + { +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::multicast_loopback: +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1)) + case socket_option::ipv4_only: +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) +#if (ZMQ_VERSION_MINOR == 2) + case socket_option::delay_attach_on_connect: +#else + case socket_option::immediate: +#endif + case socket_option::router_mandatory: + case socket_option::xpub_verbose: +#endif + { + int ivalue = value ? 1 : 0; + if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &ivalue, sizeof(ivalue))) + { + throw zmq_internal_exception(); + } + break; + } + default: + throw exception("attempting to set a non boolean option with a boolean value"); + } +} + +void socket::set(socket_option const option, uint64_t const value) +{ + switch(option) + { +#if (ZMQ_VERSION_MAJOR == 2) + // unsigned 64bit Integers + case socket_option::high_water_mark: + case socket_option::send_buffer_size: + case socket_option::receive_buffer_size: +#endif + case socket_option::affinity: + if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value))) + { + throw zmq_internal_exception(); + } + break; + default: + throw exception("attempting to set a non unsigned 64 bit integer option with a unsigned 64 bit integer value"); + } +} + +void socket::set(socket_option const option, int64_t const value) +{ + switch(option) + { +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::rate: + case socket_option::recovery_interval: + case socket_option::recovery_interval_seconds: + case socket_option::swap_size: +#else + case socket_option::max_messsage_size: +#endif + // zmq only allowed +ve int64_t options + if (value < 0) { throw exception("attempting to set a positive only 64 bit integer option with a negative 64bit integer"); } + if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value))) + { + throw zmq_internal_exception(); + } + break; + default: + throw exception("attempting to set a non 64 bit integer option with a 64 bit integer value"); + } +} + +void socket::set(socket_option const option, char const* value, size_t const length) +{ + switch(option) + { + case socket_option::identity: + case socket_option::subscribe: + case socket_option::unsubscribe: +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) + case socket_option::tcp_accept_filter: +#endif + if (0 != zmq_setsockopt(_socket, static_cast<int>(option), value, length)) + { + throw zmq_internal_exception(); + } + break; + default: + throw exception("attempting to set a non string option with a string value"); + } +} + +// Get socket options, multiple versions for easy of use +void socket::get(socket_option const option, int& value) const +{ + size_t value_size = sizeof(int); + + switch(option) + { +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::receive_more: + case socket_option::multicast_loopback: + value = static_cast<int>(get<int64_t>(option)); + break; +#endif + case socket_option::type: + case socket_option::linger: + case socket_option::backlog: + case socket_option::reconnect_interval: + case socket_option::reconnect_interval_max: + case socket_option::receive_timeout: + case socket_option::send_timeout: + case socket_option::file_descriptor: + case socket_option::events: +#if (ZMQ_VERSION_MAJOR > 2) + case socket_option::receive_more: + case socket_option::send_buffer_size: + case socket_option::receive_buffer_size: + case socket_option::rate: + case socket_option::recovery_interval: + case socket_option::send_high_water_mark: + case socket_option::receive_high_water_mark: + case socket_option::multicast_hops: +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1)) + case socket_option::ipv4_only: +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) +#if (ZMQ_VERSION_MINOR == 2) + case socket_option::delay_attach_on_connect: +#else + case socket_option::immediate: +#endif + case socket_option::tcp_keepalive: + case socket_option::tcp_keepalive_idle: + case socket_option::tcp_keepalive_count: + case socket_option::tcp_keepalive_interval: +#endif +#ifdef ZMQ_EXPERIMENTAL_LABELS + case socket_option::receive_label: +#endif + if (0 != zmq_getsockopt(_socket, static_cast<int>(option), &value, &value_size)) + { + throw zmq_internal_exception(); + } + + // sanity check + assert(value_size <= sizeof(int)); + break; + default: + throw exception("attempting to get a non integer option with an integer value"); + } +} + +void socket::get(socket_option const option, bool& value) const +{ +#if (ZMQ_VERSION_MAJOR == 2) + int64_t int_value = 0; + size_t value_size = sizeof(int64_t); +#else + int int_value = 0; + size_t value_size = sizeof(int); +#endif + + switch(option) + { + case socket_option::receive_more: +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::multicast_loopback: +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1)) + case socket_option::ipv4_only: +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) +#if (ZMQ_VERSION_MINOR == 2) + case socket_option::delay_attach_on_connect: +#else + case socket_option::immediate: +#endif +#endif +#ifdef ZMQ_EXPERIMENTAL_LABELS + case socket_option::receive_label: +#endif + if (0 != zmq_getsockopt(_socket, static_cast<int>(option), &int_value, &value_size)) + { + throw zmq_internal_exception(); + } + + value = (int_value == 1) ? true : false; + break; + default: + throw exception("attempting to get a non boolean option with a boolean value"); + } +} + +void socket::get(socket_option const option, uint64_t& value) const +{ + size_t value_size = sizeof(uint64_t); + + switch(option) + { +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::high_water_mark: + case socket_option::send_buffer_size: + case socket_option::receive_buffer_size: +#endif + case socket_option::affinity: + if(0 != zmq_getsockopt(_socket, static_cast<int>(option), &value, &value_size)) + { + throw zmq_internal_exception(); + } + break; + default: + throw exception("attempting to get a non unsigned 64 bit integer option with an unsigned 64 bit integer value"); + } +} + +void socket::get(socket_option const option, int64_t& value) const +{ + size_t value_size = sizeof(int64_t); + + switch(option) + { +#if (ZMQ_VERSION_MAJOR == 2) + case socket_option::rate: + case socket_option::recovery_interval: + case socket_option::recovery_interval_seconds: + case socket_option::swap_size: + case socket_option::receive_more: + case socket_option::multicast_loopback: +#else + case socket_option::max_messsage_size: +#endif + if(0 != zmq_getsockopt(_socket, static_cast<int>(option), &value, &value_size)) + { + throw zmq_internal_exception(); + } + break; + default: + throw exception("attempting to get a non 64 bit integer option with an 64 bit integer value"); + } +} + +void socket::get(socket_option const option, std::string& value) const +{ + static std::array<char, max_socket_option_buffer_size> buffer; + size_t size = max_socket_option_buffer_size; + + switch(option) + { + case socket_option::identity: +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) + case socket_option::last_endpoint: +#endif + if(0 != zmq_getsockopt(_socket, static_cast<int>(option), buffer.data(), &size)) + { + throw zmq_internal_exception(); + } + + value.assign(buffer.data(), size); + break; + default: + throw exception("attempting to get a non string option with a string value"); + } +} + +socket::socket(socket&& source) NOEXCEPT + : _socket(source._socket) + , _type(source._type) + , _recv_buffer() +{ + // we steal the zmq_msg_t from the valid socket, we only init our own because it's cheap + // and zmq_msg_move does a valid check + zmq_msg_init(&_recv_buffer); + zmq_msg_move(&_recv_buffer, &source._recv_buffer); + + // Clean up source a little, we will handle the deinit, it doesn't need to + source._socket = nullptr; +} + +socket& socket::operator=(socket&& source) NOEXCEPT +{ + std::swap(_socket, source._socket); + + _type = source._type; // just clone? + + // we steal the zmq_msg_t from the valid socket, we only init our own because it's cheap + // and zmq_msg_move does a valid check + zmq_msg_init(&_recv_buffer); + zmq_msg_move(&_recv_buffer, &source._recv_buffer); + + return *this; +} + + +socket::operator bool() const +{ + return nullptr != _socket; +} + + +socket::operator void*() const +{ + return _socket; +} + +void socket::track_message(message const& /* message */, uint32_t const parts, bool& should_delete) +{ + if (parts == 0) + { + should_delete = true; + } +} + +} diff --git a/dep/zmqpp/zmqpp/socket.hpp b/dep/zmqpp/zmqpp/socket.hpp new file mode 100644 index 00000000000..279bf801f77 --- /dev/null +++ b/dep/zmqpp/zmqpp/socket.hpp @@ -0,0 +1,500 @@ +/** + * \file + * + * \date 9 Aug 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_SOCKET_HPP_ +#define ZMQPP_SOCKET_HPP_ + +#include <cstring> +#include <string> +#include <list> + +#include <zmq.h> + +#include "compatibility.hpp" + +#include "socket_types.hpp" +#include "socket_options.hpp" + +namespace zmqpp +{ + +class context; +class message; + +typedef std::string endpoint_t; +typedef context context_t; +typedef message message_t; + +/*! + * The socket class represents the zmq sockets. + * + * A socket can be bound and/or connected to as many endpoints as required + * with the sole exception of a ::pair socket. + * + * The routing is handled by zmq based on the type set. + * + * The bound side of an inproc connection must occur first and inproc can only + * connect to other inproc sockets of the same context. + * + * This class is c++0x move supporting and cannot be copied. + */ +class socket +{ +public: + static const int normal; /*!< /brief default send type, no flags set */ +#if (ZMQ_VERSION_MAJOR == 2) + static const int dont_wait; /*!< /brief don't block if sending is not currently possible */ +#else + static const int dont_wait; /*!< /brief don't block if sending is not currently possible */ +#endif + static const int send_more; /*!< /brief more parts will follow this one */ +#ifdef ZMQ_EXPERIMENTAL_LABELS + static const int send_label; /*!< /brief this message part is an internal zmq label */ +#endif + + /*! + * Create a socket for a given type. + * + * \param context the zmq context under which the socket will live + * \param type a valid ::socket_type for the socket + */ + socket(context_t const& context, socket_type const type); + + /*! + * This will close any socket still open before returning + */ + ~socket(); + + /*! + * Get the type of the socket, this works on zmqpp types and not the zmq internal types. + * Use the socket::get method if you wish to intergoate the zmq internal ones. + * + * \return the type of the socket + */ + socket_type type() const { return _type; } + + /*! + * Asynchronously binds to an endpoint. + * + * \param endpoint the zmq endpoint to bind to + */ + void bind(endpoint_t const& endpoint); + + /*! + * Unbinds from a previously bound endpoint. + * + * \param endpoint the zmq endpoint to bind to + */ + void unbind(endpoint_t const& endpoint); + + /*! + * Asynchronously connects to an endpoint. + * If the endpoint is not inproc then zmq will happily keep trying + * to connect until there is something there. + * + * Inproc sockets must have a valid target already bound before connection + * will work. + * + * \param endpoint the zmq endpoint to connect to + */ + void connect(endpoint_t const& endpoint); + + /*! + * Asynchronously connects to multiple endpoints. + * If the endpoint is not inproc then zmq will happily keep trying + * to connect until there is something there. + * + * Inproc sockets must have a valid target already bound before connection + * will work. + * + * This is a helper function that wraps the single item connect in a loop + * + * \param connections_begin the starting iterator for zmq endpoints. + * \param connections_end the final iterator for zmq endpoints. + */ + template<typename InputIterator> + void connect(InputIterator const& connections_begin, InputIterator const& connections_end) + { + for(InputIterator it = connections_begin; it != connections_end; ++it) + { + connect(*it); + } + } + + + /*! + * Disconnects a previously connected endpoint. + * + * \param endpoint the zmq endpoint to disconnect from + */ + void disconnect(endpoint_t const& endpoint); + + /*! + * Disconnects from multiple previously connected endpoints. + * + * This is a helper function that wraps the single item disconnect in a loop + * + * \param disconnections_begin the starting iterator for zmq endpoints. + * \param disconnections_end the final iterator for zmq endpoints. + */ + template<typename InputIterator> + void disconnect(InputIterator const& disconnections_begin, InputIterator const& disconnections_end) + { + for(InputIterator it = disconnections_begin; it != disconnections_end; ++it) + { + disconnect(*it); + } + } + + /*! + * Closes the internal zmq socket and marks this instance + * as invalid. + */ + void close(); + + /*! + * Sends the message over the connection, this may be a multipart message. + * + * If dont_block is true and we are unable to add a new message then this + * function will return false. + * + * \param message message to send + * \param dont_block boolean to dictate if we wait while sending. + * \return true if message sent, false if it would have blocked + */ + bool send(message_t& message, bool const dont_block = false); + + /*! + * Gets a message from the connection, this may be a multipart message. + * + * If dont_block is true and we are unable to get a message then this + * function will return false. + * + * \param message reference to fill with received data + * \param dont_block boolean to dictate if we wait for data. + * \return true if message sent, false if it would have blocked + */ + bool receive(message_t& message, bool const dont_block = false); + + /*! + * Sends the byte data held by the string as the next message part. + * + * If the socket::DONT_WAIT flag and we are unable to add a new message to + * socket then this function will return false. + * + * \param string message part to send + * \param flags message send flags + * \return true if message part sent, false if it would have blocked + */ + bool send(std::string const& string, int const flags = normal); + + /*! + * If there is a message ready then get the next part as a string + * + * If the socket::DONT_WAIT flag and there is no message ready to receive + * then this function will return false. + * + * \param string message part to receive into + * \param flags message receive flags + * \return true if message part received, false if it would have blocked + */ + bool receive(std::string& string, int const flags = normal); + + /*! + * Sends the byte data pointed to by buffer as the next part of the message. + * + * If the socket::DONT_WAIT flag and we are unable to add a new message to + * socket then this function will return false. + * + * \param buffer byte buffer pointer to start writing from + * \param length max length of the buffer + * \param flags message send flags + * \return true if message part sent, false if it would have blocked + */ + bool send_raw(char const* buffer, int const length, int const flags = normal); + + /*! + * \warning If the buffer is not large enough for the message part then the + * data will be truncated. The rest of the part is lost forever. + * + * If there is a message ready then get the next part of it as a raw + * byte buffer. + * + * If the socket::DONT_WAIT flag and there is no message ready to receive + * then this function will return false. + * + * \param buffer byte buffer pointer to start writing to + * \param length max length of the buffer + * \param flags message receive flags + * \return true if message part received, false if it would have blocked + */ + bool receive_raw(char* buffer, int& length, int const flags = normal); + + /*! + * + * Subscribe to a topic + * + * Helper function that is equivalent of calling + * \code + * set(zmqpp::socket_option::subscribe, topic); + * \endcode + * + * This method is only useful for subscribe type sockets. + * + * \param topic the topic to subscribe to. + */ + void subscribe(std::string const& topic); + + /*! + * Subscribe to a topic + * + * Helper function that is equivalent of a loop calling + * \code + * set(zmqpp::socket_option::subscribe, topic); + * \endcode + * + * This method is only useful for subscribe type sockets. + * + * Generally this will be used with stl collections using begin() and + * end() functions to get the iterators. + * For this reason the end loop runs until the end iterator, not inclusive + * of it. + * + * \param topics_begin the starting iterator for topics. + * \param topics_end the final iterator for topics. + */ + template<typename InputIterator> + void subscribe(InputIterator const& topics_begin, InputIterator const& topics_end) + { + for(InputIterator it = topics_begin; it != topics_end; ++it) + { + subscribe(*it); + } + } + + /*! + * Unsubscribe from a topic + * + * Helper function that is equivalent of calling + * \code + * set(zmqpp::socket_option::unsubscribe, topic); + * \endcode + * + * This method is only useful for subscribe type sockets. + * + * \param topic the topic to unsubscribe from. + */ + void unsubscribe(std::string const& topic); + + /*! + * Unsubscribe from a topic + * + * Helper function that is equivalent of a loop calling + * \code + * set(zmqpp::socket_option::unsubscribe, topic); + * \endcode + * + * This method is only useful for subscribe type sockets. + * + * Generally this will be used with stl collections using begin() and + * end() functions to get the iterators. + * For this reason the end loop runs until the end iterator, not inclusive + * of it. + * + * \param topics_begin the starting iterator for topics. + * \param topics_end the final iterator for topics. + */ + template<typename InputIterator> + void unsubscribe(InputIterator const& topics_begin, InputIterator const& topics_end) + { + for(InputIterator it = topics_begin; it != topics_end; ++it) + { + unsubscribe(*it); + } + } + + /*! + * If the last receive part call to the socket resulted + * in a label or a non-terminating part of a multipart + * message this will return true. + * + * \return true if there are more parts + */ + bool has_more_parts() const; + + /*! + * Set the value of an option in the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value to set the option to + */ + void set(socket_option const option, int const value); + + /*! + * Set the value of an option in the underlaying zmq socket. + * + * \since 2.0.0 (built against 0mq version 3.1.x or later) + * + * \param option a valid ::socket_option + * \param value to set the option to + */ + void set(socket_option const option, bool const value); + + /*! + * Set the value of an option in the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value to set the option to + */ + void set(socket_option const option, uint64_t const value); + + /*! + * Set the value of an option in the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value to set the option to + */ + void set(socket_option const option, int64_t const value); + + /*! + * Set the value of an option in the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param pointer to raw byte value to set the option to + * \param length the size of the raw byte value + */ + void set(socket_option const option, char const* value, size_t const length); + + /*! + * Set the value of an option in the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param pointer to null terminated cstring value to set the option to + */ + inline void set(socket_option const option, char const* value) { set(option, value, strlen(value)); } + + /*! + * Set the value of an option in the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value to set the option to + */ + inline void set(socket_option const option, std::string const value) { set(option, value.c_str(), value.length()); } + + /*! + * Get a socket option from the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value referenced int to return value in + */ + void get(socket_option const option, int& value) const; + + /*! + * Get a socket option from the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value referenced bool to return value in + */ + void get(socket_option const option, bool& value) const; + + /*! + * Get a socket option from the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value referenced uint64_t to return value in + */ + void get(socket_option const option, uint64_t& value) const; + + /*! + * Get a socket option from the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value referenced uint64_t to return value in + */ + void get(socket_option const option, int64_t& value) const; + + /*! + * Get a socket option from the underlaying zmq socket. + * + * \param option a valid ::socket_option + * \param value referenced std::string to return value in + */ + void get(socket_option const option, std::string& value) const; + + /*! + * For those that don't want to get into a referenced value this templated method + * will return the value instead. + * + * \param option a valid ::socket_option + * \return socket option value + */ + template<typename Type> + Type get(socket_option const option) const + { + Type value = Type(); + get(option, value); + return value; + } + + /*! + * Move constructor + * + * Moves the internals of source to this object, there is no guarantee + * that source will be left in a valid state. + * + * This constructor is noexcept and so will not throw exceptions + * + * \param source target socket to steal internals from + */ + socket(socket&& source) NOEXCEPT; + + /*! + * Move operator + * + * Moves the internals of source to this object, there is no guarantee + * that source will be left in a valid state. + * + * This function is noexcept and so will not throw exceptions + * + * \param source target socket to steal internals from + * \return socket reference to this + */ + socket& operator=(socket&& source) NOEXCEPT; + + /*! + * Check the socket is still valid + * + * This tests the internal state of the socket. + * If creation failed for some reason or if the move functions were used + * to move the socket internals to another instance this will return false. + * + * \return true if the socket is valid + */ + operator bool() const; + + /*! + * Access to the raw 0mq context + * + * \return void pointer to the underlying 0mq socket + */ + operator void*() const; + +private: + void* _socket; + socket_type _type; + zmq_msg_t _recv_buffer; + + // No copy + socket(socket const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED; + socket& operator=(socket const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED; + + void track_message(message_t const&, uint32_t const, bool&); +}; + +} + +#endif /* ZMQPP_SOCKET_HPP_ */ diff --git a/dep/zmqpp/zmqpp/socket_options.hpp b/dep/zmqpp/zmqpp/socket_options.hpp new file mode 100644 index 00000000000..c5c8586cbc7 --- /dev/null +++ b/dep/zmqpp/zmqpp/socket_options.hpp @@ -0,0 +1,80 @@ +/** + * \file + * + * \date 23 Sep 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_SOCKET_OPTIONS_HPP_ +#define ZMQPP_SOCKET_OPTIONS_HPP_ + +namespace zmqpp +{ + +/** \todo Expand the information on the options to make it actually useful. */ +/*! + * \brief possible Socket options in zmq + */ + +enum class socket_option { + affinity = ZMQ_AFFINITY, /*!< I/O thread affinity */ + identity = ZMQ_IDENTITY, /*!< Socket identity */ + subscribe = ZMQ_SUBSCRIBE, /*!< Add topic subscription - set only */ + unsubscribe = ZMQ_UNSUBSCRIBE, /*!< Remove topic subscription - set only */ + rate = ZMQ_RATE, /*!< Multicast data rate */ + send_buffer_size = ZMQ_SNDBUF, /*!< Kernel transmission buffer size */ + receive_buffer_size = ZMQ_RCVBUF, /*!< Kernel receive buffer size */ + receive_more = ZMQ_RCVMORE, /*!< Can receive more parts - get only */ + file_descriptor = ZMQ_FD, /*!< Socket file descriptor - get only */ + events = ZMQ_EVENTS, /*!< Socket event flags - get only */ + type = ZMQ_TYPE, /*!< Socket type - get only */ + linger = ZMQ_LINGER, /*!< Socket linger timeout */ + backlog = ZMQ_BACKLOG, /*!< Maximum length of outstanding connections - get only */ +#if (ZMQ_VERSION_MAJOR == 2) + // Note that this is inverse of the zmq names for version 2.x + recovery_interval_seconds = ZMQ_RECOVERY_IVL, /*!< Multicast recovery interval in seconds */ + recovery_interval = ZMQ_RECOVERY_IVL_MSEC, /*!< Multicast recovery interval in milliseconds */ +#else + recovery_interval = ZMQ_RECOVERY_IVL, /*!< Multicast recovery interval in milliseconds */ +#endif + reconnect_interval = ZMQ_RECONNECT_IVL, /*!< Reconnection interval */ + reconnect_interval_max = ZMQ_RECONNECT_IVL_MAX, /*!< Maximum reconnection interval */ + receive_timeout = ZMQ_RCVTIMEO, /*!< Maximum inbound block timeout */ + send_timeout = ZMQ_SNDTIMEO, /*!< Maximum outbound block timeout */ +#if (ZMQ_VERSION_MAJOR == 2) + high_water_mark = ZMQ_HWM, /*!< High-water mark for all messages */ + swap_size = ZMQ_SWAP, /*!< Maximum socket swap size in bytes */ + multicast_loopback = ZMQ_MCAST_LOOP, /*!< Allow multicast packet loopback */ +#else + max_messsage_size = ZMQ_MAXMSGSIZE, /*!< Maximum inbound message size */ + send_high_water_mark = ZMQ_SNDHWM, /*!< High-water mark for outbound messages */ + receive_high_water_mark = ZMQ_RCVHWM, /*!< High-water mark for inbound messages */ + multicast_hops = ZMQ_MULTICAST_HOPS, /*!< Maximum number of multicast hops */ +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1)) + ipv4_only = ZMQ_IPV4ONLY, +#endif +#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2)) +#if (ZMQ_VERSION_MINOR == 2) + delay_attach_on_connect = ZMQ_DELAY_ATTACH_ON_CONNECT, /*!< Delay buffer attachment until connect complete */ +#else + // ZMQ_DELAY_ATTACH_ON_CONNECT is renamed in ZeroMQ starting 3.3.x + immediate = ZMQ_IMMEDIATE, /*!< Block message sending until connect complete */ +#endif + last_endpoint = ZMQ_LAST_ENDPOINT, /*!< Last bound endpoint - get only */ + router_mandatory = ZMQ_ROUTER_MANDATORY, /*!< Require routable messages - set only */ + xpub_verbose = ZMQ_XPUB_VERBOSE, /*!< Pass on existing subscriptions - set only */ + tcp_keepalive = ZMQ_TCP_KEEPALIVE, /*!< Enable TCP keepalives */ + tcp_keepalive_idle = ZMQ_TCP_KEEPALIVE_IDLE, /*!< TCP keepalive idle count (generally retry count) */ + tcp_keepalive_count = ZMQ_TCP_KEEPALIVE_CNT, /*!< TCP keepalive retry count */ + tcp_keepalive_interval = ZMQ_TCP_KEEPALIVE_INTVL, /*!< TCP keepalive interval */ + tcp_accept_filter = ZMQ_TCP_ACCEPT_FILTER, /*!< Filter inbound connections - set only */ +#endif +#ifdef ZMQ_EXPERIMENTAL_LABELS + receive_label = ZMQ_RCVLABEL, /*!< Received label part - get only */ +#endif +}; + +} + +#endif /* ZMQPP_SOCKET_OPTIONS_HPP_ */ diff --git a/dep/zmqpp/zmqpp/socket_types.hpp b/dep/zmqpp/zmqpp/socket_types.hpp new file mode 100644 index 00000000000..e59e71ca0e1 --- /dev/null +++ b/dep/zmqpp/zmqpp/socket_types.hpp @@ -0,0 +1,148 @@ +/** + * \file + * + * \date 23 Sep 2011 + * \author Ben Gray (\@benjamg) + */ + +#ifndef ZMQPP_SOCKET_TYPES_HPP_ +#define ZMQPP_SOCKET_TYPES_HPP_ + +namespace zmqpp +{ + +/*! + * \brief Socket types allowed by zmq + * + * The socket type choose at creation must be one of these types. + * + * Each is designed for a different use and has different limitations. + */ +enum class socket_type { + /*! + * One to one - two way connection.\n + * Connect to ::pair.\n + * A \c pair socket has to be connected only one other pair socket and allows + * two way communication between them. + */ + pair = ZMQ_PAIR, + + /*! + * One to many - fan out.\n + * Connect to ::subscribe or ::xsubscribe.\n + * Socket is send only.\n + * Socket will drop messages and not block.\n + * \c publish sockets allow sending of the same message to many subscribers + * each subscriber can limit what is sent through the socket_option::subscribe + * settings. + */ + publish = ZMQ_PUB, + + /*! + * \note It seems doxygen can't work out which data is for the socket type and + * which is for the socket option so both get listed for both. + * + * One to many - fair-queued.\n + * Connect to ::publish or ::xpublish.\n + * Socket is receive only.\n + * The \c subscribe socket can connection to any number of publishers and will + * fairly pull messages from each. The socket_option::subscribe settings can + * be use to limit which messages are received and by default none are. + */ + subscribe = ZMQ_SUB, + + /*! + * One to many - fair-queued.\n + * Connect to ::push.\n + * Socket is receive only.\n + * The \c pull socket fairly pulls messages from all pushers it is connected + * to. + */ + pull = ZMQ_PULL, + + /*! + * One to many - load-balanced.\n + * Connect to ::pull.\n + * Socket is send only.\n + * Socket will block if unable to send.\n + * The \c push socket fairly distributes messages between any connected + * puller sockets. + */ + push = ZMQ_PUSH, + + /*! + * One to many - fair-queued outgoing, last peer incoming.\n + * Connect to ::reply or ::xreply.\n + * Socket flips between send and receive only.\n + * Socket will block if unable to send.\n + * The \c request socket will fairly balance requests sent out to a + * replier and then can only be used to receive until that replier + * sends a reply. + */ + request = ZMQ_REQ, + + /*! + * One to many - load-balanced incoming, last peer outgoing.\n + * Connect to ::request or ::xrequest.\n + * Socket flips between send and receive only.\n + * Socket will drop messages and not block.\n + * The \c reply socket can only receive until it pulls a message from a + * requester at which point it can only send until the reply is sent. + */ + reply = ZMQ_REP, + + /*! + * One to many - fan out.\n + * Connect to ::subscribe or ::xsubscribe.\n + * Socket is send only with the exception of special subscription messages.\n + * Socket will drop messages and not block.\n + * \c xpublish act the same as ::publish sockets however also allow special + * subscription messages to be received from subscribers. + */ + xpublish = ZMQ_XPUB, + + /*! + * One to many - fair-queued.\n + * Connect to ::publish or ::xpublish.\n + * Socket is receive only with the exception of special subscription messages\n + * \c xsubscribe act the same as ::subscribe sockets however also allow special + * subscription messages to be send to connected publishers. + */ + xsubscribe = ZMQ_XSUB, + + /*! + * One to many - fair-queued incoming, load-balanced outgoing.\n + * Connect to ::reply or ::xreply.\n + * Socket will block if unable to send.\n + * An \c xrequest socket balances requests between repliers and pulls replies + * back in a fair manner. Each request is expected to have exactly one reply. + */ + xrequest = ZMQ_XREQ, + + /*! + * One to many - fair-queued incoming, targeted outgoing.\n + * Connect to ::request or ::xrequest.\n + * Socket will drop messages and not block.\n + * An \c xreply socket fairly pulls in requests from requesters and will + * label requests so it can return replies back to the correct target. + */ + xreply = ZMQ_XREP, + + // To match for people who prefer the shorter versions + pub = ZMQ_PUB, /*!< version of ::publish to match zmq name convention */ + sub = ZMQ_SUB, /*!< version of ::subscribe to match zmq name convention */ + req = ZMQ_REQ, /*!< version of ::request to match zmq name convention */ + rep = ZMQ_REP, /*!< version of ::reply to match zmq name convention */ + xpub = ZMQ_XPUB, /*!< version of ::xpublish to match zmq name convention */ + xsub = ZMQ_XSUB, /*!< version of ::xsubscribe to match zmq name convention */ + xreq = ZMQ_XREQ, /*!< version of ::xrequest to match zmq name convention */ + xrep = ZMQ_XREP, /*!< version of ::xreply to match zmq name convention */ + + // For completion + router = ZMQ_ROUTER, /*!< \deprecated Matches zmq 2.x xrep functionality. */ + dealer = ZMQ_DEALER /*!< \deprecated Matches zmq 2.x xreq functionality. */ +}; + +} + +#endif /* ZMQPP_SOCKET_TYPES_HPP_ */ diff --git a/dep/zmqpp/zmqpp/zmqpp.cpp b/dep/zmqpp/zmqpp/zmqpp.cpp new file mode 100644 index 00000000000..216948e73e7 --- /dev/null +++ b/dep/zmqpp/zmqpp/zmqpp.cpp @@ -0,0 +1,30 @@ +/* + * Created on: 18 Aug 2011 + * Author: Ben Gray (@benjamg) + */ + +#include "zmqpp.hpp" + +namespace zmqpp +{ + +std::string version() +{ + return BUILD_VERSION; +} + +void version(uint8_t& major, uint8_t& minor, uint8_t& revision) +{ + major = ZMQPP_VERSION_MAJOR; + minor = ZMQPP_VERSION_MINOR; + revision = ZMQPP_VERSION_REVISION; +} + +void zmq_version(uint8_t& major, uint8_t& minor, uint8_t& patch) +{ + major = ZMQ_VERSION_MAJOR; + minor = ZMQ_VERSION_MINOR; + patch = ZMQ_VERSION_PATCH; +} + +} diff --git a/dep/zmqpp/zmqpp/zmqpp.hpp b/dep/zmqpp/zmqpp/zmqpp.hpp new file mode 100644 index 00000000000..92a47ce38bf --- /dev/null +++ b/dep/zmqpp/zmqpp/zmqpp.hpp @@ -0,0 +1,111 @@ +/** + * \file + * + * \date 9 Aug 2011 + * \author Ben Gray (\@benjamg) + * + * License: http://www.opensource.org/licenses/MIT + * + * Copyright (C) 2011 by Ben Gray + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ + +#ifndef ZMQPP_ZMQPP_HPP_ +#define ZMQPP_ZMQPP_HPP_ + +/** + * \def ZMQPP_VERSION_MAJOR + * zmqpp major version number, generated at compile time + */ +#define ZMQPP_VERSION_MAJOR BUILD_VERSION_MAJOR + +/** + * \def ZMQPP_VERSION_MINOR + * zmqpp minor version number, generated at compile time + */ +#define ZMQPP_VERSION_MINOR BUILD_VERSION_MINOR + +/** + * \def ZMQPP_VERSION_REVISION + * zmqpp version revision number, generated at compile time + */ +#define ZMQPP_VERSION_REVISION BUILD_VERSION_REVISION + +#include <zmq.h> + +#include "compatibility.hpp" +#include "context.hpp" +#include "exception.hpp" +#include "message.hpp" +#include "poller.hpp" +#include "socket.hpp" + +/*! + * \brief C++ wrapper around zmq + * + * All zmq++ / zmqpp functions, constants and classes live within this namespace, + */ +namespace zmqpp +{ + +/*! + * Returns the current major.minor.revision version number as a string. + * + * \return string version number. + */ +std::string version(); + +/*! + * Retrieve the parts of the zmqpp version number. + * + * Set the three parameters to values representing the zmqpp version number. + * These values are generated at library compile time. + * + * \param major an unsigned 8 bit reference to set to the major version. + * \param minor an unsigned 8 bit reference to set to the minor version. + * \param revision an unsigned 8 bit reference to set the current revision. + */ +void version(uint8_t& major, uint8_t& minor, uint8_t& revision); + +/*! + * Retrieve the parts of the 0mq version this library was built against. + * + * Because sections of the library are optionally compiled in or ignored + * depending on the version of 0mq it was compiled against this method is + * provided to allow sanity checking for usage. + * + * Set the three parameters to values representing the 0mq version number. + * These values are generated at library compile time. + * + * \param major an unsigned 8 bit reference to set to the major version. + * \param minor an unsigned 8 bit reference to set to the minor version. + * \param revision an unsigned 8 bit reference to set the current revision. + */ +void zmq_version(uint8_t& major, uint8_t& minor, uint8_t& patch); + +typedef context context_t; /*!< \brief context type */ +typedef std::string endpoint_t; /*!< \brief endpoint type */ +typedef message message_t; /*!< \brief message type */ +typedef poller poller_t; /*!< \brief poller type */ +typedef socket socket_t; /*!< \brief socket type */ + +} + +#endif /* ZMQPP_ZMQPP_HPP_ */ diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index ea2ad3abac7..e691b9527a5 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -25,6 +25,7 @@ if( SERVERS ) add_subdirectory(game) add_subdirectory(collision) add_subdirectory(authserver) + add_subdirectory(ipc) add_subdirectory(bnetserver) add_subdirectory(scripts) add_subdirectory(worldserver) diff --git a/src/server/bnetserver/CMakeLists.txt b/src/server/bnetserver/CMakeLists.txt index 9d99ac7f0c5..5b854018d47 100644 --- a/src/server/bnetserver/CMakeLists.txt +++ b/src/server/bnetserver/CMakeLists.txt @@ -45,6 +45,7 @@ endif() include_directories( ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/dep/zmqpp ${CMAKE_SOURCE_DIR}/src/server/shared ${CMAKE_SOURCE_DIR}/src/server/shared/Configuration ${CMAKE_SOURCE_DIR}/src/server/shared/Database @@ -56,6 +57,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/src/server/shared/Networking ${CMAKE_SOURCE_DIR}/src/server/shared/Threading ${CMAKE_SOURCE_DIR}/src/server/shared/Utilities + ${CMAKE_SOURCE_DIR}/src/server/ipc ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/Authentication ${CMAKE_CURRENT_SOURCE_DIR}/Realms @@ -64,6 +66,7 @@ include_directories( ${MYSQL_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${VALGRIND_INCLUDE_DIR} + ${ZMQ_INCLUDE_DIR} ) add_executable(bnetserver @@ -80,9 +83,12 @@ if( NOT WIN32 ) endif() target_link_libraries(bnetserver + ipc shared + zmqpp ${MYSQL_LIBRARY} ${OPENSSL_LIBRARIES} + ${ZMQ_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES} ) diff --git a/src/server/bnetserver/Main.cpp b/src/server/bnetserver/Main.cpp index ff891304bd3..ce90019c011 100644 --- a/src/server/bnetserver/Main.cpp +++ b/src/server/bnetserver/Main.cpp @@ -35,6 +35,7 @@ #include "RealmList.h" #include "SystemConfig.h" #include "Util.h" +#include "ZmqContext.h" #include <cstdlib> #include <iostream> #include <boost/date_time/posix_time/posix_time.hpp> @@ -94,12 +95,21 @@ int main(int argc, char** argv) } } + int32 worldListenPort = sConfigMgr->GetIntDefault("WorldserverListenPort", 1118); + if (worldListenPort < 0 || worldListenPort > 0xFFFF) + { + TC_LOG_ERROR("server.bnetserver", "Specified worldserver listen port (%d) out of allowed range (1-65535)", worldListenPort); + return 1; + } + // Initialize the database connection if (!StartDB()) return 1; + sIpcContext->Initialize(); + // Get the list of realms for the server - sRealmList->Initialize(_ioService, sConfigMgr->GetIntDefault("RealmsStateUpdateDelay", 10)); + sRealmList->Initialize(_ioService, sConfigMgr->GetIntDefault("RealmsStateUpdateDelay", 10), worldListenPort); // Start the listening port (acceptor) for auth connections int32 bnport = sConfigMgr->GetIntDefault("BattlenetPort", 1119); @@ -135,6 +145,10 @@ int main(int argc, char** argv) // Start the io service worker loop _ioService.run(); + sIpcContext->Close(); + + sRealmList->Close(); + // Close the Database Pool and library StopDB(); diff --git a/src/server/bnetserver/Packets/BitStream.h b/src/server/bnetserver/Packets/BitStream.h index 952ec5a39e2..54c61ab3bbf 100644 --- a/src/server/bnetserver/Packets/BitStream.h +++ b/src/server/bnetserver/Packets/BitStream.h @@ -212,14 +212,6 @@ namespace Battlenet } } - void SetReadPos(uint32 bits) - { - if (bits > _writePos) - throw BitStreamPositionException(true, bits, 0, _writePos); - - _readPos = bits; - } - bool IsRead() const { return _readPos >= _writePos; } uint8* GetBuffer() { return _buffer.data(); } @@ -227,6 +219,10 @@ namespace Battlenet size_t GetSize() const { return ((_writePos + 7) & ~7) / 8; } + // These methods are meant to only be used when their corresponding actions in the client ignore the value completely + void ReadSkip(uint32 bitCount) { _readPos += bitCount; } + void WriteSkip(uint32 bitCount) { Write(0, bitCount); } + private: uint32 _writePos; uint32 _readPos; diff --git a/src/server/bnetserver/Packets/FriendsPackets.cpp b/src/server/bnetserver/Packets/FriendsPackets.cpp index 14c5fb3a665..2659ec6204f 100644 --- a/src/server/bnetserver/Packets/FriendsPackets.cpp +++ b/src/server/bnetserver/Packets/FriendsPackets.cpp @@ -132,7 +132,7 @@ std::string Battlenet::Friends::SocialNetworkCheckConnectedResult::ToString() co void Battlenet::Friends::SocialNetworkCheckConnectedResult::Write() { - _stream.Write(0, 23); // Ignored + _stream.WriteSkip(23); _stream.Write(Result, 16); _stream.Write(SocialNetworkId, 32); } diff --git a/src/server/bnetserver/Packets/PacketManager.cpp b/src/server/bnetserver/Packets/PacketManager.cpp index 019cf48ac30..e18f3dc5ffb 100644 --- a/src/server/bnetserver/Packets/PacketManager.cpp +++ b/src/server/bnetserver/Packets/PacketManager.cpp @@ -115,8 +115,8 @@ void Battlenet::PacketManager::RegisterWoWRealmPackets() REGISTER_SERVER_PACKET(PacketHeader(WoWRealm::SMSG_LIST_SUBSCRIBE_RESPONSE, WOWREALM), WoWRealm::ListSubscribeResponse); REGISTER_SERVER_PACKET(PacketHeader(WoWRealm::SMSG_LIST_UPDATE, WOWREALM), WoWRealm::ListUpdate); REGISTER_SERVER_PACKET(PacketHeader(WoWRealm::SMSG_LIST_COMPLETE, WOWREALM), WoWRealm::ListComplete); - REGISTER_SERVER_PACKET_NAME(PacketHeader(WoWRealm::SMSG_TOON_READY, WOWREALM), "WoWRealm::ToonReady"); - REGISTER_SERVER_PACKET_NAME(PacketHeader(WoWRealm::SMSG_TOON_LOGGED_OUT, WOWREALM), "WoWRealm::ToonLoggedOut"); + REGISTER_SERVER_PACKET(PacketHeader(WoWRealm::SMSG_TOON_READY, WOWREALM), WoWRealm::ToonReady); + REGISTER_SERVER_PACKET(PacketHeader(WoWRealm::SMSG_TOON_LOGGED_OUT, WOWREALM), WoWRealm::ToonLoggedOut); REGISTER_SERVER_PACKET(PacketHeader(WoWRealm::SMSG_JOIN_RESPONSE_V2, WOWREALM), WoWRealm::JoinResponseV2); } diff --git a/src/server/bnetserver/Packets/WoWRealmPackets.cpp b/src/server/bnetserver/Packets/WoWRealmPackets.cpp index beeeb14895c..986152cbccb 100644 --- a/src/server/bnetserver/Packets/WoWRealmPackets.cpp +++ b/src/server/bnetserver/Packets/WoWRealmPackets.cpp @@ -40,6 +40,26 @@ void Battlenet::WoWRealm::ListUnsubscribe::CallHandler(Session* session) session->HandleListUnsubscribe(*this); } +void Battlenet::WoWRealm::JoinRequestV2::Read() +{ + Realm.Battlegroup = _stream.Read<uint8>(8); + Realm.Index = _stream.Read<uint32>(32); + Realm.Region = _stream.Read<uint8>(8); + ClientSeed = _stream.Read<uint32>(32); +} + +std::string Battlenet::WoWRealm::JoinRequestV2::ToString() const +{ + std::ostringstream stream; + stream << "Battlenet::WoWRealm::JoinRequestV2 ClientSeed " << ClientSeed << " Region " << uint32(Realm.Region) << " Battlegroup " << uint32(Realm.Battlegroup) << " Index " << Realm.Index; + return stream.str().c_str(); +} + +void Battlenet::WoWRealm::JoinRequestV2::CallHandler(Session* session) +{ + session->HandleJoinRequestV2(*this); +} + Battlenet::WoWRealm::ListSubscribeResponse::~ListSubscribeResponse() { for (ServerPacket* realmData : RealmData) @@ -143,24 +163,32 @@ std::string Battlenet::WoWRealm::ListUpdate::ToString() const return stream.str().c_str(); } -void Battlenet::WoWRealm::JoinRequestV2::Read() +void Battlenet::WoWRealm::ToonReady::Write() { - Realm.Battlegroup = _stream.Read<uint8>(8); - Realm.Index = _stream.Read<uint32>(32); - Realm.Region = _stream.Read<uint8>(8); - ClientSeed = _stream.Read<uint32>(32); + _stream.Write(Realm.Region, 8); + _stream.WriteFourCC(Game); + uint32 realmAddress = ((Realm.Battlegroup << 16) & 0xFF0000) | uint16(Realm.Index); + _stream.Write(realmAddress, 32); + _stream.WriteString(Name, 7, -2); + _stream.WriteSkip(7); + _stream.Write(Guid, 64); + _stream.WriteFourCC(Game); + _stream.Write(Realm.Region, 8); + _stream.WriteSkip(21); + _stream.Write(realmAddress, 32); + _stream.WriteSkip(9); + _stream.Write(0, 64); // Unknown + _stream.Write(0, 32); // Unknown } -std::string Battlenet::WoWRealm::JoinRequestV2::ToString() const +std::string Battlenet::WoWRealm::ToonReady::ToString() const { std::ostringstream stream; - stream << "Battlenet::WoWRealm::JoinRequestV2 ClientSeed " << ClientSeed << " Region " << uint32(Realm.Region) << " Battlegroup " << uint32(Realm.Battlegroup) << " Index " << Realm.Index; - return stream.str().c_str(); -} + stream << "Battlenet::WoWRealm::ToonReady" << " Game: " << Game + << ", Region: " << uint32(Realm.Region) << ", Battlegroup: " << uint32(Realm.Battlegroup) << ", Index: " << Realm.Index + << ", Guid: " << Guid << ", Name: " << Name; -void Battlenet::WoWRealm::JoinRequestV2::CallHandler(Session* session) -{ - session->HandleJoinRequestV2(*this); + return stream.str().c_str(); } void Battlenet::WoWRealm::JoinResponseV2::Write() diff --git a/src/server/bnetserver/Packets/WoWRealmPackets.h b/src/server/bnetserver/Packets/WoWRealmPackets.h index 2b1390a9067..b411c63100a 100644 --- a/src/server/bnetserver/Packets/WoWRealmPackets.h +++ b/src/server/bnetserver/Packets/WoWRealmPackets.h @@ -153,6 +153,33 @@ namespace Battlenet std::string ToString() const override { return "Battlenet::WoWRealm::ListComplete"; } }; + class ToonReady final : public ServerPacket + { + public: + ToonReady() : ServerPacket(PacketHeader(SMSG_TOON_READY, WOWREALM)), Game("WoW"), Guid(0) + { + } + + void Write() override; + std::string ToString() const override; + + std::string Game; + RealmId Realm; + uint64 Guid; + std::string Name; + }; + + class ToonLoggedOut final : public ServerPacket + { + public: + ToonLoggedOut() : ServerPacket(PacketHeader(SMSG_TOON_LOGGED_OUT, WOWREALM)) + { + } + + void Write() override { } + std::string ToString() const override { return "Battlenet::WoWRealm::ToonLoggedOut"; } + }; + class JoinResponseV2 final : public ServerPacket { public: diff --git a/src/server/bnetserver/Realms/RealmList.cpp b/src/server/bnetserver/Realms/RealmList.cpp index 2bf93e12cb3..cc7e1d492a8 100644 --- a/src/server/bnetserver/Realms/RealmList.cpp +++ b/src/server/bnetserver/Realms/RealmList.cpp @@ -16,12 +16,21 @@ * with this program. If not, see <http://www.gnu.org/licenses/>. */ -#include <boost/asio/ip/tcp.hpp> #include "Common.h" #include "Database/DatabaseEnv.h" #include "SessionManager.h" #include "Util.h" +#include "Commands.h" #include "RealmList.h" +#include <boost/asio/ip/tcp.hpp> + +Battlenet::RealmId& Battlenet::RealmId::operator=(Battlenet::RealmHandle const& handle) +{ + Region = handle.Region; + Battlegroup = handle.Battlegroup; + Index = handle.Index; + return *this; +} ip::tcp::endpoint Realm::GetAddressForClient(ip::address const& clientAddr) const { @@ -58,7 +67,7 @@ ip::tcp::endpoint Realm::GetAddressForClient(ip::address const& clientAddr) cons return endpoint; } -RealmList::RealmList() : _updateInterval(0), _updateTimer(nullptr), _resolver(nullptr) +RealmList::RealmList() : _updateInterval(0), _updateTimer(nullptr), _resolver(nullptr), _worldListener(nullptr) { } @@ -66,10 +75,11 @@ RealmList::~RealmList() { delete _updateTimer; delete _resolver; + delete _worldListener; } // Load the realm list from the database -void RealmList::Initialize(boost::asio::io_service& ioService, uint32 updateInterval) +void RealmList::Initialize(boost::asio::io_service& ioService, uint32 updateInterval, uint16 worldListenPort) { _updateInterval = updateInterval; _updateTimer = new boost::asio::deadline_timer(ioService); @@ -77,6 +87,14 @@ void RealmList::Initialize(boost::asio::io_service& ioService, uint32 updateInte // Get the content of the realmlist table in the database UpdateRealms(boost::system::error_code()); + + _worldListener = new WorldListener(worldListenPort); + _worldListener->Start(); +} + +void RealmList::Close() +{ + _worldListener->End(); } template<typename FieldType> diff --git a/src/server/bnetserver/Realms/RealmList.h b/src/server/bnetserver/Realms/RealmList.h index a4d3d77ff56..dc78a00dfdd 100644 --- a/src/server/bnetserver/Realms/RealmList.h +++ b/src/server/bnetserver/Realms/RealmList.h @@ -19,11 +19,12 @@ #ifndef _REALMLIST_H #define _REALMLIST_H +#include "Common.h" +#include "WorldListener.h" #include <boost/asio/ip/address.hpp> #include <boost/asio/ip/tcp.hpp> #include <boost/asio/io_service.hpp> #include <boost/asio/deadline_timer.hpp> -#include "Common.h" using namespace boost::asio; @@ -44,6 +45,8 @@ enum RealmFlags namespace Battlenet { + struct RealmHandle; + struct RealmId { RealmId() : Region(0), Battlegroup(0), Index(0), Build(0) { } @@ -59,6 +62,8 @@ namespace Battlenet { return memcmp(this, &r, sizeof(RealmId) - sizeof(Build)) < 0; } + + RealmId& operator=(RealmHandle const& handle); }; } @@ -98,7 +103,8 @@ public: ~RealmList(); - void Initialize(boost::asio::io_service& ioService, uint32 updateInterval); + void Initialize(boost::asio::io_service& ioService, uint32 updateInterval, uint16 worldListenPort); + void Close(); RealmMap const& GetRealms() const { return _realms; } Realm const* GetRealm(Battlenet::RealmId const& id) const; @@ -114,6 +120,7 @@ private: uint32 _updateInterval; boost::asio::deadline_timer* _updateTimer; boost::asio::ip::tcp::resolver* _resolver; + WorldListener* _worldListener; }; #define sRealmList RealmList::instance() diff --git a/src/server/bnetserver/Realms/WorldListener.cpp b/src/server/bnetserver/Realms/WorldListener.cpp new file mode 100644 index 00000000000..30886a67310 --- /dev/null +++ b/src/server/bnetserver/Realms/WorldListener.cpp @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "Log.h" +#include "SessionManager.h" +#include "WoWRealmPackets.h" +#include "ZmqContext.h" +#include "WorldListener.h" + +WorldListener::HandlerTable const WorldListener::_handlers; + +WorldListener::HandlerTable::HandlerTable() +{ +#define DEFINE_HANDLER(opc, func) _handlers[opc] = { func, #opc } + + DEFINE_HANDLER(BNET_CHANGE_TOON_ONLINE_STATE, &WorldListener::HandleToonOnlineStatusChange); + +#undef DEFINE_HANDLER +} + +WorldListener::WorldListener(uint16 worldListenPort) : _worldListenPort(worldListenPort) +{ + _worldSocket = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull); +} + +WorldListener::~WorldListener() +{ + delete _worldSocket; +} + +void WorldListener::Run() +{ + while (!ProcessExit()) + { + _poller->poll(); + if (_poller->events(*_worldSocket) & zmqpp::poller::poll_in) + { + int32 op1; + do + { + zmqpp::message msg; + _worldSocket->receive(msg); + Dispatch(msg); + _worldSocket->get(zmqpp::socket_option::events, op1); + } while (op1 & zmqpp::poller::poll_in); + } + } +} + +void WorldListener::HandleOpen() +{ + _worldSocket->bind(std::string("tcp://*:") + std::to_string(_worldListenPort)); + _poller->add(*_worldSocket); + TC_LOG_INFO("server.ipc", "Listening on connections from worldservers..."); +} + +void WorldListener::HandleClose() +{ + _worldSocket->close(); + TC_LOG_INFO("server.ipc", "Shutting down connections from worldservers..."); +} + +void WorldListener::Dispatch(zmqpp::message& msg) const +{ + Battlenet::Header ipcHeader; + msg >> ipcHeader; + + if (ipcHeader.Ipc.Channel != IPC_CHANNEL_BNET) + return; + + if (ipcHeader.Ipc.Command < IPC_BNET_MAX_COMMAND) + (this->*_handlers[ipcHeader.Ipc.Command].Handler)(ipcHeader.Realm, msg); +} + +void WorldListener::HandleToonOnlineStatusChange(Battlenet::RealmHandle const& realm, zmqpp::message& msg) const +{ + Battlenet::ToonHandle toonHandle; + bool online; + msg >> toonHandle; + msg >> online; + + if (Battlenet::Session* session = sSessionMgr.GetSession(toonHandle.AccountId, toonHandle.GameAccountId)) + { + if (online) + { + Battlenet::WoWRealm::ToonReady* toonReady = new Battlenet::WoWRealm::ToonReady(); + toonReady->Realm = realm; + toonReady->Guid = toonHandle.Guid; + toonReady->Name = toonHandle.Name; + session->AsyncWrite(toonReady); + } + else + session->AsyncWrite(new Battlenet::WoWRealm::ToonLoggedOut()); + } +} diff --git a/src/server/bnetserver/Realms/WorldListener.h b/src/server/bnetserver/Realms/WorldListener.h new file mode 100644 index 00000000000..04d5342449c --- /dev/null +++ b/src/server/bnetserver/Realms/WorldListener.h @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef WorldListener_h__ +#define WorldListener_h__ + +#include "ZMQTask.h" +#include "Commands.h" + +class WorldListener : public ZMQTask +{ +public: + explicit WorldListener(uint16 worldListenPort); + ~WorldListener(); + void Run() override; + +protected: + void HandleOpen() override; + void HandleClose() override; + +private: + void Dispatch(zmqpp::message& msg) const; + + typedef void(WorldListener::*PacketHandler)(Battlenet::RealmHandle const& realm, zmqpp::message& msg) const; + class HandlerTable + { + public: + HandlerTable(); + + struct HandlerInfo + { + PacketHandler Handler; + char const* Name; + }; + + HandlerInfo const& operator[](uint8 opcode) const { return _handlers[opcode]; } + + private: + HandlerInfo _handlers[IPC_BNET_MAX_COMMAND]; + }; + + void HandleToonOnlineStatusChange(Battlenet::RealmHandle const& realm, zmqpp::message& msg) const; + + zmqpp::socket* _worldSocket; + uint16 _worldListenPort; + static HandlerTable const _handlers; +}; + +#endif // WorldListener_h__ diff --git a/src/server/bnetserver/Server/Session.h b/src/server/bnetserver/Server/Session.h index a2c587841a4..c932115a04b 100644 --- a/src/server/bnetserver/Server/Session.h +++ b/src/server/bnetserver/Server/Session.h @@ -93,9 +93,9 @@ namespace Battlenet bool IsSubscribedToRealmListUpdates() const { return _subscribedToRealmListUpdates; } - protected: void AsyncWrite(ServerPacket* packet); + protected: void ReadHandler() override; private: diff --git a/src/server/bnetserver/Server/SessionManager.cpp b/src/server/bnetserver/Server/SessionManager.cpp index 8dcee55ec6c..9e5836dab8d 100644 --- a/src/server/bnetserver/Server/SessionManager.cpp +++ b/src/server/bnetserver/Server/SessionManager.cpp @@ -49,3 +49,22 @@ void Battlenet::SessionManager::RemoveSession(Session* session) _sessions.erase({ session->GetAccountId(), session->GetGameAccountId() }); _sessionsByAccountId[session->GetAccountId()].remove(session); } + +Battlenet::Session* Battlenet::SessionManager::GetSession(uint32 accountId, uint32 gameAccountId) const +{ + auto itr = _sessions.find({ accountId, gameAccountId }); + if (itr != _sessions.end()) + return itr->second; + + return nullptr; +} + +std::list<Battlenet::Session*> Battlenet::SessionManager::GetSessions(uint32 accountId) const +{ + std::list<Session*> sessions; + auto itr = _sessionsByAccountId.find(accountId); + if (itr != _sessionsByAccountId.end()) + sessions = itr->second; + + return sessions; +} diff --git a/src/server/bnetserver/Server/SessionManager.h b/src/server/bnetserver/Server/SessionManager.h index bbd78c052d2..08ca5ce2b4e 100644 --- a/src/server/bnetserver/Server/SessionManager.h +++ b/src/server/bnetserver/Server/SessionManager.h @@ -60,6 +60,9 @@ namespace Battlenet void RemoveSession(Session* /*session*/); + Session* GetSession(uint32 accountId, uint32 gameAccountId) const; + std::list<Session*> GetSessions(uint32 accountId) const; + template<typename Iterator> void LockedForEach(Iterator iterator) { diff --git a/src/server/bnetserver/bnetserver.conf.dist b/src/server/bnetserver/bnetserver.conf.dist index 84456c117b3..102ddb9906a 100644 --- a/src/server/bnetserver/bnetserver.conf.dist +++ b/src/server/bnetserver/bnetserver.conf.dist @@ -47,6 +47,13 @@ LogsDir = "" MaxPingTime = 30 # +# WorldserverListenPort +# Description: TCP port to listen on for incoming worldserver IPC. +# Default: 1118 + +WorldserverListenPort = 1118 + +# # BattlenetPort # Description: TCP port to reach the auth server for battle.net connections. # Default: 1119 diff --git a/src/server/collision/CMakeLists.txt b/src/server/collision/CMakeLists.txt index 1c5fcbee52e..3aac255be29 100644 --- a/src/server/collision/CMakeLists.txt +++ b/src/server/collision/CMakeLists.txt @@ -47,6 +47,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/src/server/shared/Packets ${CMAKE_SOURCE_DIR}/src/server/shared/Utilities ${CMAKE_SOURCE_DIR}/src/server/shared/DataStores + ${CMAKE_SOURCE_DIR}/src/server/ipc ${CMAKE_SOURCE_DIR}/src/server/game/Addons ${CMAKE_SOURCE_DIR}/src/server/game/Conditions ${CMAKE_SOURCE_DIR}/src/server/game/Entities/Item diff --git a/src/server/game/AuctionHouse/AuctionHouseMgr.cpp b/src/server/game/AuctionHouse/AuctionHouseMgr.cpp index 79512e57fe3..6550bccc06d 100644 --- a/src/server/game/AuctionHouse/AuctionHouseMgr.cpp +++ b/src/server/game/AuctionHouse/AuctionHouseMgr.cpp @@ -107,7 +107,7 @@ void AuctionHouseMgr::SendAuctionWonMail(AuctionEntry* auction, SQLTransaction& else { bidderAccId = sObjectMgr->GetPlayerAccountIdByGUID(bidderGuid); - logGmTrade = AccountMgr::HasPermission(bidderAccId, rbac::RBAC_PERM_LOG_GM_TRADE, realmID); + logGmTrade = AccountMgr::HasPermission(bidderAccId, rbac::RBAC_PERM_LOG_GM_TRADE, realmHandle.Index); if (logGmTrade && !sObjectMgr->GetPlayerNameByGUID(bidderGuid, bidderName)) bidderName = sObjectMgr->GetTrinityStringForDBCLocale(LANG_UNKNOWN); diff --git a/src/server/game/CMakeLists.txt b/src/server/game/CMakeLists.txt index 0d1b460500e..27b3ea2c381 100644 --- a/src/server/game/CMakeLists.txt +++ b/src/server/game/CMakeLists.txt @@ -111,6 +111,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/dep/g3dlite/include ${CMAKE_SOURCE_DIR}/dep/SFMT ${CMAKE_SOURCE_DIR}/dep/zlib + ${CMAKE_SOURCE_DIR}/dep/zmqpp ${CMAKE_SOURCE_DIR}/src/server/collision ${CMAKE_SOURCE_DIR}/src/server/collision/Management ${CMAKE_SOURCE_DIR}/src/server/collision/Models @@ -129,6 +130,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/src/server/shared/Packets ${CMAKE_SOURCE_DIR}/src/server/shared/Threading ${CMAKE_SOURCE_DIR}/src/server/shared/Utilities + ${CMAKE_SOURCE_DIR}/src/server/ipc ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/Accounts ${CMAKE_CURRENT_SOURCE_DIR}/Achievements @@ -205,6 +207,7 @@ include_directories( ${MYSQL_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${VALGRIND_INCLUDE_DIR} + ${ZMQ_INCLUDE_DIR} ) add_library(game STATIC diff --git a/src/server/game/Chat/Chat.cpp b/src/server/game/Chat/Chat.cpp index ace13989efb..6dfd25495d2 100644 --- a/src/server/game/Chat/Chat.cpp +++ b/src/server/game/Chat/Chat.cpp @@ -160,7 +160,7 @@ bool ChatHandler::HasLowerSecurityAccount(WorldSession* target, uint32 target_ac if (target) target_sec = target->GetSecurity(); else if (target_account) - target_sec = AccountMgr::GetSecurity(target_account, realmID); + target_sec = AccountMgr::GetSecurity(target_account, realmHandle.Index); else return true; // caller must report error for (target == NULL && target_account == 0) diff --git a/src/server/game/Handlers/CharacterHandler.cpp b/src/server/game/Handlers/CharacterHandler.cpp index 2cae8427161..bef7aada6e9 100644 --- a/src/server/game/Handlers/CharacterHandler.cpp +++ b/src/server/game/Handlers/CharacterHandler.cpp @@ -47,6 +47,7 @@ #include "World.h" #include "WorldPacket.h" #include "WorldSession.h" +#include "BattlenetServerManager.h" class LoginQueryHolder : public SQLQueryHolder { @@ -689,13 +690,13 @@ void WorldSession::HandleCharCreateCallback(PreparedQueryResult result, Characte PreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_DEL_REALM_CHARACTERS_BY_REALM); stmt->setUInt32(0, GetAccountId()); - stmt->setUInt32(1, realmID); + stmt->setUInt32(1, realmHandle.Index); trans->Append(stmt); stmt = LoginDatabase.GetPreparedStatement(LOGIN_INS_REALM_CHARACTERS); stmt->setUInt32(0, createInfo->CharCount); stmt->setUInt32(1, GetAccountId()); - stmt->setUInt32(2, realmID); + stmt->setUInt32(2, realmHandle.Index); trans->Append(stmt); LoginDatabase.CommitTransaction(trans); @@ -1139,6 +1140,8 @@ void WorldSession::HandlePlayerLogin(LoginQueryHolder* holder) sScriptMgr->OnPlayerLogin(pCurrChar, firstLogin); + sBattlenetServer.SendChangeToonOnlineState(GetBattlenetAccountId(), GetAccountId(), _player->GetGUID(), _player->GetName(), true); + delete holder; } diff --git a/src/server/game/Handlers/MiscHandler.cpp b/src/server/game/Handlers/MiscHandler.cpp index 8d11b9bf41d..3f7cf24206d 100644 --- a/src/server/game/Handlers/MiscHandler.cpp +++ b/src/server/game/Handlers/MiscHandler.cpp @@ -609,7 +609,7 @@ void WorldSession::HandleAddFriendOpcodeCallBack(PreparedQueryResult result, std team = Player::TeamForRace(fields[1].GetUInt8()); friendAccountId = fields[2].GetUInt32(); - if (HasPermission(rbac::RBAC_PERM_ALLOW_GM_FRIEND) || AccountMgr::IsPlayerAccount(AccountMgr::GetSecurity(friendAccountId, realmID))) + if (HasPermission(rbac::RBAC_PERM_ALLOW_GM_FRIEND) || AccountMgr::IsPlayerAccount(AccountMgr::GetSecurity(friendAccountId, realmHandle.Index))) { if (friendGuid) { diff --git a/src/server/game/Server/BattlenetServerManager.cpp b/src/server/game/Server/BattlenetServerManager.cpp new file mode 100644 index 00000000000..b267926c6ff --- /dev/null +++ b/src/server/game/Server/BattlenetServerManager.cpp @@ -0,0 +1,66 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "Config.h" +#include "World.h" +#include "ZmqContext.h" +#include "BattlenetServerManager.h" + +void Battlenet::ServerManager::InitializeConnection() +{ + std::string bnetserverAddress = sConfigMgr->GetStringDefault("BnetServer.Address", "127.0.0.1"); + int32 bnetserverPort = sConfigMgr->GetIntDefault("BnetServer.Port", 1118); + _socket = new ZmqMux("inproc://bnetmgr", "tcp://" + bnetserverAddress + ":" + std::to_string(bnetserverPort)); + _socket->Start(); +} + +void Battlenet::ServerManager::CloseConnection() +{ + _socket->End(); + delete _socket; + _socket = nullptr; +} + +Battlenet::Header Battlenet::ServerManager::CreateHeader(BnetCommands command) +{ + Header header; + header.Ipc.Channel = IPC_CHANNEL_BNET; + header.Ipc.Command = command; + header.Realm = realmHandle; + return header; +} + +void Battlenet::ServerManager::SendChangeToonOnlineState(uint32 battlenetAccountId, uint32 gameAccountId, ObjectGuid guid, std::string const& name, bool online) +{ + // Do nothing for Grunt login + if (!battlenetAccountId) + return; + + Header header = CreateHeader(BNET_CHANGE_TOON_ONLINE_STATE); + ToonHandle toon; + toon.AccountId = battlenetAccountId; + toon.GameAccountId = gameAccountId; + toon.Guid = guid; + toon.Name = name; + + zmqpp::message msg; + msg << header; + msg << toon; + msg << online; + + _socket->Send(&msg); +} diff --git a/src/server/game/Server/BattlenetServerManager.h b/src/server/game/Server/BattlenetServerManager.h new file mode 100644 index 00000000000..fe103a1c981 --- /dev/null +++ b/src/server/game/Server/BattlenetServerManager.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef BattlenetMgr_h__ +#define BattlenetMgr_h__ + +#include "ZmqMux.h" +#include "Commands.h" + +namespace zmqpp +{ + class socket; +} + +namespace Battlenet +{ + class ServerManager + { + ServerManager() : _socket(nullptr) { } + + public: + void InitializeConnection(); + void CloseConnection(); + + static ServerManager& Instance() + { + static ServerManager instance; + return instance; + } + + void SendChangeToonOnlineState(uint32 battlenetAccountId, uint32 gameAccountId, ObjectGuid guid, std::string const& name, bool online); + + private: + static Header CreateHeader(BnetCommands command); + ZmqMux* _socket; + }; +} + +#define sBattlenetServer Battlenet::ServerManager::Instance() + +#endif // BattlenetMgr_h__ diff --git a/src/server/game/Server/WorldSession.cpp b/src/server/game/Server/WorldSession.cpp index 4c183939460..0fd46f0d20b 100644 --- a/src/server/game/Server/WorldSession.cpp +++ b/src/server/game/Server/WorldSession.cpp @@ -47,6 +47,7 @@ #include "Transport.h" #include "WardenWin.h" #include "WardenMac.h" +#include "BattlenetServerManager.h" namespace { @@ -582,6 +583,9 @@ void WorldSession::LogoutPlayer(bool save) _player->CleanupsBeforeDelete(); TC_LOG_INFO("entities.player.character", "Account: %d (IP: %s) Logout Character:[%s] (GUID: %u) Level: %d", GetAccountId(), GetRemoteAddress().c_str(), _player->GetName().c_str(), _player->GetGUIDLow(), _player->getLevel()); + + sBattlenetServer.SendChangeToonOnlineState(GetBattlenetAccountId(), GetAccountId(), _player->GetGUID(), _player->GetName(), false); + if (Map* _map = _player->FindMap()) _map->RemovePlayerFromMap(_player, true); @@ -1143,11 +1147,11 @@ void WorldSession::LoadPermissions() AccountMgr::GetName(id, name); uint8 secLevel = GetSecurity(); - _RBACData = new rbac::RBACData(id, name, realmID, secLevel); + _RBACData = new rbac::RBACData(id, name, realmHandle.Index, secLevel); _RBACData->LoadFromDB(); TC_LOG_DEBUG("rbac", "WorldSession::LoadPermissions [AccountId: %u, Name: %s, realmId: %d, secLevel: %u]", - id, name.c_str(), realmID, secLevel); + id, name.c_str(), realmHandle.Index, secLevel); } rbac::RBACData* WorldSession::GetRBACData() @@ -1162,7 +1166,7 @@ bool WorldSession::HasPermission(uint32 permission) bool hasPermission = _RBACData->HasPermission(permission); TC_LOG_DEBUG("rbac", "WorldSession::HasPermission [AccountId: %u, Name: %s, realmId: %d]", - _RBACData->GetId(), _RBACData->GetName().c_str(), realmID); + _RBACData->GetId(), _RBACData->GetName().c_str(), realmHandle.Index); return hasPermission; } @@ -1170,7 +1174,7 @@ bool WorldSession::HasPermission(uint32 permission) void WorldSession::InvalidateRBACData() { TC_LOG_DEBUG("rbac", "WorldSession::Invalidaterbac::RBACData [AccountId: %u, Name: %s, realmId: %d]", - _RBACData->GetId(), _RBACData->GetName().c_str(), realmID); + _RBACData->GetId(), _RBACData->GetName().c_str(), realmHandle.Index); delete _RBACData; _RBACData = NULL; } diff --git a/src/server/game/Server/WorldSocket.cpp b/src/server/game/Server/WorldSocket.cpp index bff343bdcb1..7933ddfeb4d 100644 --- a/src/server/game/Server/WorldSocket.cpp +++ b/src/server/game/Server/WorldSocket.cpp @@ -406,7 +406,7 @@ void WorldSocket::HandleAuthSession(WorldPacket& recvPacket) return; } - if (realmIndex != realmID) + if (realmIndex != realmHandle.Index) { SendAuthResponseError(REALM_LIST_REALM_NOT_FOUND); TC_LOG_ERROR("network", "WorldSocket::HandleAuthSession: Sent Auth Response (bad realm)."); @@ -485,7 +485,7 @@ void WorldSocket::HandleAuthSession(WorldPacket& recvPacket) stmt = LoginDatabase.GetPreparedStatement(LOGIN_GET_GMLEVEL_BY_REALMID); stmt->setUInt32(0, id); - stmt->setInt32(1, int32(realmID)); + stmt->setInt32(1, int32(realmHandle.Index)); result = LoginDatabase.Query(stmt); diff --git a/src/server/game/World/World.cpp b/src/server/game/World/World.cpp index 6adb93d4742..4b114255dbc 100644 --- a/src/server/game/World/World.cpp +++ b/src/server/game/World/World.cpp @@ -1375,7 +1375,7 @@ void World::SetInitialWorldSettings() uint32 server_type = IsFFAPvPRealm() ? uint32(REALM_TYPE_PVP) : getIntConfig(CONFIG_GAME_TYPE); uint32 realm_zone = getIntConfig(CONFIG_REALM_ZONE); - LoginDatabase.PExecute("UPDATE realmlist SET icon = %u, timezone = %u WHERE id = '%d'", server_type, realm_zone, realmID); // One-time query + LoginDatabase.PExecute("UPDATE realmlist SET icon = %u, timezone = %u WHERE id = '%d'", server_type, realm_zone, realmHandle.Index); // One-time query ///- Remove the bones (they should not exist in DB though) and old corpses after a restart PreparedStatement* stmt = CharacterDatabase.GetPreparedStatement(CHAR_DEL_OLD_CORPSES); @@ -1797,7 +1797,7 @@ void World::SetInitialWorldSettings() m_startTime = m_gameTime; LoginDatabase.PExecute("INSERT INTO uptime (realmid, starttime, uptime, revision) VALUES(%u, %u, 0, '%s')", - realmID, uint32(m_startTime), _FULLVERSION); // One-time query + realmHandle.Index, uint32(m_startTime), _FULLVERSION); // One-time query m_timers[WUPDATE_WEATHERS].SetInterval(1*IN_MILLISECONDS); m_timers[WUPDATE_AUCTIONS].SetInterval(MINUTE*IN_MILLISECONDS); @@ -2079,7 +2079,7 @@ void World::Update(uint32 diff) stmt->setUInt32(0, tmpDiff); stmt->setUInt16(1, uint16(maxOnlinePlayers)); - stmt->setUInt32(2, realmID); + stmt->setUInt32(2, realmHandle.Index); stmt->setUInt32(3, uint32(m_startTime)); LoginDatabase.Execute(stmt); @@ -2809,13 +2809,13 @@ void World::_UpdateRealmCharCount(PreparedQueryResult resultCharCount) PreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_DEL_REALM_CHARACTERS_BY_REALM); stmt->setUInt32(0, accountId); - stmt->setUInt32(1, realmID); + stmt->setUInt32(1, realmHandle.Index); LoginDatabase.Execute(stmt); stmt = LoginDatabase.GetPreparedStatement(LOGIN_INS_REALM_CHARACTERS); stmt->setUInt8(0, charCount); stmt->setUInt32(1, accountId); - stmt->setUInt32(2, realmID); + stmt->setUInt32(2, realmHandle.Index); LoginDatabase.Execute(stmt); } } @@ -2984,7 +2984,7 @@ void World::ResetCurrencyWeekCap() void World::LoadDBAllowedSecurityLevel() { PreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_REALMLIST_SECURITY_LEVEL); - stmt->setInt32(0, int32(realmID)); + stmt->setInt32(0, int32(realmHandle.Index)); PreparedQueryResult result = LoginDatabase.Query(stmt); if (result) diff --git a/src/server/game/World/World.h b/src/server/game/World/World.h index 49d154d8db6..2c74e3929fe 100644 --- a/src/server/game/World/World.h +++ b/src/server/game/World/World.h @@ -24,6 +24,7 @@ #define __WORLD_H #include "Common.h" +#include "Commands.h" #include "ObjectGuid.h" #include "Timer.h" #include "SharedDefines.h" @@ -879,7 +880,7 @@ class World std::deque<std::future<PreparedQueryResult>> m_realmCharCallbacks; }; -extern uint32 realmID; +extern Battlenet::RealmHandle realmHandle; #define sWorld World::instance() #endif diff --git a/src/server/ipc/CMakeLists.txt b/src/server/ipc/CMakeLists.txt new file mode 100644 index 00000000000..93a5d630dfe --- /dev/null +++ b/src/server/ipc/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> +# +# This file is free software; as a special exception the author gives +# unlimited permission to copy and/or distribute it, with or without +# modifications, as long as this notice is preserved. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + +file(GLOB_RECURSE sources_ipc *.cpp *.h) + +set(ipc_SRCS + ${sources_ipc} +) + +include_directories( + ${CMAKE_BINARY_DIR} + ${CMAKE_SOURCE_DIR}/dep/zmqpp + ${CMAKE_SOURCE_DIR}/src/server/shared/ + ${ZMQ_INCLUDE_DIR} +) + +add_library(ipc STATIC ${ipc_SRCS}) diff --git a/src/server/ipc/Commands.cpp b/src/server/ipc/Commands.cpp new file mode 100644 index 00000000000..8e494fc34b9 --- /dev/null +++ b/src/server/ipc/Commands.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "Commands.h" +#include <zmqpp/message.hpp> + +zmqpp::message& operator>>(zmqpp::message& msg, IPCHeader& header) +{ + msg >> header.Channel; + msg >> header.Command; + return msg; +} + +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm) +{ + msg >> realm.Region; + msg >> realm.Battlegroup; + msg >> realm.Index; + return msg; +} + +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::Header& header) +{ + msg >> header.Ipc; + msg >> header.Realm; + return msg; +} + +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle) +{ + msg >> toonHandle.AccountId; + msg >> toonHandle.GameAccountId; + msg >> toonHandle.Guid; + msg >> toonHandle.Name; + return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, IPCHeader& header) +{ + msg << header.Channel; + msg << header.Command; + return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle& realm) +{ + msg << realm.Region; + msg << realm.Battlegroup; + msg << realm.Index; + return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::Header& header) +{ + msg << header.Ipc; + msg << header.Realm; + return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle) +{ + msg << toonHandle.AccountId; + msg << toonHandle.GameAccountId; + msg << toonHandle.Guid; + msg << toonHandle.Name; + return msg; +} diff --git a/src/server/ipc/Commands.h b/src/server/ipc/Commands.h new file mode 100644 index 00000000000..05309a45022 --- /dev/null +++ b/src/server/ipc/Commands.h @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _COMMANDS_H +#define _COMMANDS_H + +#include "Define.h" +#include <string> + +enum Channels +{ + IPC_CHANNEL_BNET, + + MAX_IPC_CHANNELS, +}; + +enum BnetCommands +{ + BNET_CHANGE_TOON_ONLINE_STATE, + + IPC_BNET_MAX_COMMAND +}; + +struct IPCHeader +{ + uint8 Channel; + uint8 Command; +}; + +namespace Battlenet +{ + struct RealmHandle + { + uint8 Region; + uint8 Battlegroup; + uint32 Index; + }; + + struct Header + { + IPCHeader Ipc; + RealmHandle Realm; + }; + + struct ToonHandle + { + uint32 AccountId; + uint32 GameAccountId; + uint64 Guid; + std::string Name; + }; +} + +namespace zmqpp +{ + class message; +} + +zmqpp::message& operator>>(zmqpp::message& msg, IPCHeader& header); +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm); +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::Header& header); +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle); + +zmqpp::message& operator<<(zmqpp::message& msg, IPCHeader& header); +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle& realm); +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::Header& header); +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle); + +#endif // _COMMANDS_H diff --git a/src/server/ipc/ZMQTask.cpp b/src/server/ipc/ZMQTask.cpp new file mode 100644 index 00000000000..0d25dc2babf --- /dev/null +++ b/src/server/ipc/ZMQTask.cpp @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZMQTask.h" +#include "ZmqContext.h" +#include <zmqpp/message.hpp> + +ZMQTask::ZMQTask() +{ + _poller = new zmqpp::poller(); +} + +ZMQTask::~ZMQTask() +{ + delete _poller; + _poller = NULL; + delete _inproc; + delete _thread; +} + +void ZMQTask::Start() +{ + _inproc = sIpcContext->CreateInprocSubscriber(); + _poller->add(*_inproc); + + HandleOpen(); + _thread = new std::thread(&ZMQTask::Run, this); +} + +void ZMQTask::End() +{ + _thread->join(); + _inproc->close(); + HandleClose(); +} + +bool ZMQTask::ProcessExit() +{ + if (_poller->events(*_inproc) == zmqpp::poller::poll_in) + { + int op1; + do + { + zmqpp::message msg; + if (!_inproc->receive(msg, true)) + return false; //No more messages to read from sock. This shouldn't happen. + + // strip 'internalmq.' from message + std::string cmd = msg.get(0).substr(11); + if (cmd == "kill") + return true; + + _inproc->get(zmqpp::socket_option::events, op1); + } while (op1 & zmqpp::poller::poll_in); + } + + return false; +} + +void ZMQTask::Pipeline(zmqpp::socket* from, zmqpp::socket* to) +{ + /* + Push messages from socket to socket. + */ + if (_poller->events(*from) == zmqpp::poller::poll_in) + { + int32 op1, op2; + do + { + zmqpp::message msg; + if (!from->receive(msg, true)) + return; //No more messages to read from socket. This shouldn't happen. + + to->send(msg); + from->get(zmqpp::socket_option::events, op1); + to->get(zmqpp::socket_option::events, op2); + } while(op1 & zmqpp::poller::poll_in && op2 & zmqpp::poller::poll_out); + } +} diff --git a/src/server/ipc/ZMQTask.h b/src/server/ipc/ZMQTask.h new file mode 100644 index 00000000000..24251893aaa --- /dev/null +++ b/src/server/ipc/ZMQTask.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQTASK_H +#define __ZMQTASK_H + +#include "Define.h" +#include <thread> +#include <zmqpp/poller.hpp> +#include <zmqpp/socket.hpp> + +/* + This class serves as a base for all long running tasks + It is set up to terminate its running task upon receiving "kill" command +*/ +class ZMQTask +{ +public: + ZMQTask(); + virtual ~ZMQTask(); + + void Start(); + void End(); + virtual void Run() = 0; + +protected: + virtual void HandleOpen() { } + virtual void HandleClose() { } + void Pipeline(zmqpp::socket* from, zmqpp::socket* to); + bool ProcessExit(); + + zmqpp::poller* _poller; + + zmqpp::socket* _inproc; + std::thread* _thread; +}; + +#endif // __ZMQTASK_H diff --git a/src/server/ipc/ZmqContext.cpp b/src/server/ipc/ZmqContext.cpp new file mode 100644 index 00000000000..305e6b1d843 --- /dev/null +++ b/src/server/ipc/ZmqContext.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqContext.h" + +ZmqContext::ZmqContext() : _inproc(nullptr) +{ +} + +ZmqContext::~ZmqContext() +{ + delete _inproc; +} + +zmqpp::socket* ZmqContext::CreateNewSocket(zmqpp::socket_type type) +{ + std::unique_lock<std::mutex> lock(_mutex); + return new zmqpp::socket(_context, type); +} + +void ZmqContext::Initialize() +{ + _inproc = new zmqpp::socket(_context, zmqpp::socket_type::pub); + _inproc->bind("inproc://workers"); +} + +zmqpp::socket* ZmqContext::CreateInprocSubscriber() +{ + zmqpp::socket* sub = CreateNewSocket(zmqpp::socket_type::sub); + sub->connect("inproc://workers"); + sub->subscribe("internalmq."); + return sub; +} + +void ZmqContext::Close() +{ + _inproc->send("internalmq.kill"); +} diff --git a/src/server/ipc/ZmqContext.h b/src/server/ipc/ZmqContext.h new file mode 100644 index 00000000000..a6ad12b1b70 --- /dev/null +++ b/src/server/ipc/ZmqContext.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQCONTEX_H +#define __ZMQCONTEX_H + +#include <zmqpp/zmqpp.hpp> +#include <mutex> + +/* + * We need to serialize access to zmq context otherwise stuff blows up. + */ +class ZmqContext +{ +public: + ~ZmqContext(); + + static ZmqContext* Instance() + { + static ZmqContext instance; + return &instance; + } + + zmqpp::socket* CreateNewSocket(zmqpp::socket_type); + void Initialize(); + zmqpp::socket* CreateInprocSubscriber(); + void Close(); + +private: + ZmqContext(); + ZmqContext(ZmqContext const&) = delete; + ZmqContext& operator=(ZmqContext const&) = delete; + + zmqpp::context _context; + std::mutex _mutex; + zmqpp::socket* _inproc; +}; + +#define sIpcContext ZmqContext::Instance() + +#endif diff --git a/src/server/ipc/ZmqListener.cpp b/src/server/ipc/ZmqListener.cpp new file mode 100644 index 00000000000..98333305e58 --- /dev/null +++ b/src/server/ipc/ZmqListener.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqListener.h" +#include "ZmqContext.h" + +ZmqListener::ZmqListener(std::string const& from, std::string const& to) +{ + _from = sIpcContext->CreateNewSocket(zmqpp::socket_type::sub); + _to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); + _from->connect(from); + _to->bind(to); +} + +ZmqListener::~ZmqListener() +{ + delete _from; + delete _to; +} + +void ZmqListener::HandleOpen() +{ +} + +void ZmqListener::HandleClose() +{ + _from->close(); + _to->close(); +} + +void ZmqListener::Run() +{ + while (!ProcessExit()) + { + _poller->poll(); + + while (_poller->events(*_from) & zmqpp::poller::poll_in && + _poller->events(*_to) & zmqpp::poller::poll_out) + { + zmqpp::message msg; + _from->receive(msg); + _to->send(msg); + } + } +} + +void ZmqListener::Subscribe(std::string const& keyword) +{ + _from->subscribe(keyword); +} + +void ZmqListener::Unsubscribe(std::string const& keyword) +{ + _from->unsubscribe(keyword); +} diff --git a/src/server/ipc/ZmqListener.h b/src/server/ipc/ZmqListener.h new file mode 100644 index 00000000000..8b79ba67f6d --- /dev/null +++ b/src/server/ipc/ZmqListener.h @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQLISTENER_H +#define __ZMQLISTENER_H + +#include "ZMQTask.h" +#include <zmqpp/zmqpp.hpp> + +class ZmqListener : public ZMQTask +{ +/* + * Read broadcasts from remote PUB socket, and forward them to + * another socket. + * + * from - client SUB socket + * to - listen PUSH socket + * + */ +public: + ZmqListener(std::string const& from, std::string const& to); + ~ZmqListener(); + void Run() override; + + void Subscribe(std::string const& keyword); + void Unsubscribe(std::string const& keyword); + +protected: + void HandleOpen() override; + void HandleClose() override; + +private: + zmqpp::socket* _from; + zmqpp::socket* _to; +}; + +#endif diff --git a/src/server/ipc/ZmqMux.cpp b/src/server/ipc/ZmqMux.cpp new file mode 100644 index 00000000000..4b5a4f48b05 --- /dev/null +++ b/src/server/ipc/ZmqMux.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqMux.h" +#include "ZmqContext.h" + +ZmqMux::ZmqMux(std::string from_uri, std::string to_uri): + _fromAddress(from_uri) +{ + printf("Opening muxer thread from %s to %s\n", from_uri.c_str(), to_uri.c_str()); + _from = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull); + _to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); + + _from->bind(from_uri); + _to->connect(to_uri); +} + +ZmqMux::~ZmqMux() +{ + delete _from; + delete _to; +} + +void ZmqMux::HandleOpen() +{ + _poller->add(*_from); + _poller->add(*_to, zmqpp::poller::poll_out); +} + +bool ZmqMux::Send(zmqpp::message* m, bool dont_block) +{ + if (_socket.get() == nullptr) + { + _socket.reset(sIpcContext->CreateNewSocket(zmqpp::socket_type::push)); + _socket->connect(_fromAddress); + } + + return _socket->send(*m, dont_block); +} + +void ZmqMux::Run() +{ + for (;;) + { + if (!_poller->poll()) + break; + + if (ProcessExit()) + break; + + Pipeline(_from, _to); + } +} diff --git a/src/server/ipc/ZmqMux.h b/src/server/ipc/ZmqMux.h new file mode 100644 index 00000000000..4b81f11daaf --- /dev/null +++ b/src/server/ipc/ZmqMux.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQMUX_H +#define __ZMQMUX_H + +#include "ZMQTask.h" +#include <string> +#include <boost/thread/tss.hpp> + +/* + * Multiplexes zmq messages from many threads, + * and then passes them to another socket. + */ +class ZmqMux : public ZMQTask +{ +public: + ZmqMux(std::string from, std::string to); + ~ZmqMux(); + bool Send(zmqpp::message*, bool dont_block = false); + void Run() override; + +protected: + void HandleOpen() override; + +private: + boost::thread_specific_ptr<zmqpp::socket> _socket; + zmqpp::socket* _from; + zmqpp::socket* _to; + std::string const _fromAddress; +}; + +#endif diff --git a/src/server/ipc/ZmqWorker.cpp b/src/server/ipc/ZmqWorker.cpp new file mode 100644 index 00000000000..f205ea831b5 --- /dev/null +++ b/src/server/ipc/ZmqWorker.cpp @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqWorker.h" +#include "ZmqContext.h" + +ZmqWorker::ZmqWorker(std::string task_uri, std::string res_uri) : + _taskUri(task_uri), _resultsUri(res_uri) +{ +} + +ZmqWorker::~ZmqWorker() +{ + delete _taskQueue; + delete _results; + delete _inproc; +} + +void ZmqWorker::HandleOpen() +{ + _taskQueue = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull); + _results = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); + + _taskQueue->connect(_taskUri); + _results->connect(_resultsUri); + + _poller->add(*_taskQueue); +} + +void ZmqWorker::HandleClose() +{ + _taskQueue->close(); + _results->close(); +} + +void ZmqWorker::Run() +{ + while (!ProcessExit()) + { + _poller->poll(); + if (_poller->events(*_taskQueue) & zmqpp::poller::poll_in) + PerformWork(); + } +} + +void ZmqWorker::PerformWork() +{ + int32 op1; + do + { + zmqpp::message msg; + _taskQueue->receive(msg); + Dispatch(msg); + _taskQueue->get(zmqpp::socket_option::events, op1); + } while (op1 & zmqpp::poller::poll_in); +} diff --git a/src/server/ipc/ZmqWorker.h b/src/server/ipc/ZmqWorker.h new file mode 100644 index 00000000000..b3e221e9129 --- /dev/null +++ b/src/server/ipc/ZmqWorker.h @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQWORKER_H +#define __ZMQWORKER_H + +#include "ZMQTask.h" +#include <zmqpp/zmqpp.hpp> + +class ZmqWorker : public ZMQTask +{ +public: + ZmqWorker(std::string task_uri, std::string res_uri); + ~ZmqWorker(); + void Run() override; + +protected: + void HandleOpen() override; + void HandleClose() override; + zmqpp::socket* _results; + +private: + void PerformWork(); + virtual void Dispatch(zmqpp::message const&) = 0; + zmqpp::socket* _taskQueue; + std::string _taskUri; + std::string _resultsUri; +}; + +#endif diff --git a/src/server/scripts/CMakeLists.txt b/src/server/scripts/CMakeLists.txt index e92c883eeff..eab6a36d02e 100644 --- a/src/server/scripts/CMakeLists.txt +++ b/src/server/scripts/CMakeLists.txt @@ -64,6 +64,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/src/server/shared/Packets ${CMAKE_SOURCE_DIR}/src/server/shared/Threading ${CMAKE_SOURCE_DIR}/src/server/shared/Utilities + ${CMAKE_SOURCE_DIR}/src/server/ipc ${CMAKE_SOURCE_DIR}/src/server/collision ${CMAKE_SOURCE_DIR}/src/server/collision/Management ${CMAKE_SOURCE_DIR}/src/server/collision/Models diff --git a/src/server/scripts/Commands/cs_gm.cpp b/src/server/scripts/Commands/cs_gm.cpp index 27ec4835ce6..4c5b68516ca 100644 --- a/src/server/scripts/Commands/cs_gm.cpp +++ b/src/server/scripts/Commands/cs_gm.cpp @@ -163,7 +163,7 @@ public: ///- Get the accounts with GM Level >0 PreparedStatement* stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_GM_ACCOUNTS); stmt->setUInt8(0, uint8(SEC_MODERATOR)); - stmt->setInt32(1, int32(realmID)); + stmt->setInt32(1, int32(realmHandle.Index)); PreparedQueryResult result = LoginDatabase.Query(stmt); if (result) diff --git a/src/server/scripts/Commands/cs_misc.cpp b/src/server/scripts/Commands/cs_misc.cpp index 386be81c198..6fa3c6512b7 100644 --- a/src/server/scripts/Commands/cs_misc.cpp +++ b/src/server/scripts/Commands/cs_misc.cpp @@ -1542,7 +1542,7 @@ public: // Query the prepared statement for login data stmt = LoginDatabase.GetPreparedStatement(LOGIN_SEL_PINFO); - stmt->setInt32(0, int32(realmID)); + stmt->setInt32(0, int32(realmHandle.Index)); stmt->setUInt32(1, accId); PreparedQueryResult result = LoginDatabase.Query(stmt); diff --git a/src/server/scripts/Commands/cs_rbac.cpp b/src/server/scripts/Commands/cs_rbac.cpp index 95ef5ab6984..ab960026d69 100644 --- a/src/server/scripts/Commands/cs_rbac.cpp +++ b/src/server/scripts/Commands/cs_rbac.cpp @@ -160,7 +160,7 @@ public: if (!rdata)
{
- data->rbac = new rbac::RBACData(accountId, accountName, realmID, AccountMgr::GetSecurity(accountId, realmID));
+ data->rbac = new rbac::RBACData(accountId, accountName, realmHandle.Index, AccountMgr::GetSecurity(accountId, realmHandle.Index));
data->rbac->LoadFromDB();
data->needDelete = true;
}
diff --git a/src/server/scripts/Commands/cs_ticket.cpp b/src/server/scripts/Commands/cs_ticket.cpp index 674658f41ff..f2aa046676f 100644 --- a/src/server/scripts/Commands/cs_ticket.cpp +++ b/src/server/scripts/Commands/cs_ticket.cpp @@ -98,7 +98,7 @@ public: ObjectGuid targetGuid = sObjectMgr->GetPlayerGUIDByName(target); uint32 accountId = sObjectMgr->GetPlayerAccountIdByGUID(targetGuid); // Target must exist and have administrative rights - if (!AccountMgr::HasPermission(accountId, rbac::RBAC_PERM_COMMANDS_BE_ASSIGNED_TICKET, realmID)) + if (!AccountMgr::HasPermission(accountId, rbac::RBAC_PERM_COMMANDS_BE_ASSIGNED_TICKET, realmHandle.Index)) { handler->SendSysMessage(LANG_COMMAND_TICKETASSIGNERROR_A); return true; @@ -122,7 +122,7 @@ public: // Assign ticket SQLTransaction trans = SQLTransaction(NULL); - ticket->SetAssignedTo(targetGuid, AccountMgr::IsAdminAccount(AccountMgr::GetSecurity(accountId, realmID))); + ticket->SetAssignedTo(targetGuid, AccountMgr::IsAdminAccount(AccountMgr::GetSecurity(accountId, realmHandle.Index))); ticket->SaveToDB(trans); sTicketMgr->UpdateLastChange(); @@ -378,7 +378,7 @@ public: { ObjectGuid guid = ticket->GetAssignedToGUID(); uint32 accountId = sObjectMgr->GetPlayerAccountIdByGUID(guid); - security = AccountMgr::GetSecurity(accountId, realmID); + security = AccountMgr::GetSecurity(accountId, realmHandle.Index); } // Check security diff --git a/src/server/worldserver/CMakeLists.txt b/src/server/worldserver/CMakeLists.txt index 0cdf5f13f79..65972e680ef 100644 --- a/src/server/worldserver/CMakeLists.txt +++ b/src/server/worldserver/CMakeLists.txt @@ -47,6 +47,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/dep/gsoap ${CMAKE_SOURCE_DIR}/dep/sockets/include ${CMAKE_SOURCE_DIR}/dep/SFMT + ${CMAKE_SOURCE_DIR}/dep/zmqpp ${CMAKE_SOURCE_DIR}/src/server/collision ${CMAKE_SOURCE_DIR}/src/server/collision/Management ${CMAKE_SOURCE_DIR}/src/server/collision/Models @@ -64,6 +65,7 @@ include_directories( ${CMAKE_SOURCE_DIR}/src/server/shared/Packets ${CMAKE_SOURCE_DIR}/src/server/shared/Threading ${CMAKE_SOURCE_DIR}/src/server/shared/Utilities + ${CMAKE_SOURCE_DIR}/src/server/ipc ${CMAKE_SOURCE_DIR}/src/server/game ${CMAKE_SOURCE_DIR}/src/server/game/Accounts ${CMAKE_SOURCE_DIR}/src/server/game/Achievements @@ -141,6 +143,7 @@ include_directories( ${MYSQL_INCLUDE_DIR} ${OPENSSL_INCLUDE_DIR} ${VALGRIND_INCLUDE_DIR} + ${ZMQ_INCLUDE_DIR} ) add_executable(worldserver @@ -164,18 +167,21 @@ set_target_properties(worldserver PROPERTIES LINK_FLAGS "${worldserver_LINK_FLAG target_link_libraries(worldserver game + ipc shared scripts collision g3dlib gsoap Detour + zmqpp ${JEMALLOC_LIBRARY} ${READLINE_LIBRARY} ${TERMCAP_LIBRARY} ${MYSQL_LIBRARY} ${OPENSSL_LIBRARIES} ${ZLIB_LIBRARIES} + ${ZMQ_LIBRARY} ${CMAKE_THREAD_LIBS_INIT} ${Boost_LIBRARIES} ) diff --git a/src/server/worldserver/Main.cpp b/src/server/worldserver/Main.cpp index 6c93343b8de..c5127b8f3e2 100644 --- a/src/server/worldserver/Main.cpp +++ b/src/server/worldserver/Main.cpp @@ -20,13 +20,9 @@ /// @{ /// \file -#include <openssl/opensslv.h> -#include <openssl/crypto.h> -#include <boost/asio/io_service.hpp> -#include <boost/asio/deadline_timer.hpp> -#include <boost/program_options.hpp> - #include "Common.h" +#include "Commands.h" +#include "ZmqContext.h" #include "DatabaseEnv.h" #include "AsyncAcceptor.h" #include "RASession.h" @@ -47,6 +43,12 @@ #include "SystemConfig.h" #include "WorldSocket.h" #include "WorldSocketMgr.h" +#include "BattlenetServerManager.h" +#include <openssl/opensslv.h> +#include <openssl/crypto.h> +#include <boost/asio/io_service.hpp> +#include <boost/asio/deadline_timer.hpp> +#include <boost/program_options.hpp> using namespace boost::program_options; @@ -79,7 +81,7 @@ uint32 _maxCoreStuckTimeInMs(0); WorldDatabaseWorkerPool WorldDatabase; ///< Accessor to the world database CharacterDatabaseWorkerPool CharacterDatabase; ///< Accessor to the character database LoginDatabaseWorkerPool LoginDatabase; ///< Accessor to the realm/login database -uint32 realmID; ///< Id of the realm +Battlenet::RealmHandle realmHandle; ///< Id of the realm void SignalHandler(const boost::system::error_code& error, int signalNumber); void FreezeDetectorHandler(const boost::system::error_code& error); @@ -188,7 +190,7 @@ extern int main(int argc, char** argv) } // Set server offline (not connectable) - LoginDatabase.DirectPExecute("UPDATE realmlist SET flag = (flag & ~%u) | %u WHERE id = '%d'", REALM_FLAG_OFFLINE, REALM_FLAG_INVALID, realmID); + LoginDatabase.DirectPExecute("UPDATE realmlist SET flag = (flag & ~%u) | %u WHERE id = '%d'", REALM_FLAG_OFFLINE, REALM_FLAG_INVALID, realmHandle.Index); // Initialize the World sWorld->SetInitialWorldSettings(); @@ -223,7 +225,7 @@ extern int main(int argc, char** argv) sWorldSocketMgr.StartNetwork(_ioService, worldListener, worldPort); // Set server online (allow connecting now) - LoginDatabase.DirectPExecute("UPDATE realmlist SET flag = flag & ~%u, population = 0 WHERE id = '%u'", REALM_FLAG_INVALID, realmID); + LoginDatabase.DirectPExecute("UPDATE realmlist SET flag = flag & ~%u, population = 0 WHERE id = '%u'", REALM_FLAG_INVALID, realmHandle.Index); // Start the freeze check callback cycle in 5 seconds (cycle itself is 1 sec) if (int coreStuckTime = sConfigMgr->GetIntDefault("MaxCoreStuckTime", 0)) @@ -234,6 +236,10 @@ extern int main(int argc, char** argv) TC_LOG_INFO("server.worldserver", "Starting up anti-freeze thread (%u seconds max stuck time)...", coreStuckTime); } + sIpcContext->Initialize(); + + sBattlenetServer.InitializeConnection(); + TC_LOG_INFO("server.worldserver", "%s (worldserver-daemon) ready...", _FULLVERSION); sScriptMgr->OnStartup(); @@ -245,6 +251,10 @@ extern int main(int argc, char** argv) sScriptMgr->OnShutdown(); + sIpcContext->Close(); + + sBattlenetServer.CloseConnection(); + sWorld->KickAll(); // save and kick all players sWorld->UpdateSessions(1); // real players unload required UpdateSessions call @@ -260,7 +270,7 @@ extern int main(int argc, char** argv) sOutdoorPvPMgr->Die(); // set server offline - LoginDatabase.DirectPExecute("UPDATE realmlist SET flag = flag | %u WHERE id = '%d'", REALM_FLAG_OFFLINE, realmID); + LoginDatabase.DirectPExecute("UPDATE realmlist SET flag = flag | %u WHERE id = '%d'", REALM_FLAG_OFFLINE, realmHandle.Index); // Clean up threads if any if (soapThread != nullptr) @@ -523,13 +533,24 @@ bool StartDB() } ///- Get the realm Id from the configuration file - realmID = sConfigMgr->GetIntDefault("RealmID", 0); - if (!realmID) + realmHandle.Index = sConfigMgr->GetIntDefault("RealmID", 0); + if (!realmHandle.Index) { TC_LOG_ERROR("server.worldserver", "Realm ID not defined in configuration file"); return false; } - TC_LOG_INFO("server.worldserver", "Realm running as realm ID %d", realmID); + + QueryResult realmIdQuery = LoginDatabase.PQuery("SELECT `Region`,`Battlegroup` FROM `realmlist` WHERE `id`=%u", realmHandle.Index); + if (!realmIdQuery) + { + TC_LOG_ERROR("server.worldserver", "Realm id %u not defined in realmlist table", realmHandle.Index); + return false; + } + + realmHandle.Region = (*realmIdQuery)[0].GetUInt8(); + realmHandle.Battlegroup = (*realmIdQuery)[1].GetUInt8(); + + TC_LOG_INFO("server.worldserver", "Realm running as realm ID %u region %u battlegroup %u", realmHandle.Index, uint32(realmHandle.Region), uint32(realmHandle.Battlegroup)); ///- Clean the database before starting ClearOnlineAccounts(); @@ -556,7 +577,7 @@ void StopDB() void ClearOnlineAccounts() { // Reset online status for all accounts with characters on the current realm - LoginDatabase.DirectPExecute("UPDATE account SET online = 0 WHERE online > 0 AND id IN (SELECT acctid FROM realmcharacters WHERE realmid = %d)", realmID); + LoginDatabase.DirectPExecute("UPDATE account SET online = 0 WHERE online > 0 AND id IN (SELECT acctid FROM realmcharacters WHERE realmid = %d)", realmHandle.Index); // Reset online status for all characters CharacterDatabase.DirectExecute("UPDATE characters SET online = 0 WHERE online <> 0"); diff --git a/src/server/worldserver/worldserver.conf.dist b/src/server/worldserver/worldserver.conf.dist index 6c3972de421..57d97756d70 100644 --- a/src/server/worldserver/worldserver.conf.dist +++ b/src/server/worldserver/worldserver.conf.dist @@ -3324,3 +3324,24 @@ PacketSpoof.BanDuration = 86400 # ################################################################################################### + +################################################################################################### +# IPC SETTINGS +# +# BnetServer.Address +# Description: Determines IP address of battle.net server to connect to. +# Default: 127.0.0.1 +# + +BnetServer.Address = 127.0.0.1 + +# +# BnetServer.Port +# Description: Determines port to use when connecting to battle.net server. +# Default: 1118 +# + +BnetServer.Port = 1118 + +# +################################################################################################### |