mirror of
https://git.savannah.gnu.org/git/guile.git
synced 2025-07-03 08:10:31 +02:00
http web server: allow concurrent read operations
* module/web/server/http.scm (<http-server>): Add fields for a reader thread-pool, and some async queues that it operates on. Also, a flag, http-threaded?. (http-open): Add #:threaded? and #:read-workers kwargs. Create a thread pool for reading if threads are available. (read-request!): New function, factored out of http-read. (enqueue-read!, http-read): Instead of reading the client directly, enqueue a read. In the case where threads are not available, this will call read-request! directly. read-request! takes care of adding to the handle-queue. The read polling loop will pop items off the handle-queue. (seconds-from-now, async-queue-for-each): New helpers. (http-write): Shut down the queues and threads, hopefully in a nonblocking fashion.
This commit is contained in:
parent
31a04ee239
commit
f20ae551a1
1 changed files with 104 additions and 29 deletions
|
@ -35,7 +35,10 @@
|
|||
#:use-module (web request)
|
||||
#:use-module (web response)
|
||||
#:use-module (web server)
|
||||
#:use-module (ice-9 poll))
|
||||
#:use-module (system repl error-handling)
|
||||
#:use-module (ice-9 poll)
|
||||
#:use-module (ice-9 async-queue)
|
||||
#:use-module (ice-9 thread-pool))
|
||||
|
||||
|
||||
(define (make-default-socket family addr port)
|
||||
|
@ -63,12 +66,18 @@
|
|||
(get-u8 port)))
|
||||
|
||||
(define-record-type <http-server>
|
||||
(make-http-server socket poll-idx poll-set wake-thunk)
|
||||
(make-http-server socket poll-idx poll-set wake-thunk threaded?
|
||||
read-workers read-queue handle-queue)
|
||||
http-server?
|
||||
(socket http-socket)
|
||||
(poll-idx http-poll-idx set-http-poll-idx!)
|
||||
(poll-set http-poll-set)
|
||||
(wake-thunk http-wake-thunk))
|
||||
(wake-thunk http-wake-thunk)
|
||||
(threaded? http-threaded?)
|
||||
|
||||
(read-workers http-read-workers set-http-read-workers!)
|
||||
(read-queue http-read-queue)
|
||||
(handle-queue http-handle-queue))
|
||||
|
||||
(define *error-events* (logior POLLHUP POLLERR))
|
||||
(define *read-events* POLLIN)
|
||||
|
@ -82,7 +91,9 @@
|
|||
(inet-pton family host)
|
||||
INADDR_LOOPBACK))
|
||||
(port 8080)
|
||||
(socket (make-default-socket family addr port)))
|
||||
(socket (make-default-socket family addr port))
|
||||
(threaded? (and (provided? 'threads) #t))
|
||||
(read-workers (if threaded? 8 1)))
|
||||
(listen socket 128)
|
||||
(sigaction SIGPIPE SIG_IGN)
|
||||
(let ((poll-set (make-empty-poll-set)))
|
||||
|
@ -90,19 +101,78 @@
|
|||
(lambda (wake-thunk wake-port)
|
||||
(poll-set-add! poll-set socket *events*)
|
||||
(poll-set-add! poll-set wake-port *read-events*)
|
||||
(make-http-server socket 0 poll-set wake-thunk)))))
|
||||
(let ((read-queue (make-async-queue #:capacity read-workers))
|
||||
(handle-queue (make-async-queue #:capacity read-workers)))
|
||||
(define server
|
||||
(make-http-server socket 0 poll-set wake-thunk threaded?
|
||||
#f read-queue handle-queue))
|
||||
(if threaded?
|
||||
(begin
|
||||
(set-http-read-workers!
|
||||
server
|
||||
(make-thread-pool
|
||||
read-workers
|
||||
(lambda ()
|
||||
(cond ((async-queue-pop! read-queue)
|
||||
=> (lambda (p) (read-request! server p)))))))
|
||||
(start-thread-pool! (http-read-workers server))))
|
||||
server)))))
|
||||
|
||||
(define (bad-request port)
|
||||
(write-response (build-response #:version '(1 . 0) #:code 400
|
||||
#:headers '((content-length . 0)))
|
||||
port))
|
||||
|
||||
(define (read-request! server port)
|
||||
(call-with-error-handling
|
||||
(lambda ()
|
||||
(cond
|
||||
((eof-object? (peek-char port))
|
||||
;; EOF.
|
||||
(close-port port))
|
||||
(else
|
||||
;; Otherwise, try to read a request from this port.
|
||||
(with-throw-handler #t
|
||||
(lambda ()
|
||||
(let* ((req (read-request port))
|
||||
(body (read-request-body req)))
|
||||
(or (async-queue-push! (http-handle-queue server)
|
||||
(list port req body))
|
||||
(error "failed to push request during shutdown"))
|
||||
((http-wake-thunk server))))
|
||||
(lambda (k . args)
|
||||
(define-syntax-rule (cleanup-catch statement)
|
||||
(catch #t
|
||||
(lambda () statement)
|
||||
(lambda (k . args)
|
||||
(format (current-error-port) "In ~a:\n" 'statement)
|
||||
(print-exception (current-error-port) #f k args))))
|
||||
(cleanup-catch (bad-request port))
|
||||
(cleanup-catch (close-port port)))))))
|
||||
#:pass-keys '(quit interrupt)
|
||||
#:on-error 'backtrace
|
||||
#:post-error
|
||||
(lambda (k . args)
|
||||
(display "While reading request:\n" (current-error-port))
|
||||
(print-exception (current-error-port) #f k args))))
|
||||
|
||||
(define (enqueue-read! server port)
|
||||
(if (http-threaded? server)
|
||||
(or (async-queue-push! (http-read-queue server) port)
|
||||
(false-if-exception
|
||||
(begin
|
||||
(warn "failed to push read during shutdown.")
|
||||
(close-port port))))
|
||||
(read-request! server port)))
|
||||
|
||||
;; -> (client request body | #f #f #f)
|
||||
(define (http-read server)
|
||||
(let* ((poll-set (http-poll-set server)))
|
||||
(let lp ((idx (http-poll-idx server)))
|
||||
(let ((revents (poll-set-revents poll-set idx)))
|
||||
(cond
|
||||
((async-queue-try-pop! (http-handle-queue server))
|
||||
=> (lambda (vals) (apply values vals)))
|
||||
((zero? idx)
|
||||
;; The server socket, and the end of our downward loop.
|
||||
(cond
|
||||
|
@ -137,32 +207,11 @@
|
|||
;; it. Remove it from the poll set.
|
||||
(else
|
||||
(let ((port (poll-set-remove! poll-set idx)))
|
||||
;; Record the next index in all cases, in case the EOF check
|
||||
;; Record the next index in all cases, in case enqueue-read!
|
||||
;; throws an error.
|
||||
(set-http-poll-idx! server (1- idx))
|
||||
(cond
|
||||
((eof-object? (peek-char port))
|
||||
;; EOF.
|
||||
(close-port port)
|
||||
(lp (1- idx)))
|
||||
(else
|
||||
;; Otherwise, try to read a request from this port.
|
||||
(with-throw-handler
|
||||
#t
|
||||
(lambda ()
|
||||
(let ((req (read-request port)))
|
||||
(values port
|
||||
req
|
||||
(read-request-body req))))
|
||||
(lambda (k . args)
|
||||
(define-syntax-rule (cleanup-catch statement)
|
||||
(catch #t
|
||||
(lambda () statement)
|
||||
(lambda (k . args)
|
||||
(format (current-error-port) "In ~a:\n" 'statement)
|
||||
(print-exception (current-error-port) #f k args))))
|
||||
(cleanup-catch (bad-request port))
|
||||
(cleanup-catch (close-port port)))))))))))))
|
||||
(enqueue-read! server port)
|
||||
(lp (1- idx)))))))))
|
||||
|
||||
(define (keep-alive? response)
|
||||
(let ((v (response-version response)))
|
||||
|
@ -193,9 +242,35 @@
|
|||
(close-port port)))
|
||||
(values)))
|
||||
|
||||
(define (seconds-from-now n)
|
||||
(let ((now (gettimeofday)))
|
||||
(cons (+ (car now) n) (cdr now))))
|
||||
|
||||
(define (async-queue-for-each queue proc)
|
||||
(let lp ()
|
||||
(cond ((async-queue-try-pop! queue)
|
||||
=> (lambda (vals) (proc vals) (lp))))))
|
||||
|
||||
;; -> unspecified values
|
||||
(define (http-close server)
|
||||
(let ((poll-set (http-poll-set server)))
|
||||
(display "Plugging read queue\n")
|
||||
(async-queue-stop-accepting! (http-read-queue server))
|
||||
(display "Stopping read workers\n")
|
||||
(if (http-threaded? server)
|
||||
(stop-thread-pool! (http-read-workers server)
|
||||
(seconds-from-now 5)
|
||||
#:cancel? #t))
|
||||
(display "Plugging read queue\n")
|
||||
(async-queue-stop-accepting! (http-read-queue server))
|
||||
(display "Draining read queue\n")
|
||||
(async-queue-for-each (http-read-queue server) close-port)
|
||||
(display "Plugging handle queue\n")
|
||||
(async-queue-stop-accepting! (http-handle-queue server))
|
||||
(display "Draining handle queue\n")
|
||||
(async-queue-for-each (http-handle-queue server)
|
||||
(lambda (vals) (close-port (car vals))))
|
||||
(display "Closing poll ports\n")
|
||||
(let lp ((n (poll-set-nfds poll-set)))
|
||||
(if (positive? n)
|
||||
(begin
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue