1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-05-20 19:50:24 +02:00

(web server) punts keep-alive to impls; http server uses (ice-9 poll)

* module/web/server.scm: Rewrite to remove the extra "keep-alive"
  parameter. Instead, since the server is an essentially stateful
  object, have clients that want to do keep-alive manage that set as
  part of the server state. Also avoids imposing a particular data
  structure on the server implementation.

* module/web/server/http.scm: Adapt to the new server interface. Also,
  use a poll set instead of select and lists. Makes handling 1000
  clients at a time much more possible.
This commit is contained in:
Andy Wingo 2010-12-03 15:31:57 +01:00
parent 51c1dba88a
commit 462a1a04cf
2 changed files with 118 additions and 113 deletions

View file

@ -43,19 +43,12 @@
;;; server socket object, or signals an error. ;;; server socket object, or signals an error.
;;; ;;;
;;; * The `read' hook is called, to read a request from a new client. ;;; * The `read' hook is called, to read a request from a new client.
;;; The `read' hook takes two arguments: the server socket, and a ;;; The `read' hook takes one arguments, the server socket. It
;;; list of keep-alive clients. It should return four values: the ;;; should return three values: an opaque client socket, the
;;; new list of keep-alive clients, an opaque client socket, the
;;; request, and the request body. The request should be a ;;; request, and the request body. The request should be a
;;; `<request>' object, from `(web request)'. The body should be a ;;; `<request>' object, from `(web request)'. The body should be a
;;; string or a bytevector, or `#f' if there is no body. ;;; string or a bytevector, or `#f' if there is no body.
;;; ;;;
;;; The keep-alive list is used when selecting a new request. You
;;; can either serve an old client or serve a new client; and some
;;; old clients might close their connections while you are waiting.
;;; The `read' hook returns a new keep-alive set to account for old
;;; clients going away, and for read errors on old clients.
;;;
;;; If the read failed, the `read' hook may return #f for the client ;;; If the read failed, the `read' hook may return #f for the client
;;; socket, request, and body. ;;; socket, request, and body.
;;; ;;;
@ -68,14 +61,11 @@
;;; constructed with those headers. ;;; constructed with those headers.
;;; ;;;
;;; * The `write' hook is called with three arguments: the client ;;; * The `write' hook is called with three arguments: the client
;;; socket, the response, and the body. The `write' hook may return ;;; socket, the response, and the body. The `write' hook returns no
;;; #f to indicate that the connection was closed. If `write' ;;; values.
;;; returns a true value, it will be consed onto the keep-alive
;;; list.
;;; ;;;
;;; * At this point the request handling is complete. For a loop, we ;;; * At this point the request handling is complete. For a loop, we
;;; loop back with the new keep-alive list, and try to read a new ;;; loop back and try to read a new request.
;;; request.
;;; ;;;
;;; * If the user interrupts the loop, the `close' hook is called on ;;; * If the user interrupts the loop, the `close' hook is called on
;;; the server socket. ;;; the server socket.
@ -149,17 +139,17 @@
(define (open-server impl open-params) (define (open-server impl open-params)
(apply (server-impl-open impl) open-params)) (apply (server-impl-open impl) open-params))
;; -> (keep-alive client request body | keep-alive #f #f #f) ;; -> (client request body | #f #f #f)
(define (read-client impl server keep-alive) (define (read-client impl server)
(call-with-error-handling (call-with-error-handling
(lambda () (lambda ()
((server-impl-read impl) server keep-alive)) ((server-impl-read impl) server))
#:pass-keys '(quit interrupt) #:pass-keys '(quit interrupt)
#:on-error (if (batch-mode?) 'pass 'debug) #:on-error (if (batch-mode?) 'pass 'debug)
#:post-error #:post-error
(lambda (k . args) (lambda (k . args)
(warn "Error while accepting client" k args) (warn "Error while accepting client" k args)
(values keep-alive #f #f #f)))) (values #f #f #f))))
(define (call-with-encoded-output-string charset proc) (define (call-with-encoded-output-string charset proc)
(if (and (string-ci=? charset "utf-8") #f) (if (and (string-ci=? charset "utf-8") #f)
@ -256,7 +246,7 @@
(warn "Error handling request" k args) (warn "Error handling request" k args)
(values (build-response #:code 500) #f state)))) (values (build-response #:code 500) #f state))))
;; -> (#f | client) ;; -> unspecified values
(define (write-client impl server client response body) (define (write-client impl server client response body)
(call-with-error-handling (call-with-error-handling
(lambda () (lambda ()
@ -266,7 +256,7 @@
#:post-error #:post-error
(lambda (k . args) (lambda (k . args)
(warn "Error while writing response" k args) (warn "Error while writing response" k args)
#f))) (values))))
;; -> unspecified values ;; -> unspecified values
(define (close-server impl server) (define (close-server impl server)
@ -298,16 +288,13 @@
(lambda (k proc) (lambda (k proc)
(with-stack-and-prompt (lambda () (proc k)))))) (with-stack-and-prompt (lambda () (proc k))))))
(define (and-cons x xs) ;; -> new-state
(if x (cons x xs) xs)) (define (serve-one-client handler impl server state)
;; -> new keep-alive new-state
(define (serve-one-client handler impl server keep-alive state)
(debug-elapsed 'serve-again) (debug-elapsed 'serve-again)
(call-with-values (call-with-values
(lambda () (lambda ()
(read-client impl server keep-alive)) (read-client impl server))
(lambda (keep-alive client request body) (lambda (client request body)
(debug-elapsed 'read-client) (debug-elapsed 'read-client)
(if client (if client
(call-with-values (call-with-values
@ -315,13 +302,10 @@
(handle-request handler request body state)) (handle-request handler request body state))
(lambda (response body state) (lambda (response body state)
(debug-elapsed 'handle-request) (debug-elapsed 'handle-request)
(values (write-client impl server client response body)
(and-cons (let ((x (write-client impl server client response body))) (debug-elapsed 'write-client)
(debug-elapsed 'write-client) state))
x) state))))
keep-alive)
state)))
(values keep-alive state)))))
(define* (run-server handler #:optional (impl 'http) (open-params '()) (define* (run-server handler #:optional (impl 'http) (open-params '())
. state) . state)
@ -329,12 +313,8 @@
(server (open-server impl open-params))) (server (open-server impl open-params)))
(call-with-sigint (call-with-sigint
(lambda () (lambda ()
(let lp ((keep-alive '()) (state state)) (let lp ((state state))
(call-with-values (lp (serve-one-client handler impl server state))))
(lambda ()
(serve-one-client handler impl server keep-alive state))
(lambda (new-keep-alive new-state)
(lp new-keep-alive new-state)))))
(lambda () (lambda ()
(close-server impl server) (close-server impl server)
(values))))) (values)))))

View file

@ -21,10 +21,12 @@
(define-module (web server http) (define-module (web server http)
#:use-module ((srfi srfi-1) #:select (fold)) #:use-module ((srfi srfi-1) #:select (fold))
#:use-module (srfi srfi-9)
#:use-module (rnrs bytevectors) #:use-module (rnrs bytevectors)
#:use-module (web request) #:use-module (web request)
#:use-module (web response) #:use-module (web response)
#:use-module (web server) #:use-module (web server)
#:use-module (ice-9 poll)
#:use-module (system repl error-handling)) #:use-module (system repl error-handling))
@ -34,72 +36,90 @@
(bind sock family addr port) (bind sock family addr port)
sock)) sock))
(define-record-type <http-server>
(make-http-server socket poll-idx poll-set)
http-server?
(socket http-socket)
(poll-idx http-poll-idx set-http-poll-idx!)
(poll-set http-poll-set))
(define *error-events* (logior POLLHUP POLLERR))
(define *read-events* POLLIN)
(define *events* (logior *error-events* *read-events*))
;; -> server ;; -> server
(define* (http-open #:key (define* (http-open #:key
(host #f) (host #f)
(family AF_INET) (family AF_INET)
(addr (if host (addr (if host
(inet-pton family host) (inet-pton family host)
INADDR_LOOPBACK)) INADDR_LOOPBACK))
(port 8080) (port 8080)
(socket (make-default-socket family addr port))) (socket (make-default-socket family addr port)))
(listen socket 5) (listen socket 5)
(sigaction SIGPIPE SIG_IGN) (sigaction SIGPIPE SIG_IGN)
socket) (let ((poll-set (make-empty-poll-set)))
(poll-set-add! poll-set socket *events*)
(make-http-server socket 1 poll-set)))
;; -> (keep-alive client request body | keep-alive #f #f #f) ;; -> (client request body | #f #f #f)
(define (http-read server keep-alive) (define (http-read server)
(call-with-values (lambda () (let* ((poll-set (http-poll-set server)))
(let ((ports (cons server keep-alive))) (let lp ((idx (http-poll-idx server)))
(apply values (select ports '() ports))))
(lambda (readable writable except)
(cond (cond
((pair? except) ((not (< idx (poll-set-nfds poll-set)))
(values (fold (lambda (p keep-alive) (poll poll-set)
(close-port p) (lp 0))
(if (eq? p server)
(throw 'interrupt)
(delq p keep-alive)))
keep-alive
except)
#f #f #f))
((memq server readable)
;; FIXME: meta to read-request
(let* ((client (let ((pair (accept server)))
;; line buffered for request
(setvbuf (car pair) _IOLBF)
pair))
(req (read-request (car client)))
(body-str (begin
;; block buffered for body and response
(setvbuf (car client) _IOFBF)
(read-request-body/latin-1 req))))
(values keep-alive (car client) req body-str)))
((pair? readable)
;; FIXME: preserve meta for keep-alive
(let* ((p (car readable))
(keep-alive (delq p keep-alive)))
(if (eof-object? (peek-char p))
(begin
(close-port p)
(values keep-alive #f #f #f))
(call-with-error-handling
(lambda ()
;; http-write already left p in line-buffered state
(let* ((req (read-request p))
(body-str (begin
;; block buffered for body and response
(setvbuf p _IOFBF)
(read-request-body/latin-1 req))))
(values keep-alive p req body-str)))
#:pass-keys '(quit interrupt)
#:on-error (if (batch-mode?) 'pass 'debug)
#:post-error
(lambda (k . args)
(warn "Error while reading request" k args)
(values keep-alive #f #f #f #f))))))
(else (else
(values keep-alive #f #f #f)))))) (let ((revents (poll-set-revents poll-set idx)))
(cond
((zero? revents)
;; Nothing on this port.
(lp (1+ idx)))
((zero? idx)
;; The server socket.
(if (not (zero? (logand revents *error-events*)))
;; An error.
(throw 'interrupt)
;; Otherwise, we have a new client. Add to set, then
;; find another client that is ready to read.
;;
;; FIXME: preserve meta-info.
(let ((client (accept (poll-set-port poll-set idx))))
;; Set line buffering while reading the request.
(setvbuf (car client) _IOLBF)
(poll-set-add! poll-set (car client) *events*)
(lp (1+ idx)))))
;; Otherwise, a client socket with some activity on
;; it. Remove it from the poll set.
(else
(let ((port (poll-set-remove! poll-set idx)))
(cond
((or (not (zero? (logand revents *error-events*)))
(eof-object? (peek-char port)))
;; The socket was shut down or had an error. See
;; http://www.greenend.org.uk/rjk/2001/06/poll.html
;; for an interesting discussion.
(close-port port)
(lp idx))
(else
;; Otherwise, try to read a request from this port.
;; Next time we start with this index.
(set-http-poll-idx! server idx)
(call-with-error-handling
(lambda ()
(let ((req (read-request port)))
;; Block buffering for reading body and writing response.
(setvbuf port _IOFBF)
(values port
req
(read-request-body/latin-1 req))))
#:pass-keys '(quit interrupt)
#:on-error (if (batch-mode?) 'pass 'debug)
#:post-error
(lambda (k . args)
(warn "Error while reading request" k args)
(values #f #f #f))))))))))))))
(define (keep-alive? response) (define (keep-alive? response)
(let ((v (response-version response))) (let ((v (response-version response)))
@ -110,9 +130,10 @@
((0) (memq 'keep-alive (response-connection response))))) ((0) (memq 'keep-alive (response-connection response)))))
(else #f)))) (else #f))))
;; -> (#f | client) ;; -> 0 values
(define (http-write server client response body) (define (http-write server client response body)
(let ((response (write-response response client))) (let* ((response (write-response response client))
(port (response-port response)))
(cond (cond
((not body)) ; pass ((not body)) ; pass
((string? body) ((string? body)
@ -121,20 +142,24 @@
(write-response-body/bytevector response body)) (write-response-body/bytevector response body))
(else (else
(error "Expected a string or bytevector for body" body))) (error "Expected a string or bytevector for body" body)))
(force-output (response-port response)) (cond
(if (keep-alive? response) ((keep-alive? response)
(let ((p (response-port response))) (force-output port)
;; back to line buffered ;; back to line buffered
(setvbuf p _IOLBF) (setvbuf port _IOLBF)
p) (poll-set-add! (http-poll-set server) port *events*))
(begin (else
(close-port (response-port response)) (close-port port)))
#f)))) (values)))
;; -> unspecified values ;; -> unspecified values
(define (http-close server) (define (http-close server)
(shutdown server 2) (let ((poll-set (http-poll-set server)))
(close-port server)) (let lp ((n (poll-set-nfds poll-set)))
(if (positive? n)
(begin
(close-port (poll-set-remove! poll-set (1- n)))
(lp (1- n)))))))
(define-server-impl http (define-server-impl http
http-open http-open