summaryrefslogtreecommitdiff
path: root/libs/audiographer/audiographer/general/threader.h
blob: 0f4aaff973fffe3bc1ada93d9381e71fdbefaf7d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#ifndef AUDIOGRAPHER_THREADER_H
#define AUDIOGRAPHER_THREADER_H

#include <glibmm/threadpool.h>
#include <sigc++/slot.h>
#include <boost/format.hpp>

#include <glib.h>
#include <vector>
#include <algorithm>

#include "audiographer/source.h"
#include "audiographer/sink.h"
#include "audiographer/exception.h"

namespace AudioGrapher
{

/// Class that stores exceptions thrown from different threads
class ThreaderException : public Exception
{
  public:
	template<typename T>
	ThreaderException (T const & thrower, std::exception const & e)
		: Exception (thrower,
			boost::str ( boost::format
			("\n\t- Dynamic type: %1%\n\t- what(): %2%")
			% DebugUtils::demangled_name (e) % e.what() ))
	{ }
};

/// Class for distributing processing across several threads
template <typename T = DefaultSampleType>
class Threader : public Source<T>, public Sink<T>
{
  private:
	typedef std::vector<typename Source<T>::SinkPtr> OutputVec;

  public:
	
	/** Constructor
	  * \n RT safe
	  * \param thread_pool a thread pool from which all tasks are scheduled
	  * \param wait_timeout_milliseconds maximum time allowed for threads to use in processing
	  */
	Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 500)
	  : thread_pool (thread_pool)
	  , readers (0)
	  , wait_timeout (wait_timeout_milliseconds)
	{ }
	
	virtual ~Threader () {}
	
	/// Adds output \n RT safe
	void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
	
	/// Clears outputs \n RT safe
	void clear_outputs () { outputs.clear (); }
	
	/// Removes a specific output \n RT safe
	void remove_output (typename Source<T>::SinkPtr output) {
		typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
		outputs.erase (new_end, outputs.end());
	}
	
	/// Processes context concurrently by scheduling each output separately to the given thread pool
	void process (ProcessContext<T> const & c)
	{
		wait_mutex.lock();
		
		exception.reset();
		
		unsigned int outs = outputs.size();
		g_atomic_int_add (&readers, outs);
		for (unsigned int i = 0; i < outs; ++i) {
			thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
		}
		
		wait();
	}
	
	using Sink<T>::process;
	
  private:

	void wait()
	{
		while (g_atomic_int_get (&readers) != 0) {
			Glib::TimeVal wait_time;
			wait_time.assign_current_time();
			wait_time.add_milliseconds(wait_timeout);
		
			wait_cond.timed_wait(wait_mutex, wait_time);
		}

		wait_mutex.unlock();
		
		if (exception) {
			throw *exception;
		}
	}
	
	void process_output(ProcessContext<T> const & c, unsigned int output)
	{
		try {
			outputs[output]->process (c);
		} catch (std::exception const & e) {
			// Only first exception will be passed on
			exception_mutex.lock();
			if(!exception) { exception.reset (new ThreaderException (*this, e)); }
			exception_mutex.unlock();
		}
		
		if (g_atomic_int_dec_and_test (&readers)) {
			wait_cond.signal();
		}
	}

	OutputVec outputs;

	Glib::ThreadPool & thread_pool;
	Glib::Mutex wait_mutex;
	Glib::Cond  wait_cond;
	gint        readers;
	long        wait_timeout;
	
	Glib::Mutex exception_mutex;
	boost::shared_ptr<ThreaderException> exception;

};

} // namespace

#endif //AUDIOGRAPHER_THREADER_H