1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-04-30 03:40:34 +02:00

Support for non-blocking I/O

* doc/ref/api-io.texi (I/O Extensions): Document read_wait_fd /
  write_wait_fd members.
  (Non-Blocking I/O): New section.
* libguile/fports.c (fport_read, fport_write): Return -1 if the
  operation would block.
  (fport_wait_fd, scm_make_fptob): Add read/write wait-fd
  implementation.
* libguile/ports-internal.h (scm_t_port_type): Add read_wait_fd /
  write_wait_fd.
* libguile/ports.c (default_read_wait_fd, default_write_wait_fd): New
  functions.
  (scm_make_port_type): Initialize default read/write wait fd impls.
  (trampoline_to_c_read, trampoline_to_scm_read)
  (trampoline_to_c_write, trampoline_to_scm_write): To Scheme, a return
  of #f indicates EWOULDBLOCk.
  (scm_set_port_read_wait_fd, scm_set_port_write_wait_fd): New
  functions.
  (port_read_wait_fd, port_write_wait_fd, scm_port_read_wait_fd)
  (scm_port_write_wait_fd, port_poll, scm_port_poll): New functions.
  (scm_i_read_bytes, scm_i_write_bytes): Poll if the read or write would
  block.
* libguile/ports.h (scm_set_port_read_wait_fd)
  (scm_set_port_write_wait_fd): Add declarations.
* module/ice-9/ports.scm: Shunt port-poll and port-{read,write}-wait-fd
  to the internals module.
* module/ice-9/sports.scm (current-write-waiter):
  (current-read-waiter): Implement.
* test-suite/tests/ports.test: Adapt non-blocking test to new behavior.
* NEWS: Add entry.
This commit is contained in:
Andy Wingo 2016-05-20 14:51:51 +02:00
parent 8b6f4df3f4
commit 534139e458
9 changed files with 344 additions and 42 deletions

4
NEWS
View file

@ -38,6 +38,10 @@ is equivalent to an unbuffered port. Ports may set their default buffer
sizes, and some ports (for example soft ports) are unbuffered by default
for historical reasons.
** Support for non-blocking I/O
See "Non-Blocking I/O" in the manual, for more.
** Removal of port locks
As part of the 2.2 series, we introduced recursive locks on each port,

View file

@ -20,6 +20,7 @@
* Port Types:: Types of port and how to make them.
* R6RS I/O Ports:: The R6RS port API.
* I/O Extensions:: Implementing new port types in C.
* Non-Blocking I/O:: How Guile deals with EWOULDBLOCK.
* BOM Handling:: Handling of Unicode byte order marks.
@end menu
@ -2302,6 +2303,24 @@ It should write out bytes from the supplied bytevector @code{src},
starting at offset @code{start} and continuing for @code{count} bytes,
and return the number of bytes that were written.
@item read_wait_fd
@itemx write_wait_fd
If a port's @code{read} or @code{write} function returns @code{(size_t)
-1}, that indicates that reading or writing would block. In that case
to preserve the illusion of a blocking read or write operation, Guile's
C port run-time will @code{poll} on the file descriptor returned by
either the port's @code{read_wait_fd} or @code{write_wait_fd} function.
Set using
@deftypefun void scm_set_port_read_wait_fd (scm_t_port_type *type, int (*wait_fd) (SCM port))
@deftypefunx void scm_set_port_write_wait_fd (scm_t_port_type *type, int (*wait_fd) (SCM port))
@end deftypefun
Only a port type which implements the @code{read_wait_fd} or
@code{write_wait_fd} port methods can usefully return @code{(size_t) -1}
from a read or write function. @xref{Non-Blocking I/O}, for more on
non-blocking I/O in Guile.
@item print
Called when @code{write} is called on the port, to print a port
description. For example, for a file port it may produce something
@ -2384,6 +2403,74 @@ operating system inform Guile about the appropriate buffer sizes for the
particular file opened by the port.
@end table
@node Non-Blocking I/O
@subsection Non-Blocking I/O
Most ports in Guile are @dfn{blocking}: when you try to read a character
from a port, Guile will block on the read until a character is ready, or
end-of-stream is detected. Likewise whenever Guile goes to write
(possibly buffered) data to an output port, Guile will block until all
the data is written.
Interacting with ports in blocking mode is very convenient: you can
write straightforward, sequential algorithms whose code flow reflects
the flow of data. However, blocking I/O has two main limitations.
The first is that it's easy to get into a situation where code is
waiting on data. Time spent waiting on data when code could be doing
something else is wasteful and prevents your program from reaching its
peak throughput. If you implement a web server that sequentially
handles requests from clients, it's very easy for the server to end up
waiting on a client to finish its HTTP request, or waiting on it to
consume the response. The end result is that you are able to serve
fewer requests per second than you'd like to serve.
The second limitation is related: a blocking parser over user-controlled
input is a denial-of-service vulnerability. Indeed the so-called ``slow
loris'' attack of the early 2010s was just that: an attack on common web
servers that drip-fed HTTP requests, one character at a time. All it
took was a handful of slow loris connections to occupy an entire web
server.
In Guile we would like to preserve the ability to write straightforward
blocking networking processes of all kinds, but under the hood to allow
those processes to suspend their requests if they would block.
To do this, the first piece is to allow Guile ports to declare
themselves as being nonblocking. This is currently supported only for
file ports, which also includes sockets, terminals, or any other port
that is backed by a file descriptor. To do that, we use an arcane UNIX
incantation:
@example
(let ((flags (fcntl socket F_GETFL)))
(fcntl socket F_SETFL (logior O_NONBLOCK flags)))
@end example
Now the file descriptor is open in non-blocking mode. If Guile tries to
read or write from this file descriptor in C, it will block by polling
on the socket's @code{read_wait_fd}, to preserve the illusion of a
blocking read or write. @xref{I/O Extensions} for more on that internal
interface.
However if a user uses the new and experimental Scheme implementation of
ports in @code{(ice-9 sports)}, Guile instead calls the value of the
@code{current-read-waiter} or @code{current-write-waiter} parameters on
the port before re-trying the read or write. The default value of these
parameters does the same thing as the C port runtime: it blocks.
However it's possible to dynamically bind these parameters to handlers
that can suspend the current coroutine to a scheduler, to be later
re-animated once the port becomes readable or writable in the future.
In the mean-time the scheduler can run other code, for example servicing
other web requests.
Guile does not currently include such a scheduler. Currently we want to
make sure that we're providing the right primitives that can be used to
build schedulers and other user-space concurrency patterns. In the
meantime, have a look at 8sync (@url{https://gnu.org/software/8sync})
for a prototype of an asynchronous I/O and concurrency facility.
@node BOM Handling
@subsection Handling of Unicode byte order marks.
@cindex BOM

View file

@ -573,14 +573,24 @@ fport_print (SCM exp, SCM port, scm_print_state *pstate SCM_UNUSED)
static size_t
fport_read (SCM port, SCM dst, size_t start, size_t count)
{
long res;
scm_t_fport *fp = SCM_FSTREAM (port);
signed char *ptr = SCM_BYTEVECTOR_CONTENTS (dst) + start;
ssize_t ret;
SCM_SYSCALL (res = read (fp->fdes, ptr, count));
if (res == -1)
retry:
ret = read (fp->fdes, ptr, count);
if (ret < 0)
{
if (errno == EINTR)
{
SCM_ASYNC_TICK;
goto retry;
}
if (errno == EWOULDBLOCK || errno == EAGAIN)
return -1;
scm_syserror ("fport_read");
return res;
}
return ret;
}
static size_t
@ -588,11 +598,23 @@ fport_write (SCM port, SCM src, size_t start, size_t count)
{
int fd = SCM_FPORT_FDES (port);
signed char *ptr = SCM_BYTEVECTOR_CONTENTS (src) + start;
ssize_t ret;
if (full_write (fd, ptr, count) < count)
retry:
ret = write (fd, ptr, count);
if (ret < 0)
{
if (errno == EINTR)
{
SCM_ASYNC_TICK;
goto retry;
}
if (errno == EWOULDBLOCK || errno == EAGAIN)
return -1;
scm_syserror ("fport_write");
}
return count;
return ret;
}
static scm_t_off
@ -637,6 +659,12 @@ fport_random_access_p (SCM port)
return SCM_FDES_RANDOM_P (SCM_FSTREAM (port)->fdes);
}
static int
fport_wait_fd (SCM port)
{
return SCM_FSTREAM (port)->fdes;
}
/* Query the OS to get the natural buffering for FPORT, if available. */
static void
fport_get_natural_buffer_sizes (SCM port, size_t *read_size, size_t *write_size)
@ -660,6 +688,8 @@ scm_make_fptob ()
scm_set_port_close (ptob, fport_close);
scm_set_port_seek (ptob, fport_seek);
scm_set_port_truncate (ptob, fport_truncate);
scm_set_port_read_wait_fd (ptob, fport_wait_fd);
scm_set_port_write_wait_fd (ptob, fport_wait_fd);
scm_set_port_input_waiting (ptob, fport_input_waiting);
scm_set_port_random_access_p (ptob, fport_random_access_p);
scm_set_port_get_natural_buffer_sizes (ptob, fport_get_natural_buffer_sizes);

View file

@ -44,6 +44,9 @@ struct scm_t_port_type
SCM scm_read;
SCM scm_write;
int (*read_wait_fd) (SCM port);
int (*write_wait_fd) (SCM port);
scm_t_off (*seek) (SCM port, scm_t_off OFFSET, int WHENCE);
void (*close) (SCM port);

View file

@ -33,6 +33,7 @@
#include <fcntl.h> /* for chsize on mingw */
#include <assert.h>
#include <iconv.h>
#include <poll.h>
#include <uniconv.h>
#include <unistr.h>
#include <striconveh.h>
@ -126,6 +127,18 @@ default_random_access_p (SCM port)
return SCM_PORT_TYPE (port)->seek != NULL;
}
static int
default_read_wait_fd (SCM port)
{
scm_misc_error ("read_wait_fd", "unimplemented", SCM_EOL);
}
static int
default_write_wait_fd (SCM port)
{
scm_misc_error ("write_wait_fd", "unimplemented", SCM_EOL);
}
scm_t_port_type *
scm_make_port_type (char *name,
size_t (*read) (SCM port, SCM dst, size_t start,
@ -144,6 +157,8 @@ scm_make_port_type (char *name,
desc->c_write = write;
desc->scm_read = read ? trampoline_to_c_read_subr : SCM_BOOL_F;
desc->scm_write = write ? trampoline_to_c_write_subr : SCM_BOOL_F;
desc->read_wait_fd = default_read_wait_fd;
desc->write_wait_fd = default_write_wait_fd;
desc->random_access_p = default_random_access_p;
scm_make_port_classes (desc);
@ -154,7 +169,7 @@ static SCM
trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count)
#define FUNC_NAME "port-read"
{
size_t c_start, c_count;
size_t c_start, c_count, ret;
SCM_VALIDATE_OPPORT (1, port);
c_start = scm_to_size_t (start);
@ -162,24 +177,25 @@ trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count)
SCM_ASSERT_RANGE (2, start, c_start <= c_count);
SCM_ASSERT_RANGE (3, count, c_start+c_count <= scm_c_bytevector_length (dst));
return scm_from_size_t
(SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count));
ret = SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count);
return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
}
#undef FUNC_NAME
static size_t
trampoline_to_scm_read (SCM port, SCM dst, size_t start, size_t count)
{
return scm_to_size_t
(scm_call_4 (SCM_PORT_TYPE (port)->scm_read, port, dst,
scm_from_size_t (start), scm_from_size_t (count)));
SCM ret = scm_call_4 (SCM_PORT_TYPE (port)->scm_read, port, dst,
scm_from_size_t (start), scm_from_size_t (count));
return scm_is_true (ret) ? scm_to_size_t (ret) : (size_t) -1;
}
static SCM
trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count)
#define FUNC_NAME "port-write"
{
size_t c_start, c_count;
size_t c_start, c_count, ret;
SCM_VALIDATE_OPPORT (1, port);
c_start = scm_to_size_t (start);
@ -187,17 +203,18 @@ trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count)
SCM_ASSERT_RANGE (2, start, c_start <= c_count);
SCM_ASSERT_RANGE (3, count, c_start+c_count <= scm_c_bytevector_length (src));
return scm_from_size_t
(SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count));
ret = SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count);
return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
}
#undef FUNC_NAME
static size_t
trampoline_to_scm_write (SCM port, SCM src, size_t start, size_t count)
{
return scm_to_size_t
(scm_call_4 (SCM_PORT_TYPE (port)->scm_write, port, src,
scm_from_size_t (start), scm_from_size_t (count)));
SCM ret = scm_call_4 (SCM_PORT_TYPE (port)->scm_write, port, src,
scm_from_size_t (start), scm_from_size_t (count));
return scm_is_true (ret) ? scm_to_size_t (ret) : (size_t) -1;
}
void
@ -214,6 +231,18 @@ scm_set_port_scm_write (scm_t_port_type *ptob, SCM write)
ptob->c_write = trampoline_to_scm_write;
}
void
scm_set_port_read_wait_fd (scm_t_port_type *ptob, int (*get_fd) (SCM))
{
ptob->read_wait_fd = get_fd;
}
void
scm_set_port_write_wait_fd (scm_t_port_type *ptob, int (*get_fd) (SCM))
{
ptob->write_wait_fd = get_fd;
}
void
scm_set_port_print (scm_t_port_type *ptob,
int (*print) (SCM exp, SCM port, scm_print_state *pstate))
@ -1230,6 +1259,116 @@ SCM_DEFINE (scm_set_port_conversion_strategy_x, "set-port-conversion-strategy!",
#undef FUNC_NAME
/* Non-blocking I/O. */
static int
port_read_wait_fd (SCM port)
{
scm_t_port_type *ptob = SCM_PORT_TYPE (port);
return ptob->read_wait_fd (port);
}
static int
port_write_wait_fd (SCM port)
{
scm_t_port_type *ptob = SCM_PORT_TYPE (port);
return ptob->write_wait_fd (port);
}
SCM_INTERNAL SCM scm_port_read_wait_fd (SCM);
SCM_DEFINE (scm_port_read_wait_fd, "port-read-wait-fd", 1, 0, 0,
(SCM port), "")
#define FUNC_NAME s_scm_port_read_wait_fd
{
int fd;
port = SCM_COERCE_OUTPORT (port);
SCM_VALIDATE_OPINPORT (1, port);
fd = port_read_wait_fd (port);
return fd < 0 ? SCM_BOOL_F : scm_from_int (fd);
}
#undef FUNC_NAME
SCM_INTERNAL SCM scm_port_write_wait_fd (SCM);
SCM_DEFINE (scm_port_write_wait_fd, "port-write-wait-fd", 1, 0, 0,
(SCM port), "")
#define FUNC_NAME s_scm_port_write_wait_fd
{
int fd;
port = SCM_COERCE_OUTPORT (port);
SCM_VALIDATE_OPOUTPORT (1, port);
fd = port_write_wait_fd (port);
return fd < 0 ? SCM_BOOL_F : scm_from_int (fd);
}
#undef FUNC_NAME
static int
port_poll (SCM port, short events, int timeout)
#define FUNC_NAME "port-poll"
{
struct pollfd pollfd[2];
int nfds = 0, rv = 0;
if (events & POLLIN)
{
pollfd[nfds].fd = port_read_wait_fd (port);
pollfd[nfds].events = events & (POLLIN | POLLPRI);
pollfd[nfds].revents = 0;
nfds++;
}
if (events & POLLOUT)
{
pollfd[nfds].fd = port_write_wait_fd (port);
pollfd[nfds].events = events & (POLLOUT | POLLPRI);
pollfd[nfds].revents = 0;
nfds++;
}
if (nfds == 2 && pollfd[0].fd == pollfd[1].fd)
{
pollfd[0].events |= pollfd[1].events;
nfds--;
}
SCM_SYSCALL (rv = poll (pollfd, nfds, timeout));
if (rv < 0)
SCM_SYSERROR;
return rv;
}
#undef FUNC_NAME
SCM_INTERNAL SCM scm_port_poll (SCM, SCM, SCM);
SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0,
(SCM port, SCM events, SCM timeout),
"")
#define FUNC_NAME s_scm_port_poll
{
short c_events = 0;
int c_timeout;
port = SCM_COERCE_OUTPORT (port);
SCM_VALIDATE_PORT (1, port);
SCM_VALIDATE_STRING (2, events);
c_timeout = SCM_UNBNDP (timeout) ? -1 : SCM_NUM2INT (3, timeout);
if (scm_i_string_contains_char (events, 'r'))
c_events |= POLLIN;
if (scm_i_string_contains_char (events, '!'))
c_events |= POLLPRI;
if (scm_i_string_contains_char (events, 'w'))
c_events |= POLLIN;
return scm_from_int (port_poll (port, c_events, c_timeout));
}
#undef FUNC_NAME
/* Input. */
@ -1330,8 +1469,15 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count)
assert (count <= SCM_BYTEVECTOR_LENGTH (dst));
assert (start + count <= SCM_BYTEVECTOR_LENGTH (dst));
retry:
filled = ptob->c_read (port, dst, start, count);
if (filled == (size_t) -1)
{
port_poll (port, POLLIN, -1);
goto retry;
}
assert (filled <= count);
return filled;
@ -2508,7 +2654,14 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count)
assert (start + count <= SCM_BYTEVECTOR_LENGTH (src));
do
written += ptob->c_write (port, src, start + written, count - written);
{
size_t ret = ptob->c_write (port, src, start + written, count - written);
if (ret == (size_t) -1)
port_poll (port, POLLOUT, -1);
else
written += ret;
}
while (written < count);
assert (written == count);

View file

@ -90,6 +90,10 @@ SCM_API scm_t_port_type *scm_make_port_type
size_t (*write) (SCM port, SCM src, size_t start, size_t count));
SCM_API void scm_set_port_scm_read (scm_t_port_type *ptob, SCM read);
SCM_API void scm_set_port_scm_write (scm_t_port_type *ptob, SCM write);
SCM_API void scm_set_port_read_wait_fd (scm_t_port_type *ptob,
int (*wait_fd) (SCM port));
SCM_API void scm_set_port_write_wait_fd (scm_t_port_type *ptob,
int (*wait_fd) (SCM port));
SCM_API void scm_set_port_print (scm_t_port_type *ptob,
int (*print) (SCM exp,
SCM port,

View file

@ -179,7 +179,10 @@ interpret its input and output."
specialize-port-encoding!
port-random-access?
port-decode-char
port-read-buffering))
port-read-buffering
port-poll
port-read-wait-fd
port-write-wait-fd))
(define-syntax-rule (port-buffer-bytevector buf) (vector-ref buf 0))
(define-syntax-rule (port-buffer-cur buf) (vector-ref buf 1))
@ -209,7 +212,10 @@ interpret its input and output."
specialize-port-encoding!
port-decode-char
port-random-access?
port-read-buffering)
port-read-buffering
port-poll
port-read-wait-fd
port-write-wait-fd)
;; And we're back.
(define-module (ice-9 ports))

View file

@ -54,7 +54,9 @@
#:replace (peek-char
read-char)
#:export (lookahead-u8
get-u8))
get-u8
current-read-waiter
current-write-waiter))
(define (write-bytes port src start count)
(let ((written ((port-write port) port src start count)))
@ -77,11 +79,25 @@
(set-port-buffer-end! buf 0)
(write-bytes port (port-buffer-bytevector buf) cur (- end cur)))))
(define (default-read-waiter port) (port-poll port "r"))
(define (default-write-waiter port) (port-poll port "w"))
(define current-read-waiter (make-parameter default-read-waiter))
(define current-write-waiter (make-parameter default-write-waiter))
(define (wait-for-readable port) ((current-read-waiter) port))
(define (wait-for-writable port) ((current-write-waiter) port))
(define (read-bytes port dst start count)
(let ((read ((port-read port) port dst start count)))
(cond
(((port-read port) port dst start count)
=> (lambda (read)
(unless (<= 0 read count)
(error "bad return from port read function" read))
read))
(else
(wait-for-readable port)
(read-bytes port dst start count))))
(define utf8-bom #vu8(#xEF #xBB #xBF))
(define utf16be-bom #vu8(#xFE #xFF))

View file

@ -23,6 +23,7 @@
#:use-module (test-suite guile-test)
#:use-module (ice-9 popen)
#:use-module (ice-9 rdelim)
#:use-module (ice-9 threads)
#:use-module (rnrs bytevectors)
#:use-module ((ice-9 binary-ports) #:select (open-bytevector-input-port
open-bytevector-output-port
@ -601,20 +602,18 @@
(pass-if "unread residue"
(string=? (read-line) "moon"))))
;;; non-blocking mode on a port. create a pipe and set O_NONBLOCK on
;;; the reading end. try to read a byte: should get EAGAIN or
;;; EWOULDBLOCK error.
(let* ((p (pipe))
(r (car p)))
(when (provided? 'threads)
(let* ((p (pipe))
(r (car p))
(w (cdr p)))
(fcntl r F_SETFL (logior (fcntl r F_GETFL) O_NONBLOCK))
(pass-if "non-blocking-I/O"
(catch 'system-error
(lambda () (read-char r) #f)
(lambda (key . args)
(and (eq? key 'system-error)
(let ((errno (car (list-ref args 3))))
(or (= errno EAGAIN)
(= errno EWOULDBLOCK))))))))
(let ((thread (call-with-new-thread
(lambda ()
(usleep (* 250 1000))
(write-char #\a w)
(force-output w)))))
(pass-if-equal "non-blocking-I/O" #\a (read-char r))
(join-thread thread))))
;;;; Pipe (popen) ports.