From 58942f30d6e7dd58350b8f4bce863e597dd94b43 Mon Sep 17 00:00:00 2001 From: Andy Wingo Date: Tue, 7 Feb 2012 22:12:26 +0100 Subject: [PATCH] http web server: allow concurrent write operations * module/web/server/http.scm (, http-open) (read-request!, write-request!, enqueue-write!, http-write): As in the previous commit, add support for concurrent writes. (http-read): Pop off keepalive ports in this, the main loop. (http-close): Shut down writers appropriately. --- module/web/server/http.scm | 104 +++++++++++++++++++++++++++++-------- 1 file changed, 83 insertions(+), 21 deletions(-) diff --git a/module/web/server/http.scm b/module/web/server/http.scm index a88b480c7..05990af26 100644 --- a/module/web/server/http.scm +++ b/module/web/server/http.scm @@ -67,7 +67,8 @@ (define-record-type (make-http-server socket poll-idx poll-set wake-thunk threaded? - read-workers read-queue handle-queue) + read-workers read-queue handle-queue + write-workers write-queue keepalive-queue) http-server? (socket http-socket) (poll-idx http-poll-idx set-http-poll-idx!) @@ -77,7 +78,11 @@ (read-workers http-read-workers set-http-read-workers!) (read-queue http-read-queue) - (handle-queue http-handle-queue)) + (handle-queue http-handle-queue) + + (write-workers http-write-workers set-http-write-workers!) + (write-queue http-write-queue) + (keepalive-queue http-keepalive-queue)) (define *error-events* (logior POLLHUP POLLERR)) (define *read-events* POLLIN) @@ -93,7 +98,8 @@ (port 8080) (socket (make-default-socket family addr port)) (threaded? (and (provided? 'threads) #t)) - (read-workers (if threaded? 8 1))) + (read-workers (if threaded? 8 1)) + (write-workers (if threaded? 8 1))) (listen socket 128) (sigaction SIGPIPE SIG_IGN) (let ((poll-set (make-empty-poll-set))) @@ -102,10 +108,14 @@ (poll-set-add! poll-set socket *events*) (poll-set-add! poll-set wake-port *read-events*) (let ((read-queue (make-async-queue #:capacity read-workers)) - (handle-queue (make-async-queue #:capacity read-workers))) + (handle-queue (make-async-queue #:capacity read-workers)) + (write-queue (make-async-queue #:capacity write-workers)) + (keepalive-queue (make-async-queue #:capacity write-workers + #:fixed? #f))) (define server (make-http-server socket 0 poll-set wake-thunk threaded? - #f read-queue handle-queue)) + #f read-queue handle-queue + #f write-queue keepalive-queue)) (if threaded? (begin (set-http-read-workers! @@ -115,7 +125,16 @@ (lambda () (cond ((async-queue-pop! read-queue) => (lambda (p) (read-request! server p))))))) - (start-thread-pool! (http-read-workers server)))) + (start-thread-pool! (http-read-workers server)) + (set-http-write-workers! + server + (make-thread-pool + write-workers + (lambda () + (cond ((async-queue-pop! write-queue) + => (lambda (args) + (apply write-request! server args))))))) + (start-thread-pool! (http-write-workers server)))) server))))) (define (bad-request port) @@ -173,6 +192,10 @@ (cond ((async-queue-try-pop! (http-handle-queue server)) => (lambda (vals) (apply values vals))) + ((async-queue-try-pop! (http-keepalive-queue server)) + => (lambda (port) + (poll-set-add! poll-set port *events*) + (lp idx))) ((zero? idx) ;; The server socket, and the end of our downward loop. (cond @@ -224,23 +247,48 @@ ((0) (memq 'keep-alive (response-connection response))))) (else #f))))) +(define (write-request! server client response body) + (call-with-error-handling + (lambda () + (let* ((response (write-response response client)) + (port (response-port response))) + (cond + ((not body)) ; pass + ((bytevector? body) + (write-response-body response body)) + (else + (error "Expected a bytevector for body" body))) + (cond + ((keep-alive? response) + (force-output port) + (or (async-queue-push! (http-keepalive-queue server) port) + ;; Shutting down; there is a sane thing to do, no need to + ;; error. + (close-port port)) + ((http-wake-thunk server))) + (else + (close-port port))))) + #:pass-keys '(quit interrupt) + #:on-error 'backtrace + #:post-error + (lambda (k . args) + (display "While writing response:\n" (current-error-port)) + (print-exception (current-error-port) #f k args)))) + +(define (enqueue-write! server client response body) + (if (http-threaded? server) + (or (async-queue-push! (http-write-queue server) + (list client response body)) + (false-if-exception + (begin + (warn "failed to push write during shutdown.") + (close-port client)))) + (write-request! server client response body))) + ;; -> 0 values (define (http-write server client response body) - (let* ((response (write-response response client)) - (port (response-port response))) - (cond - ((not body)) ; pass - ((bytevector? body) - (write-response-body response body)) - (else - (error "Expected a bytevector for body" body))) - (cond - ((keep-alive? response) - (force-output port) - (poll-set-add! (http-poll-set server) port *events*)) - (else - (close-port port))) - (values))) + (enqueue-write! server client response body) + (values)) (define (seconds-from-now n) (let ((now (gettimeofday))) @@ -270,6 +318,20 @@ (display "Draining handle queue\n") (async-queue-for-each (http-handle-queue server) (lambda (vals) (close-port (car vals)))) + (display "Plugging write queue\n") + (async-queue-stop-accepting! (http-write-queue server)) + (display "Stopping write workers\n") + (if (http-threaded? server) + (stop-thread-pool! (http-write-workers server) + (seconds-from-now 5) + #:cancel? #t)) + (display "Draining write queue\n") + (async-queue-for-each (http-write-queue server) + (lambda (vals) (close-port (car vals)))) + (display "Plugging keepalive queue\n") + (async-queue-stop-accepting! (http-keepalive-queue server)) + (display "Draining keepalive queue\n") + (async-queue-for-each (http-keepalive-queue server) close-port) (display "Closing poll ports\n") (let lp ((n (poll-set-nfds poll-set))) (if (positive? n)