diff --git a/dep/libzmq/CMakeLists.txt b/dep/libzmq/CMakeLists.txt index ba408865b6d..737193af197 100644 --- a/dep/libzmq/CMakeLists.txt +++ b/dep/libzmq/CMakeLists.txt @@ -5,14 +5,17 @@ set (libzmq_STAT_SRCS src/clock.cpp src/ctx.cpp src/dealer.cpp + src/devpoll.cpp src/dgram.cpp src/dist.cpp + src/epoll.cpp src/err.cpp src/fq.cpp src/io_object.cpp src/io_thread.cpp src/ip.cpp src/lb.cpp + src/kqueue.cpp src/mailbox.cpp src/mailbox_safe.cpp src/mechanism.cpp @@ -28,7 +31,9 @@ set (libzmq_STAT_SRCS src/pipe.cpp src/plain_client.cpp src/plain_server.cpp + src/poll.cpp src/poller_base.cpp + src/pollset.cpp src/proxy.cpp src/pub.cpp src/pull.cpp diff --git a/dep/libzmq/src/devpoll.cpp b/dep/libzmq/src/devpoll.cpp new file mode 100644 index 00000000000..99c82777081 --- /dev/null +++ b/dep/libzmq/src/devpoll.cpp @@ -0,0 +1,205 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "devpoll.hpp" +#if defined ZMQ_USE_DEVPOLL + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "devpoll.hpp" +#include "err.hpp" +#include "config.hpp" +#include "i_poll_events.hpp" + +zmq::devpoll_t::devpoll_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), + stopping (false) +{ + devpoll_fd = open ("/dev/poll", O_RDWR); + errno_assert (devpoll_fd != -1); +} + +zmq::devpoll_t::~devpoll_t () +{ + worker.stop (); + close (devpoll_fd); +} + +void zmq::devpoll_t::devpoll_ctl (fd_t fd_, short events_) +{ + struct pollfd pfd = {fd_, events_, 0}; + ssize_t rc = write (devpoll_fd, &pfd, sizeof pfd); + zmq_assert (rc == sizeof pfd); +} + +zmq::devpoll_t::handle_t zmq::devpoll_t::add_fd (fd_t fd_, + i_poll_events *reactor_) +{ + // If the file descriptor table is too small expand it. + fd_table_t::size_type sz = fd_table.size (); + if (sz <= (fd_table_t::size_type) fd_) { + fd_table.resize (fd_ + 1); + while (sz != (fd_table_t::size_type) (fd_ + 1)) { + fd_table [sz].valid = false; + ++sz; + } + } + + zmq_assert (!fd_table [fd_].valid); + + fd_table [fd_].events = 0; + fd_table [fd_].reactor = reactor_; + fd_table [fd_].valid = true; + fd_table [fd_].accepted = false; + + devpoll_ctl (fd_, 0); + pending_list.push_back (fd_); + + // Increase the load metric of the thread. + adjust_load (1); + + return fd_; +} + +void zmq::devpoll_t::rm_fd (handle_t handle_) +{ + zmq_assert (fd_table [handle_].valid); + + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].valid = false; + + // Decrease the load metric of the thread. + adjust_load (-1); +} + +void zmq::devpoll_t::set_pollin (handle_t handle_) +{ + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLIN; + devpoll_ctl (handle_, fd_table [handle_].events); +} + +void zmq::devpoll_t::reset_pollin (handle_t handle_) +{ + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLIN); + devpoll_ctl (handle_, fd_table [handle_].events); +} + +void zmq::devpoll_t::set_pollout (handle_t handle_) +{ + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events |= POLLOUT; + devpoll_ctl (handle_, fd_table [handle_].events); +} + +void zmq::devpoll_t::reset_pollout (handle_t handle_) +{ + devpoll_ctl (handle_, POLLREMOVE); + fd_table [handle_].events &= ~((short) POLLOUT); + devpoll_ctl (handle_, fd_table [handle_].events); +} + +void zmq::devpoll_t::start () +{ + ctx.start_thread (worker, worker_routine, this); +} + +void zmq::devpoll_t::stop () +{ + stopping = true; +} + +int zmq::devpoll_t::max_fds () +{ + return -1; +} + +void zmq::devpoll_t::loop () +{ + while (!stopping) { + + struct pollfd ev_buf [max_io_events]; + struct dvpoll poll_req; + + for (pending_list_t::size_type i = 0; i < pending_list.size (); i ++) + fd_table [pending_list [i]].accepted = true; + pending_list.clear (); + + // Execute any due timers. + int timeout = (int) execute_timers (); + + // Wait for events. + // On Solaris, we can retrieve no more then (OPEN_MAX - 1) events. + poll_req.dp_fds = &ev_buf [0]; +#if defined ZMQ_HAVE_SOLARIS + poll_req.dp_nfds = std::min ((int) max_io_events, OPEN_MAX - 1); +#else + poll_req.dp_nfds = max_io_events; +#endif + poll_req.dp_timeout = timeout ? timeout : -1; + int n = ioctl (devpoll_fd, DP_POLL, &poll_req); + if (n == -1 && errno == EINTR) + continue; + errno_assert (n != -1); + + for (int i = 0; i < n; i ++) { + + fd_entry_t *fd_ptr = &fd_table [ev_buf [i].fd]; + if (!fd_ptr->valid || !fd_ptr->accepted) + continue; + if (ev_buf [i].revents & (POLLERR | POLLHUP)) + fd_ptr->reactor->in_event (); + if (!fd_ptr->valid || !fd_ptr->accepted) + continue; + if (ev_buf [i].revents & POLLOUT) + fd_ptr->reactor->out_event (); + if (!fd_ptr->valid || !fd_ptr->accepted) + continue; + if (ev_buf [i].revents & POLLIN) + fd_ptr->reactor->in_event (); + } + } +} + +void zmq::devpoll_t::worker_routine (void *arg_) +{ + ((devpoll_t*) arg_)->loop (); +} + +#endif diff --git a/dep/libzmq/src/devpoll.hpp b/dep/libzmq/src/devpoll.hpp new file mode 100644 index 00000000000..3d939b5549a --- /dev/null +++ b/dep/libzmq/src/devpoll.hpp @@ -0,0 +1,119 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DEVPOLL_HPP_INCLUDED__ +#define __ZMQ_DEVPOLL_HPP_INCLUDED__ + +// poller.hpp decides which polling mechanism to use. +#include "poller.hpp" +#if defined ZMQ_USE_DEVPOLL + +#include + +#include "ctx.hpp" +#include "fd.hpp" +#include "thread.hpp" +#include "poller_base.hpp" + +namespace zmq +{ + + struct i_poll_events; + + // Implements socket polling mechanism using the "/dev/poll" interface. + + class devpoll_t : public poller_base_t + { + public: + + typedef fd_t handle_t; + + devpoll_t (const ctx_t &ctx_); + ~devpoll_t (); + + // "poller" concept. + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); + void rm_fd (handle_t handle_); + void set_pollin (handle_t handle_); + void reset_pollin (handle_t handle_); + void set_pollout (handle_t handle_); + void reset_pollout (handle_t handle_); + void start (); + void stop (); + + static int max_fds (); + + private: + + // Main worker thread routine. + static void worker_routine (void *arg_); + + // Main event loop. + void loop (); + + // Reference to ZMQ context. + const ctx_t &ctx; + + // File descriptor referring to "/dev/poll" pseudo-device. + fd_t devpoll_fd; + + struct fd_entry_t + { + short events; + zmq::i_poll_events *reactor; + bool valid; + bool accepted; + }; + + typedef std::vector fd_table_t; + fd_table_t fd_table; + + typedef std::vector pending_list_t; + pending_list_t pending_list; + + // Pollset manipulation function. + void devpoll_ctl (fd_t fd_, short events_); + + // If true, thread is in the process of shutting down. + bool stopping; + + // Handle of the physical thread doing the I/O work. + thread_t worker; + + devpoll_t (const devpoll_t&); + const devpoll_t &operator = (const devpoll_t&); + }; + + typedef devpoll_t poller_t; + +} + +#endif + +#endif diff --git a/dep/libzmq/src/epoll.cpp b/dep/libzmq/src/epoll.cpp new file mode 100644 index 00000000000..be7ad61d831 --- /dev/null +++ b/dep/libzmq/src/epoll.cpp @@ -0,0 +1,206 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "epoll.hpp" +#if defined ZMQ_USE_EPOLL + +#include +#include +#include +#include +#include +#include + +#include "macros.hpp" +#include "epoll.hpp" +#include "err.hpp" +#include "config.hpp" +#include "i_poll_events.hpp" + +zmq::epoll_t::epoll_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), + stopping (false) +{ +#ifdef ZMQ_USE_EPOLL_CLOEXEC + // Setting this option result in sane behaviour when exec() functions + // are used. Old sockets are closed and don't block TCP ports, avoid + // leaks, etc. + epoll_fd = epoll_create1 (EPOLL_CLOEXEC); +#else + epoll_fd = epoll_create (1); +#endif + errno_assert (epoll_fd != -1); +} + +zmq::epoll_t::~epoll_t () +{ + // Wait till the worker thread exits. + worker.stop (); + + close (epoll_fd); + for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) { + LIBZMQ_DELETE(*it); + } +} + +zmq::epoll_t::handle_t zmq::epoll_t::add_fd (fd_t fd_, i_poll_events *events_) +{ + poll_entry_t *pe = new (std::nothrow) poll_entry_t; + alloc_assert (pe); + + // The memset is not actually needed. It's here to prevent debugging + // tools to complain about using uninitialised memory. + memset (pe, 0, sizeof (poll_entry_t)); + + pe->fd = fd_; + pe->ev.events = 0; + pe->ev.data.ptr = pe; + pe->events = events_; + + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_ADD, fd_, &pe->ev); + errno_assert (rc != -1); + + // Increase the load metric of the thread. + adjust_load (1); + + return pe; +} + +void zmq::epoll_t::rm_fd (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_DEL, pe->fd, &pe->ev); + errno_assert (rc != -1); + pe->fd = retired_fd; + retired_sync.lock (); + retired.push_back (pe); + retired_sync.unlock (); + + // Decrease the load metric of the thread. + adjust_load (-1); +} + +void zmq::epoll_t::set_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + pe->ev.events |= EPOLLIN; + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zmq::epoll_t::reset_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + pe->ev.events &= ~((short) EPOLLIN); + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zmq::epoll_t::set_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + pe->ev.events |= EPOLLOUT; + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zmq::epoll_t::reset_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + pe->ev.events &= ~((short) EPOLLOUT); + int rc = epoll_ctl (epoll_fd, EPOLL_CTL_MOD, pe->fd, &pe->ev); + errno_assert (rc != -1); +} + +void zmq::epoll_t::start () +{ + ctx.start_thread (worker, worker_routine, this); +} + +void zmq::epoll_t::stop () +{ + stopping = true; +} + +int zmq::epoll_t::max_fds () +{ + return -1; +} + +void zmq::epoll_t::loop () +{ + epoll_event ev_buf [max_io_events]; + + while (!stopping) { + + // Execute any due timers. + int timeout = (int) execute_timers (); + + // Wait for events. + int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, + timeout ? timeout : -1); + if (n == -1) { + errno_assert (errno == EINTR); + continue; + } + + for (int i = 0; i < n; i ++) { + poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr); + + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].events & (EPOLLERR | EPOLLHUP)) + pe->events->in_event (); + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].events & EPOLLOUT) + pe->events->out_event (); + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].events & EPOLLIN) + pe->events->in_event (); + } + + // Destroy retired event sources. + retired_sync.lock (); + for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) { + LIBZMQ_DELETE(*it); + } + retired.clear (); + retired_sync.unlock (); + } +} + +void zmq::epoll_t::worker_routine (void *arg_) +{ + ((epoll_t*) arg_)->loop (); +} + +#endif diff --git a/dep/libzmq/src/epoll.hpp b/dep/libzmq/src/epoll.hpp new file mode 100644 index 00000000000..e6c08c9283a --- /dev/null +++ b/dep/libzmq/src/epoll.hpp @@ -0,0 +1,119 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_EPOLL_HPP_INCLUDED__ +#define __ZMQ_EPOLL_HPP_INCLUDED__ + +// poller.hpp decides which polling mechanism to use. +#include "poller.hpp" +#if defined ZMQ_USE_EPOLL + +#include +#include + +#include "ctx.hpp" +#include "fd.hpp" +#include "thread.hpp" +#include "poller_base.hpp" +#include "mutex.hpp" + +namespace zmq +{ + + struct i_poll_events; + + // This class implements socket polling mechanism using the Linux-specific + // epoll mechanism. + + class epoll_t : public poller_base_t + { + public: + + typedef void* handle_t; + + epoll_t (const ctx_t &ctx_); + ~epoll_t (); + + // "poller" concept. + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); + void rm_fd (handle_t handle_); + void set_pollin (handle_t handle_); + void reset_pollin (handle_t handle_); + void set_pollout (handle_t handle_); + void reset_pollout (handle_t handle_); + void start (); + void stop (); + + static int max_fds (); + + private: + + // Main worker thread routine. + static void worker_routine (void *arg_); + + // Main event loop. + void loop (); + + // Reference to ZMQ context. + const ctx_t &ctx; + + // Main epoll file descriptor + fd_t epoll_fd; + + struct poll_entry_t + { + fd_t fd; + epoll_event ev; + zmq::i_poll_events *events; + }; + + // List of retired event sources. + typedef std::vector retired_t; + retired_t retired; + + // If true, thread is in the process of shutting down. + bool stopping; + + // Handle of the physical thread doing the I/O work. + thread_t worker; + + // Synchronisation of retired event sources + mutex_t retired_sync; + + epoll_t (const epoll_t&); + const epoll_t &operator = (const epoll_t&); + }; + + typedef epoll_t poller_t; + +} + +#endif + +#endif diff --git a/dep/libzmq/src/kqueue.cpp b/dep/libzmq/src/kqueue.cpp new file mode 100644 index 00000000000..a0ec2f1ee59 --- /dev/null +++ b/dep/libzmq/src/kqueue.cpp @@ -0,0 +1,227 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "kqueue.hpp" +#if defined ZMQ_USE_KQUEUE + +#include +#include +#include +#include +#include +#include +#include + +#include "macros.hpp" +#include "kqueue.hpp" +#include "err.hpp" +#include "config.hpp" +#include "i_poll_events.hpp" +#include "likely.hpp" + +// NetBSD defines (struct kevent).udata as intptr_t, everyone else +// as void *. +#if defined ZMQ_HAVE_NETBSD +#define kevent_udata_t intptr_t +#else +#define kevent_udata_t void * +#endif + +zmq::kqueue_t::kqueue_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), + stopping (false) +{ + // Create event queue + kqueue_fd = kqueue (); + errno_assert (kqueue_fd != -1); +#ifdef HAVE_FORK + pid = getpid(); +#endif +} + +zmq::kqueue_t::~kqueue_t () +{ + worker.stop (); + close (kqueue_fd); +} + +void zmq::kqueue_t::kevent_add (fd_t fd_, short filter_, void *udata_) +{ + struct kevent ev; + + EV_SET (&ev, fd_, filter_, EV_ADD, 0, 0, (kevent_udata_t)udata_); + int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); + errno_assert (rc != -1); +} + +void zmq::kqueue_t::kevent_delete (fd_t fd_, short filter_) +{ + struct kevent ev; + + EV_SET (&ev, fd_, filter_, EV_DELETE, 0, 0, 0); + int rc = kevent (kqueue_fd, &ev, 1, NULL, 0, NULL); + errno_assert (rc != -1); +} + +zmq::kqueue_t::handle_t zmq::kqueue_t::add_fd (fd_t fd_, + i_poll_events *reactor_) +{ + poll_entry_t *pe = new (std::nothrow) poll_entry_t; + alloc_assert (pe); + + pe->fd = fd_; + pe->flag_pollin = 0; + pe->flag_pollout = 0; + pe->reactor = reactor_; + + adjust_load (1); + + return pe; +} + +void zmq::kqueue_t::rm_fd (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (pe->flag_pollin) + kevent_delete (pe->fd, EVFILT_READ); + if (pe->flag_pollout) + kevent_delete (pe->fd, EVFILT_WRITE); + pe->fd = retired_fd; + retired.push_back (pe); + + adjust_load (-1); +} + +void zmq::kqueue_t::set_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (likely (!pe->flag_pollin)) { + pe->flag_pollin = true; + kevent_add (pe->fd, EVFILT_READ, pe); + } +} + +void zmq::kqueue_t::reset_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (likely (pe->flag_pollin)) { + pe->flag_pollin = false; + kevent_delete (pe->fd, EVFILT_READ); + } +} + +void zmq::kqueue_t::set_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (likely (!pe->flag_pollout)) { + pe->flag_pollout = true; + kevent_add (pe->fd, EVFILT_WRITE, pe); + } +} + +void zmq::kqueue_t::reset_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (likely (pe->flag_pollout)) { + pe->flag_pollout = false; + kevent_delete (pe->fd, EVFILT_WRITE); + } +} + +void zmq::kqueue_t::start () +{ + ctx.start_thread (worker, worker_routine, this); +} + +void zmq::kqueue_t::stop () +{ + stopping = true; +} + +int zmq::kqueue_t::max_fds () +{ + return -1; +} + +void zmq::kqueue_t::loop () +{ + while (!stopping) { + + // Execute any due timers. + int timeout = (int) execute_timers (); + + // Wait for events. + struct kevent ev_buf [max_io_events]; + timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; + int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, + timeout ? &ts: NULL); +#ifdef HAVE_FORK + if (unlikely(pid != getpid())) { + //printf("zmq::kqueue_t::loop aborting on forked child %d\n", (int)getpid()); + // simply exit the loop in a forked process. + return; + } +#endif + if (n == -1) { + errno_assert (errno == EINTR); + continue; + } + + for (int i = 0; i < n; i ++) { + poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata; + + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].flags & EV_EOF) + pe->reactor->in_event (); + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].filter == EVFILT_WRITE) + pe->reactor->out_event (); + if (pe->fd == retired_fd) + continue; + if (ev_buf [i].filter == EVFILT_READ) + pe->reactor->in_event (); + } + + // Destroy retired event sources. + for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) { + LIBZMQ_DELETE(*it); + } + retired.clear (); + } +} + +void zmq::kqueue_t::worker_routine (void *arg_) +{ + ((kqueue_t*) arg_)->loop (); +} + +#endif diff --git a/dep/libzmq/src/kqueue.hpp b/dep/libzmq/src/kqueue.hpp new file mode 100644 index 00000000000..27c1b151215 --- /dev/null +++ b/dep/libzmq/src/kqueue.hpp @@ -0,0 +1,127 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_KQUEUE_HPP_INCLUDED__ +#define __ZMQ_KQUEUE_HPP_INCLUDED__ + +// poller.hpp decides which polling mechanism to use. +#include "poller.hpp" +#if defined ZMQ_USE_KQUEUE + +#include +#include + +#include "ctx.hpp" +#include "fd.hpp" +#include "thread.hpp" +#include "poller_base.hpp" + +namespace zmq +{ + + struct i_poll_events; + + // Implements socket polling mechanism using the BSD-specific + // kqueue interface. + + class kqueue_t : public poller_base_t + { + public: + + typedef void* handle_t; + + kqueue_t (const ctx_t &ctx_); + ~kqueue_t (); + + // "poller" concept. + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); + void rm_fd (handle_t handle_); + void set_pollin (handle_t handle_); + void reset_pollin (handle_t handle_); + void set_pollout (handle_t handle_); + void reset_pollout (handle_t handle_); + void start (); + void stop (); + + static int max_fds (); + + private: + + // Main worker thread routine. + static void worker_routine (void *arg_); + + // Main event loop. + void loop (); + + // Reference to ZMQ context. + const ctx_t &ctx; + + // File descriptor referring to the kernel event queue. + fd_t kqueue_fd; + + // Adds the event to the kqueue. + void kevent_add (fd_t fd_, short filter_, void *udata_); + + // Deletes the event from the kqueue. + void kevent_delete (fd_t fd_, short filter_); + + struct poll_entry_t + { + fd_t fd; + bool flag_pollin; + bool flag_pollout; + zmq::i_poll_events *reactor; + }; + + // List of retired event sources. + typedef std::vector retired_t; + retired_t retired; + + // If true, thread is in the process of shutting down. + bool stopping; + + // Handle of the physical thread doing the I/O work. + thread_t worker; + + kqueue_t (const kqueue_t&); + const kqueue_t &operator = (const kqueue_t&); + +#ifdef HAVE_FORK + // the process that created this context. Used to detect forking. + pid_t pid; +#endif + }; + + typedef kqueue_t poller_t; + +} + +#endif + +#endif diff --git a/dep/libzmq/src/poll.cpp b/dep/libzmq/src/poll.cpp new file mode 100644 index 00000000000..1f9163cf4d6 --- /dev/null +++ b/dep/libzmq/src/poll.cpp @@ -0,0 +1,193 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "poll.hpp" +#if defined ZMQ_USE_POLL + +#include +#if !defined ZMQ_HAVE_WINDOWS +#include +#include +#endif +#include + +#include "poll.hpp" +#include "err.hpp" +#include "config.hpp" +#include "i_poll_events.hpp" + +zmq::poll_t::poll_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), + retired (false), + stopping (false) +{ +} + +zmq::poll_t::~poll_t () +{ + worker.stop (); +} + +zmq::poll_t::handle_t zmq::poll_t::add_fd (fd_t fd_, i_poll_events *events_) +{ + // If the file descriptor table is too small expand it. + fd_table_t::size_type sz = fd_table.size (); + if (sz <= (fd_table_t::size_type) fd_) { + fd_table.resize (fd_ + 1); + while (sz != (fd_table_t::size_type) (fd_ + 1)) { + fd_table [sz].index = retired_fd; + ++sz; + } + } + + pollfd pfd = {fd_, 0, 0}; + pollset.push_back (pfd); + zmq_assert (fd_table [fd_].index == retired_fd); + + fd_table [fd_].index = pollset.size() - 1; + fd_table [fd_].events = events_; + + // Increase the load metric of the thread. + adjust_load (1); + + return fd_; +} + +void zmq::poll_t::rm_fd (handle_t handle_) +{ + fd_t index = fd_table [handle_].index; + zmq_assert (index != retired_fd); + + // Mark the fd as unused. + pollset [index].fd = retired_fd; + fd_table [handle_].index = retired_fd; + retired = true; + + // Decrease the load metric of the thread. + adjust_load (-1); +} + +void zmq::poll_t::set_pollin (handle_t handle_) +{ + fd_t index = fd_table [handle_].index; + pollset [index].events |= POLLIN; +} + +void zmq::poll_t::reset_pollin (handle_t handle_) +{ + fd_t index = fd_table [handle_].index; + pollset [index].events &= ~((short) POLLIN); +} + +void zmq::poll_t::set_pollout (handle_t handle_) +{ + fd_t index = fd_table [handle_].index; + pollset [index].events |= POLLOUT; +} + +void zmq::poll_t::reset_pollout (handle_t handle_) +{ + fd_t index = fd_table [handle_].index; + pollset [index].events &= ~((short) POLLOUT); +} + +void zmq::poll_t::start () +{ + ctx.start_thread (worker, worker_routine, this); +} + +void zmq::poll_t::stop () +{ + stopping = true; +} + +int zmq::poll_t::max_fds () +{ + return -1; +} + +void zmq::poll_t::loop () +{ + while (!stopping) { + + // Execute any due timers. + int timeout = (int) execute_timers (); + + // Wait for events. + int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1); + if (rc == -1) { + errno_assert (errno == EINTR); + continue; + } + + // If there are no events (i.e. it's a timeout) there's no point + // in checking the pollset. + if (rc == 0) + continue; + + for (pollset_t::size_type i = 0; i != pollset.size (); i++) { + + zmq_assert (!(pollset [i].revents & POLLNVAL)); + if (pollset [i].fd == retired_fd) + continue; + if (pollset [i].revents & (POLLERR | POLLHUP)) + fd_table [pollset [i].fd].events->in_event (); + if (pollset [i].fd == retired_fd) + continue; + if (pollset [i].revents & POLLOUT) + fd_table [pollset [i].fd].events->out_event (); + if (pollset [i].fd == retired_fd) + continue; + if (pollset [i].revents & POLLIN) + fd_table [pollset [i].fd].events->in_event (); + } + + // Clean up the pollset and update the fd_table accordingly. + if (retired) { + pollset_t::size_type i = 0; + while (i < pollset.size ()) { + if (pollset [i].fd == retired_fd) + pollset.erase (pollset.begin () + i); + else { + fd_table [pollset [i].fd].index = i; + i ++; + } + } + retired = false; + } + } +} + +void zmq::poll_t::worker_routine (void *arg_) +{ + ((poll_t*) arg_)->loop (); +} + +#endif diff --git a/dep/libzmq/src/poll.hpp b/dep/libzmq/src/poll.hpp new file mode 100644 index 00000000000..84818055091 --- /dev/null +++ b/dep/libzmq/src/poll.hpp @@ -0,0 +1,121 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_POLL_HPP_INCLUDED__ +#define __ZMQ_POLL_HPP_INCLUDED__ + +// poller.hpp decides which polling mechanism to use. +#include "poller.hpp" +#if defined ZMQ_USE_POLL + +#if !defined ZMQ_HAVE_WINDOWS +#include +#endif +#include +#include + +#include "ctx.hpp" +#include "fd.hpp" +#include "thread.hpp" +#include "poller_base.hpp" + +namespace zmq +{ + + struct i_poll_events; + + // Implements socket polling mechanism using the POSIX.1-2001 + // poll() system call. + + class poll_t : public poller_base_t + { + public: + + typedef fd_t handle_t; + + poll_t (const ctx_t &ctx_); + ~poll_t (); + + // "poller" concept. + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); + void rm_fd (handle_t handle_); + void set_pollin (handle_t handle_); + void reset_pollin (handle_t handle_); + void set_pollout (handle_t handle_); + void reset_pollout (handle_t handle_); + void start (); + void stop (); + + static int max_fds (); + + private: + + // Main worker thread routine. + static void worker_routine (void *arg_); + + // Main event loop. + void loop (); + + // Reference to ZMQ context. + const ctx_t &ctx; + + struct fd_entry_t + { + fd_t index; + zmq::i_poll_events *events; + }; + + // This table stores data for registered descriptors. + typedef std::vector fd_table_t; + fd_table_t fd_table; + + // Pollset to pass to the poll function. + typedef std::vector pollset_t; + pollset_t pollset; + + // If true, there's at least one retired event source. + bool retired; + + // If true, thread is in the process of shutting down. + bool stopping; + + // Handle of the physical thread doing the I/O work. + thread_t worker; + + poll_t (const poll_t&); + const poll_t &operator = (const poll_t&); + }; + + typedef poll_t poller_t; + +} + +#endif + +#endif diff --git a/dep/libzmq/src/pollset.cpp b/dep/libzmq/src/pollset.cpp new file mode 100644 index 00000000000..f08b34b44e0 --- /dev/null +++ b/dep/libzmq/src/pollset.cpp @@ -0,0 +1,254 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "pollset.hpp" +#if defined ZMQ_USE_POLLSET + +#include +#include +#include +#include +#include + +#include "macros.hpp" +#include "err.hpp" +#include "config.hpp" +#include "i_poll_events.hpp" + +zmq::pollset_t::pollset_t (const zmq::ctx_t &ctx_) : + ctx(ctx_), + stopping (false) +{ + pollset_fd = pollset_create (-1); + errno_assert (pollset_fd != -1); +} + +zmq::pollset_t::~pollset_t () +{ + // Wait till the worker thread exits. + worker.stop (); + + pollset_destroy (pollset_fd); + for (retired_t::iterator it = retired.begin (); it != retired.end (); ++it) + LIBZMQ_DELETE(*it); +} + +zmq::pollset_t::handle_t zmq::pollset_t::add_fd (fd_t fd_, i_poll_events *events_) +{ + poll_entry_t *pe = new (std::nothrow) poll_entry_t; + alloc_assert (pe); + + pe->fd = fd_; + pe->flag_pollin = false; + pe->flag_pollout = false; + pe->events = events_; + + struct poll_ctl pc; + pc.fd = fd_; + pc.cmd = PS_ADD; + pc.events = 0; + + int rc = pollset_ctl (pollset_fd, &pc, 1); + errno_assert (rc != -1); + + // Increase the load metric of the thread. + adjust_load (1); + + if (fd_ >= fd_table.size ()) { + fd_table.resize(fd_ + 1, NULL); + } + fd_table [fd_] = pe; + return pe; +} + +void zmq::pollset_t::rm_fd (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + + struct poll_ctl pc; + pc.fd = pe->fd; + pc.cmd = PS_DELETE; + pc.events = 0; + pollset_ctl (pollset_fd, &pc, 1); + + fd_table [pe->fd] = NULL; + + pe->fd = retired_fd; + retired.push_back (pe); + + // Decrease the load metric of the thread. + adjust_load (-1); +} + +void zmq::pollset_t::set_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (likely (!pe->flag_pollin)) { + struct poll_ctl pc; + pc.fd = pe->fd; + pc.cmd = PS_MOD; + pc.events = POLLIN; + + const int rc = pollset_ctl (pollset_fd, &pc, 1); + errno_assert (rc != -1); + + pe->flag_pollin = true; + } +} + +void zmq::pollset_t::reset_pollin (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (unlikely(!pe->flag_pollin)) { + return; + } + + struct poll_ctl pc; + pc.fd = pe->fd; + pc.events = 0; + + pc.cmd = PS_DELETE; + int rc = pollset_ctl (pollset_fd, &pc, 1); + + if (pe->flag_pollout) { + pc.events = POLLOUT; + pc.cmd = PS_MOD; + rc = pollset_ctl (pollset_fd, &pc, 1); + errno_assert (rc != -1); + } + + pe->flag_pollin = false; +} + +void zmq::pollset_t::set_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (likely (!pe->flag_pollout)) { + struct poll_ctl pc; + pc.fd = pe->fd; + pc.cmd = PS_MOD; + pc.events = POLLOUT; + + const int rc = pollset_ctl (pollset_fd, &pc, 1); + errno_assert (rc != -1); + + pe->flag_pollout = true; + } +} + +void zmq::pollset_t::reset_pollout (handle_t handle_) +{ + poll_entry_t *pe = (poll_entry_t*) handle_; + if (unlikely(!pe->flag_pollout)) { + return; + } + + struct poll_ctl pc; + pc.fd = pe->fd; + pc.events = 0; + + pc.cmd = PS_DELETE; + int rc = pollset_ctl (pollset_fd, &pc, 1); + errno_assert (rc != -1); + + if (pe->flag_pollin) { + pc.cmd = PS_MOD; + pc.events = POLLIN; + rc = pollset_ctl (pollset_fd, &pc, 1); + errno_assert (rc != -1); + } + pe->flag_pollout = false; +} + +void zmq::pollset_t::start () +{ + ctx.start_thread (worker, worker_routine, this); +} + +void zmq::pollset_t::stop () +{ + stopping = true; +} + +int zmq::pollset_t::max_fds () +{ + return -1; +} + +void zmq::pollset_t::loop () +{ + struct pollfd polldata_array[max_io_events]; + + while (!stopping) { + + // Execute any due timers. + int timeout = (int) execute_timers (); + + // Wait for events. + int n = pollset_poll(pollset_fd, polldata_array, max_io_events, + timeout ? timeout : -1); + if (n == -1) { + errno_assert (errno == EINTR); + continue; + } + + for (int i = 0; i < n; i ++) { + poll_entry_t *pe = fd_table [polldata_array [i].fd]; + if (!pe) + continue; + + if (pe->fd == retired_fd) + continue; + if (polldata_array [i].revents & (POLLERR | POLLHUP)) + pe->events->in_event (); + if (pe->fd == retired_fd) + continue; + if (polldata_array [i].revents & POLLOUT) + pe->events->out_event (); + if (pe->fd == retired_fd) + continue; + if (polldata_array [i].revents & POLLIN) + pe->events->in_event (); + } + + // Destroy retired event sources. + for (retired_t::iterator it = retired.begin (); it != retired.end (); + ++it) + LIBZMQ_DELETE(*it); + retired.clear (); + } +} + +void zmq::pollset_t::worker_routine (void *arg_) +{ + ((pollset_t*) arg_)->loop (); +} + +#endif diff --git a/dep/libzmq/src/pollset.hpp b/dep/libzmq/src/pollset.hpp new file mode 100644 index 00000000000..9ce333caebb --- /dev/null +++ b/dep/libzmq/src/pollset.hpp @@ -0,0 +1,121 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_POLLSET_HPP_INCLUDED__ +#define __ZMQ_POLLSET_HPP_INCLUDED__ + +// poller.hpp decides which polling mechanism to use. +#include "poller.hpp" +#if defined ZMQ_USE_POLLSET + +#include +#include +#include + +#include "ctx.hpp" +#include "fd.hpp" +#include "thread.hpp" +#include "poller_base.hpp" + +namespace zmq +{ + + struct i_poll_events; + + // This class implements socket polling mechanism using the AIX-specific + // pollset mechanism. + + class pollset_t : public poller_base_t + { + public: + + typedef void* handle_t; + + pollset_t (const ctx_t &ctx_); + ~pollset_t (); + + // "poller" concept. + handle_t add_fd (fd_t fd_, zmq::i_poll_events *events_); + void rm_fd (handle_t handle_); + void set_pollin (handle_t handle_); + void reset_pollin (handle_t handle_); + void set_pollout (handle_t handle_); + void reset_pollout (handle_t handle_); + void start (); + void stop (); + + static int max_fds (); + + private: + + // Main worker thread routine. + static void worker_routine (void *arg_); + + // Main event loop. + void loop (); + + // Reference to ZMQ context. + const ctx_t &ctx; + + // Main pollset file descriptor + ::pollset_t pollset_fd; + + struct poll_entry_t + { + fd_t fd; + bool flag_pollin; + bool flag_pollout; + zmq::i_poll_events *events; + }; + + // List of retired event sources. + typedef std::vector retired_t; + retired_t retired; + + // This table stores data for registered descriptors. + typedef std::vector fd_table_t; + fd_table_t fd_table; + + // If true, thread is in the process of shutting down. + bool stopping; + + // Handle of the physical thread doing the I/O work. + thread_t worker; + + pollset_t (const pollset_t&); + const pollset_t &operator = (const pollset_t&); + }; + + typedef pollset_t poller_t; + +} + +#endif + +#endif