1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-04-30 03:40:34 +02:00

New interfaces to help wait on fd/cond

* libguile/async.h:
* libguile/async.c (struct scm_thread_wake_data): Include the cond to
  signal.  Be a union and include a tag.
  (scm_i_prepare_to_wait): Rename from scm_i_setup_sleep and take wake
  data directly.  Also call scm_i_wait_finished as appropriate.
  (scm_i_wait_finished): Rename from scm_i_reset_sleep.
  (scm_i_prepare_to_wait_on_fd, scm_c_prepare_to_wait_on_fd):
  (scm_i_prepare_to_wait_on_cond, scm_c_prepare_to_wait_on_cond): New
  functions.
  (scm_c_wait_finished): New function.
  (scm_system_async_mark_for_thread): Adapt to wake data change.
* libguile/threads.c (block_self, scm_std_select): Adapt to async
  interface changes.
* doc/ref/api-scheduling.texi (Asyncs): Doc new public interfaces.
This commit is contained in:
Andy Wingo 2016-12-29 18:46:16 +01:00
parent 0ce8a9a5e0
commit a0656ad4cf
4 changed files with 142 additions and 56 deletions

View file

@ -306,6 +306,38 @@ one level. This function must be used inside a pair of calls to
Wind}). Wind}).
@end deftypefn @end deftypefn
Sometimes you want to interrupt a thread that might be waiting for
something to happen, for example on a file descriptor or a condition
variable. In that case you can inform Guile of how to interrupt that
wait using the following procedures:
@deftypefn {C Function} int scm_c_prepare_to_wait_on_fd (int fd)
Inform Guile that the current thread is about to sleep, and that if an
asynchronous interrupt is signalled on this thread, Guile should wake up
the thread by writing a zero byte to @var{fd}. Returns zero if the
prepare succeeded, or nonzero if the thread already has a pending async
and that it should avoid waiting.
@end deftypefn
@deftypefn {C Function} int scm_c_prepare_to_wait_on_cond (scm_i_pthread_mutex_t *mutex, scm_i_pthread_cond_t *cond)
Inform Guile that the current thread is about to sleep, and that if an
asynchronous interrupt is signalled on this thread, Guile should wake up
the thread by acquiring @var{mutex} and signalling @var{cond}. The
caller must already hold @var{mutex} and only drop it as part of the
@code{pthread_cond_wait} call. Returns zero if the prepare succeeded,
or nonzero if the thread already has a pending async and that it should
avoid waiting.
@end deftypefn
@deftypefn {C Function} void scm_c_wait_finished (void)
Inform Guile that the current thread has finished waiting, and that
asynchronous interrupts no longer need any special wakeup action; the
current thread will periodically poll its internal queue instead.
@end deftypefn
Guile's own interface to @code{sleep}, @code{wait-condition-variable},
@code{select}, and so on all call the above routines as appropriate.
Finally, note that threads can also be interrupted via POSIX signals. Finally, note that threads can also be interrupted via POSIX signals.
@xref{Signals}. As an implementation detail, signal handlers will @xref{Signals}. As an implementation detail, signal handlers will
effectively call @code{system-async-mark} in a signal-safe way, effectively call @code{system-async-mark} in a signal-safe way,

View file

@ -149,32 +149,83 @@ scm_async_tick (void)
} }
struct scm_thread_wake_data { struct scm_thread_wake_data {
scm_i_pthread_mutex_t *mutex; enum { WAIT_FD, WAIT_COND } kind;
union {
struct {
int fd; int fd;
} wait_fd;
struct {
scm_i_pthread_mutex_t *mutex;
scm_i_pthread_cond_t *cond;
} wait_cond;
} data;
}; };
int int
scm_i_setup_sleep (scm_i_thread *t, scm_i_prepare_to_wait (scm_i_thread *t,
scm_i_pthread_mutex_t *sleep_mutex, struct scm_thread_wake_data *wake)
int sleep_fd)
{ {
struct scm_thread_wake_data *wake;
wake = scm_gc_typed_calloc (struct scm_thread_wake_data);
wake->mutex = sleep_mutex;
wake->fd = sleep_fd;
scm_atomic_set_pointer ((void **)&t->wake, wake); scm_atomic_set_pointer ((void **)&t->wake, wake);
return !scm_is_null (scm_atomic_ref_scm (&t->pending_asyncs)); /* If no interrupt was registered in the meantime, then any future
wakeup will signal the FD or cond var. */
if (scm_is_null (scm_atomic_ref_scm (&t->pending_asyncs)))
return 0;
/* Otherwise clear the wake pointer and indicate that the caller
should handle interrupts directly. */
scm_i_wait_finished (t);
return 1;
} }
void void
scm_i_reset_sleep (scm_i_thread *t) scm_i_wait_finished (scm_i_thread *t)
{ {
scm_atomic_set_pointer ((void **)&t->wake, NULL); scm_atomic_set_pointer ((void **)&t->wake, NULL);
} }
int
scm_i_prepare_to_wait_on_fd (scm_i_thread *t, int fd)
{
struct scm_thread_wake_data *wake;
wake = scm_gc_typed_calloc (struct scm_thread_wake_data);
wake->kind = WAIT_FD;
wake->data.wait_fd.fd = fd;
return scm_i_prepare_to_wait (t, wake);
}
int
scm_c_prepare_to_wait_on_fd (int fd)
{
return scm_i_prepare_to_wait_on_fd (SCM_I_CURRENT_THREAD, fd);
}
int
scm_i_prepare_to_wait_on_cond (scm_i_thread *t,
scm_i_pthread_mutex_t *m,
scm_i_pthread_cond_t *c)
{
struct scm_thread_wake_data *wake;
wake = scm_gc_typed_calloc (struct scm_thread_wake_data);
wake->kind = WAIT_COND;
wake->data.wait_cond.mutex = m;
wake->data.wait_cond.cond = c;
return scm_i_prepare_to_wait (t, wake);
}
int
scm_c_prepare_to_wait_on_cond (scm_i_pthread_mutex_t *m,
scm_i_pthread_cond_t *c)
{
return scm_i_prepare_to_wait_on_cond (SCM_I_CURRENT_THREAD, m, c);
}
void
scm_c_wait_finished (void)
{
scm_i_wait_finished (SCM_I_CURRENT_THREAD);
}
SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0, SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0,
(SCM proc, SCM thread), (SCM proc, SCM thread),
"Mark @var{proc} (a procedure with zero arguments) for future execution\n" "Mark @var{proc} (a procedure with zero arguments) for future execution\n"
@ -210,19 +261,18 @@ SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0,
might even be in the next, unrelated sleep. Interrupting it might even be in the next, unrelated sleep. Interrupting it
anyway does no harm, however. anyway does no harm, however.
The important thing to prevent here is to signal sleep_cond The important thing to prevent here is to signal the cond
before T waits on it. This can not happen since T has before T waits on it. This can not happen since T has its
sleep_mutex locked while setting t->sleep_mutex and will only mutex locked while preparing the wait and will only unlock it
unlock it again while waiting on sleep_cond. again while waiting on the cond.
*/ */
if (wake->mutex) if (wake->kind == WAIT_COND)
{ {
scm_i_scm_pthread_mutex_lock (wake->mutex); scm_i_scm_pthread_mutex_lock (wake->data.wait_cond.mutex);
scm_i_pthread_cond_signal (&t->sleep_cond); scm_i_pthread_cond_signal (wake->data.wait_cond.cond);
scm_i_pthread_mutex_unlock (wake->mutex); scm_i_pthread_mutex_unlock (wake->data.wait_cond.mutex);
} }
else if (wake->kind == WAIT_FD)
if (wake->fd >= 0)
{ {
char dummy = 0; char dummy = 0;
@ -231,8 +281,10 @@ SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0,
not yet have started sleeping, but this is no problem not yet have started sleeping, but this is no problem
either since the data written to a pipe will not be lost, either since the data written to a pipe will not be lost,
unlike a condition variable signal. */ unlike a condition variable signal. */
full_write (wake->fd, &dummy, 1); full_write (wake->data.wait_fd.fd, &dummy, 1);
} }
else
abort ();
} }
return SCM_UNSPECIFIED; return SCM_UNSPECIFIED;

View file

@ -33,10 +33,12 @@ SCM_API void scm_async_tick (void);
SCM_API void scm_switch (void); SCM_API void scm_switch (void);
SCM_API SCM scm_system_async_mark (SCM a); SCM_API SCM scm_system_async_mark (SCM a);
SCM_API SCM scm_system_async_mark_for_thread (SCM a, SCM thread); SCM_API SCM scm_system_async_mark_for_thread (SCM a, SCM thread);
SCM_INTERNAL int scm_i_setup_sleep (scm_i_thread *,
scm_i_pthread_mutex_t *m, SCM_API int scm_c_prepare_to_wait_on_fd (int fd);
int fd); SCM_API int scm_c_prepare_to_wait_on_cond (scm_i_pthread_mutex_t *m,
SCM_INTERNAL void scm_i_reset_sleep (scm_i_thread *); scm_i_pthread_cond_t *c);
SCM_API void scm_c_wait_finished (void);
SCM_API SCM scm_noop (SCM args); SCM_API SCM scm_noop (SCM args);
SCM_API SCM scm_call_with_blocked_asyncs (SCM proc); SCM_API SCM scm_call_with_blocked_asyncs (SCM proc);
SCM_API SCM scm_call_with_unblocked_asyncs (SCM proc); SCM_API SCM scm_call_with_unblocked_asyncs (SCM proc);
@ -45,6 +47,14 @@ SCM_API void *scm_c_call_with_unblocked_asyncs (void *(*p) (void *d), void *d);
SCM_API void scm_dynwind_block_asyncs (void); SCM_API void scm_dynwind_block_asyncs (void);
SCM_API void scm_dynwind_unblock_asyncs (void); SCM_API void scm_dynwind_unblock_asyncs (void);
SCM_INTERNAL int scm_i_prepare_to_wait (scm_i_thread *,
struct scm_thread_wake_data *);
SCM_INTERNAL void scm_i_wait_finished (scm_i_thread *);
SCM_INTERNAL int scm_i_prepare_to_wait_on_fd (scm_i_thread *, int);
SCM_INTERNAL int scm_i_prepare_to_wait_on_cond (scm_i_thread *,
scm_i_pthread_mutex_t *,
scm_i_pthread_cond_t *);
SCM_INTERNAL void scm_i_async_push (scm_i_thread *t, SCM proc); SCM_INTERNAL void scm_i_async_push (scm_i_thread *t, SCM proc);
SCM_INTERNAL SCM scm_i_async_pop (scm_i_thread *t); SCM_INTERNAL SCM scm_i_async_pop (scm_i_thread *t);

View file

@ -307,13 +307,9 @@ block_self (SCM queue, scm_i_pthread_mutex_t *mutex,
SCM q_handle; SCM q_handle;
int err; int err;
if (scm_i_setup_sleep (t, mutex, -1)) if (scm_i_prepare_to_wait_on_cond (t, mutex, &t->sleep_cond))
{ return EINTR;
scm_i_reset_sleep (t);
err = EINTR;
}
else
{
t->block_asyncs++; t->block_asyncs++;
q_handle = enqueue (queue, t->handle); q_handle = enqueue (queue, t->handle);
if (waittime == NULL) if (waittime == NULL)
@ -328,8 +324,7 @@ block_self (SCM queue, scm_i_pthread_mutex_t *mutex,
if (remqueue (queue, q_handle) && err == 0) if (remqueue (queue, q_handle) && err == 0)
err = EINTR; err = EINTR;
t->block_asyncs--; t->block_asyncs--;
scm_i_reset_sleep (t); scm_i_wait_finished (t);
}
return err; return err;
} }
@ -1479,11 +1474,8 @@ scm_std_select (int nfds,
readfds = &my_readfds; readfds = &my_readfds;
} }
while (scm_i_setup_sleep (t, NULL, t->sleep_pipe[1])) while (scm_i_prepare_to_wait_on_fd (t, t->sleep_pipe[1]))
{
scm_i_reset_sleep (t);
SCM_TICK; SCM_TICK;
}
wakeup_fd = t->sleep_pipe[0]; wakeup_fd = t->sleep_pipe[0];
FD_SET (wakeup_fd, readfds); FD_SET (wakeup_fd, readfds);
@ -1502,7 +1494,7 @@ scm_std_select (int nfds,
res = args.result; res = args.result;
eno = args.errno_value; eno = args.errno_value;
scm_i_reset_sleep (t); scm_i_wait_finished (t);
if (res > 0 && FD_ISSET (wakeup_fd, readfds)) if (res > 0 && FD_ISSET (wakeup_fd, readfds))
{ {