diff options
author | megamage <none@none> | 2009-02-18 11:36:19 -0600 |
---|---|---|
committer | megamage <none@none> | 2009-02-18 11:36:19 -0600 |
commit | 31637d4aac6ae2081ccb3db976cab17936019f1a (patch) | |
tree | a0e098d4afafaddcb901486d00da1563ecd628b9 /dep/src/zthread/PoolExecutor.cxx | |
parent | 017f309b5b5b981b6edfe0ae071971d513828a20 (diff) | |
parent | 7bff1c1d60b78ba78f2ab0a9c67bdde80427ea30 (diff) |
*Merge.
--HG--
branch : trunk
Diffstat (limited to 'dep/src/zthread/PoolExecutor.cxx')
-rw-r--r-- | dep/src/zthread/PoolExecutor.cxx | 204 |
1 files changed, 102 insertions, 102 deletions
diff --git a/dep/src/zthread/PoolExecutor.cxx b/dep/src/zthread/PoolExecutor.cxx index cf84e145453..82ace996035 100644 --- a/dep/src/zthread/PoolExecutor.cxx +++ b/dep/src/zthread/PoolExecutor.cxx @@ -42,7 +42,7 @@ namespace ZThread { class WaiterQueue { typedef std::deque<ThreadImpl*> ThreadList; - + typedef struct group_t { size_t id; size_t count; @@ -68,14 +68,14 @@ namespace ZThread { void operator()(const Group& grp) { count += grp.count; } operator size_t() { return count; } }; - + FastMutex _lock; GroupList _list; size_t _id; size_t _generation; public: - + WaiterQueue() : _id(0), _generation(0) { // At least one empty-group exists _list.push_back( Group(_id++) ); @@ -89,7 +89,7 @@ namespace ZThread { */ bool wait(unsigned long timeout) { - ThreadImpl* current = ThreadImpl::current(); + ThreadImpl* current = ThreadImpl::current(); Monitor& m = current->getMonitor(); Monitor::STATE state; @@ -103,15 +103,15 @@ namespace ZThread { if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1) return true; - // Update the waiter list for the active group + // Update the waiter list for the active group _list.back().waiters.push_back(current); size_t n = _list.back().id; m.acquire(); - + { - Guard<FastMutex, UnlockedScope> g2(g1); + Guard<FastMutex, UnlockedScope> g2(g1); state = timeout == 0 ? m.wait() : m.wait(timeout); } @@ -142,22 +142,22 @@ namespace ZThread { throw Interrupted_Exception(); default: throw Synchronization_Exception(); - } - + } + return true; } - + /** * Increment the active group count * * @pre at least 1 empty group exists - * @post at least 1 non-empty group exists + * @post at least 1 non-empty group exists */ std::pair<size_t, size_t> increment() { - + Guard<FastMutex> g(_lock); - + // At least one empty-group exists assert(!_list.empty()); @@ -176,7 +176,7 @@ namespace ZThread { // When the active group is being incremented, insert a new active group // to replace it if there were waiting threads - if(i == --_list.end() && !i->waiters.empty()) + if(i == --_list.end() && !i->waiters.empty()) _list.push_back(Group(_id++)); // At least 1 non-empty group exists @@ -185,27 +185,27 @@ namespace ZThread { return std::make_pair(n, _generation); } - + /** * Decrease the count for the group with the given id. * * @param n group id - * + * * @pre At least 1 non-empty group exists * @post At least 1 empty group exists */ void decrement(size_t n) { - + Guard<FastMutex> g1(_lock); - + // At least 1 non-empty group exists assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0); // Find the requested group GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n)); if(i == _list.end()) { - + // A group should never have been removed until // the final task in that group completed assert(0); @@ -214,11 +214,11 @@ namespace ZThread { // Decrease the count for tasks in this group, if(--i->count == 0 && i == _list.begin()) { - - do { + + do { // When the first group completes, wake all waiters for every - // group, starting from the first until a group that is not + // group, starting from the first until a group that is not // complete is reached /* @@ -226,29 +226,29 @@ namespace ZThread { if(i == --_list.end() && i->waiters.empty()) break; */ - + if( awaken(*i) ) { - - // If all waiters were awakened, remove the group + + // If all waiters were awakened, remove the group i = _list.erase(i); - + } else { - + { // Otherwise, unlock and yield allowing the waiter // lists to be updated if other threads are busy Guard<FastLock, UnlockedScope> g2(g1); ThreadImpl::yield(); - + } i = _list.begin(); } - - } while(i != _list.end() && i->count == 0); - + + } while(i != _list.end() && i->count == 0); + // Ensure that an active group exists if(_list.empty()) _list.push_back( Group(++_id) ); @@ -257,7 +257,7 @@ namespace ZThread { // At least one group exists assert(!_list.empty()); - + } /** @@ -268,40 +268,40 @@ namespace ZThread { return next ? _generation++ : _generation; } - + private: - + /** * Awaken all the waiters remaining in the given group - * + * * @return * - true if all the waiting threads were successfully awakened. * - false if there were one or more threads that could not be awakened. */ bool awaken(Group& grp) { - // Go through the waiter list in the given group; + // Go through the waiter list in the given group; for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) { - + ThreadImpl* impl = *i; Monitor& m = impl->getMonitor(); - + // Try the monitor lock, if it cant be locked skip to the next waiter if(m.tryAcquire()) { - + // Notify the monitor & remove from the waiter list so time isn't // wasted checking it again. i = grp.waiters.erase(i); - + // Try to wake the waiter, it doesn't matter if this is successful - // or not (only fails when the monitor is already going to stop waiting). + // or not (only fails when the monitor is already going to stop waiting). m.notify(); m.release(); - + } else ++i; - + } - + return grp.waiters.empty(); } @@ -310,13 +310,13 @@ namespace ZThread { /** * @class GroupedRunnable - * - * Wrap a task with group and generation information. + * + * Wrap a task with group and generation information. * * - 'group' allows tasks to be grouped together so that lists of waiting * threads can be managed. * - * - 'generation' allows tasks to be interrupted + * - 'generation' allows tasks to be interrupted */ class GroupedRunnable : public Runnable { @@ -329,10 +329,10 @@ namespace ZThread { public: GroupedRunnable(const Task& task, WaiterQueue& queue) - : _task(task), _queue(queue) { - + : _task(task), _queue(queue) { + std::pair<size_t, size_t> pr( _queue.increment() ); - + _group = pr.first; _generation = pr.second; @@ -368,10 +368,10 @@ namespace ZThread { * */ class ExecutorImpl { - + typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue; typedef std::deque<ThreadImpl*> ThreadList; - + TaskQueue _taskQueue; WaiterQueue _waitingQueue; @@ -380,19 +380,19 @@ namespace ZThread { public: - + ExecutorImpl() : _size(0) {} void registerThread() { - + Guard<TaskQueue> g(_taskQueue); ThreadImpl* impl = ThreadImpl::current(); _threads.push_back(impl); // current cancel if too many threads are being created - if(_threads.size() > _size) + if(_threads.size() > _size) impl->cancel(); } @@ -408,14 +408,14 @@ namespace ZThread { // Wrap the task with a grouped task GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue); - + try { - + _taskQueue.add( ExecutorTask(runnable) ); } catch(...) { - // Incase the queue is canceled between the time the WaiterQueue is + // Incase the queue is canceled between the time the WaiterQueue is // updated and the task is added to the TaskQueue _waitingQueue.decrement( runnable->group() ); throw; @@ -430,41 +430,41 @@ namespace ZThread { _waitingQueue.generation(true); Guard<TaskQueue> g(_taskQueue); - + // Interrupt all threads currently running, thier tasks would be // from an older generation for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) (*i)->interrupt(); - + } //! Adjust the number of desired workers and return the number of Threads needed size_t workers(size_t n) { - + Guard<TaskQueue> g(_taskQueue); size_t m = (_size < n) ? (n - _size) : 0; _size = n; - + return m; } - + size_t workers() { - + Guard<TaskQueue> g(_taskQueue); return _size; - + } - + ExecutorTask next() { - + ExecutorTask task; - + // Draw the task from the queue for(;;) { - try { + try { task = _taskQueue.next(); break; @@ -473,13 +473,13 @@ namespace ZThread { // Ignore interruption here, it can only come from // another thread interrupt()ing the executor. The - // thread was interrupted in the hopes it was busy + // thread was interrupted in the hopes it was busy // with a task } } - + // Interrupt the thread running the tasks when the generation // does not match the current generation if( task->generation() != _waitingQueue.generation() ) @@ -512,31 +512,31 @@ namespace ZThread { class Worker : public Runnable { CountedPtr< ExecutorImpl > _impl; - + public: - + //! Create a Worker that draws upon the given Queue - Worker(const CountedPtr< ExecutorImpl >& impl) + Worker(const CountedPtr< ExecutorImpl >& impl) : _impl(impl) { } - + //! Run until Thread or Queue are canceled - void run() { - + void run() { + _impl->registerThread(); - + // Run until the Queue is canceled while(!Thread::canceled()) { - + // Draw tasks from the queue ExecutorTask task( _impl->next() ); task->run(); - - } - + + } + _impl->unregisterThread(); - + } - + }; /* Worker */ @@ -547,10 +547,10 @@ namespace ZThread { public: - Shutdown(const CountedPtr< ExecutorImpl >& impl) + Shutdown(const CountedPtr< ExecutorImpl >& impl) : _impl(impl) { } - - void run() { + + void run() { _impl->cancel(); } @@ -560,35 +560,35 @@ namespace ZThread { PoolExecutor::PoolExecutor(size_t n) : _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) { - + size(n); - + // Request cancelation when main() exits ThreadQueue::instance()->insertShutdownTask(_shutdown); } - PoolExecutor::~PoolExecutor() { + PoolExecutor::~PoolExecutor() { try { - + /** * If the shutdown task for this executor has not already been * selected to run, then run it locally */ - if(ThreadQueue::instance()->removeShutdownTask(_shutdown)) + if(ThreadQueue::instance()->removeShutdownTask(_shutdown)) _shutdown->run(); - + } catch(...) { } - } + } void PoolExecutor::interrupt() { _impl->interrupt(); } void PoolExecutor::size(size_t n) { - + if(n < 1) throw InvalidOp_Exception(); @@ -604,26 +604,26 @@ namespace ZThread { void PoolExecutor::execute(const Task& task) { - // Enqueue the task, the Queue will reject it with a + // Enqueue the task, the Queue will reject it with a // Cancelation_Exception if the Executor has been canceled - _impl->execute(task); + _impl->execute(task); } void PoolExecutor::cancel() { - _impl->cancel(); + _impl->cancel(); } bool PoolExecutor::isCanceled() { - return _impl->isCanceled(); + return _impl->isCanceled(); } - - void PoolExecutor::wait() { + + void PoolExecutor::wait() { _impl->wait(0); } bool PoolExecutor::wait(unsigned long timeout) { - return _impl->wait(timeout == 0 ? 1 : timeout); + return _impl->wait(timeout == 0 ? 1 : timeout); } } |