diff options
Diffstat (limited to 'dep/src/zthread/ThreadedExecutor.cxx')
-rw-r--r-- | dep/src/zthread/ThreadedExecutor.cxx | 130 |
1 files changed, 65 insertions, 65 deletions
diff --git a/dep/src/zthread/ThreadedExecutor.cxx b/dep/src/zthread/ThreadedExecutor.cxx index 44a213e8daa..e33dda4ab5e 100644 --- a/dep/src/zthread/ThreadedExecutor.cxx +++ b/dep/src/zthread/ThreadedExecutor.cxx @@ -31,11 +31,11 @@ namespace ZThread { namespace { - //! + //! class WaiterQueue { typedef std::deque<ThreadImpl*> ThreadList; - + typedef struct group_t { size_t id; size_t count; @@ -61,14 +61,14 @@ namespace ZThread { void operator()(const Group& grp) { count += grp.count; } operator size_t() { return count; } }; - + FastMutex _lock; GroupList _list; size_t _id; size_t _generation; public: - + WaiterQueue() : _id(0), _generation(0) { // At least one empty-group exists _list.push_back( Group(_id++) ); @@ -96,15 +96,15 @@ namespace ZThread { if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1) return true; - // Update the waiter list for the active group + // Update the waiter list for the active group _list.back().waiters.push_back(self); size_t n = _list.back().id; m.acquire(); - + { - - Guard<Lockable, UnlockedScope> g2(g1); + + Guard<Lockable, UnlockedScope> g2(g1); state = timeout == 0 ? m.wait() : m.wait(timeout); } @@ -135,22 +135,22 @@ namespace ZThread { throw Interrupted_Exception(); default: throw Synchronization_Exception(); - } - + } + return true; } - + /** * Increment the active group count * * @pre at least 1 empty group exists - * @post at least 1 non-empty group exists + * @post at least 1 non-empty group exists */ std::pair<size_t, size_t> increment() { - + Guard<FastMutex> g(_lock); - + // At least one empty-group exists assert(!_list.empty()); @@ -169,7 +169,7 @@ namespace ZThread { // When the active group is being incremented, insert a new active group // to replace it if there were waiting threads - if(i == --_list.end() && !i->waiters.empty()) + if(i == --_list.end() && !i->waiters.empty()) _list.push_back(Group(_id++)); // At least 1 non-empty group exists @@ -178,13 +178,13 @@ namespace ZThread { return std::make_pair(n, _generation); } - + /** * Decrease the count for the group with the given id. * * @param n group id - * + * * @pre At least 1 non-empty group exists * @post At least 1 empty group exists */ @@ -198,7 +198,7 @@ namespace ZThread { // Find the requested group GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n)); if(i == _list.end()) { - + // A group should never have been removed until // the final task in that group completed assert(0); @@ -207,11 +207,11 @@ namespace ZThread { // Decrease the count for tasks in this group, if(--i->count == 0 && i == _list.begin()) { - - do { + + do { // When the first group completes, wake all waiters for every - // group, starting from the first until a group that is not + // group, starting from the first until a group that is not // complete is reached /* @@ -219,29 +219,29 @@ namespace ZThread { if(i == --_list.end() && i->waiters.empty()) break; */ - + if( awaken(*i) ) { - - // If all waiters were awakened, remove the group + + // If all waiters were awakened, remove the group i = _list.erase(i); - + } else { - + { // Otherwise, unlock and yield allowing the waiter // lists to be updated if other threads are busy Guard<FastLock, UnlockedScope> g2(g1); ThreadImpl::yield(); - + } i = _list.begin(); } - - } while(i != _list.end() && i->count == 0); - + + } while(i != _list.end() && i->count == 0); + // Ensure that an active group exists if(_list.empty()) _list.push_back( Group(++_id) ); @@ -250,7 +250,7 @@ namespace ZThread { // At least one group exists assert(!_list.empty()); - + } /** @@ -261,74 +261,74 @@ namespace ZThread { return next ? _generation++ : _generation; } - + private: - + /** * Awaken all the waiters remaining in the given group - * + * * @return * - true if all the waiting threads were successfully awakened. * - false if there were one or more threads that could not be awakened. */ bool awaken(Group& grp) { - // Go through the waiter list in the given group; + // Go through the waiter list in the given group; for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) { - + ThreadImpl* impl = *i; Monitor& m = impl->getMonitor(); - + // Try the monitor lock, if it cant be locked skip to the next waiter if(m.tryAcquire()) { - + // Notify the monitor & remove from the waiter list so time isn't // wasted checking it again. i = grp.waiters.erase(i); - + // Try to wake the waiter, it doesn't matter if this is successful - // or not (only fails when the monitor is already going to stop waiting). + // or not (only fails when the monitor is already going to stop waiting). m.notify(); m.release(); - + } else ++i; - + } - + return grp.waiters.empty(); } }; - //! Synchronization point for the Executor + //! Synchronization point for the Executor class ExecutorImpl { typedef std::deque<ThreadImpl*> ThreadList; bool _canceled; - FastMutex _lock; + FastMutex _lock; //! Worker threads ThreadList _threads; - + WaiterQueue _queue; public: ExecutorImpl() : _canceled(false) {} - WaiterQueue& getWaiterQueue() { + WaiterQueue& getWaiterQueue() { return _queue; } void registerThread(size_t generation) { - + // Interrupt slow starting threads if(getWaiterQueue().generation() != generation) ThreadImpl::current()->interrupt(); - // Enqueue for possible future interrupt() + // Enqueue for possible future interrupt() else { Guard<FastMutex> g(_lock); @@ -339,7 +339,7 @@ namespace ZThread { } void unregisterThread() { - + Guard<FastMutex> g(_lock); std::remove(_threads.begin(), _threads.end(), ThreadImpl::current() ); @@ -369,11 +369,11 @@ namespace ZThread { // Interrupt all the registered threads for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) (*i)->interrupt(); - + // Bump the generation up, ensuring slow starting threads get this interrupt getWaiterQueue().generation( true ); - } + } }; /* ExecutorImpl */ @@ -392,7 +392,7 @@ namespace ZThread { : _impl(impl), _task(task) { std::pair<size_t, size_t> pr( _impl->getWaiterQueue().increment() ); - + _group = pr.first; _generation = pr.second; @@ -405,20 +405,20 @@ namespace ZThread { size_t generation() const { return _generation; } - + void run() { - + // Register this thread once its begun; the generation is used to ensure // threads that are slow starting are properly interrupted _impl->registerThread( generation() ); - + try { - _task->run(); + _task->run(); } catch(...) { /* consume the exceptions the work propogates */ } - + _impl->getWaiterQueue().decrement( group() ); // Unregister this thread @@ -434,30 +434,30 @@ namespace ZThread { ThreadedExecutor::ThreadedExecutor() : _impl(new ExecutorImpl) {} ThreadedExecutor::~ThreadedExecutor() {} - + void ThreadedExecutor::execute(const Task& task) { - + Thread t( new Worker(_impl, task) ); - } + } void ThreadedExecutor::interrupt() { _impl->interrupt(); } void ThreadedExecutor::cancel() { - _impl->cancel(); + _impl->cancel(); } - + bool ThreadedExecutor::isCanceled() { return _impl->isCanceled(); } - + void ThreadedExecutor::wait() { _impl->getWaiterQueue().wait(0); } - bool ThreadedExecutor::wait(unsigned long timeout) { + bool ThreadedExecutor::wait(unsigned long timeout) { return _impl->getWaiterQueue().wait(timeout == 0 ? 1 : timeout); } |