aboutsummaryrefslogtreecommitdiff
path: root/dep/src/zthread/PoolExecutor.cxx
diff options
context:
space:
mode:
Diffstat (limited to 'dep/src/zthread/PoolExecutor.cxx')
-rw-r--r--dep/src/zthread/PoolExecutor.cxx204
1 files changed, 102 insertions, 102 deletions
diff --git a/dep/src/zthread/PoolExecutor.cxx b/dep/src/zthread/PoolExecutor.cxx
index cf84e145453..82ace996035 100644
--- a/dep/src/zthread/PoolExecutor.cxx
+++ b/dep/src/zthread/PoolExecutor.cxx
@@ -42,7 +42,7 @@ namespace ZThread {
class WaiterQueue {
typedef std::deque<ThreadImpl*> ThreadList;
-
+
typedef struct group_t {
size_t id;
size_t count;
@@ -68,14 +68,14 @@ namespace ZThread {
void operator()(const Group& grp) { count += grp.count; }
operator size_t() { return count; }
};
-
+
FastMutex _lock;
GroupList _list;
size_t _id;
size_t _generation;
public:
-
+
WaiterQueue() : _id(0), _generation(0) {
// At least one empty-group exists
_list.push_back( Group(_id++) );
@@ -89,7 +89,7 @@ namespace ZThread {
*/
bool wait(unsigned long timeout) {
- ThreadImpl* current = ThreadImpl::current();
+ ThreadImpl* current = ThreadImpl::current();
Monitor& m = current->getMonitor();
Monitor::STATE state;
@@ -103,15 +103,15 @@ namespace ZThread {
if((size_t)std::for_each(_list.begin(), _list.end(), counter()) < 1)
return true;
- // Update the waiter list for the active group
+ // Update the waiter list for the active group
_list.back().waiters.push_back(current);
size_t n = _list.back().id;
m.acquire();
-
+
{
- Guard<FastMutex, UnlockedScope> g2(g1);
+ Guard<FastMutex, UnlockedScope> g2(g1);
state = timeout == 0 ? m.wait() : m.wait(timeout);
}
@@ -142,22 +142,22 @@ namespace ZThread {
throw Interrupted_Exception();
default:
throw Synchronization_Exception();
- }
-
+ }
+
return true;
}
-
+
/**
* Increment the active group count
*
* @pre at least 1 empty group exists
- * @post at least 1 non-empty group exists
+ * @post at least 1 non-empty group exists
*/
std::pair<size_t, size_t> increment() {
-
+
Guard<FastMutex> g(_lock);
-
+
// At least one empty-group exists
assert(!_list.empty());
@@ -176,7 +176,7 @@ namespace ZThread {
// When the active group is being incremented, insert a new active group
// to replace it if there were waiting threads
- if(i == --_list.end() && !i->waiters.empty())
+ if(i == --_list.end() && !i->waiters.empty())
_list.push_back(Group(_id++));
// At least 1 non-empty group exists
@@ -185,27 +185,27 @@ namespace ZThread {
return std::make_pair(n, _generation);
}
-
+
/**
* Decrease the count for the group with the given id.
*
* @param n group id
- *
+ *
* @pre At least 1 non-empty group exists
* @post At least 1 empty group exists
*/
void decrement(size_t n) {
-
+
Guard<FastMutex> g1(_lock);
-
+
// At least 1 non-empty group exists
assert((size_t)std::for_each(_list.begin(), _list.end(), counter()) > 0);
// Find the requested group
GroupList::iterator i = std::find_if(_list.begin(), _list.end(), by_id(n));
if(i == _list.end()) {
-
+
// A group should never have been removed until
// the final task in that group completed
assert(0);
@@ -214,11 +214,11 @@ namespace ZThread {
// Decrease the count for tasks in this group,
if(--i->count == 0 && i == _list.begin()) {
-
- do {
+
+ do {
// When the first group completes, wake all waiters for every
- // group, starting from the first until a group that is not
+ // group, starting from the first until a group that is not
// complete is reached
/*
@@ -226,29 +226,29 @@ namespace ZThread {
if(i == --_list.end() && i->waiters.empty())
break;
*/
-
+
if( awaken(*i) ) {
-
- // If all waiters were awakened, remove the group
+
+ // If all waiters were awakened, remove the group
i = _list.erase(i);
-
+
} else {
-
+
{
// Otherwise, unlock and yield allowing the waiter
// lists to be updated if other threads are busy
Guard<FastLock, UnlockedScope> g2(g1);
ThreadImpl::yield();
-
+
}
i = _list.begin();
}
-
- } while(i != _list.end() && i->count == 0);
-
+
+ } while(i != _list.end() && i->count == 0);
+
// Ensure that an active group exists
if(_list.empty())
_list.push_back( Group(++_id) );
@@ -257,7 +257,7 @@ namespace ZThread {
// At least one group exists
assert(!_list.empty());
-
+
}
/**
@@ -268,40 +268,40 @@ namespace ZThread {
return next ? _generation++ : _generation;
}
-
+
private:
-
+
/**
* Awaken all the waiters remaining in the given group
- *
+ *
* @return
* - true if all the waiting threads were successfully awakened.
* - false if there were one or more threads that could not be awakened.
*/
bool awaken(Group& grp) {
- // Go through the waiter list in the given group;
+ // Go through the waiter list in the given group;
for(ThreadList::iterator i = grp.waiters.begin(); i != grp.waiters.end();) {
-
+
ThreadImpl* impl = *i;
Monitor& m = impl->getMonitor();
-
+
// Try the monitor lock, if it cant be locked skip to the next waiter
if(m.tryAcquire()) {
-
+
// Notify the monitor & remove from the waiter list so time isn't
// wasted checking it again.
i = grp.waiters.erase(i);
-
+
// Try to wake the waiter, it doesn't matter if this is successful
- // or not (only fails when the monitor is already going to stop waiting).
+ // or not (only fails when the monitor is already going to stop waiting).
m.notify();
m.release();
-
+
} else ++i;
-
+
}
-
+
return grp.waiters.empty();
}
@@ -310,13 +310,13 @@ namespace ZThread {
/**
* @class GroupedRunnable
- *
- * Wrap a task with group and generation information.
+ *
+ * Wrap a task with group and generation information.
*
* - 'group' allows tasks to be grouped together so that lists of waiting
* threads can be managed.
*
- * - 'generation' allows tasks to be interrupted
+ * - 'generation' allows tasks to be interrupted
*/
class GroupedRunnable : public Runnable {
@@ -329,10 +329,10 @@ namespace ZThread {
public:
GroupedRunnable(const Task& task, WaiterQueue& queue)
- : _task(task), _queue(queue) {
-
+ : _task(task), _queue(queue) {
+
std::pair<size_t, size_t> pr( _queue.increment() );
-
+
_group = pr.first;
_generation = pr.second;
@@ -368,10 +368,10 @@ namespace ZThread {
*
*/
class ExecutorImpl {
-
+
typedef MonitoredQueue<ExecutorTask, FastMutex> TaskQueue;
typedef std::deque<ThreadImpl*> ThreadList;
-
+
TaskQueue _taskQueue;
WaiterQueue _waitingQueue;
@@ -380,19 +380,19 @@ namespace ZThread {
public:
-
+
ExecutorImpl() : _size(0) {}
void registerThread() {
-
+
Guard<TaskQueue> g(_taskQueue);
ThreadImpl* impl = ThreadImpl::current();
_threads.push_back(impl);
// current cancel if too many threads are being created
- if(_threads.size() > _size)
+ if(_threads.size() > _size)
impl->cancel();
}
@@ -408,14 +408,14 @@ namespace ZThread {
// Wrap the task with a grouped task
GroupedRunnable* runnable = new GroupedRunnable(task, _waitingQueue);
-
+
try {
-
+
_taskQueue.add( ExecutorTask(runnable) );
} catch(...) {
- // Incase the queue is canceled between the time the WaiterQueue is
+ // Incase the queue is canceled between the time the WaiterQueue is
// updated and the task is added to the TaskQueue
_waitingQueue.decrement( runnable->group() );
throw;
@@ -430,41 +430,41 @@ namespace ZThread {
_waitingQueue.generation(true);
Guard<TaskQueue> g(_taskQueue);
-
+
// Interrupt all threads currently running, thier tasks would be
// from an older generation
for(ThreadList::iterator i = _threads.begin(); i != _threads.end(); ++i)
(*i)->interrupt();
-
+
}
//! Adjust the number of desired workers and return the number of Threads needed
size_t workers(size_t n) {
-
+
Guard<TaskQueue> g(_taskQueue);
size_t m = (_size < n) ? (n - _size) : 0;
_size = n;
-
+
return m;
}
-
+
size_t workers() {
-
+
Guard<TaskQueue> g(_taskQueue);
return _size;
-
+
}
-
+
ExecutorTask next() {
-
+
ExecutorTask task;
-
+
// Draw the task from the queue
for(;;) {
- try {
+ try {
task = _taskQueue.next();
break;
@@ -473,13 +473,13 @@ namespace ZThread {
// Ignore interruption here, it can only come from
// another thread interrupt()ing the executor. The
- // thread was interrupted in the hopes it was busy
+ // thread was interrupted in the hopes it was busy
// with a task
}
}
-
+
// Interrupt the thread running the tasks when the generation
// does not match the current generation
if( task->generation() != _waitingQueue.generation() )
@@ -512,31 +512,31 @@ namespace ZThread {
class Worker : public Runnable {
CountedPtr< ExecutorImpl > _impl;
-
+
public:
-
+
//! Create a Worker that draws upon the given Queue
- Worker(const CountedPtr< ExecutorImpl >& impl)
+ Worker(const CountedPtr< ExecutorImpl >& impl)
: _impl(impl) { }
-
+
//! Run until Thread or Queue are canceled
- void run() {
-
+ void run() {
+
_impl->registerThread();
-
+
// Run until the Queue is canceled
while(!Thread::canceled()) {
-
+
// Draw tasks from the queue
ExecutorTask task( _impl->next() );
task->run();
-
- }
-
+
+ }
+
_impl->unregisterThread();
-
+
}
-
+
}; /* Worker */
@@ -547,10 +547,10 @@ namespace ZThread {
public:
- Shutdown(const CountedPtr< ExecutorImpl >& impl)
+ Shutdown(const CountedPtr< ExecutorImpl >& impl)
: _impl(impl) { }
-
- void run() {
+
+ void run() {
_impl->cancel();
}
@@ -560,35 +560,35 @@ namespace ZThread {
PoolExecutor::PoolExecutor(size_t n)
: _impl( new ExecutorImpl() ), _shutdown( new Shutdown(_impl) ) {
-
+
size(n);
-
+
// Request cancelation when main() exits
ThreadQueue::instance()->insertShutdownTask(_shutdown);
}
- PoolExecutor::~PoolExecutor() {
+ PoolExecutor::~PoolExecutor() {
try {
-
+
/**
* If the shutdown task for this executor has not already been
* selected to run, then run it locally
*/
- if(ThreadQueue::instance()->removeShutdownTask(_shutdown))
+ if(ThreadQueue::instance()->removeShutdownTask(_shutdown))
_shutdown->run();
-
+
} catch(...) { }
- }
+ }
void PoolExecutor::interrupt() {
_impl->interrupt();
}
void PoolExecutor::size(size_t n) {
-
+
if(n < 1)
throw InvalidOp_Exception();
@@ -604,26 +604,26 @@ namespace ZThread {
void PoolExecutor::execute(const Task& task) {
- // Enqueue the task, the Queue will reject it with a
+ // Enqueue the task, the Queue will reject it with a
// Cancelation_Exception if the Executor has been canceled
- _impl->execute(task);
+ _impl->execute(task);
}
void PoolExecutor::cancel() {
- _impl->cancel();
+ _impl->cancel();
}
bool PoolExecutor::isCanceled() {
- return _impl->isCanceled();
+ return _impl->isCanceled();
}
-
- void PoolExecutor::wait() {
+
+ void PoolExecutor::wait() {
_impl->wait(0);
}
bool PoolExecutor::wait(unsigned long timeout) {
- return _impl->wait(timeout == 0 ? 1 : timeout);
+ return _impl->wait(timeout == 0 ? 1 : timeout);
}
}