diff options
author | Neo2003 <none@none> | 2008-10-02 16:23:55 -0500 |
---|---|---|
committer | Neo2003 <none@none> | 2008-10-02 16:23:55 -0500 |
commit | 9b1c0e006f20091f28f3f468cfcab1feb51286bd (patch) | |
tree | b5d1ba94a656e6679f8737f9ea6bed1239b73b14 /dep/include/zthread/Barrier.h |
[svn] * Proper SVN structureinit
--HG--
branch : trunk
Diffstat (limited to 'dep/include/zthread/Barrier.h')
-rw-r--r-- | dep/include/zthread/Barrier.h | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/dep/include/zthread/Barrier.h b/dep/include/zthread/Barrier.h new file mode 100644 index 00000000000..6aaafa93678 --- /dev/null +++ b/dep/include/zthread/Barrier.h @@ -0,0 +1,328 @@ +/* + * Copyright (c) 2005, Eric Crahen + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished + * to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + * + */ + +#ifndef __ZTBARRIER_H__ +#define __ZTBARRIER_H__ + +#include "zthread/Condition.h" +#include "zthread/Guard.h" +#include "zthread/Waitable.h" +#include "zthread/Runnable.h" + +namespace ZThread { + + /** + * @class Barrier + * @author Eric Crahen <http://www.code-foo.com> + * @date <2003-07-16T09:54:01-0400> + * @version 2.2.1 + * + * A Barrier is a Waitable object that serves as synchronization points for + * a set of threads. A Barrier is constructed for a fixed number (<i>N</i>) of threads. + * Threads attempting to wait() on a Barrier (<i> 1 - N</i>) will block until the <i>N</i>th + * thread arrives. The <i>N</i>th thread will awaken all the the others. + * + * An optional Runnable command may be associated with the Barrier. This will be run() + * when the <i>N</i>th thread arrives and Barrier is not broken. + * + * <b>Error Checking</b> + * + * A Barrier uses an all-or-nothing. All threads involved must successfully + * meet at Barrier. If any one of those threads leaves before all the threads + * have (as the result of an error or exception) then all threads present at + * the Barrier will throw BrokenBarrier_Exception. + * + * A broken Barrier will cause all threads attempting to wait() on it to + * throw a BrokenBarrier_Exception. + * + * A Barrier will remain 'broken', until it is manually reset(). + */ + template <unsigned int Count, class LockType> + class Barrier : public Waitable, private NonCopyable { + + //! Broken flag + bool _broken; + //! Task flag + bool _haveTask; + //! Thread count + unsigned int _count; + //! Wait generation + unsigned int _generation; + //! Serialize access + LockType _lock; + //! Signaled when all thread arrive + Condition _arrived; + //! Command to run when all the thread arrive + Task _task; + + public: + + //! Create a Barrier + Barrier() + : _broken(false), _haveTask(false), _count(Count), _generation(0), _arrived(_lock), _task(0) { } + + /** + * Create a Barrier that executes the given task when all threads arrive + * without error + * + * @param task Task to associate with this Barrier + */ + Barrier(const Task& task) + : _broken(false), _haveTask(true), _count(Count), _generation(0), _arrived(_lock), + _task(task) { } + + //! Destroy this Barrier + virtual ~Barrier() {} + + /** + * Enter barrier and wait for the other threads to arrive. This can block for an indefinite + * amount of time. + * + * @exception BrokenBarrier_Exception thrown when any thread has left a wait on this + * Barrier as a result of an error. + * @exception Interrupted_Exception thrown when the calling thread is interrupted. + * A thread may be interrupted at any time, prematurely ending a wait + * for one thread and breaking the barrier for all threads + * + * @see Waitable::wait() + * + * @post If no exception was thrown, all threads have successfully arrived + * @post If an exception was thrown, the barrier is broken + */ + virtual void wait() { + + Guard<LockType> g(_lock); + + if(_broken) + throw BrokenBarrier_Exception(); + + // Break the barrier if an arriving thread is interrupted + if(Thread::interrupted()) { + + // Release the other waiter, propagate the exception + _arrived.broadcast(); + _broken = true; + + throw Interrupted_Exception(); + + } + + if(--_count == 0) { + + // Wake the other threads if this was the last + // arriving thread + _arrived.broadcast(); + + // Try to run the associated task, if it throws then + // break the barrier and propagate the exception + try { + + if(_task) + _task->run(); + + _generation++; + + } catch(Synchronization_Exception&) { + + _broken = true; + throw; + + } catch(...) { assert(0); } + + } else { + + int myGeneration = _generation; + + try { + + // Wait for the other threads to arrive + _arrived.wait(); + + } catch(Interrupted_Exception&) { + + // Its possible for a thread to be interrupted before the + // last thread arrives. If the interrupted thread hasn't + // resumed, then just propagate the interruption + + if(myGeneration != _generation) + Thread().interrupt(); + + else _broken = true; + + } catch(Synchronization_Exception&) { + + // Break the barrier and propagate the exception + _broken = true; + throw; + + } + + // If the thread woke because it was notified by the thread + // that broke the barrier, throw. + if(_broken) + throw BrokenBarrier_Exception(); + + } + + } + + /** + * Enter barrier and wait for the other threads to arrive. This can block up to the + * amount of time specified with the timeout parameter. The barrier will not break + * if a thread leaves this function due to a timeout. + * + * @param timeout maximum amount of time, in milliseconds, to wait before + * + * @return + * - <em>true</em> if the set of tasks being wait for complete before + * <i>timeout</i> milliseconds elapse. + * - <em>false</em> otherwise. + * + * @exception BrokenBarrier_Exception thrown when any thread has left a wait on this + * Barrier as a result of an error. + * @exception Interrupted_Exception thrown when the calling thread is interrupted. + * A thread may be interrupted at any time, prematurely ending a wait + * for one thread and breaking the barrier for all threads + * + * @see Waitable::wait(unsigned long timeout) + * + * @post If no exception was thrown, all threads have successfully arrived + * @post If an exception was thrown, the barrier is broken + */ + virtual bool wait(unsigned long timeout) { + + Guard<LockType> g(_lock); + + if(_broken) + throw BrokenBarrier_Exception(); + + // Break the barrier if an arriving thread is interrupted + if(Thread::interrupted()) { + + // Release the other waiter, propagate the exception + _arrived.broadcast(); + _broken = true; + + throw Interrupted_Exception(); + + } + + + if(--_count == 0) { + + // Wake the other threads if this was the last + // arriving thread + _arrived.broadcast(); + + // Try to run the associated task, if it throws then + // break the barrier and propagate the exception + try { + + if(_task) + _task->run(); + + _generation++; + + } catch(Synchronization_Exception&) { + + _broken = true; + throw; + + } catch(...) { assert(0); } + + } else { + + int myGeneration = _generation; + + try { + + // Wait for the other threads to arrive + if(!_arrived.wait(timeout)) + _broken = true; + + } catch(Interrupted_Exception&) { + + // Its possible for a thread to be interrupted before the + // last thread arrives. If the interrupted thread hasn't + // resumed, then just propagate the interruption + + if(myGeneration != _generation) + Thread().interrupt(); + + else _broken = true; + + } catch(Synchronization_Exception&) { + + // Break the barrier and propagate the exception + _broken = true; + throw; + + } + + // If the thread woke because it was notified by the thread + // that broke the barrier, throw. + if(_broken) + throw BrokenBarrier_Exception(); + + } + + return true; + + } + + /** + * Break the Barrier ending the wait for any threads that were waiting on + * the barrier. + * + * @post the Barrier is broken, all waiting threads will throw the + * BrokenBarrier_Exception + */ + void shatter() { + + Guard<LockType> g(_lock); + + _broken = true; + _arrived.broadcast(); + + } + + /** + * Reset the Barrier. + * + * @post the Barrier is no longer Broken and can be used again. + */ + void reset() { + + Guard<LockType> g(_lock); + + _broken = false; + _generation++; + _count = Count; + + } + + }; + + +} // namespace ZThread + +#endif // __ZTBARRIER_H__ |