summaryrefslogtreecommitdiff
path: root/libs/audiographer/audiographer/general/threader.h
diff options
context:
space:
mode:
Diffstat (limited to 'libs/audiographer/audiographer/general/threader.h')
-rw-r--r--libs/audiographer/audiographer/general/threader.h30
1 files changed, 15 insertions, 15 deletions
diff --git a/libs/audiographer/audiographer/general/threader.h b/libs/audiographer/audiographer/general/threader.h
index e9a953ce44..2ef4099efe 100644
--- a/libs/audiographer/audiographer/general/threader.h
+++ b/libs/audiographer/audiographer/general/threader.h
@@ -39,7 +39,7 @@ class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
public:
-
+
/** Constructor
* \n RT safe
* \param thread_pool a thread pool from which all tasks are scheduled
@@ -50,39 +50,39 @@ class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
, readers (0)
, wait_timeout (wait_timeout_milliseconds)
{ }
-
+
virtual ~Threader () {}
-
+
/// Adds output \n RT safe
void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
-
+
/// Clears outputs \n RT safe
void clear_outputs () { outputs.clear (); }
-
+
/// Removes a specific output \n RT safe
void remove_output (typename Source<T>::SinkPtr output) {
typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
outputs.erase (new_end, outputs.end());
}
-
+
/// Processes context concurrently by scheduling each output separately to the given thread pool
void process (ProcessContext<T> const & c)
{
wait_mutex.lock();
-
+
exception.reset();
-
+
unsigned int outs = outputs.size();
g_atomic_int_add (&readers, outs);
for (unsigned int i = 0; i < outs; ++i) {
thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
}
-
+
wait();
}
-
+
using Sink<T>::process;
-
+
private:
void wait()
@@ -93,12 +93,12 @@ class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
}
wait_mutex.unlock();
-
+
if (exception) {
throw *exception;
}
}
-
+
void process_output(ProcessContext<T> const & c, unsigned int output)
{
try {
@@ -109,7 +109,7 @@ class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
if(!exception) { exception.reset (new ThreaderException (*this, e)); }
exception_mutex.unlock();
}
-
+
if (g_atomic_int_dec_and_test (&readers)) {
wait_cond.signal();
}
@@ -122,7 +122,7 @@ class /*LIBAUDIOGRAPHER_API*/ Threader : public Source<T>, public Sink<T>
Glib::Threads::Cond wait_cond;
gint readers;
long wait_timeout;
-
+
Glib::Threads::Mutex exception_mutex;
boost::shared_ptr<ThreaderException> exception;