aboutsummaryrefslogtreecommitdiff
path: root/dep/src/zthread/PoolExecutor.cxx
diff options
context:
space:
mode:
authorChaz Brown <iamparadox@netscape.net>2009-09-13 06:18:26 -0400
committerChaz Brown <iamparadox@netscape.net>2009-09-13 06:18:26 -0400
commita02bb96290c09658c0f54e9fa9828eafe778f079 (patch)
treeee7443cea56991c9ba56bc576d8ec8539dd079b4 /dep/src/zthread/PoolExecutor.cxx
parente08ddd3096660c871d1b5acc03889c9be3e8ada3 (diff)
Remove the remaining obsolete Zthread code Part3 Really this time.
--HG-- branch : trunk
Diffstat (limited to 'dep/src/zthread/PoolExecutor.cxx')
-rw-r--r--dep/src/zthread/PoolExecutor.cxx629
1 files changed, 0 insertions, 629 deletions
diff --git a/dep/src/zthread/PoolExecutor.cxx b/dep/src/zthread/PoolExecutor.cxx
deleted file mode 100644
index 82ace996035..00000000000
--- a/dep/src/zthread/PoolExecutor.cxx
+++ /dev/null
@@ -1,629 +0,0 @@
-/*
- * Copyright (c) 2005, Eric Crahen
- *
- * Permission is hereby granted, free of charge, to any person obtaining a copy
- * of this software and associated documentation files (the "Software"), to deal
- * in the Software without restriction, including without limitation the rights
- * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- * copies of the Software, and to permit persons to whom the Software is furnished
- * to do so, subject to the following conditions:
- *
- * The above copyright notice and this permission notice shall be included in all
- * copies or substantial portions of the Software.
- *
- * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
- * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
- * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- *
- */
-
-#include "ThreadImpl.h"
-#include "zthread/PoolExecutor.h"
-#include "zthread/MonitoredQueue.h"
-#include "zthread/FastMutex.h"
-#include "ThreadImpl.h"
-#include "ThreadQueue.h"
-
-#include <algorithm>
-#include <deque>
-#include <utility>
-
-using namespace ZThread;
-
-namespace ZThread {
-
- namespace {
-
- /**
- */
- class WaiterQueue {
-
- typedef std::deque<ThreadImpl*> ThreadList;
-
- typedef struct group_t {
- size_t id;
- size_t count;
- ThreadList waiters;
- group_t(size_t n) : id(n), count(0) {}
- } Group;
-
- typedef std::deque<Group> GroupList;
-
- //! Predicate to find a specific group
- struct by_id : public std::unary_function<bool, Group> {
- size_t id;
- by_id(size_t n) : id(n) {}
- bool operator()(const Group& grp) {
- return grp.id == id;
- }
- };
-
- //! Functor to count groups
- struct counter : public std::unary_function<void, Group> {
- size_t count;
- counter() : count(0) {}
- void operator()(const Group& grp) { count += grp.count; }
- operator size_t() { return count; }
- };
-
- FastMutex _lock;
- GroupList _list;
- size_t _id;
- size_t _generation;
-
- public:
-
- WaiterQueue() : _id(0), _generation(0) {
- // At least one empty-group exists
- _list.push_back( Group(_id++) );
- }
-
- /**
- * Insert the current thread into the current waiter list
- *
- * @pre At least one empty group exists
- * @post At least one empty group exists
- */
- bool wait(unsigned long timeout) {
-
- ThreadImpl* current = ThreadImpl::current();
- Monitor& m = current->getMonitor();
-
- Monitor::STATE state;
-
- Guard<FastMutex> g1(_lock);
-
- // At least one empty-group exists
- assert(!_list.empty());
-
- // Return w/o waiting if there are no executing tasks
- if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
- return true;
-
- // Update the waiter list for the active group
- _list.back().waiters.push_back(current);
- size_t n = _list.back().id;
-
- m.acquire();
-
- {
-
- Guard<FastMutex, UnlockedScope> g2(g1);
- state = timeout == 0 ? m.wait() : m.wait(timeout);
-
- }
-
- m.release();
-
- // If awoke due to a reason other than the last task in the group 'n' completing,
- // then then find the group 'current' is waiting in
- GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
- if(i != _list.end()) {
-
- // Remove 'current' from that list if it is still a member
- ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), current);
- if(j != i->waiters.end())
- i->waiters.erase(j);
-
- }
-
- // At least one empty-group exists
- assert(!_list.empty());
-
- switch(state) {
- case Monitor::SIGNALED:
- break;
- case Monitor::TIMEDOUT:
- return false;
- case Monitor::INTERRUPTED:
- throw Interrupted_Exception();
- default:
- throw Synchronization_Exception();
- }
-
- return true;
-
- }
-
- /**
- * Increment the active group count
- *
- * @pre at least 1 empty group exists
- * @post at least 1 non-empty group exists
- */
- std::pair<size_t, size_t> increment() {
-
- Guard<FastMutex> g(_lock);
-
- // At least one empty-group exists
- assert(!_list.empty());
-
- GroupList::iterator i = --_list.end();
- size_t n = i->id;
-
- if(i == _list.end()) {
-
- // A group should never have been removed until
- // the final task in that group completed
- assert(0);
-
- }
-
- i->count++;
-
- // When the active group is being incremented, insert a new active group
- // to replace it if there were waiting threads
- if(i == --_list.end() && !i->waiters.empty())
- _list.push_back(Group(_id++));
-
- // At least 1 non-empty group exists
- assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
-
- return std::make_pair(n, _generation);
-
- }
-
-
- /**
- * Decrease the count for the group with the given id.
- *
- * @param n group id
- *
- * @pre At least 1 non-empty group exists
- * @post At least 1 empty group exists
- */
- void decrement(size_t n) {
-
- Guard<FastMutex> g1(_lock);
-
- // At least 1 non-empty group exists
- assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
-
- // Find the requested group
- GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
- if(i == _list.end()) {
-
- // A group should never have been removed until
- // the final task in that group completed
- assert(0);
-
- }
-
- // Decrease the count for tasks in this group,
- if(--i->count == 0 && i == _list.begin()) {
-
- do {
-
- // When the first group completes, wake all waiters for every
- // group, starting from the first until a group that is not
- // complete is reached
-
- /*
- // Don't remove the empty active group
- if(i == --_list.end() && i->waiters.empty())
- break;
- */
-
- if( awaken(*i) ) {
-
- // If all waiters were awakened, remove the group
- i = _list.erase(i);
-
- } else {
-
- {
-
- // Otherwise, unlock and yield allowing the waiter
- // lists to be updated if other threads are busy
- Guard<FastLock, UnlockedScope> g2(g1);
- ThreadImpl::yield();
-
- }
-
- i = _list.begin();
-
- }
-
- } while(i != _list.end() && i->count == 0);
-
- // Ensure that an active group exists
- if(_list.empty())
- _list.push_back( Group(++_id) );
-
- }
-
- // At least one group exists
- assert(!_list.empty());
-
- }
-
- /**
- */
- size_t generation(bool next = false) {
-
- Guard<FastMutex> g(_lock);
- return next ? _generation++ : _generation;
-
- }
-
- private:
-
- /**
- * Awaken all the waiters remaining in the given group
- *
- * @return
- * - true if all the waiting threads were successfully awakened.
- * - false if there were one or more threads that could not be awakened.
- */
- bool awaken(Group& grp) {
-
- // Go through the waiter list in the given group;
- for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
-
- ThreadImpl* impl = *i;
- Monitor& m = impl->getMonitor();
-
- // Try the monitor lock, if it cant be locked skip to the next waiter
- if(m.tryAcquire()) {
-
- // Notify the monitor & remove from the waiter list so time isn't
- // wasted checking it again.
- i = grp.waiters.erase(i);
-
- // Try to wake the waiter, it doesn't matter if this is successful
- // or not (only fails when the monitor is already going to stop waiting).
- m.notify();
- m.release();
-
- } else ++i;
-
- }
-
- return grp.waiters.empty();
-
- }
-
- };
-
- /**
- * @class GroupedRunnable
- *
- * Wrap a task with group and generation information.
- *
- * - 'group' allows tasks to be grouped together so that lists of waiting
- * threads can be managed.
- *
- * - 'generation' allows tasks to be interrupted
- */
- class GroupedRunnable : public Runnable {
-
- Task _task;
- WaiterQueue& _queue;
-
- size_t _group;
- size_t _generation;
-
- public:
-
- GroupedRunnable(const Task& task, WaiterQueue& queue)
- : _task(task), _queue(queue) {
-
- std::pair<size_t, size_t> pr( _queue.increment() );
-
- _group = pr.first;
- _generation = pr.second;
-
- }
-
- size_t group() const {
- return _group;
- }
-
- size_t generation() const {
- return _generation;
- }
-
- void run() {
-
- try {
-
- _task->run();
-
- } catch(...) {
-
- }
-
- _queue.decrement( group() );
-
- }
-
- };
-
- typedef CountedPtr<GroupedRunnable, size_t> ExecutorTask;
-
- /**
- *
- */
- class ExecutorImpl {
-
- typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue;
- typedef std::deque<ThreadImpl*> ThreadList;
-
- TaskQueue _taskQueue;
- WaiterQueue _waitingQueue;
-
- ThreadList _threads;
- volatile size_t _size;
-
-
- public:
-
- ExecutorImpl() : _size(0) {}
-
-
- void registerThread() {
-
- Guard<TaskQueue> g(_taskQueue);
-
- ThreadImpl* impl = ThreadImpl::current();
- _threads.push_back(impl);
-
- // current cancel if too many threads are being created
- if(_threads.size() > _size)
- impl->cancel();
-
- }
-
- void unregisterThread() {
-
- Guard<TaskQueue> g(_taskQueue);
- std::remove(_threads.begin(), _threads.end(), ThreadImpl::current());
-
- }
-
- void execute(const Task& task) {
-
- // Wrap the task with a grouped task
- GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue);
-
- try {
-
- _taskQueue.add( ExecutorTask(runnable) );
-
- } catch(...) {
-
- // Incase the queue is canceled between the time the WaiterQueue is
- // updated and the task is added to the TaskQueue
- _waitingQueue.decrement( runnable->group() );
- throw;
-
- }
-
- }
-
- void interrupt() {
-
- // Bump the generation number
- _waitingQueue.generation(true);
-
- Guard<TaskQueue> g(_taskQueue);
-
- // Interrupt all threads currently running, thier tasks would be
- // from an older generation
- for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
- (*i)->interrupt();
-
- }
-
- //! Adjust the number of desired workers and return the number of Threads needed
- size_t workers(size_t n) {
-
- Guard<TaskQueue> g(_taskQueue);
-
- size_t m = (_size < n) ? (n - _size) : 0;
- _size = n;
-
- return m;
-
- }
-
- size_t workers() {
-
- Guard<TaskQueue> g(_taskQueue);
- return _size;
-
- }
-
- ExecutorTask next() {
-
- ExecutorTask task;
-
- // Draw the task from the queue
- for(;;) {
-
- try {
-
- task = _taskQueue.next();
- break;
-
- } catch(Interrupted_Exception&) {
-
- // Ignore interruption here, it can only come from
- // another thread interrupt()ing the executor. The
- // thread was interrupted in the hopes it was busy
- // with a task
-
- }
-
- }
-
- // Interrupt the thread running the tasks when the generation
- // does not match the current generation
- if( task->generation() != _waitingQueue.generation() )
- ThreadImpl::current()->interrupt();
-
- // Otherwise, clear the interrupted status for the thread and
- // give it a clean slate to start with
- else
- ThreadImpl::current()->isInterrupted();
-
- return task;
-
- }
-
- bool isCanceled() {
- return _taskQueue.isCanceled();
- }
-
- void cancel() {
- _taskQueue.cancel();
- }
-
- bool wait(unsigned long timeout) {
- return _waitingQueue.wait(timeout);
- }
-
- };
-
- //! Executor job
- class Worker : public Runnable {
-
- CountedPtr< ExecutorImpl > _impl;
-
- public:
-
- //! Create a Worker that draws upon the given Queue
- Worker(const CountedPtr< ExecutorImpl >& impl)
- : _impl(impl) { }
-
- //! Run until Thread or Queue are canceled
- void run() {
-
- _impl->registerThread();
-
- // Run until the Queue is canceled
- while(!Thread::canceled()) {
-
- // Draw tasks from the queue
- ExecutorTask task( _impl->next() );
- task->run();
-
- }
-
- _impl->unregisterThread();
-
- }
-
- }; /* Worker */
-
-
- //! Helper
- class Shutdown : public Runnable {
-
- CountedPtr< ExecutorImpl > _impl;
-
- public:
-
- Shutdown(const CountedPtr< ExecutorImpl >& impl)
- : _impl(impl) { }
-
- void run() {
- _impl->cancel();
- }
-
- }; /* Shutdown */
-
- }
-
- PoolExecutor::PoolExecutor(size_t n)
- : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) {
-
- size(n);
-
- // Request cancelation when main() exits
- ThreadQueue::instance()->insertShutdownTask(_shutdown);
-
- }
-
- PoolExecutor::~PoolExecutor() {
-
- try {
-
- /**
- * If the shutdown task for this executor has not already been
- * selected to run, then run it locally
- */
- if(ThreadQueue::instance()->removeShutdownTask(_shutdown))
- _shutdown->run();
-
- } catch(...) { }
-
- }
-
- void PoolExecutor::interrupt() {
- _impl->interrupt();
- }
-
- void PoolExecutor::size(size_t n) {
-
- if(n < 1)
- throw InvalidOp_Exception();
-
- for(size_t m = _impl->workers(n); m > 0; --m)
- Thread t(new Worker(_impl));
-
- }
-
- size_t PoolExecutor::size() {
- return _impl->workers();
- }
-
-
- void PoolExecutor::execute(const Task& task) {
-
- // Enqueue the task, the Queue will reject it with a
- // Cancelation_Exception if the Executor has been canceled
- _impl->execute(task);
-
- }
-
- void PoolExecutor::cancel() {
- _impl->cancel();
- }
-
- bool PoolExecutor::isCanceled() {
- return _impl->isCanceled();
- }
-
- void PoolExecutor::wait() {
- _impl->wait(0);
- }
-
- bool PoolExecutor::wait(unsigned long timeout) {
- return _impl->wait(timeout == 0 ? 1 : timeout);
- }
-
-}