diff options
author | Shauren <shauren.trinity@gmail.com> | 2016-03-20 18:50:25 +0100 |
---|---|---|
committer | Shauren <shauren.trinity@gmail.com> | 2016-03-20 18:50:25 +0100 |
commit | 92e247b22674dc78a7836947af3f656cc84ac1a1 (patch) | |
tree | e2d08b1bc05cd51bebd2dc29635cd96b539f239e /src/server/ipc | |
parent | 1fea358becc00d358e0c79a86645258080aea5b5 (diff) |
Core/Dep: Removed ZMQ dependency in preparation for future changes
Diffstat (limited to 'src/server/ipc')
-rw-r--r-- | src/server/ipc/CMakeLists.txt | 33 | ||||
-rw-r--r-- | src/server/ipc/Commands.cpp | 81 | ||||
-rw-r--r-- | src/server/ipc/Commands.h | 80 | ||||
-rw-r--r-- | src/server/ipc/ZMQTask.cpp | 94 | ||||
-rw-r--r-- | src/server/ipc/ZMQTask.h | 52 | ||||
-rw-r--r-- | src/server/ipc/ZmqContext.cpp | 55 | ||||
-rw-r--r-- | src/server/ipc/ZmqContext.h | 55 | ||||
-rw-r--r-- | src/server/ipc/ZmqListener.cpp | 69 | ||||
-rw-r--r-- | src/server/ipc/ZmqListener.h | 51 | ||||
-rw-r--r-- | src/server/ipc/ZmqMux.cpp | 68 | ||||
-rw-r--r-- | src/server/ipc/ZmqMux.h | 47 | ||||
-rw-r--r-- | src/server/ipc/ZmqWorker.cpp | 69 | ||||
-rw-r--r-- | src/server/ipc/ZmqWorker.h | 44 |
13 files changed, 0 insertions, 798 deletions
diff --git a/src/server/ipc/CMakeLists.txt b/src/server/ipc/CMakeLists.txt deleted file mode 100644 index 8469c922066..00000000000 --- a/src/server/ipc/CMakeLists.txt +++ /dev/null @@ -1,33 +0,0 @@ -# Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> -# -# This file is free software; as a special exception the author gives -# unlimited permission to copy and/or distribute it, with or without -# modifications, as long as this notice is preserved. -# -# This program is distributed in the hope that it will be useful, but -# WITHOUT ANY WARRANTY, to the extent permitted by law; without even the -# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - -CollectSourceFiles( - ${CMAKE_CURRENT_SOURCE_DIR} - PRIVATE_SOURCES) - -GroupSources(${CMAKE_CURRENT_SOURCE_DIR}) - -add_library(ipc ${PRIVATE_SOURCES}) - -target_include_directories(ipc - PUBLIC - ${CMAKE_CURRENT_SOURCE_DIR} - PRIVATE - ${CMAKE_CURRENT_BINARY_DIR}) - -target_link_libraries(ipc - PUBLIC - shared - zmqpp) - -set_target_properties(ipc - PROPERTIES - FOLDER - "server") diff --git a/src/server/ipc/Commands.cpp b/src/server/ipc/Commands.cpp deleted file mode 100644 index 3eb78cbc14a..00000000000 --- a/src/server/ipc/Commands.cpp +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "Commands.h" -#include <zmqpp/message.hpp> - -zmqpp::message& operator>>(zmqpp::message& msg, IPC::Header& header) -{ - msg >> header.Channel; - msg >> header.Command; - return msg; -} - -zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm) -{ - msg >> realm.Region; - msg >> realm.Site; - msg >> realm.Realm; - return msg; -} - -zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::Header& header) -{ - msg >> header.Ipc; - msg >> header.Realm; - return msg; -} - -zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle& toonHandle) -{ - msg >> toonHandle.AccountId; - msg >> toonHandle.GameAccountId; - msg >> toonHandle.Guid; - msg >> toonHandle.Name; - return msg; -} - -zmqpp::message& operator<<(zmqpp::message& msg, IPC::Header const& header) -{ - msg << header.Channel; - msg << header.Command; - return msg; -} - -zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle const& realm) -{ - msg << realm.Region; - msg << realm.Site; - msg << realm.Realm; - return msg; -} - -zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::Header const& header) -{ - msg << header.Ipc; - msg << header.Realm; - return msg; -} - -zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle const& toonHandle) -{ - msg << toonHandle.AccountId; - msg << toonHandle.GameAccountId; - msg << toonHandle.Guid; - msg << toonHandle.Name; - return msg; -} diff --git a/src/server/ipc/Commands.h b/src/server/ipc/Commands.h deleted file mode 100644 index 618e8ca0509..00000000000 --- a/src/server/ipc/Commands.h +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef _COMMANDS_H -#define _COMMANDS_H - -#include "Define.h" -#include "Realm/Realm.h" -#include <string> - -enum Channels -{ - IPC_CHANNEL_BNET, - - MAX_IPC_CHANNELS, -}; - -enum BnetCommands -{ - BNET_CHANGE_TOON_ONLINE_STATE, - - IPC_BNET_MAX_COMMAND -}; - -namespace IPC -{ - struct Header - { - uint8 Channel; - uint8 Command; - }; - - namespace BattlenetComm - { - struct Header - { - IPC::Header Ipc; - Battlenet::RealmHandle Realm; - }; - - struct ToonHandle - { - uint32 AccountId; - uint32 GameAccountId; - uint64 Guid; - std::string Name; - }; - } -} - -namespace zmqpp -{ - class message; -} - -zmqpp::message& operator>>(zmqpp::message& msg, IPC::Header& header); -zmqpp::message& operator>>(zmqpp::message& msg, Battlenet::RealmHandle& realm); -zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::Header& header); -zmqpp::message& operator>>(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle& toonHandle); - -zmqpp::message& operator<<(zmqpp::message& msg, IPC::Header const& header); -zmqpp::message& operator<<(zmqpp::message& msg, Battlenet::RealmHandle const& realm); -zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::Header const& header); -zmqpp::message& operator<<(zmqpp::message& msg, IPC::BattlenetComm::ToonHandle const& toonHandle); - -#endif // _COMMANDS_H diff --git a/src/server/ipc/ZMQTask.cpp b/src/server/ipc/ZMQTask.cpp deleted file mode 100644 index 6d359d9b10a..00000000000 --- a/src/server/ipc/ZMQTask.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "ZMQTask.h" -#include "ZmqContext.h" -#include <zmqpp/message.hpp> - -ZMQTask::ZMQTask() -{ - _poller = new zmqpp::poller(); -} - -ZMQTask::~ZMQTask() -{ - delete _poller; - _poller = NULL; - delete _inproc; - delete _thread; -} - -void ZMQTask::Start() -{ - _inproc = sIpcContext->CreateInprocSubscriber(); - _poller->add(*_inproc); - - HandleOpen(); - _thread = new std::thread(&ZMQTask::Run, this); -} - -void ZMQTask::End() -{ - _thread->join(); - _poller->remove(*_inproc); - _inproc->close(); - HandleClose(); -} - -bool ZMQTask::ProcessExit() -{ - if (_poller->events(*_inproc) == zmqpp::poller::poll_in) - { - int op1; - do - { - zmqpp::message msg; - if (!_inproc->receive(msg, true)) - return false; //No more messages to read from sock. This shouldn't happen. - - // strip 'internalmq.' from message - std::string cmd = msg.get(0).substr(11); - if (cmd == "kill") - return true; - - _inproc->get(zmqpp::socket_option::events, op1); - } while (op1 & zmqpp::poller::poll_in); - } - - return false; -} - -void ZMQTask::Pipeline(zmqpp::socket* from, zmqpp::socket* to) -{ - /* - Push messages from socket to socket. - */ - if (_poller->events(*from) == zmqpp::poller::poll_in) - { - int32 op1, op2; - do - { - zmqpp::message msg; - if (!from->receive(msg, true)) - return; //No more messages to read from socket. This shouldn't happen. - - to->send(msg); - from->get(zmqpp::socket_option::events, op1); - to->get(zmqpp::socket_option::events, op2); - } while(op1 & zmqpp::poller::poll_in && op2 & zmqpp::poller::poll_out); - } -} diff --git a/src/server/ipc/ZMQTask.h b/src/server/ipc/ZMQTask.h deleted file mode 100644 index 85773adbfd1..00000000000 --- a/src/server/ipc/ZMQTask.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef __ZMQTASK_H -#define __ZMQTASK_H - -#include "Define.h" -#include <thread> -#include <zmqpp/poller.hpp> -#include <zmqpp/socket.hpp> - -/* - This class serves as a base for all long running tasks - It is set up to terminate its running task upon receiving "kill" command -*/ -class ZMQTask -{ -public: - ZMQTask(); - virtual ~ZMQTask(); - - void Start(); - void End(); - virtual void Run() = 0; - -protected: - virtual void HandleOpen() { } - virtual void HandleClose() { } - void Pipeline(zmqpp::socket* from, zmqpp::socket* to); - bool ProcessExit(); - - zmqpp::poller* _poller; - - zmqpp::socket* _inproc = nullptr; - std::thread* _thread = nullptr; -}; - -#endif // __ZMQTASK_H diff --git a/src/server/ipc/ZmqContext.cpp b/src/server/ipc/ZmqContext.cpp deleted file mode 100644 index bc062de8e88..00000000000 --- a/src/server/ipc/ZmqContext.cpp +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "ZmqContext.h" - -ZmqContext::ZmqContext() : _inproc(nullptr) -{ -} - -ZmqContext::~ZmqContext() -{ -} - -zmqpp::socket* ZmqContext::CreateNewSocket(zmqpp::socket_type type) -{ - std::unique_lock<std::mutex> lock(_mutex); - zmqpp::socket* socket = new zmqpp::socket(_context, type); - socket->set(zmqpp::socket_option::linger, 0); - return socket; -} - -void ZmqContext::Initialize() -{ - _inproc = new zmqpp::socket(_context, zmqpp::socket_type::pub); - _inproc->bind("inproc://workers"); -} - -zmqpp::socket* ZmqContext::CreateInprocSubscriber() -{ - zmqpp::socket* sub = CreateNewSocket(zmqpp::socket_type::sub); - sub->connect("inproc://workers"); - sub->subscribe("internalmq."); - return sub; -} - -void ZmqContext::Close() -{ - _inproc->send("internalmq.kill"); - delete _inproc; - _inproc = nullptr; -} diff --git a/src/server/ipc/ZmqContext.h b/src/server/ipc/ZmqContext.h deleted file mode 100644 index 99f49a8c597..00000000000 --- a/src/server/ipc/ZmqContext.h +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef __ZMQCONTEX_H -#define __ZMQCONTEX_H - -#include <zmqpp/zmqpp.hpp> -#include <mutex> - -/* - * We need to serialize access to zmq context otherwise stuff blows up. - */ -class ZmqContext -{ -public: - ~ZmqContext(); - - static ZmqContext* Instance() - { - static ZmqContext instance; - return &instance; - } - - zmqpp::socket* CreateNewSocket(zmqpp::socket_type); - void Initialize(); - zmqpp::socket* CreateInprocSubscriber(); - void Close(); - -private: - ZmqContext(); - ZmqContext(ZmqContext const&) = delete; - ZmqContext& operator=(ZmqContext const&) = delete; - - zmqpp::context _context; - std::mutex _mutex; - zmqpp::socket* _inproc; -}; - -#define sIpcContext ZmqContext::Instance() - -#endif diff --git a/src/server/ipc/ZmqListener.cpp b/src/server/ipc/ZmqListener.cpp deleted file mode 100644 index a7cf78f8a92..00000000000 --- a/src/server/ipc/ZmqListener.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "ZmqListener.h" -#include "ZmqContext.h" - -ZmqListener::ZmqListener(std::string const& from, std::string const& to) -{ - _from = sIpcContext->CreateNewSocket(zmqpp::socket_type::sub); - _to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); - _from->connect(from); - _to->bind(to); -} - -ZmqListener::~ZmqListener() -{ - delete _from; - delete _to; -} - -void ZmqListener::HandleOpen() -{ -} - -void ZmqListener::HandleClose() -{ - _from->close(); - _to->close(); -} - -void ZmqListener::Run() -{ - while (!ProcessExit()) - { - _poller->poll(); - - while (_poller->events(*_from) & zmqpp::poller::poll_in && - _poller->events(*_to) & zmqpp::poller::poll_out) - { - zmqpp::message msg; - _from->receive(msg); - _to->send(msg); - } - } -} - -void ZmqListener::Subscribe(std::string const& keyword) -{ - _from->subscribe(keyword); -} - -void ZmqListener::Unsubscribe(std::string const& keyword) -{ - _from->unsubscribe(keyword); -} diff --git a/src/server/ipc/ZmqListener.h b/src/server/ipc/ZmqListener.h deleted file mode 100644 index 07798b81886..00000000000 --- a/src/server/ipc/ZmqListener.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef __ZMQLISTENER_H -#define __ZMQLISTENER_H - -#include "ZMQTask.h" -#include <zmqpp/zmqpp.hpp> - -class ZmqListener : public ZMQTask -{ -/* - * Read broadcasts from remote PUB socket, and forward them to - * another socket. - * - * from - client SUB socket - * to - listen PUSH socket - * - */ -public: - ZmqListener(std::string const& from, std::string const& to); - ~ZmqListener(); - void Run() override; - - void Subscribe(std::string const& keyword); - void Unsubscribe(std::string const& keyword); - -protected: - void HandleOpen() override; - void HandleClose() override; - -private: - zmqpp::socket* _from; - zmqpp::socket* _to; -}; - -#endif diff --git a/src/server/ipc/ZmqMux.cpp b/src/server/ipc/ZmqMux.cpp deleted file mode 100644 index d6dcd0f73f6..00000000000 --- a/src/server/ipc/ZmqMux.cpp +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "ZmqMux.h" -#include "ZmqContext.h" - -ZmqMux::ZmqMux(std::string const& fromUri, std::string const& toUri): - _fromAddress(fromUri) -{ - printf("Opening muxer thread from %s to %s\n", fromUri.c_str(), toUri.c_str()); - _from = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull); - _to = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); - - _from->bind(fromUri); - _to->connect(toUri); -} - -ZmqMux::~ZmqMux() -{ - delete _from; - delete _to; -} - -void ZmqMux::HandleOpen() -{ - _poller->add(*_from); - _poller->add(*_to, zmqpp::poller::poll_out); -} - -bool ZmqMux::Send(zmqpp::message* m, bool dont_block) -{ - if (_socket.get() == nullptr) - { - _socket.reset(sIpcContext->CreateNewSocket(zmqpp::socket_type::push)); - _socket->connect(_fromAddress); - } - - return _socket->send(*m, dont_block); -} - -void ZmqMux::Run() -{ - for (;;) - { - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - if (!_poller->poll()) - break; - - if (ProcessExit()) - break; - - Pipeline(_from, _to); - } -} diff --git a/src/server/ipc/ZmqMux.h b/src/server/ipc/ZmqMux.h deleted file mode 100644 index 1009382f101..00000000000 --- a/src/server/ipc/ZmqMux.h +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef __ZMQMUX_H -#define __ZMQMUX_H - -#include "ZMQTask.h" -#include <string> -#include <boost/thread/tss.hpp> - -/* - * Multiplexes zmq messages from many threads, - * and then passes them to another socket. - */ -class ZmqMux : public ZMQTask -{ -public: - ZmqMux(std::string const& from, std::string const& to); - ~ZmqMux(); - bool Send(zmqpp::message*, bool dont_block = false); - void Run() override; - -protected: - void HandleOpen() override; - -private: - boost::thread_specific_ptr<zmqpp::socket> _socket; - zmqpp::socket* _from; - zmqpp::socket* _to; - std::string const _fromAddress; -}; - -#endif diff --git a/src/server/ipc/ZmqWorker.cpp b/src/server/ipc/ZmqWorker.cpp deleted file mode 100644 index 617730ad013..00000000000 --- a/src/server/ipc/ZmqWorker.cpp +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#include "ZmqWorker.h" -#include "ZmqContext.h" - -ZmqWorker::ZmqWorker(std::string const& taskUri, std::string const& resUri) : - _taskUri(taskUri), _resultsUri(resUri) -{ -} - -ZmqWorker::~ZmqWorker() -{ - delete _taskQueue; - delete _results; -} - -void ZmqWorker::HandleOpen() -{ - _taskQueue = sIpcContext->CreateNewSocket(zmqpp::socket_type::pull); - _results = sIpcContext->CreateNewSocket(zmqpp::socket_type::push); - - _taskQueue->connect(_taskUri); - _results->connect(_resultsUri); - - _poller->add(*_taskQueue); -} - -void ZmqWorker::HandleClose() -{ - _taskQueue->close(); - _results->close(); -} - -void ZmqWorker::Run() -{ - while (!ProcessExit()) - { - _poller->poll(); - if (_poller->events(*_taskQueue) & zmqpp::poller::poll_in) - PerformWork(); - } -} - -void ZmqWorker::PerformWork() -{ - int32 op1; - do - { - zmqpp::message msg; - _taskQueue->receive(msg); - Dispatch(msg); - _taskQueue->get(zmqpp::socket_option::events, op1); - } while (op1 & zmqpp::poller::poll_in); -} diff --git a/src/server/ipc/ZmqWorker.h b/src/server/ipc/ZmqWorker.h deleted file mode 100644 index 21d2d95ac18..00000000000 --- a/src/server/ipc/ZmqWorker.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/> - * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License as published by the - * Free Software Foundation; either version 2 of the License, or (at your - * option) any later version. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for - * more details. - * - * You should have received a copy of the GNU General Public License along - * with this program. If not, see <http://www.gnu.org/licenses/>. - */ - -#ifndef __ZMQWORKER_H -#define __ZMQWORKER_H - -#include "ZMQTask.h" -#include <zmqpp/zmqpp.hpp> - -class ZmqWorker : public ZMQTask -{ -public: - ZmqWorker(std::string const& taskUri, std::string const& resUri); - ~ZmqWorker(); - void Run() override; - -protected: - void HandleOpen() override; - void HandleClose() override; - zmqpp::socket* _results = nullptr; - -private: - void PerformWork(); - virtual void Dispatch(zmqpp::message const&) = 0; - zmqpp::socket* _taskQueue = nullptr; - std::string _taskUri; - std::string _resultsUri; -}; - -#endif |