From 4d7041bfa9820a90f4fe234eea8ad183e1e8b2f2 Mon Sep 17 00:00:00 2001
From: Andy Wingo
Date: Sun, 13 Mar 2022 13:54:58 +0100
Subject: [PATCH] Another attempt at parallel marking, avoiding the channel

Not great though!
---
 parallel-marker.h | 104 +++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 94 insertions(+), 10 deletions(-)

diff --git a/parallel-marker.h b/parallel-marker.h
index f935bc155..808a588ba 100644
--- a/parallel-marker.h
+++ b/parallel-marker.h
@@ -223,6 +223,8 @@ mark_deque_steal(struct mark_deque *q) {
 
 #define LOCAL_MARK_QUEUE_SIZE 64
 #define LOCAL_MARK_QUEUE_MASK 63
+#define LOCAL_MARK_QUEUE_SHARE_THRESHOLD 48
+#define LOCAL_MARK_QUEUE_SHARE_AMOUNT 32
 struct local_mark_queue {
   size_t read;
   size_t write;
@@ -237,13 +239,21 @@ static inline void
 local_mark_queue_poison(struct local_mark_queue *q) {
   q->read = 0; q->write = LOCAL_MARK_QUEUE_SIZE;
 }
+static inline size_t
+local_mark_queue_size(struct local_mark_queue *q) {
+  return q->write - q->read;
+}
 static inline int
 local_mark_queue_empty(struct local_mark_queue *q) {
-  return q->read == q->write;
+  return local_mark_queue_size(q) == 0;
+}
+static inline int
+local_mark_queue_should_share(struct local_mark_queue *q) {
+  return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SHARE_THRESHOLD;
 }
 static inline int
 local_mark_queue_full(struct local_mark_queue *q) {
-  return q->read + LOCAL_MARK_QUEUE_SIZE == q->write;
+  return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SIZE;
 }
 static inline void
 local_mark_queue_push(struct local_mark_queue *q, uintptr_t v) {
@@ -313,7 +323,9 @@ mark_notify_wait(struct mark_notify *notify) {
   }
 
   // Spurious wakeup is OK.
+  DEBUG("-- marker waiting\n");
   pthread_cond_wait(&notify->cond, &notify->lock);
+  DEBUG("-- marker woke\n");
   res = MARK_NOTIFY_WOKE;
   notify->pending = 0;
 
@@ -324,10 +336,12 @@ done:
 
 static void
 mark_notify_wake(struct mark_notify *notify) {
+  DEBUG("== notifying pending wake!\n");
   pthread_mutex_lock(&notify->lock);
   notify->pending = 1;
   pthread_cond_signal(&notify->cond);
   pthread_mutex_unlock(&notify->lock);
+  DEBUG("== notifying pending wake done\n");
 }
 
 // A mostly lock-free multi-producer, single consumer queue, largely
@@ -486,7 +500,9 @@ struct mark_worker {
 
 struct marker {
   struct mark_deque deque;
+  pthread_mutex_t deque_writer_lock;
   struct mark_channel overflow;
+  atomic_size_t active_markers;
   size_t worker_count;
   struct mark_worker workers[MARK_WORKERS_MAX_COUNT];
 };
@@ -586,6 +602,7 @@ marker_init(struct context *cx) {
   struct marker *marker = context_marker(cx);
   if (!mark_deque_init(&marker->deque))
     return 0;
+  pthread_mutex_init(&marker->deque_writer_lock, NULL);
   mark_channel_init(&marker->overflow);
   size_t desired_worker_count = 0;
   if (getenv("GC_MARKERS"))
@@ -615,16 +632,80 @@ static inline void trace_one(struct gcobj *obj, void *mark_data) ALWAYS_INLINE;
 static inline int mark_object(struct context *cx,
                               struct gcobj *obj) ALWAYS_INLINE;
 
+static inline void
+marker_share(struct local_marker *mark) {
+  struct marker *marker = context_marker(mark->cx);
+  DEBUG("marker %p: trying to share\n", mark->worker);
+  if (pthread_mutex_trylock(&marker->deque_writer_lock) != 0) {
+    DEBUG("marker %p: trylock failed\n", mark->worker);
+    if (local_mark_queue_full(&mark->local)) {
+      DEBUG("marker %p: forcing lock acquisition\n", mark->worker);
+      pthread_mutex_lock(&marker->deque_writer_lock);
+    } else
+      return;
+  }
+
+  DEBUG("marker %p: sharing\n", mark->worker);
+  for (size_t i = 0; i < LOCAL_MARK_QUEUE_SHARE_AMOUNT; i++)
+    mark_deque_push(&marker->deque, local_mark_queue_pop(&mark->local));
+
+  pthread_mutex_unlock(&marker->deque_writer_lock);
+}
+
 static inline void
 marker_visit(void **loc, void *mark_data) {
   struct local_marker *mark = mark_data;
   struct gcobj *obj = *loc;
   if (obj && mark_object(mark->cx, obj)) {
-    if (!local_mark_queue_full(&mark->local))
-      local_mark_queue_push(&mark->local, (uintptr_t)obj);
-    else {
-      mark_channel_write(&mark->worker->writer, (uintptr_t)obj);
+    if (local_mark_queue_should_share(&mark->local))
+      marker_share(mark);
+    local_mark_queue_push(&mark->local, (uintptr_t)obj);
+  }
+}
+
+static uintptr_t
+mark_worker_steal(struct local_marker *mark) {
+  DEBUG("marker %p: trying to steal\n", mark->worker);
+  while (1) {
+    uintptr_t addr = mark_deque_steal(mark->deque);
+    if (addr == mark_deque_empty) {
+      struct marker *marker = context_marker(mark->cx);
+      if (atomic_fetch_sub_explicit(&marker->active_markers, 1,
+                                    memory_order_relaxed) == 1) {
+        DEBUG(" ->> marker %p: DONE (no spinning) <<-\n", mark->worker);
+        return 0;
+      }
+      size_t spin_count = 0;
+      while (1) {
+        addr = mark_deque_steal(mark->deque);
+        if (addr != mark_deque_empty) {
+          DEBUG("marker %p: spinning got 0x%zx\n", mark->worker, addr);
+          atomic_fetch_add_explicit(&marker->active_markers, 1,
+                                    memory_order_relaxed);
+          break;
+        }
+        if (atomic_load_explicit(&marker->active_markers,
+                                 memory_order_relaxed) == 0) {
+          DEBUG(" ->> marker %p: DONE <<-\n", mark->worker);
+          return 0;
+        }
+        // spin
+        DEBUG("marker %p: spinning #%zu\n", mark->worker, spin_count);
+        if (spin_count < 10)
+          __builtin_ia32_pause();
+        else if (spin_count < 20)
+          sched_yield();
+        else if (spin_count < 40)
+          usleep(0);
+        else
+          usleep(1);
+        spin_count++;
+      }
     }
+    DEBUG("marker %p: stealing got 0x%zx\n", mark->worker, addr);
+    if (addr == mark_deque_abort)
+      continue;
+    return addr;
   }
 }
 
@@ -643,11 +724,9 @@ mark_worker_mark(struct mark_worker *worker) {
     if (!local_mark_queue_empty(&mark.local)) {
       addr = local_mark_queue_pop(&mark.local);
     } else {
-      addr = mark_deque_steal(mark.deque);
-      if (addr == mark_deque_empty)
+      addr = mark_worker_steal(&mark);
+      if (!addr)
         break;
-      if (addr == mark_deque_abort)
-        continue;
     }
     trace_one((struct gcobj*)addr, &mark);
     n++;
@@ -671,16 +750,21 @@ marker_trace(struct context *cx) {
   DEBUG("starting trace; %zu workers\n", marker->worker_count);
   while (1) {
     DEBUG("waking workers\n");
+    atomic_store_explicit(&marker->active_markers, marker->worker_count,
+                          memory_order_release);
    for (size_t i = 0; i < marker->worker_count; i++)
      mark_worker_request_mark(&marker->workers[i]);
 
    DEBUG("running controller loop\n");
    size_t n = 0;
    while (1) {
+      DEBUG("controller: popping\n");
      uintptr_t addr = mark_channel_pop(&marker->overflow);
+      DEBUG("controller: popped 0x%zx\n", addr);
      if (!addr)
        break;
      mark_deque_push(&marker->deque, addr);
+      DEBUG("controller: pushed to deque\n");
      n++;
    }
    DEBUG("controller loop done, %zu objects sent for rebalancing\n", n);
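
Not part of the patch: the termination protocol in mark_worker_steal can be hard to
follow through the DEBUG noise, so below is a minimal standalone sketch of the same
idea under simplified assumptions. A worker that finds the shared queue empty leaves
the "active" set by decrementing an atomic counter, keeps re-checking the queue,
rejoins the set if work reappears, and terminates only once the counter reaches zero.
All names here (toy_steal, worker_main, NWORKERS, remaining_work) are made up for
illustration; in this toy the work supply never refills, so the rejoin path exists
only to mirror the case in the patch where another marker shares part of its local
queue back onto the deque.

#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

#define NWORKERS 4

static atomic_size_t active_workers = NWORKERS;
static atomic_long remaining_work = 1000;  /* stand-in for the shared deque */

/* Claim one unit of work, or return 0 if the queue looks empty. */
static long toy_steal(void) {
  long n = atomic_fetch_sub(&remaining_work, 1);
  return n > 0 ? n : 0;
}

static void *worker_main(void *arg) {
  int id = (int)(intptr_t)arg;
  long done = 0;
  for (;;) {
    long work = toy_steal();
    if (work) { done++; continue; }
    /* Queue looked empty: leave the active set.  If we were the last
       active worker, everyone else is already idle, so we are done. */
    if (atomic_fetch_sub(&active_workers, 1) == 1)
      break;
    for (;;) {
      work = toy_steal();
      if (work) {
        /* Work reappeared: rejoin the active set and keep going. */
        atomic_fetch_add(&active_workers, 1);
        done++;
        break;
      }
      if (atomic_load(&active_workers) == 0)
        goto out;                /* every worker is idle: terminate */
      sched_yield();             /* stand-in for the pause/yield/sleep ladder */
    }
  }
out:
  printf("worker %d processed %ld items\n", id, done);
  return NULL;
}

int main(void) {
  pthread_t threads[NWORKERS];
  for (intptr_t i = 0; i < NWORKERS; i++)
    pthread_create(&threads[i], NULL, worker_main, (void *)i);
  for (int i = 0; i < NWORKERS; i++)
    pthread_join(threads[i], NULL);
  return 0;
}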