1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-06-17 01:00:20 +02:00

Core enhancements, by Julian Graham, to Guile's thread, mutex and

condvar primitives, in preparation for SRFI-18 support.
This commit is contained in:
Neil Jerram 2008-03-08 16:22:40 +00:00
parent 61b6542aa6
commit 6180e336b2
6 changed files with 611 additions and 176 deletions

View file

@ -49,6 +49,7 @@
#include "libguile/gc.h"
#include "libguile/init.h"
#include "libguile/scmsigs.h"
#include "libguile/strings.h"
#ifdef __MINGW32__
#ifndef ETIMEDOUT
@ -59,6 +60,24 @@
# define pipe(fd) _pipe (fd, 256, O_BINARY)
#endif /* __MINGW32__ */
static void
to_timespec (SCM t, scm_t_timespec *waittime)
{
if (scm_is_pair (t))
{
waittime->tv_sec = scm_to_ulong (SCM_CAR (t));
waittime->tv_nsec = scm_to_ulong (SCM_CDR (t)) * 1000;
}
else
{
double time = scm_to_double (t);
double sec = scm_c_truncate (time);
waittime->tv_sec = (long) sec;
waittime->tv_nsec = (long) ((time - sec) * 1000000);
}
}
/*** Queues */
/* Make an empty queue data structure.
@ -134,6 +153,7 @@ thread_mark (SCM obj)
scm_gc_mark (t->result);
scm_gc_mark (t->cleanup_handler);
scm_gc_mark (t->join_queue);
scm_gc_mark (t->mutexes);
scm_gc_mark (t->dynwinds);
scm_gc_mark (t->active_asyncs);
scm_gc_mark (t->continuation_root);
@ -418,6 +438,7 @@ guilify_self_1 (SCM_STACKITEM *base)
t->handle = SCM_BOOL_F;
t->result = SCM_BOOL_F;
t->cleanup_handler = SCM_BOOL_F;
t->mutexes = SCM_EOL;
t->join_queue = SCM_EOL;
t->dynamic_state = SCM_BOOL_F;
t->dynwinds = SCM_EOL;
@ -478,6 +499,31 @@ guilify_self_2 (SCM parent)
t->block_asyncs = 0;
}
/*** Fat mutexes */
/* We implement our own mutex type since we want them to be 'fair', we
want to do fancy things while waiting for them (like running
asyncs) and we might want to add things that are nice for
debugging.
*/
typedef struct {
scm_i_pthread_mutex_t lock;
SCM owner;
int level; /* how much the owner owns us.
< 0 for non-recursive mutexes */
int unchecked_unlock; /* is it an error to unlock an unlocked mutex? */
int allow_external_unlock; /* is it an error to unlock a mutex that is not
owned by the current thread? */
SCM waiting; /* the threads waiting for this mutex. */
} fat_mutex;
#define SCM_MUTEXP(x) SCM_SMOB_PREDICATE (scm_tc16_mutex, x)
#define SCM_MUTEX_DATA(x) ((fat_mutex *) SCM_SMOB_DATA (x))
/* Perform thread tear-down, in guile mode.
*/
static void *
@ -503,6 +549,18 @@ do_thread_exit (void *v)
while (scm_is_true (unblock_from_queue (t->join_queue)))
;
while (!scm_is_null (t->mutexes))
{
SCM mutex = SCM_CAR (t->mutexes);
fat_mutex *m = SCM_MUTEX_DATA (mutex);
scm_i_pthread_mutex_lock (&m->lock);
unblock_from_queue (m->waiting);
scm_i_pthread_mutex_unlock (&m->lock);
t->mutexes = SCM_CDR (t->mutexes);
}
scm_i_pthread_mutex_unlock (&t->admin_mutex);
return NULL;
@ -989,14 +1047,23 @@ SCM_DEFINE (scm_thread_cleanup, "thread-cleanup", 1, 0, 0,
}
#undef FUNC_NAME
SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0,
(SCM thread),
SCM scm_join_thread (SCM thread)
{
return scm_join_thread_timed (thread, SCM_UNDEFINED, SCM_UNDEFINED);
}
SCM_DEFINE (scm_join_thread_timed, "join-thread", 1, 2, 0,
(SCM thread, SCM timeout, SCM timeoutval),
"Suspend execution of the calling thread until the target @var{thread} "
"terminates, unless the target @var{thread} has already terminated. ")
#define FUNC_NAME s_scm_join_thread
#define FUNC_NAME s_scm_join_thread_timed
{
scm_i_thread *t;
SCM res;
scm_t_timespec ctimeout, *timeout_ptr = NULL;
SCM res = SCM_BOOL_F;
if (! (SCM_UNBNDP (timeoutval)))
res = timeoutval;
SCM_VALIDATE_THREAD (1, thread);
if (scm_is_eq (scm_current_thread (), thread))
@ -1005,19 +1072,36 @@ SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0,
t = SCM_I_THREAD_DATA (thread);
scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
if (!t->exited)
if (! SCM_UNBNDP (timeout))
{
to_timespec (timeout, &ctimeout);
timeout_ptr = &ctimeout;
}
if (t->exited)
res = t->result;
else
{
while (1)
{
block_self (t->join_queue, thread, &t->admin_mutex, NULL);
if (t->exited)
int err = block_self (t->join_queue, thread, &t->admin_mutex,
timeout_ptr);
if (err == 0)
{
if (t->exited)
{
res = t->result;
break;
}
}
else if (err == ETIMEDOUT)
break;
scm_i_pthread_mutex_unlock (&t->admin_mutex);
SCM_TICK;
scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
}
}
res = t->result;
scm_i_pthread_mutex_unlock (&t->admin_mutex);
@ -1025,26 +1109,14 @@ SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0,
}
#undef FUNC_NAME
/*** Fat mutexes */
/* We implement our own mutex type since we want them to be 'fair', we
want to do fancy things while waiting for them (like running
asyncs) and we might want to add things that are nice for
debugging.
*/
typedef struct {
scm_i_pthread_mutex_t lock;
SCM owner;
int level; /* how much the owner owns us.
< 0 for non-recursive mutexes */
SCM waiting; /* the threads waiting for this mutex. */
} fat_mutex;
#define SCM_MUTEXP(x) SCM_SMOB_PREDICATE (scm_tc16_mutex, x)
#define SCM_MUTEX_DATA(x) ((fat_mutex *) SCM_SMOB_DATA (x))
SCM_DEFINE (scm_thread_p, "thread?", 1, 0, 0,
(SCM obj),
"Return @code{#t} if @var{obj} is a thread.")
#define FUNC_NAME s_scm_thread_p
{
return SCM_I_IS_THREAD(obj) ? SCM_BOOL_T : SCM_BOOL_F;
}
#undef FUNC_NAME
static SCM
fat_mutex_mark (SCM mx)
@ -1074,7 +1146,7 @@ fat_mutex_print (SCM mx, SCM port, scm_print_state *pstate SCM_UNUSED)
}
static SCM
make_fat_mutex (int recursive)
make_fat_mutex (int recursive, int unchecked_unlock, int external_unlock)
{
fat_mutex *m;
SCM mx;
@ -1083,18 +1155,47 @@ make_fat_mutex (int recursive)
scm_i_pthread_mutex_init (&m->lock, NULL);
m->owner = SCM_BOOL_F;
m->level = recursive? 0 : -1;
m->unchecked_unlock = unchecked_unlock;
m->allow_external_unlock = external_unlock;
m->waiting = SCM_EOL;
SCM_NEWSMOB (mx, scm_tc16_mutex, (scm_t_bits) m);
m->waiting = make_queue ();
return mx;
}
SCM_DEFINE (scm_make_mutex, "make-mutex", 0, 0, 0,
(void),
"Create a new mutex. ")
#define FUNC_NAME s_scm_make_mutex
SCM scm_make_mutex (void)
{
return make_fat_mutex (0);
return scm_make_mutex_with_flags (SCM_EOL);
}
static SCM unchecked_unlock_sym;
static SCM allow_external_unlock_sym;
static SCM recursive_sym;
SCM_DEFINE (scm_make_mutex_with_flags, "make-mutex", 0, 0, 1,
(SCM flags),
"Create a new mutex. ")
#define FUNC_NAME s_scm_make_mutex_with_flags
{
int unchecked_unlock = 0, external_unlock = 0, recursive = 0;
SCM ptr = flags;
while (! scm_is_null (ptr))
{
SCM flag = SCM_CAR (ptr);
if (scm_is_eq (flag, unchecked_unlock_sym))
unchecked_unlock = 1;
else if (scm_is_eq (flag, allow_external_unlock_sym))
external_unlock = 1;
else if (scm_is_eq (flag, recursive_sym))
recursive = 1;
else
SCM_MISC_ERROR ("unsupported mutex option", SCM_EOL);
ptr = SCM_CDR (ptr);
}
return make_fat_mutex (recursive, unchecked_unlock, external_unlock);
}
#undef FUNC_NAME
@ -1103,59 +1204,121 @@ SCM_DEFINE (scm_make_recursive_mutex, "make-recursive-mutex", 0, 0, 0,
"Create a new recursive mutex. ")
#define FUNC_NAME s_scm_make_recursive_mutex
{
return make_fat_mutex (1);
return make_fat_mutex (1, 0, 0);
}
#undef FUNC_NAME
static char *
fat_mutex_lock (SCM mutex)
SCM_SYMBOL (scm_abandoned_mutex_error_key, "abandoned-mutex-error");
static SCM
fat_mutex_lock (SCM mutex, scm_t_timespec *timeout, int *ret)
{
fat_mutex *m = SCM_MUTEX_DATA (mutex);
SCM thread = scm_current_thread ();
char *msg = NULL;
scm_i_thread *t = SCM_I_THREAD_DATA (thread);
SCM err = SCM_BOOL_F;
struct timeval current_time;
scm_i_scm_pthread_mutex_lock (&m->lock);
if (scm_is_false (m->owner))
m->owner = thread;
{
m->owner = thread;
scm_i_pthread_mutex_lock (&t->admin_mutex);
t->mutexes = scm_cons (mutex, t->mutexes);
scm_i_pthread_mutex_unlock (&t->admin_mutex);
*ret = 1;
}
else if (scm_is_eq (m->owner, thread))
{
if (m->level >= 0)
m->level++;
{
m->level++;
*ret = 1;
}
else
msg = "mutex already locked by current thread";
err = scm_cons (scm_misc_error_key,
scm_from_locale_string ("mutex already locked by "
"current thread"));
}
else
{
int first_iteration = 1;
while (1)
{
block_self (m->waiting, mutex, &m->lock, NULL);
if (scm_is_eq (m->owner, thread))
break;
scm_i_pthread_mutex_unlock (&m->lock);
SCM_TICK;
scm_i_scm_pthread_mutex_lock (&m->lock);
if (scm_is_eq (m->owner, thread) || scm_c_thread_exited_p (m->owner))
{
scm_i_pthread_mutex_lock (&t->admin_mutex);
t->mutexes = scm_cons (mutex, t->mutexes);
scm_i_pthread_mutex_unlock (&t->admin_mutex);
*ret = 1;
if (scm_c_thread_exited_p (m->owner))
{
m->owner = thread;
err = scm_cons (scm_abandoned_mutex_error_key,
scm_from_locale_string ("lock obtained on "
"abandoned mutex"));
}
break;
}
else if (!first_iteration)
{
if (timeout != NULL)
{
gettimeofday (&current_time, NULL);
if (current_time.tv_sec > timeout->tv_sec ||
(current_time.tv_sec == timeout->tv_sec &&
current_time.tv_usec * 1000 > timeout->tv_nsec))
{
*ret = 0;
break;
}
}
scm_i_pthread_mutex_unlock (&m->lock);
SCM_TICK;
scm_i_scm_pthread_mutex_lock (&m->lock);
}
else
first_iteration = 0;
block_self (m->waiting, mutex, &m->lock, timeout);
}
}
scm_i_pthread_mutex_unlock (&m->lock);
return msg;
return err;
}
SCM_DEFINE (scm_lock_mutex, "lock-mutex", 1, 0, 0,
(SCM mx),
SCM scm_lock_mutex (SCM mx)
{
return scm_lock_mutex_timed (mx, SCM_UNDEFINED);
}
SCM_DEFINE (scm_lock_mutex_timed, "lock-mutex", 1, 1, 0,
(SCM m, SCM timeout),
"Lock @var{mutex}. If the mutex is already locked, the calling thread "
"blocks until the mutex becomes available. The function returns when "
"the calling thread owns the lock on @var{mutex}. Locking a mutex that "
"a thread already owns will succeed right away and will not block the "
"thread. That is, Guile's mutexes are @emph{recursive}. ")
#define FUNC_NAME s_scm_lock_mutex
#define FUNC_NAME s_scm_lock_mutex_timed
{
char *msg;
SCM exception;
int ret = 0;
scm_t_timespec cwaittime, *waittime = NULL;
SCM_VALIDATE_MUTEX (1, mx);
msg = fat_mutex_lock (mx);
if (msg)
scm_misc_error (NULL, msg, SCM_EOL);
return SCM_BOOL_T;
SCM_VALIDATE_MUTEX (1, m);
if (! SCM_UNBNDP (timeout) && ! scm_is_false (timeout))
{
to_timespec (timeout, &cwaittime);
waittime = &cwaittime;
}
exception = fat_mutex_lock (m, waittime, &ret);
if (!scm_is_false (exception))
scm_ithrow (SCM_CAR (exception), scm_list_1 (SCM_CDR (exception)), 1);
return ret ? SCM_BOOL_T : SCM_BOOL_F;
}
#undef FUNC_NAME
@ -1168,71 +1331,134 @@ scm_dynwind_lock_mutex (SCM mutex)
SCM_F_WIND_EXPLICITLY);
}
static char *
fat_mutex_trylock (fat_mutex *m, int *resp)
{
char *msg = NULL;
SCM thread = scm_current_thread ();
*resp = 1;
scm_i_pthread_mutex_lock (&m->lock);
if (scm_is_false (m->owner))
m->owner = thread;
else if (scm_is_eq (m->owner, thread))
{
if (m->level >= 0)
m->level++;
else
msg = "mutex already locked by current thread";
}
else
*resp = 0;
scm_i_pthread_mutex_unlock (&m->lock);
return msg;
}
SCM_DEFINE (scm_try_mutex, "try-mutex", 1, 0, 0,
(SCM mutex),
"Try to lock @var{mutex}. If the mutex is already locked by someone "
"else, return @code{#f}. Else lock the mutex and return @code{#t}. ")
#define FUNC_NAME s_scm_try_mutex
{
char *msg;
int res;
SCM exception;
int ret = 0;
scm_t_timespec cwaittime, *waittime = NULL;
SCM_VALIDATE_MUTEX (1, mutex);
to_timespec (scm_from_int(0), &cwaittime);
waittime = &cwaittime;
msg = fat_mutex_trylock (SCM_MUTEX_DATA (mutex), &res);
if (msg)
scm_misc_error (NULL, msg, SCM_EOL);
return scm_from_bool (res);
exception = fat_mutex_lock (mutex, waittime, &ret);
if (!scm_is_false (exception))
scm_ithrow (SCM_CAR (exception), scm_list_1 (SCM_CDR (exception)), 1);
return ret ? SCM_BOOL_T : SCM_BOOL_F;
}
#undef FUNC_NAME
static char *
fat_mutex_unlock (fat_mutex *m)
/*** Fat condition variables */
typedef struct {
scm_i_pthread_mutex_t lock;
SCM waiting; /* the threads waiting for this condition. */
} fat_cond;
#define SCM_CONDVARP(x) SCM_SMOB_PREDICATE (scm_tc16_condvar, x)
#define SCM_CONDVAR_DATA(x) ((fat_cond *) SCM_SMOB_DATA (x))
static int
fat_mutex_unlock (SCM mutex, SCM cond,
const scm_t_timespec *waittime, int relock)
{
char *msg = NULL;
fat_mutex *m = SCM_MUTEX_DATA (mutex);
fat_cond *c = NULL;
scm_i_thread *t = SCM_I_CURRENT_THREAD;
int err = 0, ret = 0;
scm_i_scm_pthread_mutex_lock (&m->lock);
if (!scm_is_eq (m->owner, scm_current_thread ()))
{
if (scm_is_false (m->owner))
msg = "mutex not locked";
else
msg = "mutex not locked by current thread";
{
if (!m->unchecked_unlock)
scm_misc_error (NULL, "mutex not locked", SCM_EOL);
}
else if (!m->allow_external_unlock)
scm_misc_error (NULL, "mutex not locked by current thread", SCM_EOL);
}
else if (m->level > 0)
m->level--;
else
m->owner = unblock_from_queue (m->waiting);
scm_i_pthread_mutex_unlock (&m->lock);
return msg;
if (! (SCM_UNBNDP (cond)))
{
int lock_ret = 0;
c = SCM_CONDVAR_DATA (cond);
while (1)
{
int brk = 0;
scm_i_scm_pthread_mutex_lock (&c->lock);
if (m->level > 0)
m->level--;
else
m->owner = unblock_from_queue (m->waiting);
scm_i_pthread_mutex_unlock (&m->lock);
t->block_asyncs++;
err = block_self (c->waiting, cond, &c->lock, waittime);
if (err == 0)
{
ret = 1;
brk = 1;
}
else if (err == ETIMEDOUT)
{
ret = 0;
brk = 1;
}
else if (err != EINTR)
{
errno = err;
scm_i_pthread_mutex_unlock (&c->lock);
scm_syserror (NULL);
}
if (brk)
{
if (relock)
fat_mutex_lock (mutex, NULL, &lock_ret);
scm_i_pthread_mutex_unlock (&c->lock);
break;
}
scm_i_pthread_mutex_unlock (&c->lock);
t->block_asyncs--;
scm_async_click ();
scm_remember_upto_here_2 (cond, mutex);
scm_i_scm_pthread_mutex_lock (&m->lock);
}
}
else
{
if (m->level > 0)
m->level--;
else
m->owner = unblock_from_queue (m->waiting);
scm_i_pthread_mutex_unlock (&m->lock);
ret = 1;
}
return ret;
}
SCM_DEFINE (scm_unlock_mutex, "unlock-mutex", 1, 0, 0,
(SCM mx),
SCM scm_unlock_mutex (SCM mx)
{
return scm_unlock_mutex_timed (mx, SCM_UNDEFINED, SCM_UNDEFINED);
}
SCM_DEFINE (scm_unlock_mutex_timed, "unlock-mutex", 1, 2, 0,
(SCM mx, SCM cond, SCM timeout),
"Unlocks @var{mutex} if the calling thread owns the lock on "
"@var{mutex}. Calling unlock-mutex on a mutex not owned by the current "
"thread results in undefined behaviour. Once a mutex has been unlocked, "
@ -1240,18 +1466,35 @@ SCM_DEFINE (scm_unlock_mutex, "unlock-mutex", 1, 0, 0,
"lock. Every call to @code{lock-mutex} by this thread must be matched "
"with a call to @code{unlock-mutex}. Only the last call to "
"@code{unlock-mutex} will actually unlock the mutex. ")
#define FUNC_NAME s_scm_unlock_mutex
#define FUNC_NAME s_scm_unlock_mutex_timed
{
char *msg;
scm_t_timespec cwaittime, *waittime = NULL;
SCM_VALIDATE_MUTEX (1, mx);
msg = fat_mutex_unlock (SCM_MUTEX_DATA (mx));
if (msg)
scm_misc_error (NULL, msg, SCM_EOL);
return SCM_BOOL_T;
if (! (SCM_UNBNDP (cond)))
{
SCM_VALIDATE_CONDVAR (2, cond);
if (! (SCM_UNBNDP (timeout)))
{
to_timespec (timeout, &cwaittime);
waittime = &cwaittime;
}
}
return fat_mutex_unlock (mx, cond, waittime, 0) ? SCM_BOOL_T : SCM_BOOL_F;
}
#undef FUNC_NAME
SCM_DEFINE (scm_mutex_p, "mutex?", 1, 0, 0,
(SCM obj),
"Return @code{#t} if @var{obj} is a mutex.")
#define FUNC_NAME s_scm_mutex_p
{
return SCM_MUTEXP (obj) ? SCM_BOOL_T : SCM_BOOL_F;
}
#undef FUNC_NAME
#if 0
SCM_DEFINE (scm_mutex_owner, "mutex-owner", 1, 0, 0,
@ -1277,16 +1520,6 @@ SCM_DEFINE (scm_mutex_level, "mutex-level", 1, 0, 0,
#endif
/*** Fat condition variables */
typedef struct {
scm_i_pthread_mutex_t lock;
SCM waiting; /* the threads waiting for this condition. */
} fat_cond;
#define SCM_CONDVARP(x) SCM_SMOB_PREDICATE (scm_tc16_condvar, x)
#define SCM_CONDVAR_DATA(x) ((fat_cond *) SCM_SMOB_DATA (x))
static SCM
fat_cond_mark (SCM cv)
{
@ -1334,43 +1567,7 @@ static int
fat_cond_timedwait (SCM cond, SCM mutex,
const scm_t_timespec *waittime)
{
scm_i_thread *t = SCM_I_CURRENT_THREAD;
fat_cond *c = SCM_CONDVAR_DATA (cond);
fat_mutex *m = SCM_MUTEX_DATA (mutex);
const char *msg;
int err = 0;
while (1)
{
scm_i_scm_pthread_mutex_lock (&c->lock);
msg = fat_mutex_unlock (m);
t->block_asyncs++;
if (msg == NULL)
{
err = block_self (c->waiting, cond, &c->lock, waittime);
scm_i_pthread_mutex_unlock (&c->lock);
fat_mutex_lock (mutex);
}
else
scm_i_pthread_mutex_unlock (&c->lock);
t->block_asyncs--;
scm_async_click ();
if (msg)
scm_misc_error (NULL, msg, SCM_EOL);
scm_remember_upto_here_2 (cond, mutex);
if (err == 0)
return 1;
if (err == ETIMEDOUT)
return 0;
if (err != EINTR)
{
errno = err;
scm_syserror (NULL);
}
}
return fat_mutex_unlock (mutex, cond, waittime, 1);
}
SCM_DEFINE (scm_timed_wait_condition_variable, "wait-condition-variable", 2, 1, 0,
@ -1393,20 +1590,11 @@ SCM_DEFINE (scm_timed_wait_condition_variable, "wait-condition-variable", 2, 1,
if (!SCM_UNBNDP (t))
{
if (scm_is_pair (t))
{
waittime.tv_sec = scm_to_ulong (SCM_CAR (t));
waittime.tv_nsec = scm_to_ulong (SCM_CAR (t)) * 1000;
}
else
{
waittime.tv_sec = scm_to_ulong (t);
waittime.tv_nsec = 0;
}
to_timespec (t, &waittime);
waitptr = &waittime;
}
return scm_from_bool (fat_cond_timedwait (cv, mx, waitptr));
return fat_cond_timedwait (cv, mx, waitptr) ? SCM_BOOL_T : SCM_BOOL_F;
}
#undef FUNC_NAME
@ -1449,6 +1637,15 @@ SCM_DEFINE (scm_broadcast_condition_variable, "broadcast-condition-variable", 1,
}
#undef FUNC_NAME
SCM_DEFINE (scm_condition_variable_p, "condition-variable?", 1, 0, 0,
(SCM obj),
"Return @code{#t} if @var{obj} is a condition variable.")
#define FUNC_NAME s_scm_condition_variable_p
{
return SCM_CONDVARP(obj) ? SCM_BOOL_T : SCM_BOOL_F;
}
#undef FUNC_NAME
/*** Marking stacks */
/* XXX - what to do with this? Do we need to handle this for blocked
@ -1800,6 +1997,12 @@ scm_init_threads ()
scm_set_smob_print (scm_tc16_mutex, fat_mutex_print);
scm_set_smob_free (scm_tc16_mutex, fat_mutex_free);
unchecked_unlock_sym =
scm_permanent_object (scm_from_locale_symbol ("unchecked-unlock"));
allow_external_unlock_sym =
scm_permanent_object (scm_from_locale_symbol ("allow-external-unlock"));
recursive_sym = scm_permanent_object (scm_from_locale_symbol ("recursive"));
scm_tc16_condvar = scm_make_smob_type ("condition-variable",
sizeof (fat_cond));
scm_set_smob_mark (scm_tc16_condvar, fat_cond_mark);