mirror of
https://git.savannah.gnu.org/git/guile.git
synced 2025-04-30 03:40:34 +02:00
http web server: allow concurrent write operations
* module/web/server/http.scm (<http-server>, http-open) (read-request!, write-request!, enqueue-write!, http-write): As in the previous commit, add support for concurrent writes. (http-read): Pop off keepalive ports in this, the main loop. (http-close): Shut down writers appropriately.
This commit is contained in:
parent
f20ae551a1
commit
58942f30d6
1 changed files with 83 additions and 21 deletions
|
@ -67,7 +67,8 @@
|
|||
|
||||
(define-record-type <http-server>
|
||||
(make-http-server socket poll-idx poll-set wake-thunk threaded?
|
||||
read-workers read-queue handle-queue)
|
||||
read-workers read-queue handle-queue
|
||||
write-workers write-queue keepalive-queue)
|
||||
http-server?
|
||||
(socket http-socket)
|
||||
(poll-idx http-poll-idx set-http-poll-idx!)
|
||||
|
@ -77,7 +78,11 @@
|
|||
|
||||
(read-workers http-read-workers set-http-read-workers!)
|
||||
(read-queue http-read-queue)
|
||||
(handle-queue http-handle-queue))
|
||||
(handle-queue http-handle-queue)
|
||||
|
||||
(write-workers http-write-workers set-http-write-workers!)
|
||||
(write-queue http-write-queue)
|
||||
(keepalive-queue http-keepalive-queue))
|
||||
|
||||
(define *error-events* (logior POLLHUP POLLERR))
|
||||
(define *read-events* POLLIN)
|
||||
|
@ -93,7 +98,8 @@
|
|||
(port 8080)
|
||||
(socket (make-default-socket family addr port))
|
||||
(threaded? (and (provided? 'threads) #t))
|
||||
(read-workers (if threaded? 8 1)))
|
||||
(read-workers (if threaded? 8 1))
|
||||
(write-workers (if threaded? 8 1)))
|
||||
(listen socket 128)
|
||||
(sigaction SIGPIPE SIG_IGN)
|
||||
(let ((poll-set (make-empty-poll-set)))
|
||||
|
@ -102,10 +108,14 @@
|
|||
(poll-set-add! poll-set socket *events*)
|
||||
(poll-set-add! poll-set wake-port *read-events*)
|
||||
(let ((read-queue (make-async-queue #:capacity read-workers))
|
||||
(handle-queue (make-async-queue #:capacity read-workers)))
|
||||
(handle-queue (make-async-queue #:capacity read-workers))
|
||||
(write-queue (make-async-queue #:capacity write-workers))
|
||||
(keepalive-queue (make-async-queue #:capacity write-workers
|
||||
#:fixed? #f)))
|
||||
(define server
|
||||
(make-http-server socket 0 poll-set wake-thunk threaded?
|
||||
#f read-queue handle-queue))
|
||||
#f read-queue handle-queue
|
||||
#f write-queue keepalive-queue))
|
||||
(if threaded?
|
||||
(begin
|
||||
(set-http-read-workers!
|
||||
|
@ -115,7 +125,16 @@
|
|||
(lambda ()
|
||||
(cond ((async-queue-pop! read-queue)
|
||||
=> (lambda (p) (read-request! server p)))))))
|
||||
(start-thread-pool! (http-read-workers server))))
|
||||
(start-thread-pool! (http-read-workers server))
|
||||
(set-http-write-workers!
|
||||
server
|
||||
(make-thread-pool
|
||||
write-workers
|
||||
(lambda ()
|
||||
(cond ((async-queue-pop! write-queue)
|
||||
=> (lambda (args)
|
||||
(apply write-request! server args)))))))
|
||||
(start-thread-pool! (http-write-workers server))))
|
||||
server)))))
|
||||
|
||||
(define (bad-request port)
|
||||
|
@ -173,6 +192,10 @@
|
|||
(cond
|
||||
((async-queue-try-pop! (http-handle-queue server))
|
||||
=> (lambda (vals) (apply values vals)))
|
||||
((async-queue-try-pop! (http-keepalive-queue server))
|
||||
=> (lambda (port)
|
||||
(poll-set-add! poll-set port *events*)
|
||||
(lp idx)))
|
||||
((zero? idx)
|
||||
;; The server socket, and the end of our downward loop.
|
||||
(cond
|
||||
|
@ -224,8 +247,9 @@
|
|||
((0) (memq 'keep-alive (response-connection response)))))
|
||||
(else #f)))))
|
||||
|
||||
;; -> 0 values
|
||||
(define (http-write server client response body)
|
||||
(define (write-request! server client response body)
|
||||
(call-with-error-handling
|
||||
(lambda ()
|
||||
(let* ((response (write-response response client))
|
||||
(port (response-port response)))
|
||||
(cond
|
||||
|
@ -237,10 +261,34 @@
|
|||
(cond
|
||||
((keep-alive? response)
|
||||
(force-output port)
|
||||
(poll-set-add! (http-poll-set server) port *events*))
|
||||
(or (async-queue-push! (http-keepalive-queue server) port)
|
||||
;; Shutting down; there is a sane thing to do, no need to
|
||||
;; error.
|
||||
(close-port port))
|
||||
((http-wake-thunk server)))
|
||||
(else
|
||||
(close-port port)))
|
||||
(values)))
|
||||
(close-port port)))))
|
||||
#:pass-keys '(quit interrupt)
|
||||
#:on-error 'backtrace
|
||||
#:post-error
|
||||
(lambda (k . args)
|
||||
(display "While writing response:\n" (current-error-port))
|
||||
(print-exception (current-error-port) #f k args))))
|
||||
|
||||
(define (enqueue-write! server client response body)
|
||||
(if (http-threaded? server)
|
||||
(or (async-queue-push! (http-write-queue server)
|
||||
(list client response body))
|
||||
(false-if-exception
|
||||
(begin
|
||||
(warn "failed to push write during shutdown.")
|
||||
(close-port client))))
|
||||
(write-request! server client response body)))
|
||||
|
||||
;; -> 0 values
|
||||
(define (http-write server client response body)
|
||||
(enqueue-write! server client response body)
|
||||
(values))
|
||||
|
||||
(define (seconds-from-now n)
|
||||
(let ((now (gettimeofday)))
|
||||
|
@ -270,6 +318,20 @@
|
|||
(display "Draining handle queue\n")
|
||||
(async-queue-for-each (http-handle-queue server)
|
||||
(lambda (vals) (close-port (car vals))))
|
||||
(display "Plugging write queue\n")
|
||||
(async-queue-stop-accepting! (http-write-queue server))
|
||||
(display "Stopping write workers\n")
|
||||
(if (http-threaded? server)
|
||||
(stop-thread-pool! (http-write-workers server)
|
||||
(seconds-from-now 5)
|
||||
#:cancel? #t))
|
||||
(display "Draining write queue\n")
|
||||
(async-queue-for-each (http-write-queue server)
|
||||
(lambda (vals) (close-port (car vals))))
|
||||
(display "Plugging keepalive queue\n")
|
||||
(async-queue-stop-accepting! (http-keepalive-queue server))
|
||||
(display "Draining keepalive queue\n")
|
||||
(async-queue-for-each (http-keepalive-queue server) close-port)
|
||||
(display "Closing poll ports\n")
|
||||
(let lp ((n (poll-set-nfds poll-set)))
|
||||
(if (positive? n)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue