diff options
Diffstat (limited to 'src/server/ipc')
| -rw-r--r-- | src/server/ipc/CMakeLists.txt | 24 | ||||
| -rw-r--r-- | src/server/ipc/Commands.cpp | 81 | ||||
| -rw-r--r-- | src/server/ipc/Commands.h | 83 | ||||
| -rw-r--r-- | src/server/ipc/ZMQTask.cpp | 93 | ||||
| -rw-r--r-- | src/server/ipc/ZMQTask.h | 52 | ||||
| -rw-r--r-- | src/server/ipc/ZmqContext.cpp | 52 | ||||
| -rw-r--r-- | src/server/ipc/ZmqContext.h | 55 | ||||
| -rw-r--r-- | src/server/ipc/ZmqListener.cpp | 69 | ||||
| -rw-r--r-- | src/server/ipc/ZmqListener.h | 51 | ||||
| -rw-r--r-- | src/server/ipc/ZmqMux.cpp | 67 | ||||
| -rw-r--r-- | src/server/ipc/ZmqMux.h | 47 | ||||
| -rw-r--r-- | src/server/ipc/ZmqWorker.cpp | 70 | ||||
| -rw-r--r-- | src/server/ipc/ZmqWorker.h | 44 | 
13 files changed, 788 insertions, 0 deletions
diff --git a/src/server/ipc/CMakeLists.txt b/src/server/ipc/CMakeLists.txt new file mode 100644 index 00000000000..93a5d630dfe --- /dev/null +++ b/src/server/ipc/CMakeLists.txt @@ -0,0 +1,24 @@ +# Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> +# +# This file is free software; as a special exception the author gives +# unlimited permission to copy and/or distribute it, with or without +# modifications, as long as this notice is preserved. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the +# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + +file(GLOB_RECURSE sources_ipc *.cpp *.h) + +set(ipc_SRCS +  ${sources_ipc} +) + +include_directories( +  ${CMAKE_BINARY_DIR} +  ${CMAKE_SOURCE_DIR}/dep/zmqpp +  ${CMAKE_SOURCE_DIR}/src/server/shared/ +  ${ZMQ_INCLUDE_DIR} +) + +add_library(ipc STATIC ${ipc_SRCS}) diff --git a/src/server/ipc/Commands.cpp b/src/server/ipc/Commands.cpp new file mode 100644 index 00000000000..8e494fc34b9 --- /dev/null +++ b/src/server/ipc/Commands.cpp @@ -0,0 +1,81 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "Commands.h" +#include <zmqpp/message.hpp> + +zmqpp::message& operator>>(zmqpp::message& msg, IPCHeader& header) +{ +    msg >> header.Channel; +    msg >> header.Command; +    return msg; +} + +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm) +{ +    msg >> realm.Region; +    msg >> realm.Battlegroup; +    msg >> realm.Index; +    return msg; +} + +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::Header& header) +{ +    msg >> header.Ipc; +    msg >> header.Realm; +    return msg; +} + +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle) +{ +    msg >> toonHandle.AccountId; +    msg >> toonHandle.GameAccountId; +    msg >> toonHandle.Guid; +    msg >> toonHandle.Name; +    return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, IPCHeader& header) +{ +    msg << header.Channel; +    msg << header.Command; +    return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle& realm) +{ +    msg << realm.Region; +    msg << realm.Battlegroup; +    msg << realm.Index; +    return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::Header& header) +{ +    msg << header.Ipc; +    msg << header.Realm; +    return msg; +} + +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle) +{ +    msg << toonHandle.AccountId; +    msg << toonHandle.GameAccountId; +    msg << toonHandle.Guid; +    msg << toonHandle.Name; +    return msg; +} diff --git a/src/server/ipc/Commands.h b/src/server/ipc/Commands.h new file mode 100644 index 00000000000..05309a45022 --- /dev/null +++ b/src/server/ipc/Commands.h @@ -0,0 +1,83 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef _COMMANDS_H +#define _COMMANDS_H + +#include "Define.h" +#include <string> + +enum Channels +{ +    IPC_CHANNEL_BNET, + +    MAX_IPC_CHANNELS, +}; + +enum BnetCommands +{ +    BNET_CHANGE_TOON_ONLINE_STATE, + +    IPC_BNET_MAX_COMMAND +}; + +struct IPCHeader +{ +    uint8 Channel; +    uint8 Command; +}; + +namespace Battlenet +{ +    struct RealmHandle +    { +        uint8 Region; +        uint8 Battlegroup; +        uint32 Index; +    }; + +    struct Header +    { +        IPCHeader Ipc; +        RealmHandle Realm; +    }; + +    struct ToonHandle +    { +        uint32 AccountId; +        uint32 GameAccountId; +        uint64 Guid; +        std::string Name; +    }; +} + +namespace zmqpp +{ +    class message; +} + +zmqpp::message& operator>>(zmqpp::message& msg, IPCHeader& header); +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm); +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::Header& header); +zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle); + +zmqpp::message& operator<<(zmqpp::message& msg, IPCHeader& header); +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle& realm); +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::Header& header); +zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::ToonHandle& toonHandle); + +#endif // _COMMANDS_H diff --git a/src/server/ipc/ZMQTask.cpp b/src/server/ipc/ZMQTask.cpp new file mode 100644 index 00000000000..0d25dc2babf --- /dev/null +++ b/src/server/ipc/ZMQTask.cpp @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZMQTask.h" +#include "ZmqContext.h" +#include <zmqpp/message.hpp> + +ZMQTask::ZMQTask() +{ +    _poller = new zmqpp::poller(); +} + +ZMQTask::~ZMQTask() +{ +    delete _poller; +    _poller = NULL; +    delete _inproc; +    delete _thread; +} + +void ZMQTask::Start() +{ +    _inproc = sIpcContext->CreateInprocSubscriber(); +    _poller->add(*_inproc); + +    HandleOpen(); +    _thread = new std::thread(&ZMQTask::Run, this); +} + +void ZMQTask::End() +{ +    _thread->join(); +    _inproc->close(); +    HandleClose(); +} + +bool ZMQTask::ProcessExit() +{ +    if (_poller->events(*_inproc) == zmqpp::poller::poll_in) +    { +        int op1; +        do +        { +            zmqpp::message msg; +            if (!_inproc->receive(msg, true)) +                return false; //No more messages to read from sock. This shouldn't happen. + +            // strip 'internalmq.' from message +            std::string cmd = msg.get(0).substr(11); +            if (cmd == "kill") +                return true; + +            _inproc->get(zmqpp::socket_option::events, op1); +        } while (op1 & zmqpp::poller::poll_in); +    } + +    return false; +} + +void ZMQTask::Pipeline(zmqpp::socket* from, zmqpp::socket* to) +{ +    /* +      Push messages from socket to socket. +    */ +    if (_poller->events(*from) == zmqpp::poller::poll_in) +    { +        int32 op1, op2; +        do +        { +            zmqpp::message msg; +            if (!from->receive(msg, true)) +                return; //No more messages to read from socket. This shouldn't happen. + +            to->send(msg); +            from->get(zmqpp::socket_option::events, op1); +            to->get(zmqpp::socket_option::events, op2); +        } while(op1 & zmqpp::poller::poll_in && op2 & zmqpp::poller::poll_out); +    } +} diff --git a/src/server/ipc/ZMQTask.h b/src/server/ipc/ZMQTask.h new file mode 100644 index 00000000000..24251893aaa --- /dev/null +++ b/src/server/ipc/ZMQTask.h @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQTASK_H +#define __ZMQTASK_H + +#include "Define.h" +#include <thread> +#include <zmqpp/poller.hpp> +#include <zmqpp/socket.hpp> + +/* +  This class serves as a base for all long running tasks +  It is set up to terminate its running task upon receiving "kill" command +*/ +class ZMQTask +{ +public: +    ZMQTask(); +    virtual ~ZMQTask(); + +    void Start(); +    void End(); +    virtual void Run() = 0; + +protected: +    virtual void HandleOpen() { } +    virtual void HandleClose() { } +    void Pipeline(zmqpp::socket* from, zmqpp::socket* to); +    bool ProcessExit(); + +    zmqpp::poller* _poller; + +    zmqpp::socket* _inproc; +    std::thread* _thread; +}; + +#endif // __ZMQTASK_H diff --git a/src/server/ipc/ZmqContext.cpp b/src/server/ipc/ZmqContext.cpp new file mode 100644 index 00000000000..305e6b1d843 --- /dev/null +++ b/src/server/ipc/ZmqContext.cpp @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqContext.h" + +ZmqContext::ZmqContext() : _inproc(nullptr) +{ +} + +ZmqContext::~ZmqContext() +{ +    delete _inproc; +} + +zmqpp::socket* ZmqContext::CreateNewSocket(zmqpp::socket_type type) +{ +    std::unique_lock<std::mutex> lock(_mutex); +    return new zmqpp::socket(_context, type); +} + +void ZmqContext::Initialize() +{ +    _inproc = new zmqpp::socket(_context, zmqpp::socket_type::pub); +    _inproc->bind("inproc://workers"); +} + +zmqpp::socket* ZmqContext::CreateInprocSubscriber() +{ +    zmqpp::socket* sub = CreateNewSocket(zmqpp::socket_type::sub); +    sub->connect("inproc://workers"); +    sub->subscribe("internalmq."); +    return sub; +} + +void ZmqContext::Close() +{ +    _inproc->send("internalmq.kill"); +} diff --git a/src/server/ipc/ZmqContext.h b/src/server/ipc/ZmqContext.h new file mode 100644 index 00000000000..a6ad12b1b70 --- /dev/null +++ b/src/server/ipc/ZmqContext.h @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQCONTEX_H +#define __ZMQCONTEX_H + +#include <zmqpp/zmqpp.hpp> +#include <mutex> + +/* + * We need to serialize access to zmq context otherwise stuff blows up. + */ +class ZmqContext  +{ +public: +    ~ZmqContext(); + +    static ZmqContext* Instance() +    { +        static ZmqContext instance; +        return &instance; +    } + +    zmqpp::socket* CreateNewSocket(zmqpp::socket_type); +    void Initialize(); +    zmqpp::socket* CreateInprocSubscriber(); +    void Close(); + +private: +    ZmqContext(); +    ZmqContext(ZmqContext const&) = delete; +    ZmqContext& operator=(ZmqContext const&) = delete; + +    zmqpp::context _context; +    std::mutex _mutex; +    zmqpp::socket* _inproc; +}; + +#define sIpcContext ZmqContext::Instance() + +#endif diff --git a/src/server/ipc/ZmqListener.cpp b/src/server/ipc/ZmqListener.cpp new file mode 100644 index 00000000000..98333305e58 --- /dev/null +++ b/src/server/ipc/ZmqListener.cpp @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqListener.h" +#include "ZmqContext.h" + +ZmqListener::ZmqListener(std::string const& from, std::string const& to) +{ +    _from = sIpcContext->CreateNewSocket(zmqpp::socket_type::sub); +    _to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); +    _from->connect(from); +    _to->bind(to); +} + +ZmqListener::~ZmqListener() +{ +    delete _from; +    delete _to; +} + +void ZmqListener::HandleOpen() +{ +} + +void ZmqListener::HandleClose() +{ +    _from->close(); +    _to->close(); +} + +void ZmqListener::Run() +{ +    while (!ProcessExit()) +    { +        _poller->poll(); + +        while (_poller->events(*_from) & zmqpp::poller::poll_in && +              _poller->events(*_to) & zmqpp::poller::poll_out) +        { +            zmqpp::message msg; +            _from->receive(msg); +            _to->send(msg); +        } +    } +} + +void ZmqListener::Subscribe(std::string const& keyword) +{ +    _from->subscribe(keyword); +} + +void ZmqListener::Unsubscribe(std::string const& keyword) +{ +    _from->unsubscribe(keyword); +} diff --git a/src/server/ipc/ZmqListener.h b/src/server/ipc/ZmqListener.h new file mode 100644 index 00000000000..8b79ba67f6d --- /dev/null +++ b/src/server/ipc/ZmqListener.h @@ -0,0 +1,51 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQLISTENER_H +#define __ZMQLISTENER_H + +#include "ZMQTask.h" +#include <zmqpp/zmqpp.hpp> + +class ZmqListener : public ZMQTask +{ +/* + * Read broadcasts from remote PUB socket, and forward them to  + * another socket.  + * + * from - client SUB socket + * to - listen PUSH socket + * + */ +public: +    ZmqListener(std::string const& from, std::string const& to); +    ~ZmqListener(); +    void Run() override; + +    void Subscribe(std::string const& keyword); +    void Unsubscribe(std::string const& keyword); + +protected: +    void HandleOpen() override; +    void HandleClose() override; + +private: +    zmqpp::socket* _from; +    zmqpp::socket* _to; +}; + +#endif diff --git a/src/server/ipc/ZmqMux.cpp b/src/server/ipc/ZmqMux.cpp new file mode 100644 index 00000000000..4b5a4f48b05 --- /dev/null +++ b/src/server/ipc/ZmqMux.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqMux.h" +#include "ZmqContext.h" + +ZmqMux::ZmqMux(std::string from_uri, std::string to_uri): +    _fromAddress(from_uri) +{ +    printf("Opening muxer thread from %s to %s\n", from_uri.c_str(), to_uri.c_str()); +    _from = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull); +    _to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); + +    _from->bind(from_uri); +    _to->connect(to_uri); +} + +ZmqMux::~ZmqMux() +{ +    delete _from; +    delete _to; +} + +void ZmqMux::HandleOpen() +{ +    _poller->add(*_from); +    _poller->add(*_to, zmqpp::poller::poll_out); +} + +bool ZmqMux::Send(zmqpp::message* m, bool dont_block) +{ +    if (_socket.get() == nullptr) +    { +        _socket.reset(sIpcContext->CreateNewSocket(zmqpp::socket_type::push)); +        _socket->connect(_fromAddress); +    } + +    return _socket->send(*m, dont_block); +} + +void ZmqMux::Run() +{ +    for (;;) +    { +        if (!_poller->poll()) +            break; + +        if (ProcessExit()) +            break; + +        Pipeline(_from, _to); +    } +} diff --git a/src/server/ipc/ZmqMux.h b/src/server/ipc/ZmqMux.h new file mode 100644 index 00000000000..4b81f11daaf --- /dev/null +++ b/src/server/ipc/ZmqMux.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQMUX_H +#define __ZMQMUX_H + +#include "ZMQTask.h" +#include <string> +#include <boost/thread/tss.hpp> + +/* + * Multiplexes zmq messages from many threads,  + * and then passes them to another socket. + */ +class ZmqMux : public ZMQTask +{ +public: +    ZmqMux(std::string from, std::string to); +    ~ZmqMux(); +    bool Send(zmqpp::message*, bool dont_block = false); +    void Run() override; + +protected: +    void HandleOpen() override; + +private: +    boost::thread_specific_ptr<zmqpp::socket> _socket; +    zmqpp::socket* _from; +    zmqpp::socket* _to; +    std::string const _fromAddress; +}; + +#endif diff --git a/src/server/ipc/ZmqWorker.cpp b/src/server/ipc/ZmqWorker.cpp new file mode 100644 index 00000000000..f205ea831b5 --- /dev/null +++ b/src/server/ipc/ZmqWorker.cpp @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "ZmqWorker.h" +#include "ZmqContext.h" + +ZmqWorker::ZmqWorker(std::string task_uri, std::string res_uri) : +    _taskUri(task_uri), _resultsUri(res_uri) +{ +} + +ZmqWorker::~ZmqWorker() +{ +    delete _taskQueue; +    delete _results; +    delete _inproc; +} + +void ZmqWorker::HandleOpen() +{ +    _taskQueue =  sIpcContext->CreateNewSocket(zmqpp::socket_type::pull); +    _results = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); + +    _taskQueue->connect(_taskUri); +    _results->connect(_resultsUri); + +    _poller->add(*_taskQueue); +} + +void ZmqWorker::HandleClose() +{ +    _taskQueue->close(); +    _results->close(); +} + +void ZmqWorker::Run() +{ +    while (!ProcessExit()) +    { +        _poller->poll(); +        if (_poller->events(*_taskQueue) & zmqpp::poller::poll_in) +            PerformWork(); +    } +} + +void ZmqWorker::PerformWork() +{ +    int32 op1; +    do +    { +        zmqpp::message msg; +        _taskQueue->receive(msg); +        Dispatch(msg); +        _taskQueue->get(zmqpp::socket_option::events, op1); +    } while (op1 & zmqpp::poller::poll_in); +} diff --git a/src/server/ipc/ZmqWorker.h b/src/server/ipc/ZmqWorker.h new file mode 100644 index 00000000000..b3e221e9129 --- /dev/null +++ b/src/server/ipc/ZmqWorker.h @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2008-2014 TrinityCore <http://www.trinitycore.org/> + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; either version 2 of the License, or (at your + * option) any later version. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for + * more details. + * + * You should have received a copy of the GNU General Public License along + * with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef __ZMQWORKER_H +#define __ZMQWORKER_H + +#include "ZMQTask.h" +#include <zmqpp/zmqpp.hpp> + +class ZmqWorker : public ZMQTask +{ +public: +    ZmqWorker(std::string task_uri, std::string res_uri); +    ~ZmqWorker(); +    void Run() override; + +protected: +    void HandleOpen() override; +    void HandleClose() override; +    zmqpp::socket* _results; + +private: +    void PerformWork(); +    virtual void Dispatch(zmqpp::message const&) = 0; +    zmqpp::socket* _taskQueue; +    std::string _taskUri; +    std::string _resultsUri; +}; + +#endif  | 
