1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-05-28 07:50:20 +02:00

* threads.scm (par-map, par-for-each): Reimplemented using

joing-thread.
(parallel): Reimplemented using futures.
(n-par-map, n-for-each): New procedures.
This commit is contained in:
Mikael Djurfeldt 2002-12-15 14:36:19 +00:00
parent 28d52ebb19
commit 876235959d
2 changed files with 97 additions and 96 deletions

View file

@ -1,3 +1,10 @@
2002-12-15 Mikael Djurfeldt <djurfeldt@nada.kth.se>
* threads.scm (par-map, par-for-each): Reimplemented using
joing-thread.
(parallel): Reimplemented using futures.
(n-par-map, n-for-each): New procedures.
2002-12-12 Marius Vollmer <mvo@zagadka.ping.de>
* optargs.scm (improper-list-copy): New.

View file

@ -57,65 +57,83 @@
;;; Code:
(define-module (ice-9 threads)
:export (par-map
:export (future-ref
par-map
par-for-each
%thread-handler)
:export-syntax (make-thread
begin-thread
n-par-map
n-par-for-each)
:export-syntax (begin-thread
future
parallel
letpar
make-thread
with-mutex
monitor))
(define (par-map proc . arglists)
(let* ((m (make-mutex))
(c (make-condition-variable))
(n (length (car arglists)))
(counter (- n 1))
(res (make-list n))
(ls res))
(lock-mutex m)
(apply for-each
(lambda args
(let ((res ls))
(set! ls (cdr ls))
(call-with-new-thread
(lambda ()
(set-car! res (apply proc args))
;; synchronize
(lock-mutex m)
(if (zero? counter)
(signal-condition-variable c)
(set! counter (- counter 1)))
(unlock-mutex m))
%thread-handler)))
arglists)
(wait-condition-variable c m)
res))
(define future-ref join-thread)
(define (par-for-each proc . arglists)
(let* ((m (make-mutex))
(c (make-condition-variable))
(counter (- (length (car arglists)) 1)))
(lock-mutex m)
(apply for-each
(lambda args
(call-with-new-thread
(lambda ()
(apply proc args)
;; synchronize
(lock-mutex m)
(if (zero? counter)
(signal-condition-variable c)
(set! counter (- counter 1)))
(unlock-mutex m))
%thread-handler))
arglists)
(wait-condition-variable c m)))
(define ((par-mapper mapper) proc . arglists)
(mapper join-thread
(apply map
(lambda args
(call-with-new-thread (lambda ()
(apply proc args))
%thread-handler))
arglists)))
(define (%thread-handler tag . args)
(define par-map (par-mapper map))
(define par-for-each (par-mapper for-each))
(define (n-par-map n proc . arglists)
(let* ((m (make-mutex))
(threads '())
(results (make-list (length (car arglists))))
(result results))
(do ((i 0 (+ 1 i)))
((= i n)
(for-each join-thread threads)
results)
(set! threads
(cons (call-with-new-thread
(lambda ()
(let loop ()
(lock-mutex m)
(if (null? result)
(unlock-mutex m)
(let ((args (map car arglists))
(my-result result))
(set! arglists (map cdr arglists))
(set! result (cdr result))
(unlock-mutex m)
(set-car! my-result (apply proc args))
(loop)))))
%thread-handler)
threads)))))
(define (n-par-for-each n proc . arglists)
(let ((m (make-mutex))
(threads '()))
(do ((i 0 (+ 1 i)))
((= i n)
(for-each join-thread threads))
(set! threads
(cons (call-with-new-thread
(lambda ()
(let loop ()
(lock-mutex m)
(if (null? (car arglists))
(unlock-mutex m)
(let ((args (map car arglists)))
(set! arglists (map cdr arglists))
(unlock-mutex m)
(apply proc args)
(loop)))))
%thread-handler)
threads)))))
(define (thread-handler tag . args)
(fluid-set! the-last-stack #f)
(let ((n (length args))
(p (current-error-port)))
@ -138,71 +156,47 @@
(newline p)))
#f))
;;; Set system thread handler
(set! %thread-handler thread-handler)
; --- MACROS -------------------------------------------------------
(defmacro make-thread (proc . args)
`(call-with-new-thread
(lambda ()
(,proc ,@args))
%thread-handler))
(define-macro (begin-thread . forms)
(if (null? forms)
'(begin)
`(call-with-new-thread
(lambda ()
,@forms)
%thread-handler)))
(defmacro begin-thread (first . rest)
`(call-with-new-thread
(lambda ()
(begin
,first ,@rest))
%thread-handler))
(defmacro parallel forms
(define-macro (parallel . forms)
(cond ((null? forms) '(begin))
((null? (cdr forms)) (car forms))
(else
(let* ((m (make-symbol "m"))
(c (make-symbol "c"))
(counter (make-symbol "counter"))
(sync (make-symbol "sync"))
(n-forms (length forms))
(vars (map (lambda (i)
(make-symbol (string-append "res"
(number->string i))))
(iota n-forms))))
`(let* ((,m (make-mutex))
(,c (make-condition-variable))
(,counter ,(- n-forms 1))
(,sync (lambda ()
(lock-mutex ,m)
(if (zero? ,counter)
(signal-condition-variable ,c)
(set! ,counter (- ,counter 1)))
(unlock-mutex ,m)))
,@(map (lambda (var)
`(,var #f))
vars))
(lock-mutex ,m)
,@(map (lambda (var form)
`(call-with-new-thread (lambda ()
(set! ,var ,form)
(,sync))
%thread-handler))
vars
forms)
(wait-condition-variable ,c ,m)
(values ,@vars))))))
`(apply values
(map future-ref
(list ,@(map (lambda (form) `(future ,form)) forms)))))))
(defmacro letpar (bindings . body)
(define-macro (letpar bindings . body)
`(call-with-values
(lambda ()
(parallel ,@(map cadr bindings)))
(lambda ,(map car bindings)
,@body)))
(defmacro with-mutex (m . body)
(define-macro (make-thread proc . args)
`(call-with-new-thread
(lambda ()
(,proc ,@args))
%thread-handler))
(define-macro (with-mutex m . body)
`(dynamic-wind
(lambda () (lock-mutex ,m))
(lambda () (begin ,@body))
(lambda () (unlock-mutex ,m))))
(defmacro monitor (first . rest)
(define-macro (monitor first . rest)
`(with-mutex ,(make-mutex)
(begin
,first ,@rest)))