diff --git a/module/ice-9/futures.scm b/module/ice-9/futures.scm index 7fbccf63f..f574410b1 100644 --- a/module/ice-9/futures.scm +++ b/module/ice-9/futures.scm @@ -44,19 +44,21 @@ ;;; (define-record-type - (%make-future thunk done? mutex) + (%make-future thunk state mutex completion) future? - (thunk future-thunk) - (done? future-done? set-future-done?!) - (result future-result set-future-result!) - (mutex future-mutex)) + (thunk future-thunk) + (state future-state set-future-state!) ; done | started | queued + (result future-result set-future-result!) + (mutex future-mutex) + (completion future-completion)) ; completion cond. var. (define (make-future thunk) "Return a new future for THUNK. Execution may start at any point concurrently, or it can start at the time when the returned future is touched." (create-workers!) - (let ((future (%make-future thunk #f (make-mutex)))) + (let ((future (%make-future thunk 'queued + (make-mutex) (make-condition-variable)))) (register-future! future) future)) @@ -69,6 +71,14 @@ touched." (define %futures-mutex (make-mutex)) (define %futures-available (make-condition-variable)) +(define-syntax-rule (with-mutex m e0 e1 ...) + ;; Copied from (ice-9 threads) to avoid circular dependency. + (let ((x m)) + (dynamic-wind + (lambda () (lock-mutex x)) + (lambda () (begin e0 e1 ...)) + (lambda () (unlock-mutex x))))) + (define (register-future! future) ;; Register FUTURE as being processable. (lock-mutex %futures-mutex) @@ -77,7 +87,7 @@ touched." (unlock-mutex %futures-mutex)) (define (process-future! future) - ;; Process FUTURE, assuming its mutex is already taken. + ;; Process FUTURE, and update its result. (set-future-result! future (catch #t (lambda () @@ -87,8 +97,7 @@ touched." (apply values results))))) (lambda args (lambda () - (apply throw args))))) - (set-future-done?! future #t)) + (apply throw args)))))) (define (process-futures) ;; Wait for futures to be available and process them. @@ -101,42 +110,74 @@ touched." (or (q-empty? %futures) (let ((future (deq! %futures))) (lock-mutex (future-mutex future)) - (or (and (future-done? future) - (unlock-mutex (future-mutex future))) - (begin - ;; Do the actual work. + (case (future-state future) + ((done started) + ;; Nothing to do. + (unlock-mutex (future-mutex future))) + (else + ;; Do the actual work. - ;; We want to release %FUTURES-MUTEX so that other workers - ;; can progress. However, to avoid deadlocks, we have to - ;; unlock FUTURE as well, to preserve lock ordering. - (unlock-mutex (future-mutex future)) - (unlock-mutex %futures-mutex) + ;; We want to release %FUTURES-MUTEX so that other workers can + ;; progress. However, to avoid deadlocks, we have to unlock + ;; FUTURE as well, to preserve lock ordering. + (unlock-mutex (future-mutex future)) + (unlock-mutex %futures-mutex) - (lock-mutex (future-mutex future)) - (or (future-done? future) ; lost the race? - (process-future! future)) - (unlock-mutex (future-mutex future)) + (lock-mutex (future-mutex future)) + (if (eq? (future-state future) 'queued) ; lost the race? + (begin ; no, so let's process it + (set-future-state! future 'started) + (unlock-mutex (future-mutex future)) - (lock-mutex %futures-mutex))))) + (process-future! future) + + (with-mutex (future-mutex future) + (set-future-state! future 'done)) + + (broadcast-condition-variable (future-completion future))) + (unlock-mutex (future-mutex future))) ; yes + + (lock-mutex %futures-mutex))))) + + ;; Look for more work. (loop))) (define (touch future) "Return the result of FUTURE, computing it if not already done." (lock-mutex (future-mutex future)) - (or (future-done? future) - (begin - ;; Do the actual work. Unlock FUTURE first to preserve lock - ;; ordering. - (unlock-mutex (future-mutex future)) + (case (future-state future) + ((done) + (unlock-mutex (future-mutex future))) + ((started) + ;; Wait for completion. + (wait-condition-variable (future-completion future) + (future-mutex future)) + (unlock-mutex (future-mutex future))) + ((queued) + (begin + ;; Do the actual work. Unlock FUTURE first to preserve lock + ;; ordering. + (unlock-mutex (future-mutex future)) - (lock-mutex %futures-mutex) - (q-remove! %futures future) - (unlock-mutex %futures-mutex) + (lock-mutex %futures-mutex) + (q-remove! %futures future) + (unlock-mutex %futures-mutex) - (lock-mutex (future-mutex future)) - (or (future-done? future) ; lost the race? - (process-future! future)))) - (unlock-mutex (future-mutex future)) + (lock-mutex (future-mutex future)) + (if (eq? (future-state future) 'queued) ; lost the race? + (begin ; no, so let's process it + (set-future-state! future 'started) + (unlock-mutex (future-mutex future)) + + (process-future! future) + + (with-mutex (future-mutex future) + (set-future-state! future 'done)) + + (broadcast-condition-variable (future-completion future))) + (begin ; yes, so try again + (unlock-mutex (future-mutex future)) + (touch future)))))) ((future-result future)))