aboutsummaryrefslogtreecommitdiff
path: root/dep/src/zthread/ThreadedExecutor.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'dep/src/zthread/ThreadedExecutor.cxx')
-rw-r--r--dep/src/zthread/ThreadedExecutor.cxx130
1 files changed, 65 insertions, 65 deletions
diff --git a/dep/src/zthread/ThreadedExecutor.cxx b/dep/src/zthread/ThreadedExecutor.cxx
index 44a213e8daa..e33dda4ab5e 100644
--- a/dep/src/zthread/ThreadedExecutor.cxx
+++ b/dep/src/zthread/ThreadedExecutor.cxx
@@ -31,11 +31,11 @@ namespace ZThread {
namespace {
- //!
+ //!
class WaiterQueue {
typedef std::deque<ThreadImpl*> ThreadList;
-
+
typedef struct group_t {
size_t id;
size_t count;
@@ -61,14 +61,14 @@ namespace ZThread {
void operator()(const Group& grp) { count += grp.count; }
operator size_t() { return count; }
};
-
+
FastMutex _lock;
GroupList _list;
size_t _id;
size_t _generation;
public:
-
+
WaiterQueue() : _id(0), _generation(0) {
// At least one empty-group exists
_list.push_back( Group(_id++) );
@@ -96,15 +96,15 @@ namespace ZThread {
if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
return true;
- // Update the waiter list for the active group
+ // Update the waiter list for the active group
_list.back().waiters.push_back(self);
size_t n = _list.back().id;
m.acquire();
-
+
{
-
- Guard<Lockable, UnlockedScope> g2(g1);
+
+ Guard<Lockable, UnlockedScope> g2(g1);
state = timeout == 0 ? m.wait() : m.wait(timeout);
}
@@ -135,22 +135,22 @@ namespace ZThread {
throw Interrupted_Exception();
default:
throw Synchronization_Exception();
- }
-
+ }
+
return true;
}
-
+
/**
* Increment the active group count
*
* @pre at least 1 empty group exists
- * @post at least 1 non-empty group exists
+ * @post at least 1 non-empty group exists
*/
std::pair<size_t, size_t> increment() {
-
+
Guard<FastMutex> g(_lock);
-
+
// At least one empty-group exists
assert(!_list.empty());
@@ -169,7 +169,7 @@ namespace ZThread {
// When the active group is being incremented, insert a new active group
// to replace it if there were waiting threads
- if(i == --_list.end() && !i->waiters.empty())
+ if(i == --_list.end() && !i->waiters.empty())
_list.push_back(Group(_id++));
// At least 1 non-empty group exists
@@ -178,13 +178,13 @@ namespace ZThread {
return std::make_pair(n, _generation);
}
-
+
/**
* Decrease the count for the group with the given id.
*
* @param n group id
- *
+ *
* @pre At least 1 non-empty group exists
* @post At least 1 empty group exists
*/
@@ -198,7 +198,7 @@ namespace ZThread {
// Find the requested group
GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
if(i == _list.end()) {
-
+
// A group should never have been removed until
// the final task in that group completed
assert(0);
@@ -207,11 +207,11 @@ namespace ZThread {
// Decrease the count for tasks in this group,
if(--i->count == 0 && i == _list.begin()) {
-
- do {
+
+ do {
// When the first group completes, wake all waiters for every
- // group, starting from the first until a group that is not
+ // group, starting from the first until a group that is not
// complete is reached
/*
@@ -219,29 +219,29 @@ namespace ZThread {
if(i == --_list.end() && i->waiters.empty())
break;
*/
-
+
if( awaken(*i) ) {
-
- // If all waiters were awakened, remove the group
+
+ // If all waiters were awakened, remove the group
i = _list.erase(i);
-
+
} else {
-
+
{
// Otherwise, unlock and yield allowing the waiter
// lists to be updated if other threads are busy
Guard<FastLock, UnlockedScope> g2(g1);
ThreadImpl::yield();
-
+
}
i = _list.begin();
}
-
- } while(i != _list.end() && i->count == 0);
-
+
+ } while(i != _list.end() && i->count == 0);
+
// Ensure that an active group exists
if(_list.empty())
_list.push_back( Group(++_id) );
@@ -250,7 +250,7 @@ namespace ZThread {
// At least one group exists
assert(!_list.empty());
-
+
}
/**
@@ -261,74 +261,74 @@ namespace ZThread {
return next ? _generation++ : _generation;
}
-
+
private:
-
+
/**
* Awaken all the waiters remaining in the given group
- *
+ *
* @return
* - true if all the waiting threads were successfully awakened.
* - false if there were one or more threads that could not be awakened.
*/
bool awaken(Group& grp) {
- // Go through the waiter list in the given group;
+ // Go through the waiter list in the given group;
for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
-
+
ThreadImpl* impl = *i;
Monitor& m = impl->getMonitor();
-
+
// Try the monitor lock, if it cant be locked skip to the next waiter
if(m.tryAcquire()) {
-
+
// Notify the monitor & remove from the waiter list so time isn't
// wasted checking it again.
i = grp.waiters.erase(i);
-
+
// Try to wake the waiter, it doesn't matter if this is successful
- // or not (only fails when the monitor is already going to stop waiting).
+ // or not (only fails when the monitor is already going to stop waiting).
m.notify();
m.release();
-
+
} else ++i;
-
+
}
-
+
return grp.waiters.empty();
}
};
- //! Synchronization point for the Executor
+ //! Synchronization point for the Executor
class ExecutorImpl {
typedef std::deque<ThreadImpl*> ThreadList;
bool _canceled;
- FastMutex _lock;
+ FastMutex _lock;
//! Worker threads
ThreadList _threads;
-
+
WaiterQueue _queue;
public:
ExecutorImpl() : _canceled(false) {}
- WaiterQueue& getWaiterQueue() {
+ WaiterQueue& getWaiterQueue() {
return _queue;
}
void registerThread(size_t generation) {
-
+
// Interrupt slow starting threads
if(getWaiterQueue().generation() != generation)
ThreadImpl::current()->interrupt();
- // Enqueue for possible future interrupt()
+ // Enqueue for possible future interrupt()
else {
Guard<FastMutex> g(_lock);
@@ -339,7 +339,7 @@ namespace ZThread {
}
void unregisterThread() {
-
+
Guard<FastMutex> g(_lock);
std::remove(_threads.begin(), _threads.end(), ThreadImpl::current() );
@@ -369,11 +369,11 @@ namespace ZThread {
// Interrupt all the registered threads
for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
(*i)->interrupt();
-
+
// Bump the generation up, ensuring slow starting threads get this interrupt
getWaiterQueue().generation( true );
- }
+ }
}; /* ExecutorImpl */
@@ -392,7 +392,7 @@ namespace ZThread {
: _impl(impl), _task(task) {
std::pair<size_t, size_t> pr( _impl->getWaiterQueue().increment() );
-
+
_group = pr.first;
_generation = pr.second;
@@ -405,20 +405,20 @@ namespace ZThread {
size_t generation() const {
return _generation;
}
-
+
void run() {
-
+
// Register this thread once its begun; the generation is used to ensure
// threads that are slow starting are properly interrupted
_impl->registerThread( generation() );
-
+
try {
- _task->run();
+ _task->run();
} catch(...) {
/* consume the exceptions the work propogates */
}
-
+
_impl->getWaiterQueue().decrement( group() );
// Unregister this thread
@@ -434,30 +434,30 @@ namespace ZThread {
ThreadedExecutor::ThreadedExecutor() : _impl(new ExecutorImpl) {}
ThreadedExecutor::~ThreadedExecutor() {}
-
+
void ThreadedExecutor::execute(const Task& task) {
-
+
Thread t( new Worker(_impl, task) );
- }
+ }
void ThreadedExecutor::interrupt() {
_impl->interrupt();
}
void ThreadedExecutor::cancel() {
- _impl->cancel();
+ _impl->cancel();
}
-
+
bool ThreadedExecutor::isCanceled() {
return _impl->isCanceled();
}
-
+
void ThreadedExecutor::wait() {
_impl->getWaiterQueue().wait(0);
}
- bool ThreadedExecutor::wait(unsigned long timeout) {
+ bool ThreadedExecutor::wait(unsigned long timeout) {
return _impl->getWaiterQueue().wait(timeout == 0 ? 1 : timeout);
}