diff --git a/NEWS b/NEWS index 3e64129e4..e887ec447 100644 --- a/NEWS +++ b/NEWS @@ -38,6 +38,10 @@ is equivalent to an unbuffered port. Ports may set their default buffer sizes, and some ports (for example soft ports) are unbuffered by default for historical reasons. +** Support for non-blocking I/O + +See "Non-Blocking I/O" in the manual, for more. + ** Removal of port locks As part of the 2.2 series, we introduced recursive locks on each port, diff --git a/doc/ref/api-io.texi b/doc/ref/api-io.texi index 5b200977b..313204593 100644 --- a/doc/ref/api-io.texi +++ b/doc/ref/api-io.texi @@ -20,6 +20,7 @@ * Port Types:: Types of port and how to make them. * R6RS I/O Ports:: The R6RS port API. * I/O Extensions:: Implementing new port types in C. +* Non-Blocking I/O:: How Guile deals with EWOULDBLOCK. * BOM Handling:: Handling of Unicode byte order marks. @end menu @@ -2302,6 +2303,24 @@ It should write out bytes from the supplied bytevector @code{src}, starting at offset @code{start} and continuing for @code{count} bytes, and return the number of bytes that were written. +@item read_wait_fd +@itemx write_wait_fd +If a port's @code{read} or @code{write} function returns @code{(size_t) +-1}, that indicates that reading or writing would block. In that case +to preserve the illusion of a blocking read or write operation, Guile's +C port run-time will @code{poll} on the file descriptor returned by +either the port's @code{read_wait_fd} or @code{write_wait_fd} function. +Set using + +@deftypefun void scm_set_port_read_wait_fd (scm_t_port_type *type, int (*wait_fd) (SCM port)) +@deftypefunx void scm_set_port_write_wait_fd (scm_t_port_type *type, int (*wait_fd) (SCM port)) +@end deftypefun + +Only a port type which implements the @code{read_wait_fd} or +@code{write_wait_fd} port methods can usefully return @code{(size_t) -1} +from a read or write function. @xref{Non-Blocking I/O}, for more on +non-blocking I/O in Guile. + @item print Called when @code{write} is called on the port, to print a port description. For example, for a file port it may produce something @@ -2384,6 +2403,74 @@ operating system inform Guile about the appropriate buffer sizes for the particular file opened by the port. @end table +@node Non-Blocking I/O +@subsection Non-Blocking I/O + +Most ports in Guile are @dfn{blocking}: when you try to read a character +from a port, Guile will block on the read until a character is ready, or +end-of-stream is detected. Likewise whenever Guile goes to write +(possibly buffered) data to an output port, Guile will block until all +the data is written. + +Interacting with ports in blocking mode is very convenient: you can +write straightforward, sequential algorithms whose code flow reflects +the flow of data. However, blocking I/O has two main limitations. + +The first is that it's easy to get into a situation where code is +waiting on data. Time spent waiting on data when code could be doing +something else is wasteful and prevents your program from reaching its +peak throughput. If you implement a web server that sequentially +handles requests from clients, it's very easy for the server to end up +waiting on a client to finish its HTTP request, or waiting on it to +consume the response. The end result is that you are able to serve +fewer requests per second than you'd like to serve. + +The second limitation is related: a blocking parser over user-controlled +input is a denial-of-service vulnerability. Indeed the so-called ``slow +loris'' attack of the early 2010s was just that: an attack on common web +servers that drip-fed HTTP requests, one character at a time. All it +took was a handful of slow loris connections to occupy an entire web +server. + +In Guile we would like to preserve the ability to write straightforward +blocking networking processes of all kinds, but under the hood to allow +those processes to suspend their requests if they would block. + +To do this, the first piece is to allow Guile ports to declare +themselves as being nonblocking. This is currently supported only for +file ports, which also includes sockets, terminals, or any other port +that is backed by a file descriptor. To do that, we use an arcane UNIX +incantation: + +@example +(let ((flags (fcntl socket F_GETFL))) + (fcntl socket F_SETFL (logior O_NONBLOCK flags))) +@end example + +Now the file descriptor is open in non-blocking mode. If Guile tries to +read or write from this file descriptor in C, it will block by polling +on the socket's @code{read_wait_fd}, to preserve the illusion of a +blocking read or write. @xref{I/O Extensions} for more on that internal +interface. + +However if a user uses the new and experimental Scheme implementation of +ports in @code{(ice-9 sports)}, Guile instead calls the value of the +@code{current-read-waiter} or @code{current-write-waiter} parameters on +the port before re-trying the read or write. The default value of these +parameters does the same thing as the C port runtime: it blocks. +However it's possible to dynamically bind these parameters to handlers +that can suspend the current coroutine to a scheduler, to be later +re-animated once the port becomes readable or writable in the future. +In the mean-time the scheduler can run other code, for example servicing +other web requests. + +Guile does not currently include such a scheduler. Currently we want to +make sure that we're providing the right primitives that can be used to +build schedulers and other user-space concurrency patterns. In the +meantime, have a look at 8sync (@url{https://gnu.org/software/8sync}) +for a prototype of an asynchronous I/O and concurrency facility. + + @node BOM Handling @subsection Handling of Unicode byte order marks. @cindex BOM diff --git a/libguile/fports.c b/libguile/fports.c index 046a844e9..271f3a0a1 100644 --- a/libguile/fports.c +++ b/libguile/fports.c @@ -573,14 +573,24 @@ fport_print (SCM exp, SCM port, scm_print_state *pstate SCM_UNUSED) static size_t fport_read (SCM port, SCM dst, size_t start, size_t count) { - long res; scm_t_fport *fp = SCM_FSTREAM (port); signed char *ptr = SCM_BYTEVECTOR_CONTENTS (dst) + start; + ssize_t ret; - SCM_SYSCALL (res = read (fp->fdes, ptr, count)); - if (res == -1) - scm_syserror ("fport_read"); - return res; + retry: + ret = read (fp->fdes, ptr, count); + if (ret < 0) + { + if (errno == EINTR) + { + SCM_ASYNC_TICK; + goto retry; + } + if (errno == EWOULDBLOCK || errno == EAGAIN) + return -1; + scm_syserror ("fport_read"); + } + return ret; } static size_t @@ -588,11 +598,23 @@ fport_write (SCM port, SCM src, size_t start, size_t count) { int fd = SCM_FPORT_FDES (port); signed char *ptr = SCM_BYTEVECTOR_CONTENTS (src) + start; + ssize_t ret; - if (full_write (fd, ptr, count) < count) - scm_syserror ("fport_write"); + retry: + ret = write (fd, ptr, count); + if (ret < 0) + { + if (errno == EINTR) + { + SCM_ASYNC_TICK; + goto retry; + } + if (errno == EWOULDBLOCK || errno == EAGAIN) + return -1; + scm_syserror ("fport_write"); + } - return count; + return ret; } static scm_t_off @@ -637,6 +659,12 @@ fport_random_access_p (SCM port) return SCM_FDES_RANDOM_P (SCM_FSTREAM (port)->fdes); } +static int +fport_wait_fd (SCM port) +{ + return SCM_FSTREAM (port)->fdes; +} + /* Query the OS to get the natural buffering for FPORT, if available. */ static void fport_get_natural_buffer_sizes (SCM port, size_t *read_size, size_t *write_size) @@ -660,6 +688,8 @@ scm_make_fptob () scm_set_port_close (ptob, fport_close); scm_set_port_seek (ptob, fport_seek); scm_set_port_truncate (ptob, fport_truncate); + scm_set_port_read_wait_fd (ptob, fport_wait_fd); + scm_set_port_write_wait_fd (ptob, fport_wait_fd); scm_set_port_input_waiting (ptob, fport_input_waiting); scm_set_port_random_access_p (ptob, fport_random_access_p); scm_set_port_get_natural_buffer_sizes (ptob, fport_get_natural_buffer_sizes); diff --git a/libguile/ports-internal.h b/libguile/ports-internal.h index 54ce3e4b0..38da49eb7 100644 --- a/libguile/ports-internal.h +++ b/libguile/ports-internal.h @@ -44,6 +44,9 @@ struct scm_t_port_type SCM scm_read; SCM scm_write; + int (*read_wait_fd) (SCM port); + int (*write_wait_fd) (SCM port); + scm_t_off (*seek) (SCM port, scm_t_off OFFSET, int WHENCE); void (*close) (SCM port); diff --git a/libguile/ports.c b/libguile/ports.c index c67bdf53b..ba3755507 100644 --- a/libguile/ports.c +++ b/libguile/ports.c @@ -33,6 +33,7 @@ #include /* for chsize on mingw */ #include #include +#include #include #include #include @@ -126,6 +127,18 @@ default_random_access_p (SCM port) return SCM_PORT_TYPE (port)->seek != NULL; } +static int +default_read_wait_fd (SCM port) +{ + scm_misc_error ("read_wait_fd", "unimplemented", SCM_EOL); +} + +static int +default_write_wait_fd (SCM port) +{ + scm_misc_error ("write_wait_fd", "unimplemented", SCM_EOL); +} + scm_t_port_type * scm_make_port_type (char *name, size_t (*read) (SCM port, SCM dst, size_t start, @@ -144,6 +157,8 @@ scm_make_port_type (char *name, desc->c_write = write; desc->scm_read = read ? trampoline_to_c_read_subr : SCM_BOOL_F; desc->scm_write = write ? trampoline_to_c_write_subr : SCM_BOOL_F; + desc->read_wait_fd = default_read_wait_fd; + desc->write_wait_fd = default_write_wait_fd; desc->random_access_p = default_random_access_p; scm_make_port_classes (desc); @@ -154,7 +169,7 @@ static SCM trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count) #define FUNC_NAME "port-read" { - size_t c_start, c_count; + size_t c_start, c_count, ret; SCM_VALIDATE_OPPORT (1, port); c_start = scm_to_size_t (start); @@ -162,24 +177,25 @@ trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count) SCM_ASSERT_RANGE (2, start, c_start <= c_count); SCM_ASSERT_RANGE (3, count, c_start+c_count <= scm_c_bytevector_length (dst)); - return scm_from_size_t - (SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count)); + ret = SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count); + + return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret); } #undef FUNC_NAME static size_t trampoline_to_scm_read (SCM port, SCM dst, size_t start, size_t count) { - return scm_to_size_t - (scm_call_4 (SCM_PORT_TYPE (port)->scm_read, port, dst, - scm_from_size_t (start), scm_from_size_t (count))); + SCM ret = scm_call_4 (SCM_PORT_TYPE (port)->scm_read, port, dst, + scm_from_size_t (start), scm_from_size_t (count)); + return scm_is_true (ret) ? scm_to_size_t (ret) : (size_t) -1; } static SCM trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count) #define FUNC_NAME "port-write" { - size_t c_start, c_count; + size_t c_start, c_count, ret; SCM_VALIDATE_OPPORT (1, port); c_start = scm_to_size_t (start); @@ -187,17 +203,18 @@ trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count) SCM_ASSERT_RANGE (2, start, c_start <= c_count); SCM_ASSERT_RANGE (3, count, c_start+c_count <= scm_c_bytevector_length (src)); - return scm_from_size_t - (SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count)); + ret = SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count); + + return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret); } #undef FUNC_NAME static size_t trampoline_to_scm_write (SCM port, SCM src, size_t start, size_t count) { - return scm_to_size_t - (scm_call_4 (SCM_PORT_TYPE (port)->scm_write, port, src, - scm_from_size_t (start), scm_from_size_t (count))); + SCM ret = scm_call_4 (SCM_PORT_TYPE (port)->scm_write, port, src, + scm_from_size_t (start), scm_from_size_t (count)); + return scm_is_true (ret) ? scm_to_size_t (ret) : (size_t) -1; } void @@ -214,6 +231,18 @@ scm_set_port_scm_write (scm_t_port_type *ptob, SCM write) ptob->c_write = trampoline_to_scm_write; } +void +scm_set_port_read_wait_fd (scm_t_port_type *ptob, int (*get_fd) (SCM)) +{ + ptob->read_wait_fd = get_fd; +} + +void +scm_set_port_write_wait_fd (scm_t_port_type *ptob, int (*get_fd) (SCM)) +{ + ptob->write_wait_fd = get_fd; +} + void scm_set_port_print (scm_t_port_type *ptob, int (*print) (SCM exp, SCM port, scm_print_state *pstate)) @@ -1230,6 +1259,116 @@ SCM_DEFINE (scm_set_port_conversion_strategy_x, "set-port-conversion-strategy!", #undef FUNC_NAME + + +/* Non-blocking I/O. */ + +static int +port_read_wait_fd (SCM port) +{ + scm_t_port_type *ptob = SCM_PORT_TYPE (port); + return ptob->read_wait_fd (port); +} + +static int +port_write_wait_fd (SCM port) +{ + scm_t_port_type *ptob = SCM_PORT_TYPE (port); + return ptob->write_wait_fd (port); +} + +SCM_INTERNAL SCM scm_port_read_wait_fd (SCM); +SCM_DEFINE (scm_port_read_wait_fd, "port-read-wait-fd", 1, 0, 0, + (SCM port), "") +#define FUNC_NAME s_scm_port_read_wait_fd +{ + int fd; + + port = SCM_COERCE_OUTPORT (port); + SCM_VALIDATE_OPINPORT (1, port); + + fd = port_read_wait_fd (port); + return fd < 0 ? SCM_BOOL_F : scm_from_int (fd); +} +#undef FUNC_NAME + +SCM_INTERNAL SCM scm_port_write_wait_fd (SCM); +SCM_DEFINE (scm_port_write_wait_fd, "port-write-wait-fd", 1, 0, 0, + (SCM port), "") +#define FUNC_NAME s_scm_port_write_wait_fd +{ + int fd; + + port = SCM_COERCE_OUTPORT (port); + SCM_VALIDATE_OPOUTPORT (1, port); + + fd = port_write_wait_fd (port); + return fd < 0 ? SCM_BOOL_F : scm_from_int (fd); +} +#undef FUNC_NAME + +static int +port_poll (SCM port, short events, int timeout) +#define FUNC_NAME "port-poll" +{ + struct pollfd pollfd[2]; + int nfds = 0, rv = 0; + + if (events & POLLIN) + { + pollfd[nfds].fd = port_read_wait_fd (port); + pollfd[nfds].events = events & (POLLIN | POLLPRI); + pollfd[nfds].revents = 0; + nfds++; + } + if (events & POLLOUT) + { + pollfd[nfds].fd = port_write_wait_fd (port); + pollfd[nfds].events = events & (POLLOUT | POLLPRI); + pollfd[nfds].revents = 0; + nfds++; + } + + if (nfds == 2 && pollfd[0].fd == pollfd[1].fd) + { + pollfd[0].events |= pollfd[1].events; + nfds--; + } + + SCM_SYSCALL (rv = poll (pollfd, nfds, timeout)); + if (rv < 0) + SCM_SYSERROR; + + return rv; +} +#undef FUNC_NAME + +SCM_INTERNAL SCM scm_port_poll (SCM, SCM, SCM); +SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0, + (SCM port, SCM events, SCM timeout), + "") +#define FUNC_NAME s_scm_port_poll +{ + short c_events = 0; + int c_timeout; + + port = SCM_COERCE_OUTPORT (port); + SCM_VALIDATE_PORT (1, port); + SCM_VALIDATE_STRING (2, events); + c_timeout = SCM_UNBNDP (timeout) ? -1 : SCM_NUM2INT (3, timeout); + + if (scm_i_string_contains_char (events, 'r')) + c_events |= POLLIN; + if (scm_i_string_contains_char (events, '!')) + c_events |= POLLPRI; + if (scm_i_string_contains_char (events, 'w')) + c_events |= POLLIN; + + return scm_from_int (port_poll (port, c_events, c_timeout)); +} +#undef FUNC_NAME + + /* Input. */ @@ -1330,8 +1469,15 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count) assert (count <= SCM_BYTEVECTOR_LENGTH (dst)); assert (start + count <= SCM_BYTEVECTOR_LENGTH (dst)); + retry: filled = ptob->c_read (port, dst, start, count); + if (filled == (size_t) -1) + { + port_poll (port, POLLIN, -1); + goto retry; + } + assert (filled <= count); return filled; @@ -2508,7 +2654,14 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count) assert (start + count <= SCM_BYTEVECTOR_LENGTH (src)); do - written += ptob->c_write (port, src, start + written, count - written); + { + size_t ret = ptob->c_write (port, src, start + written, count - written); + + if (ret == (size_t) -1) + port_poll (port, POLLOUT, -1); + else + written += ret; + } while (written < count); assert (written == count); diff --git a/libguile/ports.h b/libguile/ports.h index 43cd7458d..2905f68db 100644 --- a/libguile/ports.h +++ b/libguile/ports.h @@ -90,6 +90,10 @@ SCM_API scm_t_port_type *scm_make_port_type size_t (*write) (SCM port, SCM src, size_t start, size_t count)); SCM_API void scm_set_port_scm_read (scm_t_port_type *ptob, SCM read); SCM_API void scm_set_port_scm_write (scm_t_port_type *ptob, SCM write); +SCM_API void scm_set_port_read_wait_fd (scm_t_port_type *ptob, + int (*wait_fd) (SCM port)); +SCM_API void scm_set_port_write_wait_fd (scm_t_port_type *ptob, + int (*wait_fd) (SCM port)); SCM_API void scm_set_port_print (scm_t_port_type *ptob, int (*print) (SCM exp, SCM port, diff --git a/module/ice-9/ports.scm b/module/ice-9/ports.scm index a7f237347..4330ebedf 100644 --- a/module/ice-9/ports.scm +++ b/module/ice-9/ports.scm @@ -179,7 +179,10 @@ interpret its input and output." specialize-port-encoding! port-random-access? port-decode-char - port-read-buffering)) + port-read-buffering + port-poll + port-read-wait-fd + port-write-wait-fd)) (define-syntax-rule (port-buffer-bytevector buf) (vector-ref buf 0)) (define-syntax-rule (port-buffer-cur buf) (vector-ref buf 1)) @@ -209,7 +212,10 @@ interpret its input and output." specialize-port-encoding! port-decode-char port-random-access? - port-read-buffering) + port-read-buffering + port-poll + port-read-wait-fd + port-write-wait-fd) ;; And we're back. (define-module (ice-9 ports)) diff --git a/module/ice-9/sports.scm b/module/ice-9/sports.scm index 6fd7ddd31..c178b7310 100644 --- a/module/ice-9/sports.scm +++ b/module/ice-9/sports.scm @@ -54,7 +54,9 @@ #:replace (peek-char read-char) #:export (lookahead-u8 - get-u8)) + get-u8 + current-read-waiter + current-write-waiter)) (define (write-bytes port src start count) (let ((written ((port-write port) port src start count))) @@ -77,11 +79,25 @@ (set-port-buffer-end! buf 0) (write-bytes port (port-buffer-bytevector buf) cur (- end cur))))) +(define (default-read-waiter port) (port-poll port "r")) +(define (default-write-waiter port) (port-poll port "w")) + +(define current-read-waiter (make-parameter default-read-waiter)) +(define current-write-waiter (make-parameter default-write-waiter)) + +(define (wait-for-readable port) ((current-read-waiter) port)) +(define (wait-for-writable port) ((current-write-waiter) port)) + (define (read-bytes port dst start count) - (let ((read ((port-read port) port dst start count))) - (unless (<= 0 read count) - (error "bad return from port read function" read)) - read)) + (cond + (((port-read port) port dst start count) + => (lambda (read) + (unless (<= 0 read count) + (error "bad return from port read function" read)) + read)) + (else + (wait-for-readable port) + (read-bytes port dst start count)))) (define utf8-bom #vu8(#xEF #xBB #xBF)) (define utf16be-bom #vu8(#xFE #xFF)) diff --git a/test-suite/tests/ports.test b/test-suite/tests/ports.test index 029dd2dd9..dfa430e5a 100644 --- a/test-suite/tests/ports.test +++ b/test-suite/tests/ports.test @@ -23,6 +23,7 @@ #:use-module (test-suite guile-test) #:use-module (ice-9 popen) #:use-module (ice-9 rdelim) + #:use-module (ice-9 threads) #:use-module (rnrs bytevectors) #:use-module ((ice-9 binary-ports) #:select (open-bytevector-input-port open-bytevector-output-port @@ -601,20 +602,18 @@ (pass-if "unread residue" (string=? (read-line) "moon")))) -;;; non-blocking mode on a port. create a pipe and set O_NONBLOCK on -;;; the reading end. try to read a byte: should get EAGAIN or -;;; EWOULDBLOCK error. -(let* ((p (pipe)) - (r (car p))) - (fcntl r F_SETFL (logior (fcntl r F_GETFL) O_NONBLOCK)) - (pass-if "non-blocking-I/O" - (catch 'system-error - (lambda () (read-char r) #f) - (lambda (key . args) - (and (eq? key 'system-error) - (let ((errno (car (list-ref args 3)))) - (or (= errno EAGAIN) - (= errno EWOULDBLOCK)))))))) +(when (provided? 'threads) + (let* ((p (pipe)) + (r (car p)) + (w (cdr p))) + (fcntl r F_SETFL (logior (fcntl r F_GETFL) O_NONBLOCK)) + (let ((thread (call-with-new-thread + (lambda () + (usleep (* 250 1000)) + (write-char #\a w) + (force-output w))))) + (pass-if-equal "non-blocking-I/O" #\a (read-char r)) + (join-thread thread)))) ;;;; Pipe (popen) ports.