Core/Dep: Removed ZMQ dependency in preparation for future changes

This commit is contained in:
Shauren
2016-03-20 18:50:25 +01:00
parent 1fea358bec
commit 92e247b226
50 changed files with 6 additions and 4853 deletions

View File

@@ -34,8 +34,6 @@ if(SERVERS)
add_subdirectory(process)
add_subdirectory(readline)
add_subdirectory(gsoap)
add_subdirectory(zmq)
add_subdirectory(zmqpp)
endif()
if(TOOLS)

View File

@@ -1,21 +0,0 @@
# Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
#
# This file is free software; as a special exception the author gives
# unlimited permission to copy and/or distribute it, with or without
# modifications, as long as this notice is preserved.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
find_package(ZMQ REQUIRED)
add_library(zmq INTERFACE)
target_link_libraries(zmq
INTERFACE
${ZMQ_LIBRARY})
target_include_directories(zmq
INTERFACE
${ZMQ_INCLUDE_DIR})

View File

@@ -1,37 +0,0 @@
# Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
#
# This file is free software; as a special exception the author gives
# unlimited permission to copy and/or distribute it, with or without
# modifications, as long as this notice is preserved.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
file(GLOB_RECURSE sources_zmqpp zmqpp/*.cpp zmqpp/*.hpp zmqpp/*.h)
include_directories(${ZMQ_INCLUDE_DIR})
add_library(zmqpp STATIC
${sources_zmqpp}
)
target_link_libraries(zmqpp
PUBLIC
zmq)
target_include_directories(zmqpp
PUBLIC
${CMAKE_CURRENT_SOURCE_DIR})
set_target_properties(zmqpp
PROPERTIES
FOLDER
"dep")
target_compile_definitions(zmqpp
PRIVATE
-DBUILD_VERSION_MAJOR=3
-DBUILD_VERSION_MINOR=2
-DBUILD_VERSION_REVISION=0
-DBUILD_VERSION="3.2.0")

View File

@@ -1,97 +0,0 @@
/**
* \file
*
* \date 10 Sep 2011
* \author ron
* \author Ben Gray (\@benjamg)
*
* A fair number of C++0x (or more accurately C++11) features are used in this
* library and as this project is used where I work on older compilers this
* file was created to help.
*
* C++ features and their workaround where not supported:
* \li lambda functions - disabled, these are only used in the test anyway.
* \li typesafe enums - replaced with enum where comparisons needed.
* \li nullptr - defined to null.
*
* As of the port to version 3.1 (libzmqpp version 1.1.0) this file will also
* be used to maintain compatablity with multiple versions of 0mq
*/
#ifndef ZMQPP_COMPATIBILITY_HPP_
#define ZMQPP_COMPATIBILITY_HPP_
#include <zmq.h>
#include <cstdint>
// Currently we require at least 0mq version 2.2.x
#define ZMQPP_REQUIRED_ZMQ_MAJOR 2
#define ZMQPP_REQUIRED_ZMQ_MINOR 2
#if (ZMQ_VERSION_MAJOR < ZMQPP_REQUIRED_ZMQ_MAJOR) || ((ZMQ_VERSION_MAJOR == ZMQPP_REQUIRED_ZMQ_MAJOR) && (ZMQ_VERSION_MINOR < ZMQPP_REQUIRED_ZMQ_MINOR))
#error zmqpp requires a later version of 0mq
#endif
// Experimental feature support
#if (ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR == 0)
#define ZMQ_EXPERIMENTAL_LABELS
#endif
// Deal with older versions of gcc
#if defined(__GNUC__) && !defined(__clang__)
#if __GNUC__ == 4
// Deal with older gcc not supporting C++0x typesafe enum class name {} comparison
#if __GNUC_MINOR__ < 4
#define ZMQPP_COMPARABLE_ENUM enum
#endif
#if __GNUC_MINOR__ == 4
#if __GNUC_PATCHLEVEL__ < 1
#undef ZMQPP_COMPARABLE_ENUM
#define ZMQPP_COMPARABLE_ENUM enum
#endif // if __GNUC_PATCHLEVEL__ < 1
#endif // if __GNUC_MINOR__ == 4
// Deal with older gcc not supporting C++0x lambda function
#if __GNUC_MINOR__ < 5
#define ZMQPP_IGNORE_LAMBDA_FUNCTION_TESTS
#define ZMQPP_EXPLICITLY_DELETED
#endif // if __GNUC_MINOR__ < 5
// Deal with older gcc not supporting C++0x nullptr
#if __GNUC_MINOR__ < 6
#define nullptr NULL
#define NOEXCEPT
#endif // if __GNUC_MINOR__ < 6
#endif // if __GNUC_ == 4
#endif // if defined(__GNUC__) && !defined(__clang__)
#if defined(_MSC_VER)
#define NOEXCEPT throw()
#if _MSC_VER < 1800
#define ZMQPP_EXPLICITLY_DELETED
#endif // if _MSC_VER < 1800
#if _MSC_VER < 1600
#define nullptr NULL
#define ZMQPP_IGNORE_LAMBDA_FUNCTION_TESTS
#define ZMQPP_COMPARABLE_ENUM enum
#endif // if _MSC_VER < 1600
#endif // if defined(_MSC_VER)
// Generic state, assume a modern compiler
#ifndef ZMQPP_COMPARABLE_ENUM
#define ZMQPP_COMPARABLE_ENUM enum class
#endif
#ifndef ZMQPP_EXPLICITLY_DELETED
#define ZMQPP_EXPLICITLY_DELETED = delete
#endif
#ifndef NOEXCEPT
#define NOEXCEPT noexcept
#endif
#endif /* ZMQPP_COMPATIBILITY_HPP_ */

View File

@@ -1,54 +0,0 @@
/**
* \file
*
* \date 9 Aug 2011
* \author Ben Gray (\@benjamg)
*/
#include "context.hpp"
namespace zmqpp
{
void context::terminate()
{
int result;
do
{
#if (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2))
result = zmq_term(_context);
#else
result = zmq_ctx_destroy(_context);
#endif
} while (result != 0 && zmq_errno() == EINTR);
if (result != 0) { throw zmq_internal_exception(); }
_context = nullptr;
}
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
void context::set(context_option const option, int const value)
{
if (nullptr == _context) { throw invalid_instance("context is invalid"); }
if (0 != zmq_ctx_set(_context, static_cast<int>(option), value))
{
throw zmq_internal_exception();
}
}
int context::get(context_option const option)
{
if (nullptr == _context) { throw invalid_instance("context is invalid"); }
int result = zmq_ctx_get(_context, static_cast<int>(option));
if (result < 0)
{
throw zmq_internal_exception();
}
return result;
}
#endif
}

View File

@@ -1,184 +0,0 @@
/**
* \file
*
* \date 9 Aug 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_CONTEXT_HPP_
#define ZMQPP_CONTEXT_HPP_
#include <cassert>
#include <zmq.h>
#include "compatibility.hpp"
#include "exception.hpp"
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
#include "context_options.hpp"
#endif
namespace zmqpp
{
/*!
* The context class represents internal zmq context and io threads.
*
* By default the context class will create one thread, however this can be
* overridden in the constructor.
*
* The context class is the only object that can be considered thread safe.
*
* All sockets using endpoints other than inproc require the context to have
* at least one thread.
*
* This class is c++0x move supporting and cannot be copied.
*/
class context
{
public:
#if (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2))
/*!
* Initialise the 0mq context.
*
* If only inproc is used then the context may be created with zero threads.
* Any inproc endpoint using sockets must be created using the same context.
*
* The context is thread safe an may be used anywhere in your application,
* however there is no requirement (other than inproc restrictions) for you
* to do this.
*
* \param threads an integer argument for the number of required threads. Defaults to 1.
*/
context(int const& threads = 1)
#else
/*!
* Initialise the 0mq context.
*
* The context is thread safe an may be used anywhere in your application,
* however there is no requirement (other than inproc restrictions) for you
* to do this.
*/
context()
#endif
: _context(nullptr)
{
#if (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2))
_context = zmq_init(threads);
#else
_context = zmq_ctx_new();
#endif
if (nullptr == _context)
{
throw zmq_internal_exception();
}
}
/*!
* Closes the 0mq context.
*
* Any blocking calls other than a socket close will return with an error.
*
* If there are open sockets will block while zmq internal buffers are
* processed up to a limit specified by that sockets linger option.
*/
~context() NOEXCEPT
{
if (nullptr != _context)
{
terminate();
}
}
/*!
* Move supporting constructor.
*
* Allows zero-copy move semantics to be used with this class.
*
* \param source a rvalue instance of the object who's internals we wish to steal.
*/
context(context&& source) NOEXCEPT
: _context(source._context)
{
source._context = nullptr;
}
/*!
* Move supporting operator.
*
* Allows zero-copy move semantics to be used with this class.
*
* \param source an rvalue instance of the context who's internals we wish to steal.
*/
context& operator=(context&& source) NOEXCEPT
{
std::swap( _context, source._context );
return *this;
}
/*!
* Terminate the current context.
*
* Any blocking calls other than a socket close will return with an error.
*
* If there are open sockets will block while zmq internal buffers are
* processed up to a limit specified by that sockets linger option.
*/
void terminate();
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
/*!
* Set the value of an option in the underlaying zmq context.
*
* \param option a valid ::context_option
* \param value to set the option to
*/
void set(context_option const option, int const value);
/*!
* Get a context option from the underlaying zmq context.
*
* \param option a valid ::context_option
* \return context option value
*/
int get(context_option const option);
#endif
/*!
* Validity checking of the context
*
* Checks if the underlying 0mq context for this instance is valid.
*
* Contexts should always be valid unless people are doing 'fun' things with
* std::move.
*
* \return boolean true if the object is valid.
*/
operator bool() const NOEXCEPT
{
return nullptr != _context;
}
/*!
* Access to the raw 0mq context
*
* \return void pointer to the underlying 0mq context.
*/
operator void*() const NOEXCEPT
{
return _context;
}
private:
void* _context;
// No copy - private and not implemented
context(context const&) ZMQPP_EXPLICITLY_DELETED;
context& operator=(context const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED;
};
}
#endif /* ZMQPP_CONTEXT_HPP_ */

View File

@@ -1,26 +0,0 @@
/**
* \file
*
* \date 3 Jul 2013
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_CONTEXT_OPTIONS_HPP_
#define ZMQPP_CONTEXT_OPTIONS_HPP_
namespace zmqpp
{
/** \todo Expand the information on the options to make it actually useful. */
/*!
* \brief possible Context options in zmq
*/
enum class context_option {
io_threads = ZMQ_IO_THREADS, /*!< I/O thread count */
max_sockets = ZMQ_MAX_SOCKETS, /*!< Maximum supported sockets */
};
}
#endif /* ZMQPP_CONTEXT_OPTIONS_HPP_ */

View File

@@ -1,87 +0,0 @@
/**
* \file
*
* \date 9 Aug 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_EXCEPTION_HPP_
#define ZMQPP_EXCEPTION_HPP_
#include <stdexcept>
#include <string>
#include <zmq.h>
namespace zmqpp
{
/** \todo Have a larger variety of exceptions with better state debug information */
/*!
* Represents the base zmqpp exception.
*
* All zmqpp runtime exceptions are children of this class.
* The class itself does not provide any special access fields but it only
* for convince when catching exceptions.
*
* The class extends std::runtime_error.
*
*/
class exception : public std::runtime_error
{
public:
/*!
* Standard exception constructor.
*
* \param message a string representing the error message.
*/
exception(std::string const& message)
: std::runtime_error(message)
{ }
};
/*!
* Represents an attempt to use an invalid object.
*
* Objects may be invalid initially or after a shutdown or close.
*/
class invalid_instance : public exception
{
public:
invalid_instance(std::string const& message)
: exception(message)
{ }
};
/*!
* Represents internal zmq errors.
*
* Any error response from the zmq bindings will be wrapped in this error.
*
* The class provides access to the zmq error number via zmq_error().
*/
class zmq_internal_exception : public exception
{
public:
/*!
* Uses the zmq functions to pull out error messages and numbers.
*/
zmq_internal_exception()
: exception(zmq_strerror(zmq_errno()))
, _error(zmq_errno())
{ }
/*!
* Retrieve the zmq error number associated with this exception.
* \return zmq error number
*/
int zmq_error() const { return _error; }
private:
int _error;
};
}
#endif /* ZMQPP_EXCEPTION_HPP_ */

View File

@@ -1,95 +0,0 @@
/**
* \file
*
* \date 8 Jan 2014
* \author Ben Gray (\@benjamg)
*/
#include <cassert>
#include <cstring>
#include "exception.hpp"
#include "frame.hpp"
namespace zmqpp {
frame::frame()
: _sent( false )
{
if( 0 != zmq_msg_init( &_msg ) )
{
throw zmq_internal_exception();
}
}
frame::frame(size_t const size)
: _sent( false )
{
if( 0 != zmq_msg_init_size( &_msg, size ) )
{
throw zmq_internal_exception();
}
}
frame::frame(void const* part, size_t const size)
: _sent( false )
{
if( 0 != zmq_msg_init_size( &_msg, size ) )
{
throw zmq_internal_exception();
}
void* msg_data = zmq_msg_data( &_msg );
memcpy( msg_data, part, size );
}
frame::frame(void* part, size_t const size, zmq_free_fn *ffn, void *hint)
: _sent( false )
{
if( 0 != zmq_msg_init_data( &_msg, part, size, ffn, hint ) )
{
throw zmq_internal_exception();
}
}
frame::~frame()
{
#ifndef NDEBUG // unused assert variable in release
int result = zmq_msg_close( &_msg );
assert(0 == result);
#else
zmq_msg_close( &_msg );
#endif // NDEBUG
}
frame::frame(frame&& other)
: _sent( other._sent )
{
zmq_msg_init( &_msg );
zmq_msg_move( &_msg, &other._msg );
other._sent = false;
}
frame& frame::operator=(frame&& other)
{
zmq_msg_init( &_msg );
zmq_msg_move( &_msg, &other._msg );
std::swap( _sent, other._sent );
return *this;
}
frame frame::copy() const
{
frame other( size() );
other._sent = _sent;
if( 0 != zmq_msg_copy( &other._msg, const_cast<zmq_msg_t*>(&_msg) ) )
{
throw zmq_internal_exception();
}
return other;
}
} // namespace zmqpp

View File

@@ -1,58 +0,0 @@
/**
* \file
*
* \date 8 Jan 2014
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_MESSAGE_FRAME_HPP_
#define ZMQPP_MESSAGE_FRAME_HPP_
#include <zmq.h>
#include "compatibility.hpp"
namespace zmqpp {
/*!
* \brief an internal frame wrapper for a single zmq message
*
* This frame wrapper consists of a zmq message and meta data it is used
* by the zmqpp message class to keep track of parts in the internal
* queue. It is unlikely you need to use this class.
*/
class frame
{
public:
frame();
frame(size_t const size);
frame(void const* part, size_t const size);
frame(void* part, size_t const size, zmq_free_fn *ffn, void *hint);
~frame();
bool is_sent() const { return _sent; }
void const* data() const { return zmq_msg_data( const_cast<zmq_msg_t*>(&_msg) ); }
size_t size() const { return zmq_msg_size( const_cast<zmq_msg_t*>(&_msg) ); }
void mark_sent() { _sent = true; }
zmq_msg_t& msg() { return _msg; }
// Move operators
frame(frame&& other);
frame& operator=(frame&& other);
frame copy() const;
private:
bool _sent;
zmq_msg_t _msg;
// Disable implicit copy support, code must request a copy to clone
frame(frame const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED;
frame& operator=(frame const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED;
};
} // namespace zmqpp
#endif /* ZMQPP_MESSAGE_FRAME_HPP_ */

View File

@@ -1,175 +0,0 @@
/**
* \file
*
* \date 10 Aug 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_INET_HPP_
#define ZMQPP_INET_HPP_
/** \todo cross-platform version of including headers. */
// We get htons and htonl from here
#ifdef _WIN32
#include <WinSock2.h>
#else
#include <netinet/in.h>
#endif
#include "compatibility.hpp"
namespace zmqpp
{
/*!
* \brief Possible byte order types.
*
* An enumeration of all the known order types, all two of them.
* There is also an entry for unknown which is just used as a default.
*/
ZMQPP_COMPARABLE_ENUM order {
big_endian, /*!< byte order is big endian */
little_endian /*!< byte order is little endian */
};
/*!
* Common code for the 64bit versions of htons/htons and ntohs/ntohl
*
* As htons and ntohs (or htonl and ntohs) always just do the same thing, ie
* swap bytes if the host order differs from network order or otherwise don't
* do anything, it seemed silly to type the code twice.
*
* \note This code assumes network order is always big endian. Which it is.
* \note The host endian is only checked once and afterwards assumed to remain
* the same.
*
* \param value_to_check unsigned 64 bit integer to swap
* \return swapped (or not) unsigned 64 bit integer
*/
inline uint64_t swap_if_needed(uint64_t const value_to_check)
{
static order host_order = (htonl(42) == 42) ? order::big_endian : order::little_endian;
if (order::big_endian == host_order)
{
return value_to_check;
}
union {
uint64_t integer;
uint8_t bytes[8];
} value { value_to_check };
std::swap(value.bytes[0], value.bytes[7]);
std::swap(value.bytes[1], value.bytes[6]);
std::swap(value.bytes[2], value.bytes[5]);
std::swap(value.bytes[3], value.bytes[4]);
return value.integer;
}
/*!
* 64 bit version of the htons/htonl
*
* I've used the name htonll to try and keep with the htonl naming scheme.
*
* \param hostlonglong unsigned 64 bit host order integer
* \return unsigned 64 bit network order integer
*/
#ifndef htonll
inline uint64_t htonll(uint64_t const hostlonglong)
{
return zmqpp::swap_if_needed(hostlonglong);
}
#endif
/*!
* 64 bit version of the ntohs/ntohl
*
* I've used the name htonll to try and keep with the htonl naming scheme.
*
* \param networklonglong unsigned 64 bit network order integer
* \return unsigned 64 bit host order integer
*/
#ifndef ntohll
inline uint64_t ntohll(uint64_t const networklonglong)
{
return zmqpp::swap_if_needed(networklonglong);
}
#endif
/*!
* floating point version of the htons/htonl
*
* \param value host order floating point
* \returns network order floating point
*/
inline float htonf(float value)
{
assert(sizeof(float) == sizeof(uint32_t));
uint32_t temp;
memcpy(&temp, &value, sizeof(uint32_t));
temp = htonl( temp );
memcpy(&value, &temp, sizeof(uint32_t));
return value;
}
/*!
* floating point version of the ntohs/ntohl
*
* \param value network order float
* \returns host order float
*/
inline float ntohf(float value)
{
assert(sizeof(float) == sizeof(uint32_t));
uint32_t temp;
memcpy(&temp, &value, sizeof(uint32_t));
temp = ntohl( temp );
memcpy(&value, &temp, sizeof(uint32_t));
return value;
}
/*!
* double precision floating point version of the htons/htonl
*
* \param value host order double precision floating point
* \returns network order double precision floating point
*/
inline double htond(double value)
{
assert(sizeof(double) == sizeof(uint64_t));
uint64_t temp;
memcpy(&temp, &value, sizeof(uint64_t));
temp = htonll(temp);
memcpy(&value, &temp, sizeof(uint64_t));
return value;
}
/*!
* double precision floating point version of the ntohs/ntohl
*
* \param value network order double precision floating point
* \returns host order double precision floating point
*/
inline double ntohd(double value)
{
assert(sizeof(double) == sizeof(uint64_t));
uint64_t temp;
memcpy(&temp, &value, sizeof(uint64_t));
temp = ntohll(temp);
memcpy(&value, &temp, sizeof(uint64_t));
return value;
}
}
#endif /* INET_HPP_ */

View File

@@ -1,454 +0,0 @@
/*
* Created on: 9 Aug 2011
* Author: Ben Gray (@benjamg)
*/
#include <cassert>
#include <cstring>
#include "exception.hpp"
#include "inet.hpp"
#include "message.hpp"
namespace zmqpp
{
/*!
* \brief internal construct
* \internal handles bubbling callback from zmq c style to the c++ functor provided
*/
struct callback_releaser
{
message::release_function func;
};
message::message()
: _parts()
, _read_cursor(0)
{
}
message::~message()
{
_parts.clear();
}
size_t message::parts() const
{
return _parts.size();
}
/*
* The two const_casts in size and raw_data are a little bit hacky
* but neither of these methods called this way actually modify data
* so accurately represent the intent of these calls.
*/
size_t message::size(size_t const part /* = 0 */) const
{
if(part >= _parts.size())
{
throw exception("attempting to request a message part outside the valid range");
}
return _parts[part].size();
}
void const* message::raw_data(size_t const part /* = 0 */) const
{
if(part >= _parts.size())
{
throw exception("attempting to request a message part outside the valid range");
}
return _parts[part].data();
}
zmq_msg_t& message::raw_msg(size_t const part /* = 0 */)
{
if(part >= _parts.size())
{
throw exception("attempting to request a message part outside the valid range");
}
return _parts[part].msg();
}
zmq_msg_t& message::raw_new_msg()
{
_parts.push_back( frame() );
return _parts.back().msg();
}
zmq_msg_t& message::raw_new_msg(size_t const reserve_data_size)
{
_parts.push_back( frame(reserve_data_size) );
return _parts.back().msg();
}
std::string message::get(size_t const part /* = 0 */) const
{
return std::string(static_cast<char const*>(raw_data(part)), size(part));
}
// Move operators will take ownership of message parts without copying
void message::move(void* part, size_t const size, release_function const& release)
{
callback_releaser* hint = new callback_releaser();
hint->func = release;
_parts.push_back( frame( part, size, &message::release_callback, hint ) );
}
// Stream reader style
void message::reset_read_cursor()
{
_read_cursor = 0;
}
void message::get(int8_t& integer, size_t const part) const
{
assert(sizeof(int8_t) == size(part));
int8_t const* byte = static_cast<int8_t const*>(raw_data(part));
integer = *byte;
}
void message::get(int16_t& integer, size_t const part) const
{
assert(sizeof(int16_t) == size(part));
uint16_t const* network_order = static_cast<uint16_t const*>(raw_data(part));
integer = static_cast<int16_t>(ntohs(*network_order));
}
void message::get(int32_t& integer, size_t const part) const
{
assert(sizeof(int32_t) == size(part));
uint32_t const* network_order = static_cast<uint32_t const*>(raw_data(part));
integer = static_cast<int32_t>(htonl(*network_order));
}
void message::get(int64_t& integer, size_t const part) const
{
assert(sizeof(int64_t) == size(part));
uint64_t const* network_order = static_cast<uint64_t const*>(raw_data(part));
integer = static_cast<int64_t>(htonll(*network_order));
}
void message::get(uint8_t& unsigned_integer, size_t const part) const
{
assert(sizeof(uint8_t) == size(part));
uint8_t const* byte = static_cast<uint8_t const*>(raw_data(part));
unsigned_integer = *byte;
}
void message::get(uint16_t& unsigned_integer, size_t const part) const
{
assert(sizeof(uint16_t) == size(part));
uint16_t const* network_order = static_cast<uint16_t const*>(raw_data(part));
unsigned_integer = ntohs(*network_order);
}
void message::get(uint32_t& unsigned_integer, size_t const part) const
{
assert(sizeof(uint32_t) == size(part));
uint32_t const* network_order = static_cast<uint32_t const*>(raw_data(part));
unsigned_integer = ntohl(*network_order);
}
void message::get(uint64_t& unsigned_integer, size_t const part) const
{
assert(sizeof(uint64_t) == size(part));
uint64_t const* network_order = static_cast<uint64_t const*>(raw_data(part));
unsigned_integer = ntohll(*network_order);
}
void message::get(float& floating_point, size_t const part) const
{
assert(sizeof(float) == size(part));
float const* network_order = static_cast<float const*>(raw_data(part));
floating_point = zmqpp::ntohf(*network_order);
}
void message::get(double& double_precision, size_t const part) const
{
assert(sizeof(double) == size(part));
double const* network_order = static_cast<double const*>(raw_data(part));
double_precision = zmqpp::ntohd(*network_order);
}
void message::get(bool& boolean, size_t const part) const
{
assert(sizeof(uint8_t) == size(part));
uint8_t const* byte = static_cast<uint8_t const*>(raw_data(part));
boolean = (*byte != 0);
}
void message::get(std::string& string, size_t const part) const
{
string.assign( get(part) );
}
// Stream writer style - these all use copy styles
message& message::operator<<(int8_t const integer)
{
add(reinterpret_cast<void const*>(&integer), sizeof(int8_t));
return *this;
}
message& message::operator<<(int16_t const integer)
{
uint16_t network_order = htons(static_cast<uint16_t>(integer));
add(reinterpret_cast<void const*>(&network_order), sizeof(uint16_t));
return *this;
}
message& message::operator<<(int32_t const integer)
{
uint32_t network_order = htonl(static_cast<uint32_t>(integer));
add(reinterpret_cast<void const*>(&network_order), sizeof(uint32_t));
return *this;
}
message& message::operator<<(int64_t const integer)
{
uint64_t network_order = htonll(static_cast<uint64_t>(integer));
add(reinterpret_cast<void const*>(&network_order), sizeof(uint64_t));
return *this;
}
message& message::operator<<(uint8_t const unsigned_integer)
{
add(reinterpret_cast<void const*>(&unsigned_integer), sizeof(uint8_t));
return *this;
}
message& message::operator<<(uint16_t const unsigned_integer)
{
uint16_t network_order = htons(unsigned_integer);
add(reinterpret_cast<void const*>(&network_order), sizeof(uint16_t));
return *this;
}
message& message::operator<<(uint32_t const unsigned_integer)
{
uint32_t network_order = htonl(unsigned_integer);
add(reinterpret_cast<void const*>(&network_order), sizeof(uint32_t));
return *this;
}
message& message::operator<<(uint64_t const unsigned_integer)
{
uint64_t network_order = htonll(unsigned_integer);
add(reinterpret_cast<void const*>(&network_order), sizeof(uint64_t));
return *this;
}
message& message::operator<<(float const floating_point)
{
assert(sizeof(float) == 4);
float network_order = zmqpp::htonf(floating_point);
add(&network_order, sizeof(float));
return *this;
}
message& message::operator<<(double const double_precision)
{
assert(sizeof(double) == 8);
double network_order = zmqpp::htond(double_precision);
add(&network_order, sizeof(double));
return *this;
}
message& message::operator<<(bool const boolean)
{
uint8_t byte = (boolean) ? 1 : 0;
add(reinterpret_cast<void const*>(&byte), sizeof(uint8_t));
return *this;
}
message& message::operator<<(char const* c_string)
{
add(reinterpret_cast<void const*>(c_string), strlen(c_string));
return *this;
}
message& message::operator<<(std::string const& string)
{
add(reinterpret_cast<void const*>(string.data()), string.size());
return *this;
}
void message::push_front(void const* part, size_t const size)
{
_parts.emplace( _parts.begin(), part, size );
}
void message::push_front(int8_t const integer)
{
push_front(&integer, sizeof(int8_t));
}
void message::push_front(int16_t const integer)
{
uint16_t network_order = htons(static_cast<uint16_t>(integer));
push_front(&network_order, sizeof(uint16_t));
}
void message::push_front(int32_t const integer)
{
uint32_t network_order = htonl(static_cast<uint32_t>(integer));
push_front(&network_order, sizeof(uint32_t));
}
void message::push_front(int64_t const integer)
{
uint64_t network_order = htonll(static_cast<uint64_t>(integer));
push_front(&network_order, sizeof(uint64_t));
}
void message::push_front(uint8_t const unsigned_integer)
{
push_front(&unsigned_integer, sizeof(uint8_t));
}
void message::push_front(uint16_t const unsigned_integer)
{
uint16_t network_order = htons(unsigned_integer);
push_front(&network_order, sizeof(uint16_t));
}
void message::push_front(uint32_t const unsigned_integer)
{
uint32_t network_order = htonl(unsigned_integer);
push_front(&network_order, sizeof(uint32_t));
}
void message::push_front(uint64_t const unsigned_integer)
{
uint64_t network_order = htonll(unsigned_integer);
push_front(&network_order, sizeof(uint64_t));
}
void message::push_front(float const floating_point)
{
assert(sizeof(float) == 4);
float network_order = zmqpp::htonf(floating_point);
push_front(&network_order, sizeof(float));
}
void message::push_front(double const double_precision)
{
assert(sizeof(double) == 8);
double network_order = zmqpp::htond(double_precision);
push_front(&network_order, sizeof(double));
}
void message::push_front(bool const boolean)
{
uint8_t byte = (boolean) ? 1 : 0;
push_front(&byte, sizeof(uint8_t));
}
void message::push_front(char const* c_string)
{
push_front(c_string, strlen(c_string));
}
void message::push_front(std::string const& string)
{
push_front(string.data(), string.size());
}
void message::pop_front()
{
_parts.erase( _parts.begin() );
}
void message::pop_back()
{
_parts.pop_back();
}
message::message(message&& source) NOEXCEPT
: _parts()
, _read_cursor(0)
{
std::swap(_parts, source._parts);
}
message& message::operator=(message&& source) NOEXCEPT
{
std::swap(_parts, source._parts);
return *this;
}
message message::copy() const
{
message msg;
msg.copy(*this);
return msg;
}
void message::copy(message const& source)
{
_parts.resize( source._parts.size() );
for(size_t i = 0; i < source._parts.size(); ++i)
{
_parts[i] = source._parts[i].copy();
}
// we don't need a copy of the releasers as we did data copies of the internal data,
//_releasers = source._releasers;
//_strings = source._strings
}
// Used for internal tracking
void message::sent(size_t const part)
{
// sanity check
assert(!_parts[part].is_sent());
_parts[part].mark_sent();
}
// Note that these releasers are not thread safe, the only safety is provided by
// the socket class taking ownership so no updates can happen while zmq does it's thing
// If used in a custom class this has to be dealt with.
void message::release_callback(void* data, void* hint)
{
callback_releaser* releaser = static_cast<callback_releaser*>(hint);
releaser->func(data);
delete releaser;
}
}

View File

@@ -1,254 +0,0 @@
/**
* \file
*
* \date 9 Aug 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_MESSAGE_HPP_
#define ZMQPP_MESSAGE_HPP_
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>
#include <utility>
#include <cassert>
#include <zmq.h>
#include "compatibility.hpp"
#include "frame.hpp"
namespace zmqpp
{
/*!
* \brief a zmq message with optional multipart support
*
* A zmq message is made up of one or more parts which are sent together to
* the target endpoints. zmq guarantees either the whole message or none
* of the message will be delivered.
*/
class message
{
public:
/*!
* \brief callback to release user allocated data.
*
* The release function will be called on any void* moved part.
* It must be thread safe to the extent that the callback may occur on
* one of the context threads.
*
* The function called will be passed a single variable which is the
* pointer to the memory allocated.
*/
typedef std::function<void (void*)> release_function;
message();
~message();
template <typename T, typename ...Args>
message(T const &part, Args &&...args)
: message()
{
add(part, std::forward<Args>(args)...);
}
size_t parts() const;
size_t size(size_t const part) const;
std::string get(size_t const part) const;
void get(int8_t& integer, size_t const part) const;
void get(int16_t& integer, size_t const part) const;
void get(int32_t& integer, size_t const part) const;
void get(int64_t& integer, size_t const part) const;
void get(uint8_t& unsigned_integer, size_t const part) const;
void get(uint16_t& unsigned_integer, size_t const part) const;
void get(uint32_t& unsigned_integer, size_t const part) const;
void get(uint64_t& unsigned_integer, size_t const part) const;
void get(float& floating_point, size_t const part) const;
void get(double& double_precision, size_t const part) const;
void get(bool& boolean, size_t const part) const;
void get(std::string& string, size_t const part) const;
// Warn: If a pointer type is requested the message (well zmq) still 'owns'
// the data and will release it when the message object is freed.
template<typename Type>
Type get(size_t const part)
{
Type value;
get(value, part);
return value;
}
template<int part=0, typename T, typename ...Args>
void extract(T &nextpart, Args &...args)
{
assert(part < parts());
get(nextpart,part);
extract<part+1>(args...);
}
template<int part=0, typename T>
void extract(T &nextpart)
{
assert(part < parts());
get(nextpart,part);
}
// Raw get data operations, useful with data structures more than anything else
// Warn: The message (well zmq) still 'owns' the data and will release it
// when the message object is freed.
template<typename Type>
void get(Type*& value, size_t const part) const
{
value = static_cast<Type*>(raw_data(part));
}
// Warn: The message (well zmq) still 'owns' the data and will release it
// when the message object is freed.
template<typename Type>
void get(Type** value, size_t const part) const
{
*value = static_cast<Type*>(raw_data(part));
}
// Move operators will take ownership of message parts without copying
void move(void* part, size_t const size, release_function const& release);
// Raw move data operation, useful with data structures more than anything else
template<typename Object>
void move(Object *part)
{
move(part, sizeof(Object), &deleter_callback<Object>);
}
// Copy operators will take copies of any data
template<typename Type>
void add(Type *part, size_t const size)
{
_parts.push_back( frame( part, size ) );
}
template<typename Type, typename ...Args>
void add(Type const& part, Args &&...args)
{
*this << part;
add(std::forward<Args>(args)...);
}
template<typename Type>
void add(Type const part)
{
*this << part;
}
// Stream reader style
void reset_read_cursor();
template<typename Type>
message& operator>>(Type& value)
{
get(value, _read_cursor++);
return *this;
}
// Stream writer style - these all use copy styles
message& operator<<(int8_t const integer);
message& operator<<(int16_t const integer);
message& operator<<(int32_t const integer);
message& operator<<(int64_t const integer);
message& operator<<(uint8_t const unsigned_integer);
message& operator<<(uint16_t const unsigned_integer);
message& operator<<(uint32_t const unsigned_integer);
message& operator<<(uint64_t const unsigned_integer);
message& operator<<(float const floating_point);
message& operator<<(double const double_precision);
message& operator<<(bool const boolean);
message& operator<<(char const* c_string);
message& operator<<(std::string const& string);
// Queue manipulation
void push_front(void const* part, size_t const size);
// TODO: unify conversion of types with the stream operators
void push_front(int8_t const integer);
void push_front(int16_t const integer);
void push_front(int32_t const integer);
void push_front(int64_t const integer);
void push_front(uint8_t const unsigned_integer);
void push_front(uint16_t const unsigned_integer);
void push_front(uint32_t const unsigned_integer);
void push_front(uint64_t const unsigned_integer);
void push_front(float const floating_point);
void push_front(double const double_precision);
void push_front(bool const boolean);
void push_front(char const* c_string);
void push_front(std::string const& string);
void pop_front();
void push_back(void const* part, size_t const size)
{
add( part, size );
}
template<typename Type>
void push_back(Type const part)
{
*this << part;
}
void pop_back();
void remove(size_t const part);
// Move supporting
message(message&& source) NOEXCEPT;
message& operator=(message&& source) NOEXCEPT;
// Copy support
message copy() const;
void copy(message const& source);
// Used for internal tracking
void sent(size_t const part);
// Access to raw zmq details
void const* raw_data(size_t const part = 0) const;
zmq_msg_t& raw_msg(size_t const part = 0);
zmq_msg_t& raw_new_msg();
zmq_msg_t& raw_new_msg(size_t const reserve_data_size);
private:
typedef std::vector<frame> parts_type;
parts_type _parts;
size_t _read_cursor;
// Disable implicit copy support, code must request a copy to clone
message(message const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED;
message& operator=(message const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED;
static void release_callback(void* data, void* hint);
template<typename Object>
static void deleter_callback(void* data)
{
delete static_cast<Object*>(data);
}
};
}
#endif /* ZMQPP_MESSAGE_HPP_ */

View File

@@ -1,182 +0,0 @@
/*
* Created on: 16 Aug 2011
* Author: Ben Gray (@benjamg)
*/
#include "exception.hpp"
#include "socket.hpp"
#include "poller.hpp"
#include <zmq.h>
namespace zmqpp
{
const long poller::wait_forever = -1;
const short poller::poll_none = 0;
const short poller::poll_in = ZMQ_POLLIN;
const short poller::poll_out = ZMQ_POLLOUT;
const short poller::poll_error = ZMQ_POLLERR;
poller::poller()
: _items()
, _index()
, _fdindex()
{
}
poller::~poller()
{
_items.clear();
_index.clear();
_fdindex.clear();
}
void poller::add(socket& socket, short const event /* = POLL_IN */)
{
zmq_pollitem_t item { socket, 0, event, 0 };
size_t index = _items.size();
_items.push_back(item);
_index[socket] = index;
}
void poller::add(int const descriptor, short const event /* = POLL_IN */)
{
zmq_pollitem_t item { nullptr, descriptor, event, 0 };
size_t index = _items.size();
_items.push_back(item);
_fdindex[descriptor] = index;
}
bool poller::has(socket_t const& socket)
{
return _index.find(socket) != _index.end();
}
bool poller::has(int const descriptor)
{
return _fdindex.find(descriptor) != _fdindex.end();
}
void poller::reindex(size_t const index)
{
if ( nullptr != _items[index].socket )
{
auto found = _index.find( _items[index].socket );
if (_index.end() == found) { throw exception("unable to reindex socket in poller"); }
found->second = index;
}
else
{
auto found = _fdindex.find( _items[index].fd );
if (_fdindex.end() == found) { throw exception("unable to reindex file descriptor in poller"); }
found->second = index;
}
}
void poller::remove(socket_t const& socket)
{
auto found = _index.find(socket);
if (_index.end() == found) { return; }
if ( _items.size() - 1 == found->second )
{
_items.pop_back();
_index.erase(found);
return;
}
std::swap(_items[found->second], _items.back());
_items.pop_back();
auto index = found->second;
_index.erase(found);
reindex( index );
}
void poller::remove(int const descriptor)
{
auto found = _fdindex.find(descriptor);
if (_fdindex.end() == found) { return; }
if ( _items.size() - 1 == found->second )
{
_items.pop_back();
_fdindex.erase(found);
return;
}
std::swap(_items[found->second], _items.back());
_items.pop_back();
auto index = found->second;
_fdindex.erase(found);
reindex( index );
}
void poller::check_for(socket const& socket, short const event)
{
auto found = _index.find(socket);
if (_index.end() == found)
{
throw exception("this socket is not represented within this poller");
}
_items[found->second].events = event;
}
void poller::check_for(int const descriptor, short const event)
{
auto found = _fdindex.find(descriptor);
if (_fdindex.end() == found)
{
throw exception("this socket is not represented within this poller");
}
_items[found->second].events = event;
}
bool poller::poll(long timeout /* = WAIT_FOREVER */)
{
int result = zmq_poll(_items.data(), _items.size(), timeout);
if (result < 0)
{
if(EINTR == zmq_errno())
{
return false;
}
throw zmq_internal_exception();
}
return (result > 0);
}
short poller::events(socket const& socket) const
{
auto found = _index.find(socket);
if (_index.end() == found)
{
throw exception("this socket is not represented within this poller");
}
return _items[found->second].revents;
}
short poller::events(int const descriptor) const
{
auto found = _fdindex.find(descriptor);
if (_fdindex.end() == found)
{
throw exception("this file descriptor is not represented within this poller");
}
return _items[found->second].revents;
}
}

View File

@@ -1,186 +0,0 @@
/**
* \file
*
* \date 9 Aug 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_POLLER_HPP_
#define ZMQPP_POLLER_HPP_
#include <unordered_map>
#include <vector>
#include "compatibility.hpp"
namespace zmqpp
{
class socket;
typedef socket socket_t;
/*!
* Polling wrapper.
*
* Allows access to polling for any number of sockets or file descriptors.
*/
class poller
{
public:
static const long wait_forever; /*!< Block forever flag, default setting. */
static const short poll_none; /*!< No polling flags set. */
static const short poll_in; /*!< Monitor inbound flag. */
static const short poll_out; /*!< Monitor output flag. */
static const short poll_error; /*!< Monitor error flag.\n Only for file descriptors. */
/*!
* Construct an empty polling model.
*/
poller();
/*!
* Cleanup poller.
*
* Any sockets will need to be closed separately.
*/
~poller();
/*!
* Add a socket to the polling model and set which events to monitor.
*
* \param socket the socket to monitor.
* \param event the event flags to monitor on the socket.
*/
void add(socket_t& socket, short const event = poll_in);
/*!
* Add a file descriptor to the polling model and set which events to monitor.
*
* \param descriptor the file descriptor to monitor.
* \param event the event flags to monitor.
*/
void add(int const descriptor, short const event = poll_in | poll_error);
/*!
* Check if we are monitoring a given socket with this poller.
*
* \param socket the socket to check.
* \return true if it is there.
*/
bool has(socket_t const& socket);
/*!
* Check if we are monitoring a given file descriptor with this poller.
*
* \param descriptor the file descriptor to check.
* \return true if it is there.
*/
bool has(int const descriptor);
/*!
* Stop monitoring a socket.
*
* \param socket the socket to stop monitoring.
*/
void remove(socket_t const& socket);
/*!
* Stop monitoring a file descriptor.
*
* \param descriptor the file descriptor to stop monitoring.
*/
void remove(int const descriptor);
/*!
* Update the monitored event flags for a given socket.
*
* \param socket the socket to update event flags.
* \param event the event flags to monitor on the socket.
*/
void check_for(socket_t const& socket, short const event);
/*!
* Update the monitored event flags for a given file descriptor.
*
* \param descriptor the file descriptor to update event flags.
* \param event the event flags to monitor on the socket.
*/
void check_for(int const descriptor, short const event);
/*!
* Poll for monitored events.
*
* By default this method will block forever or until at least one of the monitored
* sockets or file descriptors has events.
*
* If a timeout is set and was reached then this function returns false.
*
* \param timeout milliseconds to timeout.
* \return true if there is an event..
*/
bool poll(long timeout = wait_forever);
/*!
* Get the event flags triggered for a socket.
*
* \param socket the socket to get triggered event flags for.
* \return the event flags.
*/
short events(socket_t const& socket) const;
/*!
* Get the event flags triggered for a file descriptor.
*
* \param descriptor the file descriptor to get triggered event flags for.
* \return the event flags.
*/
short events(int const descriptor) const;
/*!
* Check either a file descriptor or socket for input events.
*
* Templated helper method that calls through to event and checks for a given flag
*
* \param watchable either a file descriptor or socket known to the poller.
* \return true if there is input.
*/
template<typename Watched>
bool has_input(Watched const& watchable) const { return events(watchable) & poll_in; }
/*!
* Check either a file descriptor or socket for output events.
*
* Templated helper method that calls through to event and checks for a given flag
*
* \param watchable either a file descriptor or socket known to the poller.
* \return true if there is output.
*/
template<typename Watched>
bool has_output(Watched const& watchable) const { return events(watchable) & poll_out; }
/*!
* Check a file descriptor.
*
* Templated helper method that calls through to event and checks for a given flag
*
* Technically this template works for sockets as well but the error flag is never set for
* sockets so I have no idea why someone would call it.
*
* \param watchable a file descriptor know to the poller.
* \return true if there is an error.
*/
template<typename Watched>
bool has_error(Watched const& watchable) const { return events(watchable) & poll_error; }
private:
std::vector<zmq_pollitem_t> _items;
std::unordered_map<void *, size_t> _index;
std::unordered_map<int, size_t> _fdindex;
void reindex(size_t const index);
};
}
#endif /* ZMQPP_POLLER_HPP_ */

View File

@@ -1,762 +0,0 @@
/*
* Created on: 9 Aug 2011
* Author: Ben Gray (@benjamg)
*/
#include <array>
#include <cassert>
#include <cstring>
#include <functional>
#include "context.hpp"
#include "exception.hpp"
#include "message.hpp"
#include "socket.hpp"
namespace zmqpp
{
const int socket::normal = 0;
#if (ZMQ_VERSION_MAJOR == 2)
const int socket::dont_wait = ZMQ_NOBLOCK;
#else
const int socket::dont_wait = ZMQ_DONTWAIT;
#endif
const int socket::send_more = ZMQ_SNDMORE;
#ifdef ZMQ_EXPERIMENTAL_LABELS
const int socket::send_label = ZMQ_SNDLABEL;
#endif
const int max_socket_option_buffer_size = 256;
const int max_stream_buffer_size = 4096;
socket::socket(const context& context, socket_type const type)
: _socket(nullptr)
, _type(type)
, _recv_buffer()
{
_socket = zmq_socket(context, static_cast<int>(type));
if(nullptr == _socket)
{
throw zmq_internal_exception();
}
zmq_msg_init(&_recv_buffer);
}
socket::~socket()
{
zmq_msg_close(&_recv_buffer);
if (nullptr != _socket)
{
#ifndef NDEBUG // unused assert variable in release
int result = zmq_close(_socket);
assert(0 == result);
#else
zmq_close(_socket);
#endif // NDEBUG
_socket = nullptr;
}
}
void socket::bind(endpoint_t const& endpoint)
{
int result = zmq_bind(_socket, endpoint.c_str());
if (0 != result)
{
throw zmq_internal_exception();
}
}
void socket::unbind(endpoint_t const& endpoint)
{
#if (ZMQ_VERSION_MAJOR > 3 || (ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2))
int result = zmq_unbind(_socket, endpoint.c_str());
if (0 != result)
{
throw zmq_internal_exception();
}
#endif
}
void socket::connect(endpoint_t const& endpoint)
{
int result = zmq_connect(_socket, endpoint.c_str());
if (0 != result)
{
throw zmq_internal_exception();
}
}
void socket::disconnect(endpoint_t const& endpoint)
{
#if (ZMQ_VERSION_MAJOR > 3 || (ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2))
int result = zmq_disconnect(_socket, endpoint.c_str());
if (0 != result)
{
throw zmq_internal_exception();
}
#endif
}
void socket::close()
{
int result = zmq_close(_socket);
if (0 != result)
{
throw zmq_internal_exception();
}
_socket = nullptr;
}
bool socket::send(message& message, bool const dont_block /* = false */)
{
size_t parts = message.parts();
if (parts == 0)
{
throw std::invalid_argument("sending requires messages have at least one part");
}
bool dont_wait = dont_block;
for(size_t i = 0; i < parts; ++i)
{
int flag = socket::normal;
if(dont_wait) { flag |= socket::dont_wait; }
if(i < (parts - 1)) { flag |= socket::send_more; }
#if (ZMQ_VERSION_MAJOR == 2)
int result = zmq_send( _socket, &message.raw_msg(i), flag );
#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2))
int result = zmq_sendmsg( _socket, &message.raw_msg(i), flag );
#else
int result = zmq_msg_send( &message.raw_msg(i), _socket, flag );
#endif
if (result < 0)
{
// the zmq framework should not block if the first part is accepted
// so we should only ever get this error on the first part
if((0 == i) && (EAGAIN == zmq_errno()))
{
return false;
}
if(EINTR == zmq_errno())
{
if (0 == message.parts())
{
return false;
}
// If we have an interrupt but it's not on the first part then we
// know we can safely send out the rest of the message as we can
// enforce that it won't wait on a blocking action
dont_wait = true;
continue;
}
// sanity checking
assert(EAGAIN != zmq_errno());
throw zmq_internal_exception();
}
message.sent(i);
}
// Leave message reference in a stable state
zmqpp::message local;
std::swap(local, message);
return true;
}
bool socket::receive(message& message, bool const dont_block /* = false */)
{
if (message.parts() > 0)
{
// swap and discard old message
zmqpp::message local;
std::swap(local, message);
}
int flags = (dont_block) ? socket::dont_wait : socket::normal;
bool more = true;
while(more)
{
#if (ZMQ_VERSION_MAJOR == 2)
int result = zmq_recv( _socket, &_recv_buffer, flags );
#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2))
int result = zmq_recvmsg( _socket, &_recv_buffer, flags );
#else
int result = zmq_msg_recv( &_recv_buffer, _socket, flags );
#endif
if(result < 0)
{
if ((0 == message.parts()) && (EAGAIN == zmq_errno()))
{
return false;
}
if(EINTR == zmq_errno())
{
if (0 == message.parts())
{
return false;
}
// If we have an interrupt but it's not on the first part then we
// know we can safely pull out the rest of the message as it will
// not be blocking
continue;
}
assert(EAGAIN != zmq_errno());
throw zmq_internal_exception();
}
zmq_msg_t& dest = message.raw_new_msg();
zmq_msg_move(&dest, &_recv_buffer);
get(socket_option::receive_more, more);
}
return true;
}
bool socket::send(std::string const& string, int const flags /* = NORMAL */)
{
return send_raw(string.data(), string.size(), flags);
}
bool socket::receive(std::string& string, int const flags /* = NORMAL */)
{
#if (ZMQ_VERSION_MAJOR == 2)
int result = zmq_recv( _socket, &_recv_buffer, flags );
#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2))
int result = zmq_recvmsg( _socket, &_recv_buffer, flags );
#else
int result = zmq_msg_recv( &_recv_buffer, _socket, flags );
#endif
if(result >= 0)
{
string.reserve(zmq_msg_size(&_recv_buffer));
string.assign(static_cast<char*>(zmq_msg_data(&_recv_buffer)), zmq_msg_size(&_recv_buffer));
return true;
}
if (EAGAIN == zmq_errno() || EINTR == zmq_errno())
{
return false;
}
throw zmq_internal_exception();
}
bool socket::send_raw(char const* buffer, int const length, int const flags /* = NORMAL */)
{
#if (ZMQ_VERSION_MAJOR == 2)
zmq_msg_t msg;
int result = zmq_msg_init_size(&msg, length);
if (result != 0)
{
zmq_internal_exception();
}
memcpy(zmq_msg_data(&msg), buffer, length);
result = zmq_send(_socket, &msg, flags);
#else
int result = zmq_send(_socket, buffer, length, flags);
#endif
if(result >= 0)
{
return true;
}
#if (ZMQ_VERSION_MAJOR == 2)
// only actually need to close this on error
zmq_msg_close(&msg);
#endif
if (EAGAIN == zmq_errno() || EINTR == zmq_errno())
{
return false;
}
throw zmq_internal_exception();
}
bool socket::receive_raw(char* buffer, int& length, int const flags /* = NORMAL */)
{
#if (ZMQ_VERSION_MAJOR == 2)
int result = zmq_recv( _socket, &_recv_buffer, flags );
#elif (ZMQ_VERSION_MAJOR < 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR < 2))
int result = zmq_recvmsg( _socket, &_recv_buffer, flags );
#else
int result = zmq_msg_recv( &_recv_buffer, _socket, flags );
#endif
if(result >= 0)
{
length = zmq_msg_size(&_recv_buffer);
memcpy(buffer, zmq_msg_data(&_recv_buffer), length);
return true;
}
if (EAGAIN == zmq_errno() || EINTR == zmq_errno())
{
return false;
}
throw zmq_internal_exception();
}
// Helper
void socket::subscribe(std::string const& topic)
{
set(socket_option::subscribe, topic);
}
void socket::unsubscribe(std::string const& topic)
{
set(socket_option::unsubscribe, topic);
}
bool socket::has_more_parts() const
{
return get<bool>(socket_option::receive_more);
}
// Set socket options for different types of option
void socket::set(socket_option const option, int const value)
{
switch(option)
{
// unsigned 64bit Integers
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::high_water_mark:
case socket_option::send_buffer_size:
case socket_option::receive_buffer_size:
#endif
case socket_option::affinity:
if (value < 0) { throw exception("attempting to set an unsigned 64 bit integer option with a negative integer"); }
set(option, static_cast<uint64_t>(value));
break;
// 64bit Integers
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::rate:
case socket_option::recovery_interval:
case socket_option::recovery_interval_seconds:
case socket_option::swap_size:
#else
case socket_option::max_messsage_size:
#endif
set(option, static_cast<int64_t>(value));
break;
// Boolean
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1))
case socket_option::ipv4_only:
#endif
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::multicast_loopback:
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
#if (ZMQ_VERSION_MINOR == 2)
case socket_option::delay_attach_on_connect:
#else
case socket_option::immediate:
#endif
case socket_option::router_mandatory:
case socket_option::xpub_verbose:
#endif
if (value == 0) { set(option, false); }
else if (value == 1) { set(option, true); }
else { throw exception("attempting to set a boolean option with a non 0 or 1 integer"); }
break;
// Default or Boolean
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
case socket_option::tcp_keepalive:
if (value < -1 || value > 1) { throw exception("attempting to set a default or boolean option with a non -1, 0 or 1 integer"); }
if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value)))
{
throw zmq_internal_exception();
}
break;
#endif
// Integers that require +ve numbers
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::reconnect_interval_max:
#else
case socket_option::reconnect_interval_max:
case socket_option::send_buffer_size:
case socket_option::recovery_interval:
case socket_option::receive_buffer_size:
case socket_option::send_high_water_mark:
case socket_option::receive_high_water_mark:
case socket_option::multicast_hops:
case socket_option::rate:
#endif
case socket_option::backlog:
if (value < 0) { throw exception("attempting to set a positive only integer option with a negative integer"); }
// Integers
case socket_option::reconnect_interval:
case socket_option::linger:
case socket_option::receive_timeout:
case socket_option::send_timeout:
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
case socket_option::tcp_keepalive_idle:
case socket_option::tcp_keepalive_count:
case socket_option::tcp_keepalive_interval:
#endif
if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value)))
{
throw zmq_internal_exception();
}
break;
default:
throw exception("attempting to set a non signed integer option with a signed integer value");
}
}
void socket::set(socket_option const option, bool const value)
{
switch(option)
{
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::multicast_loopback:
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1))
case socket_option::ipv4_only:
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
#if (ZMQ_VERSION_MINOR == 2)
case socket_option::delay_attach_on_connect:
#else
case socket_option::immediate:
#endif
case socket_option::router_mandatory:
case socket_option::xpub_verbose:
#endif
{
int ivalue = value ? 1 : 0;
if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &ivalue, sizeof(ivalue)))
{
throw zmq_internal_exception();
}
break;
}
default:
throw exception("attempting to set a non boolean option with a boolean value");
}
}
void socket::set(socket_option const option, uint64_t const value)
{
switch(option)
{
#if (ZMQ_VERSION_MAJOR == 2)
// unsigned 64bit Integers
case socket_option::high_water_mark:
case socket_option::send_buffer_size:
case socket_option::receive_buffer_size:
#endif
case socket_option::affinity:
if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value)))
{
throw zmq_internal_exception();
}
break;
default:
throw exception("attempting to set a non unsigned 64 bit integer option with a unsigned 64 bit integer value");
}
}
void socket::set(socket_option const option, int64_t const value)
{
switch(option)
{
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::rate:
case socket_option::recovery_interval:
case socket_option::recovery_interval_seconds:
case socket_option::swap_size:
#else
case socket_option::max_messsage_size:
#endif
// zmq only allowed +ve int64_t options
if (value < 0) { throw exception("attempting to set a positive only 64 bit integer option with a negative 64bit integer"); }
if (0 != zmq_setsockopt(_socket, static_cast<int>(option), &value, sizeof(value)))
{
throw zmq_internal_exception();
}
break;
default:
throw exception("attempting to set a non 64 bit integer option with a 64 bit integer value");
}
}
void socket::set(socket_option const option, char const* value, size_t const length)
{
switch(option)
{
case socket_option::identity:
case socket_option::subscribe:
case socket_option::unsubscribe:
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
case socket_option::tcp_accept_filter:
#endif
if (0 != zmq_setsockopt(_socket, static_cast<int>(option), value, length))
{
throw zmq_internal_exception();
}
break;
default:
throw exception("attempting to set a non string option with a string value");
}
}
// Get socket options, multiple versions for easy of use
void socket::get(socket_option const option, int& value) const
{
size_t value_size = sizeof(int);
switch(option)
{
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::receive_more:
case socket_option::multicast_loopback:
value = static_cast<int>(get<int64_t>(option));
break;
#endif
case socket_option::type:
case socket_option::linger:
case socket_option::backlog:
case socket_option::reconnect_interval:
case socket_option::reconnect_interval_max:
case socket_option::receive_timeout:
case socket_option::send_timeout:
case socket_option::file_descriptor:
case socket_option::events:
#if (ZMQ_VERSION_MAJOR > 2)
case socket_option::receive_more:
case socket_option::send_buffer_size:
case socket_option::receive_buffer_size:
case socket_option::rate:
case socket_option::recovery_interval:
case socket_option::send_high_water_mark:
case socket_option::receive_high_water_mark:
case socket_option::multicast_hops:
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1))
case socket_option::ipv4_only:
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
#if (ZMQ_VERSION_MINOR == 2)
case socket_option::delay_attach_on_connect:
#else
case socket_option::immediate:
#endif
case socket_option::tcp_keepalive:
case socket_option::tcp_keepalive_idle:
case socket_option::tcp_keepalive_count:
case socket_option::tcp_keepalive_interval:
#endif
#ifdef ZMQ_EXPERIMENTAL_LABELS
case socket_option::receive_label:
#endif
if (0 != zmq_getsockopt(_socket, static_cast<int>(option), &value, &value_size))
{
throw zmq_internal_exception();
}
// sanity check
assert(value_size <= sizeof(int));
break;
default:
throw exception("attempting to get a non integer option with an integer value");
}
}
void socket::get(socket_option const option, bool& value) const
{
#if (ZMQ_VERSION_MAJOR == 2)
int64_t int_value = 0;
size_t value_size = sizeof(int64_t);
#else
int int_value = 0;
size_t value_size = sizeof(int);
#endif
switch(option)
{
case socket_option::receive_more:
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::multicast_loopback:
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1))
case socket_option::ipv4_only:
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
#if (ZMQ_VERSION_MINOR == 2)
case socket_option::delay_attach_on_connect:
#else
case socket_option::immediate:
#endif
#endif
#ifdef ZMQ_EXPERIMENTAL_LABELS
case socket_option::receive_label:
#endif
if (0 != zmq_getsockopt(_socket, static_cast<int>(option), &int_value, &value_size))
{
throw zmq_internal_exception();
}
value = (int_value == 1) ? true : false;
break;
default:
throw exception("attempting to get a non boolean option with a boolean value");
}
}
void socket::get(socket_option const option, uint64_t& value) const
{
size_t value_size = sizeof(uint64_t);
switch(option)
{
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::high_water_mark:
case socket_option::send_buffer_size:
case socket_option::receive_buffer_size:
#endif
case socket_option::affinity:
if(0 != zmq_getsockopt(_socket, static_cast<int>(option), &value, &value_size))
{
throw zmq_internal_exception();
}
break;
default:
throw exception("attempting to get a non unsigned 64 bit integer option with an unsigned 64 bit integer value");
}
}
void socket::get(socket_option const option, int64_t& value) const
{
size_t value_size = sizeof(int64_t);
switch(option)
{
#if (ZMQ_VERSION_MAJOR == 2)
case socket_option::rate:
case socket_option::recovery_interval:
case socket_option::recovery_interval_seconds:
case socket_option::swap_size:
case socket_option::receive_more:
case socket_option::multicast_loopback:
#else
case socket_option::max_messsage_size:
#endif
if(0 != zmq_getsockopt(_socket, static_cast<int>(option), &value, &value_size))
{
throw zmq_internal_exception();
}
break;
default:
throw exception("attempting to get a non 64 bit integer option with an 64 bit integer value");
}
}
void socket::get(socket_option const option, std::string& value) const
{
static std::array<char, max_socket_option_buffer_size> buffer;
size_t size = max_socket_option_buffer_size;
switch(option)
{
case socket_option::identity:
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
case socket_option::last_endpoint:
#endif
if(0 != zmq_getsockopt(_socket, static_cast<int>(option), buffer.data(), &size))
{
throw zmq_internal_exception();
}
value.assign(buffer.data(), size);
break;
default:
throw exception("attempting to get a non string option with a string value");
}
}
socket::socket(socket&& source) NOEXCEPT
: _socket(source._socket)
, _type(source._type)
, _recv_buffer()
{
// we steal the zmq_msg_t from the valid socket, we only init our own because it's cheap
// and zmq_msg_move does a valid check
zmq_msg_init(&_recv_buffer);
zmq_msg_move(&_recv_buffer, &source._recv_buffer);
// Clean up source a little, we will handle the deinit, it doesn't need to
source._socket = nullptr;
}
socket& socket::operator=(socket&& source) NOEXCEPT
{
std::swap(_socket, source._socket);
_type = source._type; // just clone?
// we steal the zmq_msg_t from the valid socket, we only init our own because it's cheap
// and zmq_msg_move does a valid check
zmq_msg_init(&_recv_buffer);
zmq_msg_move(&_recv_buffer, &source._recv_buffer);
return *this;
}
socket::operator bool() const
{
return nullptr != _socket;
}
socket::operator void*() const
{
return _socket;
}
void socket::track_message(message const& /* message */, uint32_t const parts, bool& should_delete)
{
if (parts == 0)
{
should_delete = true;
}
}
}

View File

@@ -1,500 +0,0 @@
/**
* \file
*
* \date 9 Aug 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_SOCKET_HPP_
#define ZMQPP_SOCKET_HPP_
#include <cstring>
#include <string>
#include <list>
#include <zmq.h>
#include "compatibility.hpp"
#include "socket_types.hpp"
#include "socket_options.hpp"
namespace zmqpp
{
class context;
class message;
typedef std::string endpoint_t;
typedef context context_t;
typedef message message_t;
/*!
* The socket class represents the zmq sockets.
*
* A socket can be bound and/or connected to as many endpoints as required
* with the sole exception of a ::pair socket.
*
* The routing is handled by zmq based on the type set.
*
* The bound side of an inproc connection must occur first and inproc can only
* connect to other inproc sockets of the same context.
*
* This class is c++0x move supporting and cannot be copied.
*/
class socket
{
public:
static const int normal; /*!< /brief default send type, no flags set */
#if (ZMQ_VERSION_MAJOR == 2)
static const int dont_wait; /*!< /brief don't block if sending is not currently possible */
#else
static const int dont_wait; /*!< /brief don't block if sending is not currently possible */
#endif
static const int send_more; /*!< /brief more parts will follow this one */
#ifdef ZMQ_EXPERIMENTAL_LABELS
static const int send_label; /*!< /brief this message part is an internal zmq label */
#endif
/*!
* Create a socket for a given type.
*
* \param context the zmq context under which the socket will live
* \param type a valid ::socket_type for the socket
*/
socket(context_t const& context, socket_type const type);
/*!
* This will close any socket still open before returning
*/
~socket();
/*!
* Get the type of the socket, this works on zmqpp types and not the zmq internal types.
* Use the socket::get method if you wish to intergoate the zmq internal ones.
*
* \return the type of the socket
*/
socket_type type() const { return _type; }
/*!
* Asynchronously binds to an endpoint.
*
* \param endpoint the zmq endpoint to bind to
*/
void bind(endpoint_t const& endpoint);
/*!
* Unbinds from a previously bound endpoint.
*
* \param endpoint the zmq endpoint to bind to
*/
void unbind(endpoint_t const& endpoint);
/*!
* Asynchronously connects to an endpoint.
* If the endpoint is not inproc then zmq will happily keep trying
* to connect until there is something there.
*
* Inproc sockets must have a valid target already bound before connection
* will work.
*
* \param endpoint the zmq endpoint to connect to
*/
void connect(endpoint_t const& endpoint);
/*!
* Asynchronously connects to multiple endpoints.
* If the endpoint is not inproc then zmq will happily keep trying
* to connect until there is something there.
*
* Inproc sockets must have a valid target already bound before connection
* will work.
*
* This is a helper function that wraps the single item connect in a loop
*
* \param connections_begin the starting iterator for zmq endpoints.
* \param connections_end the final iterator for zmq endpoints.
*/
template<typename InputIterator>
void connect(InputIterator const& connections_begin, InputIterator const& connections_end)
{
for(InputIterator it = connections_begin; it != connections_end; ++it)
{
connect(*it);
}
}
/*!
* Disconnects a previously connected endpoint.
*
* \param endpoint the zmq endpoint to disconnect from
*/
void disconnect(endpoint_t const& endpoint);
/*!
* Disconnects from multiple previously connected endpoints.
*
* This is a helper function that wraps the single item disconnect in a loop
*
* \param disconnections_begin the starting iterator for zmq endpoints.
* \param disconnections_end the final iterator for zmq endpoints.
*/
template<typename InputIterator>
void disconnect(InputIterator const& disconnections_begin, InputIterator const& disconnections_end)
{
for(InputIterator it = disconnections_begin; it != disconnections_end; ++it)
{
disconnect(*it);
}
}
/*!
* Closes the internal zmq socket and marks this instance
* as invalid.
*/
void close();
/*!
* Sends the message over the connection, this may be a multipart message.
*
* If dont_block is true and we are unable to add a new message then this
* function will return false.
*
* \param message message to send
* \param dont_block boolean to dictate if we wait while sending.
* \return true if message sent, false if it would have blocked
*/
bool send(message_t& message, bool const dont_block = false);
/*!
* Gets a message from the connection, this may be a multipart message.
*
* If dont_block is true and we are unable to get a message then this
* function will return false.
*
* \param message reference to fill with received data
* \param dont_block boolean to dictate if we wait for data.
* \return true if message sent, false if it would have blocked
*/
bool receive(message_t& message, bool const dont_block = false);
/*!
* Sends the byte data held by the string as the next message part.
*
* If the socket::DONT_WAIT flag and we are unable to add a new message to
* socket then this function will return false.
*
* \param string message part to send
* \param flags message send flags
* \return true if message part sent, false if it would have blocked
*/
bool send(std::string const& string, int const flags = normal);
/*!
* If there is a message ready then get the next part as a string
*
* If the socket::DONT_WAIT flag and there is no message ready to receive
* then this function will return false.
*
* \param string message part to receive into
* \param flags message receive flags
* \return true if message part received, false if it would have blocked
*/
bool receive(std::string& string, int const flags = normal);
/*!
* Sends the byte data pointed to by buffer as the next part of the message.
*
* If the socket::DONT_WAIT flag and we are unable to add a new message to
* socket then this function will return false.
*
* \param buffer byte buffer pointer to start writing from
* \param length max length of the buffer
* \param flags message send flags
* \return true if message part sent, false if it would have blocked
*/
bool send_raw(char const* buffer, int const length, int const flags = normal);
/*!
* \warning If the buffer is not large enough for the message part then the
* data will be truncated. The rest of the part is lost forever.
*
* If there is a message ready then get the next part of it as a raw
* byte buffer.
*
* If the socket::DONT_WAIT flag and there is no message ready to receive
* then this function will return false.
*
* \param buffer byte buffer pointer to start writing to
* \param length max length of the buffer
* \param flags message receive flags
* \return true if message part received, false if it would have blocked
*/
bool receive_raw(char* buffer, int& length, int const flags = normal);
/*!
*
* Subscribe to a topic
*
* Helper function that is equivalent of calling
* \code
* set(zmqpp::socket_option::subscribe, topic);
* \endcode
*
* This method is only useful for subscribe type sockets.
*
* \param topic the topic to subscribe to.
*/
void subscribe(std::string const& topic);
/*!
* Subscribe to a topic
*
* Helper function that is equivalent of a loop calling
* \code
* set(zmqpp::socket_option::subscribe, topic);
* \endcode
*
* This method is only useful for subscribe type sockets.
*
* Generally this will be used with stl collections using begin() and
* end() functions to get the iterators.
* For this reason the end loop runs until the end iterator, not inclusive
* of it.
*
* \param topics_begin the starting iterator for topics.
* \param topics_end the final iterator for topics.
*/
template<typename InputIterator>
void subscribe(InputIterator const& topics_begin, InputIterator const& topics_end)
{
for(InputIterator it = topics_begin; it != topics_end; ++it)
{
subscribe(*it);
}
}
/*!
* Unsubscribe from a topic
*
* Helper function that is equivalent of calling
* \code
* set(zmqpp::socket_option::unsubscribe, topic);
* \endcode
*
* This method is only useful for subscribe type sockets.
*
* \param topic the topic to unsubscribe from.
*/
void unsubscribe(std::string const& topic);
/*!
* Unsubscribe from a topic
*
* Helper function that is equivalent of a loop calling
* \code
* set(zmqpp::socket_option::unsubscribe, topic);
* \endcode
*
* This method is only useful for subscribe type sockets.
*
* Generally this will be used with stl collections using begin() and
* end() functions to get the iterators.
* For this reason the end loop runs until the end iterator, not inclusive
* of it.
*
* \param topics_begin the starting iterator for topics.
* \param topics_end the final iterator for topics.
*/
template<typename InputIterator>
void unsubscribe(InputIterator const& topics_begin, InputIterator const& topics_end)
{
for(InputIterator it = topics_begin; it != topics_end; ++it)
{
unsubscribe(*it);
}
}
/*!
* If the last receive part call to the socket resulted
* in a label or a non-terminating part of a multipart
* message this will return true.
*
* \return true if there are more parts
*/
bool has_more_parts() const;
/*!
* Set the value of an option in the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value to set the option to
*/
void set(socket_option const option, int const value);
/*!
* Set the value of an option in the underlaying zmq socket.
*
* \since 2.0.0 (built against 0mq version 3.1.x or later)
*
* \param option a valid ::socket_option
* \param value to set the option to
*/
void set(socket_option const option, bool const value);
/*!
* Set the value of an option in the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value to set the option to
*/
void set(socket_option const option, uint64_t const value);
/*!
* Set the value of an option in the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value to set the option to
*/
void set(socket_option const option, int64_t const value);
/*!
* Set the value of an option in the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param pointer to raw byte value to set the option to
* \param length the size of the raw byte value
*/
void set(socket_option const option, char const* value, size_t const length);
/*!
* Set the value of an option in the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param pointer to null terminated cstring value to set the option to
*/
inline void set(socket_option const option, char const* value) { set(option, value, strlen(value)); }
/*!
* Set the value of an option in the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value to set the option to
*/
inline void set(socket_option const option, std::string const value) { set(option, value.c_str(), value.length()); }
/*!
* Get a socket option from the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value referenced int to return value in
*/
void get(socket_option const option, int& value) const;
/*!
* Get a socket option from the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value referenced bool to return value in
*/
void get(socket_option const option, bool& value) const;
/*!
* Get a socket option from the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value referenced uint64_t to return value in
*/
void get(socket_option const option, uint64_t& value) const;
/*!
* Get a socket option from the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value referenced uint64_t to return value in
*/
void get(socket_option const option, int64_t& value) const;
/*!
* Get a socket option from the underlaying zmq socket.
*
* \param option a valid ::socket_option
* \param value referenced std::string to return value in
*/
void get(socket_option const option, std::string& value) const;
/*!
* For those that don't want to get into a referenced value this templated method
* will return the value instead.
*
* \param option a valid ::socket_option
* \return socket option value
*/
template<typename Type>
Type get(socket_option const option) const
{
Type value = Type();
get(option, value);
return value;
}
/*!
* Move constructor
*
* Moves the internals of source to this object, there is no guarantee
* that source will be left in a valid state.
*
* This constructor is noexcept and so will not throw exceptions
*
* \param source target socket to steal internals from
*/
socket(socket&& source) NOEXCEPT;
/*!
* Move operator
*
* Moves the internals of source to this object, there is no guarantee
* that source will be left in a valid state.
*
* This function is noexcept and so will not throw exceptions
*
* \param source target socket to steal internals from
* \return socket reference to this
*/
socket& operator=(socket&& source) NOEXCEPT;
/*!
* Check the socket is still valid
*
* This tests the internal state of the socket.
* If creation failed for some reason or if the move functions were used
* to move the socket internals to another instance this will return false.
*
* \return true if the socket is valid
*/
operator bool() const;
/*!
* Access to the raw 0mq context
*
* \return void pointer to the underlying 0mq socket
*/
operator void*() const;
private:
void* _socket;
socket_type _type;
zmq_msg_t _recv_buffer;
// No copy
socket(socket const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED;
socket& operator=(socket const&) NOEXCEPT ZMQPP_EXPLICITLY_DELETED;
void track_message(message_t const&, uint32_t const, bool&);
};
}
#endif /* ZMQPP_SOCKET_HPP_ */

View File

@@ -1,80 +0,0 @@
/**
* \file
*
* \date 23 Sep 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_SOCKET_OPTIONS_HPP_
#define ZMQPP_SOCKET_OPTIONS_HPP_
namespace zmqpp
{
/** \todo Expand the information on the options to make it actually useful. */
/*!
* \brief possible Socket options in zmq
*/
enum class socket_option {
affinity = ZMQ_AFFINITY, /*!< I/O thread affinity */
identity = ZMQ_IDENTITY, /*!< Socket identity */
subscribe = ZMQ_SUBSCRIBE, /*!< Add topic subscription - set only */
unsubscribe = ZMQ_UNSUBSCRIBE, /*!< Remove topic subscription - set only */
rate = ZMQ_RATE, /*!< Multicast data rate */
send_buffer_size = ZMQ_SNDBUF, /*!< Kernel transmission buffer size */
receive_buffer_size = ZMQ_RCVBUF, /*!< Kernel receive buffer size */
receive_more = ZMQ_RCVMORE, /*!< Can receive more parts - get only */
file_descriptor = ZMQ_FD, /*!< Socket file descriptor - get only */
events = ZMQ_EVENTS, /*!< Socket event flags - get only */
type = ZMQ_TYPE, /*!< Socket type - get only */
linger = ZMQ_LINGER, /*!< Socket linger timeout */
backlog = ZMQ_BACKLOG, /*!< Maximum length of outstanding connections - get only */
#if (ZMQ_VERSION_MAJOR == 2)
// Note that this is inverse of the zmq names for version 2.x
recovery_interval_seconds = ZMQ_RECOVERY_IVL, /*!< Multicast recovery interval in seconds */
recovery_interval = ZMQ_RECOVERY_IVL_MSEC, /*!< Multicast recovery interval in milliseconds */
#else
recovery_interval = ZMQ_RECOVERY_IVL, /*!< Multicast recovery interval in milliseconds */
#endif
reconnect_interval = ZMQ_RECONNECT_IVL, /*!< Reconnection interval */
reconnect_interval_max = ZMQ_RECONNECT_IVL_MAX, /*!< Maximum reconnection interval */
receive_timeout = ZMQ_RCVTIMEO, /*!< Maximum inbound block timeout */
send_timeout = ZMQ_SNDTIMEO, /*!< Maximum outbound block timeout */
#if (ZMQ_VERSION_MAJOR == 2)
high_water_mark = ZMQ_HWM, /*!< High-water mark for all messages */
swap_size = ZMQ_SWAP, /*!< Maximum socket swap size in bytes */
multicast_loopback = ZMQ_MCAST_LOOP, /*!< Allow multicast packet loopback */
#else
max_messsage_size = ZMQ_MAXMSGSIZE, /*!< Maximum inbound message size */
send_high_water_mark = ZMQ_SNDHWM, /*!< High-water mark for outbound messages */
receive_high_water_mark = ZMQ_RCVHWM, /*!< High-water mark for inbound messages */
multicast_hops = ZMQ_MULTICAST_HOPS, /*!< Maximum number of multicast hops */
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 1))
ipv4_only = ZMQ_IPV4ONLY,
#endif
#if (ZMQ_VERSION_MAJOR > 3) || ((ZMQ_VERSION_MAJOR == 3) && (ZMQ_VERSION_MINOR >= 2))
#if (ZMQ_VERSION_MINOR == 2)
delay_attach_on_connect = ZMQ_DELAY_ATTACH_ON_CONNECT, /*!< Delay buffer attachment until connect complete */
#else
// ZMQ_DELAY_ATTACH_ON_CONNECT is renamed in ZeroMQ starting 3.3.x
immediate = ZMQ_IMMEDIATE, /*!< Block message sending until connect complete */
#endif
last_endpoint = ZMQ_LAST_ENDPOINT, /*!< Last bound endpoint - get only */
router_mandatory = ZMQ_ROUTER_MANDATORY, /*!< Require routable messages - set only */
xpub_verbose = ZMQ_XPUB_VERBOSE, /*!< Pass on existing subscriptions - set only */
tcp_keepalive = ZMQ_TCP_KEEPALIVE, /*!< Enable TCP keepalives */
tcp_keepalive_idle = ZMQ_TCP_KEEPALIVE_IDLE, /*!< TCP keepalive idle count (generally retry count) */
tcp_keepalive_count = ZMQ_TCP_KEEPALIVE_CNT, /*!< TCP keepalive retry count */
tcp_keepalive_interval = ZMQ_TCP_KEEPALIVE_INTVL, /*!< TCP keepalive interval */
tcp_accept_filter = ZMQ_TCP_ACCEPT_FILTER, /*!< Filter inbound connections - set only */
#endif
#ifdef ZMQ_EXPERIMENTAL_LABELS
receive_label = ZMQ_RCVLABEL, /*!< Received label part - get only */
#endif
};
}
#endif /* ZMQPP_SOCKET_OPTIONS_HPP_ */

View File

@@ -1,148 +0,0 @@
/**
* \file
*
* \date 23 Sep 2011
* \author Ben Gray (\@benjamg)
*/
#ifndef ZMQPP_SOCKET_TYPES_HPP_
#define ZMQPP_SOCKET_TYPES_HPP_
namespace zmqpp
{
/*!
* \brief Socket types allowed by zmq
*
* The socket type choose at creation must be one of these types.
*
* Each is designed for a different use and has different limitations.
*/
enum class socket_type {
/*!
* One to one - two way connection.\n
* Connect to ::pair.\n
* A \c pair socket has to be connected only one other pair socket and allows
* two way communication between them.
*/
pair = ZMQ_PAIR,
/*!
* One to many - fan out.\n
* Connect to ::subscribe or ::xsubscribe.\n
* Socket is send only.\n
* Socket will drop messages and not block.\n
* \c publish sockets allow sending of the same message to many subscribers
* each subscriber can limit what is sent through the socket_option::subscribe
* settings.
*/
publish = ZMQ_PUB,
/*!
* \note It seems doxygen can't work out which data is for the socket type and
* which is for the socket option so both get listed for both.
*
* One to many - fair-queued.\n
* Connect to ::publish or ::xpublish.\n
* Socket is receive only.\n
* The \c subscribe socket can connection to any number of publishers and will
* fairly pull messages from each. The socket_option::subscribe settings can
* be use to limit which messages are received and by default none are.
*/
subscribe = ZMQ_SUB,
/*!
* One to many - fair-queued.\n
* Connect to ::push.\n
* Socket is receive only.\n
* The \c pull socket fairly pulls messages from all pushers it is connected
* to.
*/
pull = ZMQ_PULL,
/*!
* One to many - load-balanced.\n
* Connect to ::pull.\n
* Socket is send only.\n
* Socket will block if unable to send.\n
* The \c push socket fairly distributes messages between any connected
* puller sockets.
*/
push = ZMQ_PUSH,
/*!
* One to many - fair-queued outgoing, last peer incoming.\n
* Connect to ::reply or ::xreply.\n
* Socket flips between send and receive only.\n
* Socket will block if unable to send.\n
* The \c request socket will fairly balance requests sent out to a
* replier and then can only be used to receive until that replier
* sends a reply.
*/
request = ZMQ_REQ,
/*!
* One to many - load-balanced incoming, last peer outgoing.\n
* Connect to ::request or ::xrequest.\n
* Socket flips between send and receive only.\n
* Socket will drop messages and not block.\n
* The \c reply socket can only receive until it pulls a message from a
* requester at which point it can only send until the reply is sent.
*/
reply = ZMQ_REP,
/*!
* One to many - fan out.\n
* Connect to ::subscribe or ::xsubscribe.\n
* Socket is send only with the exception of special subscription messages.\n
* Socket will drop messages and not block.\n
* \c xpublish act the same as ::publish sockets however also allow special
* subscription messages to be received from subscribers.
*/
xpublish = ZMQ_XPUB,
/*!
* One to many - fair-queued.\n
* Connect to ::publish or ::xpublish.\n
* Socket is receive only with the exception of special subscription messages\n
* \c xsubscribe act the same as ::subscribe sockets however also allow special
* subscription messages to be send to connected publishers.
*/
xsubscribe = ZMQ_XSUB,
/*!
* One to many - fair-queued incoming, load-balanced outgoing.\n
* Connect to ::reply or ::xreply.\n
* Socket will block if unable to send.\n
* An \c xrequest socket balances requests between repliers and pulls replies
* back in a fair manner. Each request is expected to have exactly one reply.
*/
xrequest = ZMQ_XREQ,
/*!
* One to many - fair-queued incoming, targeted outgoing.\n
* Connect to ::request or ::xrequest.\n
* Socket will drop messages and not block.\n
* An \c xreply socket fairly pulls in requests from requesters and will
* label requests so it can return replies back to the correct target.
*/
xreply = ZMQ_XREP,
// To match for people who prefer the shorter versions
pub = ZMQ_PUB, /*!< version of ::publish to match zmq name convention */
sub = ZMQ_SUB, /*!< version of ::subscribe to match zmq name convention */
req = ZMQ_REQ, /*!< version of ::request to match zmq name convention */
rep = ZMQ_REP, /*!< version of ::reply to match zmq name convention */
xpub = ZMQ_XPUB, /*!< version of ::xpublish to match zmq name convention */
xsub = ZMQ_XSUB, /*!< version of ::xsubscribe to match zmq name convention */
xreq = ZMQ_XREQ, /*!< version of ::xrequest to match zmq name convention */
xrep = ZMQ_XREP, /*!< version of ::xreply to match zmq name convention */
// For completion
router = ZMQ_ROUTER, /*!< \deprecated Matches zmq 2.x xrep functionality. */
dealer = ZMQ_DEALER /*!< \deprecated Matches zmq 2.x xreq functionality. */
};
}
#endif /* ZMQPP_SOCKET_TYPES_HPP_ */

View File

@@ -1,30 +0,0 @@
/*
* Created on: 18 Aug 2011
* Author: Ben Gray (@benjamg)
*/
#include "zmqpp.hpp"
namespace zmqpp
{
std::string version()
{
return BUILD_VERSION;
}
void version(uint8_t& major, uint8_t& minor, uint8_t& revision)
{
major = ZMQPP_VERSION_MAJOR;
minor = ZMQPP_VERSION_MINOR;
revision = ZMQPP_VERSION_REVISION;
}
void zmq_version(uint8_t& major, uint8_t& minor, uint8_t& patch)
{
major = ZMQ_VERSION_MAJOR;
minor = ZMQ_VERSION_MINOR;
patch = ZMQ_VERSION_PATCH;
}
}

View File

@@ -1,111 +0,0 @@
/**
* \file
*
* \date 9 Aug 2011
* \author Ben Gray (\@benjamg)
*
* License: http://www.opensource.org/licenses/MIT
*
* Copyright (C) 2011 by Ben Gray
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#ifndef ZMQPP_ZMQPP_HPP_
#define ZMQPP_ZMQPP_HPP_
/**
* \def ZMQPP_VERSION_MAJOR
* zmqpp major version number, generated at compile time
*/
#define ZMQPP_VERSION_MAJOR BUILD_VERSION_MAJOR
/**
* \def ZMQPP_VERSION_MINOR
* zmqpp minor version number, generated at compile time
*/
#define ZMQPP_VERSION_MINOR BUILD_VERSION_MINOR
/**
* \def ZMQPP_VERSION_REVISION
* zmqpp version revision number, generated at compile time
*/
#define ZMQPP_VERSION_REVISION BUILD_VERSION_REVISION
#include <zmq.h>
#include "compatibility.hpp"
#include "context.hpp"
#include "exception.hpp"
#include "message.hpp"
#include "poller.hpp"
#include "socket.hpp"
/*!
* \brief C++ wrapper around zmq
*
* All zmq++ / zmqpp functions, constants and classes live within this namespace,
*/
namespace zmqpp
{
/*!
* Returns the current major.minor.revision version number as a string.
*
* \return string version number.
*/
std::string version();
/*!
* Retrieve the parts of the zmqpp version number.
*
* Set the three parameters to values representing the zmqpp version number.
* These values are generated at library compile time.
*
* \param major an unsigned 8 bit reference to set to the major version.
* \param minor an unsigned 8 bit reference to set to the minor version.
* \param revision an unsigned 8 bit reference to set the current revision.
*/
void version(uint8_t& major, uint8_t& minor, uint8_t& revision);
/*!
* Retrieve the parts of the 0mq version this library was built against.
*
* Because sections of the library are optionally compiled in or ignored
* depending on the version of 0mq it was compiled against this method is
* provided to allow sanity checking for usage.
*
* Set the three parameters to values representing the 0mq version number.
* These values are generated at library compile time.
*
* \param major an unsigned 8 bit reference to set to the major version.
* \param minor an unsigned 8 bit reference to set to the minor version.
* \param revision an unsigned 8 bit reference to set the current revision.
*/
void zmq_version(uint8_t& major, uint8_t& minor, uint8_t& patch);
typedef context context_t; /*!< \brief context type */
typedef std::string endpoint_t; /*!< \brief endpoint type */
typedef message message_t; /*!< \brief message type */
typedef poller poller_t; /*!< \brief poller type */
typedef socket socket_t; /*!< \brief socket type */
}
#endif /* ZMQPP_ZMQPP_HPP_ */

View File

@@ -1,106 +0,0 @@
diff --git a/dep/zmqpp/zmqpp/inet.hpp b/dep/zmqpp/zmqpp/inet.hpp
index 5245aa4..e0c3b14 100644
--- a/dep/zmqpp/zmqpp/inet.hpp
+++ b/dep/zmqpp/zmqpp/inet.hpp
@@ -76,10 +76,12 @@ inline uint64_t swap_if_needed(uint64_t const value_to_check)
* \param hostlonglong unsigned 64 bit host order integer
* \return unsigned 64 bit network order integer
*/
+#ifndef htonll
inline uint64_t htonll(uint64_t const hostlonglong)
{
return zmqpp::swap_if_needed(hostlonglong);
}
+#endif
/*!
* 64 bit version of the ntohs/ntohl
@@ -89,10 +91,12 @@ inline uint64_t htonll(uint64_t const hostlonglong)
* \param networklonglong unsigned 64 bit network order integer
* \return unsigned 64 bit host order integer
*/
+#ifndef ntohll
inline uint64_t ntohll(uint64_t const networklonglong)
{
return zmqpp::swap_if_needed(networklonglong);
}
+#endif
/*!
* floating point version of the htons/htonl
@@ -142,7 +146,7 @@ inline double htond(double value)
uint64_t temp;
memcpy(&temp, &value, sizeof(uint64_t));
- temp = zmqpp::htonll(temp);
+ temp = htonll(temp);
memcpy(&value, &temp, sizeof(uint64_t));
return value;
@@ -160,7 +164,7 @@ inline double ntohd(double value)
uint64_t temp;
memcpy(&temp, &value, sizeof(uint64_t));
- temp = zmqpp::ntohll(temp);
+ temp = ntohll(temp);
memcpy(&value, &temp, sizeof(uint64_t));
return value;
diff --git a/dep/zmqpp/zmqpp/message.cpp b/dep/zmqpp/zmqpp/message.cpp
index 5858730..4d81d24 100644
--- a/dep/zmqpp/zmqpp/message.cpp
+++ b/dep/zmqpp/zmqpp/message.cpp
@@ -138,7 +138,7 @@ void message::get(int64_t& integer, size_t const part) const
assert(sizeof(int64_t) == size(part));
uint64_t const* network_order = static_cast<uint64_t const*>(raw_data(part));
- integer = static_cast<int64_t>(zmqpp::htonll(*network_order));
+ integer = static_cast<int64_t>(htonll(*network_order));
}
void message::get(uint8_t& unsigned_integer, size_t const part) const
@@ -170,7 +170,7 @@ void message::get(uint64_t& unsigned_integer, size_t const part) const
assert(sizeof(uint64_t) == size(part));
uint64_t const* network_order = static_cast<uint64_t const*>(raw_data(part));
- unsigned_integer = zmqpp::ntohll(*network_order);
+ unsigned_integer = ntohll(*network_order);
}
void message::get(float& floating_point, size_t const part) const
@@ -228,7 +228,7 @@ message& message::operator<<(int32_t const integer)
message& message::operator<<(int64_t const integer)
{
- uint64_t network_order = zmqpp::htonll(static_cast<uint64_t>(integer));
+ uint64_t network_order = htonll(static_cast<uint64_t>(integer));
add(reinterpret_cast<void const*>(&network_order), sizeof(uint64_t));
return *this;
@@ -259,7 +259,7 @@ message& message::operator<<(uint32_t const unsigned_integer)
message& message::operator<<(uint64_t const unsigned_integer)
{
- uint64_t network_order = zmqpp::htonll(unsigned_integer);
+ uint64_t network_order = htonll(unsigned_integer);
add(reinterpret_cast<void const*>(&network_order), sizeof(uint64_t));
return *this;
@@ -329,7 +329,7 @@ void message::push_front(int32_t const integer)
void message::push_front(int64_t const integer)
{
- uint64_t network_order = zmqpp::htonll(static_cast<uint64_t>(integer));
+ uint64_t network_order = htonll(static_cast<uint64_t>(integer));
push_front(&network_order, sizeof(uint64_t));
}
@@ -353,7 +353,7 @@ void message::push_front(uint32_t const unsigned_integer)
void message::push_front(uint64_t const unsigned_integer)
{
- uint64_t network_order = zmqpp::htonll(unsigned_integer);
+ uint64_t network_order = htonll(unsigned_integer);
push_front(&network_order, sizeof(uint64_t));
}

View File

@@ -27,7 +27,6 @@ endif(WIN32)
add_subdirectory(database)
add_subdirectory(shared)
add_subdirectory(ipc)
add_subdirectory(bnetserver)
add_subdirectory(game)
add_subdirectory(scripts)

View File

@@ -43,8 +43,7 @@ endif()
target_link_libraries(bnetserver
PUBLIC
shared
ipc)
shared)
CollectIncludeDirectories(
${CMAKE_CURRENT_SOURCE_DIR}

View File

@@ -36,7 +36,6 @@
#include "RealmList.h"
#include "GitRevision.h"
#include "Util.h"
#include "ZmqContext.h"
#include "DatabaseLoader.h"
#include <cstdlib>
#include <iostream>
@@ -136,23 +135,14 @@ int main(int argc, char** argv)
}
}
int32 worldListenPort = sConfigMgr->GetIntDefault("WorldserverListenPort", 1118);
if (worldListenPort < 0 || worldListenPort > 0xFFFF)
{
TC_LOG_ERROR("server.bnetserver", "Specified worldserver listen port (%d) out of allowed range (1-65535)", worldListenPort);
return 1;
}
// Initialize the database connection
if (!StartDB())
return 1;
sIpcContext->Initialize();
_ioService = new boost::asio::io_service();
// Get the list of realms for the server
sRealmList->Initialize(*_ioService, sConfigMgr->GetIntDefault("RealmsStateUpdateDelay", 10), worldListenPort);
sRealmList->Initialize(*_ioService, sConfigMgr->GetIntDefault("RealmsStateUpdateDelay", 10));
// Start the listening port (acceptor) for auth connections
int32 bnport = sConfigMgr->GetIntDefault("BattlenetPort", 1119);
@@ -209,8 +199,6 @@ int main(int argc, char** argv)
sSessionMgr.StopNetwork();
sIpcContext->Close();
sRealmList->Close();
// Close the Database Pool and library

View File

@@ -20,23 +20,20 @@
#include "Database/DatabaseEnv.h"
#include "SessionManager.h"
#include "Util.h"
#include "Commands.h"
#include "RealmList.h"
#include <boost/asio/ip/tcp.hpp>
RealmList::RealmList() : _updateInterval(0), _updateTimer(nullptr), _resolver(nullptr), _worldListener(nullptr)
RealmList::RealmList() : _updateInterval(0), _updateTimer(nullptr), _resolver(nullptr)
{
}
RealmList::~RealmList()
{
delete _updateTimer;
delete _resolver;
delete _worldListener;
}
// Load the realm list from the database
void RealmList::Initialize(boost::asio::io_service& ioService, uint32 updateInterval, uint16 worldListenPort)
void RealmList::Initialize(boost::asio::io_service& ioService, uint32 updateInterval)
{
_updateInterval = updateInterval;
_updateTimer = new boost::asio::deadline_timer(ioService);
@@ -44,14 +41,10 @@ void RealmList::Initialize(boost::asio::io_service& ioService, uint32 updateInte
// Get the content of the realmlist table in the database
UpdateRealms(boost::system::error_code());
_worldListener = new WorldListener(worldListenPort);
_worldListener->Start();
}
void RealmList::Close()
{
_worldListener->End();
_updateTimer->cancel();
}

View File

@@ -21,7 +21,6 @@
#include "Common.h"
#include "Realm/Realm.h"
#include "WorldListener.h"
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/io_service.hpp>
@@ -43,7 +42,7 @@ public:
~RealmList();
void Initialize(boost::asio::io_service& ioService, uint32 updateInterval, uint16 worldListenPort);
void Initialize(boost::asio::io_service& ioService, uint32 updateInterval);
void Close();
RealmMap const& GetRealms() const { return _realms; }
@@ -60,7 +59,6 @@ private:
uint32 _updateInterval;
boost::asio::deadline_timer* _updateTimer;
boost::asio::ip::tcp::resolver* _resolver;
WorldListener* _worldListener;
};
#define sRealmList RealmList::instance()

View File

@@ -1,132 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Log.h"
#include "SessionManager.h"
#include "WoWRealmPackets.h"
#include "ZmqContext.h"
#include "WorldListener.h"
WorldListener::HandlerTable const WorldListener::_handlers;
WorldListener::HandlerTable::HandlerTable()
{
#define DEFINE_HANDLER(opc, func) _handlers[opc] = { func, #opc }
DEFINE_HANDLER(BNET_CHANGE_TOON_ONLINE_STATE, &WorldListener::HandleToonOnlineStatusChange);
#undef DEFINE_HANDLER
}
WorldListener::WorldListener(uint16 worldListenPort) : _worldListenPort(worldListenPort)
{
_worldSocket = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull);
}
WorldListener::~WorldListener()
{
delete _worldSocket;
}
void WorldListener::Run()
{
while (!ProcessExit())
{
_poller->poll();
if (_poller->events(*_worldSocket) & zmqpp::poller::poll_in)
{
int32 op1;
do
{
zmqpp::message msg;
_worldSocket->receive(msg);
Dispatch(msg);
_worldSocket->get(zmqpp::socket_option::events, op1);
} while (op1 & zmqpp::poller::poll_in);
}
}
}
void WorldListener::HandleOpen()
{
try
{
_worldSocket->bind(std::string("tcp://*:") + std::to_string(_worldListenPort));
}
catch (zmqpp::zmq_internal_exception& ex)
{
TC_LOG_FATAL("server.ipc", "Could not bind to WorldserverListenPort %u. Exception: %s. Shutting down bnetserver.", _worldListenPort, ex.what());
abort();
}
_poller->add(*_worldSocket);
TC_LOG_INFO("server.ipc", "Listening on connections from worldservers on port %u...", _worldListenPort);
}
void WorldListener::HandleClose()
{
_worldSocket->close();
TC_LOG_INFO("server.ipc", "Shutting down connections from worldservers...");
}
void WorldListener::Dispatch(zmqpp::message& msg) const
{
IPC::BattlenetComm::Header ipcHeader;
msg >> ipcHeader;
if (ipcHeader.Ipc.Channel != IPC_CHANNEL_BNET)
return;
if (ipcHeader.Ipc.Command < IPC_BNET_MAX_COMMAND)
(this->*_handlers[ipcHeader.Ipc.Command].Handler)(ipcHeader.Realm, msg);
}
void WorldListener::HandleToonOnlineStatusChange(Battlenet::RealmHandle const& realm, zmqpp::message& msg) const
{
IPC::BattlenetComm::ToonHandle toonHandle;
bool online;
msg >> toonHandle;
msg >> online;
if (Battlenet::Session* session = sSessionMgr.GetSession(toonHandle.AccountId, toonHandle.GameAccountId))
{
if (online)
{
if (!session->IsToonOnline())
{
Battlenet::WoWRealm::ToonReady* toonReady = new Battlenet::WoWRealm::ToonReady();
toonReady->Name.Region = realm.Region;
toonReady->Name.ProgramId = "WoW";
toonReady->Name.Realm = realm.GetAddress();
toonReady->Name.Name = toonHandle.Name;
toonReady->Handle.Region = realm.Region;
toonReady->Handle.ProgramId = "WoW";
toonReady->Handle.Realm = realm.GetAddress();
toonReady->Handle.Id = toonHandle.Guid;
session->SetToonOnline(true);
session->AsyncWrite(toonReady);
}
}
else if (session->IsToonOnline())
{
session->AsyncWrite(new Battlenet::WoWRealm::ToonLoggedOut());
session->SetToonOnline(false);
}
}
}

View File

@@ -1,63 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef WorldListener_h__
#define WorldListener_h__
#include "ZMQTask.h"
#include "Commands.h"
class WorldListener : public ZMQTask
{
public:
explicit WorldListener(uint16 worldListenPort);
~WorldListener();
void Run() override;
protected:
void HandleOpen() override;
void HandleClose() override;
private:
void Dispatch(zmqpp::message& msg) const;
typedef void(WorldListener::*PacketHandler)(Battlenet::RealmHandle const& realm, zmqpp::message& msg) const;
class HandlerTable
{
public:
HandlerTable();
struct HandlerInfo
{
PacketHandler Handler;
char const* Name;
};
HandlerInfo const& operator[](uint8 opcode) const { return _handlers[opcode]; }
private:
HandlerInfo _handlers[IPC_BNET_MAX_COMMAND];
};
void HandleToonOnlineStatusChange(Battlenet::RealmHandle const& realm, zmqpp::message& msg) const;
zmqpp::socket* _worldSocket;
uint16 _worldListenPort;
static HandlerTable const _handlers;
};
#endif // WorldListener_h__

View File

@@ -47,13 +47,6 @@ LogsDir = ""
MaxPingTime = 30
#
# WorldserverListenPort
# Description: TCP port to listen on for incoming worldserver IPC.
# Default: 1118
WorldserverListenPort = 1118
#
# BattlenetPort
# Description: TCP port to reach the auth server for battle.net connections.

View File

@@ -40,7 +40,7 @@ target_include_directories(game
target_link_libraries(game
PUBLIC
ipc
shared
Detour)
set_target_properties(game

View File

@@ -21,7 +21,6 @@
#include "ArenaTeamMgr.h"
#include "AuthenticationPackets.h"
#include "Battleground.h"
#include "BattlenetServerManager.h"
#include "BattlePetPackets.h"
#include "CalendarMgr.h"
#include "CharacterPackets.h"
@@ -1138,8 +1137,6 @@ void WorldSession::HandlePlayerLogin(LoginQueryHolder* holder)
sScriptMgr->OnPlayerLogin(pCurrChar, firstLogin);
sBattlenetServer.SendChangeToonOnlineState(GetBattlenetAccountId(), GetAccountId(), _player->GetGUID(), _player->GetName(), true);
delete holder;
}

View File

@@ -1,80 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Config.h"
#include "World.h"
#include "ZmqContext.h"
#include "BattlenetServerManager.h"
void IPC::BattlenetComm::ServerManager::InitializeConnection()
{
std::string bnetserverAddress = sConfigMgr->GetStringDefault("BnetServer.Address", "127.0.0.1");
int32 bnetserverPort = sConfigMgr->GetIntDefault("BnetServer.Port", 1118);
_socket = new ZmqMux("inproc://bnetmgr", "tcp://" + bnetserverAddress + ":" + std::to_string(bnetserverPort));
_socket->Start();
}
void IPC::BattlenetComm::ServerManager::CloseConnection()
{
_socket->End();
delete _socket;
_socket = nullptr;
}
IPC::BattlenetComm::ServerManager& IPC::BattlenetComm::ServerManager::Instance()
{
static ServerManager instance;
return instance;
}
IPC::BattlenetComm::Header IPC::BattlenetComm::ServerManager::CreateHeader(BnetCommands command)
{
Header header;
header.Ipc.Channel = IPC_CHANNEL_BNET;
header.Ipc.Command = command;
header.Realm = realm.Id;
return header;
}
void IPC::BattlenetComm::ServerManager::SendChangeToonOnlineState(uint32 battlenetAccountId, uint32 gameAccountId, ObjectGuid guid, std::string const& name, bool online)
{
// Do nothing for Grunt login
if (!battlenetAccountId)
return;
Header header = CreateHeader(BNET_CHANGE_TOON_ONLINE_STATE);
ToonHandle toon;
toon.AccountId = battlenetAccountId;
toon.GameAccountId = gameAccountId;
toon.Guid = guid.GetCounter();
toon.Name = name;
zmqpp::message msg;
msg << header;
msg << toon;
msg << online;
Send(&msg);
}
void IPC::BattlenetComm::ServerManager::Send(zmqpp::message* msg)
{
if (!_socket)
return;
_socket->Send(msg);
}

View File

@@ -1,57 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef BattlenetMgr_h__
#define BattlenetMgr_h__
#include "ZmqMux.h"
#include "Commands.h"
namespace zmqpp
{
class socket;
class message;
}
namespace IPC
{
namespace BattlenetComm
{
class ServerManager
{
ServerManager() : _socket(nullptr) { }
public:
void InitializeConnection();
void CloseConnection();
static ServerManager& Instance();
void SendChangeToonOnlineState(uint32 battlenetAccountId, uint32 gameAccountId, ObjectGuid guid, std::string const& name, bool online);
private:
void Send(zmqpp::message* msg);
static Header CreateHeader(BnetCommands command);
ZmqMux* _socket;
};
}
}
#define sBattlenetServer IPC::BattlenetComm::ServerManager::Instance()
#endif // BattlenetMgr_h__

View File

@@ -42,7 +42,6 @@
#include "SocialMgr.h"
#include "ScriptMgr.h"
#include "WardenWin.h"
#include "BattlenetServerManager.h"
#include "AuthenticationPackets.h"
#include "CharacterPackets.h"
#include "ClientConfigPackets.h"
@@ -612,8 +611,6 @@ void WorldSession::LogoutPlayer(bool save)
TC_LOG_INFO("entities.player.character", "Account: %u (IP: %s) Logout Character:[%s] (%s) Level: %d",
GetAccountId(), GetRemoteAddress().c_str(), _player->GetName().c_str(), _player->GetGUID().ToString().c_str(), _player->getLevel());
sBattlenetServer.SendChangeToonOnlineState(GetBattlenetAccountId(), GetAccountId(), _player->GetGUID(), _player->GetName(), false);
if (Map* _map = _player->FindMap())
_map->RemovePlayerFromMap(_player, true);

View File

@@ -1,33 +0,0 @@
# Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
#
# This file is free software; as a special exception the author gives
# unlimited permission to copy and/or distribute it, with or without
# modifications, as long as this notice is preserved.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
CollectSourceFiles(
${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE_SOURCES)
GroupSources(${CMAKE_CURRENT_SOURCE_DIR})
add_library(ipc ${PRIVATE_SOURCES})
target_include_directories(ipc
PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
PRIVATE
${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries(ipc
PUBLIC
shared
zmqpp)
set_target_properties(ipc
PROPERTIES
FOLDER
"server")

View File

@@ -1,81 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "Commands.h"
#include <zmqpp/message.hpp>
zmqpp::message& operator>>(zmqpp::message& msg, IPC::Header& header)
{
msg >> header.Channel;
msg >> header.Command;
return msg;
}
zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm)
{
msg >> realm.Region;
msg >> realm.Site;
msg >> realm.Realm;
return msg;
}
zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::Header& header)
{
msg >> header.Ipc;
msg >> header.Realm;
return msg;
}
zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle& toonHandle)
{
msg >> toonHandle.AccountId;
msg >> toonHandle.GameAccountId;
msg >> toonHandle.Guid;
msg >> toonHandle.Name;
return msg;
}
zmqpp::message& operator<<(zmqpp::message& msg, IPC::Header const& header)
{
msg << header.Channel;
msg << header.Command;
return msg;
}
zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle const& realm)
{
msg << realm.Region;
msg << realm.Site;
msg << realm.Realm;
return msg;
}
zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::Header const& header)
{
msg << header.Ipc;
msg << header.Realm;
return msg;
}
zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle const& toonHandle)
{
msg << toonHandle.AccountId;
msg << toonHandle.GameAccountId;
msg << toonHandle.Guid;
msg << toonHandle.Name;
return msg;
}

View File

@@ -1,80 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _COMMANDS_H
#define _COMMANDS_H
#include "Define.h"
#include "Realm/Realm.h"
#include <string>
enum Channels
{
IPC_CHANNEL_BNET,
MAX_IPC_CHANNELS,
};
enum BnetCommands
{
BNET_CHANGE_TOON_ONLINE_STATE,
IPC_BNET_MAX_COMMAND
};
namespace IPC
{
struct Header
{
uint8 Channel;
uint8 Command;
};
namespace BattlenetComm
{
struct Header
{
IPC::Header Ipc;
Battlenet::RealmHandle Realm;
};
struct ToonHandle
{
uint32 AccountId;
uint32 GameAccountId;
uint64 Guid;
std::string Name;
};
}
}
namespace zmqpp
{
class message;
}
zmqpp::message& operator>>(zmqpp::message& msg, IPC::Header& header);
zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm);
zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::Header& header);
zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle& toonHandle);
zmqpp::message& operator<<(zmqpp::message& msg, IPC::Header const& header);
zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle const& realm);
zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::Header const& header);
zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle const& toonHandle);
#endif // _COMMANDS_H

View File

@@ -1,94 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ZMQTask.h"
#include "ZmqContext.h"
#include <zmqpp/message.hpp>
ZMQTask::ZMQTask()
{
_poller = new zmqpp::poller();
}
ZMQTask::~ZMQTask()
{
delete _poller;
_poller = NULL;
delete _inproc;
delete _thread;
}
void ZMQTask::Start()
{
_inproc = sIpcContext->CreateInprocSubscriber();
_poller->add(*_inproc);
HandleOpen();
_thread = new std::thread(&ZMQTask::Run, this);
}
void ZMQTask::End()
{
_thread->join();
_poller->remove(*_inproc);
_inproc->close();
HandleClose();
}
bool ZMQTask::ProcessExit()
{
if (_poller->events(*_inproc) == zmqpp::poller::poll_in)
{
int op1;
do
{
zmqpp::message msg;
if (!_inproc->receive(msg, true))
return false; //No more messages to read from sock. This shouldn't happen.
// strip 'internalmq.' from message
std::string cmd = msg.get(0).substr(11);
if (cmd == "kill")
return true;
_inproc->get(zmqpp::socket_option::events, op1);
} while (op1 & zmqpp::poller::poll_in);
}
return false;
}
void ZMQTask::Pipeline(zmqpp::socket* from, zmqpp::socket* to)
{
/*
Push messages from socket to socket.
*/
if (_poller->events(*from) == zmqpp::poller::poll_in)
{
int32 op1, op2;
do
{
zmqpp::message msg;
if (!from->receive(msg, true))
return; //No more messages to read from socket. This shouldn't happen.
to->send(msg);
from->get(zmqpp::socket_option::events, op1);
to->get(zmqpp::socket_option::events, op2);
} while(op1 & zmqpp::poller::poll_in && op2 & zmqpp::poller::poll_out);
}
}

View File

@@ -1,52 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQTASK_H
#define __ZMQTASK_H
#include "Define.h"
#include <thread>
#include <zmqpp/poller.hpp>
#include <zmqpp/socket.hpp>
/*
This class serves as a base for all long running tasks
It is set up to terminate its running task upon receiving "kill" command
*/
class ZMQTask
{
public:
ZMQTask();
virtual ~ZMQTask();
void Start();
void End();
virtual void Run() = 0;
protected:
virtual void HandleOpen() { }
virtual void HandleClose() { }
void Pipeline(zmqpp::socket* from, zmqpp::socket* to);
bool ProcessExit();
zmqpp::poller* _poller;
zmqpp::socket* _inproc = nullptr;
std::thread* _thread = nullptr;
};
#endif // __ZMQTASK_H

View File

@@ -1,55 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ZmqContext.h"
ZmqContext::ZmqContext() : _inproc(nullptr)
{
}
ZmqContext::~ZmqContext()
{
}
zmqpp::socket* ZmqContext::CreateNewSocket(zmqpp::socket_type type)
{
std::unique_lock<std::mutex> lock(_mutex);
zmqpp::socket* socket = new zmqpp::socket(_context, type);
socket->set(zmqpp::socket_option::linger, 0);
return socket;
}
void ZmqContext::Initialize()
{
_inproc = new zmqpp::socket(_context, zmqpp::socket_type::pub);
_inproc->bind("inproc://workers");
}
zmqpp::socket* ZmqContext::CreateInprocSubscriber()
{
zmqpp::socket* sub = CreateNewSocket(zmqpp::socket_type::sub);
sub->connect("inproc://workers");
sub->subscribe("internalmq.");
return sub;
}
void ZmqContext::Close()
{
_inproc->send("internalmq.kill");
delete _inproc;
_inproc = nullptr;
}

View File

@@ -1,55 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQCONTEX_H
#define __ZMQCONTEX_H
#include <zmqpp/zmqpp.hpp>
#include <mutex>
/*
* We need to serialize access to zmq context otherwise stuff blows up.
*/
class ZmqContext
{
public:
~ZmqContext();
static ZmqContext* Instance()
{
static ZmqContext instance;
return &instance;
}
zmqpp::socket* CreateNewSocket(zmqpp::socket_type);
void Initialize();
zmqpp::socket* CreateInprocSubscriber();
void Close();
private:
ZmqContext();
ZmqContext(ZmqContext const&) = delete;
ZmqContext& operator=(ZmqContext const&) = delete;
zmqpp::context _context;
std::mutex _mutex;
zmqpp::socket* _inproc;
};
#define sIpcContext ZmqContext::Instance()
#endif

View File

@@ -1,69 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ZmqListener.h"
#include "ZmqContext.h"
ZmqListener::ZmqListener(std::string const& from, std::string const& to)
{
_from = sIpcContext->CreateNewSocket(zmqpp::socket_type::sub);
_to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push);
_from->connect(from);
_to->bind(to);
}
ZmqListener::~ZmqListener()
{
delete _from;
delete _to;
}
void ZmqListener::HandleOpen()
{
}
void ZmqListener::HandleClose()
{
_from->close();
_to->close();
}
void ZmqListener::Run()
{
while (!ProcessExit())
{
_poller->poll();
while (_poller->events(*_from) & zmqpp::poller::poll_in &&
_poller->events(*_to) & zmqpp::poller::poll_out)
{
zmqpp::message msg;
_from->receive(msg);
_to->send(msg);
}
}
}
void ZmqListener::Subscribe(std::string const& keyword)
{
_from->subscribe(keyword);
}
void ZmqListener::Unsubscribe(std::string const& keyword)
{
_from->unsubscribe(keyword);
}

View File

@@ -1,51 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQLISTENER_H
#define __ZMQLISTENER_H
#include "ZMQTask.h"
#include <zmqpp/zmqpp.hpp>
class ZmqListener : public ZMQTask
{
/*
* Read broadcasts from remote PUB socket, and forward them to
* another socket.
*
* from - client SUB socket
* to - listen PUSH socket
*
*/
public:
ZmqListener(std::string const& from, std::string const& to);
~ZmqListener();
void Run() override;
void Subscribe(std::string const& keyword);
void Unsubscribe(std::string const& keyword);
protected:
void HandleOpen() override;
void HandleClose() override;
private:
zmqpp::socket* _from;
zmqpp::socket* _to;
};
#endif

View File

@@ -1,68 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ZmqMux.h"
#include "ZmqContext.h"
ZmqMux::ZmqMux(std::string const& fromUri, std::string const& toUri):
_fromAddress(fromUri)
{
printf("Opening muxer thread from %s to %s\n", fromUri.c_str(), toUri.c_str());
_from = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull);
_to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push);
_from->bind(fromUri);
_to->connect(toUri);
}
ZmqMux::~ZmqMux()
{
delete _from;
delete _to;
}
void ZmqMux::HandleOpen()
{
_poller->add(*_from);
_poller->add(*_to, zmqpp::poller::poll_out);
}
bool ZmqMux::Send(zmqpp::message* m, bool dont_block)
{
if (_socket.get() == nullptr)
{
_socket.reset(sIpcContext->CreateNewSocket(zmqpp::socket_type::push));
_socket->connect(_fromAddress);
}
return _socket->send(*m, dont_block);
}
void ZmqMux::Run()
{
for (;;)
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
if (!_poller->poll())
break;
if (ProcessExit())
break;
Pipeline(_from, _to);
}
}

View File

@@ -1,47 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQMUX_H
#define __ZMQMUX_H
#include "ZMQTask.h"
#include <string>
#include <boost/thread/tss.hpp>
/*
* Multiplexes zmq messages from many threads,
* and then passes them to another socket.
*/
class ZmqMux : public ZMQTask
{
public:
ZmqMux(std::string const& from, std::string const& to);
~ZmqMux();
bool Send(zmqpp::message*, bool dont_block = false);
void Run() override;
protected:
void HandleOpen() override;
private:
boost::thread_specific_ptr<zmqpp::socket> _socket;
zmqpp::socket* _from;
zmqpp::socket* _to;
std::string const _fromAddress;
};
#endif

View File

@@ -1,69 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "ZmqWorker.h"
#include "ZmqContext.h"
ZmqWorker::ZmqWorker(std::string const& taskUri, std::string const& resUri) :
_taskUri(taskUri), _resultsUri(resUri)
{
}
ZmqWorker::~ZmqWorker()
{
delete _taskQueue;
delete _results;
}
void ZmqWorker::HandleOpen()
{
_taskQueue = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull);
_results = sIpcContext->CreateNewSocket(zmqpp::socket_type::push);
_taskQueue->connect(_taskUri);
_results->connect(_resultsUri);
_poller->add(*_taskQueue);
}
void ZmqWorker::HandleClose()
{
_taskQueue->close();
_results->close();
}
void ZmqWorker::Run()
{
while (!ProcessExit())
{
_poller->poll();
if (_poller->events(*_taskQueue) & zmqpp::poller::poll_in)
PerformWork();
}
}
void ZmqWorker::PerformWork()
{
int32 op1;
do
{
zmqpp::message msg;
_taskQueue->receive(msg);
Dispatch(msg);
_taskQueue->get(zmqpp::socket_option::events, op1);
} while (op1 & zmqpp::poller::poll_in);
}

View File

@@ -1,44 +0,0 @@
/*
* Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along
* with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQWORKER_H
#define __ZMQWORKER_H
#include "ZMQTask.h"
#include <zmqpp/zmqpp.hpp>
class ZmqWorker : public ZMQTask
{
public:
ZmqWorker(std::string const& taskUri, std::string const& resUri);
~ZmqWorker();
void Run() override;
protected:
void HandleOpen() override;
void HandleClose() override;
zmqpp::socket* _results = nullptr;
private:
void PerformWork();
virtual void Dispatch(zmqpp::message const&) = 0;
zmqpp::socket* _taskQueue = nullptr;
std::string _taskUri;
std::string _resultsUri;
};
#endif

View File

@@ -21,8 +21,6 @@
/// \file
#include "Common.h"
#include "Commands.h"
#include "ZmqContext.h"
#include "DatabaseEnv.h"
#include "AsyncAcceptor.h"
#include "RASession.h"
@@ -43,7 +41,6 @@
#include "GitRevision.h"
#include "WorldSocket.h"
#include "WorldSocketMgr.h"
#include "BattlenetServerManager.h"
#include "Realm/Realm.h"
#include "DatabaseLoader.h"
#include "AppenderDB.h"
@@ -240,10 +237,6 @@ extern int main(int argc, char** argv)
TC_LOG_INFO("server.worldserver", "Starting up anti-freeze thread (%u seconds max stuck time)...", coreStuckTime);
}
sIpcContext->Initialize();
sBattlenetServer.InitializeConnection();
TC_LOG_INFO("server.worldserver", "%s (worldserver-daemon) ready...", GitRevision::GetFullVersion());
sScriptMgr->OnStartup();
@@ -257,10 +250,6 @@ extern int main(int argc, char** argv)
sScriptMgr->OnShutdown();
sIpcContext->Close();
sBattlenetServer.CloseConnection();
sWorld->KickAll(); // save and kick all players
sWorld->UpdateSessions(1); // real players unload required UpdateSessions call

View File

@@ -3626,24 +3626,3 @@ PacketSpoof.BanDuration = 86400
#
###################################################################################################
###################################################################################################
# IPC SETTINGS
#
# BnetServer.Address
# Description: Determines IP address of battle.net server to connect to.
# Default: 127.0.0.1
#
BnetServer.Address = 127.0.0.1
#
# BnetServer.Port
# Description: Determines port to use when connecting to battle.net server.
# Default: 1118
#
BnetServer.Port = 1118
#
###################################################################################################