mirror of
https://git.savannah.gnu.org/git/guile.git
synced 2025-05-20 03:30:27 +02:00
join-thread in Scheme
* module/ice-9/threads.scm (join-thread): Implement in Scheme. (call-with-new-thread): Arrange to record values in a weak table and signal the join cond. (with-mutex): Move up definition; call-with-new-thread needs it. (How was this working before?) * libguile/threads.c (guilify_self_1, guilify_self_2, do_thread_exit): Remove join queue management. * libguile/threads.c (scm_join_thread, scm_join_thread_timed): Call out to Scheme. (scm_init_ice_9_threads): Capture join-thread var.
This commit is contained in:
parent
e447258c3f
commit
a521440029
3 changed files with 70 additions and 89 deletions
|
@ -408,7 +408,6 @@ guilify_self_1 (struct GC_stack_base *base)
|
|||
t.pthread = scm_i_pthread_self ();
|
||||
t.handle = SCM_BOOL_F;
|
||||
t.result = SCM_BOOL_F;
|
||||
t.join_queue = SCM_EOL;
|
||||
t.freelists = NULL;
|
||||
t.pointerless_freelists = NULL;
|
||||
t.dynamic_state = SCM_BOOL_F;
|
||||
|
@ -491,7 +490,6 @@ guilify_self_2 (SCM parent)
|
|||
t->dynstack.limit = t->dynstack.base + 16;
|
||||
t->dynstack.top = t->dynstack.base + SCM_DYNSTACK_HEADER_LEN;
|
||||
|
||||
t->join_queue = make_queue ();
|
||||
t->block_asyncs = 0;
|
||||
|
||||
/* See note in finalizers.c:queue_finalizer_async(). */
|
||||
|
@ -509,13 +507,9 @@ do_thread_exit (void *v)
|
|||
scm_i_thread *t = (scm_i_thread *) v;
|
||||
|
||||
scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
|
||||
|
||||
t->exited = 1;
|
||||
close (t->sleep_pipe[0]);
|
||||
close (t->sleep_pipe[1]);
|
||||
while (scm_is_true (unblock_from_queue (t->join_queue)))
|
||||
;
|
||||
|
||||
scm_i_pthread_mutex_unlock (&t->admin_mutex);
|
||||
|
||||
return NULL;
|
||||
|
@ -867,9 +861,6 @@ SCM_DEFINE (scm_yield, "yield", 0, 0, 0,
|
|||
}
|
||||
#undef FUNC_NAME
|
||||
|
||||
/* Some systems, notably Android, lack 'pthread_cancel'. Don't provide
|
||||
'cancel-thread' on these systems. */
|
||||
|
||||
static SCM cancel_thread_var;
|
||||
|
||||
SCM
|
||||
|
@ -879,79 +870,26 @@ scm_cancel_thread (SCM thread)
|
|||
return SCM_UNSPECIFIED;
|
||||
}
|
||||
|
||||
static SCM join_thread_var;
|
||||
|
||||
SCM
|
||||
scm_join_thread (SCM thread)
|
||||
{
|
||||
return scm_join_thread_timed (thread, SCM_UNDEFINED, SCM_UNDEFINED);
|
||||
return scm_call_1 (scm_variable_ref (join_thread_var), thread);
|
||||
}
|
||||
|
||||
SCM_DEFINE (scm_join_thread_timed, "join-thread", 1, 2, 0,
|
||||
(SCM thread, SCM timeout, SCM timeoutval),
|
||||
"Suspend execution of the calling thread until the target @var{thread} "
|
||||
"terminates, unless the target @var{thread} has already terminated. ")
|
||||
#define FUNC_NAME s_scm_join_thread_timed
|
||||
SCM
|
||||
scm_join_thread_timed (SCM thread, SCM timeout, SCM timeoutval)
|
||||
{
|
||||
scm_i_thread *t;
|
||||
scm_t_timespec ctimeout, *timeout_ptr = NULL;
|
||||
SCM res = SCM_BOOL_F;
|
||||
SCM join_thread = scm_variable_ref (join_thread_var);
|
||||
|
||||
if (! (SCM_UNBNDP (timeoutval)))
|
||||
res = timeoutval;
|
||||
|
||||
SCM_VALIDATE_THREAD (1, thread);
|
||||
if (scm_is_eq (scm_current_thread (), thread))
|
||||
SCM_MISC_ERROR ("cannot join the current thread", SCM_EOL);
|
||||
|
||||
t = SCM_I_THREAD_DATA (thread);
|
||||
scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
|
||||
|
||||
if (! SCM_UNBNDP (timeout))
|
||||
{
|
||||
to_timespec (timeout, &ctimeout);
|
||||
timeout_ptr = &ctimeout;
|
||||
}
|
||||
|
||||
if (t->exited)
|
||||
res = t->result;
|
||||
if (SCM_UNBNDP (timeout))
|
||||
return scm_call_1 (join_thread, thread);
|
||||
else if (SCM_UNBNDP (timeoutval))
|
||||
return scm_call_2 (join_thread, thread, timeout);
|
||||
else
|
||||
{
|
||||
while (1)
|
||||
{
|
||||
int err = block_self (t->join_queue, &t->admin_mutex,
|
||||
timeout_ptr);
|
||||
scm_remember_upto_here_1 (thread);
|
||||
if (err == 0)
|
||||
{
|
||||
if (t->exited)
|
||||
{
|
||||
res = t->result;
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (err == ETIMEDOUT)
|
||||
break;
|
||||
|
||||
scm_i_pthread_mutex_unlock (&t->admin_mutex);
|
||||
SCM_TICK;
|
||||
scm_i_scm_pthread_mutex_lock (&t->admin_mutex);
|
||||
|
||||
/* Check for exit again, since we just released and
|
||||
reacquired the admin mutex, before the next block_self
|
||||
call (which would block forever if t has already
|
||||
exited). */
|
||||
if (t->exited)
|
||||
{
|
||||
res = t->result;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
scm_i_pthread_mutex_unlock (&t->admin_mutex);
|
||||
|
||||
return res;
|
||||
return scm_call_3 (join_thread, thread, timeout, timeoutval);
|
||||
}
|
||||
#undef FUNC_NAME
|
||||
|
||||
SCM_DEFINE (scm_thread_p, "thread?", 1, 0, 0,
|
||||
(SCM obj),
|
||||
|
@ -1875,6 +1813,9 @@ scm_init_ice_9_threads (void *unused)
|
|||
cancel_thread_var =
|
||||
scm_module_variable (scm_current_module (),
|
||||
scm_from_latin1_symbol ("cancel-thread"));
|
||||
join_thread_var =
|
||||
scm_module_variable (scm_current_module (),
|
||||
scm_from_latin1_symbol ("join-thread"));
|
||||
call_with_new_thread_var =
|
||||
scm_module_variable (scm_current_module (),
|
||||
scm_from_latin1_symbol ("call-with-new-thread"));
|
||||
|
|
|
@ -55,8 +55,6 @@ typedef struct scm_i_thread {
|
|||
SCM handle;
|
||||
scm_i_pthread_t pthread;
|
||||
|
||||
SCM join_queue;
|
||||
|
||||
scm_i_pthread_mutex_t admin_mutex;
|
||||
|
||||
SCM result;
|
||||
|
|
|
@ -85,6 +85,13 @@
|
|||
|
||||
|
||||
|
||||
(define-syntax-rule (with-mutex m e0 e1 ...)
|
||||
(let ((x m))
|
||||
(dynamic-wind
|
||||
(lambda () (lock-mutex x))
|
||||
(lambda () (begin e0 e1 ...))
|
||||
(lambda () (unlock-mutex x)))))
|
||||
|
||||
(define cancel-tag (make-prompt-tag "cancel"))
|
||||
(define (cancel-thread thread . values)
|
||||
"Asynchronously interrupt the target @var{thread} and ask it to
|
||||
|
@ -101,6 +108,9 @@ no-op."
|
|||
(error "thread cancellation failed, throwing error instead???"))))
|
||||
thread))
|
||||
|
||||
(define thread-join-data (make-object-property))
|
||||
(define %thread-results (make-object-property))
|
||||
|
||||
(define* (call-with-new-thread thunk #:optional handler)
|
||||
"Call @code{thunk} in a new thread and with a new dynamic state,
|
||||
returning a new thread object representing the thread. The procedure
|
||||
|
@ -121,21 +131,60 @@ Once @var{thunk} or @var{handler} returns, the return value is made the
|
|||
(with-mutex mutex
|
||||
(%call-with-new-thread
|
||||
(lambda ()
|
||||
(call-with-prompt cancel-tag
|
||||
(lambda ()
|
||||
(call-with-values
|
||||
(lambda ()
|
||||
(with-continuation-barrier
|
||||
(lambda ()
|
||||
(call-with-prompt cancel-tag
|
||||
(lambda ()
|
||||
(lock-mutex mutex)
|
||||
(set! thread (current-thread))
|
||||
(set! (thread-join-data thread) (cons cv mutex))
|
||||
(signal-condition-variable cv)
|
||||
(unlock-mutex mutex)
|
||||
(thunk))
|
||||
(lambda (k . args)
|
||||
(apply values args))))))
|
||||
(lambda vals
|
||||
(lock-mutex mutex)
|
||||
(set! thread (current-thread))
|
||||
(signal-condition-variable cv)
|
||||
;; Probably now you're wondering why we are going to use
|
||||
;; the cond variable as the key into the thread results
|
||||
;; object property. It's because there is a possibility
|
||||
;; that the thread object itself ends up as part of the
|
||||
;; result, and if that happens we create a cycle whereby
|
||||
;; the strong reference to a thread in the value of the
|
||||
;; weak-key hash table used by the object property prevents
|
||||
;; the thread from ever being collected. So instead we use
|
||||
;; the cv as the key. Weak-key hash tables, amirite?
|
||||
(set! (%thread-results cv) vals)
|
||||
(broadcast-condition-variable cv)
|
||||
(unlock-mutex mutex)
|
||||
(thunk))
|
||||
(lambda (k . args)
|
||||
(apply values args)))))
|
||||
(apply values vals)))))
|
||||
(let lp ()
|
||||
(unless thread
|
||||
(wait-condition-variable cv mutex)
|
||||
(lp))))
|
||||
thread))
|
||||
|
||||
(define* (join-thread thread #:optional timeout timeoutval)
|
||||
"Suspend execution of the calling thread until the target @var{thread}
|
||||
terminates, unless the target @var{thread} has already terminated."
|
||||
(match (thread-join-data thread)
|
||||
(#f (error "foreign thread cannot be joined" thread))
|
||||
((cv . mutex)
|
||||
(lock-mutex mutex)
|
||||
(let lp ()
|
||||
(cond
|
||||
((%thread-results cv)
|
||||
=> (lambda (results)
|
||||
(unlock-mutex mutex)
|
||||
(apply values results)))
|
||||
((if timeout
|
||||
(wait-condition-variable cv mutex timeout)
|
||||
(wait-condition-variable cv mutex))
|
||||
(lp))
|
||||
(else timeoutval))))))
|
||||
|
||||
(define* (try-mutex mutex)
|
||||
"Try to lock @var{mutex}. If the mutex is already locked, return
|
||||
@code{#f}. Otherwise lock the mutex and return @code{#t}."
|
||||
|
@ -155,13 +204,6 @@ Once @var{thunk} or @var{handler} returns, the return value is made the
|
|||
(lambda () (proc arg ...))
|
||||
%thread-handler))
|
||||
|
||||
(define-syntax-rule (with-mutex m e0 e1 ...)
|
||||
(let ((x m))
|
||||
(dynamic-wind
|
||||
(lambda () (lock-mutex x))
|
||||
(lambda () (begin e0 e1 ...))
|
||||
(lambda () (unlock-mutex x)))))
|
||||
|
||||
(define monitor-mutex-table (make-hash-table))
|
||||
|
||||
(define monitor-mutex-table-mutex (make-mutex))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue