1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-05-20 11:40:18 +02:00

futures: Allow nested futures; put the main thread to work.

* module/ice-9/futures.scm (%futures-waiting, %within-future?,
  %future-prompt): New variables.
  (let/ec): New macro.
  (process-future!): Run FUTURE's thunk in a prompt; capture FUTURE's
  continuation when it aborts, and add it to %FUTURES-WAITING.  Set
  %WITHIN-FUTURE? in the dynamic extent of the call FUTURE's thunk.
  (process-futures): Move loop body to...
  (process-one-future): ... here.  New procedure.
  (notify-completion): New procedure.
  (touch)[work, loop]: New procedures.
  When %WITHIN-FUTURE? and FUTURE is started, abort; if not
  %WITHIN-FUTURE, call `work' while waiting.
  When FUTURE is queued, call `work' too.

* test-suite/tests/future.test ("nested futures"): New tests.
This commit is contained in:
Ludovic Courtès 2012-11-17 00:20:21 +01:00
parent ab975cf592
commit 3e529bf02a
2 changed files with 177 additions and 80 deletions

View file

@ -20,8 +20,10 @@
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-9)
#:use-module (srfi srfi-9 gnu)
#:use-module (srfi srfi-11)
#:use-module (ice-9 threads)
#:use-module (ice-9 q)
#:use-module (ice-9 match)
#:export (future make-future future? touch))
;;; Author: Ludovic Courtès <ludo@gnu.org>
@ -47,7 +49,7 @@
(define-record-type <future>
(%make-future thunk state mutex completion)
future?
(thunk future-thunk)
(thunk future-thunk set-future-thunk!)
(state future-state set-future-state!) ; done | started | queued
(result future-result set-future-result!)
(mutex future-mutex)
@ -76,10 +78,20 @@ touched."
;;; Future queues.
;;;
;; Global queue of pending futures.
;; TODO: Use per-worker queues to reduce contention.
(define %futures (make-q))
;; Lock for %FUTURES and %FUTURES-WAITING.
(define %futures-mutex (make-mutex))
(define %futures-available (make-condition-variable))
;; A mapping of nested futures to futures waiting for them to complete.
(define %futures-waiting '())
;; Whether currently running within a future.
(define %within-future? (make-parameter #f))
(define-syntax-rule (with-mutex m e0 e1 ...)
;; Copied from (ice-9 threads) to avoid circular dependency.
(let ((x m))
@ -88,6 +100,22 @@ touched."
(lambda () (begin e0 e1 ...))
(lambda () (unlock-mutex x)))))
(define-syntax-rule (let/ec k e e* ...) ; TODO: move to core
(let ((tag (make-prompt-tag)))
(call-with-prompt
tag
(lambda ()
(let ((k (lambda args (apply abort-to-prompt tag args))))
e e* ...))
(lambda (_ res) res))))
(define %future-prompt
;; The prompt futures abort to when they want to wait for another
;; future.
(make-prompt-tag))
(define (register-future! future)
;; Register FUTURE as being processable.
(lock-mutex %futures-mutex)
@ -96,97 +124,146 @@ touched."
(unlock-mutex %futures-mutex))
(define (process-future! future)
;; Process FUTURE, and update its result.
(set-future-result! future
(catch #t
(lambda ()
(call-with-values (future-thunk future)
(lambda results
"Process FUTURE. When FUTURE completes, return #t and update its
result; otherwise, when FUTURE touches a nested future that has not
completed yet, then suspend it and return #f. Suspending a future
consists in capturing its continuation, marking it as `queued', and
adding it to the waiter queue."
(let/ec return
(let* ((suspend
(lambda (cont future-to-wait)
;; FUTURE wishes to wait for the completion of FUTURE-TO-WAIT.
;; At this point, FUTURE is unlocked and in `started' state,
;; and FUTURE-TO-WAIT is unlocked.
(with-mutex %futures-mutex
(with-mutex (future-mutex future)
(set-future-thunk! future cont)
(set-future-state! future 'queued))
(with-mutex (future-mutex future-to-wait)
;; If FUTURE-TO-WAIT completed in the meantime, then
;; reschedule FUTURE directly; otherwise, add it to the
;; waiter queue.
(if (eq? 'done (future-state future-to-wait))
(begin
(enq! %futures future)
(signal-condition-variable %futures-available))
(set! %futures-waiting
(alist-cons future-to-wait future
%futures-waiting))))
(return #f))))
(thunk (lambda ()
(call-with-prompt %future-prompt
(lambda ()
(parameterize ((%within-future? #t))
((future-thunk future))))
suspend))))
(set-future-result! future
(catch #t
(lambda ()
(call-with-values thunk
(lambda results
(lambda ()
(apply values results)))))
(lambda args
(lambda ()
(apply values results)))))
(lambda args
(lambda ()
(apply throw args))))))
(apply throw args)))))
#t)))
(define (process-one-future)
"Attempt to pick one future from the queue and process it."
;; %FUTURES-MUTEX must be locked on entry, and is locked on exit.
(or (q-empty? %futures)
(let ((future (deq! %futures)))
(lock-mutex (future-mutex future))
(case (future-state future)
((done started)
;; Nothing to do.
(unlock-mutex (future-mutex future)))
(else
;; Do the actual work.
;; We want to release %FUTURES-MUTEX so that other workers can
;; progress. However, to avoid deadlocks, we have to unlock
;; FUTURE as well, to preserve lock ordering.
(unlock-mutex (future-mutex future))
(unlock-mutex %futures-mutex)
(lock-mutex (future-mutex future))
(if (eq? (future-state future) 'queued) ; lost the race?
(begin ; no, so let's process it
(set-future-state! future 'started)
(unlock-mutex (future-mutex future))
(let ((done? (process-future! future)))
(when done?
(with-mutex %futures-mutex
(with-mutex (future-mutex future)
(set-future-state! future 'done)
(notify-completion future))))))
(unlock-mutex (future-mutex future))) ; yes
(lock-mutex %futures-mutex))))))
(define (process-futures)
;; Wait for futures to be available and process them.
"Continuously process futures from the queue."
(lock-mutex %futures-mutex)
(let loop ()
(when (q-empty? %futures)
(wait-condition-variable %futures-available
%futures-mutex))
(or (q-empty? %futures)
(let ((future (deq! %futures)))
(lock-mutex (future-mutex future))
(case (future-state future)
((done started)
;; Nothing to do.
(unlock-mutex (future-mutex future)))
(else
;; Do the actual work.
;; We want to release %FUTURES-MUTEX so that other workers can
;; progress. However, to avoid deadlocks, we have to unlock
;; FUTURE as well, to preserve lock ordering.
(unlock-mutex (future-mutex future))
(unlock-mutex %futures-mutex)
(lock-mutex (future-mutex future))
(if (eq? (future-state future) 'queued) ; lost the race?
(begin ; no, so let's process it
(set-future-state! future 'started)
(unlock-mutex (future-mutex future))
(process-future! future)
(with-mutex (future-mutex future)
(set-future-state! future 'done))
(broadcast-condition-variable (future-completion future)))
(unlock-mutex (future-mutex future))) ; yes
(lock-mutex %futures-mutex)))))
;; Look for more work.
(process-one-future)
(loop)))
(define (notify-completion future)
"Notify futures and callers waiting that FUTURE completed."
;; FUTURE and %FUTURES-MUTEX are locked.
(broadcast-condition-variable (future-completion future))
(let-values (((waiting remaining)
(partition (match-lambda ; TODO: optimize
((waitee . _)
(eq? waitee future)))
%futures-waiting)))
(set! %futures-waiting remaining)
(for-each (match-lambda
((_ . waiter)
(enq! %futures waiter)))
waiting)))
(define (touch future)
"Return the result of FUTURE, computing it if not already done."
(lock-mutex (future-mutex future))
(case (future-state future)
((done)
(unlock-mutex (future-mutex future)))
((started)
;; Wait for completion.
(wait-condition-variable (future-completion future)
(future-mutex future))
(unlock-mutex (future-mutex future)))
((queued)
(begin
;; Do the actual work. Unlock FUTURE first to preserve lock
;; ordering.
(define (work)
;; Do some work while waiting for FUTURE to complete.
(lock-mutex %futures-mutex)
(if (q-empty? %futures)
(begin
(unlock-mutex %futures-mutex)
(with-mutex (future-mutex future)
(unless (eq? 'done (future-state future))
(wait-condition-variable (future-completion future)
(future-mutex future)))))
(begin
(process-one-future)
(unlock-mutex %futures-mutex))))
(let loop ()
(lock-mutex (future-mutex future))
(case (future-state future)
((done)
(unlock-mutex (future-mutex future)))
((started)
(unlock-mutex (future-mutex future))
(lock-mutex %futures-mutex)
(q-remove! %futures future)
(unlock-mutex %futures-mutex)
(lock-mutex (future-mutex future))
(if (eq? (future-state future) 'queued) ; lost the race?
(begin ; no, so let's process it
(set-future-state! future 'started)
(unlock-mutex (future-mutex future))
(process-future! future)
(with-mutex (future-mutex future)
(set-future-state! future 'done))
(broadcast-condition-variable (future-completion future)))
(begin ; yes, so try again
(unlock-mutex (future-mutex future))
(touch future))))))
(if (%within-future?)
(abort-to-prompt %future-prompt future)
(begin
(work)
(loop))))
(else
(unlock-mutex (future-mutex future))
(work)
(loop))))
((future-result future)))
@ -234,3 +311,7 @@ touched."
(define-syntax-rule (future body)
"Return a new future for BODY."
(make-future (lambda () body)))
;;; Local Variables:
;;; eval: (put 'with-mutex 'scheme-indent-function 1)
;;; End:

View file

@ -2,7 +2,7 @@
;;;;
;;;; Ludovic Courtès <ludo@gnu.org>
;;;;
;;;; Copyright (C) 2010 Free Software Foundation, Inc.
;;;; Copyright (C) 2010, 2012 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
@ -22,7 +22,8 @@
#:use-module (test-suite lib)
#:use-module (ice-9 futures)
#:use-module (srfi srfi-1)
#:use-module (srfi srfi-26))
#:use-module (srfi srfi-26)
#:use-module (system base compile))
(define specific-exception-key (gensym))
@ -90,3 +91,18 @@
(pass-if-exception "exception"
specific-exception
(touch (future (throw specific-exception-key 'test "thrown!")))))
(with-test-prefix "nested futures"
(pass-if-equal "simple" 2
(touch (future (1+ (touch (future (1+ (touch (future 0)))))))))
(pass-if-equal "loop" (map - (iota 1000))
;; Compile to avoid stack overflows.
(compile '(let loop ((list (iota 1000)))
(if (null? list)
'()
(cons (- (car list))
(touch (future (loop (cdr list)))))))
#:to 'value
#:env (current-module))))