diff --git a/doc/ref/ChangeLog b/doc/ref/ChangeLog index 54173f24b..aea2b0f1b 100644 --- a/doc/ref/ChangeLog +++ b/doc/ref/ChangeLog @@ -1,3 +1,12 @@ +2008-03-08 Julian Graham + + * api-scheduling.texi (Threads): Add documentation for new + functions "scm_thread_p" and new "scm_join_thread_timed". + (Mutexes and Condition Variables): Add documentation for new + functions "scm_make_mutex_with_flags", "scm_mutex_p", + "scm_lock_mutex_timed", "scm_unlock_mutex_timed", and + "scm_condition_variable_p". + 2008-02-11 Neil Jerram * api-data.texi (Random): New text about the default random state, diff --git a/doc/ref/api-scheduling.texi b/doc/ref/api-scheduling.texi index 56fa20228..7f405064d 100644 --- a/doc/ref/api-scheduling.texi +++ b/doc/ref/api-scheduling.texi @@ -267,12 +267,24 @@ Once @var{body} or @var{handler} returns, the return value is made the @emph{exit value} of the thread and the thread is terminated. @end deftypefn +@deffn {Scheme Procedure} thread? obj +@deffnx {C Function} scm_thread_p (obj) +Return @code{#t} iff @var{obj} is a thread; otherwise, return +@code{#f}. +@end deffn + @c begin (texi-doc-string "guile" "join-thread") -@deffn {Scheme Procedure} join-thread thread +@deffn {Scheme Procedure} join-thread thread [timeout [timeoutval]] @deffnx {C Function} scm_join_thread (thread) +@deffnx {C Function} scm_join_thread_timed (thread, timeout, timeoutval) Wait for @var{thread} to terminate and return its exit value. Threads that have not been created with @code{call-with-new-thread} or -@code{scm_spawn_thread} have an exit value of @code{#f}. +@code{scm_spawn_thread} have an exit value of @code{#f}. When +@var{timeout} is given, it specifies a point in time where the waiting +should be aborted. It can be either an integer as returned by +@code{current-time} or a pair as returned by @code{gettimeofday}. +When the waiting is aborted, @var{timeoutval} is returned (if it is +specified; @code{#f} is returned otherwise). @end deffn @deffn {Scheme Procedure} thread-exited? thread @@ -363,21 +375,51 @@ Acquiring requisite mutexes in a fixed order (like always A before B) in all threads is one way to avoid such problems. @sp 1 -@deffn {Scheme Procedure} make-mutex +@deffn {Scheme Procedure} make-mutex . flags @deffnx {C Function} scm_make_mutex () -Return a new standard mutex. It is initially unlocked. +@deffnx {C Function} scm_make_mutex_with_flags (SCM flag) +Return a new mutex. It is initially unlocked. If @var{flags} is +specified, it must be a list of symbols specifying configuration flags +for the newly-created mutex. The supported flags are: +@table @code +@item unchecked-unlock +Unless this flag is present, a call to `unlock-mutex' on the returned +mutex when it is already unlocked will cause an error to be signalled. + +@item allow-external-unlock +Allow the returned mutex to be unlocked by the calling thread even if +it was originally locked by a different thread. + +@item recursive +The returned mutex will be recursive. + +@end table +@end deffn + +@deffn {Scheme Procedure} mutex? obj +@deffnx {C Function} scm_mutex_p (obj) +Return @code{#t} iff @var{obj} is a mutex; otherwise, return +@code{#f}. @end deffn @deffn {Scheme Procedure} make-recursive-mutex @deffnx {C Function} scm_make_recursive_mutex () -Create a new recursive mutex. It is initialloy unlocked. +Create a new recursive mutex. It is initially unlocked. Calling this +function is equivalent to calling `make-mutex' and specifying the +@code{recursive} flag. @end deffn -@deffn {Scheme Procedure} lock-mutex mutex +@deffn {Scheme Procedure} lock-mutex mutex [timeout] @deffnx {C Function} scm_lock_mutex (mutex) +@deffnx {C Function} scm_lock_mutex_timed (mutex, timeout) Lock @var{mutex}. If the mutex is already locked by another thread then block and return only when @var{mutex} has been acquired. +When @var{timeout} is given, it specifies a point in time where the +waiting should be aborted. It can be either an integer as returned +by @code{current-time} or a pair as returned by @code{gettimeofday}. +When the waiting is aborted, @code{#f} is returned. + For standard mutexes (@code{make-mutex}), and error is signalled if the thread has itself already locked @var{mutex}. @@ -386,6 +428,10 @@ itself already locked @var{mutex}, then a further @code{lock-mutex} call increments the lock count. An additional @code{unlock-mutex} will be required to finally release. +If @var{mutex} was locked by a thread that exited before unlocking it, +the next attempt to lock @var{mutex} will succeed, but +@code{abandoned-mutex-error} will be signalled. + When a system async (@pxref{System asyncs}) is activated for a thread blocked in @code{lock-mutex}, the wait is interrupted and the async is executed. When the async returns, the wait resumes. @@ -395,7 +441,7 @@ executed. When the async returns, the wait resumes. Arrange for @var{mutex} to be locked whenever the current dynwind context is entered and to be unlocked when it is exited. @end deftypefn - + @deffn {Scheme Procedure} try-mutex mx @deffnx {C Function} scm_try_mutex (mx) Try to lock @var{mutex} as per @code{lock-mutex}. If @var{mutex} can @@ -404,10 +450,25 @@ If @var{mutex} is locked by some other thread then nothing is done and the return is @code{#f}. @end deffn -@deffn {Scheme Procedure} unlock-mutex mutex +@deffn {Scheme Procedure} unlock-mutex mutex [condvar [timeout]] @deffnx {C Function} scm_unlock_mutex (mutex) -Unlock @var{mutex}. An error is signalled if @var{mutex} is not -locked by the calling thread. +@deffnx {C Function} scm_unlock_mutex_timed (mutex, condvar, timeout) +Unlock @var{mutex}. An error is signalled if @var{mutex} is not locked +and was not created with the @code{unchecked-unlock} flag set, or if +@var{mutex} is locked by a thread other than the calling thread and was +not created with the @code{allow-external-unlock} flag set. + +If @var{condvar} is given, it specifies a condition variable upon +which the calling thread will wait to be signalled before returning. +(This behavior is very similar to that of +@code{wait-condition-variable}, except that the mutex is left in an +unlocked state when the function returns.) + +When @var{timeout} is also given, it specifies a point in time where +the waiting should be aborted. It can be either an integer as +returned by @code{current-time} or a pair as returned by +@code{gettimeofday}. When the waiting is aborted, @code{#f} is +returned. Otherwise the function returns @code{#t}. @end deffn @deffn {Scheme Procedure} make-condition-variable @@ -415,6 +476,12 @@ locked by the calling thread. Return a new condition variable. @end deffn +@deffn {Scheme Procedure} condition-variable? obj +@deffnx {C Function} scm_condition_variable_p (obj) +Return @code{#t} iff @var{obj} is a condition variable; otherwise, +return @code{#f}. +@end deffn + @deffn {Scheme Procedure} wait-condition-variable condvar mutex [time] @deffnx {C Function} scm_wait_condition_variable (condvar, mutex, time) Wait until @var{condvar} has been signalled. While waiting, diff --git a/libguile/ChangeLog b/libguile/ChangeLog index 7d8846c6e..677d9277c 100644 --- a/libguile/ChangeLog +++ b/libguile/ChangeLog @@ -1,3 +1,29 @@ +2008-03-08 Julian Graham + + * threads.c (scm_join_thread_timed, scm_thread_p, + scm_make_mutex_with_flags, scm_lock_mutex_timed, + scm_unlock_mutex_timed, scm_mutex_p, scm_condition_variable_p): New + functions. + (thread_mark): Updated to mark new struct field `mutexes'. + (do_thread_exit): Notify threads waiting on mutexes locked by exiting + thread. + (scm_join_thread, scm_make_mutex, scm_make_recursive_mutex, + scm_mutex_lock): Reimplement in terms of their newer + counterparts. + (scm_abandoned_mutex_error_key): New symbol. + (fat_mutex)[unchecked_unlock, allow_external_unlock]: New fields. + (fat_mutex_lock): Reimplement to support timeouts and abandonment. + (fat_mutex_trylock, scm_try_mutex): Remove fat_mutex_trylock and + reimplement scm_try_mutex as a lock attempt with a timeout of zero. + (fat_mutex_unlock): Allow unlocking from other threads and unchecked + unlocking; implement in terms of condition variable wait. + (scm_timed_wait_condition_variable): Reimplement in terms of + fat_mutex_unlock. + * threads.h (scm_i_thread)[mutexes]: New field. + (scm_join_thread_timed, scm_thread_p, scm_lock_mutex_timed, + scm_unlock_mutex_timed, scm_mutex_p, scm_condition_variable_p): + Prototypes for new functions. + 2008-03-06 Ludovic Courtès * eval.c (scm_eval): If MODULE_OR_STATE is not a dynamic state, diff --git a/libguile/threads.c b/libguile/threads.c index ba0aa1a55..e959cc66c 100644 --- a/libguile/threads.c +++ b/libguile/threads.c @@ -49,6 +49,7 @@ #include "libguile/gc.h" #include "libguile/init.h" #include "libguile/scmsigs.h" +#include "libguile/strings.h" #ifdef __MINGW32__ #ifndef ETIMEDOUT @@ -59,6 +60,24 @@ # define pipe(fd) _pipe (fd, 256, O_BINARY) #endif /* __MINGW32__ */ +static void +to_timespec (SCM t, scm_t_timespec *waittime) +{ + if (scm_is_pair (t)) + { + waittime->tv_sec = scm_to_ulong (SCM_CAR (t)); + waittime->tv_nsec = scm_to_ulong (SCM_CDR (t)) * 1000; + } + else + { + double time = scm_to_double (t); + double sec = scm_c_truncate (time); + + waittime->tv_sec = (long) sec; + waittime->tv_nsec = (long) ((time - sec) * 1000000); + } +} + /*** Queues */ /* Make an empty queue data structure. @@ -134,6 +153,7 @@ thread_mark (SCM obj) scm_gc_mark (t->result); scm_gc_mark (t->cleanup_handler); scm_gc_mark (t->join_queue); + scm_gc_mark (t->mutexes); scm_gc_mark (t->dynwinds); scm_gc_mark (t->active_asyncs); scm_gc_mark (t->continuation_root); @@ -418,6 +438,7 @@ guilify_self_1 (SCM_STACKITEM *base) t->handle = SCM_BOOL_F; t->result = SCM_BOOL_F; t->cleanup_handler = SCM_BOOL_F; + t->mutexes = SCM_EOL; t->join_queue = SCM_EOL; t->dynamic_state = SCM_BOOL_F; t->dynwinds = SCM_EOL; @@ -478,6 +499,31 @@ guilify_self_2 (SCM parent) t->block_asyncs = 0; } + +/*** Fat mutexes */ + +/* We implement our own mutex type since we want them to be 'fair', we + want to do fancy things while waiting for them (like running + asyncs) and we might want to add things that are nice for + debugging. +*/ + +typedef struct { + scm_i_pthread_mutex_t lock; + SCM owner; + int level; /* how much the owner owns us. + < 0 for non-recursive mutexes */ + + int unchecked_unlock; /* is it an error to unlock an unlocked mutex? */ + int allow_external_unlock; /* is it an error to unlock a mutex that is not + owned by the current thread? */ + + SCM waiting; /* the threads waiting for this mutex. */ +} fat_mutex; + +#define SCM_MUTEXP(x) SCM_SMOB_PREDICATE (scm_tc16_mutex, x) +#define SCM_MUTEX_DATA(x) ((fat_mutex *) SCM_SMOB_DATA (x)) + /* Perform thread tear-down, in guile mode. */ static void * @@ -503,6 +549,18 @@ do_thread_exit (void *v) while (scm_is_true (unblock_from_queue (t->join_queue))) ; + while (!scm_is_null (t->mutexes)) + { + SCM mutex = SCM_CAR (t->mutexes); + fat_mutex *m = SCM_MUTEX_DATA (mutex); + scm_i_pthread_mutex_lock (&m->lock); + + unblock_from_queue (m->waiting); + + scm_i_pthread_mutex_unlock (&m->lock); + t->mutexes = SCM_CDR (t->mutexes); + } + scm_i_pthread_mutex_unlock (&t->admin_mutex); return NULL; @@ -989,14 +1047,23 @@ SCM_DEFINE (scm_thread_cleanup, "thread-cleanup", 1, 0, 0, } #undef FUNC_NAME -SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0, - (SCM thread), +SCM scm_join_thread (SCM thread) +{ + return scm_join_thread_timed (thread, SCM_UNDEFINED, SCM_UNDEFINED); +} + +SCM_DEFINE (scm_join_thread_timed, "join-thread", 1, 2, 0, + (SCM thread, SCM timeout, SCM timeoutval), "Suspend execution of the calling thread until the target @var{thread} " "terminates, unless the target @var{thread} has already terminated. ") -#define FUNC_NAME s_scm_join_thread +#define FUNC_NAME s_scm_join_thread_timed { scm_i_thread *t; - SCM res; + scm_t_timespec ctimeout, *timeout_ptr = NULL; + SCM res = SCM_BOOL_F; + + if (! (SCM_UNBNDP (timeoutval))) + res = timeoutval; SCM_VALIDATE_THREAD (1, thread); if (scm_is_eq (scm_current_thread (), thread)) @@ -1005,19 +1072,36 @@ SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0, t = SCM_I_THREAD_DATA (thread); scm_i_scm_pthread_mutex_lock (&t->admin_mutex); - if (!t->exited) + if (! SCM_UNBNDP (timeout)) + { + to_timespec (timeout, &ctimeout); + timeout_ptr = &ctimeout; + } + + if (t->exited) + res = t->result; + else { while (1) { - block_self (t->join_queue, thread, &t->admin_mutex, NULL); - if (t->exited) + int err = block_self (t->join_queue, thread, &t->admin_mutex, + timeout_ptr); + if (err == 0) + { + if (t->exited) + { + res = t->result; + break; + } + } + else if (err == ETIMEDOUT) break; + scm_i_pthread_mutex_unlock (&t->admin_mutex); SCM_TICK; scm_i_scm_pthread_mutex_lock (&t->admin_mutex); } } - res = t->result; scm_i_pthread_mutex_unlock (&t->admin_mutex); @@ -1025,26 +1109,14 @@ SCM_DEFINE (scm_join_thread, "join-thread", 1, 0, 0, } #undef FUNC_NAME - - -/*** Fat mutexes */ - -/* We implement our own mutex type since we want them to be 'fair', we - want to do fancy things while waiting for them (like running - asyncs) and we might want to add things that are nice for - debugging. -*/ - -typedef struct { - scm_i_pthread_mutex_t lock; - SCM owner; - int level; /* how much the owner owns us. - < 0 for non-recursive mutexes */ - SCM waiting; /* the threads waiting for this mutex. */ -} fat_mutex; - -#define SCM_MUTEXP(x) SCM_SMOB_PREDICATE (scm_tc16_mutex, x) -#define SCM_MUTEX_DATA(x) ((fat_mutex *) SCM_SMOB_DATA (x)) +SCM_DEFINE (scm_thread_p, "thread?", 1, 0, 0, + (SCM obj), + "Return @code{#t} if @var{obj} is a thread.") +#define FUNC_NAME s_scm_thread_p +{ + return SCM_I_IS_THREAD(obj) ? SCM_BOOL_T : SCM_BOOL_F; +} +#undef FUNC_NAME static SCM fat_mutex_mark (SCM mx) @@ -1074,7 +1146,7 @@ fat_mutex_print (SCM mx, SCM port, scm_print_state *pstate SCM_UNUSED) } static SCM -make_fat_mutex (int recursive) +make_fat_mutex (int recursive, int unchecked_unlock, int external_unlock) { fat_mutex *m; SCM mx; @@ -1083,18 +1155,47 @@ make_fat_mutex (int recursive) scm_i_pthread_mutex_init (&m->lock, NULL); m->owner = SCM_BOOL_F; m->level = recursive? 0 : -1; + + m->unchecked_unlock = unchecked_unlock; + m->allow_external_unlock = external_unlock; + m->waiting = SCM_EOL; SCM_NEWSMOB (mx, scm_tc16_mutex, (scm_t_bits) m); m->waiting = make_queue (); return mx; } -SCM_DEFINE (scm_make_mutex, "make-mutex", 0, 0, 0, - (void), - "Create a new mutex. ") -#define FUNC_NAME s_scm_make_mutex +SCM scm_make_mutex (void) { - return make_fat_mutex (0); + return scm_make_mutex_with_flags (SCM_EOL); +} + +static SCM unchecked_unlock_sym; +static SCM allow_external_unlock_sym; +static SCM recursive_sym; + +SCM_DEFINE (scm_make_mutex_with_flags, "make-mutex", 0, 0, 1, + (SCM flags), + "Create a new mutex. ") +#define FUNC_NAME s_scm_make_mutex_with_flags +{ + int unchecked_unlock = 0, external_unlock = 0, recursive = 0; + + SCM ptr = flags; + while (! scm_is_null (ptr)) + { + SCM flag = SCM_CAR (ptr); + if (scm_is_eq (flag, unchecked_unlock_sym)) + unchecked_unlock = 1; + else if (scm_is_eq (flag, allow_external_unlock_sym)) + external_unlock = 1; + else if (scm_is_eq (flag, recursive_sym)) + recursive = 1; + else + SCM_MISC_ERROR ("unsupported mutex option", SCM_EOL); + ptr = SCM_CDR (ptr); + } + return make_fat_mutex (recursive, unchecked_unlock, external_unlock); } #undef FUNC_NAME @@ -1103,59 +1204,121 @@ SCM_DEFINE (scm_make_recursive_mutex, "make-recursive-mutex", 0, 0, 0, "Create a new recursive mutex. ") #define FUNC_NAME s_scm_make_recursive_mutex { - return make_fat_mutex (1); + return make_fat_mutex (1, 0, 0); } #undef FUNC_NAME -static char * -fat_mutex_lock (SCM mutex) +SCM_SYMBOL (scm_abandoned_mutex_error_key, "abandoned-mutex-error"); + +static SCM +fat_mutex_lock (SCM mutex, scm_t_timespec *timeout, int *ret) { fat_mutex *m = SCM_MUTEX_DATA (mutex); + SCM thread = scm_current_thread (); - char *msg = NULL; + scm_i_thread *t = SCM_I_THREAD_DATA (thread); + + SCM err = SCM_BOOL_F; + + struct timeval current_time; scm_i_scm_pthread_mutex_lock (&m->lock); if (scm_is_false (m->owner)) - m->owner = thread; + { + m->owner = thread; + scm_i_pthread_mutex_lock (&t->admin_mutex); + t->mutexes = scm_cons (mutex, t->mutexes); + scm_i_pthread_mutex_unlock (&t->admin_mutex); + *ret = 1; + } else if (scm_is_eq (m->owner, thread)) { if (m->level >= 0) - m->level++; + { + m->level++; + *ret = 1; + } else - msg = "mutex already locked by current thread"; + err = scm_cons (scm_misc_error_key, + scm_from_locale_string ("mutex already locked by " + "current thread")); } else { + int first_iteration = 1; while (1) { - block_self (m->waiting, mutex, &m->lock, NULL); - if (scm_is_eq (m->owner, thread)) - break; - scm_i_pthread_mutex_unlock (&m->lock); - SCM_TICK; - scm_i_scm_pthread_mutex_lock (&m->lock); + if (scm_is_eq (m->owner, thread) || scm_c_thread_exited_p (m->owner)) + { + scm_i_pthread_mutex_lock (&t->admin_mutex); + t->mutexes = scm_cons (mutex, t->mutexes); + scm_i_pthread_mutex_unlock (&t->admin_mutex); + *ret = 1; + if (scm_c_thread_exited_p (m->owner)) + { + m->owner = thread; + err = scm_cons (scm_abandoned_mutex_error_key, + scm_from_locale_string ("lock obtained on " + "abandoned mutex")); + } + break; + } + else if (!first_iteration) + { + if (timeout != NULL) + { + gettimeofday (¤t_time, NULL); + if (current_time.tv_sec > timeout->tv_sec || + (current_time.tv_sec == timeout->tv_sec && + current_time.tv_usec * 1000 > timeout->tv_nsec)) + { + *ret = 0; + break; + } + } + scm_i_pthread_mutex_unlock (&m->lock); + SCM_TICK; + scm_i_scm_pthread_mutex_lock (&m->lock); + } + else + first_iteration = 0; + block_self (m->waiting, mutex, &m->lock, timeout); } } scm_i_pthread_mutex_unlock (&m->lock); - return msg; + return err; } -SCM_DEFINE (scm_lock_mutex, "lock-mutex", 1, 0, 0, - (SCM mx), +SCM scm_lock_mutex (SCM mx) +{ + return scm_lock_mutex_timed (mx, SCM_UNDEFINED); +} + +SCM_DEFINE (scm_lock_mutex_timed, "lock-mutex", 1, 1, 0, + (SCM m, SCM timeout), "Lock @var{mutex}. If the mutex is already locked, the calling thread " "blocks until the mutex becomes available. The function returns when " "the calling thread owns the lock on @var{mutex}. Locking a mutex that " "a thread already owns will succeed right away and will not block the " "thread. That is, Guile's mutexes are @emph{recursive}. ") -#define FUNC_NAME s_scm_lock_mutex +#define FUNC_NAME s_scm_lock_mutex_timed { - char *msg; + SCM exception; + int ret = 0; + scm_t_timespec cwaittime, *waittime = NULL; - SCM_VALIDATE_MUTEX (1, mx); - msg = fat_mutex_lock (mx); - if (msg) - scm_misc_error (NULL, msg, SCM_EOL); - return SCM_BOOL_T; + SCM_VALIDATE_MUTEX (1, m); + + if (! SCM_UNBNDP (timeout) && ! scm_is_false (timeout)) + { + to_timespec (timeout, &cwaittime); + waittime = &cwaittime; + } + + exception = fat_mutex_lock (m, waittime, &ret); + if (!scm_is_false (exception)) + scm_ithrow (SCM_CAR (exception), scm_list_1 (SCM_CDR (exception)), 1); + return ret ? SCM_BOOL_T : SCM_BOOL_F; } #undef FUNC_NAME @@ -1168,71 +1331,134 @@ scm_dynwind_lock_mutex (SCM mutex) SCM_F_WIND_EXPLICITLY); } -static char * -fat_mutex_trylock (fat_mutex *m, int *resp) -{ - char *msg = NULL; - SCM thread = scm_current_thread (); - - *resp = 1; - scm_i_pthread_mutex_lock (&m->lock); - if (scm_is_false (m->owner)) - m->owner = thread; - else if (scm_is_eq (m->owner, thread)) - { - if (m->level >= 0) - m->level++; - else - msg = "mutex already locked by current thread"; - } - else - *resp = 0; - scm_i_pthread_mutex_unlock (&m->lock); - return msg; -} - SCM_DEFINE (scm_try_mutex, "try-mutex", 1, 0, 0, (SCM mutex), "Try to lock @var{mutex}. If the mutex is already locked by someone " "else, return @code{#f}. Else lock the mutex and return @code{#t}. ") #define FUNC_NAME s_scm_try_mutex { - char *msg; - int res; + SCM exception; + int ret = 0; + scm_t_timespec cwaittime, *waittime = NULL; SCM_VALIDATE_MUTEX (1, mutex); + + to_timespec (scm_from_int(0), &cwaittime); + waittime = &cwaittime; - msg = fat_mutex_trylock (SCM_MUTEX_DATA (mutex), &res); - if (msg) - scm_misc_error (NULL, msg, SCM_EOL); - return scm_from_bool (res); + exception = fat_mutex_lock (mutex, waittime, &ret); + if (!scm_is_false (exception)) + scm_ithrow (SCM_CAR (exception), scm_list_1 (SCM_CDR (exception)), 1); + return ret ? SCM_BOOL_T : SCM_BOOL_F; } #undef FUNC_NAME -static char * -fat_mutex_unlock (fat_mutex *m) +/*** Fat condition variables */ + +typedef struct { + scm_i_pthread_mutex_t lock; + SCM waiting; /* the threads waiting for this condition. */ +} fat_cond; + +#define SCM_CONDVARP(x) SCM_SMOB_PREDICATE (scm_tc16_condvar, x) +#define SCM_CONDVAR_DATA(x) ((fat_cond *) SCM_SMOB_DATA (x)) + +static int +fat_mutex_unlock (SCM mutex, SCM cond, + const scm_t_timespec *waittime, int relock) { - char *msg = NULL; + fat_mutex *m = SCM_MUTEX_DATA (mutex); + fat_cond *c = NULL; + scm_i_thread *t = SCM_I_CURRENT_THREAD; + int err = 0, ret = 0; scm_i_scm_pthread_mutex_lock (&m->lock); if (!scm_is_eq (m->owner, scm_current_thread ())) { if (scm_is_false (m->owner)) - msg = "mutex not locked"; - else - msg = "mutex not locked by current thread"; + { + if (!m->unchecked_unlock) + scm_misc_error (NULL, "mutex not locked", SCM_EOL); + } + else if (!m->allow_external_unlock) + scm_misc_error (NULL, "mutex not locked by current thread", SCM_EOL); } - else if (m->level > 0) - m->level--; - else - m->owner = unblock_from_queue (m->waiting); - scm_i_pthread_mutex_unlock (&m->lock); - return msg; + if (! (SCM_UNBNDP (cond))) + { + int lock_ret = 0; + + c = SCM_CONDVAR_DATA (cond); + while (1) + { + int brk = 0; + + scm_i_scm_pthread_mutex_lock (&c->lock); + if (m->level > 0) + m->level--; + else + m->owner = unblock_from_queue (m->waiting); + scm_i_pthread_mutex_unlock (&m->lock); + + t->block_asyncs++; + + err = block_self (c->waiting, cond, &c->lock, waittime); + + if (err == 0) + { + ret = 1; + brk = 1; + } + else if (err == ETIMEDOUT) + { + ret = 0; + brk = 1; + } + else if (err != EINTR) + { + errno = err; + scm_i_pthread_mutex_unlock (&c->lock); + scm_syserror (NULL); + } + + if (brk) + { + if (relock) + fat_mutex_lock (mutex, NULL, &lock_ret); + scm_i_pthread_mutex_unlock (&c->lock); + break; + } + + scm_i_pthread_mutex_unlock (&c->lock); + + t->block_asyncs--; + scm_async_click (); + + scm_remember_upto_here_2 (cond, mutex); + + scm_i_scm_pthread_mutex_lock (&m->lock); + } + } + else + { + if (m->level > 0) + m->level--; + else + m->owner = unblock_from_queue (m->waiting); + scm_i_pthread_mutex_unlock (&m->lock); + ret = 1; + } + + return ret; } -SCM_DEFINE (scm_unlock_mutex, "unlock-mutex", 1, 0, 0, - (SCM mx), +SCM scm_unlock_mutex (SCM mx) +{ + return scm_unlock_mutex_timed (mx, SCM_UNDEFINED, SCM_UNDEFINED); +} + +SCM_DEFINE (scm_unlock_mutex_timed, "unlock-mutex", 1, 2, 0, + (SCM mx, SCM cond, SCM timeout), "Unlocks @var{mutex} if the calling thread owns the lock on " "@var{mutex}. Calling unlock-mutex on a mutex not owned by the current " "thread results in undefined behaviour. Once a mutex has been unlocked, " @@ -1240,18 +1466,35 @@ SCM_DEFINE (scm_unlock_mutex, "unlock-mutex", 1, 0, 0, "lock. Every call to @code{lock-mutex} by this thread must be matched " "with a call to @code{unlock-mutex}. Only the last call to " "@code{unlock-mutex} will actually unlock the mutex. ") -#define FUNC_NAME s_scm_unlock_mutex +#define FUNC_NAME s_scm_unlock_mutex_timed { - char *msg; + scm_t_timespec cwaittime, *waittime = NULL; + SCM_VALIDATE_MUTEX (1, mx); - - msg = fat_mutex_unlock (SCM_MUTEX_DATA (mx)); - if (msg) - scm_misc_error (NULL, msg, SCM_EOL); - return SCM_BOOL_T; + if (! (SCM_UNBNDP (cond))) + { + SCM_VALIDATE_CONDVAR (2, cond); + + if (! (SCM_UNBNDP (timeout))) + { + to_timespec (timeout, &cwaittime); + waittime = &cwaittime; + } + } + + return fat_mutex_unlock (mx, cond, waittime, 0) ? SCM_BOOL_T : SCM_BOOL_F; } #undef FUNC_NAME +SCM_DEFINE (scm_mutex_p, "mutex?", 1, 0, 0, + (SCM obj), + "Return @code{#t} if @var{obj} is a mutex.") +#define FUNC_NAME s_scm_mutex_p +{ + return SCM_MUTEXP (obj) ? SCM_BOOL_T : SCM_BOOL_F; +} +#undef FUNC_NAME + #if 0 SCM_DEFINE (scm_mutex_owner, "mutex-owner", 1, 0, 0, @@ -1277,16 +1520,6 @@ SCM_DEFINE (scm_mutex_level, "mutex-level", 1, 0, 0, #endif -/*** Fat condition variables */ - -typedef struct { - scm_i_pthread_mutex_t lock; - SCM waiting; /* the threads waiting for this condition. */ -} fat_cond; - -#define SCM_CONDVARP(x) SCM_SMOB_PREDICATE (scm_tc16_condvar, x) -#define SCM_CONDVAR_DATA(x) ((fat_cond *) SCM_SMOB_DATA (x)) - static SCM fat_cond_mark (SCM cv) { @@ -1334,43 +1567,7 @@ static int fat_cond_timedwait (SCM cond, SCM mutex, const scm_t_timespec *waittime) { - scm_i_thread *t = SCM_I_CURRENT_THREAD; - fat_cond *c = SCM_CONDVAR_DATA (cond); - fat_mutex *m = SCM_MUTEX_DATA (mutex); - const char *msg; - int err = 0; - - while (1) - { - scm_i_scm_pthread_mutex_lock (&c->lock); - msg = fat_mutex_unlock (m); - t->block_asyncs++; - if (msg == NULL) - { - err = block_self (c->waiting, cond, &c->lock, waittime); - scm_i_pthread_mutex_unlock (&c->lock); - fat_mutex_lock (mutex); - } - else - scm_i_pthread_mutex_unlock (&c->lock); - t->block_asyncs--; - scm_async_click (); - - if (msg) - scm_misc_error (NULL, msg, SCM_EOL); - - scm_remember_upto_here_2 (cond, mutex); - - if (err == 0) - return 1; - if (err == ETIMEDOUT) - return 0; - if (err != EINTR) - { - errno = err; - scm_syserror (NULL); - } - } + return fat_mutex_unlock (mutex, cond, waittime, 1); } SCM_DEFINE (scm_timed_wait_condition_variable, "wait-condition-variable", 2, 1, 0, @@ -1393,20 +1590,11 @@ SCM_DEFINE (scm_timed_wait_condition_variable, "wait-condition-variable", 2, 1, if (!SCM_UNBNDP (t)) { - if (scm_is_pair (t)) - { - waittime.tv_sec = scm_to_ulong (SCM_CAR (t)); - waittime.tv_nsec = scm_to_ulong (SCM_CAR (t)) * 1000; - } - else - { - waittime.tv_sec = scm_to_ulong (t); - waittime.tv_nsec = 0; - } + to_timespec (t, &waittime); waitptr = &waittime; } - return scm_from_bool (fat_cond_timedwait (cv, mx, waitptr)); + return fat_cond_timedwait (cv, mx, waitptr) ? SCM_BOOL_T : SCM_BOOL_F; } #undef FUNC_NAME @@ -1449,6 +1637,15 @@ SCM_DEFINE (scm_broadcast_condition_variable, "broadcast-condition-variable", 1, } #undef FUNC_NAME +SCM_DEFINE (scm_condition_variable_p, "condition-variable?", 1, 0, 0, + (SCM obj), + "Return @code{#t} if @var{obj} is a condition variable.") +#define FUNC_NAME s_scm_condition_variable_p +{ + return SCM_CONDVARP(obj) ? SCM_BOOL_T : SCM_BOOL_F; +} +#undef FUNC_NAME + /*** Marking stacks */ /* XXX - what to do with this? Do we need to handle this for blocked @@ -1800,6 +1997,12 @@ scm_init_threads () scm_set_smob_print (scm_tc16_mutex, fat_mutex_print); scm_set_smob_free (scm_tc16_mutex, fat_mutex_free); + unchecked_unlock_sym = + scm_permanent_object (scm_from_locale_symbol ("unchecked-unlock")); + allow_external_unlock_sym = + scm_permanent_object (scm_from_locale_symbol ("allow-external-unlock")); + recursive_sym = scm_permanent_object (scm_from_locale_symbol ("recursive")); + scm_tc16_condvar = scm_make_smob_type ("condition-variable", sizeof (fat_cond)); scm_set_smob_mark (scm_tc16_condvar, fat_cond_mark); diff --git a/libguile/threads.h b/libguile/threads.h index b19fbe3f4..e1944a552 100644 --- a/libguile/threads.h +++ b/libguile/threads.h @@ -54,6 +54,7 @@ typedef struct scm_i_thread { SCM join_queue; scm_i_pthread_mutex_t admin_mutex; + SCM mutexes; SCM result; int canceled; @@ -162,13 +163,19 @@ SCM_API SCM scm_cancel_thread (SCM t); SCM_API SCM scm_set_thread_cleanup_x (SCM thread, SCM proc); SCM_API SCM scm_thread_cleanup (SCM thread); SCM_API SCM scm_join_thread (SCM t); +SCM_API SCM scm_join_thread_timed (SCM t, SCM timeout, SCM timeoutval); +SCM_API SCM scm_thread_p (SCM t); SCM_API SCM scm_make_mutex (void); SCM_API SCM scm_make_recursive_mutex (void); +SCM_API SCM scm_make_mutex_with_flags (SCM flags); SCM_API SCM scm_lock_mutex (SCM m); +SCM_API SCM scm_lock_mutex_timed (SCM m, SCM timeout); SCM_API void scm_dynwind_lock_mutex (SCM mutex); SCM_API SCM scm_try_mutex (SCM m); SCM_API SCM scm_unlock_mutex (SCM m); +SCM_API SCM scm_unlock_mutex_timed (SCM m, SCM cond, SCM timeout); +SCM_API SCM scm_mutex_p (SCM o); SCM_API SCM scm_make_condition_variable (void); SCM_API SCM scm_wait_condition_variable (SCM cond, SCM mutex); @@ -176,6 +183,7 @@ SCM_API SCM scm_timed_wait_condition_variable (SCM cond, SCM mutex, SCM abstime); SCM_API SCM scm_signal_condition_variable (SCM cond); SCM_API SCM scm_broadcast_condition_variable (SCM cond); +SCM_API SCM scm_condition_variable_p (SCM o); SCM_API SCM scm_current_thread (void); SCM_API SCM scm_all_threads (void); diff --git a/test-suite/tests/threads.test b/test-suite/tests/threads.test index 10b1b91a0..62ee0cdc7 100644 --- a/test-suite/tests/threads.test +++ b/test-suite/tests/threads.test @@ -137,6 +137,97 @@ '(0 1 2 3 4 5)) (equal? result '(10 8 6 4 2 0))))) + ;; + ;; timed mutex locking + ;; + + (with-test-prefix "lock-mutex" + + (pass-if "timed locking fails if timeout exceeded" + (let ((m (make-mutex))) + (lock-mutex m) + (let ((t (begin-thread (lock-mutex m (+ (current-time) 1))))) + (not (join-thread t))))) + + (pass-if "timed locking succeeds if mutex unlocked within timeout" + (let* ((m (make-mutex)) + (c (make-condition-variable)) + (cm (make-mutex))) + (lock-mutex cm) + (let ((t (begin-thread (begin (lock-mutex cm) + (signal-condition-variable c) + (unlock-mutex cm) + (lock-mutex m + (+ (current-time) 2)))))) + (lock-mutex m) + (wait-condition-variable c cm) + (unlock-mutex cm) + (sleep 1) + (unlock-mutex m) + (join-thread t))))) + + ;; + ;; timed mutex unlocking + ;; + + (with-test-prefix "unlock-mutex" + + (pass-if "timed unlocking returns #f if timeout exceeded" + (let ((m (make-mutex)) + (c (make-condition-variable))) + (lock-mutex m) + (not (unlock-mutex m c (current-time))))) + + (pass-if "timed unlocking returns #t if condition signaled" + (let ((m1 (make-mutex)) + (m2 (make-mutex)) + (c1 (make-condition-variable)) + (c2 (make-condition-variable))) + (lock-mutex m1) + (let ((t (begin-thread (begin (lock-mutex m1) + (signal-condition-variable c1) + (lock-mutex m2) + (unlock-mutex m1) + (unlock-mutex m2 + c2 + (+ (current-time) + 2)))))) + (wait-condition-variable c1 m1) + (unlock-mutex m1) + (lock-mutex m2) + (signal-condition-variable c2) + (unlock-mutex m2) + (join-thread t))))) + + ;; + ;; timed joining + ;; + + (with-test-prefix "join-thread" + + (pass-if "timed joining fails if timeout exceeded" + (let* ((m (make-mutex)) + (c (make-condition-variable)) + (t (begin-thread (begin (lock-mutex m) + (wait-condition-variable c m)))) + (r (join-thread t (current-time)))) + (cancel-thread t) + (not r))) + + (pass-if "join-thread returns timeoutval on timeout" + (let* ((m (make-mutex)) + (c (make-condition-variable)) + (t (begin-thread (begin (lock-mutex m) + (wait-condition-variable c m)))) + (r (join-thread t (current-time) 'foo))) + (cancel-thread t) + (eq? r 'foo))) + + + (pass-if "timed joining succeeds if thread exits within timeout" + (let ((t (begin-thread (begin (sleep 1) #t)))) + (join-thread t (+ (current-time) 2))))) + ;; ;; thread cancellation ;; @@ -185,4 +276,35 @@ (eq? (join-thread t) 'bar)))) (pass-if "initial handler is false" - (not (thread-cleanup (current-thread))))))) + (not (thread-cleanup (current-thread))))) + + ;; + ;; mutex behavior + ;; + + (with-test-prefix "mutex-behavior" + + (pass-if "unchecked unlock" + (let* ((m (make-mutex 'unchecked-unlock))) + (unlock-mutex m))) + + (pass-if "allow external unlock" + (let* ((m (make-mutex 'allow-external-unlock)) + (t (begin-thread (lock-mutex m)))) + (join-thread t) + (unlock-mutex m))) + + (pass-if "recursive mutexes" + (let* ((m (make-mutex 'recursive))) + (lock-mutex m) + (lock-mutex m))) + + (pass-if "locking abandoned mutex throws exception" + (let* ((m (make-mutex)) + (t (begin-thread (lock-mutex m))) + (success #f)) + (join-thread t) + (catch 'abandoned-mutex-error + (lambda () (lock-mutex m)) + (lambda key (set! success #t))) + success)))))