mirror of
https://git.savannah.gnu.org/git/guile.git
synced 2025-06-12 23:00:22 +02:00
futures: Keep futures unlocked while they are processing.
* module/ice-9/futures.scm (<future>)[completion]: New field. [done?]: Rename to... [state]: ... this. Change `set-future-done?!' to `set-future-state!', and `future-done?' to `future-state'. (make-future): Initialize the `completion' field to 'queued. (with-mutex): New macro. (process-future!): Remove `set-future-done?!' call. (process-futures): Check `future-state'. Unlock FUTURE's mutex before processing it. Broadcast FUTURE's `completion' cond. var. when done. (touch): Likewise.
This commit is contained in:
parent
7ae4e75af5
commit
f2fb5e5328
1 changed files with 76 additions and 35 deletions
|
@ -44,19 +44,21 @@
|
||||||
;;;
|
;;;
|
||||||
|
|
||||||
(define-record-type <future>
|
(define-record-type <future>
|
||||||
(%make-future thunk done? mutex)
|
(%make-future thunk state mutex completion)
|
||||||
future?
|
future?
|
||||||
(thunk future-thunk)
|
(thunk future-thunk)
|
||||||
(done? future-done? set-future-done?!)
|
(state future-state set-future-state!) ; done | started | queued
|
||||||
(result future-result set-future-result!)
|
(result future-result set-future-result!)
|
||||||
(mutex future-mutex))
|
(mutex future-mutex)
|
||||||
|
(completion future-completion)) ; completion cond. var.
|
||||||
|
|
||||||
(define (make-future thunk)
|
(define (make-future thunk)
|
||||||
"Return a new future for THUNK. Execution may start at any point
|
"Return a new future for THUNK. Execution may start at any point
|
||||||
concurrently, or it can start at the time when the returned future is
|
concurrently, or it can start at the time when the returned future is
|
||||||
touched."
|
touched."
|
||||||
(create-workers!)
|
(create-workers!)
|
||||||
(let ((future (%make-future thunk #f (make-mutex))))
|
(let ((future (%make-future thunk 'queued
|
||||||
|
(make-mutex) (make-condition-variable))))
|
||||||
(register-future! future)
|
(register-future! future)
|
||||||
future))
|
future))
|
||||||
|
|
||||||
|
@ -69,6 +71,14 @@ touched."
|
||||||
(define %futures-mutex (make-mutex))
|
(define %futures-mutex (make-mutex))
|
||||||
(define %futures-available (make-condition-variable))
|
(define %futures-available (make-condition-variable))
|
||||||
|
|
||||||
|
(define-syntax-rule (with-mutex m e0 e1 ...)
|
||||||
|
;; Copied from (ice-9 threads) to avoid circular dependency.
|
||||||
|
(let ((x m))
|
||||||
|
(dynamic-wind
|
||||||
|
(lambda () (lock-mutex x))
|
||||||
|
(lambda () (begin e0 e1 ...))
|
||||||
|
(lambda () (unlock-mutex x)))))
|
||||||
|
|
||||||
(define (register-future! future)
|
(define (register-future! future)
|
||||||
;; Register FUTURE as being processable.
|
;; Register FUTURE as being processable.
|
||||||
(lock-mutex %futures-mutex)
|
(lock-mutex %futures-mutex)
|
||||||
|
@ -77,7 +87,7 @@ touched."
|
||||||
(unlock-mutex %futures-mutex))
|
(unlock-mutex %futures-mutex))
|
||||||
|
|
||||||
(define (process-future! future)
|
(define (process-future! future)
|
||||||
;; Process FUTURE, assuming its mutex is already taken.
|
;; Process FUTURE, and update its result.
|
||||||
(set-future-result! future
|
(set-future-result! future
|
||||||
(catch #t
|
(catch #t
|
||||||
(lambda ()
|
(lambda ()
|
||||||
|
@ -87,8 +97,7 @@ touched."
|
||||||
(apply values results)))))
|
(apply values results)))))
|
||||||
(lambda args
|
(lambda args
|
||||||
(lambda ()
|
(lambda ()
|
||||||
(apply throw args)))))
|
(apply throw args))))))
|
||||||
(set-future-done?! future #t))
|
|
||||||
|
|
||||||
(define (process-futures)
|
(define (process-futures)
|
||||||
;; Wait for futures to be available and process them.
|
;; Wait for futures to be available and process them.
|
||||||
|
@ -101,42 +110,74 @@ touched."
|
||||||
(or (q-empty? %futures)
|
(or (q-empty? %futures)
|
||||||
(let ((future (deq! %futures)))
|
(let ((future (deq! %futures)))
|
||||||
(lock-mutex (future-mutex future))
|
(lock-mutex (future-mutex future))
|
||||||
(or (and (future-done? future)
|
(case (future-state future)
|
||||||
(unlock-mutex (future-mutex future)))
|
((done started)
|
||||||
(begin
|
;; Nothing to do.
|
||||||
;; Do the actual work.
|
(unlock-mutex (future-mutex future)))
|
||||||
|
(else
|
||||||
|
;; Do the actual work.
|
||||||
|
|
||||||
;; We want to release %FUTURES-MUTEX so that other workers
|
;; We want to release %FUTURES-MUTEX so that other workers can
|
||||||
;; can progress. However, to avoid deadlocks, we have to
|
;; progress. However, to avoid deadlocks, we have to unlock
|
||||||
;; unlock FUTURE as well, to preserve lock ordering.
|
;; FUTURE as well, to preserve lock ordering.
|
||||||
(unlock-mutex (future-mutex future))
|
(unlock-mutex (future-mutex future))
|
||||||
(unlock-mutex %futures-mutex)
|
(unlock-mutex %futures-mutex)
|
||||||
|
|
||||||
(lock-mutex (future-mutex future))
|
(lock-mutex (future-mutex future))
|
||||||
(or (future-done? future) ; lost the race?
|
(if (eq? (future-state future) 'queued) ; lost the race?
|
||||||
(process-future! future))
|
(begin ; no, so let's process it
|
||||||
(unlock-mutex (future-mutex future))
|
(set-future-state! future 'started)
|
||||||
|
(unlock-mutex (future-mutex future))
|
||||||
|
|
||||||
(lock-mutex %futures-mutex)))))
|
(process-future! future)
|
||||||
|
|
||||||
|
(with-mutex (future-mutex future)
|
||||||
|
(set-future-state! future 'done))
|
||||||
|
|
||||||
|
(broadcast-condition-variable (future-completion future)))
|
||||||
|
(unlock-mutex (future-mutex future))) ; yes
|
||||||
|
|
||||||
|
(lock-mutex %futures-mutex)))))
|
||||||
|
|
||||||
|
;; Look for more work.
|
||||||
(loop)))
|
(loop)))
|
||||||
|
|
||||||
(define (touch future)
|
(define (touch future)
|
||||||
"Return the result of FUTURE, computing it if not already done."
|
"Return the result of FUTURE, computing it if not already done."
|
||||||
(lock-mutex (future-mutex future))
|
(lock-mutex (future-mutex future))
|
||||||
(or (future-done? future)
|
(case (future-state future)
|
||||||
(begin
|
((done)
|
||||||
;; Do the actual work. Unlock FUTURE first to preserve lock
|
(unlock-mutex (future-mutex future)))
|
||||||
;; ordering.
|
((started)
|
||||||
(unlock-mutex (future-mutex future))
|
;; Wait for completion.
|
||||||
|
(wait-condition-variable (future-completion future)
|
||||||
|
(future-mutex future))
|
||||||
|
(unlock-mutex (future-mutex future)))
|
||||||
|
((queued)
|
||||||
|
(begin
|
||||||
|
;; Do the actual work. Unlock FUTURE first to preserve lock
|
||||||
|
;; ordering.
|
||||||
|
(unlock-mutex (future-mutex future))
|
||||||
|
|
||||||
(lock-mutex %futures-mutex)
|
(lock-mutex %futures-mutex)
|
||||||
(q-remove! %futures future)
|
(q-remove! %futures future)
|
||||||
(unlock-mutex %futures-mutex)
|
(unlock-mutex %futures-mutex)
|
||||||
|
|
||||||
(lock-mutex (future-mutex future))
|
(lock-mutex (future-mutex future))
|
||||||
(or (future-done? future) ; lost the race?
|
(if (eq? (future-state future) 'queued) ; lost the race?
|
||||||
(process-future! future))))
|
(begin ; no, so let's process it
|
||||||
(unlock-mutex (future-mutex future))
|
(set-future-state! future 'started)
|
||||||
|
(unlock-mutex (future-mutex future))
|
||||||
|
|
||||||
|
(process-future! future)
|
||||||
|
|
||||||
|
(with-mutex (future-mutex future)
|
||||||
|
(set-future-state! future 'done))
|
||||||
|
|
||||||
|
(broadcast-condition-variable (future-completion future)))
|
||||||
|
(begin ; yes, so try again
|
||||||
|
(unlock-mutex (future-mutex future))
|
||||||
|
(touch future))))))
|
||||||
((future-result future)))
|
((future-result future)))
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue