From 1e5a9b2969616b26f7b0e5808e3281c611146fdc Mon Sep 17 00:00:00 2001 From: "Neal H. Walfield" Date: Mon, 29 Aug 2005 08:29:35 +0000 Subject: pflocal/ 2005-05-17 Neal H. Walfield * connq.h (struct connq_request): Remove forward. (connq_listen): Wait for a request to be queued not until there is a connection attempt. Remove REQ parameter. Update callers. (connq_request_complete): Remove declaration. (connq_connect): Wait for a slot to queue a request not until there is an acceptor. Remove SOCK parameter. Update callers. (connq_connect_complete): New declaration. (connq_connect_cancel): New declaration. * connq.c (struct connq): Remove fields noqueue, queue, length, head and tail. Add fields head, tail, count, max, connectors and num_connectors. That is, replace the circular buffer with a singly linked list. (qnext): Remove function. (struct connq_request): Remove field signal, lock, completed and err. Add field next. (connq_request_init): Rewrite according to new semantics. (connq_request_enqueue): New function. (connq_request_dequeue): New function. (connq_create): Update according to new semantics. (connq_destroy): Likewise. (connq_listen): Rewrite to not block until there is a connector but until there is a request in the queue. (connq_request_complete): Remove function. (connq_connect): Rewrite to not block until there is an acceptor but until there is space for a request. (connq_connect_complete): New function. (connq_connect_cancel): New function. (connq_compress): Remove dead code. (connq_set_length): Rewrite. * socket.c (S_socket_connect): Create the server socket here... (S_socket_accept): ... not here. --- pflocal/ChangeLog | 36 ++++++ pflocal/connq.c | 349 +++++++++++++++++++++++++++++------------------------- pflocal/connq.h | 39 +++--- pflocal/io.c | 10 +- pflocal/socket.c | 74 ++++++------ 5 files changed, 287 insertions(+), 221 deletions(-) (limited to 'pflocal') diff --git a/pflocal/ChangeLog b/pflocal/ChangeLog index f12686c0..444bab96 100644 --- a/pflocal/ChangeLog +++ b/pflocal/ChangeLog @@ -1,3 +1,39 @@ +2005-05-17 Neal H. Walfield + + * connq.h (struct connq_request): Remove forward. + (connq_listen): Wait for a request to be queued not until there is + a connection attempt. Remove REQ parameter. Update callers. + (connq_request_complete): Remove declaration. + (connq_connect): Wait for a slot to queue a request not until + there is an acceptor. Remove SOCK parameter. Update callers. + (connq_connect_complete): New declaration. + (connq_connect_cancel): New declaration. + + * connq.c (struct connq): Remove fields noqueue, queue, length, + head and tail. Add fields head, tail, count, max, connectors and + num_connectors. That is, replace the circular buffer with a + singly linked list. + (qnext): Remove function. + (struct connq_request): Remove field signal, lock, completed and + err. Add field next. + (connq_request_init): Rewrite according to new semantics. + (connq_request_enqueue): New function. + (connq_request_dequeue): New function. + (connq_create): Update according to new semantics. + (connq_destroy): Likewise. + (connq_listen): Rewrite to not block until there is a connector + but until there is a request in the queue. + (connq_request_complete): Remove function. + (connq_connect): Rewrite to not block until there is an acceptor + but until there is space for a request. + (connq_connect_complete): New function. + (connq_connect_cancel): New function. + (connq_compress): Remove dead code. + (connq_set_length): Rewrite. + + * socket.c (S_socket_connect): Create the server socket here... + (S_socket_accept): ... not here. + 2005-05-17 Neal H. Walfield * sock.c (sock_free): Don't destroy SOCK->CONNECT_QUEUE. diff --git a/pflocal/connq.c b/pflocal/connq.c index d4103840..a4524804 100644 --- a/pflocal/connq.c +++ b/pflocal/connq.c @@ -1,6 +1,6 @@ /* Listen queue functions - Copyright (C) 1995,96,2001 Free Software Foundation, Inc. + Copyright (C) 1995,96,2001, 2005 Free Software Foundation, Inc. Written by Miles Bader @@ -26,31 +26,22 @@ /* A queue for queueing incoming connections. */ struct connq { - /* True if all connection requests should be treated as non-blocking. */ - int noqueue; - /* The connection request queue. */ - struct connq_request **queue; - unsigned length; - /* Head is the position in QUEUE of the first request, and TAIL is the - first free position in the queue. If HEAD == TAIL, then the queue is - empty. Starting at HEAD, successive positions can be calculated by - using qnext(). */ - unsigned head, tail; + struct connq_request *head; + struct connq_request **tail; + unsigned count; + unsigned max; /* Threads that have done an accept on this queue wait on this condition. */ struct condition listeners; unsigned num_listeners; + /* Threads that have done a connect on this queue wait on this condition. */ + struct condition connectors; + unsigned num_connectors; + struct mutex lock; }; - -/* Returns the position CQ's queue after POS. */ -static inline unsigned -qnext (struct connq *cq, unsigned pos) -{ - return (pos + 1 == cq->length) ? 0 : pos + 1; -} /* ---------------------------------------------------------------- */ @@ -58,30 +49,50 @@ qnext (struct connq *cq, unsigned pos) get information from and to the thread. */ struct connq_request { + struct connq_request *next; + /* The socket that's waiting to connect. */ struct sock *sock; - - /* What the waiting thread blocks on. */ - struct condition signal; - struct mutex lock; - - /* Set to true when this request has been dealt with, to guard against - spurious conditions being signaled. */ - int completed; - - /* After the waiting thread is unblocked, this is the result, either 0 if - SOCK has been connected, or an error. */ - error_t err; }; static inline void connq_request_init (struct connq_request *req, struct sock *sock) { - req->err = 0; req->sock = sock; - req->completed = 0; - condition_init (&req->signal); - mutex_init (&req->lock); +} + +/* Enqueue connection request REQ onto CQ. CQ must be locked. */ +static void +connq_request_enqueue (struct connq *cq, struct connq_request *req) +{ + assert (! mutex_try_lock (&cq->lock)); + + req->next = NULL; + *cq->tail = req; + cq->tail = &req->next; + + cq->count ++; +} + +/* Dequeue a pending request from CQ. CQ must be locked and must not + be empty. */ +static struct connq_request * +connq_request_dequeue (struct connq *cq) +{ + struct connq_request *req; + + assert (! mutex_try_lock (&cq->lock)); + assert (cq->head); + + req = cq->head; + cq->head = req->next; + if (! cq->head) + /* We just dequeued the last element. Fixup the tail pointer. */ + cq->tail = &cq->head; + + cq->count --; + + return req; } /* ---------------------------------------------------------------- */ @@ -95,16 +106,20 @@ connq_create (struct connq **cq) struct connq *new = malloc (sizeof (struct connq)); if (!new) - return ENOMEM; + return ENOBUFS; + + new->head = NULL; + new->tail = &new->head; + new->count = 0; + /* By default, don't queue requests. */ + new->max = 0; - new->noqueue = 1; /* By default, don't queue requests. */ - new->length = 0; - new->head = new->tail = 0; - new->queue = NULL; new->num_listeners = 0; + new->num_connectors = 0; mutex_init (&new->lock); condition_init (&new->listeners); + condition_init (&new->connectors); *cq = new; return 0; @@ -116,175 +131,189 @@ connq_destroy (struct connq *cq) { /* Everybody in the queue should hold a reference to the socket containing the queue. */ - assert (cq->length == 0); - /* Nevertheless, malloc(0) or realloc(0) might allocate some small - space. */ - if (cq->queue) - free (cq->queue); + assert (! cq->head); + assert (cq->count == 0); + free (cq); } /* ---------------------------------------------------------------- */ -/* Wait for a connection attempt to be made on CQ, and return the connecting - socket in SOCK, and a request tag in REQ. If REQ is NULL, the request is - left in the queue, otherwise connq_request_complete must be called on REQ - to allow the requesting thread to continue. If NOBLOCK is true, - EWOULDBLOCK is returned when there are no immediate connections - available. */ +/* Return a connection request on CQ. If SOCK is NULL, the request is + left in the queue. If NOBLOCK is true, EWOULDBLOCK is returned + when there are no immediate connections available. */ error_t -connq_listen (struct connq *cq, int noblock, - struct connq_request **req, struct sock **sock) +connq_listen (struct connq *cq, int noblock, struct sock **sock) { + error_t err = 0; + mutex_lock (&cq->lock); - if (noblock && cq->head == cq->tail) + if (noblock && cq->count == 0 && cq->num_connectors == 0) { mutex_unlock (&cq->lock); return EWOULDBLOCK; } - cq->num_listeners++; + if (! sock && (cq->count > 0 || cq->num_connectors > 0)) + /* The caller just wants to know if a connection ready. */ + { + mutex_unlock (&cq->lock); + return 0; + } - while (cq->head == cq->tail) - if (hurd_condition_wait (&cq->listeners, &cq->lock)) - { - cq->num_listeners--; - mutex_unlock (&cq->lock); - return EINTR; - } + cq->num_listeners++; - if (req != NULL) - /* Dequeue the next request, if desired. */ + if (cq->count == 0) + /* The request queue is empty. */ { - *req = cq->queue[cq->head]; - cq->head = qnext (cq, cq->head); - if (sock != NULL) - *sock = (*req)->sock; + assert (! cq->head); + + if (cq->num_connectors > 0) + /* Someone is waiting for an acceptor. Signal that we can + service their request. */ + condition_signal (&cq->connectors); + + do + if (hurd_condition_wait (&cq->listeners, &cq->lock)) + { + cq->num_listeners--; + err = EINTR; + goto out; + } + while (cq->count == 0); } - cq->num_listeners--; + assert (cq->head); + if (sock) + /* Dequeue the next request, if desired. */ + { + struct connq_request *req = connq_request_dequeue (cq); + *sock = req->sock; + free (req); + } + else if (cq->num_listeners > 0) + /* The caller will not actually process this request but someone + else could. (This case is rare but possible: it would require + one thread to do a select on the socket and a second to do an + accept.) */ + condition_signal (&cq->listeners); + else + /* There is no one else to process the request and the connection + has now been initiated. This is not actually a problem as even + if the current queue limit is 0, the connector will queue the + request and another listener (should) eventually come along. + (In fact it is very probably as the caller has likely done a + select and will now follow up with an accept.) */ + ; + + out: mutex_unlock (&cq->lock); - - return 0; -} - -/* Return the error code ERR to the thread that made the listen request REQ, - returned from a previous connq_listen. */ -void -connq_request_complete (struct connq_request *req, error_t err) -{ - mutex_lock (&req->lock); - req->err = err; - req->completed = 1; - condition_signal (&req->signal); - mutex_unlock (&req->lock); + return err; } -/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is true, - then return EWOULDBLOCK immediately when there are no immediate - connections available. Neither SOCK nor CQ should be locked. */ +/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is + true, then return EWOULDBLOCK if there are no connections + immediately available. On success, this call must be followed up + either connq_connect_complete or connq_connect_cancel. */ error_t -connq_connect (struct connq *cq, int noblock, struct sock *sock) +connq_connect (struct connq *cq, int noblock) { - error_t err = 0; - unsigned next; - mutex_lock (&cq->lock); /* Check for listeners after we've locked CQ for good. */ - if ((noblock || cq->noqueue) && cq->num_listeners == 0) + + if (noblock + && cq->count + cq->num_connectors >= cq->max + cq->num_listeners) + /* We are in non-blocking mode and would have to wait to secure an + entry in the listen queue. */ { mutex_unlock (&cq->lock); return EWOULDBLOCK; } - next = qnext (cq, cq->tail); - if (next == cq->tail) - /* The queue is full. */ - err = ECONNREFUSED; - else - { - struct connq_request req; + cq->num_connectors ++; - connq_request_init (&req, sock); + while (cq->count + cq->num_connectors > cq->max + cq->num_listeners) + /* The queue is full and there is no immediate listener to service + us. Block until we can get a slot. */ + if (hurd_condition_wait (&cq->connectors, &cq->lock)) + { + cq->num_connectors --; + mutex_unlock (&cq->lock); + return EINTR; + } - cq->queue[cq->tail] = &req; - cq->tail = next; + mutex_unlock (&cq->lock); - /* Hold REQ.LOCK before we signal the condition so that we're sure - to be woken up. */ - mutex_lock (&req.lock); - condition_signal (&cq->listeners); - mutex_unlock (&cq->lock); + return 0; +} - while (!req.completed) - condition_wait (&req.signal, &req.lock); - err = req.err; +/* Follow up to connq_connect. Completes the connect, SOCK is the new + server socket. */ +void +connq_connect_complete (struct connq *cq, struct sock *sock) +{ + struct connq_request *req; + + req = malloc (sizeof (struct connq_request)); + if (! req) + abort (); + + connq_request_init (req, sock); + + mutex_lock (&cq->lock); - mutex_unlock (&req.lock); + assert (cq->num_connectors > 0); + cq->num_connectors --; + + connq_request_enqueue (cq, req); + + if (cq->num_listeners > 0) + /* Wake a listener up. We must consume the listener ref here as + someone else might call this function before the listener + thread dequeues this request. */ + { + cq->num_listeners --; + condition_signal (&cq->listeners); } - return err; + mutex_unlock (&cq->lock); } - -#if 0 -/* `Compresses' CQ, by removing any NULL entries. CQ should be locked. */ -static void -connq_compress (struct connq *cq) + +/* Follow up to connq_connect. Cancel the connect. */ +void +connq_connect_cancel (struct connq *cq) { - unsigned pos; - unsigned comp_tail = cq->head; - - /* Now compress the queue to remove any null entries we put in. */ - for (pos = cq->head; pos != cq->tail; pos = qnext (cq, pos)) - if (cq->queue[pos] != NULL) - /* This position has a non-NULL request, so move it to the end of the - compressed queue. */ - { - cq->queue[comp_tail] = cq->queue[pos]; - comp_tail = qnext (cq, comp_tail); - } + mutex_lock (&cq->lock); + + assert (cq->num_connectors > 0); + cq->num_connectors --; - /* Move back tail to only include what we kept in the queue. */ - cq->tail = comp_tail; + if (cq->count + cq->num_connectors >= cq->max + cq->num_listeners) + /* A connector is blocked and could use the spot we reserved. */ + condition_signal (&cq->connectors); + + mutex_unlock (&cq->lock); } -#endif -/* Set CQ's queue length to LENGTH. Any sockets already waiting for a - connections that are past the new length will fail with ECONNREFUSED. */ +/* Set CQ's queue length to LENGTH. */ error_t -connq_set_length (struct connq *cq, int length) +connq_set_length (struct connq *cq, int max) { - mutex_lock (&cq->lock); + int omax; - if (length > cq->length) - /* Growing the queue is simple... */ - cq->queue = realloc (cq->queue, sizeof (struct connq_request *) * length); - else - /* Shrinking it less so. */ - { - int i; - struct connq_request **new_queue = - malloc (sizeof (struct connq_request *) * length); - - for (i = 0; i < cq->length && cq->head != cq->tail; i++) - { - if (i < length) - /* Keep this connect request in the queue. */ - new_queue[length - i] = cq->queue[cq->head]; - else - /* Punt this one. */ - connq_request_complete (cq->queue[cq->head], ECONNREFUSED); - cq->head = qnext (cq, cq->head); - } - - free (cq->queue); - cq->queue = new_queue; - } - - cq->noqueue = 0; /* Turn on queueing. */ + mutex_lock (&cq->lock); + omax = cq->max; + cq->max = max; + + if (max > omax && cq->count >= omax && cq->count < max + && cq->num_connectors >= cq->num_listeners) + /* This is an increase in the number of connection slots which has + made some slots available and there are waiting threads. Wake + them up. */ + condition_broadcast (&cq->listeners); mutex_unlock (&cq->lock); diff --git a/pflocal/connq.h b/pflocal/connq.h index 1039bff9..17c88c0b 100644 --- a/pflocal/connq.h +++ b/pflocal/connq.h @@ -1,6 +1,6 @@ /* Connection queues - Copyright (C) 1995 Free Software Foundation, Inc. + Copyright (C) 1995, 2005 Free Software Foundation, Inc. Written by Miles Bader @@ -23,9 +23,8 @@ #include -/* Unknown types */ +/* Forward. */ struct connq; -struct connq_request; struct sock; /* Create a new listening queue, returning it in CQ. The resulting queue @@ -36,26 +35,26 @@ error_t connq_create (struct connq **cq); /* Destroy a queue. */ void connq_destroy (struct connq *cq); -/* Wait for a connection attempt to be made on CQ, and return the connecting - socket in SOCK, and a request tag in REQ. If REQ is NULL, the request is - left in the queue, otherwise connq_request_complete must be called on REQ - to allow the requesting thread to continue. If NOBLOCK is true, - EWOULDBLOCK is returned when there are no immediate connections - available. CQ should be unlocked. */ -error_t connq_listen (struct connq *cq, int noblock, - struct connq_request **req, struct sock **sock); +/* Return a connection request on CQ. If SOCK is NULL, the request is + left in the queue. If NOBLOCK is true, EWOULDBLOCK is returned + when there are no immediate connections available. */ +error_t connq_listen (struct connq *cq, int noblock, struct sock **sock); -/* Return the error code ERR to the thread that made the listen request REQ, - returned from a previous connq_listen. */ -void connq_request_complete (struct connq_request *req, error_t err); +/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is + true, then return EWOULDBLOCK if there are no connections + immediately available. On success, this call must be followed up + either connq_connect_complete or connq_connect_cancel. */ +error_t connq_connect (struct connq *cq, int noblock); + +/* Follow up to connq_connect. Completes the connection, SOCK is the + new server socket. */ +void connq_connect_complete (struct connq *cq, struct sock *sock); + +/* Follow up to connq_connect. Cancel the connect. */ +void connq_connect_cancel (struct connq *cq); /* Set CQ's queue length to LENGTH. Any sockets already waiting for a - connections that are past the new length will fail with ECONNREFUSED. */ + connections that are past the new length remain. */ error_t connq_set_length (struct connq *cq, int length); -/* Try to connect SOCK with the socket listening on CQ. If NOBLOCK is true, - then return EWOULDBLOCK immediately when there are no immediate - connections available. Neither SOCK nor CQ should be locked. */ -error_t connq_connect (struct connq *cq, int noblock, struct sock *sock); - #endif /* __CONNQ_H__ */ diff --git a/pflocal/io.c b/pflocal/io.c index bd9ecbdd..211b3dd4 100644 --- a/pflocal/io.c +++ b/pflocal/io.c @@ -1,6 +1,6 @@ /* Socket I/O operations - Copyright (C) 1995,96,98,99,2000,02 Free Software Foundation, Inc. + Copyright (C) 1995,96,98,99,2000,02, 2005 Free Software Foundation, Inc. Written by Miles Bader This program is free software; you can redistribute it and/or @@ -197,16 +197,16 @@ S_io_select (struct sock_user *user, if (*select_type & SELECT_READ) { - /* Wait for a connect. Passing in NULL for REQ means that the - request won't be dequeued. */ - if (connq_listen (sock->listen_queue, 1, NULL, NULL) == 0) + /* Wait for a connect. Passing in NULL for SOCK means that + the request won't be dequeued. */ + if (connq_listen (sock->listen_queue, 1, NULL) == 0) /* We can satisfy this request immediately. */ return 0; else /* Gotta wait... */ { ports_interrupt_self_on_port_death (user, reply); - return connq_listen (sock->listen_queue, 0, NULL, NULL); + return connq_listen (sock->listen_queue, 0, NULL); } } } diff --git a/pflocal/socket.c b/pflocal/socket.c index 0bc72066..1cf78657 100644 --- a/pflocal/socket.c +++ b/pflocal/socket.c @@ -1,6 +1,6 @@ /* Socket-specific operations - Copyright (C) 1995 Free Software Foundation, Inc. + Copyright (C) 1995, 2005 Free Software Foundation, Inc. Written by Miles Bader @@ -110,7 +110,7 @@ S_socket_connect (struct sock_user *user, struct addr *addr) else if (sock->flags & SOCK_CONNECTED) /* SOCK_CONNECTED is only set for connection-oriented sockets, which can only ever connect once. [If we didn't do this test - here, it would eventually fail when it the listening socket + here, it would eventually fail when the listening socket tried to accept our connection request.] */ err = EISCONN; else @@ -118,16 +118,35 @@ S_socket_connect (struct sock_user *user, struct addr *addr) /* Assert that we're trying to connect, so anyone else trying to do so will fail with EALREADY. */ sock->connect_queue = cq; - mutex_unlock (&sock->lock); /* Unlock SOCK while waiting. */ + /* Unlock SOCK while waiting. */ + mutex_unlock (&sock->lock); - /* Try to connect. */ - err = connq_connect (cq, sock->flags & SOCK_NONBLOCK, sock); + err = connq_connect (peer->listen_queue, + sock->flags & SOCK_NONBLOCK); + if (!err) + { + struct sock *server; + + err = sock_clone (peer, &server); + if (!err) + { + err = sock_connect (sock, server); + if (!err) + connq_connect_complete (peer->listen_queue, server); + else + sock_free (server); + } - /* We can safely set CONNECT_QUEUE to NULL, as no one else can + mutex_lock (&sock->lock); + if (err) + connq_connect_cancel (peer->listen_queue); + } + + /* We must set CONNECT_QUEUE to NULL, as no one else can set it until we've done so. */ - mutex_lock (&sock->lock); sock->connect_queue = NULL; } + mutex_unlock (&sock->lock); } else @@ -157,42 +176,25 @@ S_socket_accept (struct sock_user *user, err = ensure_connq (sock); if (!err) { - struct connq_request *req; struct sock *peer_sock; - err = - connq_listen (sock->listen_queue, sock->flags & SOCK_NONBLOCK, - &req, &peer_sock); + err = connq_listen (sock->listen_queue, sock->flags & SOCK_NONBLOCK, + &peer_sock); if (!err) { - struct sock *conn_sock; - - err = sock_clone (sock, &conn_sock); + struct addr *peer_addr; + *port_type = MACH_MSG_TYPE_MAKE_SEND; + err = sock_create_port (peer_sock, port); + if (!err) + err = sock_get_addr (peer_sock, &peer_addr); if (!err) { - err = sock_connect (conn_sock, peer_sock); - if (!err) - { - struct addr *peer_addr; - *port_type = MACH_MSG_TYPE_MAKE_SEND; - err = sock_create_port (conn_sock, port); - if (!err) - err = sock_get_addr (peer_sock, &peer_addr); - if (!err) - { - *peer_addr_port = ports_get_right (peer_addr); - *peer_addr_port_type = MACH_MSG_TYPE_MAKE_SEND; - ports_port_deref (peer_addr); - } - else - /* TEAR DOWN THE CONNECTION XXX */; - } - if (err) - sock_free (conn_sock); + *peer_addr_port = ports_get_right (peer_addr); + *peer_addr_port_type = MACH_MSG_TYPE_MAKE_SEND; + ports_port_deref (peer_addr); } - - /* Communicate any error (or success) to the connecting thread. */ - connq_request_complete (req, err); + else + /* TEAR DOWN THE CONNECTION XXX */; } } -- cgit v1.2.3