diff --git a/doc/ref/api-io.texi b/doc/ref/api-io.texi index e4e4f36ab..9facb38e0 100644 --- a/doc/ref/api-io.texi +++ b/doc/ref/api-io.texi @@ -1694,6 +1694,13 @@ operating system inform Guile about the appropriate buffer sizes for the particular file opened by the port. @end table +Note that calls to all of these methods can proceed in parallel and +concurrently and from any thread up until the point that the port is +closed. The call to @code{close} will happen when no other method is +running, and no method will be called after the @code{close} method is +called. If your port implementation needs mutual exclusion to prevent +concurrency, it is responsible for locking appropriately. + @node Non-Blocking I/O @subsection Non-Blocking I/O diff --git a/libguile/Makefile.am b/libguile/Makefile.am index 8161ade4e..ba6be2019 100644 --- a/libguile/Makefile.am +++ b/libguile/Makefile.am @@ -507,6 +507,7 @@ noinst_HEADERS = conv-integer.i.c conv-uinteger.i.c \ elf.h \ srfi-14.i.c \ quicksort.i.c \ + atomics-internal.h \ posix-w32.h \ private-options.h ports-internal.h diff --git a/libguile/atomics-internal.h b/libguile/atomics-internal.h new file mode 100644 index 000000000..1859daa92 --- /dev/null +++ b/libguile/atomics-internal.h @@ -0,0 +1,85 @@ +#ifndef SCM_ATOMICS_INTERNAL_H +#define SCM_ATOMICS_INTERNAL_H + +/* Copyright (C) 2016 + * Free Software Foundation, Inc. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public License + * as published by the Free Software Foundation; either version 3 of + * the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA + * 02110-1301 USA + */ + + + + +#include + + + + +#define HAVE_C11_ATOMICS (__STDC_VERSION__ >= 201112L && !defined(__STDC_NO_ATOMICS__)) + +#if HAVE_C11_ATOMICS + +#include +static inline uint32_t +scm_atomic_subtract_uint32 (uint32_t *obj, uint32_t arg) +{ + return atomic_fetch_sub (obj, arg); +} +static inline _Bool +scm_atomic_compare_and_swap_uint32 (uint32_t *obj, uint32_t *expected, + uint32_t desired) +{ + return atomic_compare_exchange_weak (obj, expected, desired); +} + +#else /* HAVE_C11_ATOMICS */ + +/* Fallback implementation using locks. */ +#include "libguile/threads.h" +static scm_i_pthread_mutex_t atomics_lock = SCM_I_PTHREAD_MUTEX_INITIALIZER; +static inline uint32_t +scm_atomic_subtract_uint32 (uint32_t *obj, uint32_t arg) +{ + uint32_t ret; + scm_i_pthread_mutex_lock (&atomics_lock); + ret = *obj; + *obj -= arg; + scm_i_pthread_mutex_unlock (&atomics_lock); + return ret; +} +static inline int +scm_atomic_compare_and_swap_uint32 (uint32_t *obj, uint32_t *expected, + uint32_t desired) +{ + int ret; + scm_i_pthread_mutex_lock (&atomics_lock); + if (*obj == *expected) + { + *obj = desired; + ret = 1; + } + else + { + *expected = *obj; + ret = 0; + } + scm_i_pthread_mutex_unlock (&atomics_lock); + return ret; +} + +#endif /* HAVE_C11_ATOMICS */ + +#endif /* SCM_ATOMICS_INTERNAL_H */ diff --git a/libguile/ports-internal.h b/libguile/ports-internal.h index d01441562..4203a5c51 100644 --- a/libguile/ports-internal.h +++ b/libguile/ports-internal.h @@ -323,12 +323,19 @@ struct scm_t_port `unwrite-byte'. */ size_t read_buffering; + /* Reads and writes can proceed concurrently, but we don't want to + start any read or write after close() has been called. So we have + a refcount which is positive if close has not yet been called. + Reading, writing, and the like temporarily increments this + refcount, provided it was nonzero to start with. */ + scm_t_uint32 refcount; + /* True if the port is random access. Implies that the buffers must be flushed before switching between reading and writing, seeking, and so on. */ - unsigned rw_random : 1; - unsigned at_stream_start_for_bom_read : 1; - unsigned at_stream_start_for_bom_write : 1; + scm_t_uint32 rw_random : 1; + scm_t_uint32 at_stream_start_for_bom_read : 1; + scm_t_uint32 at_stream_start_for_bom_write : 1; /* Character encoding support. */ SCM encoding; /* A symbol of upper-case ASCII. */ diff --git a/libguile/ports.c b/libguile/ports.c index 9e5211f62..278bbe9e7 100644 --- a/libguile/ports.c +++ b/libguile/ports.c @@ -27,6 +27,7 @@ # include #endif +#include #include #include #include /* for chsize on mingw */ @@ -37,10 +38,9 @@ #include #include -#include - #include "libguile/_scm.h" #include "libguile/async.h" +#include "libguile/atomics-internal.h" #include "libguile/deprecation.h" #include "libguile/eval.h" #include "libguile/fports.h" /* direct access for seek and truncate */ @@ -131,6 +131,63 @@ static const scm_t_wchar UNICODE_REPLACEMENT_CHARACTER = 0xFFFD; +static void +release_port (SCM port) +{ + scm_t_port *pt = SCM_PORT (port); + scm_t_uint32 prev; + + prev = scm_atomic_subtract_uint32 (&pt->refcount, 1); + if (prev == 0) + /* Logic failure. */ + abort (); + + if (prev > 1) + /* Port still alive. */ + return; + + /* FIXME: `catch' around the close call? It could throw an exception, + and in that case we'd leak the iconv descriptors, if any. */ + if (SCM_PORT_TYPE (port)->close) + SCM_PORT_TYPE (port)->close (port); + + scm_i_pthread_mutex_lock (&iconv_lock); + pt = SCM_PORT (port); + if (scm_is_true (pt->precise_encoding)) + { + if (pt->input_cd != (iconv_t) -1) + iconv_close (pt->input_cd); + if (pt->output_cd != (iconv_t) -1) + iconv_close (pt->output_cd); + pt->precise_encoding = SCM_BOOL_F; + pt->input_cd = pt->output_cd = (iconv_t) -1; + } + scm_i_pthread_mutex_unlock (&iconv_lock); +} + +static void +scm_dynwind_acquire_port (SCM port) +{ + scm_t_port *pt = SCM_PORT (port); + /* We're acquiring a lease on the port so that we only close it when + no one is using it. The normal case is that it's open with a + refcount of 1 and we're going to push it to 2. Otherwise perhaps + there is someone else using it; that's fine, we just add our + refcount. However if the current refcount is 0 then the port has + been closed or is closing and we must throw an error. */ + scm_t_uint32 cur = 1, next = 2; + while (!scm_atomic_compare_and_swap_uint32 (&pt->refcount, &cur, next)) + { + if (cur == 0) + scm_wrong_type_arg_msg (NULL, 0, port, "open port"); + next = cur + 1; + } + scm_dynwind_unwind_handler_with_scm (release_port, port, + SCM_F_WIND_EXPLICITLY); +} + + + static SCM trampoline_to_c_read_subr; static SCM trampoline_to_c_write_subr; @@ -191,7 +248,10 @@ trampoline_to_c_read (SCM port, SCM dst, SCM start, SCM count) SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (dst)); SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (dst) - c_start); + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); ret = SCM_PORT_TYPE (port)->c_read (port, dst, c_start, c_count); + scm_dynwind_end (); return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret); } @@ -218,7 +278,10 @@ trampoline_to_c_write (SCM port, SCM src, SCM start, SCM count) SCM_ASSERT_RANGE (3, start, c_start <= SCM_BYTEVECTOR_LENGTH (src)); SCM_ASSERT_RANGE (4, count, c_count <= SCM_BYTEVECTOR_LENGTH (src) - c_start); + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); ret = SCM_PORT_TYPE (port)->c_write (port, src, c_start, c_count); + scm_dynwind_end (); return ret == (size_t) -1 ? SCM_BOOL_F : scm_from_size_t (ret); } @@ -691,6 +754,8 @@ scm_c_make_port_with_encoding (scm_t_port_type *ptob, unsigned long mode_bits, pt->file_name = SCM_BOOL_F; pt->position = scm_cons (SCM_INUM0, SCM_INUM0); + pt->refcount = 1; + pt->at_stream_start_for_bom_read = 1; pt->at_stream_start_for_bom_write = 1; @@ -797,11 +862,9 @@ SCM_DEFINE (scm_close_port, "close-port", 1, 0, 0, "descriptors.") #define FUNC_NAME s_scm_close_port { - scm_t_port *pt; - port = SCM_COERCE_OUTPORT (port); - SCM_VALIDATE_PORT (1, port); + if (SCM_CLOSEDP (port)) return SCM_BOOL_F; @@ -809,28 +872,12 @@ SCM_DEFINE (scm_close_port, "close-port", 1, 0, 0, if (SCM_OUTPUT_PORT_P (port)) scm_flush (port); - pt = SCM_PORT (port); SCM_CLR_PORT_OPEN_FLAG (port); if (SCM_PORT_TYPE (port)->flags & SCM_PORT_TYPE_NEEDS_CLOSE_ON_GC) scm_weak_set_remove_x (scm_i_port_weak_set, port); - if (SCM_PORT_TYPE (port)->close) - /* Note! This may throw an exception. Anything after this point - should be resilient to non-local exits. */ - SCM_PORT_TYPE (port)->close (port); - - scm_i_pthread_mutex_lock (&iconv_lock); - if (scm_is_true (pt->precise_encoding)) - { - if (pt->input_cd != (iconv_t) -1) - iconv_close (pt->input_cd); - if (pt->output_cd != (iconv_t) -1) - iconv_close (pt->output_cd); - pt->precise_encoding = SCM_BOOL_F; - pt->input_cd = pt->output_cd = (iconv_t) -1; - } - scm_i_pthread_mutex_unlock (&iconv_lock); + release_port (port); return SCM_BOOL_T; } @@ -1314,6 +1361,7 @@ SCM_DEFINE (scm_port_write_wait_fd, "port-write-wait-fd", 1, 0, 0, } #undef FUNC_NAME +/* Call while having acquired the port. */ static int port_poll (SCM port, short events, int timeout) #define FUNC_NAME "port-poll" @@ -1358,6 +1406,7 @@ SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0, { short c_events = 0; int c_timeout; + SCM ret; port = SCM_COERCE_OUTPORT (port); SCM_VALIDATE_PORT (1, port); @@ -1371,7 +1420,12 @@ SCM_DEFINE (scm_port_poll, "port-poll", 2, 1, 0, if (scm_i_string_contains_char (events, 'w')) c_events |= POLLIN; - return scm_from_int (port_poll (port, c_events, c_timeout)); + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); + ret = scm_from_int (port_poll (port, c_events, c_timeout)); + scm_dynwind_end (); + + return ret; } #undef FUNC_NAME @@ -1476,6 +1530,9 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count) assert (count <= SCM_BYTEVECTOR_LENGTH (dst)); assert (start + count <= SCM_BYTEVECTOR_LENGTH (dst)); + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); + retry: filled = ptob->c_read (port, dst, start, count); @@ -1485,6 +1542,8 @@ scm_i_read_bytes (SCM port, SCM dst, size_t start, size_t count) goto retry; } + scm_dynwind_end (); + assert (filled <= count); return filled; @@ -2220,8 +2279,11 @@ SCM_DEFINE (scm_setvbuf, "setvbuf", 2, 1, 0, else { read_buf_size = write_buf_size = default_buffer_size; + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); if (ptob->get_natural_buffer_sizes) ptob->get_natural_buffer_sizes (port, &read_buf_size, &write_buf_size); + scm_dynwind_end (); } /* Minimum buffer size is one byte. */ @@ -2310,7 +2372,12 @@ scm_end_input (SCM port) offset = - (scm_t_off) discarded; if (offset != 0) - SCM_PORT_TYPE (port)->seek (port, offset, SEEK_CUR); + { + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); + SCM_PORT_TYPE (port)->seek (port, offset, SEEK_CUR); + scm_dynwind_end (); + } } SCM_DEFINE (scm_force_output, "force-output", 0, 1, 0, @@ -2722,6 +2789,9 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count) assert (count <= SCM_BYTEVECTOR_LENGTH (src)); assert (start + count <= SCM_BYTEVECTOR_LENGTH (src)); + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); + do { size_t ret = ptob->c_write (port, src, start + written, count - written); @@ -2733,6 +2803,8 @@ scm_i_write_bytes (SCM port, SCM src, size_t start, size_t count) } while (written < count); + scm_dynwind_end (); + assert (written == count); } @@ -3495,7 +3567,14 @@ SCM_DEFINE (scm_char_ready_p, "char-ready?", 0, 1, 0, scm_t_port_type *ptob = SCM_PORT_TYPE (port); if (ptob->input_waiting) - return scm_from_bool (ptob->input_waiting (port)); + { + SCM ret; + scm_dynwind_begin (0); + scm_dynwind_acquire_port (port); + ret = scm_from_bool (ptob->input_waiting (port)); + scm_dynwind_end (); + return ret; + } else return SCM_BOOL_T; } @@ -3549,7 +3628,10 @@ SCM_DEFINE (scm_seek, "seek", 3, 0, 0, /* If we are just querying the current position, avoid flushing buffers. We don't even need to require that the port supports random access. */ + scm_dynwind_begin (0); + scm_dynwind_acquire_port (fd_port); rv = ptob->seek (fd_port, off, how); + scm_dynwind_end (); rv -= scm_port_buffer_can_take (pt->read_buf); rv += scm_port_buffer_can_take (pt->write_buf); return scm_from_off_t_or_off64_t (rv); @@ -3562,7 +3644,10 @@ SCM_DEFINE (scm_seek, "seek", 3, 0, 0, scm_end_input (fd_port); scm_flush (fd_port); + scm_dynwind_begin (0); + scm_dynwind_acquire_port (fd_port); rv = ptob->seek (fd_port, off, how); + scm_dynwind_end (); /* Set stream-start flags according to new position. */ pt->at_stream_start_for_bom_read = (rv == 0); @@ -3668,7 +3753,10 @@ SCM_DEFINE (scm_truncate_file, "truncate-file", 1, 1, 0, scm_end_input (object); scm_flush (object); + scm_dynwind_begin (0); + scm_dynwind_acquire_port (object); ptob->truncate (object, c_length); + scm_dynwind_end (); rv = 0; } else