summaryrefslogtreecommitdiff
path: root/libs/audiographer/audiographer/threader.h
blob: e6c3aa97bfae48773bb380e854c62fb6a2ce1d0e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#ifndef AUDIOGRAPHER_THREADER_H
#define AUDIOGRAPHER_THREADER_H

#include <glibmm/threadpool.h>
#include <sigc++/slot.h>
#include <boost/format.hpp>

#include <glib.h>
#include <vector>
#include <algorithm>

#include "source.h"
#include "sink.h"
#include "exception.h"

namespace AudioGrapher
{

class ThreaderException : public Exception
{
  public:
	template<typename T>
	ThreaderException (T const & thrower, std::exception const & e)
		: Exception (thrower,
			boost::str ( boost::format
			("\n\t- Dynamic type: %1%\n\t- what(): %2%") % name (e) % e.what() ))
	{ }
};

template <typename T>
class Threader : public Source<T>, public Sink<T>
{
  private:
	typedef std::vector<typename Source<T>::SinkPtr> OutputVec;

  public:
	
	Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 1000)
	  : thread_pool (thread_pool)
	  , readers (0)
	  , wait_timeout (wait_timeout_milliseconds)
	{ }
	
	virtual ~Threader () {}
	
	void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
	void clear_outputs () { outputs.clear (); }
	void remove_output (typename Source<T>::SinkPtr output) {
		typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
		outputs.erase (new_end, outputs.end());
	}
	
	/* The context has to be const, because this is working concurrently */
	void process (ProcessContext<T> const & c)
	{
		wait_mutex.lock();
		
		exception.reset();
		
		unsigned int outs = outputs.size();
		g_atomic_int_add (&readers, outs);
		for (unsigned int i = 0; i < outs; ++i) {
			thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
		}
		
		wait();
	}
	
	using Sink<T>::process;
	
  private:

	void wait()
	{
		Glib::TimeVal wait_time;
		wait_time.assign_current_time();
		wait_time.add_milliseconds(wait_timeout);
		
		wait_cond.timed_wait(wait_mutex, wait_time);
		bool timed_out = (g_atomic_int_get (&readers) != 0);
		wait_mutex.unlock();
		if (timed_out) { throw Exception (*this, "wait timed out"); }
		
		if (exception) {
			throw *exception;
		}
	}
	
	void process_output(ProcessContext<T> const & c, unsigned int output)
	{
		try {
			outputs[output]->process (c);
		} catch (std::exception const & e) {
			// Only first exception will be passed on
			exception_mutex.lock();
			if(!exception) { exception.reset (new ThreaderException (*this, e)); }
			exception_mutex.unlock();
		}
		
		if (g_atomic_int_dec_and_test (&readers)) {
			wait_cond.signal();
		}
	}

	OutputVec outputs;

	Glib::ThreadPool & thread_pool;
	Glib::Mutex wait_mutex;
	Glib::Cond  wait_cond;
	gint        readers;
	long        wait_timeout;
	
	Glib::Mutex exception_mutex;
	boost::shared_ptr<ThreaderException> exception;

};

} // namespace

#endif //AUDIOGRAPHER_THREADER_H