diff options
Diffstat (limited to 'dep/src/zthread/ThreadedExecutor.cxx')
-rw-r--r-- | dep/src/zthread/ThreadedExecutor.cxx | 464 |
1 files changed, 0 insertions, 464 deletions
diff --git a/dep/src/zthread/ThreadedExecutor.cxx b/dep/src/zthread/ThreadedExecutor.cxx deleted file mode 100644 index e33dda4ab5e..00000000000 --- a/dep/src/zthread/ThreadedExecutor.cxx +++ /dev/null @@ -1,464 +0,0 @@ -/* - * Copyright (c) 2005, Eric Crahen - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is furnished - * to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, - * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN - * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. - * - */ - -#include "zthread/ThreadedExecutor.h" -#include "zthread/Guard.h" -#include "zthread/FastMutex.h" -#include "zthread/Time.h" - -#include "ThreadImpl.h" - -namespace ZThread { - - namespace { - - //! - class WaiterQueue { - - typedef std::deque<ThreadImpl*> ThreadList; - - typedef struct group_t { - size_t id; - size_t count; - ThreadList waiters; - group_t(size_t n) : id(n), count(0) {} - } Group; - - typedef std::deque<Group> GroupList; - - //! Predicate to find a specific group - struct by_id : public std::unary_function<bool, Group> { - size_t id; - by_id(size_t n) : id(n) {} - bool operator()(const Group& grp) { - return grp.id == id; - } - }; - - //! Functor to count groups - struct counter : public std::unary_function<void, Group> { - size_t count; - counter() : count(0) {} - void operator()(const Group& grp) { count += grp.count; } - operator size_t() { return count; } - }; - - FastMutex _lock; - GroupList _list; - size_t _id; - size_t _generation; - - public: - - WaiterQueue() : _id(0), _generation(0) { - // At least one empty-group exists - _list.push_back( Group(_id++) ); - } - - /** - * Insert the current thread into the current waiter list - * - * @pre At least one empty group exists - * @post At least one empty group exists - */ - bool wait(unsigned long timeout) { - - ThreadImpl* self = ThreadImpl::current(); - Monitor& m = self->getMonitor(); - - Monitor::STATE state; - - Guard<Lockable> g1(_lock); - - // At least one empty-group exists - assert(!_list.empty()); - - // Return w/o waiting if there are no executing tasks - if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1) - return true; - - // Update the waiter list for the active group - _list.back().waiters.push_back(self); - size_t n = _list.back().id; - - m.acquire(); - - { - - Guard<Lockable, UnlockedScope> g2(g1); - state = timeout == 0 ? m.wait() : m.wait(timeout); - - } - - m.release(); - - // If awoke due to a reason other than the last task in the group 'n' completing, - // then then find the group 'self' is waiting in - GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n)); - if(i != _list.end()) { - - // Remove 'self' from that list if it is still a member - ThreadList::iterator j = std::find(i->waiters.begin(), i->waiters.end(), self); - if(j != i->waiters.end()) - i->waiters.erase(j); - - } - - // At least one empty-group exists - assert(!_list.empty()); - - switch(state) { - case Monitor::SIGNALED: - break; - case Monitor::TIMEDOUT: - return false; - case Monitor::INTERRUPTED: - throw Interrupted_Exception(); - default: - throw Synchronization_Exception(); - } - - return true; - - } - - /** - * Increment the active group count - * - * @pre at least 1 empty group exists - * @post at least 1 non-empty group exists - */ - std::pair<size_t, size_t> increment() { - - Guard<FastMutex> g(_lock); - - // At least one empty-group exists - assert(!_list.empty()); - - GroupList::iterator i = --_list.end(); - size_t n = i->id; - - if(i == _list.end()) { - - // A group should never have been removed until - // the final task in that group completed - assert(0); - - } - - i->count++; - - // When the active group is being incremented, insert a new active group - // to replace it if there were waiting threads - if(i == --_list.end() && !i->waiters.empty()) - _list.push_back(Group(_id++)); - - // At least 1 non-empty group exists - assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0); - - return std::make_pair(n, _generation); - - } - - - /** - * Decrease the count for the group with the given id. - * - * @param n group id - * - * @pre At least 1 non-empty group exists - * @post At least 1 empty group exists - */ - void decrement(size_t n) { - - Guard<FastMutex> g1(_lock); - - // At least 1 non-empty group exists - assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0); - - // Find the requested group - GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n)); - if(i == _list.end()) { - - // A group should never have been removed until - // the final task in that group completed - assert(0); - - } - - // Decrease the count for tasks in this group, - if(--i->count == 0 && i == _list.begin()) { - - do { - - // When the first group completes, wake all waiters for every - // group, starting from the first until a group that is not - // complete is reached - - /* - // Don't remove the empty active group - if(i == --_list.end() && i->waiters.empty()) - break; - */ - - if( awaken(*i) ) { - - // If all waiters were awakened, remove the group - i = _list.erase(i); - - } else { - - { - - // Otherwise, unlock and yield allowing the waiter - // lists to be updated if other threads are busy - Guard<FastLock, UnlockedScope> g2(g1); - ThreadImpl::yield(); - - } - - i = _list.begin(); - - } - - } while(i != _list.end() && i->count == 0); - - // Ensure that an active group exists - if(_list.empty()) - _list.push_back( Group(++_id) ); - - } - - // At least one group exists - assert(!_list.empty()); - - } - - /** - */ - size_t generation(bool next = false) { - - Guard<FastMutex> g(_lock); - return next ? _generation++ : _generation; - - } - - private: - - /** - * Awaken all the waiters remaining in the given group - * - * @return - * - true if all the waiting threads were successfully awakened. - * - false if there were one or more threads that could not be awakened. - */ - bool awaken(Group& grp) { - - // Go through the waiter list in the given group; - for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) { - - ThreadImpl* impl = *i; - Monitor& m = impl->getMonitor(); - - // Try the monitor lock, if it cant be locked skip to the next waiter - if(m.tryAcquire()) { - - // Notify the monitor & remove from the waiter list so time isn't - // wasted checking it again. - i = grp.waiters.erase(i); - - // Try to wake the waiter, it doesn't matter if this is successful - // or not (only fails when the monitor is already going to stop waiting). - m.notify(); - m.release(); - - } else ++i; - - } - - return grp.waiters.empty(); - - } - - }; - - //! Synchronization point for the Executor - class ExecutorImpl { - - typedef std::deque<ThreadImpl*> ThreadList; - - bool _canceled; - FastMutex _lock; - - //! Worker threads - ThreadList _threads; - - WaiterQueue _queue; - - public: - - ExecutorImpl() : _canceled(false) {} - - WaiterQueue& getWaiterQueue() { - return _queue; - } - - void registerThread(size_t generation) { - - // Interrupt slow starting threads - if(getWaiterQueue().generation() != generation) - ThreadImpl::current()->interrupt(); - - // Enqueue for possible future interrupt() - else { - - Guard<FastMutex> g(_lock); - _threads.push_back( ThreadImpl::current() ); - - } - - } - - void unregisterThread() { - - Guard<FastMutex> g(_lock); - std::remove(_threads.begin(), _threads.end(), ThreadImpl::current() ); - - } - - void cancel() { - - Guard<FastMutex> g(_lock); - _canceled = true; - - } - - bool isCanceled() { - - if(_canceled) - return true; - - Guard<FastMutex> g(_lock); - return _canceled; - - } - - void interrupt() { - - Guard<FastMutex> g(_lock); - - // Interrupt all the registered threads - for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i) - (*i)->interrupt(); - - // Bump the generation up, ensuring slow starting threads get this interrupt - getWaiterQueue().generation( true ); - - } - - }; /* ExecutorImpl */ - - //! Wrap a generation and a group around a task - class Worker : public Runnable { - - CountedPtr< ExecutorImpl > _impl; - Task _task; - - size_t _generation; - size_t _group; - - public: - - Worker(const CountedPtr< ExecutorImpl >& impl, const Task& task) - : _impl(impl), _task(task) { - - std::pair<size_t, size_t> pr( _impl->getWaiterQueue().increment() ); - - _group = pr.first; - _generation = pr.second; - - } - - size_t group() const { - return _group; - } - - size_t generation() const { - return _generation; - } - - void run() { - - // Register this thread once its begun; the generation is used to ensure - // threads that are slow starting are properly interrupted - - _impl->registerThread( generation() ); - - try { - _task->run(); - } catch(...) { - /* consume the exceptions the work propogates */ - } - - _impl->getWaiterQueue().decrement( group() ); - - // Unregister this thread - - _impl->unregisterThread(); - - } - - }; /* Worker */ - - } - - ThreadedExecutor::ThreadedExecutor() : _impl(new ExecutorImpl) {} - - ThreadedExecutor::~ThreadedExecutor() {} - - void ThreadedExecutor::execute(const Task& task) { - - Thread t( new Worker(_impl, task) ); - - } - - void ThreadedExecutor::interrupt() { - _impl->interrupt(); - } - - void ThreadedExecutor::cancel() { - _impl->cancel(); - } - - bool ThreadedExecutor::isCanceled() { - return _impl->isCanceled(); - } - - void ThreadedExecutor::wait() { - _impl->getWaiterQueue().wait(0); - } - - bool ThreadedExecutor::wait(unsigned long timeout) { - return _impl->getWaiterQueue().wait(timeout == 0 ? 1 : timeout); - } - -} |