1
Fork 0
mirror of https://git.savannah.gnu.org/git/guile.git synced 2025-04-30 03:40:34 +02:00

Only ptob->close() after read/write finish

* libguile/Makefile.am (noinst_HEADERS): Add atomics-internal.h.
* libguile/atomics-internal.h: New file.
* libguile/ports-internal.h (refcount): New member.
* libguile/ports.c (release_port, scm_dynwind_acquire_port): New
  facility for acquiring a port within a dynwind.
  (scm_port_poll, scm_i_read_bytes, scm_setvbuf, scm_end_input)
  (scm_i_write_bytes, scm_char_ready_p, scm_seek)
  (scm_truncate_file, trampoline_to_c_read)
  (trampoline_to_c_write): Acquire port.
  (scm_c_make_port_with_encoding): Init refcount to 1.
  (scm_close_port): Release port.
* doc/ref/api-io.texi (I/O Extensions): Add documentation
This commit is contained in:
Andy Wingo 2016-08-31 19:00:27 +02:00
parent cc9e72bd2b
commit b8a53b98b3
5 changed files with 216 additions and 28 deletions

View file

@ -1694,6 +1694,13 @@ operating system inform Guile about the appropriate buffer sizes for the
particular file opened by the port. particular file opened by the port.
@end table @end table
Note that calls to all of these methods can proceed in parallel and
concurrently and from any thread up until the point that the port is
closed. The call to @code{close} will happen when no other method is
running, and no method will be called after the @code{close} method is
called. If your port implementation needs mutual exclusion to prevent
concurrency, it is responsible for locking appropriately.
@node Non-Blocking I/O @node Non-Blocking I/O
@subsection Non-Blocking I/O @subsection Non-Blocking I/O

View file

@ -507,6 +507,7 @@ noinst_HEADERS = conv-integer.i.c conv-uinteger.i.c \
elf.h \ elf.h \
srfi-14.i.c \ srfi-14.i.c \
quicksort.i.c \ quicksort.i.c \
atomics-internal.h \
posix-w32.h \ posix-w32.h \
private-options.h ports-internal.h private-options.h ports-internal.h

View file

@ -0,0 +1,85 @@
#ifndef SCM_ATOMICS_INTERNAL_H
#define SCM_ATOMICS_INTERNAL_H
/* Copyright (C) 2016
* Free Software Foundation, Inc.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 3 of
* the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
* 02110-1301 USA
*/
#include <stdint.h>
#define HAVE_C11_ATOMICS (__STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__))
#if HAVE_C11_ATOMICS
#include <stdatomic.h>
static inline uint32_t
scm_atomic_subtract_uint32 (uint32_t *obj, uint32_t arg)
{
return atomic_fetch_sub (obj, arg);
}
static inline _Bool
scm_atomic_compare_and_swap_uint32 (uint32_t *obj, uint32_t *expected,
uint32_t desired)
{
return atomic_compare_exchange_weak (obj, expected, desired);
}
#else /* HAVE_C11_ATOMICS */
/* Fallback implementation using locks. */
#include "libguile/threads.h"
static scm_i_pthread_mutex_t atomics_lock = SCM_I_PTHREAD_MUTEX_INITIALIZER;
static inline uint32_t
scm_atomic_subtract_uint32 (uint32_t *obj, uint32_t arg)
{
uint32_t ret;
scm_i_pthread_mutex_lock (&atomics_lock);
ret = *obj;
*obj -= arg;
scm_i_pthread_mutex_unlock (&atomics_lock);
return ret;
}
static inline int
scm_atomic_compare_and_swap_uint32 (uint32_t *obj, uint32_t *expected,
uint32_t desired)
{
int ret;
scm_i_pthread_mutex_lock (&atomics_lock);
if (*obj == *expected)
{
*obj = desired;
ret = 1;
}
else
{
*expected = *obj;
ret = 0;
}
scm_i_pthread_mutex_unlock (&atomics_lock);
return ret;
}
#endif /* HAVE_C11_ATOMICS */
#endif /* SCM_ATOMICS_INTERNAL_H */

View file

@ -323,12 +323,19 @@ struct scm_t_port
`unwrite-byte'. */ `unwrite-byte'. */
size_t read_buffering; size_t read_buffering;
/* Reads and writes can proceed concurrently, but we don't want to
start any read or write after close() has been called. So we have
a refcount which is positive if close has not yet been called.
Reading, writing, and the like temporarily increments this
refcount, provided it was nonzero to start with. */
scm_t_uint32 refcount;
/* True if the port is random access. Implies that the buffers must /* True if the port is random access. Implies that the buffers must
be flushed before switching between reading and writing, seeking, be flushed before switching between reading and writing, seeking,
and so on. */ and so on. */
unsigned rw_random : 1; scm_t_uint32 rw_random : 1;
unsigned at_stream_start_for_bom_read : 1; scm_t_uint32 at_stream_start_for_bom_read : 1;
unsigned at_stream_start_for_bom_write : 1; scm_t_uint32 at_stream_start_for_bom_write : 1;
/* Character encoding support. */ /* Character encoding support. */
SCM encoding; /* A symbol of upper-case ASCII. */ SCM encoding; /* A symbol of upper-case ASCII. */

View file

@ -27,6 +27,7 @@
# include <config.h> # include <config.h>
#endif #endif
#include <assert.h>
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h> /* for chsize on mingw */ #include <fcntl.h> /* for chsize on mingw */
@ -37,10 +38,9 @@
#include <unistr.h> #include <unistr.h>
#include <striconveh.h> #include <striconveh.h>
#include <assert.h>
#include "libguile/_scm.h" #include "libguile/_scm.h"
#include "libguile/async.h" #include "libguile/async.h"
#include "libguile/atomics-internal.h"
#include "libguile/deprecation.h" #include "libguile/deprecation.h"
#include "libguile/eval.h" #include "libguile/eval.h"
#include "libguile/fports.h" /* direct access for seek and truncate */ #include "libguile/fports.h" /* direct access for seek and truncate */
@ -131,6 +131,63 @@ static const scm_t_wchar UNICODE_REPLACEMENT_CHARACTER = 0xFFFD;
static void
release_port (SCM port)
{
scm_t_port *pt = SCM_PORT (port);
scm_t_uint32 prev;
prev = scm_atomic_subtract_uint32 (&pt->refcount, 1);
if (prev == 0)
/* Logic failure. */
abort ();
if (prev > 1)
/* Port still alive. */
return;
/* FIXME: `catch' around the close call? It could throw an exception,
and in that case we'd leak the iconv descriptors, if any. */
if (SCM_PORT_TYPE (port)->close)
SCM_PORT_TYPE (port)->close (port);
scm_i_pthread_mutex_lock (&iconv_lock);
pt = SCM_PORT (port);
if (scm_is_true (pt->precise_encoding))
{
if (pt->input_cd != (iconv_t) -1)
iconv_close (pt->input_cd);
if (pt->output_cd != (iconv_t) -1)
iconv_close (pt->output_cd);
pt->precise_encoding = SCM_BOOL_F;
pt->input_cd = pt->output_cd = (iconv_t) -1;
}
scm_i_pthread_mutex_unlock (&iconv_lock);
}
static void
scm_dynwind_acquire_port (SCM port)
{
scm_t_port *pt = SCM_PORT (port);
/* We're acquiring a lease on the port so that we only close it when
no one is using it. The normal case is that it's open with a
refcount of 1 and we're going to push it to 2. Otherwise perhaps
there is someone else using it; that's fine, we just add our
refcount. However if the current refcount is 0 then the port has
been closed or is closing and we must throw an error. */
scm_t_uint32 cur = 1, next = 2;
while (!scm_atomic_compare_and_swap_uint32 (&pt->refcount, &cur, next))
{
if (cur == 0)
scm_wrong_type_arg_msg (NULL, 0, port, "open port");
next = cur + 1;
}
scm_dynwind_unwind_handler_with_scm (release_port, port,
SCM_F_WIND_EXPLICITLY);
}
static SCM trampoline_to_c_read_subr; static SCM trampoline_to_c_read_subr;
static SCM trampoline_to_c_write_subr; static SCM trampoline_to_c_write_subr;
@ -191,7 +248,10 @@ trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count)
SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (dst)); SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (dst));
SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (dst) - c_start); SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (dst) - c_start);
scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
ret = SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count); ret = SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count);
scm_dynwind_end ();
return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret); return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
} }
@ -218,7 +278,10 @@ trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count)
SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (src)); SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (src));
SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (src) - c_start); SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (src) - c_start);
scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
ret = SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count); ret = SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count);
scm_dynwind_end ();
return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret); return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret);
} }
@ -691,6 +754,8 @@ scm_c_make_port_with_encoding (scm_t_port_type *ptob, unsigned long mode_bits,
pt->file_name = SCM_BOOL_F; pt->file_name = SCM_BOOL_F;
pt->position = scm_cons (SCM_INUM0, SCM_INUM0); pt->position = scm_cons (SCM_INUM0, SCM_INUM0);
pt->refcount = 1;
pt->at_stream_start_for_bom_read = 1; pt->at_stream_start_for_bom_read = 1;
pt->at_stream_start_for_bom_write = 1; pt->at_stream_start_for_bom_write = 1;
@ -797,11 +862,9 @@ SCM_DEFINE (scm_close_port, "close-port", 1, 0, 0,
"descriptors.") "descriptors.")
#define FUNC_NAME s_scm_close_port #define FUNC_NAME s_scm_close_port
{ {
scm_t_port *pt;
port = SCM_COERCE_OUTPORT (port); port = SCM_COERCE_OUTPORT (port);
SCM_VALIDATE_PORT (1, port); SCM_VALIDATE_PORT (1, port);
if (SCM_CLOSEDP (port)) if (SCM_CLOSEDP (port))
return SCM_BOOL_F; return SCM_BOOL_F;
@ -809,28 +872,12 @@ SCM_DEFINE (scm_close_port, "close-port", 1, 0, 0,
if (SCM_OUTPUT_PORT_P (port)) if (SCM_OUTPUT_PORT_P (port))
scm_flush (port); scm_flush (port);
pt = SCM_PORT (port);
SCM_CLR_PORT_OPEN_FLAG (port); SCM_CLR_PORT_OPEN_FLAG (port);
if (SCM_PORT_TYPE (port)->flags & SCM_PORT_TYPE_NEEDS_CLOSE_ON_GC) if (SCM_PORT_TYPE (port)->flags & SCM_PORT_TYPE_NEEDS_CLOSE_ON_GC)
scm_weak_set_remove_x (scm_i_port_weak_set, port); scm_weak_set_remove_x (scm_i_port_weak_set, port);
if (SCM_PORT_TYPE (port)->close) release_port (port);
/* Note! This may throw an exception. Anything after this point
should be resilient to non-local exits. */
SCM_PORT_TYPE (port)->close (port);
scm_i_pthread_mutex_lock (&iconv_lock);
if (scm_is_true (pt->precise_encoding))
{
if (pt->input_cd != (iconv_t) -1)
iconv_close (pt->input_cd);
if (pt->output_cd != (iconv_t) -1)
iconv_close (pt->output_cd);
pt->precise_encoding = SCM_BOOL_F;
pt->input_cd = pt->output_cd = (iconv_t) -1;
}
scm_i_pthread_mutex_unlock (&iconv_lock);
return SCM_BOOL_T; return SCM_BOOL_T;
} }
@ -1314,6 +1361,7 @@ SCM_DEFINE (scm_port_write_wait_fd, "port-write-wait-fd", 1, 0, 0,
} }
#undef FUNC_NAME #undef FUNC_NAME
/* Call while having acquired the port. */
static int static int
port_poll (SCM port, short events, int timeout) port_poll (SCM port, short events, int timeout)
#define FUNC_NAME "port-poll" #define FUNC_NAME "port-poll"
@ -1358,6 +1406,7 @@ SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0,
{ {
short c_events = 0; short c_events = 0;
int c_timeout; int c_timeout;
SCM ret;
port = SCM_COERCE_OUTPORT (port); port = SCM_COERCE_OUTPORT (port);
SCM_VALIDATE_PORT (1, port); SCM_VALIDATE_PORT (1, port);
@ -1371,7 +1420,12 @@ SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0,
if (scm_i_string_contains_char (events, 'w')) if (scm_i_string_contains_char (events, 'w'))
c_events |= POLLIN; c_events |= POLLIN;
return scm_from_int (port_poll (port, c_events, c_timeout)); scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
ret = scm_from_int (port_poll (port, c_events, c_timeout));
scm_dynwind_end ();
return ret;
} }
#undef FUNC_NAME #undef FUNC_NAME
@ -1476,6 +1530,9 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count)
assert (count <= SCM_BYTEVECTOR_LENGTH (dst)); assert (count <= SCM_BYTEVECTOR_LENGTH (dst));
assert (start + count <= SCM_BYTEVECTOR_LENGTH (dst)); assert (start + count <= SCM_BYTEVECTOR_LENGTH (dst));
scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
retry: retry:
filled = ptob->c_read (port, dst, start, count); filled = ptob->c_read (port, dst, start, count);
@ -1485,6 +1542,8 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count)
goto retry; goto retry;
} }
scm_dynwind_end ();
assert (filled <= count); assert (filled <= count);
return filled; return filled;
@ -2220,8 +2279,11 @@ SCM_DEFINE (scm_setvbuf, "setvbuf", 2, 1, 0,
else else
{ {
read_buf_size = write_buf_size = default_buffer_size; read_buf_size = write_buf_size = default_buffer_size;
scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
if (ptob->get_natural_buffer_sizes) if (ptob->get_natural_buffer_sizes)
ptob->get_natural_buffer_sizes (port, &read_buf_size, &write_buf_size); ptob->get_natural_buffer_sizes (port, &read_buf_size, &write_buf_size);
scm_dynwind_end ();
} }
/* Minimum buffer size is one byte. */ /* Minimum buffer size is one byte. */
@ -2310,7 +2372,12 @@ scm_end_input (SCM port)
offset = - (scm_t_off) discarded; offset = - (scm_t_off) discarded;
if (offset != 0) if (offset != 0)
{
scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
SCM_PORT_TYPE (port)->seek (port, offset, SEEK_CUR); SCM_PORT_TYPE (port)->seek (port, offset, SEEK_CUR);
scm_dynwind_end ();
}
} }
SCM_DEFINE (scm_force_output, "force-output", 0, 1, 0, SCM_DEFINE (scm_force_output, "force-output", 0, 1, 0,
@ -2722,6 +2789,9 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count)
assert (count <= SCM_BYTEVECTOR_LENGTH (src)); assert (count <= SCM_BYTEVECTOR_LENGTH (src));
assert (start + count <= SCM_BYTEVECTOR_LENGTH (src)); assert (start + count <= SCM_BYTEVECTOR_LENGTH (src));
scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
do do
{ {
size_t ret = ptob->c_write (port, src, start + written, count - written); size_t ret = ptob->c_write (port, src, start + written, count - written);
@ -2733,6 +2803,8 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count)
} }
while (written < count); while (written < count);
scm_dynwind_end ();
assert (written == count); assert (written == count);
} }
@ -3495,7 +3567,14 @@ SCM_DEFINE (scm_char_ready_p, "char-ready?", 0, 1, 0,
scm_t_port_type *ptob = SCM_PORT_TYPE (port); scm_t_port_type *ptob = SCM_PORT_TYPE (port);
if (ptob->input_waiting) if (ptob->input_waiting)
return scm_from_bool (ptob->input_waiting (port)); {
SCM ret;
scm_dynwind_begin (0);
scm_dynwind_acquire_port (port);
ret = scm_from_bool (ptob->input_waiting (port));
scm_dynwind_end ();
return ret;
}
else else
return SCM_BOOL_T; return SCM_BOOL_T;
} }
@ -3549,7 +3628,10 @@ SCM_DEFINE (scm_seek, "seek", 3, 0, 0,
/* If we are just querying the current position, avoid /* If we are just querying the current position, avoid
flushing buffers. We don't even need to require that the flushing buffers. We don't even need to require that the
port supports random access. */ port supports random access. */
scm_dynwind_begin (0);
scm_dynwind_acquire_port (fd_port);
rv = ptob->seek (fd_port, off, how); rv = ptob->seek (fd_port, off, how);
scm_dynwind_end ();
rv -= scm_port_buffer_can_take (pt->read_buf); rv -= scm_port_buffer_can_take (pt->read_buf);
rv += scm_port_buffer_can_take (pt->write_buf); rv += scm_port_buffer_can_take (pt->write_buf);
return scm_from_off_t_or_off64_t (rv); return scm_from_off_t_or_off64_t (rv);
@ -3562,7 +3644,10 @@ SCM_DEFINE (scm_seek, "seek", 3, 0, 0,
scm_end_input (fd_port); scm_end_input (fd_port);
scm_flush (fd_port); scm_flush (fd_port);
scm_dynwind_begin (0);
scm_dynwind_acquire_port (fd_port);
rv = ptob->seek (fd_port, off, how); rv = ptob->seek (fd_port, off, how);
scm_dynwind_end ();
/* Set stream-start flags according to new position. */ /* Set stream-start flags according to new position. */
pt->at_stream_start_for_bom_read = (rv == 0); pt->at_stream_start_for_bom_read = (rv == 0);
@ -3668,7 +3753,10 @@ SCM_DEFINE (scm_truncate_file, "truncate-file", 1, 1, 0,
scm_end_input (object); scm_end_input (object);
scm_flush (object); scm_flush (object);
scm_dynwind_begin (0);
scm_dynwind_acquire_port (object);
ptob->truncate (object, c_length); ptob->truncate (object, c_length);
scm_dynwind_end ();
rv = 0; rv = 0;
} }
else else