
Use atomics for async interrupts

* libguile/__scm.h (SCM_TICK): Always define as scm_async_tick().
* libguile/error.c (scm_syserror, scm_syserror_msg):
* libguile/fports.c (fport_read, fport_write):
* libguile/_scm.h (SCM_SYSCALL): Replace SCM_ASYNC_TICK with
  scm_async_tick ().
  (SCM_ASYNC_TICK, SCM_ASYNC_TICK_WITH_CODE)
  (SCM_ASYNC_TICK_WITH_GUARD_CODE): Remove internal definitions.  We
  inline into vm-engine.c, the only place where it matters.
* libguile/async.h:
* libguile/async.c (scm_async_tick, scm_i_setup_sleep):
  (scm_i_reset_sleep, scm_system_async_mark_for_thread):
* libguile/threads.h (struct scm_thread_wake_data):
* libguile/threads.h (scm_i_thread):
* libguile/threads.c (block_self, guilify_self_1, scm_std_select):
  Rewrite to use sequentially-consistent atomic references.
* libguile/atomics-internal.h (scm_atomic_set_pointer):
  (scm_atomic_ref_pointer): New definitions.
* libguile/finalizers.c (queue_finalizer_async): We can allocate, so
  just use scm_system_async_mark_for_thread instead of the set-cdr!
  shenanigans.
* libguile/scmsigs.c (take_signal):
* libguile/gc.c (queue_after_gc_hook): Adapt to the new asyncs
  mechanism.  We can't allocate here, but we're only manipulating the
  current thread while no other threads are running, so we should be
  fine.
* libguile/vm-engine.c (VM_HANDLE_INTERRUPTS): Inline the async_tick
  business.
commit c957ec7ab0 (parent f3bfe29235)
Andy Wingo, 2016-10-26 22:32:51 +02:00
14 changed files with 152 additions and 192 deletions
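
The core idea of the change: each thread's pending asyncs become a single
Scheme list manipulated only through sequentially consistent atomics, so
marking an async no longer takes a mutex.  A minimal stand-alone model of
the two sides — compare-and-swap push from the marking thread, atomic
swap-to-empty drain from the ticking thread — might look like this
(plain C11 atomics with illustrative types, not Guile's SCM API):

    #include <stdatomic.h>
    #include <stddef.h>
    #include <stdio.h>

    /* Illustrative stand-in for a pending-async cell. */
    struct node { void (*thunk) (void); struct node *next; };

    static _Atomic (struct node *) pending = NULL;

    /* Marker side: lock-free push (the real code also checks for
       duplicates first; see scm_system_async_mark_for_thread). */
    static void
    mark (struct node *n)
    {
      struct node *head = atomic_load (&pending);
      do
        n->next = head;
      while (!atomic_compare_exchange_weak (&pending, &head, n));
    }

    /* Ticker side: atomically take the whole list, then run it, like
       scm_atomic_swap_scm (&t->pending_asyncs, SCM_EOL) below. */
    static void
    tick (void)
    {
      struct node *n = atomic_exchange (&pending, NULL);
      while (n != NULL)
        {
          struct node *next = n->next;
          n->thunk ();
          n = next;
        }
    }

    static void hello (void) { puts ("async ran"); }

    int
    main (void)
    {
      struct node n = { hello, NULL };
      mark (&n);
      tick ();
      return 0;
    }

Note that the list behaves as a stack — later marks run first — which is
fine for asyncs, whose order is unspecified.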

diff --git a/libguile/__scm.h b/libguile/__scm.h

@@ -474,11 +474,7 @@ typedef long SCM_STACKITEM;
 #define SCM_STACK_PTR(ptr) ((SCM_STACKITEM *) (void *) (ptr))
 
-#ifdef BUILDING_LIBGUILE
-#define SCM_TICK SCM_ASYNC_TICK
-#else
 #define SCM_TICK scm_async_tick ()
-#endif

diff --git a/libguile/_scm.h b/libguile/_scm.h

@@ -100,7 +100,7 @@
       errno = 0; \
       line; \
       if (EVMSERR == errno && (vaxc$errno>>3)==(SS$_CONTROLC>>3)) \
-        SCM_ASYNC_TICK; \
+        scm_async_tick (); \
       else \
         break; \
     } \
@@ -119,7 +119,7 @@
       line; \
       if (errno == EINTR) \
         { \
-          SCM_ASYNC_TICK; \
+          scm_async_tick (); \
           errno = EINTR; \
         } \
     } \
@@ -223,26 +223,6 @@ void scm_ia64_longjmp (scm_i_jmp_buf *, int);
 #define SCM_I_LONGJMP longjmp
 #endif
 
-#define SCM_ASYNC_TICK_WITH_GUARD_CODE(thr, pre, post) \
-  do \
-    { \
-      if (SCM_UNLIKELY (thr->pending_asyncs)) \
-        { \
-          pre; \
-          scm_async_tick (); \
-          post; \
-        } \
-    } \
-  while (0)
-
-#define SCM_ASYNC_TICK_WITH_CODE(thr, stmt) \
-  SCM_ASYNC_TICK_WITH_GUARD_CODE (thr, stmt, (void) 0)
-#define SCM_ASYNC_TICK \
-  SCM_ASYNC_TICK_WITH_CODE (SCM_I_CURRENT_THREAD, (void) 0)
-
 #if (defined __GNUC__)

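SCM_SYSCALL's job is unchanged: keep retrying a system call that fails
with EINTR, ticking asyncs between attempts so a signal delivered
mid-call is serviced promptly.  fport_read and fport_write in fports.c
below use the same retry-and-tick idiom.  A hedged sketch of the idiom
as a plain function (run_asyncs stands in for scm_async_tick (); this is
not the actual macro):

    #include <errno.h>
    #include <unistd.h>

    extern void run_asyncs (void);   /* stand-in for scm_async_tick () */

    /* Retry a read that may be interrupted by signals, servicing
       pending asyncs after each EINTR, as SCM_SYSCALL does. */
    static ssize_t
    read_retrying (int fd, void *buf, size_t len)
    {
      ssize_t n;
      do
        {
          errno = 0;
          n = read (fd, buf, len);
          if (n < 0 && errno == EINTR)
            run_asyncs ();
        }
      while (n < 0 && errno == EINTR);
      return n;
    }
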
diff --git a/libguile/async.c b/libguile/async.c

@@ -24,6 +24,7 @@
 #endif
 
 #include "libguile/_scm.h"
+#include "libguile/atomics-internal.h"
 #include "libguile/eval.h"
 #include "libguile/throw.h"
 #include "libguile/root.h"
@@ -50,142 +51,51 @@
  *
  * Each thread has a list of 'activated asyncs', which is a normal
  * Scheme list of procedures with zero arguments.  When a thread
- * executes a SCM_ASYNC_TICK statement (which is included in SCM_TICK),
- * it will call all procedures on this list.
+ * executes an scm_async_tick (), it will call all procedures on this
+ * list.
  */
 
-static scm_i_pthread_mutex_t async_mutex = SCM_I_PTHREAD_MUTEX_INITIALIZER;
-
 /* System asyncs. */
 
 void
 scm_async_tick (void)
 {
   scm_i_thread *t = SCM_I_CURRENT_THREAD;
   SCM asyncs;
 
-  /* Reset pending_asyncs even when asyncs are blocked and not really
-     executed since this will avoid future futile calls to this
-     function.  When asyncs are unblocked again, this function is
-     invoked even when pending_asyncs is zero.
-  */
+  if (t->block_asyncs)
+    return;
 
-  scm_i_scm_pthread_mutex_lock (&async_mutex);
-  t->pending_asyncs = 0;
-  if (t->block_asyncs == 0)
+  asyncs = scm_atomic_swap_scm (&t->pending_asyncs, SCM_EOL);
+  while (!scm_is_null (asyncs))
     {
-      asyncs = t->active_asyncs;
-      t->active_asyncs = SCM_EOL;
-    }
-  else
-    asyncs = SCM_EOL;
-  scm_i_pthread_mutex_unlock (&async_mutex);
-
-  while (scm_is_pair (asyncs))
-    {
-      SCM next = SCM_CDR (asyncs);
-      SCM_SETCDR (asyncs, SCM_BOOL_F);
-      scm_call_0 (SCM_CAR (asyncs));
+      SCM next = scm_cdr (asyncs);
+      scm_call_0 (scm_car (asyncs));
+      scm_set_cdr_x (asyncs, SCM_BOOL_F);
       asyncs = next;
     }
 }
 
-void
-scm_i_queue_async_cell (SCM c, scm_i_thread *t)
-{
-  SCM sleep_object;
-  scm_i_pthread_mutex_t *sleep_mutex;
-  int sleep_fd;
-  SCM p;
-
-  scm_i_scm_pthread_mutex_lock (&async_mutex);
-  p = t->active_asyncs;
-  SCM_SETCDR (c, SCM_EOL);
-  if (!scm_is_pair (p))
-    t->active_asyncs = c;
-  else
-    {
-      SCM pp;
-      while (scm_is_pair (pp = SCM_CDR (p)))
-        {
-          if (scm_is_eq (SCM_CAR (p), SCM_CAR (c)))
-            {
-              scm_i_pthread_mutex_unlock (&async_mutex);
-              return;
-            }
-          p = pp;
-        }
-      SCM_SETCDR (p, c);
-    }
-  t->pending_asyncs = 1;
-
-  sleep_object = t->sleep_object;
-  sleep_mutex = t->sleep_mutex;
-  sleep_fd = t->sleep_fd;
-
-  scm_i_pthread_mutex_unlock (&async_mutex);
-
-  if (sleep_mutex)
-    {
-      /* By now, the thread T might be out of its sleep already, or
-         might even be in the next, unrelated sleep.  Interrupting it
-         anyway does no harm, however.
-
-         The important thing to prevent here is to signal sleep_cond
-         before T waits on it.  This can not happen since T has
-         sleep_mutex locked while setting t->sleep_mutex and will only
-         unlock it again while waiting on sleep_cond.
-      */
-
-      scm_i_scm_pthread_mutex_lock (sleep_mutex);
-      scm_i_pthread_cond_signal (&t->sleep_cond);
-      scm_i_pthread_mutex_unlock (sleep_mutex);
-    }
-
-  if (sleep_fd >= 0)
-    {
-      char dummy = 0;
-
-      /* Likewise, T might already be done with sleeping here, but
-         interrupting it once too often does no harm.  T might also
-         not yet have started sleeping, but this is no problem either
-         since the data written to a pipe will not be lost, unlike a
-         condition variable signal.  */
-      full_write (sleep_fd, &dummy, 1);
-    }
-
-  /* This is needed to protect sleep_mutex.
-   */
-  scm_remember_upto_here_1 (sleep_object);
-}
-
 int
 scm_i_setup_sleep (scm_i_thread *t,
                    SCM sleep_object, scm_i_pthread_mutex_t *sleep_mutex,
                    int sleep_fd)
 {
-  int pending;
+  struct scm_thread_wake_data *wake;
 
-  scm_i_scm_pthread_mutex_lock (&async_mutex);
-  pending = t->pending_asyncs;
-  if (!pending)
-    {
-      t->sleep_object = sleep_object;
-      t->sleep_mutex = sleep_mutex;
-      t->sleep_fd = sleep_fd;
-    }
-  scm_i_pthread_mutex_unlock (&async_mutex);
-  return pending;
+  wake = scm_gc_typed_calloc (struct scm_thread_wake_data);
+  wake->object = sleep_object;
+  wake->mutex = sleep_mutex;
+  wake->fd = sleep_fd;
+
+  scm_atomic_set_pointer ((void **)&t->wake, wake);
+
+  return !scm_is_null (scm_atomic_ref_scm (&t->pending_asyncs));
 }
 
 void
 scm_i_reset_sleep (scm_i_thread *t)
 {
-  scm_i_scm_pthread_mutex_lock (&async_mutex);
-  t->sleep_object = SCM_BOOL_F;
-  t->sleep_mutex = NULL;
-  t->sleep_fd = -1;
-  scm_i_pthread_mutex_unlock (&async_mutex);
+  scm_atomic_set_pointer ((void **)&t->wake, NULL);
 }
 
 SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0,
@@ -200,13 +110,9 @@ SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0,
 	    "signal handlers.")
 #define FUNC_NAME s_scm_system_async_mark_for_thread
 {
-  /* The current thread might not have a handle yet.  This can happen
-     when the GC runs immediately before allocating the handle.  At
-     the end of that GC, a system async might be marked.  Thus, we can
-     not use scm_current_thread here.
-  */
   scm_i_thread *t;
+  SCM asyncs;
+  struct scm_thread_wake_data *wake;
 
   if (SCM_UNBNDP (thread))
     t = SCM_I_CURRENT_THREAD;
@@ -217,7 +123,48 @@ SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0,
 	SCM_MISC_ERROR ("thread has already exited", SCM_EOL);
       t = SCM_I_THREAD_DATA (thread);
     }
-  scm_i_queue_async_cell (scm_cons (proc, SCM_BOOL_F), t);
+
+  asyncs = scm_atomic_ref_scm (&t->pending_asyncs);
+  do
+    if (scm_is_true (scm_c_memq (proc, asyncs)))
+      return SCM_UNSPECIFIED;
+  while (!scm_atomic_compare_and_swap_scm (&t->pending_asyncs, &asyncs,
+                                           scm_cons (proc, asyncs)));
+
+  /* At this point the async is enqueued.  However if the thread is
+     sleeping, we have to wake it up.  */
+  if ((wake = scm_atomic_ref_pointer ((void **) &t->wake)))
+    {
+      /* By now, the thread T might be out of its sleep already, or
+         might even be in the next, unrelated sleep.  Interrupting it
+         anyway does no harm, however.
+
+         The important thing to prevent here is to signal sleep_cond
+         before T waits on it.  This can not happen since T has
+         sleep_mutex locked while setting t->sleep_mutex and will only
+         unlock it again while waiting on sleep_cond.
+      */
+      scm_i_scm_pthread_mutex_lock (wake->mutex);
+      scm_i_pthread_cond_signal (&t->sleep_cond);
+      scm_i_pthread_mutex_unlock (wake->mutex);
+
+      /* This is needed to protect wake->mutex.
+       */
+      scm_remember_upto_here_1 (wake->object);
+
+      if (wake->fd >= 0)
+        {
+          char dummy = 0;
+
+          /* Likewise, T might already be done with sleeping here, but
+             interrupting it once too often does no harm.  T might also
+             not yet have started sleeping, but this is no problem
+             either since the data written to a pipe will not be lost,
+             unlike a condition variable signal.  */
+          full_write (wake->fd, &dummy, 1);
+        }
+    }
+
+  return SCM_UNSPECIFIED;
 }
 #undef FUNC_NAME

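The enqueue loop above is the heart of the lock-free design: read the
list head, bail out if proc is already queued, otherwise cons and
compare-and-swap, retrying if another marker or a tick intervened.  The
same shape in stand-alone C11 (integers stand in for SCM thunks;
illustrative only, not Guile's API):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    struct cell { int item; struct cell *next; };

    static _Atomic (struct cell *) pending_head = NULL;

    /* Push ITEM unless already queued, mirroring the scm_c_memq check
       plus CAS retry in scm_system_async_mark_for_thread.  NEW_CELL is
       caller-provided storage.  Returns false if ITEM was present. */
    static bool
    mark_once (int item, struct cell *new_cell)
    {
      struct cell *head = atomic_load (&pending_head);
      do
        {
          /* The membership test inspects the snapshot we are about to
             replace; a failed CAS reloads HEAD and re-runs the test. */
          for (struct cell *c = head; c != NULL; c = c->next)
            if (c->item == item)
              return false;
          new_cell->item = item;
          new_cell->next = head;
        }
      while (!atomic_compare_exchange_weak (&pending_head, &head,
                                            new_cell));
      return true;
    }
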
diff --git a/libguile/async.h b/libguile/async.h

@@ -34,7 +34,6 @@ SCM_API void scm_async_tick (void);
 SCM_API void scm_switch (void);
 SCM_API SCM scm_system_async_mark (SCM a);
 SCM_API SCM scm_system_async_mark_for_thread (SCM a, SCM thread);
-SCM_INTERNAL void scm_i_queue_async_cell (SCM cell, scm_i_thread *);
 SCM_INTERNAL int scm_i_setup_sleep (scm_i_thread *,
                                     SCM obj, scm_i_pthread_mutex_t *m,
                                     int fd);

diff --git a/libguile/atomics-internal.h b/libguile/atomics-internal.h

@@ -45,6 +45,16 @@ scm_atomic_compare_and_swap_uint32 (uint32_t *loc, uint32_t *expected,
   return atomic_compare_exchange_weak (loc, expected, desired);
 }
+static inline void
+scm_atomic_set_pointer (void **loc, void *val)
+{
+  atomic_store (loc, val);
+}
+static inline void *
+scm_atomic_ref_pointer (void **loc)
+{
+  return atomic_load (loc);
+}
 static inline void
 scm_atomic_set_scm (SCM *loc, SCM val)
 {
   atomic_store (loc, val);
@@ -99,6 +109,23 @@ scm_atomic_compare_and_swap_uint32 (uint32_t *loc, uint32_t *expected,
   return ret;
 }
+static inline void
+scm_atomic_set_pointer (void **loc, void *val)
+{
+  scm_i_pthread_mutex_lock (&atomics_lock);
+  *loc = val;
+  scm_i_pthread_mutex_unlock (&atomics_lock);
+}
+static inline void *
+scm_atomic_ref_pointer (void **loc)
+{
+  void *ret;
+  scm_i_pthread_mutex_lock (&atomics_lock);
+  ret = *loc;
+  scm_i_pthread_mutex_unlock (&atomics_lock);
+  return ret;
+}
 static inline void
 scm_atomic_set_scm (SCM *loc, SCM val)
 {

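Both halves of atomics-internal.h implement the same contract: the
first uses C11 <stdatomic.h>, the second emulates it for pre-C11
compilers by serializing every operation through one global pthread
mutex (atomics_lock) — the same sequentially consistent semantics,
traded against scalability.  A tiny self-contained demonstration of the
publish/observe pair the wake pointer relies on (C11 branch; names
illustrative):

    #include <stdatomic.h>
    #include <assert.h>
    #include <stddef.h>

    int
    main (void)
    {
      static _Atomic (void *) slot = NULL;  /* models thread->wake */
      int wake_data = 42;

      /* like scm_atomic_set_pointer ((void **) &t->wake, wake) */
      atomic_store (&slot, (void *) &wake_data);

      /* like scm_atomic_ref_pointer ((void **) &t->wake) */
      void *seen = atomic_load (&slot);
      assert (seen == &wake_data);
      return 0;
    }
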
diff --git a/libguile/error.c b/libguile/error.c

@@ -163,7 +163,7 @@ scm_syserror (const char *subr)
    */
 #ifdef EINTR
   if (scm_to_int (err) == EINTR)
-    SCM_ASYNC_TICK;
+    scm_async_tick ();
 #endif
 
   scm_error (scm_system_error_key,
@@ -179,7 +179,7 @@ scm_syserror_msg (const char *subr, const char *message, SCM args, int eno)
   /* See above note about the EINTR signal handling race. */
 #ifdef EINTR
   if (eno == EINTR)
-    SCM_ASYNC_TICK;
+    scm_async_tick ();
 #endif
 
   scm_error (scm_system_error_key,
              subr,

diff --git a/libguile/finalizers.c b/libguile/finalizers.c

@@ -40,6 +40,8 @@ static int automatic_finalization_p = 1;
 static size_t finalization_count;
 
+static SCM run_finalizers_subr;
+
@@ -132,8 +134,6 @@ scm_i_add_finalizer (void *obj, scm_t_finalizer_proc proc, void *data)
 
-static SCM finalizer_async_cell;
-
 static SCM
 run_finalizers_async_thunk (void)
 {
@@ -150,19 +150,13 @@ static void
 queue_finalizer_async (void)
 {
   scm_i_thread *t = SCM_I_CURRENT_THREAD;
-  static scm_i_pthread_mutex_t lock = SCM_I_PTHREAD_MUTEX_INITIALIZER;
 
-  scm_i_pthread_mutex_lock (&lock);
-  /* If t is NULL, that could be because we're allocating in
-     threads.c:guilify_self_1.  In that case, rely on the
+  /* Could be that the current thread is NULL when we're allocating
+     in threads.c:guilify_self_1.  In that case, rely on the
      GC_invoke_finalizers call there after the thread spins up. */
-  if (t && scm_is_false (SCM_CDR (finalizer_async_cell)))
-    {
-      SCM_SETCDR (finalizer_async_cell, t->active_asyncs);
-      t->active_asyncs = finalizer_async_cell;
-      t->pending_asyncs = 1;
-    }
-  scm_i_pthread_mutex_unlock (&lock);
+  if (!t) return;
+
+  scm_system_async_mark_for_thread (run_finalizers_subr, t->handle);
 }
@@ -418,10 +412,8 @@ scm_init_finalizers (void)
 {
-  /* When the async is to run, the cdr of the pair gets set to the
-     asyncs queue of the current thread. */
-  finalizer_async_cell =
-    scm_cons (scm_c_make_gsubr ("%run-finalizers", 0, 0, 0,
-                                run_finalizers_async_thunk),
-              SCM_BOOL_F);
+  run_finalizers_subr = scm_c_make_gsubr ("%run-finalizers", 0, 0, 0,
+                                          run_finalizers_async_thunk);
 
   if (automatic_finalization_p)
     GC_set_finalizer_notifier (queue_finalizer_async);

diff --git a/libguile/fports.c b/libguile/fports.c

@@ -595,7 +595,7 @@ fport_read (SCM port, SCM dst, size_t start, size_t count)
     {
       if (errno == EINTR)
         {
-          SCM_ASYNC_TICK;
+          scm_async_tick ();
           goto retry;
         }
       if (errno == EWOULDBLOCK || errno == EAGAIN)
@@ -618,7 +618,7 @@ fport_write (SCM port, SCM src, size_t start, size_t count)
     {
      if (errno == EINTR)
         {
-          SCM_ASYNC_TICK;
+          scm_async_tick ();
           goto retry;
         }
       if (errno == EWOULDBLOCK || errno == EAGAIN)

diff --git a/libguile/gc.c b/libguile/gc.c

@@ -685,8 +685,8 @@ after_gc_async_thunk (void)
  */
 static void *
 queue_after_gc_hook (void * hook_data SCM_UNUSED,
-		     void *fn_data SCM_UNUSED,
-		     void *data SCM_UNUSED)
+                     void *fn_data SCM_UNUSED,
+                     void *data SCM_UNUSED)
 {
   /* If cell access debugging is enabled, the user may choose to perform
    * additional garbage collections after an arbitrary number of cell
@@ -721,9 +721,8 @@ queue_after_gc_hook (void * hook_data SCM_UNUSED,
   if (scm_is_false (SCM_CDR (after_gc_async_cell)))
     {
-      SCM_SETCDR (after_gc_async_cell, t->active_asyncs);
-      t->active_asyncs = after_gc_async_cell;
-      t->pending_asyncs = 1;
+      SCM_SETCDR (after_gc_async_cell, t->pending_asyncs);
+      t->pending_asyncs = after_gc_async_cell;
     }
 }

diff --git a/libguile/init.c b/libguile/init.c

@@ -344,7 +344,7 @@ invoke_main_func (void *body_data)
    * asyncs a chance to run.  This must be done after
    * the call to scm_restore_signals.
    */
-  SCM_ASYNC_TICK;
+  scm_async_tick ();
 
   /* Indicate success by returning non-NULL.
    */

diff --git a/libguile/scmsigs.c b/libguile/scmsigs.c

@@ -228,9 +228,8 @@ take_signal (int signum)
 
   if (scm_is_false (SCM_CDR (cell)))
     {
-      SCM_SETCDR (cell, t->active_asyncs);
-      t->active_asyncs = cell;
-      t->pending_asyncs = 1;
+      SCM_SETCDR (cell, t->pending_asyncs);
+      t->pending_asyncs = cell;
     }
 
 #ifndef HAVE_SIGACTION

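A signal handler can neither allocate nor take locks, so take_signal
pushes a pair that was allocated when the handler was installed, using
plain (non-atomic) stores, with the cdr doubling as an "already queued"
flag (SCM_BOOL_F when off the list); queue_after_gc_hook in gc.c above
uses the identical pattern, which the commit message justifies by noting
that no other threads are running at that point.  A plain-C model of the
push (illustrative; the sentinel plays the role of SCM_BOOL_F):

    #include <stddef.h>

    struct cell { void (*thunk) (void); struct cell *cdr; };

    /* Distinct sentinel standing in for SCM_BOOL_F: a cell whose cdr
       points here is not currently queued.  List ends are NULL,
       standing in for SCM_EOL. */
    static struct cell not_queued;

    /* Push a pre-allocated CELL onto *HEAD with plain stores, on the
       assumption that no other writer races with us here. */
    static void
    push_preallocated (struct cell **head, struct cell *cell)
    {
      if (cell->cdr == &not_queued)  /* scm_is_false (SCM_CDR (cell)) */
        {
          cell->cdr = *head;   /* SCM_SETCDR (cell, t->pending_asyncs) */
          *head = cell;        /* t->pending_asyncs = cell */
        }
    }
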
diff --git a/libguile/threads.c b/libguile/threads.c

@@ -275,7 +275,7 @@ thread_print (SCM exp, SCM port, scm_print_state *pstate SCM_UNUSED)
 
 /*** Blocking on queues. */
 
-/* See also scm_i_queue_async_cell for how such a block is
+/* See also scm_system_async_mark_for_thread for how such a block is
    interrupted.
 */
@@ -309,7 +309,10 @@ block_self (SCM queue, SCM sleep_object, scm_i_pthread_mutex_t *mutex,
   int err;
 
   if (scm_i_setup_sleep (t, sleep_object, mutex, -1))
-    err = EINTR;
+    {
+      scm_i_reset_sleep (t);
+      err = EINTR;
+    }
   else
     {
       t->block_asyncs++;
@@ -415,9 +418,8 @@ guilify_self_1 (struct GC_stack_base *base)
   t.dynstack.base = NULL;
   t.dynstack.top = NULL;
   t.dynstack.limit = NULL;
-  t.active_asyncs = SCM_EOL;
+  t.pending_asyncs = SCM_EOL;
   t.block_asyncs = 1;
-  t.pending_asyncs = 1;
   t.critical_section_level = 0;
   t.base = base->mem_base;
 #ifdef __ia64__
@@ -426,9 +428,7 @@ guilify_self_1 (struct GC_stack_base *base)
   t.continuation_root = SCM_EOL;
   t.continuation_base = t.base;
   scm_i_pthread_cond_init (&t.sleep_cond, NULL);
-  t.sleep_mutex = NULL;
-  t.sleep_object = SCM_BOOL_F;
-  t.sleep_fd = -1;
+  t.wake = NULL;
   t.vp = NULL;
 
   if (pipe2 (t.sleep_pipe, O_CLOEXEC) != 0)
@@ -1776,7 +1776,10 @@ scm_std_select (int nfds,
     }
 
   while (scm_i_setup_sleep (t, SCM_BOOL_F, NULL, t->sleep_pipe[1]))
-    SCM_TICK;
+    {
+      scm_i_reset_sleep (t);
+      SCM_TICK;
+    }
 
   wakeup_fd = t->sleep_pipe[0];
   FD_SET (wakeup_fd, readfds);
@@ -1795,7 +1798,6 @@ scm_std_select (int nfds,
   res = args.result;
   eno = args.errno_value;
 
-  t->sleep_fd = -1;
   scm_i_reset_sleep (t);
 
   if (res > 0 && FD_ISSET (wakeup_fd, readfds))

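block_self and scm_std_select follow one protocol: publish a wake
descriptor with scm_i_setup_sleep, and if it reports that asyncs were
already pending, back out with scm_i_reset_sleep and run them instead
of sleeping.  The marker does the mirror image: enqueue first, then
look for a published wake descriptor.  Since each side stores before it
loads, and all accesses are sequentially consistent, at least one side
must observe the other — a Dekker-style handshake, which is why neither
can sleep through a wakeup.  A condensed model of both sides (hedged
sketch; the real code adds the mutex, condvar and pipe):

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stddef.h>

    static _Atomic (void *) pending = NULL;  /* models t->pending_asyncs */
    static _Atomic (void *) wake = NULL;     /* models t->wake */

    /* Sleeper: publish wake data, then re-check for pending work; on a
       hit, back out and let the caller tick instead of sleeping. */
    static bool
    try_prepare_sleep (void *wake_data)
    {
      atomic_store (&wake, wake_data);      /* scm_i_setup_sleep */
      if (atomic_load (&pending) != NULL)
        {
          atomic_store (&wake, NULL);       /* scm_i_reset_sleep */
          return false;                     /* caller ticks, retries */
        }
      return true;                          /* safe to block now */
    }

    /* Marker: enqueue first, then wake any published sleeper. */
    static void
    mark_and_wake (void *thunk)
    {
      atomic_store (&pending, thunk);       /* CAS push in the real code */
      if (atomic_load (&wake) != NULL)
        {
          /* signal sleep_cond / write to wake->fd in the real code */
        }
    }
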
diff --git a/libguile/threads.h b/libguile/threads.h

@@ -47,6 +47,13 @@ SCM_API scm_t_bits scm_tc16_thread;
 SCM_API scm_t_bits scm_tc16_mutex;
 SCM_API scm_t_bits scm_tc16_condvar;
 
+struct scm_thread_wake_data
+{
+  SCM object;
+  scm_i_pthread_mutex_t *mutex;
+  int fd;
+};
+
 typedef struct scm_i_thread {
   struct scm_i_thread *next_thread;
@@ -67,10 +74,9 @@ typedef struct scm_i_thread {
   /* Boolean indicating whether the thread is in guile mode. */
   int guile_mode;
 
-  SCM sleep_object;
-  scm_i_pthread_mutex_t *sleep_mutex;
+  struct scm_thread_wake_data *wake;
   scm_i_pthread_cond_t sleep_cond;
-  int sleep_fd, sleep_pipe[2];
+  int sleep_pipe[2];
 
   /* Thread-local freelists; see gc-inline.h. */
   void **freelists;
@@ -85,12 +91,10 @@ typedef struct scm_i_thread {
 
   /* For system asyncs.
    */
-  SCM active_asyncs;            /* The thunks to be run at the next
-                                   safe point */
+  SCM pending_asyncs;           /* The thunks to be run at the next
+                                   safe point.  Accessed atomically. */
   unsigned int block_asyncs;    /* Non-zero means that asyncs should
                                    not be run. */
-  unsigned int pending_asyncs;  /* Non-zero means that asyncs might be
-                                   pending. */
 
   /* The current continuation root and the stack base for it.

diff --git a/libguile/vm-engine.c b/libguile/vm-engine.c

@@ -127,10 +127,25 @@
 #define ABORT_CONTINUATION_HOOK() \
   RUN_HOOK0 (abort)
 
-#define VM_HANDLE_INTERRUPTS \
-  SCM_ASYNC_TICK_WITH_GUARD_CODE (thread, SYNC_IP (), CACHE_SP ())
+/* TODO: Invoke asyncs without trampolining out to C.  That will let us
+   preempt computations via an asynchronous interrupt. */
+#define VM_HANDLE_INTERRUPTS                                        \
+  do                                                                \
+    if (SCM_LIKELY (thread->block_asyncs == 0))                     \
+      {                                                             \
+        SCM asyncs = scm_atomic_ref_scm (&thread->pending_asyncs);  \
+        if (SCM_UNLIKELY (!scm_is_null (asyncs)))                   \
+          {                                                         \
+            SYNC_IP ();                                             \
+            scm_async_tick ();                                      \
+            CACHE_SP ();                                            \
+          }                                                         \
+      }                                                             \
+  while (0)
 
 /* Virtual Machine
 
    The VM has three state bits: the instruction pointer (IP), the frame
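
The win for the VM is that the common case is now two loads and a
branch, with no function call: read block_asyncs (plain), read
pending_asyncs (atomic), and only on a non-empty list sync the cached
IP, trampoline out to scm_async_tick (), and re-cache the stack
pointer.  A stand-alone sketch of that fast path (illustrative names;
the real macro lives inside the VM's dispatch loop):

    #include <stdatomic.h>
    #include <stddef.h>

    struct vm_thread
    {
      unsigned block_asyncs;             /* non-zero: asyncs disabled */
      _Atomic (void *) pending_asyncs;   /* NULL models SCM_EOL */
    };

    extern void async_tick (struct vm_thread *);  /* slow path */

    static inline void
    handle_interrupts (struct vm_thread *t)
    {
      if (t->block_asyncs == 0
          && atomic_load (&t->pending_asyncs) != NULL)
        {
          /* SYNC_IP () in the real macro */
          async_tick (t);
          /* CACHE_SP () in the real macro */
        }
    }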