summaryrefslogtreecommitdiff
path: root/libs/pbd
diff options
context:
space:
mode:
authorPaul Davis <paul@linuxaudiosystems.com>2015-12-28 10:14:17 -0500
committerPaul Davis <paul@linuxaudiosystems.com>2015-12-28 10:14:17 -0500
commit0d9efc11484c901795ff4e9549a1a39715d0474d (patch)
tree956ab3cd570670bcb1ff68856553f5aec4a8e470 /libs/pbd
parentdb4834027858b10f313c822c7fb3fad1617f11aa (diff)
redesign cross-thread registration/signalling system
This new design will work even when threads that need to receive messages from RT threads are created *after* the RT threads. The existing design would fail because the RT thread(s) would never be known the later created threads, and so signals emitted by the RT thread and causing call_slot() in the receiver would end up being enqueued using a lock-protected list. The new design ensures that communication always uses a lock-free FIFO instead
Diffstat (limited to 'libs/pbd')
-rw-r--r--libs/pbd/event_loop.cc106
-rw-r--r--libs/pbd/pbd/abstract_ui.cc79
-rw-r--r--libs/pbd/pbd/abstract_ui.h12
-rw-r--r--libs/pbd/pbd/event_loop.h33
-rw-r--r--libs/pbd/pbd/pthread_utils.h4
-rw-r--r--libs/pbd/pthread_utils.cc15
6 files changed, 202 insertions, 47 deletions
diff --git a/libs/pbd/event_loop.cc b/libs/pbd/event_loop.cc
index 95b43d9038..671e26bedc 100644
--- a/libs/pbd/event_loop.cc
+++ b/libs/pbd/event_loop.cc
@@ -17,9 +17,13 @@
*/
+#include "pbd/compose.h"
#include "pbd/event_loop.h"
+#include "pbd/error.h"
#include "pbd/stacktrace.h"
+#include "i18n.h"
+
using namespace PBD;
using namespace std;
@@ -27,13 +31,18 @@ static void do_not_delete_the_loop_pointer (void*) { }
Glib::Threads::Private<EventLoop> EventLoop::thread_event_loop (do_not_delete_the_loop_pointer);
+Glib::Threads::RWLock EventLoop::thread_buffer_requests_lock;
+EventLoop::ThreadRequestBufferList EventLoop::thread_buffer_requests;
+EventLoop::RequestBufferSuppliers EventLoop::request_buffer_suppliers;
+
EventLoop::EventLoop (string const& name)
: _name (name)
{
}
EventLoop*
-EventLoop::get_event_loop_for_thread() {
+EventLoop::get_event_loop_for_thread()
+{
return thread_event_loop.get ();
}
@@ -84,3 +93,98 @@ EventLoop::invalidate_request (void* data)
return 0;
}
+vector<EventLoop::ThreadBufferMapping>
+EventLoop::get_request_buffers_for_target_thread (const std::string& target_thread)
+{
+ vector<ThreadBufferMapping> ret;
+ Glib::Threads::RWLock::WriterLock lm (thread_buffer_requests_lock);
+
+ for (ThreadRequestBufferList::const_iterator x = thread_buffer_requests.begin();
+ x != thread_buffer_requests.end(); ++x) {
+
+ if (x->second.target_thread_name == target_thread) {
+ ret.push_back (x->second);
+ }
+ }
+
+ return ret;
+}
+
+void
+EventLoop::register_request_buffer_factory (const string& target_thread_name,
+ void* (*factory)(uint32_t))
+{
+
+ RequestBufferSupplier trs;
+ trs.name = target_thread_name;
+ trs.factory = factory;
+
+ {
+ Glib::Threads::RWLock::WriterLock lm (thread_buffer_requests_lock);
+ request_buffer_suppliers.push_back (trs);
+ }
+}
+
+void
+EventLoop::pre_register (const string& emitting_thread_name, uint32_t num_requests)
+{
+ /* Threads that need to emit signals "towards" other threads, but with
+ RT safe behavior may be created before the receiving threads
+ exist. This makes it impossible for them to use the
+ ThreadCreatedWithRequestSize signal to notify receiving threads of
+ their existence.
+
+ This function creates a request buffer for them to use with
+ the (not yet) created threads, and stores it where the receiving
+ thread can find it later.
+ */
+
+ ThreadBufferMapping mapping;
+ Glib::Threads::RWLock::ReaderLock lm (thread_buffer_requests_lock);
+
+ for (RequestBufferSuppliers::iterator trs = request_buffer_suppliers.begin(); trs != request_buffer_suppliers.end(); ++trs) {
+
+ if (!trs->factory) {
+ /* no factory - no request buffer required or expected */
+ continue;
+ }
+
+ if (emitting_thread_name == trs->name) {
+ /* no need to register an emitter with itself */
+ continue;
+ }
+
+ mapping.emitting_thread = pthread_self();
+ mapping.target_thread_name = trs->name;
+
+ /* Allocate a suitably sized request buffer. This will set the
+ * thread-local variable that holds a pointer to this request
+ * buffer.
+ */
+ mapping.request_buffer = trs->factory (num_requests);
+
+ /* now store it where the receiving thread (trs->name) can find
+ it if and when it is created. (Discovery happens in the
+ AbstractUI constructor. Note that if
+ */
+
+ /* make a key composed of the emitter and receiver thread names */
+
+ string key = emitting_thread_name;
+ key += '/';
+ key += mapping.target_thread_name;
+
+ /* if the emitting thread was killed and recreated (with the
+ * same name), this will replace the entry in
+ * thread_buffer_requests. The old entry will be lazily deleted
+ * when the target thread finds the request buffer and realizes
+ * that it is dead.
+ *
+ * If the request buffer is replaced before the target thread
+ * ever finds the dead version, we will leak the old request
+ * buffer.
+ */
+
+ thread_buffer_requests[key] = mapping;
+ }
+}
diff --git a/libs/pbd/pbd/abstract_ui.cc b/libs/pbd/pbd/abstract_ui.cc
index d3d3c2e8b1..1514455b42 100644
--- a/libs/pbd/pbd/abstract_ui.cc
+++ b/libs/pbd/pbd/abstract_ui.cc
@@ -14,7 +14,6 @@
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
-
*/
#include <unistd.h>
@@ -65,17 +64,28 @@ template <typename RequestObject>
AbstractUI<RequestObject>::AbstractUI (const string& name)
: BaseUI (name)
{
- void (AbstractUI<RequestObject>::*pmf)(string,pthread_t,string,uint32_t) = &AbstractUI<RequestObject>::register_thread;
+ void (AbstractUI<RequestObject>::*pmf)(pthread_t,string,uint32_t) = &AbstractUI<RequestObject>::register_thread;
/* better to make this connect a handler that runs in the UI event loop but the syntax seems hard, and
register_thread() is thread safe anyway.
*/
- PBD::ThreadCreatedWithRequestSize.connect_same_thread (new_thread_connection, boost::bind (pmf, this, _1, _2, _3, _4));
+ PBD::ThreadCreatedWithRequestSize.connect_same_thread (new_thread_connection, boost::bind (pmf, this, _1, _2, _3));
+
+ /* find pre-registerer threads */
+
+ vector<EventLoop::ThreadBufferMapping> tbm = EventLoop::get_request_buffers_for_target_thread (event_loop_name());
+
+ {
+ Glib::Threads::Mutex::Lock lm (request_buffer_map_lock);
+ for (vector<EventLoop::ThreadBufferMapping>::iterator t = tbm.begin(); t != tbm.end(); ++t) {
+ request_buffers[t->emitting_thread] = static_cast<RequestBuffer*> (t->request_buffer);
+ }
+ }
}
template <typename RequestObject> void
-AbstractUI<RequestObject>::register_thread (string target_gui, pthread_t thread_id, string thread_name, uint32_t num_requests)
+AbstractUI<RequestObject>::register_thread (pthread_t thread_id, string thread_name, uint32_t num_requests)
{
/* the calling thread wants to register with the thread that runs this
* UI's event loop, so that it will have its own per-thread queue of
@@ -83,36 +93,39 @@ AbstractUI<RequestObject>::register_thread (string target_gui, pthread_t thread_
* do so in a realtime-safe manner (no locks).
*/
- DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("in %1 (thread name %4), %2 (%5) wants to register with %3\n", event_loop_name(), thread_name, target_gui, pthread_name(), DEBUG_THREAD_SELF));
-
- if (target_gui != event_loop_name()) {
- /* this UI is not the UI that the calling thread is trying to
- register with
- */
- DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("%1 : not the registration target\n", event_loop_name()));
- return;
- }
+ DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("in %1 (thread name %4), %2 (%5) wants to register with UIs\n", event_loop_name(), thread_name, pthread_name(), DEBUG_THREAD_SELF));
/* the per_thread_request_buffer is a thread-private variable.
See pthreads documentation for more on these, but the key
thing is that it is a variable that as unique value for
- each thread, guaranteed.
+ each thread, guaranteed. Note that the thread in question
+ is the caller of this function, which is assumed to be the
+ thread from which signals will be emitted that this UI's
+ event loop will catch.
*/
RequestBuffer* b = per_thread_request_buffer.get();
- if (b) {
- /* thread already registered with this UI
- */
- DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("%1 : %2 is already registered\n", event_loop_name(), thread_name));
- return;
- }
+ if (!b) {
- /* create a new request queue/ringbuffer */
+ /* create a new request queue/ringbuffer */
- DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("create new request buffer for %1 in %2\n", thread_name, event_loop_name()));
+ DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("create new request buffer for %1 in %2\n", thread_name, event_loop_name()));
- b = new RequestBuffer (num_requests, *this);
+ b = new RequestBuffer (num_requests);
+ /* set this thread's per_thread_request_buffer to this new
+ queue/ringbuffer. remember that only this thread will
+ get this queue when it calls per_thread_request_buffer.get()
+
+ the second argument is a function that will be called
+ when the thread exits, and ensures that the buffer is marked
+ dead. it will then be deleted during a call to handle_ui_requests()
+ */
+
+ per_thread_request_buffer.set (b);
+ } else {
+ DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("%1 : %2 is already registered\n", event_loop_name(), thread_name));
+ }
{
/* add the new request queue (ringbuffer) to our map
@@ -125,16 +138,6 @@ AbstractUI<RequestObject>::register_thread (string target_gui, pthread_t thread_
request_buffers[thread_id] = b;
}
- /* set this thread's per_thread_request_buffer to this new
- queue/ringbuffer. remember that only this thread will
- get this queue when it calls per_thread_request_buffer.get()
-
- the second argument is a function that will be called
- when the thread exits, and ensures that the buffer is marked
- dead. it will then be deleted during a call to handle_ui_requests()
- */
-
- per_thread_request_buffer.set (b);
}
template <typename RequestObject> RequestObject*
@@ -229,6 +232,7 @@ AbstractUI<RequestObject>::handle_ui_requests ()
if ((*i).second->dead) {
DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("%1/%2 deleting dead per-thread request buffer for %3 @ %4\n",
event_loop_name(), pthread_name(), i->second));
+ cerr << event_loop_name() << " noticed that a buffer was dead\n";
delete (*i).second;
RequestBufferMapIterator tmp = i;
++tmp;
@@ -357,6 +361,7 @@ AbstractUI<RequestObject>::send_request (RequestObject *req)
single-reader/single-writer semantics
*/
DEBUG_TRACE (PBD::DEBUG::AbstractUI, string_compose ("%1/%2 send heap request type %3\n", event_loop_name(), pthread_name(), req->type));
+ cerr << "Send request to " << event_loop_name() << " via LIST from " << pthread_name() << endl;
Glib::Threads::Mutex::Lock lm (request_list_lock);
request_list.push_back (req);
}
@@ -407,3 +412,11 @@ AbstractUI<RequestObject>::call_slot (InvalidationRecord* invalidation, const bo
send_request (req);
}
+
+template<typename RequestObject> void*
+AbstractUI<RequestObject>::request_buffer_factory (uint32_t num_requests)
+{
+ RequestBuffer* mcr = new RequestBuffer (num_requests);
+ per_thread_request_buffer.set (mcr);
+ return mcr;
+}
diff --git a/libs/pbd/pbd/abstract_ui.h b/libs/pbd/pbd/abstract_ui.h
index 5491210db7..78a337fc40 100644
--- a/libs/pbd/pbd/abstract_ui.h
+++ b/libs/pbd/pbd/abstract_ui.h
@@ -58,20 +58,20 @@ class ABSTRACT_UI_API AbstractUI : public BaseUI
AbstractUI (const std::string& name);
virtual ~AbstractUI() {}
- void register_thread (std::string, pthread_t, std::string, uint32_t num_requests);
+ void register_thread (pthread_t, std::string, uint32_t num_requests);
void call_slot (EventLoop::InvalidationRecord*, const boost::function<void()>&);
Glib::Threads::Mutex& slot_invalidation_mutex() { return request_buffer_map_lock; }
Glib::Threads::Mutex request_buffer_map_lock;
+ static void* request_buffer_factory (uint32_t num_requests);
+
protected:
struct RequestBuffer : public PBD::RingBufferNPT<RequestObject> {
bool dead;
- AbstractUI<RequestObject>& ui;
- RequestBuffer (uint32_t size, AbstractUI<RequestObject>& uir)
+ RequestBuffer (uint32_t size)
: PBD::RingBufferNPT<RequestObject> (size)
- , dead (false)
- , ui (uir) {}
+ , dead (false) {}
};
typedef typename RequestBuffer::rw_vector RequestBufferVector;
@@ -105,5 +105,3 @@ class ABSTRACT_UI_API AbstractUI : public BaseUI
};
#endif /* __pbd_abstract_ui_h__ */
-
-
diff --git a/libs/pbd/pbd/event_loop.h b/libs/pbd/pbd/event_loop.h
index 3ea6388f3f..90d72ef47c 100644
--- a/libs/pbd/pbd/event_loop.h
+++ b/libs/pbd/pbd/event_loop.h
@@ -21,6 +21,8 @@
#define __pbd_event_loop_h__
#include <string>
+#include <vector>
+#include <map>
#include <boost/function.hpp>
#include <boost/bind.hpp> /* we don't need this here, but anything calling call_slot() probably will, so this is convenient */
#include <glibmm/threads.h>
@@ -79,9 +81,40 @@ class LIBPBD_API EventLoop
static EventLoop* get_event_loop_for_thread();
static void set_event_loop_for_thread (EventLoop* ui);
+ struct ThreadBufferMapping {
+ pthread_t emitting_thread;
+ std::string target_thread_name;
+ void* request_buffer;
+ };
+
+ static std::vector<ThreadBufferMapping> get_request_buffers_for_target_thread (const std::string&);
+
+ static void register_request_buffer_factory (const std::string& target_thread_name, void* (*factory) (uint32_t));
+ static void pre_register (const std::string& emitting_thread_name, uint32_t num_requests);
+
private:
static Glib::Threads::Private<EventLoop> thread_event_loop;
std::string _name;
+
+ typedef std::map<std::string,ThreadBufferMapping> ThreadRequestBufferList;
+ static ThreadRequestBufferList thread_buffer_requests;
+ static Glib::Threads::RWLock thread_buffer_requests_lock;
+
+ struct RequestBufferSupplier {
+
+ /* @param name : name of object/entity that will/may accept
+ requests from other threads, via a request buffer.
+ */
+ std::string name;
+
+ /* @param factory : a function that can be called (with an
+ argument specifying the @param number_of_requests) to create and
+ return a request buffer for communicating with @param name)
+ */
+ void* (*factory)(uint32_t nunber_of_requests);
+ };
+ typedef std::vector<RequestBufferSupplier> RequestBufferSuppliers;
+ static RequestBufferSuppliers request_buffer_suppliers;
};
}
diff --git a/libs/pbd/pbd/pthread_utils.h b/libs/pbd/pbd/pthread_utils.h
index a5173ad24d..8f70fdac5b 100644
--- a/libs/pbd/pbd/pthread_utils.h
+++ b/libs/pbd/pbd/pthread_utils.h
@@ -55,8 +55,8 @@ LIBPBD_API const char* pthread_name ();
LIBPBD_API void pthread_set_name (const char* name);
namespace PBD {
- LIBPBD_API extern void notify_gui_about_thread_creation (std::string, pthread_t, std::string, int requests = 256);
- LIBPBD_API extern PBD::Signal4<void,std::string,pthread_t,std::string,uint32_t> ThreadCreatedWithRequestSize;
+ LIBPBD_API extern void notify_event_loops_about_thread_creation (pthread_t, const std::string&, int requests = 256);
+ LIBPBD_API extern PBD::Signal3<void,pthread_t,std::string,uint32_t> ThreadCreatedWithRequestSize;
}
#endif /* __pbd_pthread_utils__ */
diff --git a/libs/pbd/pthread_utils.cc b/libs/pbd/pthread_utils.cc
index 1abe6a95fb..7cd25e42b8 100644
--- a/libs/pbd/pthread_utils.cc
+++ b/libs/pbd/pthread_utils.cc
@@ -44,7 +44,7 @@ static pthread_mutex_t thread_map_lock = PTHREAD_MUTEX_INITIALIZER;
static Glib::Threads::Private<char> thread_name (free);
namespace PBD {
- PBD::Signal4<void,std::string, pthread_t,std::string,uint32_t> ThreadCreatedWithRequestSize;
+ PBD::Signal3<void,pthread_t,std::string,uint32_t> ThreadCreatedWithRequestSize;
}
using namespace PBD;
@@ -58,10 +58,18 @@ static int thread_creator (pthread_t* thread_id, const pthread_attr_t* attr, voi
#endif
}
+
void
-PBD::notify_gui_about_thread_creation (std::string target_gui, pthread_t thread, std::string str, int request_count)
+PBD::notify_event_loops_about_thread_creation (pthread_t thread, const std::string& emitting_thread_name, int request_count)
{
- ThreadCreatedWithRequestSize (target_gui, thread, str, request_count);
+ /* notify threads that may exist in the future (they may also exist
+ * already, in which case they will catch the
+ * ThreadCreatedWithRequestSize signal)
+ */
+ EventLoop::pre_register (emitting_thread_name, request_count);
+
+ /* notify all existing threads */
+ ThreadCreatedWithRequestSize (thread, emitting_thread_name, request_count);
}
struct ThreadStartWithName {
@@ -199,4 +207,3 @@ pthread_cancel_one (pthread_t thread)
pthread_cancel (thread);
pthread_mutex_unlock (&thread_map_lock);
}
-