summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobin Gareus <robin@gareus.org>2015-03-01 19:59:50 +0100
committerRobin Gareus <robin@gareus.org>2015-03-01 20:49:55 +0100
commitd7727a77e0b19ecb2d1957d1e13d554dbed58243 (patch)
treeb4bf35e94e2670c8e62a290cb5e364fa0b5b7a05
parent60388f975c7731fbfa69327c2374cf83f39d85ce (diff)
Xthread: blocking read + non-blocking write mode.
Needed for switching the butler to use Crossthreads.
-rw-r--r--libs/pbd/crossthread.posix.cc33
-rw-r--r--libs/pbd/crossthread.win.cc29
-rw-r--r--libs/pbd/pbd/crossthread.h19
3 files changed, 72 insertions, 9 deletions
diff --git a/libs/pbd/crossthread.posix.cc b/libs/pbd/crossthread.posix.cc
index 44205e680d..6dbca9dcec 100644
--- a/libs/pbd/crossthread.posix.cc
+++ b/libs/pbd/crossthread.posix.cc
@@ -1,3 +1,5 @@
+#include <poll.h>
+
CrossThreadChannel::CrossThreadChannel (bool non_blocking)
: receive_channel (0)
{
@@ -61,8 +63,37 @@ CrossThreadChannel::deliver (char msg)
return ::write (fds[1], &msg, 1);
}
+bool
+CrossThreadChannel::poll_for_request()
+{
+ struct pollfd pfd[1];
+ pfd[0].fd = fds[0];
+ pfd[0].events = POLLIN|POLLERR|POLLHUP;
+ while(true) {
+ if (poll (pfd, 1, -1) < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ break;
+ }
+ if (pfd[0].revents & ~POLLIN) {
+ break;
+ }
+
+ if (pfd[0].revents & POLLIN) {
+ return true;
+ }
+ }
+ return false;
+}
+
int
-CrossThreadChannel::receive (char& msg)
+CrossThreadChannel::receive (char& msg, bool wait)
{
+ if (wait) {
+ if (!poll_for_request ()) {
+ return -1;
+ }
+ }
return ::read (fds[0], &msg, 1);
}
diff --git a/libs/pbd/crossthread.win.cc b/libs/pbd/crossthread.win.cc
index 5309916ddb..73ff74ddcf 100644
--- a/libs/pbd/crossthread.win.cc
+++ b/libs/pbd/crossthread.win.cc
@@ -152,11 +152,38 @@ CrossThreadChannel::deliver (char msg)
return status;
}
+bool
+CrossThreadChannel::poll_for_request()
+{
+ // windows before Vista has no poll
+ while(true) {
+ fd_set rfds;
+ FD_ZERO(&rfds);
+ FD_SET(receive_socket, &rfds);
+ if ((select(receive_socket+1, &rfds, NULL, NULL, NULL)) < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ break;
+ }
+ if(FD_ISSET(receive_socket, &rfds)) {
+ return true;
+ }
+ }
+ return false;
+}
+
int
-CrossThreadChannel::receive (char& msg)
+CrossThreadChannel::receive (char& msg, bool wait)
{
gsize read = 0;
GError *g_error = 0;
+
+ if (wait) {
+ if (!poll_for_request ()) {
+ return -1;
+ }
+ }
// fetch the message from the channel.
GIOStatus g_status = g_io_channel_read_chars (receive_channel, &msg, sizeof(msg), &read, &g_error);
diff --git a/libs/pbd/pbd/crossthread.h b/libs/pbd/pbd/crossthread.h
index 6167c664db..d48f1bfe68 100644
--- a/libs/pbd/pbd/crossthread.h
+++ b/libs/pbd/pbd/crossthread.h
@@ -43,7 +43,7 @@
*/
class LIBPBD_API CrossThreadChannel {
- public:
+public:
/** if @a non_blocking is true, the channel will not cause blocking
* when used in an event loop based on poll/select or the glib main
* loop.
@@ -62,13 +62,16 @@ class LIBPBD_API CrossThreadChannel {
* because there is no way to know which byte value will be used
* for ::wakeup()
*/
- int deliver (char msg);
+ int deliver (char msg);
/** if using ::deliver() to wakeup the listening thread, then
* the listener should call ::receive() to fetch the message
* type from the channel.
+ *
+ * wait = true only make sense for non_blocking channels,
+ * it polls for data to become available.
*/
- int receive (char& msg);
+ int receive (char& msg, bool wait = false);
/** empty the channel of all requests.
* Typically this is done as soon as input
@@ -79,15 +82,17 @@ class LIBPBD_API CrossThreadChannel {
*/
void drain ();
- void set_receive_handler (sigc::slot<bool,Glib::IOCondition> s);
- void attach (Glib::RefPtr<Glib::MainContext>);
+ void set_receive_handler (sigc::slot<bool,Glib::IOCondition> s);
+ void attach (Glib::RefPtr<Glib::MainContext>);
private:
friend gboolean cross_thread_channel_call_receive_slot (GIOChannel*, GIOCondition condition, void *data);
GIOChannel* receive_channel;
- GSource* receive_source;
- sigc::slot<bool,Glib::IOCondition> receive_slot;
+ GSource* receive_source;
+ sigc::slot<bool,Glib::IOCondition> receive_slot;
+
+ bool poll_for_request();
#ifndef PLATFORM_WINDOWS
int fds[2]; // current implementation uses a pipe/fifo