mirror of
https://git.savannah.gnu.org/git/guile.git
synced 2025-04-30 11:50:28 +02:00
* module/web/server/http.scm (<http-server>, http-open) (read-request!, write-request!, enqueue-write!, http-write): As in the previous commit, add support for concurrent writes. (http-read): Pop off keepalive ports in this, the main loop. (http-close): Shut down writers appropriately.
346 lines
13 KiB
Scheme
346 lines
13 KiB
Scheme
;;; Web I/O: HTTP
|
|
|
|
;; Copyright (C) 2010, 2011, 2012 Free Software Foundation, Inc.
|
|
|
|
;; This library is free software; you can redistribute it and/or
|
|
;; modify it under the terms of the GNU Lesser General Public
|
|
;; License as published by the Free Software Foundation; either
|
|
;; version 3 of the License, or (at your option) any later version.
|
|
;;
|
|
;; This library is distributed in the hope that it will be useful,
|
|
;; but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
;; Lesser General Public License for more details.
|
|
;;
|
|
;; You should have received a copy of the GNU Lesser General Public
|
|
;; License along with this library; if not, write to the Free Software
|
|
;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
|
|
;; 02110-1301 USA
|
|
|
|
;;; Commentary:
|
|
;;;
|
|
;;; This is the HTTP implementation of the (web server) interface.
|
|
;;;
|
|
;;; `read-request' sets the character encoding on the new port to
|
|
;;; latin-1. See the note in request.scm regarding character sets,
|
|
;;; strings, and bytevectors for more information.
|
|
;;;
|
|
;;; Code:
|
|
|
|
(define-module (web server http)
|
|
#:use-module ((srfi srfi-1) #:select (fold))
|
|
#:use-module (srfi srfi-9)
|
|
#:use-module (ice-9 binary-ports)
|
|
#:use-module (rnrs bytevectors)
|
|
#:use-module (web request)
|
|
#:use-module (web response)
|
|
#:use-module (web server)
|
|
#:use-module (system repl error-handling)
|
|
#:use-module (ice-9 poll)
|
|
#:use-module (ice-9 async-queue)
|
|
#:use-module (ice-9 thread-pool))
|
|
|
|
|
|
(define (make-default-socket family addr port)
|
|
(let ((sock (socket PF_INET SOCK_STREAM 0)))
|
|
(setsockopt sock SOL_SOCKET SO_REUSEADDR 1)
|
|
(bind sock family addr port)
|
|
sock))
|
|
|
|
(define (make-waker)
|
|
(let ((wakeup (socketpair AF_UNIX SOCK_STREAM 0)))
|
|
(fcntl (car wakeup) F_SETFL O_NONBLOCK)
|
|
(fcntl (cdr wakeup) F_SETFL O_NONBLOCK)
|
|
#;
|
|
(if (defined? 'O_CLOEXEC)
|
|
(begin
|
|
;; FIXME: currently it's not defined...
|
|
(fcntl (car wakeup) F_SETFD O_CLOEXEC)
|
|
(fcntl (cdr wakeup) F_SETFD O_CLOEXEC)))
|
|
(values (lambda () (put-u8 (car wakeup) 0))
|
|
(cdr wakeup))))
|
|
|
|
(define flush-wake-port
|
|
;; FIXME: need a bytevector-get-n! that understands nonblocking ports.
|
|
(lambda (port)
|
|
(get-u8 port)))
|
|
|
|
(define-record-type <http-server>
|
|
(make-http-server socket poll-idx poll-set wake-thunk threaded?
|
|
read-workers read-queue handle-queue
|
|
write-workers write-queue keepalive-queue)
|
|
http-server?
|
|
(socket http-socket)
|
|
(poll-idx http-poll-idx set-http-poll-idx!)
|
|
(poll-set http-poll-set)
|
|
(wake-thunk http-wake-thunk)
|
|
(threaded? http-threaded?)
|
|
|
|
(read-workers http-read-workers set-http-read-workers!)
|
|
(read-queue http-read-queue)
|
|
(handle-queue http-handle-queue)
|
|
|
|
(write-workers http-write-workers set-http-write-workers!)
|
|
(write-queue http-write-queue)
|
|
(keepalive-queue http-keepalive-queue))
|
|
|
|
(define *error-events* (logior POLLHUP POLLERR))
|
|
(define *read-events* POLLIN)
|
|
(define *events* (logior *error-events* *read-events*))
|
|
|
|
;; -> server
|
|
(define* (http-open #:key
|
|
(host #f)
|
|
(family AF_INET)
|
|
(addr (if host
|
|
(inet-pton family host)
|
|
INADDR_LOOPBACK))
|
|
(port 8080)
|
|
(socket (make-default-socket family addr port))
|
|
(threaded? (and (provided? 'threads) #t))
|
|
(read-workers (if threaded? 8 1))
|
|
(write-workers (if threaded? 8 1)))
|
|
(listen socket 128)
|
|
(sigaction SIGPIPE SIG_IGN)
|
|
(let ((poll-set (make-empty-poll-set)))
|
|
(call-with-values make-waker
|
|
(lambda (wake-thunk wake-port)
|
|
(poll-set-add! poll-set socket *events*)
|
|
(poll-set-add! poll-set wake-port *read-events*)
|
|
(let ((read-queue (make-async-queue #:capacity read-workers))
|
|
(handle-queue (make-async-queue #:capacity read-workers))
|
|
(write-queue (make-async-queue #:capacity write-workers))
|
|
(keepalive-queue (make-async-queue #:capacity write-workers
|
|
#:fixed? #f)))
|
|
(define server
|
|
(make-http-server socket 0 poll-set wake-thunk threaded?
|
|
#f read-queue handle-queue
|
|
#f write-queue keepalive-queue))
|
|
(if threaded?
|
|
(begin
|
|
(set-http-read-workers!
|
|
server
|
|
(make-thread-pool
|
|
read-workers
|
|
(lambda ()
|
|
(cond ((async-queue-pop! read-queue)
|
|
=> (lambda (p) (read-request! server p)))))))
|
|
(start-thread-pool! (http-read-workers server))
|
|
(set-http-write-workers!
|
|
server
|
|
(make-thread-pool
|
|
write-workers
|
|
(lambda ()
|
|
(cond ((async-queue-pop! write-queue)
|
|
=> (lambda (args)
|
|
(apply write-request! server args)))))))
|
|
(start-thread-pool! (http-write-workers server))))
|
|
server)))))
|
|
|
|
(define (bad-request port)
|
|
(write-response (build-response #:version '(1 . 0) #:code 400
|
|
#:headers '((content-length . 0)))
|
|
port))
|
|
|
|
(define (read-request! server port)
|
|
(call-with-error-handling
|
|
(lambda ()
|
|
(cond
|
|
((eof-object? (peek-char port))
|
|
;; EOF.
|
|
(close-port port))
|
|
(else
|
|
;; Otherwise, try to read a request from this port.
|
|
(with-throw-handler #t
|
|
(lambda ()
|
|
(let* ((req (read-request port))
|
|
(body (read-request-body req)))
|
|
(or (async-queue-push! (http-handle-queue server)
|
|
(list port req body))
|
|
(error "failed to push request during shutdown"))
|
|
((http-wake-thunk server))))
|
|
(lambda (k . args)
|
|
(define-syntax-rule (cleanup-catch statement)
|
|
(catch #t
|
|
(lambda () statement)
|
|
(lambda (k . args)
|
|
(format (current-error-port) "In ~a:\n" 'statement)
|
|
(print-exception (current-error-port) #f k args))))
|
|
(cleanup-catch (bad-request port))
|
|
(cleanup-catch (close-port port)))))))
|
|
#:pass-keys '(quit interrupt)
|
|
#:on-error 'backtrace
|
|
#:post-error
|
|
(lambda (k . args)
|
|
(display "While reading request:\n" (current-error-port))
|
|
(print-exception (current-error-port) #f k args))))
|
|
|
|
(define (enqueue-read! server port)
|
|
(if (http-threaded? server)
|
|
(or (async-queue-push! (http-read-queue server) port)
|
|
(false-if-exception
|
|
(begin
|
|
(warn "failed to push read during shutdown.")
|
|
(close-port port))))
|
|
(read-request! server port)))
|
|
|
|
;; -> (client request body | #f #f #f)
|
|
(define (http-read server)
|
|
(let* ((poll-set (http-poll-set server)))
|
|
(let lp ((idx (http-poll-idx server)))
|
|
(let ((revents (poll-set-revents poll-set idx)))
|
|
(cond
|
|
((async-queue-try-pop! (http-handle-queue server))
|
|
=> (lambda (vals) (apply values vals)))
|
|
((async-queue-try-pop! (http-keepalive-queue server))
|
|
=> (lambda (port)
|
|
(poll-set-add! poll-set port *events*)
|
|
(lp idx)))
|
|
((zero? idx)
|
|
;; The server socket, and the end of our downward loop.
|
|
(cond
|
|
((zero? revents)
|
|
;; No client ready, and no error; poll and loop.
|
|
(poll poll-set)
|
|
(lp (1- (poll-set-nfds poll-set))))
|
|
((not (zero? (logand revents *error-events*)))
|
|
;; An error.
|
|
(set-http-poll-idx! server idx)
|
|
(throw 'interrupt))
|
|
(else
|
|
;; A new client. Add to set, poll, and loop.
|
|
;;
|
|
;; FIXME: preserve meta-info.
|
|
(let ((client (accept (poll-set-port poll-set idx))))
|
|
;; Buffer input and output on this port.
|
|
(setvbuf (car client) _IOFBF)
|
|
;; From "HOP, A Fast Server for the Diffuse Web", Serrano.
|
|
(setsockopt (car client) SOL_SOCKET SO_SNDBUF (* 12 1024))
|
|
(poll-set-add! poll-set (car client) *events*)
|
|
(poll poll-set)
|
|
(lp (1- (poll-set-nfds poll-set)))))))
|
|
((zero? revents)
|
|
;; Nothing on this port.
|
|
(lp (1- idx)))
|
|
((= idx 1)
|
|
;; The wakeup socket.
|
|
(flush-wake-port (poll-set-port poll-set idx))
|
|
(lp (1- idx)))
|
|
;; Otherwise, a client socket with some activity on
|
|
;; it. Remove it from the poll set.
|
|
(else
|
|
(let ((port (poll-set-remove! poll-set idx)))
|
|
;; Record the next index in all cases, in case enqueue-read!
|
|
;; throws an error.
|
|
(set-http-poll-idx! server (1- idx))
|
|
(enqueue-read! server port)
|
|
(lp (1- idx)))))))))
|
|
|
|
(define (keep-alive? response)
|
|
(let ((v (response-version response)))
|
|
(and (or (< (response-code response) 400)
|
|
(= (response-code response) 404))
|
|
(case (car v)
|
|
((1)
|
|
(case (cdr v)
|
|
((1) (not (memq 'close (response-connection response))))
|
|
((0) (memq 'keep-alive (response-connection response)))))
|
|
(else #f)))))
|
|
|
|
(define (write-request! server client response body)
|
|
(call-with-error-handling
|
|
(lambda ()
|
|
(let* ((response (write-response response client))
|
|
(port (response-port response)))
|
|
(cond
|
|
((not body)) ; pass
|
|
((bytevector? body)
|
|
(write-response-body response body))
|
|
(else
|
|
(error "Expected a bytevector for body" body)))
|
|
(cond
|
|
((keep-alive? response)
|
|
(force-output port)
|
|
(or (async-queue-push! (http-keepalive-queue server) port)
|
|
;; Shutting down; there is a sane thing to do, no need to
|
|
;; error.
|
|
(close-port port))
|
|
((http-wake-thunk server)))
|
|
(else
|
|
(close-port port)))))
|
|
#:pass-keys '(quit interrupt)
|
|
#:on-error 'backtrace
|
|
#:post-error
|
|
(lambda (k . args)
|
|
(display "While writing response:\n" (current-error-port))
|
|
(print-exception (current-error-port) #f k args))))
|
|
|
|
(define (enqueue-write! server client response body)
|
|
(if (http-threaded? server)
|
|
(or (async-queue-push! (http-write-queue server)
|
|
(list client response body))
|
|
(false-if-exception
|
|
(begin
|
|
(warn "failed to push write during shutdown.")
|
|
(close-port client))))
|
|
(write-request! server client response body)))
|
|
|
|
;; -> 0 values
|
|
(define (http-write server client response body)
|
|
(enqueue-write! server client response body)
|
|
(values))
|
|
|
|
(define (seconds-from-now n)
|
|
(let ((now (gettimeofday)))
|
|
(cons (+ (car now) n) (cdr now))))
|
|
|
|
(define (async-queue-for-each queue proc)
|
|
(let lp ()
|
|
(cond ((async-queue-try-pop! queue)
|
|
=> (lambda (vals) (proc vals) (lp))))))
|
|
|
|
;; -> unspecified values
|
|
(define (http-close server)
|
|
(let ((poll-set (http-poll-set server)))
|
|
(display "Plugging read queue\n")
|
|
(async-queue-stop-accepting! (http-read-queue server))
|
|
(display "Stopping read workers\n")
|
|
(if (http-threaded? server)
|
|
(stop-thread-pool! (http-read-workers server)
|
|
(seconds-from-now 5)
|
|
#:cancel? #t))
|
|
(display "Plugging read queue\n")
|
|
(async-queue-stop-accepting! (http-read-queue server))
|
|
(display "Draining read queue\n")
|
|
(async-queue-for-each (http-read-queue server) close-port)
|
|
(display "Plugging handle queue\n")
|
|
(async-queue-stop-accepting! (http-handle-queue server))
|
|
(display "Draining handle queue\n")
|
|
(async-queue-for-each (http-handle-queue server)
|
|
(lambda (vals) (close-port (car vals))))
|
|
(display "Plugging write queue\n")
|
|
(async-queue-stop-accepting! (http-write-queue server))
|
|
(display "Stopping write workers\n")
|
|
(if (http-threaded? server)
|
|
(stop-thread-pool! (http-write-workers server)
|
|
(seconds-from-now 5)
|
|
#:cancel? #t))
|
|
(display "Draining write queue\n")
|
|
(async-queue-for-each (http-write-queue server)
|
|
(lambda (vals) (close-port (car vals))))
|
|
(display "Plugging keepalive queue\n")
|
|
(async-queue-stop-accepting! (http-keepalive-queue server))
|
|
(display "Draining keepalive queue\n")
|
|
(async-queue-for-each (http-keepalive-queue server) close-port)
|
|
(display "Closing poll ports\n")
|
|
(let lp ((n (poll-set-nfds poll-set)))
|
|
(if (positive? n)
|
|
(begin
|
|
(close-port (poll-set-remove! poll-set (1- n)))
|
|
(lp (1- n)))))))
|
|
|
|
(define-server-impl http
|
|
http-open
|
|
http-read
|
|
http-write
|
|
http-close)
|