From 7ce07de6701ccbf7cbd4be287326fab4cc728127 Mon Sep 17 00:00:00 2001 From: Andy Wingo Date: Sat, 12 Mar 2022 21:09:17 +0100 Subject: [PATCH] First crack at parallel marking --- Makefile | 6 +- mark-sweep.h | 4 +- parallel-marker.h | 421 +++++++++++++++++++++++++++++++++++++++++++--- serial-marker.h | 7 +- 4 files changed, 406 insertions(+), 32 deletions(-) diff --git a/Makefile b/Makefile index 2846749e7..ecf35b3e3 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ TESTS=gcbench # MT_GCBench MT_GCBench2 COLLECTORS=bdw semi mark-sweep parallel-mark-sweep CC=gcc -CFLAGS=-Wall -O2 -g +CFLAGS=-Wall -O2 -g -fno-strict-aliasing ALL_TESTS=$(foreach COLLECTOR,$(COLLECTORS),$(addprefix $(COLLECTOR)-,$(TESTS))) @@ -15,10 +15,10 @@ semi-%: semi.h precise-roots.h %.c $(CC) $(CFLAGS) -I. -DNDEBUG -DGC_SEMI -o $@ $*.c mark-sweep-%: mark-sweep.h precise-roots.h serial-marker.h assert.h debug.h %.c - $(CC) $(CFLAGS) -I. -DNDEBUG -DGC_MARK_SWEEP -o $@ $*.c + $(CC) $(CFLAGS) -I. -Wno-unused -DNDEBUG -DGC_MARK_SWEEP -o $@ $*.c parallel-mark-sweep-%: mark-sweep.h precise-roots.h parallel-marker.h assert.h debug.h %.c - $(CC) $(CFLAGS) -I. -DNDEBUG -DGC_PARALLEL_MARK_SWEEP -o $@ $*.c + $(CC) $(CFLAGS) -I. -Wno-unused -DNDEBUG -DGC_PARALLEL_MARK_SWEEP -lpthread -o $@ $*.c check: $(addprefix test-$(TARGET),$(TARGETS)) diff --git a/mark-sweep.h b/mark-sweep.h index 6241d8ab6..2c90f48eb 100644 --- a/mark-sweep.h +++ b/mark-sweep.h @@ -144,7 +144,7 @@ static inline int mark_object(struct context *cx, struct gcobj *obj) { return 1; } -static void trace_one(struct gcobj *obj, void *mark_data) { +static inline void trace_one(struct gcobj *obj, void *mark_data) { switch (tag_live_alloc_kind(obj->tag)) { #define SCAN_OBJECT(name, Name, NAME) \ case ALLOC_KIND_##NAME: \ @@ -168,7 +168,7 @@ static void collect(struct context *cx) { marker_prepare(cx); for (struct handle *h = cx->roots; h; h = h->next) marker_visit_root(&h->v, cx); - marker_trace(cx, trace_one); + marker_trace(cx); marker_release(cx); DEBUG("done marking\n"); cx->sweep = cx->heap_base; diff --git a/parallel-marker.h b/parallel-marker.h index 805522ae7..f935bc155 100644 --- a/parallel-marker.h +++ b/parallel-marker.h @@ -1,6 +1,7 @@ #ifndef SERIAL_TRACE_H #define SERIAL_TRACE_H +#include #include #include #include @@ -253,23 +254,356 @@ local_mark_queue_pop(struct local_mark_queue *q) { return q->data[q->read++ & LOCAL_MARK_QUEUE_MASK]; } +struct mark_notify { + size_t notifiers; + int pending; + pthread_mutex_t lock; + pthread_cond_t cond; +}; + +static void +mark_notify_init(struct mark_notify *notify) { + notify->notifiers = 0; + notify->pending = 0; + pthread_mutex_init(¬ify->lock, NULL); + pthread_cond_init(¬ify->cond, NULL); +} + +static void +mark_notify_destroy(struct mark_notify *notify) { + pthread_mutex_destroy(¬ify->lock); + pthread_cond_destroy(¬ify->cond); +} + +static void +mark_notify_add_notifier(struct mark_notify *notify) { + pthread_mutex_lock(¬ify->lock); + notify->notifiers++; + pthread_mutex_unlock(¬ify->lock); +} + +static void +mark_notify_remove_notifier(struct mark_notify *notify) { + pthread_mutex_lock(¬ify->lock); + notify->notifiers--; + if (notify->notifiers == 0) + pthread_cond_signal(¬ify->cond); + pthread_mutex_unlock(¬ify->lock); +} + +enum mark_notify_status { + MARK_NOTIFY_DONE, + MARK_NOTIFY_WOKE +}; +static enum mark_notify_status +mark_notify_wait(struct mark_notify *notify) { + enum mark_notify_status res; + + pthread_mutex_lock(¬ify->lock); + + if (notify->pending) { + res = MARK_NOTIFY_WOKE; + notify->pending = 
0; + goto done; + } + + if (notify->notifiers == 0) { + res = MARK_NOTIFY_DONE; + goto done; + } + + // Spurious wakeup is OK. + pthread_cond_wait(¬ify->cond, ¬ify->lock); + res = MARK_NOTIFY_WOKE; + notify->pending = 0; + +done: + pthread_mutex_unlock(¬ify->lock); + return res; +} + +static void +mark_notify_wake(struct mark_notify *notify) { + pthread_mutex_lock(¬ify->lock); + notify->pending = 1; + pthread_cond_signal(¬ify->cond); + pthread_mutex_unlock(¬ify->lock); +} + +// A mostly lock-free multi-producer, single consumer queue, largely +// inspired by Rust's std::sync::channel. +// +// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue + +struct mark_channel_message { + struct mark_channel_message * _Atomic next; + // Payload will be zero only for free messages, and for the sentinel + // message. + atomic_uintptr_t payload; +}; + +#define MARK_CHANNEL_WRITER_MESSAGE_COUNT ((size_t)1024) + +struct mark_channel { + union { + struct mark_channel_message* _Atomic head; + char head_padding[64]; + }; + union { + atomic_size_t length; + char length_padding[64]; + }; + struct mark_channel_message* tail; + struct mark_channel_message sentinel; + + struct mark_notify notify; +}; + +struct mark_channel_writer { + struct mark_channel_message messages[MARK_CHANNEL_WRITER_MESSAGE_COUNT]; + size_t next_message; + + struct mark_channel *channel; +}; + +static void +mark_channel_init(struct mark_channel *ch) { + memset(ch, 0, sizeof(*ch)); + atomic_init(&ch->head, &ch->sentinel); + atomic_init(&ch->length, 0); + mark_notify_init(&ch->notify); + ch->tail = &ch->sentinel; +} + +static void +mark_channel_destroy(struct mark_channel *ch) { + mark_notify_destroy(&ch->notify); +} + +static void +mark_channel_push(struct mark_channel *ch, struct mark_channel_message *msg) { + ASSERT(msg->payload); + atomic_store_explicit(&msg->next, NULL, memory_order_relaxed); + + struct mark_channel_message *prev = + atomic_exchange_explicit(&ch->head, msg, memory_order_acq_rel); + + atomic_store_explicit(&prev->next, msg, memory_order_release); + + size_t old_length = + atomic_fetch_add_explicit(&ch->length, 1, memory_order_relaxed); + if (old_length == 0) + mark_notify_wake(&ch->notify); +} + +static uintptr_t +mark_channel_try_pop(struct mark_channel *ch) { + struct mark_channel_message *tail = ch->tail; + struct mark_channel_message *next = + atomic_load_explicit(&tail->next, memory_order_acquire); + + if (next) { + ch->tail = next; + uintptr_t payload = + atomic_load_explicit(&next->payload, memory_order_acquire); + ASSERT(payload != 0); + // Indicate to the writer that the old tail node can now be re-used. + // Note though that the new tail node is floating garbage; its + // payload has been popped but the node itself is still part of the + // queue. Care has to be taken to ensure that any remaining queue + // entries are popped before the associated channel writer's + // messages are deallocated. 
+ atomic_store_explicit(&tail->payload, 0, memory_order_release); + atomic_fetch_sub_explicit(&ch->length, 1, memory_order_relaxed); + return payload; + } + + // if (atomic_load_explicit(&ch->head) == tail) return EMPTY else INCONSISTENT + return 0; +} + +static uintptr_t +mark_channel_pop(struct mark_channel *ch) { + while (1) { + uintptr_t ret = mark_channel_try_pop(ch); + if (ret) + return ret; + + if (atomic_load_explicit(&ch->length, memory_order_relaxed) == 0) { + if (mark_notify_wait(&ch->notify) == MARK_NOTIFY_DONE) + return 0; + } + } +} + +static void +mark_channel_writer_init(struct mark_channel *ch, + struct mark_channel_writer *writer) { + memset(writer, 0, sizeof(*writer)); + writer->channel = ch; +} + +static void +mark_channel_write(struct mark_channel_writer *writer, uintptr_t payload) { + ASSERT(payload); + struct mark_channel_message *msg = &writer->messages[writer->next_message]; + while (atomic_load_explicit(&msg->payload, memory_order_acquire) != 0) + sched_yield(); + writer->next_message++; + if (writer->next_message == MARK_CHANNEL_WRITER_MESSAGE_COUNT) + writer->next_message = 0; + atomic_store_explicit(&msg->payload, payload, memory_order_release); + mark_channel_push(writer->channel, msg); +} + +static void +mark_channel_writer_activate(struct mark_channel_writer *writer) { + mark_notify_add_notifier(&writer->channel->notify); +} +static void +mark_channel_writer_deactivate(struct mark_channel_writer *writer) { + mark_notify_remove_notifier(&writer->channel->notify); +} + +enum mark_worker_state { + MARK_WORKER_STOPPED, + MARK_WORKER_IDLE, + MARK_WORKER_MARKING, + MARK_WORKER_STOPPING, + MARK_WORKER_DEAD +}; + +struct mark_worker { + struct context *cx; + pthread_t thread; + enum mark_worker_state state; + pthread_mutex_t lock; + pthread_cond_t cond; + struct mark_channel_writer writer; +}; + +#define MARK_WORKERS_MAX_COUNT 8 + struct marker { struct mark_deque deque; + struct mark_channel overflow; + size_t worker_count; + struct mark_worker workers[MARK_WORKERS_MAX_COUNT]; }; struct local_marker { - struct local_mark_queue local; + struct mark_worker *worker; struct mark_deque *deque; struct context *cx; + struct local_mark_queue local; }; struct context; static inline struct marker* context_marker(struct context *cx); +static size_t number_of_current_processors(void) { return 1; } + +static void +mark_worker_init(struct mark_worker *worker, struct context *cx, + struct marker *marker) { + worker->cx = cx; + worker->thread = 0; + worker->state = MARK_WORKER_STOPPED; + pthread_mutex_init(&worker->lock, NULL); + pthread_cond_init(&worker->cond, NULL); + mark_channel_writer_init(&marker->overflow, &worker->writer); +} + +static void mark_worker_mark(struct mark_worker *worker); + +static void* +mark_worker_thread(void *data) { + struct mark_worker *worker = data; + + pthread_mutex_lock(&worker->lock); + while (1) { + switch (worker->state) { + case MARK_WORKER_IDLE: + pthread_cond_wait(&worker->cond, &worker->lock); + break; + case MARK_WORKER_MARKING: + mark_worker_mark(worker); + worker->state = MARK_WORKER_IDLE; + break; + case MARK_WORKER_STOPPING: + worker->state = MARK_WORKER_DEAD; + pthread_mutex_unlock(&worker->lock); + return NULL; + default: + abort(); + } + } +} + +static int +mark_worker_spawn(struct mark_worker *worker) { + pthread_mutex_lock(&worker->lock); + ASSERT(worker->state == MARK_WORKER_STOPPED); + worker->state = MARK_WORKER_IDLE; + pthread_mutex_unlock(&worker->lock); + + if (pthread_create(&worker->thread, NULL, mark_worker_thread, 
worker)) { + perror("spawning marker thread failed"); + worker->state = MARK_WORKER_STOPPED; + return 0; + } + + return 1; +} + +static void +mark_worker_request_mark(struct mark_worker *worker) { + pthread_mutex_lock(&worker->lock); + ASSERT(worker->state == MARK_WORKER_IDLE); + mark_channel_writer_activate(&worker->writer); + worker->state = MARK_WORKER_MARKING; + pthread_cond_signal(&worker->cond); + pthread_mutex_unlock(&worker->lock); +} + +static void +mark_worker_finished_marking(struct mark_worker *worker) { + // Signal controller that we are done with marking. + mark_channel_writer_deactivate(&worker->writer); +} + +static void +mark_worker_request_stop(struct mark_worker *worker) { + pthread_mutex_lock(&worker->lock); + ASSERT(worker->state == MARK_WORKER_IDLE); + worker->state = MARK_WORKER_STOPPING; + pthread_cond_signal(&worker->cond); + pthread_mutex_unlock(&worker->lock); +} + static int marker_init(struct context *cx) { - return mark_deque_init(&context_marker(cx)->deque); + struct marker *marker = context_marker(cx); + if (!mark_deque_init(&marker->deque)) + return 0; + mark_channel_init(&marker->overflow); + size_t desired_worker_count = 0; + if (getenv("GC_MARKERS")) + desired_worker_count = atoi(getenv("GC_MARKERS")); + if (desired_worker_count == 0) + desired_worker_count = number_of_current_processors(); + if (desired_worker_count > MARK_WORKERS_MAX_COUNT) + desired_worker_count = MARK_WORKERS_MAX_COUNT; + for (size_t i = 0; i < desired_worker_count; i++) { + mark_worker_init(&marker->workers[i], cx, marker); + if (mark_worker_spawn(&marker->workers[i])) + marker->worker_count++; + else + break; + } + return marker->worker_count > 0; } + static void marker_prepare(struct context *cx) {} static void marker_release(struct context *cx) { mark_deque_release(&context_marker(cx)->deque); @@ -277,9 +611,7 @@ static void marker_release(struct context *cx) { struct gcobj; static inline void marker_visit(void **loc, void *mark_data) ALWAYS_INLINE; -static inline void marker_trace(struct context *cx, - void (*)(struct gcobj *, void *)) - ALWAYS_INLINE; +static inline void trace_one(struct gcobj *obj, void *mark_data) ALWAYS_INLINE; static inline int mark_object(struct context *cx, struct gcobj *obj) ALWAYS_INLINE; @@ -290,26 +622,22 @@ marker_visit(void **loc, void *mark_data) { if (obj && mark_object(mark->cx, obj)) { if (!local_mark_queue_full(&mark->local)) local_mark_queue_push(&mark->local, (uintptr_t)obj); - else - mark_deque_push(mark->deque, (uintptr_t)obj); + else { + mark_channel_write(&mark->worker->writer, (uintptr_t)obj); + } } } -static inline void -marker_visit_root(void **loc, struct context *cx) { - struct local_marker mark; - local_mark_queue_poison(&mark.local); - mark.deque = &context_marker(cx)->deque; - mark.cx = cx; - marker_visit(loc, &mark); -} -static inline void -marker_trace(struct context *cx, - void (*trace_one)(struct gcobj *, void *)) { - struct local_marker mark; - local_mark_queue_init(&mark.local); - mark.deque = &context_marker(cx)->deque; - mark.cx = cx; +static void +mark_worker_mark(struct mark_worker *worker) { + struct local_marker mark; + mark.worker = worker; + mark.deque = &context_marker(worker->cx)->deque; + mark.cx = worker->cx; + local_mark_queue_init(&mark.local); + + size_t n = 0; + DEBUG("marker %p: running mark loop\n", worker); while (1) { uintptr_t addr; if (!local_mark_queue_empty(&mark.local)) { @@ -322,7 +650,56 @@ marker_trace(struct context *cx, continue; } trace_one((struct gcobj*)addr, &mark); + n++; } + DEBUG("marker 
%p: done marking, %zu objects traced\n", worker, n); + + mark_worker_finished_marking(worker); +} + +static inline void +marker_visit_root(void **loc, struct context *cx) { + struct gcobj *obj = *loc; + if (obj && mark_object(cx, obj)) + mark_deque_push(&context_marker(cx)->deque, (uintptr_t)obj); +} + +static inline void +marker_trace(struct context *cx) { + struct marker *marker = context_marker(cx); + + DEBUG("starting trace; %zu workers\n", marker->worker_count); + while (1) { + DEBUG("waking workers\n"); + for (size_t i = 0; i < marker->worker_count; i++) + mark_worker_request_mark(&marker->workers[i]); + + DEBUG("running controller loop\n"); + size_t n = 0; + while (1) { + uintptr_t addr = mark_channel_pop(&marker->overflow); + if (!addr) + break; + mark_deque_push(&marker->deque, addr); + n++; + } + DEBUG("controller loop done, %zu objects sent for rebalancing\n", n); + + // As in the ISMM'16 paper, it's possible that a worker decides to + // stop because the deque is empty, but actually there was an + // in-flight object in the mark channel that we hadn't been able to + // push yet. Loop in that case. + { + uintptr_t addr = mark_deque_try_pop(&marker->deque); + if (addr == mark_deque_empty) + break; + DEBUG("--> controller looping again due to slop\n"); + mark_deque_push(&marker->deque, addr); + } + } + ASSERT(atomic_load(&marker->overflow.length) == 0); + ASSERT(atomic_load(&marker->overflow.head) == marker->overflow.tail); + DEBUG("trace finished\n"); } #endif // SERIAL_MARK_H diff --git a/serial-marker.h b/serial-marker.h index 1c2e305a7..719ba1c51 100644 --- a/serial-marker.h +++ b/serial-marker.h @@ -126,9 +126,7 @@ static void marker_release(struct context *cx) { struct gcobj; static inline void marker_visit(void **loc, void *mark_data) ALWAYS_INLINE; -static inline void marker_trace(struct context *cx, - void (*)(struct gcobj *, void *)) - ALWAYS_INLINE; +static inline void trace_one(struct gcobj *obj, void *mark_data) ALWAYS_INLINE; static inline int mark_object(struct context *cx, struct gcobj *obj) ALWAYS_INLINE; @@ -144,8 +142,7 @@ marker_visit_root(void **loc, struct context *cx) { marker_visit(loc, cx); } static inline void -marker_trace(struct context *cx, - void (*trace_one)(struct gcobj *, void *)) { +marker_trace(struct context *cx) { struct gcobj *obj; while ((obj = mark_queue_pop(&context_marker(cx)->queue))) trace_one(obj, cx);
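
The overflow channel above is the non-intrusive MPSC queue from the 1024cores page cited in the comment. For readers who have not seen that design, here is a minimal standalone sketch of the same push/pop protocol; the names (node, queue, queue_push, queue_pop) are mine for illustration, and the length counter, payload recycling, and mark_notify rendezvous of the real channel are left out. A producer publishes a node by exchanging it into head and then linking it behind the previous head; the single consumer chases tail->next, so a pop can transiently see a NULL next while some producer sits between its exchange and its link store, which is why mark_channel_pop re-checks the length and blocks in mark_notify_wait rather than treating an empty pop as termination.

// mpsc-sketch.c: illustrative only, not part of the patch.
// Build with something like: gcc -O2 -pthread mpsc-sketch.c
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

struct node {
  _Atomic(struct node *) next;
  int value;
};

struct queue {
  _Atomic(struct node *) head;   // producers exchange new nodes in here
  struct node *tail;             // consumer-private cursor
  struct node stub;              // sentinel, like ch->sentinel
};

static void
queue_init(struct queue *q) {
  atomic_init(&q->stub.next, NULL);
  atomic_init(&q->head, &q->stub);
  q->tail = &q->stub;
}

// Any number of threads may push concurrently.
static void
queue_push(struct queue *q, struct node *n) {
  atomic_store_explicit(&n->next, NULL, memory_order_relaxed);
  struct node *prev =
    atomic_exchange_explicit(&q->head, n, memory_order_acq_rel);
  atomic_store_explicit(&prev->next, n, memory_order_release);
}

// Only one thread may pop.  NULL means "nothing linked in yet", which
// covers both the empty queue and an in-flight push.
static struct node *
queue_pop(struct queue *q) {
  struct node *tail = q->tail;
  struct node *next = atomic_load_explicit(&tail->next, memory_order_acquire);
  if (!next)
    return NULL;
  q->tail = next;   // the old tail node is now free for reuse
  return next;
}

#define PRODUCERS 4
#define PER_PRODUCER 1000
static struct node nodes[PRODUCERS][PER_PRODUCER];

struct producer_args { struct queue *q; struct node *nodes; };

static void *
producer(void *data) {
  struct producer_args *args = data;
  for (int i = 0; i < PER_PRODUCER; i++) {
    args->nodes[i].value = 1;
    queue_push(args->q, &args->nodes[i]);
  }
  return NULL;
}

int
main(void) {
  struct queue q;
  queue_init(&q);
  pthread_t threads[PRODUCERS];
  struct producer_args args[PRODUCERS];
  for (int i = 0; i < PRODUCERS; i++) {
    args[i].q = &q;
    args[i].nodes = nodes[i];
    pthread_create(&threads[i], NULL, producer, &args[i]);
  }
  long popped = 0;
  while (popped < (long)PRODUCERS * PER_PRODUCER) {
    struct node *n = queue_pop(&q);
    if (n)
      popped += n->value;
    // else: spin; the real channel blocks in mark_notify_wait instead.
  }
  for (int i = 0; i < PRODUCERS; i++)
    pthread_join(threads[i], NULL);
  printf("popped %ld messages\n", popped);
  return 0;
}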
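
To try it out: "make parallel-mark-sweep-gcbench" builds the gcbench test against parallel-marker.h, and the GC_MARKERS environment variable selects the number of marking workers (capped at MARK_WORKERS_MAX_COUNT, currently 8). Since number_of_current_processors() is still a stub that returns 1, only one worker is spawned unless GC_MARKERS is set, so a run that actually exercises the parallel path looks something like:

  make parallel-mark-sweep-gcbench
  GC_MARKERS=4 ./parallel-mark-sweep-gcbench

Setting GC_MARKERS=1 gives a single-worker run that is handy for comparing against the serial marker (mark-sweep-gcbench).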