author    Neo2003 <none@none>    2008-10-02 16:23:55 -0500
committer Neo2003 <none@none>    2008-10-02 16:23:55 -0500
commit    9b1c0e006f20091f28f3f468cfcab1feb51286bd (patch)
tree      b5d1ba94a656e6679f8737f9ea6bed1239b73b14 /dep/src/zthread/PoolExecutor.cxx
[svn] * Proper SVN structure init
--HG-- branch : trunk
Diffstat (limited to 'dep/src/zthread/PoolExecutor.cxx')
-rw-r--r--    dep/src/zthread/PoolExecutor.cxx    629
1 files changed, 629 insertions, 0 deletions
diff --git a/dep/src/zthread/PoolExecutor.cxx b/dep/src/zthread/PoolExecutor.cxx
new file mode 100644
index 00000000000..cf84e145453
--- /dev/null
+++ b/dep/src/zthread/PoolExecutor.cxx
@@ -0,0 +1,629 @@
+/*
+ * Copyright (c) 2005, Eric Crahen
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is furnished
+ * to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
+ * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+ *
+ */
+
+#include "ThreadImpl.h"
+#include "zthread/PoolExecutor.h"
+#include "zthread/MonitoredQueue.h"
+#include "zthread/FastMutex.h"
+#include "ThreadImpl.h"
+#include "ThreadQueue.h"
+
+#include <algorithm>
+#include <deque>
+#include <utility>
+
+using namespace ZThread;
+
+namespace ZThread {
+
+ namespace {
+
+    /**
+     * Tracks threads waiting for tasks to complete. Waiters are attached
+     * to the currently active group of tasks and are awakened once every
+     * task up to and including that group has completed.
+     */
+ class WaiterQueue {
+
+ typedef std::deque<ThreadImpl*> ThreadList;
+
+ typedef struct group_t {
+ size_t id;
+ size_t count;
+ ThreadList waiters;
+ group_t(size_t n) : id(n), count(0) {}
+ } Group;
+
+ typedef std::deque<Group> GroupList;
+
+ //! Predicate to find a specific group
+    struct by_id : public std::unary_function<Group, bool> {
+ size_t id;
+ by_id(size_t n) : id(n) {}
+ bool operator()(const Group& grp) {
+ return grp.id == id;
+ }
+ };
+
+    //! Functor to tally the task count across groups
+    struct counter : public std::unary_function<Group, void> {
+ size_t count;
+ counter() : count(0) {}
+ void operator()(const Group& grp) { count += grp.count; }
+ operator size_t() { return count; }
+ };
+
+ FastMutex _lock;
+ GroupList _list;
+ size_t _id;
+ size_t _generation;
+
+ public:
+
+    WaiterQueue() : _id(0), _generation(0) {
+      // Ensure at least one empty group always exists
+      _list.push_back( Group(_id++) );
+    }
+
+ /**
+ * Insert the current thread into the current waiter list
+ *
+ * @pre At least one empty group exists
+ * @post At least one empty group exists
+ */
+ bool wait(unsigned long timeout) {
+
+ ThreadImpl* current = ThreadImpl::current();
+ Monitor& m = current->getMonitor();
+
+ Monitor::STATE state;
+
+ Guard<FastMutex> g1(_lock);
+
+      // At least one empty group exists
+      assert(!_list.empty());
+
+      // Return without waiting if no tasks are executing
+      if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
+        return true;
+
+ // Update the waiter list for the active group
+ _list.back().waiters.push_back(current);
+ size_t n = _list.back().id;
+
+ m.acquire();
+
+ {
+
+ Guard<FastMutex, UnlockedScope> g2(g1);
+ state = timeout == 0 ? m.wait() : m.wait(timeout);
+
+ }
+
+ m.release();
+
+      // If awakened for a reason other than the last task in group 'n'
+      // completing, find the group 'current' is waiting in
+ GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
+ if(i != _list.end()) {
+
+ // Remove 'current' from that list if it is still a member
+ ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), current);
+ if(j != i->waiters.end())
+ i->waiters.erase(j);
+
+ }
+
+      // At least one empty group exists
+ assert(!_list.empty());
+
+ switch(state) {
+ case Monitor::SIGNALED:
+ break;
+ case Monitor::TIMEDOUT:
+ return false;
+ case Monitor::INTERRUPTED:
+ throw Interrupted_Exception();
+ default:
+ throw Synchronization_Exception();
+ }
+
+ return true;
+
+ }
+
+ /**
+ * Increment the active group count
+ *
+ * @pre at least 1 empty group exists
+ * @post at least 1 non-empty group exists
+ */
+ std::pair<size_t, size_t> increment() {
+
+ Guard<FastMutex> g(_lock);
+
+      // At least one empty group exists
+ assert(!_list.empty());
+
+      // The active group is always the last in the list
+      GroupList::iterator i = --_list.end();
+
+      if(i == _list.end()) {
+
+        // A group should never be removed until
+        // the final task in that group has completed
+        assert(0);
+
+      }
+
+      size_t n = i->id;
+
+ i->count++;
+
+ // When the active group is being incremented, insert a new active group
+ // to replace it if there were waiting threads
+ if(i == --_list.end() && !i->waiters.empty())
+ _list.push_back(Group(_id++));
+
+ // At least 1 non-empty group exists
+ assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
+
+ return std::make_pair(n, _generation);
+
+ }
+
+
+ /**
+ * Decrease the count for the group with the given id.
+ *
+ * @param n group id
+ *
+ * @pre At least 1 non-empty group exists
+ * @post At least 1 empty group exists
+ */
+ void decrement(size_t n) {
+
+ Guard<FastMutex> g1(_lock);
+
+ // At least 1 non-empty group exists
+ assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
+
+ // Find the requested group
+ GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
+ if(i == _list.end()) {
+
+ // A group should never have been removed until
+ // the final task in that group completed
+ assert(0);
+
+ }
+
+      // Decrease the count of running tasks in this group
+ if(--i->count == 0 && i == _list.begin()) {
+
+ do {
+
+ // When the first group completes, wake all waiters for every
+ // group, starting from the first until a group that is not
+ // complete is reached
+
+ /*
+ // Don't remove the empty active group
+ if(i == --_list.end() && i->waiters.empty())
+ break;
+ */
+
+ if( awaken(*i) ) {
+
+ // If all waiters were awakened, remove the group
+ i = _list.erase(i);
+
+ } else {
+
+ {
+
+ // Otherwise, unlock and yield allowing the waiter
+ // lists to be updated if other threads are busy
+            Guard<FastMutex, UnlockedScope> g2(g1);
+ ThreadImpl::yield();
+
+ }
+
+ i = _list.begin();
+
+ }
+
+ } while(i != _list.end() && i->count == 0);
+
+        // Ensure that an active group exists; post-increment keeps the
+        // ids unique (pre-incrementing _id here would let the next
+        // increment() reuse the same id)
+        if(_list.empty())
+          _list.push_back( Group(_id++) );
+
+ }
+
+ // At least one group exists
+ assert(!_list.empty());
+
+ }
+
+    /**
+     * Get the current generation; when 'next' is true, the generation
+     * is advanced and its pre-advance value returned.
+     */
+    size_t generation(bool next = false) {
+
+ Guard<FastMutex> g(_lock);
+ return next ? _generation++ : _generation;
+
+ }
+
+ private:
+
+ /**
+ * Awaken all the waiters remaining in the given group
+ *
+ * @return
+ * - true if all the waiting threads were successfully awakened.
+ * - false if there were one or more threads that could not be awakened.
+ */
+ bool awaken(Group& grp) {
+
+      // Go through the waiter list in the given group.
+ for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
+
+ ThreadImpl* impl = *i;
+ Monitor& m = impl->getMonitor();
+
+        // Try the monitor lock; if it can't be locked, skip to the next waiter
+        if(m.tryAcquire()) {
+
+          // Remove the waiter from the list so time isn't wasted
+          // checking it again
+          i = grp.waiters.erase(i);
+
+          // Try to wake the waiter; it doesn't matter whether this succeeds
+          // (it fails only when the monitor is already about to stop waiting)
+          m.notify();
+          m.release();
+
+ } else ++i;
+
+ }
+
+ return grp.waiters.empty();
+
+ }
+
+ };
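
WaiterQueue counts outstanding tasks by passing a stateful functor to
std::for_each, which returns the functor by value so its conversion
operator yields the tally. A minimal standalone sketch of that idiom,
using a stand-in Group type rather than the one above:

    #include <algorithm>
    #include <cstddef>
    #include <deque>
    #include <iostream>

    struct Group { std::size_t count; };

    struct counter {
      std::size_t total;
      counter() : total(0) {}
      void operator()(const Group& g) { total += g.count; }
      operator std::size_t() const { return total; }
    };

    int main() {
      std::deque<Group> groups;
      Group g = { 2 }; groups.push_back(g);
      g.count = 0;     groups.push_back(g);
      g.count = 3;     groups.push_back(g);

      // std::for_each returns the functor, so the implicit conversion
      // to size_t yields the accumulated count
      std::size_t pending = std::for_each(groups.begin(), groups.end(), counter());
      std::cout << pending << std::endl;   // prints 5
      return 0;
    }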
+
+  /**
+   * @class GroupedRunnable
+   *
+   * Wrap a task with group and generation information.
+   *
+   * - 'group' ties related tasks together so that lists of threads
+   *   waiting on them can be managed.
+   *
+   * - 'generation' records when a task was submitted, so that a later
+   *   interrupt() affects only tasks from older generations.
+   */
+ class GroupedRunnable : public Runnable {
+
+ Task _task;
+ WaiterQueue& _queue;
+
+ size_t _group;
+ size_t _generation;
+
+ public:
+
+ GroupedRunnable(const Task& task, WaiterQueue& queue)
+ : _task(task), _queue(queue) {
+
+ std::pair<size_t, size_t> pr( _queue.increment() );
+
+ _group = pr.first;
+ _generation = pr.second;
+
+ }
+
+ size_t group() const {
+ return _group;
+ }
+
+ size_t generation() const {
+ return _generation;
+ }
+
+ void run() {
+
+ try {
+
+ _task->run();
+
+ } catch(...) {
+
+ }
+
+ _queue.decrement( group() );
+
+ }
+
+ };
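
GroupedRunnable::run() deliberately swallows any exception from the task so
that the decrement() bookkeeping always executes; otherwise a throwing task
would leave its group's count raised forever and wait() would never return.
A small self-contained illustration of that pattern, with a hypothetical
TaskCounter standing in for WaiterQueue:

    #include <iostream>

    struct TaskCounter {
      int active;
      TaskCounter() : active(0) {}
      void increment() { ++active; }
      void decrement() { --active; }
    };

    void runTask(TaskCounter& counter, bool shouldThrow) {
      counter.increment();
      try {
        if(shouldThrow)
          throw 42;              // simulated task failure
      } catch(...) {
        // Swallow the exception, as GroupedRunnable::run() does,
        // so the decrement below always executes
      }
      counter.decrement();
    }

    int main() {
      TaskCounter counter;
      runTask(counter, false);
      runTask(counter, true);
      std::cout << counter.active << std::endl;  // prints 0: balanced either way
      return 0;
    }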
+
+ typedef CountedPtr<GroupedRunnable, size_t> ExecutorTask;
+
+  /**
+   * Shared state behind a PoolExecutor: the monitored task queue,
+   * the waiter bookkeeping, and the list of worker threads.
+   */
+ class ExecutorImpl {
+
+ typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue;
+ typedef std::deque<ThreadImpl*> ThreadList;
+
+ TaskQueue _taskQueue;
+ WaiterQueue _waitingQueue;
+
+ ThreadList _threads;
+ volatile size_t _size;
+
+
+ public:
+
+ ExecutorImpl() : _size(0) {}
+
+
+ void registerThread() {
+
+ Guard<TaskQueue> g(_taskQueue);
+
+ ThreadImpl* impl = ThreadImpl::current();
+ _threads.push_back(impl);
+
+      // Cancel the current thread if too many threads have been created
+ if(_threads.size() > _size)
+ impl->cancel();
+
+ }
+
+ void unregisterThread() {
+
+ Guard<TaskQueue> g(_taskQueue);
+      // erase-remove idiom: std::remove() alone only reorders elements
+      // and does not shrink the container
+      _threads.erase(std::remove(_threads.begin(), _threads.end(), ThreadImpl::current()),
+                     _threads.end());
+
+ }
+
+ void execute(const Task& task) {
+
+ // Wrap the task with a grouped task
+ GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue);
+
+ try {
+
+ _taskQueue.add( ExecutorTask(runnable) );
+
+ } catch(...) {
+
+        // In case the queue was canceled between the WaiterQueue being
+        // updated and the task being added to the TaskQueue
+ _waitingQueue.decrement( runnable->group() );
+ throw;
+
+ }
+
+ }
+
+ void interrupt() {
+
+ // Bump the generation number
+ _waitingQueue.generation(true);
+
+ Guard<TaskQueue> g(_taskQueue);
+
+      // Interrupt all threads currently running; their tasks would be
+      // from an older generation
+ for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
+ (*i)->interrupt();
+
+ }
+
+ //! Adjust the number of desired workers and return the number of Threads needed
+ size_t workers(size_t n) {
+
+ Guard<TaskQueue> g(_taskQueue);
+
+ size_t m = (_size < n) ? (n - _size) : 0;
+ _size = n;
+
+ return m;
+
+ }
+
+ size_t workers() {
+
+ Guard<TaskQueue> g(_taskQueue);
+ return _size;
+
+ }
+
+ ExecutorTask next() {
+
+ ExecutorTask task;
+
+ // Draw the task from the queue
+ for(;;) {
+
+ try {
+
+ task = _taskQueue.next();
+ break;
+
+ } catch(Interrupted_Exception&) {
+
+ // Ignore interruption here, it can only come from
+ // another thread interrupt()ing the executor. The
+ // thread was interrupted in the hopes it was busy
+ // with a task
+
+ }
+
+ }
+
+ // Interrupt the thread running the tasks when the generation
+ // does not match the current generation
+ if( task->generation() != _waitingQueue.generation() )
+ ThreadImpl::current()->interrupt();
+
+      // Otherwise, clear the interrupted status for the thread
+      // (isInterrupted() resets the flag), giving it a clean slate
+      else
+        ThreadImpl::current()->isInterrupted();
+
+ return task;
+
+ }
+
+ bool isCanceled() {
+ return _taskQueue.isCanceled();
+ }
+
+ void cancel() {
+ _taskQueue.cancel();
+ }
+
+ bool wait(unsigned long timeout) {
+ return _waitingQueue.wait(timeout);
+ }
+
+ };
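
ExecutorImpl::next() implements interrupt() by comparing generations:
interrupt() bumps the shared counter, so any task stamped with an older
value is treated as stale and the worker that drew it is interrupted before
running it. A standalone sketch of that check; the names here are
illustrative, not ZThread API:

    #include <cstddef>
    #include <iostream>

    struct StampedTask {
      std::size_t generation;  // recorded when the task was submitted
    };

    // A task is stale when the executor's generation has moved past
    // the one the task was stamped with
    bool isStale(const StampedTask& t, std::size_t current) {
      return t.generation != current;
    }

    int main() {
      std::size_t generation = 0;

      StampedTask before = { generation };  // submitted before interrupt()
      ++generation;                         // what interrupt() effectively does
      StampedTask after = { generation };   // submitted afterwards

      std::cout << isStale(before, generation) << std::endl;  // 1: interrupt worker
      std::cout << isStale(after, generation)  << std::endl;  // 0: run normally
      return 0;
    }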
+
+ //! Executor job
+ class Worker : public Runnable {
+
+ CountedPtr< ExecutorImpl > _impl;
+
+ public:
+
+ //! Create a Worker that draws upon the given Queue
+ Worker(const CountedPtr< ExecutorImpl >& impl)
+ : _impl(impl) { }
+
+    //! Run until the Thread or the Queue is canceled
+ void run() {
+
+ _impl->registerThread();
+
+      // Run until this thread or the Queue is canceled
+ while(!Thread::canceled()) {
+
+ // Draw tasks from the queue
+ ExecutorTask task( _impl->next() );
+ task->run();
+
+ }
+
+ _impl->unregisterThread();
+
+ }
+
+ }; /* Worker */
+
+
+ //! Helper
+ class Shutdown : public Runnable {
+
+ CountedPtr< ExecutorImpl > _impl;
+
+ public:
+
+ Shutdown(const CountedPtr< ExecutorImpl >& impl)
+ : _impl(impl) { }
+
+ void run() {
+ _impl->cancel();
+ }
+
+ }; /* Shutdown */
+
+ }
+
+ PoolExecutor::PoolExecutor(size_t n)
+ : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) {
+
+ size(n);
+
+ // Request cancelation when main() exits
+ ThreadQueue::instance()->insertShutdownTask(_shutdown);
+
+ }
+
+ PoolExecutor::~PoolExecutor() {
+
+ try {
+
+ /**
+ * If the shutdown task for this executor has not already been
+ * selected to run, then run it locally
+ */
+ if(ThreadQueue::instance()->removeShutdownTask(_shutdown))
+ _shutdown->run();
+
+ } catch(...) { }
+
+ }
+
+ void PoolExecutor::interrupt() {
+ _impl->interrupt();
+ }
+
+ void PoolExecutor::size(size_t n) {
+
+ if(n < 1)
+ throw InvalidOp_Exception();
+
+ for(size_t m = _impl->workers(n); m > 0; --m)
+ Thread t(new Worker(_impl));
+
+ }
+
+ size_t PoolExecutor::size() {
+ return _impl->workers();
+ }
+
+
+ void PoolExecutor::execute(const Task& task) {
+
+ // Enqueue the task, the Queue will reject it with a
+ // Cancelation_Exception if the Executor has been canceled
+ _impl->execute(task);
+
+ }
+
+ void PoolExecutor::cancel() {
+ _impl->cancel();
+ }
+
+ bool PoolExecutor::isCanceled() {
+ return _impl->isCanceled();
+ }
+
+ void PoolExecutor::wait() {
+ _impl->wait(0);
+ }
+
+ bool PoolExecutor::wait(unsigned long timeout) {
+ return _impl->wait(timeout == 0 ? 1 : timeout);
+ }
+
+}
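
For context, a hedged usage sketch of the class this commit adds, assuming
the usual ZThread headers are available; PrintTask is illustrative only:

    #include "zthread/PoolExecutor.h"
    #include "zthread/Runnable.h"
    #include <iostream>

    using namespace ZThread;

    class PrintTask : public Runnable {
    public:
      void run() {
        std::cout << "task ran" << std::endl;
      }
    };

    int main() {
      try {
        PoolExecutor executor(4);               // pool of four worker threads
        for(int n = 0; n < 8; ++n)
          executor.execute(Task(new PrintTask()));
        executor.wait();                        // block until all tasks complete
      } catch(Synchronization_Exception& e) {
        std::cerr << e.what() << std::endl;
      }
      return 0;
    }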