mirror of
https://git.savannah.gnu.org/git/guile.git
synced 2025-06-11 22:31:12 +02:00
* iselect.c: Now several threads can wait on the same file
descriptor. The behaviour is compatible with OS select: All threads waiting for the fd return with the same status.
This commit is contained in:
parent
41fa950d83
commit
a48b6916e2
2 changed files with 166 additions and 73 deletions
|
@ -1,3 +1,9 @@
|
|||
1998-01-24 Mikael Djurfeldt <mdj@mdj.nada.kth.se>
|
||||
|
||||
* iselect.c: Now several threads can wait on the same file
|
||||
descriptor. The behaviour is compatible with OS select: All
|
||||
threads waiting for the fd return with the same status.
|
||||
|
||||
1998-01-23 Mikael Djurfeldt <mdj@mdj.nada.kth.se>
|
||||
|
||||
* coop-threads.c, threads.h (scm_spawn_thread): New function.
|
||||
|
|
|
@ -61,9 +61,11 @@
|
|||
|
||||
|
||||
|
||||
/* COOP queue macros */
|
||||
#define QEMPTYP(q) (q.t.next == &q.t)
|
||||
#define QFIRST(q) (q.t.next)
|
||||
|
||||
/* These macros count the number of bits in a word. */
|
||||
#define SCM_BITS_PER_LONG (8 * sizeof (unsigned long))
|
||||
#if ULONG_MAX >> 16 == 0
|
||||
#define SCM_NLONGBITS(p) (bc[((unsigned char *)(p))[0]]\
|
||||
|
@ -95,13 +97,27 @@ typedef unsigned long *ulongptr;
|
|||
static char bc[256]; /* Bit counting array. bc[x] is the number of
|
||||
bits in x. */
|
||||
|
||||
/* This flag indicates that several threads are waiting on the same
|
||||
file descriptor. When this is the case, the common fd sets are
|
||||
updated in a more inefficient way. */
|
||||
int collisionp;
|
||||
|
||||
/* These are the common fd sets. When new select calls are made,
|
||||
those sets are merged into these. */
|
||||
int gnfds;
|
||||
SELECT_TYPE greadfds;
|
||||
SELECT_TYPE gwritefds;
|
||||
SELECT_TYPE gexceptfds;
|
||||
|
||||
/* These are the result sets. They are used when we call OS select.
|
||||
We couldn't use the common fd sets above, since that would destroy
|
||||
them. */
|
||||
SELECT_TYPE rreadfds;
|
||||
SELECT_TYPE rwritefds;
|
||||
SELECT_TYPE rexceptfds;
|
||||
|
||||
/* Constant timeval struct representing a zero timeout which we use
|
||||
when polling. */
|
||||
static struct timeval timeout0;
|
||||
|
||||
/* Insert thread t into the ordered queue q.
|
||||
|
@ -125,27 +141,8 @@ coop_timeout_qinsert (coop_q_t *q, coop_t *t)
|
|||
q->tail = t;
|
||||
}
|
||||
|
||||
#ifdef DEBUG_ISELECT
|
||||
static void
|
||||
qp (char* qn, coop_q_t *q)
|
||||
{
|
||||
coop_t *t = q->t.next;
|
||||
fprintf (stderr, "%s:", qn);
|
||||
while (t != &q->t)
|
||||
{
|
||||
if (t->timeoutp)
|
||||
fprintf (stderr, " %04x (%d,%d)", (unsigned) t,
|
||||
t->wakeup_time.tv_sec, t->wakeup_time.tv_usec);
|
||||
else
|
||||
fprintf (stderr, " %04x", (unsigned) t);
|
||||
t = t->next;
|
||||
}
|
||||
fprintf (stderr, "\n");
|
||||
}
|
||||
#endif
|
||||
|
||||
/* As select, but doesn't destroy the file descriptor sets passed as
|
||||
arguments. */
|
||||
arguments. The results are stored into the result sets. */
|
||||
static int
|
||||
safe_select (int nfds,
|
||||
SELECT_TYPE *readfds,
|
||||
|
@ -179,83 +176,107 @@ safe_select (int nfds,
|
|||
return select (nfds, &rreadfds, &rwritefds, &rexceptfds, timeout);
|
||||
}
|
||||
|
||||
/* Merge new file descriptor sets into the global sets.
|
||||
Return 0 on success. Return -1 if some other thread is already
|
||||
waiting for one or more of the requested descriptors. */
|
||||
static int
|
||||
add_fd_sets (int nfds,
|
||||
SELECT_TYPE *readfds,
|
||||
SELECT_TYPE *writefds,
|
||||
SELECT_TYPE *exceptfds)
|
||||
/* Merge new file descriptor sets into the common sets. */
|
||||
static void
|
||||
add_fd_sets (coop_t *t)
|
||||
{
|
||||
int i, n = (nfds + SCM_BITS_PER_LONG - 1) / SCM_BITS_PER_LONG;
|
||||
for (i = 0; i < n; ++i)
|
||||
if ((readfds != NULL
|
||||
&& (((ulongptr) readfds)[i] & ((ulongptr) &greadfds)[i]) != 0)
|
||||
|| (writefds != NULL
|
||||
&& (((ulongptr) writefds)[i] & ((ulongptr) &gwritefds)[i]) != 0)
|
||||
|| (exceptfds != NULL
|
||||
&& (((ulongptr) exceptfds)[i] & ((ulongptr) &gexceptfds)[i]) != 0))
|
||||
return -1;
|
||||
coop_global_curr->nfds = 0;
|
||||
coop_global_curr->readfds = readfds;
|
||||
coop_global_curr->writefds = writefds;
|
||||
coop_global_curr->exceptfds = exceptfds;
|
||||
int n = (t->nfds + SCM_BITS_PER_LONG - 1) / SCM_BITS_PER_LONG;
|
||||
int i;
|
||||
|
||||
/* Detect if the fd sets of the thread have any bits in common with
|
||||
the rest of the waiting threads. If that is so, set the
|
||||
collision flag. This causes a more time consuming handling of
|
||||
the common fd sets---they need to recalculated every time a
|
||||
thread wakes up. */
|
||||
if (!collisionp)
|
||||
for (i = 0; i < n; ++i)
|
||||
if ((t->readfds != NULL
|
||||
&& (((ulongptr) t->readfds)[i] & ((ulongptr) &greadfds)[i]) != 0)
|
||||
|| (t->writefds != NULL
|
||||
&& ((((ulongptr) t->writefds)[i] & ((ulongptr) &gwritefds)[i])
|
||||
!= 0))
|
||||
|| (t->exceptfds != NULL
|
||||
&& ((((ulongptr) t->exceptfds)[i] & ((ulongptr) &gexceptfds)[i])
|
||||
!= 0)))
|
||||
{
|
||||
collisionp = 1;
|
||||
break;
|
||||
}
|
||||
|
||||
/* We recalculate nfds below. The cost for this can be paid back
|
||||
with a great bonus since many programs are lazy with the nfds
|
||||
arg. Many even pass 1024 when using one of the lowest fd:s!
|
||||
|
||||
We approach from above, checking for non-zero bits. As soon as
|
||||
we have determined the value of nfds, we jump down to code below
|
||||
which concludes the updating of the common sets. */
|
||||
t->nfds = 0;
|
||||
i = n;
|
||||
while (i > 0)
|
||||
{
|
||||
--i;
|
||||
if (readfds != NULL && ((ulongptr) readfds)[i] != 0)
|
||||
if (t->readfds != NULL && ((ulongptr) t->readfds)[i] != 0)
|
||||
{
|
||||
((ulongptr) &greadfds)[i] |= ((ulongptr) readfds)[i];
|
||||
((ulongptr) &greadfds)[i] |= ((ulongptr) t->readfds)[i];
|
||||
n = (i + 1) * SCM_BITS_PER_LONG;
|
||||
coop_global_curr->nfds = n;
|
||||
t->nfds = n;
|
||||
if (n > gnfds)
|
||||
gnfds = n;
|
||||
goto cont_read;
|
||||
}
|
||||
if (writefds != NULL && ((ulongptr) writefds)[i] != 0)
|
||||
if (t->writefds != NULL && ((ulongptr) t->writefds)[i] != 0)
|
||||
{
|
||||
((ulongptr) &gwritefds)[i] |= ((ulongptr) writefds)[i];
|
||||
((ulongptr) &gwritefds)[i] |= ((ulongptr) t->writefds)[i];
|
||||
n = (i + 1) * SCM_BITS_PER_LONG;
|
||||
coop_global_curr->nfds = n;
|
||||
t->nfds = n;
|
||||
if (n > gnfds)
|
||||
gnfds = n;
|
||||
goto cont_write;
|
||||
}
|
||||
if (exceptfds != NULL && ((ulongptr) exceptfds)[i] != 0)
|
||||
if (t->exceptfds != NULL && ((ulongptr) t->exceptfds)[i] != 0)
|
||||
{
|
||||
((ulongptr) &gexceptfds)[i] |= ((ulongptr) exceptfds)[i];
|
||||
((ulongptr) &gexceptfds)[i] |= ((ulongptr) t->exceptfds)[i];
|
||||
n = (i + 1) * SCM_BITS_PER_LONG;
|
||||
coop_global_curr->nfds = n;
|
||||
t->nfds = n;
|
||||
if (n > gnfds)
|
||||
gnfds = n;
|
||||
goto cont_except;
|
||||
}
|
||||
}
|
||||
return;
|
||||
|
||||
/* nfds is now determined. Just finish updating the common sets. */
|
||||
while (i > 0)
|
||||
{
|
||||
--i;
|
||||
if (readfds != NULL && ((ulongptr) readfds)[i] != 0)
|
||||
((ulongptr) &greadfds)[i] |= ((ulongptr) readfds)[i];
|
||||
if (t->readfds != NULL && ((ulongptr) t->readfds)[i] != 0)
|
||||
((ulongptr) &greadfds)[i] |= ((ulongptr) t->readfds)[i];
|
||||
cont_read:
|
||||
if (writefds != NULL && ((ulongptr) writefds)[i] != 0)
|
||||
((ulongptr) &gwritefds)[i] |= ((ulongptr) writefds)[i];
|
||||
if (t->writefds != NULL && ((ulongptr) t->writefds)[i] != 0)
|
||||
((ulongptr) &gwritefds)[i] |= ((ulongptr) t->writefds)[i];
|
||||
cont_write:
|
||||
if (exceptfds != NULL && ((ulongptr) exceptfds)[i] != 0)
|
||||
((ulongptr) &gexceptfds)[i] |= ((ulongptr) exceptfds)[i];
|
||||
if (t->exceptfds != NULL && ((ulongptr) t->exceptfds)[i] != 0)
|
||||
((ulongptr) &gexceptfds)[i] |= ((ulongptr) t->exceptfds)[i];
|
||||
cont_except:
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Update the fd sets pointed to by the thread so that they reflect
|
||||
the status of the file descriptors which the thread was interested
|
||||
in. Also clear those bits in the common sets. This function is
|
||||
only called when there are no bit collisions. */
|
||||
static void
|
||||
finalize_fd_sets (coop_t *t)
|
||||
{
|
||||
int i = (t->nfds + SCM_BITS_PER_LONG - 1) / SCM_BITS_PER_LONG;
|
||||
int n_ones = 0;
|
||||
register unsigned long s;
|
||||
|
||||
if (t->nfds == gnfds)
|
||||
{
|
||||
/* This thread is the one responsible for the current high value
|
||||
of gnfds. First do our other jobs while at the same time
|
||||
trying to decrease gnfds. */
|
||||
while (i > 0)
|
||||
{
|
||||
--i;
|
||||
|
@ -297,6 +318,9 @@ finalize_fd_sets (coop_t *t)
|
|||
t->retval = n_ones;
|
||||
return;
|
||||
}
|
||||
|
||||
/* Either this thread wasn't responsible for gnfds or gnfds has been
|
||||
determined. */
|
||||
while (i > 0)
|
||||
{
|
||||
--i;
|
||||
|
@ -325,6 +349,37 @@ finalize_fd_sets (coop_t *t)
|
|||
t->retval = n_ones;
|
||||
}
|
||||
|
||||
/* Just like finalize_fd_sets except that we don't have to update the
|
||||
global fd sets. Those will be recalulated elsewhere. */
|
||||
static void
|
||||
finalize_fd_sets_lazily (coop_t *t)
|
||||
{
|
||||
int i = (t->nfds + SCM_BITS_PER_LONG - 1) / SCM_BITS_PER_LONG;
|
||||
int n_ones = 0;
|
||||
register unsigned long s;
|
||||
while (i > 0)
|
||||
{
|
||||
--i;
|
||||
if (t->readfds != NULL && (s = ((ulongptr) t->readfds)[i]) != 0)
|
||||
{
|
||||
((ulongptr) t->readfds)[i] &= ((ulongptr) &rreadfds)[i];
|
||||
n_ones += SCM_NLONGBITS (&((ulongptr) t->readfds)[i]);
|
||||
}
|
||||
if (t->writefds != NULL && (s = ((ulongptr) t->writefds)[i]) != 0)
|
||||
{
|
||||
((ulongptr) t->writefds)[i] &= ((ulongptr) &rwritefds)[i];
|
||||
n_ones += SCM_NLONGBITS (&((ulongptr) t->writefds)[i]);
|
||||
}
|
||||
if (t->exceptfds != NULL && (s = ((ulongptr) t->exceptfds)[i]) != 0)
|
||||
{
|
||||
((ulongptr) t->exceptfds)[i] &= ((ulongptr) &rexceptfds)[i];
|
||||
n_ones += SCM_NLONGBITS (&((ulongptr) t->exceptfds)[i]);
|
||||
}
|
||||
}
|
||||
t->retval = n_ones;
|
||||
}
|
||||
|
||||
/* Return first fd with a non-zero bit in any of the result sets. */
|
||||
static int
|
||||
first_interesting_fd (void)
|
||||
{
|
||||
|
@ -360,6 +415,7 @@ first_interesting_fd (void)
|
|||
exit (1);
|
||||
}
|
||||
|
||||
/* Revive all threads with an error status. */
|
||||
static void
|
||||
error_revive (void)
|
||||
{
|
||||
|
@ -377,6 +433,9 @@ error_revive (void)
|
|||
FD_ZERO (&gexceptfds);
|
||||
}
|
||||
|
||||
/* Given the result of a call to safe_select and the current time,
|
||||
try to wake up some threads and return the first one. Return NULL
|
||||
if we couldn't find any. */
|
||||
static coop_t *
|
||||
find_thread (int n, struct timeval *now)
|
||||
{
|
||||
|
@ -392,9 +451,18 @@ find_thread (int n, struct timeval *now)
|
|||
&& t->wakeup_time.tv_usec <= now->tv_usec)))
|
||||
{
|
||||
coop_qget (&coop_global_sleepq);
|
||||
finalize_fd_sets (t);
|
||||
if (collisionp)
|
||||
finalize_fd_sets_lazily (t);
|
||||
else
|
||||
finalize_fd_sets (t);
|
||||
coop_qput (&coop_global_runq, t);
|
||||
}
|
||||
if (collisionp)
|
||||
{
|
||||
while ((t = coop_qget (&coop_global_sleepq)) != NULL)
|
||||
coop_qput (&coop_tmp_queue, t);
|
||||
goto rebuild_global_fd_sets;
|
||||
}
|
||||
}
|
||||
else if (n > 0)
|
||||
{
|
||||
|
@ -414,21 +482,39 @@ find_thread (int n, struct timeval *now)
|
|||
|| (t->wakeup_time.tv_sec == now->tv_sec
|
||||
&& t->wakeup_time.tv_usec <= now->tv_usec))))
|
||||
{
|
||||
finalize_fd_sets (t);
|
||||
if (collisionp)
|
||||
finalize_fd_sets_lazily (t);
|
||||
else
|
||||
finalize_fd_sets (t);
|
||||
coop_qput (&coop_global_runq, t);
|
||||
}
|
||||
else
|
||||
coop_qput(&coop_tmp_queue, t);
|
||||
}
|
||||
while ((t = coop_qget (&coop_tmp_queue)) != NULL)
|
||||
coop_qput (&coop_global_sleepq, t);
|
||||
if (collisionp)
|
||||
{
|
||||
rebuild_global_fd_sets:
|
||||
collisionp = 0;
|
||||
gnfds = 0;
|
||||
FD_ZERO (&greadfds);
|
||||
FD_ZERO (&gwritefds);
|
||||
FD_ZERO (&gexceptfds);
|
||||
while ((t = coop_qget (&coop_tmp_queue)) != NULL)
|
||||
{
|
||||
add_fd_sets (t);
|
||||
coop_qput (&coop_global_sleepq, t);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
while ((t = coop_qget (&coop_tmp_queue)) != NULL)
|
||||
coop_qput (&coop_global_sleepq, t);
|
||||
}
|
||||
}
|
||||
else /* n < 0 */
|
||||
/* An error has occured. It is not EBADF since
|
||||
scm_internal_select called select before putting the
|
||||
threads on the sleep queue. The most robust and select
|
||||
compatible behaviour is probably to let all sleeping
|
||||
threads return with an error. */
|
||||
/* An error has occured. Wake all threads. Since we don't care
|
||||
to calculate if there is a sinner we report the error to all of
|
||||
them. */
|
||||
error_revive ();
|
||||
|
||||
return coop_qget (&coop_global_runq);
|
||||
|
@ -538,13 +624,11 @@ scm_internal_select (int nfds,
|
|||
++scm_ints_disabled;
|
||||
|
||||
/* Add our file descriptor flags to the common set. */
|
||||
if (add_fd_sets (nfds, readfds, writefds, exceptfds))
|
||||
{
|
||||
errno = EBADF; /* Several threads can't select on same fds. */
|
||||
if (!--scm_ints_disabled)
|
||||
SCM_ASYNC_TICK;
|
||||
return -1;
|
||||
}
|
||||
coop_global_curr->nfds = nfds;
|
||||
coop_global_curr->readfds = readfds;
|
||||
coop_global_curr->writefds = writefds;
|
||||
coop_global_curr->exceptfds = exceptfds;
|
||||
add_fd_sets (coop_global_curr);
|
||||
|
||||
/* Place ourselves on the sleep queue and get a new thread to run. */
|
||||
if (timeout == NULL)
|
||||
|
@ -600,12 +684,15 @@ static void init_bc (int bit, int i, int n)
|
|||
void
|
||||
scm_init_iselect ()
|
||||
{
|
||||
#if 0 /* This is just symbolic */
|
||||
collisionp = 0;
|
||||
gnfds = 0;
|
||||
FD_ZERO (&greadfds);
|
||||
FD_ZERO (&gwritefds);
|
||||
FD_ZERO (&gexceptfds);
|
||||
timeout0.tv_sec = 0;
|
||||
timeout0.tv_usec = 0;
|
||||
#endif
|
||||
init_bc (0x80, 0, 0);
|
||||
#include "iselect.x"
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue