1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
#ifndef AUDIOGRAPHER_THREADER_H
#define AUDIOGRAPHER_THREADER_H
#include <glibmm/threadpool.h>
#include <sigc++/slot.h>
#include <boost/format.hpp>
#include <glib.h>
#include <vector>
#include <algorithm>
#include "source.h"
#include "sink.h"
#include "exception.h"
namespace AudioGrapher
{
/** Exception type that Threader uses to wrap a std::exception caught while
  * one of its outputs was being processed, so that it can be rethrown later
  * from Threader::process().
  */
class ThreaderException : public Exception
{
public:
    /** Builds the exception message from the wrapped exception.
      *
      * @param thrower object forwarded to the Exception base as the thrower
      * @param e       the caught exception; the message contains the result
      *                of name (e) (labelled "Dynamic type") and e.what()
      */
    template<typename T>
    ThreaderException (T const & thrower, std::exception const & e)
        : Exception (thrower,
                     (boost::format ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
                      % name (e)
                      % e.what()).str ())
    { }
};
/** Sink that forwards every process() call to all of its outputs, running
  * each output's process() as a separate job in a Glib::ThreadPool and
  * blocking the caller until all jobs have finished or a timeout expires.
  *
  * NOTE(review): outputs must not be added or removed while process() is
  * running; the output list itself is not protected by any lock.
  */
template <typename T>
class Threader : public Source<T>, public Sink<T>
{
private:
    typedef std::vector<typename Source<T>::SinkPtr> OutputVec;

public:
    /** Constructor.
      *
      * @param thread_pool               pool that the per-output jobs are pushed to;
      *                                  only a reference is stored
      * @param wait_timeout_milliseconds maximum time process() waits for all
      *                                  outputs to finish
      */
    Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 1000)
        : thread_pool (thread_pool)
        , readers (0)
        , wait_timeout (wait_timeout_milliseconds)
    { }

    virtual ~Threader () {}

    /// Appends \a output to the list of outputs
    void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }

    /// Removes all outputs
    void clear_outputs () { outputs.clear (); }

    /// Removes every occurrence of \a output from the list of outputs
    void remove_output (typename Source<T>::SinkPtr output) {
        typename OutputVec::iterator new_end = std::remove (outputs.begin(), outputs.end(), output);
        outputs.erase (new_end, outputs.end());
    }

    /** Processes \a c in every output concurrently and waits for completion.
      *
      * The context has to be const, because this is working concurrently.
      *
      * @throws Exception         ("wait timed out") if not all outputs finished
      *                           within the configured timeout
      * @throws ThreaderException wrapping the first std::exception caught
      *                           while processing an output
      *
      * Anything thrown by Glib::ThreadPool::push() is propagated as is.
      *
      * NOTE(review): after a timeout the unfinished jobs are still queued or
      * running and still count in \a readers; calling process() again before
      * they finish is not handled here.
      */
    void process (ProcessContext<T> const & c)
    {
        bool timed_out = false;

        {
            // Held from before the jobs are queued until the wait starts, so
            // that no job can signal before this thread is actually waiting.
            Glib::Mutex::Lock guard (wait_mutex);

            exception.reset();

            unsigned int const outs = outputs.size();
            if (outs == 0) {
                // Nothing would ever signal the condition; waiting would only
                // block the caller for the whole timeout.
                return;
            }

            g_atomic_int_add (&readers, outs);

            unsigned int pushed = 0;
            try {
                for (; pushed < outs; ++pushed) {
                    thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, pushed));
                }
            } catch (...) {
                // Jobs that were never queued will never decrement the counter
                g_atomic_int_add (&readers, -static_cast<gint> (outs - pushed));
                throw;
            }

            timed_out = wait();
        }

        if (timed_out) { throw Exception (*this, "wait timed out"); }

        if (exception) {
            throw *exception;
        }
    }

    using Sink<T>::process;

private:

    /** Waits until all readers have finished or the timeout has expired.
      * Must be called with \a wait_mutex locked; it is locked again on return.
      *
      * @return true if readers were still active when the timeout expired
      */
    bool wait()
    {
        Glib::TimeVal deadline;
        deadline.assign_current_time();
        deadline.add_milliseconds (wait_timeout);

        // Loop on the actual condition: a wakeup without readers having
        // reached zero must not be reported as a timeout. The deadline is
        // absolute, so the total wait is still bounded by wait_timeout.
        while (g_atomic_int_get (&readers) != 0) {
            if (!wait_cond.timed_wait (wait_mutex, deadline)) {
                break;
            }
        }

        return g_atomic_int_get (&readers) != 0;
    }

    /** Job body run in the thread pool: processes \a c in the output with
      * index \a output and wakes up the waiting thread when it was the last
      * job to finish.
      */
    void process_output (ProcessContext<T> const & c, unsigned int output)
    {
        try {
            outputs[output]->process (c);
        } catch (std::exception const & e) {
            // Only first exception will be passed on
            Glib::Mutex::Lock guard (exception_mutex);
            if (!exception) { exception.reset (new ThreaderException (*this, e)); }
        }

        if (g_atomic_int_dec_and_test (&readers)) {
            // Signal with the mutex held so the wakeup cannot fall between
            // the waiter's check of the counter and its call to timed_wait.
            Glib::Mutex::Lock guard (wait_mutex);
            wait_cond.signal();
        }
    }

    OutputVec outputs;

    Glib::ThreadPool & thread_pool;
    Glib::Mutex wait_mutex;
    Glib::Cond  wait_cond;
    gint        readers;       // number of jobs that have not finished yet
    long        wait_timeout;  // in milliseconds

    Glib::Mutex exception_mutex;
    boost::shared_ptr<ThreaderException> exception;  // first exception caught by a job, if any
};
} // namespace
#endif //AUDIOGRAPHER_THREADER_H
|