1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-07-01 07:20:20 +02:00
guile/libguile/async.c
Andy Wingo b0ce014801 Inline thread wakeup data into "struct scm_thread"
This way we don't allocate an untagged wake data, and we don't need a
type tag.  On the other hand we have to roll a more complicated seqlock,
but that's fine.

Also switch to require C11 atomics.

* libguile/atomics-internal.h: Remove fallback for when we don't have
C11 atomics.
(scm_atomic_ref_uint32, scm_atomic_swap_uint32, scm_atomic_set_uint32):
New helpers.
* libguile/threads-internal.h:
* libguile/async.h:
* libguile/async.c: Inline the thread wake data.  Happily, waking a
remote thread is still wait-free from both sides.
2025-06-25 16:00:07 +02:00

525 lines
14 KiB
C
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* Copyright 1995-1998,2000-2002,2004,2006,2008-2011,2014,2018-2019,2025
Free Software Foundation, Inc.
This file is part of Guile.
Guile is free software: you can redistribute it and/or modify it
under the terms of the GNU Lesser General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
Guile is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public
License along with Guile. If not, see
<https://www.gnu.org/licenses/>. */
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#include <full-write.h>
#include <stdatomic.h>
#include <string.h>
#include <unistd.h>
#include "atomics-internal.h"
#include "deprecation.h"
#include "dynwind.h"
#include "eval.h"
#include "gsubr.h"
#include "list.h"
#include "pairs.h"
#include "throw.h"
#include "threads-internal.h"
#include "async.h"
/* {Asynchronous Events}
*
* Asyncs are used to run arbitrary code at the next safe point in a
* specified thread. You can use them to trigger execution of Scheme
* code from signal handlers or to interrupt a thread, for example.
*
* Each thread has a list of 'activated asyncs', which is a normal
* Scheme list of procedures with zero arguments. When a thread
* executes an scm_async_tick (), it will call all procedures on this
* list in the order they were added to the list.
*/
/* Enqueue PROC (a thunk) on thread T's pending-async list, unless an
   eq?-identical PROC is already queued, in which case do nothing.
   May be called from any thread; operates on T->pending_asyncs only
   through atomic ops.  */
void
scm_i_async_push (scm_thread *t, SCM proc)
{
  SCM asyncs;
  /* The usual algorithm you'd use for atomics with GC would be
     something like:
       repeat
         l = get(asyncs);
       until swap(l, cons(proc, l))
     But this is a LIFO list of asyncs, and that's not so great.  To
     make it FIFO, you'd do:
       repeat
         l = get(asyncs);
       until swap(l, append(l, list(proc)))
     However, some parts of Guile need to add entries to the async list
     from a context in which allocation is unsafe, for example right
     before GC or from a signal handler.  They do that by pre-allocating
     a pair, then when the interrupt fires the code does a setcdr of
     that pair to the t->pending_asyncs and atomically updates
     t->pending_asyncs.  So the append strategy doesn't work.
     Instead to preserve the FIFO behavior we atomically cut off the
     tail of the asyncs every time we want to run an interrupt, then
     disable that newly-severed tail by setting its cdr to #f.  Not so
     nice, but oh well.  */
  asyncs = scm_atomic_ref_scm (&t->pending_asyncs);
  while (1)
    {
      /* Traverse the asyncs list atomically.  Cdrs are read with an
         atomic ref because a concurrent scm_i_async_pop may be severing
         the tail.  If PROC is already queued, we are done.  */
      SCM walk;
      for (walk = asyncs;
           scm_is_pair (walk);
           walk = scm_atomic_ref_scm (SCM_CDRLOC (walk)))
        if (scm_is_eq (SCM_CAR (walk), proc))
          return;
      /* Try to push PROC onto the front.  The CAS returns the value
         actually found in t->pending_asyncs: on success that equals our
         expected snapshot and we are done; on failure we loop and
         re-scan with the freshly observed list.  */
      SCM expected = asyncs;
      asyncs = scm_atomic_compare_and_swap_scm
        (&t->pending_asyncs, asyncs, scm_cons (proc, asyncs));
      if (scm_is_eq (asyncs, expected))
        return;
    }
}
/* Precondition: there are pending asyncs.
   Remove and return the oldest pending async of thread T (FIFO order:
   scm_i_async_push conses onto the front, so the oldest entry is the
   last pair).  Only T itself calls this.  The severed pair's cdr is set
   to #f to mark it disabled for any concurrent reader still walking
   the old list.  */
SCM
scm_i_async_pop (scm_thread *t)
{
  while (1)
    {
      SCM asyncs, last_pair, penultimate_pair;
      last_pair = asyncs = scm_atomic_ref_scm (&t->pending_asyncs);
      penultimate_pair = SCM_BOOL_F;
      /* Since we are the only writer to cdrs of pairs in ASYNCS, and these
         pairs were given to us after an atomic update to t->pending_asyncs,
         no need to use atomic ops to traverse the list.  */
      while (scm_is_pair (SCM_CDR (last_pair)))
        {
          penultimate_pair = last_pair;
          last_pair = SCM_CDR (last_pair);
        }
      /* Sever the tail.  If the list had exactly one pair, pop it by
         CAS-ing pending_asyncs to '(); a CAS failure means a concurrent
         push won, so retry from the top.  Otherwise detach the last
         pair by clearing the penultimate pair's cdr.  */
      if (scm_is_false (penultimate_pair))
        {
          if (!scm_is_eq (asyncs,
                          scm_atomic_compare_and_swap_scm (&t->pending_asyncs,
                                                           asyncs, SCM_EOL)))
            continue;
        }
      else
        scm_atomic_set_scm (SCM_CDRLOC (penultimate_pair), SCM_EOL);
      /* Disable it.  */
      scm_atomic_set_scm (SCM_CDRLOC (last_pair), SCM_BOOL_F);
      return SCM_CAR (last_pair);
    }
}
/* Run the pending asyncs of the calling thread, oldest first, unless
   asyncs are currently blocked for it.  Each async is a thunk; they are
   popped and invoked one at a time until the queue drains.  */
void
scm_async_tick (void)
{
  scm_thread *self = SCM_I_CURRENT_THREAD;

  if (self->block_asyncs)
    return;

  for (;;)
    {
      if (scm_is_null (scm_atomic_ref_scm (&self->pending_asyncs)))
        break;
      scm_call_0 (scm_i_async_pop (self));
    }
}
/* Wait states published in a thread's wake data.  These values appear
   as 'case' labels in the switches below, which in C require integer
   constant expressions; a 'static const int' object does not qualify
   (C11 6.8.4.2), so use an enum instead.  */
enum
  {
    NOT_WAITING = 0,     /* Thread is not sleeping.  */
    WAITING_ON_FD = 1,   /* Thread sleeps until a wake fd is written.  */
    WAITING_ON_COND = 2  /* Thread sleeps on a pthread condition var.  */
  };
/* Publish DATA as thread T's current wait state so that a remote thread
   posting an async knows how to wake T.  Only T itself calls this.
   The update is guarded by a hand-rolled seqlock: t->wake_data.seq is
   set odd (1) while the write is in progress and restored to the next
   even value afterwards; readers that see an odd or changed counter
   treat the state as NOT_WAITING.  */
static inline void
publish_wake_data_for_thread (scm_thread *t, struct scm_thread_wake_data data)
{
  /* Hand-rolled seqlock over the wait state.  The thread itself is the
     only writer of the wait state, and foreign threads are the only
     readers.  */
  uint32_t seq = scm_atomic_swap_uint32 (&t->wake_data.seq, 1);
  /* Counter must be even before we prepare to wait.  */
  if (seq & 1) abort ();
  /* The individual fields can be relaxed stores: the seqlock counter
     updates above/below provide the ordering readers rely on.  */
  switch (data.state)
    {
    case NOT_WAITING:
      atomic_store_explicit ((atomic_int *) &t->wake_data.state,
                             NOT_WAITING, memory_order_relaxed);
      break;
    case WAITING_ON_FD:
      atomic_store_explicit ((atomic_int *) &t->wake_data.state,
                             WAITING_ON_FD, memory_order_relaxed);
      atomic_store_explicit ((atomic_int *) &t->wake_data.fd,
                             data.fd, memory_order_relaxed);
      break;
    case WAITING_ON_COND:
      atomic_store_explicit ((atomic_int *) &t->wake_data.state,
                             WAITING_ON_COND, memory_order_relaxed);
      atomic_store_explicit ((_Atomic void**) &t->wake_data.mutex,
                             (void*) data.mutex, memory_order_relaxed);
      atomic_store_explicit ((_Atomic void**) &t->wake_data.cond,
                             (void*) data.cond, memory_order_relaxed);
      break;
    default:
      abort ();
    }
  /* Complete the write: advance the counter to the next even value
     (SEQ was the even pre-update value, so SEQ + 2).  */
  scm_atomic_set_uint32 (&t->wake_data.seq, seq + 2);
}
/* Read remote thread T's wake data, to be called by a foreign thread
   after it has pushed an async onto T's queue.  Returns a consistent
   snapshot; if T was concurrently updating its wake data (seqlock
   counter odd, or counter changed during the read), returns state
   NOT_WAITING, because in that case T will itself re-check
   pending_asyncs before sleeping and no wakeup is needed.  */
static inline struct scm_thread_wake_data
read_wake_data_for_remote_thread_after_publishing_async (scm_thread *t)
{
  struct scm_thread_wake_data wake = {0,};
  uint32_t seq = scm_atomic_ref_uint32 (&t->wake_data.seq);
  if (seq & 1)
    {
      /* The thread is preparing to wait but will check the
         pending_asyncs before it sleeps.  */
      wake.state = NOT_WAITING;
    }
  else
    {
      /* Counter is even: tentatively read the published fields, then
         re-check the counter below to validate the snapshot.  */
      wake.state =
        atomic_load_explicit ((atomic_int *) &t->wake_data.state,
                              memory_order_relaxed);
      switch (wake.state)
        {
        case NOT_WAITING:
          break;
        case WAITING_ON_FD:
          wake.fd =
            atomic_load_explicit ((atomic_int *) &t->wake_data.fd,
                                  memory_order_relaxed);
          break;
        case WAITING_ON_COND:
          wake.mutex = (scm_i_pthread_mutex_t *)
            atomic_load_explicit ((_Atomic void**) &t->wake_data.mutex,
                                  memory_order_relaxed);
          wake.cond = (scm_i_pthread_cond_t *)
            atomic_load_explicit ((_Atomic void**) &t->wake_data.cond,
                                  memory_order_relaxed);
          break;
        default:
          abort();
        }
      if (seq != scm_atomic_ref_uint32 (&t->wake_data.seq))
        /* If the thread updated the wake state since we started
           reading it, then the thread also checked the
           pending_asyncs, so we don't have to do anything.  */
        wake.state = NOT_WAITING;
    }
  return wake;
}
/* Publish DATA as thread T's wake data in preparation for a wait.
   Returns 0 if the caller may go on to sleep (any future async will
   signal the fd or cond var), or 1 if an async is already pending, in
   which case the wake data has been retracted and the caller must
   handle interrupts itself instead of sleeping.  */
static int
scm_i_prepare_to_wait (scm_thread *t,
                       struct scm_thread_wake_data data)
{
  if (t->block_asyncs)
    return 0;

  publish_wake_data_for_thread (t, data);

  if (!scm_is_null (scm_atomic_ref_scm (&t->pending_asyncs)))
    {
      /* An interrupt was registered in the meantime: clear the wake
         data and tell the caller to handle interrupts directly.  */
      scm_i_wait_finished (t);
      return 1;
    }

  /* No pending interrupt; any future wakeup will signal the FD or
     cond var.  */
  return 0;
}
/* Note that thread T is done waiting, by publishing a NOT_WAITING wake
   record in place of whatever wait state was previously advertised.  */
void
scm_i_wait_finished (scm_thread *t)
{
  publish_wake_data_for_thread
    (t, (struct scm_thread_wake_data) { .state = NOT_WAITING });
}
/* Prepare thread T to sleep until file descriptor FD is written to.
   See scm_i_prepare_to_wait for the meaning of the return value.  */
int
scm_i_prepare_to_wait_on_fd (scm_thread *t, int fd)
{
  struct scm_thread_wake_data wake = { 0 };
  wake.state = WAITING_ON_FD;
  wake.fd = fd;
  return scm_i_prepare_to_wait (t, wake);
}
int
scm_c_prepare_to_wait_on_fd (int fd)
{
return scm_i_prepare_to_wait_on_fd (SCM_I_CURRENT_THREAD, fd);
}
/* Prepare thread T to sleep on condition variable C, whose associated
   mutex is M.  See scm_i_prepare_to_wait for the return value.  */
int
scm_i_prepare_to_wait_on_cond (scm_thread *t,
                               scm_i_pthread_mutex_t *m,
                               scm_i_pthread_cond_t *c)
{
  struct scm_thread_wake_data wake = { 0 };
  wake.state = WAITING_ON_COND;
  wake.mutex = m;
  wake.cond = c;
  return scm_i_prepare_to_wait (t, wake);
}
/* Like scm_i_prepare_to_wait_on_cond, for the calling thread.  */
int
scm_c_prepare_to_wait_on_cond (scm_i_pthread_mutex_t *m,
                               scm_i_pthread_cond_t *c)
{
  scm_thread *self = SCM_I_CURRENT_THREAD;
  return scm_i_prepare_to_wait_on_cond (self, m, c);
}
void
scm_c_wait_finished (void)
{
scm_i_wait_finished (SCM_I_CURRENT_THREAD);
}
SCM_DEFINE (scm_system_async_mark_for_thread, "system-async-mark", 1, 1, 0,
            (SCM proc, SCM thread),
            "Mark @var{proc} (a procedure with zero arguments) for future execution\n"
            "in @var{thread}. If @var{proc} has already been marked for\n"
            "@var{thread} but has not been executed yet, this call has no effect.\n"
            "If @var{thread} is omitted, the thread that called\n"
            "@code{system-async-mark} is used.\n\n"
            "This procedure is not safe to be called from C signal handlers. Use\n"
            "@code{scm_sigaction} or @code{scm_sigaction_for_thread} to install\n"
            "signal handlers.")
#define FUNC_NAME s_scm_system_async_mark_for_thread
{
  scm_thread *t;
  /* Default to the calling thread when THREAD is omitted.  */
  if (SCM_UNBNDP (thread))
    t = SCM_I_CURRENT_THREAD;
  else
    {
      SCM_VALIDATE_THREAD (2, thread);
      t = SCM_I_THREAD_DATA (thread);
    }
  scm_i_async_push (t, proc);
  /* At this point the async is enqueued.  However if the thread is
     sleeping, we have to wake it up.  The read below must happen after
     the push, which is what makes waking race-free: either we see T's
     wait state, or T sees our async.  */
  struct scm_thread_wake_data wake =
    read_wake_data_for_remote_thread_after_publishing_async (t);
  switch (wake.state)
    {
    case NOT_WAITING:
      break;
    case WAITING_ON_FD:
      {
        char dummy = 0;
        /* T might already been done with sleeping here, but
           interrupting it once too often does no harm.  T might also
           not yet have started sleeping, but this is no problem either
           since the data written to a pipe will not be lost, unlike a
           condition variable signal.  */
        full_write (wake.fd, &dummy, 1);
      }
      break;
    case WAITING_ON_COND:
      /* By now, the thread T might be out of its sleep already, or
         might even be in the next, unrelated sleep.  Interrupting it
         anyway does no harm, however.
         The important thing to prevent here is to signal the cond
         before T waits on it.  This can not happen since T has its
         mutex locked while preparing the wait and will only unlock it
         again while waiting on the cond.
         */
      scm_i_scm_pthread_mutex_lock (wake.mutex);
      scm_i_pthread_cond_signal (wake.cond);
      scm_i_pthread_mutex_unlock (wake.mutex);
      break;
    default:
      abort ();
    }
  return SCM_UNSPECIFIED;
}
#undef FUNC_NAME
/* Mark PROC for future execution in the calling thread; equivalent to
   calling system-async-mark with the thread argument omitted.  */
SCM
scm_system_async_mark (SCM proc)
#define FUNC_NAME s_scm_system_async_mark_for_thread
{
  SCM thread = SCM_UNDEFINED;
  return scm_system_async_mark_for_thread (proc, thread);
}
#undef FUNC_NAME
SCM_DEFINE (scm_noop, "noop", 0, 0, 1,
            (SCM args),
            "Do nothing. When called without arguments, return @code{#f},\n"
            "otherwise return the first argument.")
#define FUNC_NAME s_scm_noop
{
  SCM_VALIDATE_REST_ARGUMENT (args);
  if (SCM_NULL_OR_NIL_P (args))
    return SCM_BOOL_F;
  return SCM_CAR (args);
}
#undef FUNC_NAME
/* Dynwind handler: raise the async-block level of the thread passed as
   DATA by one.  */
static void
increase_block (void *data)
{
  scm_thread *thread = data;
  thread->block_asyncs++;
}
/* Dynwind handler: lower the async-block level of the thread passed as
   DATA by one, running any pending asyncs once it reaches zero.  */
static void
decrease_block (void *data)
{
  scm_thread *thread = data;
  thread->block_asyncs--;
  if (thread->block_asyncs == 0)
    scm_async_tick ();
}
/* Within the current dynwind context, block asyncs by one level for the
   calling thread on entry and unblock again on exit (which may run any
   asyncs that accumulated while blocked).  */
void
scm_dynwind_block_asyncs (void)
{
  scm_thread *curr = SCM_I_CURRENT_THREAD;
  scm_dynwind_rewind_handler (increase_block, curr, SCM_F_WIND_EXPLICITLY);
  scm_dynwind_unwind_handler (decrease_block, curr, SCM_F_WIND_EXPLICITLY);
}
/* Within the current dynwind context, unblock asyncs by one level for
   the calling thread on entry and re-block on exit.  Signals an error
   if asyncs are not currently blocked.  */
void
scm_dynwind_unblock_asyncs (void)
{
  scm_thread *curr = SCM_I_CURRENT_THREAD;

  if (curr->block_asyncs == 0)
    scm_misc_error ("scm_with_unblocked_asyncs",
                    "asyncs already unblocked", SCM_EOL);

  scm_dynwind_rewind_handler (decrease_block, curr, SCM_F_WIND_EXPLICITLY);
  scm_dynwind_unwind_handler (increase_block, curr, SCM_F_WIND_EXPLICITLY);
}
SCM_DEFINE (scm_call_with_blocked_asyncs, "call-with-blocked-asyncs", 1, 0, 0,
            (SCM proc),
            "Call @var{proc} with no arguments and block the execution\n"
            "of system asyncs by one level for the current thread while\n"
            "it is running. Return the value returned by @var{proc}.\n")
#define FUNC_NAME s_scm_call_with_blocked_asyncs
{
  /* The dynwind context guarantees the block level is restored even if
     PROC exits non-locally.  */
  scm_dynwind_begin (SCM_F_DYNWIND_REWINDABLE);
  scm_dynwind_block_asyncs ();
  SCM result = scm_call_0 (proc);
  scm_dynwind_end ();
  return result;
}
#undef FUNC_NAME
/* C-level variant of call-with-blocked-asyncs: invoke PROC (DATA) with
   asyncs blocked by one level for the current thread, and return
   PROC's result.  */
void *
scm_c_call_with_blocked_asyncs (void *(*proc) (void *data), void *data)
{
  scm_dynwind_begin (SCM_F_DYNWIND_REWINDABLE);
  scm_dynwind_block_asyncs ();
  void *result = proc (data);
  scm_dynwind_end ();
  return result;
}
SCM_DEFINE (scm_call_with_unblocked_asyncs, "call-with-unblocked-asyncs", 1, 0, 0,
            (SCM proc),
            "Call @var{proc} with no arguments and unblock the execution\n"
            "of system asyncs by one level for the current thread while\n"
            "it is running. Return the value returned by @var{proc}.\n")
#define FUNC_NAME s_scm_call_with_unblocked_asyncs
{
  /* Unblocking below zero makes no sense; reject it up front.  */
  if (SCM_I_CURRENT_THREAD->block_asyncs == 0)
    SCM_MISC_ERROR ("asyncs already unblocked", SCM_EOL);

  scm_dynwind_begin (SCM_F_DYNWIND_REWINDABLE);
  scm_dynwind_unblock_asyncs ();
  SCM result = scm_call_0 (proc);
  scm_dynwind_end ();
  return result;
}
#undef FUNC_NAME
/* C-level variant of call-with-unblocked-asyncs: invoke PROC (DATA)
   with asyncs unblocked by one level for the current thread, and return
   PROC's result.  Errors if asyncs are not currently blocked.  */
void *
scm_c_call_with_unblocked_asyncs (void *(*proc) (void *data), void *data)
{
  if (SCM_I_CURRENT_THREAD->block_asyncs == 0)
    scm_misc_error ("scm_c_call_with_unblocked_asyncs",
                    "asyncs already unblocked", SCM_EOL);

  scm_dynwind_begin (SCM_F_DYNWIND_REWINDABLE);
  scm_dynwind_unblock_asyncs ();
  void *result = proc (data);
  scm_dynwind_end ();
  return result;
}
/* Register this file's subrs ("system-async-mark", "noop",
   "call-with-blocked-asyncs", etc.) with the runtime.  async.x is
   generated by the snarfer from the SCM_DEFINE forms above.  */
void
scm_init_async ()
{
#include "async.x"
}