diff options
author | Neo2003 <none@none> | 2008-10-02 16:23:55 -0500 |
---|---|---|
committer | Neo2003 <none@none> | 2008-10-02 16:23:55 -0500 |
commit | 9b1c0e006f20091f28f3f468cfcab1feb51286bd (patch) | |
tree | b5d1ba94a656e6679f8737f9ea6bed1239b73b14 /dep/src/zthread/PoolExecutor.cxx |
[svn] * Proper SVN structureinit
--HG--
branch : trunk
Diffstat (limited to 'dep/src/zthread/PoolExecutor.cxx')
-rw-r--r-- | dep/src/zthread/PoolExecutor.cxx | 629 |
1 files changed, 629 insertions, 0 deletions
diff --git a/dep/src/zthread/PoolExecutor.cxx b/dep/src/zthread/PoolExecutor.cxx new file mode 100644 index 00000000000..cf84e145453 --- /dev/null +++ b/dep/src/zthread/PoolExecutor.cxx @@ -0,0 +1,629 @@ +/* + * Copyright (c) 2005, Eric Crahen + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished + * to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#include "ThreadImpl.h" +#include "zthread/PoolExecutor.h" +#include "zthread/MonitoredQueue.h" +#include "zthread/FastMutex.h" +#include "ThreadImpl.h" +#include "ThreadQueue.h" + +#include <algorithm> +#include <deque> +#include <utility> + +using namespace ZThread; + +namespace ZThread { + + namespace { + + /** + */ + class WaiterQueue { + + typedef std::deque<ThreadImpl*> ThreadList; + + typedef struct group_t { + size_t id; + size_t count; + ThreadList waiters; + group_t(size_t n) : id(n), count(0) {} + } Group; + + typedef std::deque<Group> GroupList; + + //! Predicate to find a specific group + struct by_id : public std::unary_function<bool, Group> { + size_t id; + by_id(size_t n) : id(n) {} + bool operator()(const Group& grp) { + return grp.id == id; + } + }; + + //! Functor to count groups + struct counter : public std::unary_function<void, Group> { + size_t count; + counter() : count(0) {} + void operator()(const Group& grp) { count += grp.count; } + operator size_t() { return count; } + }; + + FastMutex _lock; + GroupList _list; + size_t _id; + size_t _generation; + + public: + + WaiterQueue() : _id(0), _generation(0) { + // At least one empty-group exists + _list.push_back( Group(_id++) ); + } + + /** + * Insert the current thread into the current waiter list + * + * @pre At least one empty group exists + * @post At least one empty group exists + */ + bool wait(unsigned long timeout) { + + ThreadImpl* current = ThreadImpl::current(); + Monitor& m = current->getMonitor(); + + Monitor::STATE state; + + Guard<FastMutex> g1(_lock); + + // At least one empty-group exists + assert(!_list.empty()); + + // Return w/o waiting if there are no executing tasks + if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1) + return true; + + // Update the waiter list for the active group + _list.back().waiters.push_back(current); + size_t n = _list.back().id; + + m.acquire(); + + { + + Guard<FastMutex, UnlockedScope> g2(g1); + state = timeout == 0 ? m.wait() : m.wait(timeout); + + } + + m.release(); + + // If awoke due to a reason other than the last task in the group 'n' completing, + // then then find the group 'current' is waiting in + GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n)); + if(i != _list.end()) { + + // Remove 'current' from that list if it is still a member + ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), current); + if(j != i->waiters.end()) + i->waiters.erase(j); + + } + + // At least one empty-group exists + assert(!_list.empty()); + + switch(state) { + case Monitor::SIGNALED: + break; + case Monitor::TIMEDOUT: + return false; + case Monitor::INTERRUPTED: + throw Interrupted_Exception(); + default: + throw Synchronization_Exception(); + } + + return true; + + } + + /** + * Increment the active group count + * + * @pre at least 1 empty group exists + * @post at least 1 non-empty group exists + */ + std::pair<size_t, size_t> increment() { + + Guard<FastMutex> g(_lock); + + // At least one empty-group exists + assert(!_list.empty()); + + GroupList::iterator i = --_list.end(); + size_t n = i->id; + + if(i == _list.end()) { + + // A group should never have been removed until + // the final task in that group completed + assert(0); + + } + + i->count++; + + // When the active group is being incremented, insert a new active group + // to replace it if there were waiting threads + if(i == --_list.end() && !i->waiters.empty()) + _list.push_back(Group(_id++)); + + // At least 1 non-empty group exists + assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0); + + return std::make_pair(n, _generation); + + } + + + /** + * Decrease the count for the group with the given id. + * + * @param n group id + * + * @pre At least 1 non-empty group exists + * @post At least 1 empty group exists + */ + void decrement(size_t n) { + + Guard<FastMutex> g1(_lock); + + // At least 1 non-empty group exists + assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0); + + // Find the requested group + GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n)); + if(i == _list.end()) { + + // A group should never have been removed until + // the final task in that group completed + assert(0); + + } + + // Decrease the count for tasks in this group, + if(--i->count == 0 && i == _list.begin()) { + + do { + + // When the first group completes, wake all waiters for every + // group, starting from the first until a group that is not + // complete is reached + + /* + // Don't remove the empty active group + if(i == --_list.end() && i->waiters.empty()) + break; + */ + + if( awaken(*i) ) { + + // If all waiters were awakened, remove the group + i = _list.erase(i); + + } else { + + { + + // Otherwise, unlock and yield allowing the waiter + // lists to be updated if other threads are busy + Guard<FastLock, UnlockedScope> g2(g1); + ThreadImpl::yield(); + + } + + i = _list.begin(); + + } + + } while(i != _list.end() && i->count == 0); + + // Ensure that an active group exists + if(_list.empty()) + _list.push_back( Group(++_id) ); + + } + + // At least one group exists + assert(!_list.empty()); + + } + + /** + */ + size_t generation(bool next = false) { + + Guard<FastMutex> g(_lock); + return next ? _generation++ : _generation; + + } + + private: + + /** + * Awaken all the waiters remaining in the given group + * + * @return + * - true if all the waiting threads were successfully awakened. + * - false if there were one or more threads that could not be awakened. + */ + bool awaken(Group& grp) { + + // Go through the waiter list in the given group; + for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) { + + ThreadImpl* impl = *i; + Monitor& m = impl->getMonitor(); + + // Try the monitor lock, if it cant be locked skip to the next waiter + if(m.tryAcquire()) { + + // Notify the monitor & remove from the waiter list so time isn't + // wasted checking it again. + i = grp.waiters.erase(i); + + // Try to wake the waiter, it doesn't matter if this is successful + // or not (only fails when the monitor is already going to stop waiting). + m.notify(); + m.release(); + + } else ++i; + + } + + return grp.waiters.empty(); + + } + + }; + + /** + * @class GroupedRunnable + * + * Wrap a task with group and generation information. + * + * - 'group' allows tasks to be grouped together so that lists of waiting + * threads can be managed. + * + * - 'generation' allows tasks to be interrupted + */ + class GroupedRunnable : public Runnable { + + Task _task; + WaiterQueue& _queue; + + size_t _group; + size_t _generation; + + public: + + GroupedRunnable(const Task& task, WaiterQueue& queue) + : _task(task), _queue(queue) { + + std::pair<size_t, size_t> pr( _queue.increment() ); + + _group = pr.first; + _generation = pr.second; + + } + + size_t group() const { + return _group; + } + + size_t generation() const { + return _generation; + } + + void run() { + + try { + + _task->run(); + + } catch(...) { + + } + + _queue.decrement( group() ); + + } + + }; + + typedef CountedPtr<GroupedRunnable, size_t> ExecutorTask; + + /** + * + */ + class ExecutorImpl { + + typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue; + typedef std::deque<ThreadImpl*> ThreadList; + + TaskQueue _taskQueue; + WaiterQueue _waitingQueue; + + ThreadList _threads; + volatile size_t _size; + + + public: + + ExecutorImpl() : _size(0) {} + + + void registerThread() { + + Guard<TaskQueue> g(_taskQueue); + + ThreadImpl* impl = ThreadImpl::current(); + _threads.push_back(impl); + + // current cancel if too many threads are being created + if(_threads.size() > _size) + impl->cancel(); + + } + + void unregisterThread() { + + Guard<TaskQueue> g(_taskQueue); + std::remove(_threads.begin(), _threads.end(), ThreadImpl::current()); + + } + + void execute(const Task& task) { + + // Wrap the task with a grouped task + GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue); + + try { + + _taskQueue.add( ExecutorTask(runnable) ); + + } catch(...) { + + // Incase the queue is canceled between the time the WaiterQueue is + // updated and the task is added to the TaskQueue + _waitingQueue.decrement( runnable->group() ); + throw; + + } + + } + + void interrupt() { + + // Bump the generation number + _waitingQueue.generation(true); + + Guard<TaskQueue> g(_taskQueue); + + // Interrupt all threads currently running, thier tasks would be + // from an older generation + for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) + (*i)->interrupt(); + + } + + //! Adjust the number of desired workers and return the number of Threads needed + size_t workers(size_t n) { + + Guard<TaskQueue> g(_taskQueue); + + size_t m = (_size < n) ? (n - _size) : 0; + _size = n; + + return m; + + } + + size_t workers() { + + Guard<TaskQueue> g(_taskQueue); + return _size; + + } + + ExecutorTask next() { + + ExecutorTask task; + + // Draw the task from the queue + for(;;) { + + try { + + task = _taskQueue.next(); + break; + + } catch(Interrupted_Exception&) { + + // Ignore interruption here, it can only come from + // another thread interrupt()ing the executor. The + // thread was interrupted in the hopes it was busy + // with a task + + } + + } + + // Interrupt the thread running the tasks when the generation + // does not match the current generation + if( task->generation() != _waitingQueue.generation() ) + ThreadImpl::current()->interrupt(); + + // Otherwise, clear the interrupted status for the thread and + // give it a clean slate to start with + else + ThreadImpl::current()->isInterrupted(); + + return task; + + } + + bool isCanceled() { + return _taskQueue.isCanceled(); + } + + void cancel() { + _taskQueue.cancel(); + } + + bool wait(unsigned long timeout) { + return _waitingQueue.wait(timeout); + } + + }; + + //! Executor job + class Worker : public Runnable { + + CountedPtr< ExecutorImpl > _impl; + + public: + + //! Create a Worker that draws upon the given Queue + Worker(const CountedPtr< ExecutorImpl >& impl) + : _impl(impl) { } + + //! Run until Thread or Queue are canceled + void run() { + + _impl->registerThread(); + + // Run until the Queue is canceled + while(!Thread::canceled()) { + + // Draw tasks from the queue + ExecutorTask task( _impl->next() ); + task->run(); + + } + + _impl->unregisterThread(); + + } + + }; /* Worker */ + + + //! Helper + class Shutdown : public Runnable { + + CountedPtr< ExecutorImpl > _impl; + + public: + + Shutdown(const CountedPtr< ExecutorImpl >& impl) + : _impl(impl) { } + + void run() { + _impl->cancel(); + } + + }; /* Shutdown */ + + } + + PoolExecutor::PoolExecutor(size_t n) + : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) { + + size(n); + + // Request cancelation when main() exits + ThreadQueue::instance()->insertShutdownTask(_shutdown); + + } + + PoolExecutor::~PoolExecutor() { + + try { + + /** + * If the shutdown task for this executor has not already been + * selected to run, then run it locally + */ + if(ThreadQueue::instance()->removeShutdownTask(_shutdown)) + _shutdown->run(); + + } catch(...) { } + + } + + void PoolExecutor::interrupt() { + _impl->interrupt(); + } + + void PoolExecutor::size(size_t n) { + + if(n < 1) + throw InvalidOp_Exception(); + + for(size_t m = _impl->workers(n); m > 0; --m) + Thread t(new Worker(_impl)); + + } + + size_t PoolExecutor::size() { + return _impl->workers(); + } + + + void PoolExecutor::execute(const Task& task) { + + // Enqueue the task, the Queue will reject it with a + // Cancelation_Exception if the Executor has been canceled + _impl->execute(task); + + } + + void PoolExecutor::cancel() { + _impl->cancel(); + } + + bool PoolExecutor::isCanceled() { + return _impl->isCanceled(); + } + + void PoolExecutor::wait() { + _impl->wait(0); + } + + bool PoolExecutor::wait(unsigned long timeout) { + return _impl->wait(timeout == 0 ? 1 : timeout); + } + +} |