From a0656ad4cf976b3845e9b9663a90b46b4cf9fc5a Mon Sep 17 00:00:00 2001 From: Andy Wingo Date: Thu, 29 Dec 2016 18:46:16 +0100 Subject: [PATCH] New interfaces to help wait on fd/cond * libguile/async.h: * libguile/async.c (struct scm_thread_wake_data): Include the cond to signal. Be a union and include a tag. (scm_i_prepare_to_wait): Rename from scm_i_setup_sleep and take wake data directly. Also call scm_i_wait_finished as appropriate. (scm_i_wait_finished): Rename from scm_i_reset_sleep. (scm_i_prepare_to_wait_on_fd, scm_c_prepare_to_wait_on_fd): (scm_i_prepare_to_wait_on_cond, scm_c_prepare_to_wait_on_cond): New functions. (scm_c_wait_finished): New function. (scm_system_async_mark_for_thread): Adapt to wake data change. * libguile/threads.c (block_self, scm_std_select): Adapt to async interface changes. * doc/ref/api-scheduling.texi (Asyncs): Doc new public interfaces. --- doc/ref/api-scheduling.texi | 32 ++++++++++++ libguile/async.c | 100 +++++++++++++++++++++++++++--------- libguile/async.h | 18 +++++-- libguile/threads.c | 48 ++++++++--------- 4 files changed, 142 insertions(+), 56 deletions(-) diff --git a/doc/ref/api-scheduling.texi b/doc/ref/api-scheduling.texi index 86a1ad27c..bf85a6411 100644 --- a/doc/ref/api-scheduling.texi +++ b/doc/ref/api-scheduling.texi @@ -306,6 +306,38 @@ one level. This function must be used inside a pair of calls to Wind}). @end deftypefn +Sometimes you want to interrupt a thread that might be waiting for +something to happen, for example on a file descriptor or a condition +variable. In that case you can inform Guile of how to interrupt that +wait using the following procedures: + +@deftypefn {C Function} int scm_c_prepare_to_wait_on_fd (int fd) +Inform Guile that the current thread is about to sleep, and that if an +asynchronous interrupt is signalled on this thread, Guile should wake up +the thread by writing a zero byte to @var{fd}. Returns zero if the +prepare succeeded, or nonzero if the thread already has a pending async +and that it should avoid waiting. +@end deftypefn + +@deftypefn {C Function} int scm_c_prepare_to_wait_on_cond (scm_i_pthread_mutex_t *mutex, scm_i_pthread_cond_t *cond) +Inform Guile that the current thread is about to sleep, and that if an +asynchronous interrupt is signalled on this thread, Guile should wake up +the thread by acquiring @var{mutex} and signalling @var{cond}. The +caller must already hold @var{mutex} and only drop it as part of the +@code{pthread_cond_wait} call. Returns zero if the prepare succeeded, +or nonzero if the thread already has a pending async and that it should +avoid waiting. +@end deftypefn + +@deftypefn {C Function} void scm_c_wait_finished (void) +Inform Guile that the current thread has finished waiting, and that +asynchronous interrupts no longer need any special wakeup action; the +current thread will periodically poll its internal queue instead. +@end deftypefn + +Guile's own interface to @code{sleep}, @code{wait-condition-variable}, +@code{select}, and so on all call the above routines as appropriate. + Finally, note that threads can also be interrupted via POSIX signals. @xref{Signals}. As an implementation detail, signal handlers will effectively call @code{system-async-mark} in a signal-safe way, diff --git a/libguile/async.c b/libguile/async.c index df8064107..7b3ccb850 100644 --- a/libguile/async.c +++ b/libguile/async.c @@ -149,32 +149,83 @@ scm_async_tick (void) } struct scm_thread_wake_data { - scm_i_pthread_mutex_t *mutex; - int fd; + enum { WAIT_FD, WAIT_COND } kind; + union { + struct { + int fd; + } wait_fd; + struct { + scm_i_pthread_mutex_t *mutex; + scm_i_pthread_cond_t *cond; + } wait_cond; + } data; }; int -scm_i_setup_sleep (scm_i_thread *t, - scm_i_pthread_mutex_t *sleep_mutex, - int sleep_fd) +scm_i_prepare_to_wait (scm_i_thread *t, + struct scm_thread_wake_data *wake) { - struct scm_thread_wake_data *wake; - - wake = scm_gc_typed_calloc (struct scm_thread_wake_data); - wake->mutex = sleep_mutex; - wake->fd = sleep_fd; - scm_atomic_set_pointer ((void **)&t->wake, wake); - return !scm_is_null (scm_atomic_ref_scm (&t->pending_asyncs)); + /* If no interrupt was registered in the meantime, then any future + wakeup will signal the FD or cond var. */ + if (scm_is_null (scm_atomic_ref_scm (&t->pending_asyncs))) + return 0; + + /* Otherwise clear the wake pointer and indicate that the caller + should handle interrupts directly. */ + scm_i_wait_finished (t); + return 1; } void -scm_i_reset_sleep (scm_i_thread *t) +scm_i_wait_finished (scm_i_thread *t) { scm_atomic_set_pointer ((void **)&t->wake, NULL); } +int +scm_i_prepare_to_wait_on_fd (scm_i_thread *t, int fd) +{ + struct scm_thread_wake_data *wake; + wake = scm_gc_typed_calloc (struct scm_thread_wake_data); + wake->kind = WAIT_FD; + wake->data.wait_fd.fd = fd; + return scm_i_prepare_to_wait (t, wake); +} + +int +scm_c_prepare_to_wait_on_fd (int fd) +{ + return scm_i_prepare_to_wait_on_fd (SCM_I_CURRENT_THREAD, fd); +} + +int +scm_i_prepare_to_wait_on_cond (scm_i_thread *t, + scm_i_pthread_mutex_t *m, + scm_i_pthread_cond_t *c) +{ + struct scm_thread_wake_data *wake; + wake = scm_gc_typed_calloc (struct scm_thread_wake_data); + wake->kind = WAIT_COND; + wake->data.wait_cond.mutex = m; + wake->data.wait_cond.cond = c; + return scm_i_prepare_to_wait (t, wake); +} + +int +scm_c_prepare_to_wait_on_cond (scm_i_pthread_mutex_t *m, + scm_i_pthread_cond_t *c) +{ + return scm_i_prepare_to_wait_on_cond (SCM_I_CURRENT_THREAD, m, c); +} + +void +scm_c_wait_finished (void) +{ + scm_i_wait_finished (SCM_I_CURRENT_THREAD); +} + SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0, (SCM proc, SCM thread), "Mark @var{proc} (a procedure with zero arguments) for future execution\n" @@ -210,19 +261,18 @@ SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0, might even be in the next, unrelated sleep. Interrupting it anyway does no harm, however. - The important thing to prevent here is to signal sleep_cond - before T waits on it. This can not happen since T has - sleep_mutex locked while setting t->sleep_mutex and will only - unlock it again while waiting on sleep_cond. + The important thing to prevent here is to signal the cond + before T waits on it. This can not happen since T has its + mutex locked while preparing the wait and will only unlock it + again while waiting on the cond. */ - if (wake->mutex) + if (wake->kind == WAIT_COND) { - scm_i_scm_pthread_mutex_lock (wake->mutex); - scm_i_pthread_cond_signal (&t->sleep_cond); - scm_i_pthread_mutex_unlock (wake->mutex); + scm_i_scm_pthread_mutex_lock (wake->data.wait_cond.mutex); + scm_i_pthread_cond_signal (wake->data.wait_cond.cond); + scm_i_pthread_mutex_unlock (wake->data.wait_cond.mutex); } - - if (wake->fd >= 0) + else if (wake->kind == WAIT_FD) { char dummy = 0; @@ -231,8 +281,10 @@ SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0, not yet have started sleeping, but this is no problem either since the data written to a pipe will not be lost, unlike a condition variable signal. */ - full_write (wake->fd, &dummy, 1); + full_write (wake->data.wait_fd.fd, &dummy, 1); } + else + abort (); } return SCM_UNSPECIFIED; diff --git a/libguile/async.h b/libguile/async.h index c6d7202aa..2bca16df9 100644 --- a/libguile/async.h +++ b/libguile/async.h @@ -33,10 +33,12 @@ SCM_API void scm_async_tick (void); SCM_API void scm_switch (void); SCM_API SCM scm_system_async_mark (SCM a); SCM_API SCM scm_system_async_mark_for_thread (SCM a, SCM thread); -SCM_INTERNAL int scm_i_setup_sleep (scm_i_thread *, - scm_i_pthread_mutex_t *m, - int fd); -SCM_INTERNAL void scm_i_reset_sleep (scm_i_thread *); + +SCM_API int scm_c_prepare_to_wait_on_fd (int fd); +SCM_API int scm_c_prepare_to_wait_on_cond (scm_i_pthread_mutex_t *m, + scm_i_pthread_cond_t *c); +SCM_API void scm_c_wait_finished (void); + SCM_API SCM scm_noop (SCM args); SCM_API SCM scm_call_with_blocked_asyncs (SCM proc); SCM_API SCM scm_call_with_unblocked_asyncs (SCM proc); @@ -45,6 +47,14 @@ SCM_API void *scm_c_call_with_unblocked_asyncs (void *(*p) (void *d), void *d); SCM_API void scm_dynwind_block_asyncs (void); SCM_API void scm_dynwind_unblock_asyncs (void); +SCM_INTERNAL int scm_i_prepare_to_wait (scm_i_thread *, + struct scm_thread_wake_data *); +SCM_INTERNAL void scm_i_wait_finished (scm_i_thread *); +SCM_INTERNAL int scm_i_prepare_to_wait_on_fd (scm_i_thread *, int); +SCM_INTERNAL int scm_i_prepare_to_wait_on_cond (scm_i_thread *, + scm_i_pthread_mutex_t *, + scm_i_pthread_cond_t *); + SCM_INTERNAL void scm_i_async_push (scm_i_thread *t, SCM proc); SCM_INTERNAL SCM scm_i_async_pop (scm_i_thread *t); diff --git a/libguile/threads.c b/libguile/threads.c index 91b18b43a..48a91e84f 100644 --- a/libguile/threads.c +++ b/libguile/threads.c @@ -307,29 +307,24 @@ block_self (SCM queue, scm_i_pthread_mutex_t *mutex, SCM q_handle; int err; - if (scm_i_setup_sleep (t, mutex, -1)) - { - scm_i_reset_sleep (t); - err = EINTR; - } - else - { - t->block_asyncs++; - q_handle = enqueue (queue, t->handle); - if (waittime == NULL) - err = scm_i_scm_pthread_cond_wait (&t->sleep_cond, mutex); - else - err = scm_i_scm_pthread_cond_timedwait (&t->sleep_cond, mutex, waittime); + if (scm_i_prepare_to_wait_on_cond (t, mutex, &t->sleep_cond)) + return EINTR; - /* When we are still on QUEUE, we have been interrupted. We - report this only when no other error (such as a timeout) has - happened above. - */ - if (remqueue (queue, q_handle) && err == 0) - err = EINTR; - t->block_asyncs--; - scm_i_reset_sleep (t); - } + t->block_asyncs++; + q_handle = enqueue (queue, t->handle); + if (waittime == NULL) + err = scm_i_scm_pthread_cond_wait (&t->sleep_cond, mutex); + else + err = scm_i_scm_pthread_cond_timedwait (&t->sleep_cond, mutex, waittime); + + /* When we are still on QUEUE, we have been interrupted. We + report this only when no other error (such as a timeout) has + happened above. + */ + if (remqueue (queue, q_handle) && err == 0) + err = EINTR; + t->block_asyncs--; + scm_i_wait_finished (t); return err; } @@ -1479,11 +1474,8 @@ scm_std_select (int nfds, readfds = &my_readfds; } - while (scm_i_setup_sleep (t, NULL, t->sleep_pipe[1])) - { - scm_i_reset_sleep (t); - SCM_TICK; - } + while (scm_i_prepare_to_wait_on_fd (t, t->sleep_pipe[1])) + SCM_TICK; wakeup_fd = t->sleep_pipe[0]; FD_SET (wakeup_fd, readfds); @@ -1502,7 +1494,7 @@ scm_std_select (int nfds, res = args.result; eno = args.errno_value; - scm_i_reset_sleep (t); + scm_i_wait_finished (t); if (res > 0 && FD_ISSET (wakeup_fd, readfds)) {