diff --git a/parallel-marker.h b/parallel-marker.h
index 808a588ba..1bfbbbd93 100644
--- a/parallel-marker.h
+++ b/parallel-marker.h
@@ -1,5 +1,5 @@
-#ifndef SERIAL_TRACE_H
-#define SERIAL_TRACE_H
+#ifndef PARALLEL_MARKER_H
+#define PARALLEL_MARKER_H
 
 #include <pthread.h>
 #include <stdatomic.h>
@@ -215,16 +215,23 @@ mark_deque_steal(struct mark_deque *q) {
   return x;
 }
 
+static int
+mark_deque_can_steal(struct mark_deque *q) {
+  size_t t = LOAD_ACQUIRE(&q->top);
+  atomic_thread_fence(memory_order_seq_cst);
+  size_t b = LOAD_ACQUIRE(&q->bottom);
+  return t < b;
+}
+
 #undef LOAD_RELAXED
 #undef STORE_RELAXED
 #undef LOAD_ACQUIRE
 #undef STORE_RELEASE
 #undef LOAD_CONSUME
 
-#define LOCAL_MARK_QUEUE_SIZE 64
-#define LOCAL_MARK_QUEUE_MASK 63
-#define LOCAL_MARK_QUEUE_SHARE_THRESHOLD 48
-#define LOCAL_MARK_QUEUE_SHARE_AMOUNT 32
+#define LOCAL_MARK_QUEUE_SIZE 1024
+#define LOCAL_MARK_QUEUE_MASK (LOCAL_MARK_QUEUE_SIZE - 1)
+#define LOCAL_MARK_QUEUE_SHARE_AMOUNT (LOCAL_MARK_QUEUE_SIZE * 3 / 4)
 struct local_mark_queue {
   size_t read;
   size_t write;
@@ -248,10 +255,6 @@ local_mark_queue_empty(struct local_mark_queue *q) {
   return local_mark_queue_size(q) == 0;
 }
 static inline int
-local_mark_queue_should_share(struct local_mark_queue *q) {
-  return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SHARE_THRESHOLD;
-}
-static inline int
 local_mark_queue_full(struct local_mark_queue *q) {
   return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SIZE;
 }
@@ -264,221 +267,6 @@ local_mark_queue_pop(struct local_mark_queue *q) {
   return q->data[q->read++ & LOCAL_MARK_QUEUE_MASK];
 }
 
-struct mark_notify {
-  size_t notifiers;
-  int pending;
-  pthread_mutex_t lock;
-  pthread_cond_t cond;
-};
-
-static void
-mark_notify_init(struct mark_notify *notify) {
-  notify->notifiers = 0;
-  notify->pending = 0;
-  pthread_mutex_init(&notify->lock, NULL);
-  pthread_cond_init(&notify->cond, NULL);
-}
-
-static void
-mark_notify_destroy(struct mark_notify *notify) {
-  pthread_mutex_destroy(&notify->lock);
-  pthread_cond_destroy(&notify->cond);
-}
-
-static void
-mark_notify_add_notifier(struct mark_notify *notify) {
-  pthread_mutex_lock(&notify->lock);
-  notify->notifiers++;
-  pthread_mutex_unlock(&notify->lock);
-}
-
-static void
-mark_notify_remove_notifier(struct mark_notify *notify) {
-  pthread_mutex_lock(&notify->lock);
-  notify->notifiers--;
-  if (notify->notifiers == 0)
-    pthread_cond_signal(&notify->cond);
-  pthread_mutex_unlock(&notify->lock);
-}
-
-enum mark_notify_status {
-  MARK_NOTIFY_DONE,
-  MARK_NOTIFY_WOKE
-};
-static enum mark_notify_status
-mark_notify_wait(struct mark_notify *notify) {
-  enum mark_notify_status res;
-
-  pthread_mutex_lock(&notify->lock);
-
-  if (notify->pending) {
-    res = MARK_NOTIFY_WOKE;
-    notify->pending = 0;
-    goto done;
-  }
-
-  if (notify->notifiers == 0) {
-    res = MARK_NOTIFY_DONE;
-    goto done;
-  }
-
-  // Spurious wakeup is OK.
-  DEBUG("-- marker waiting\n");
-  pthread_cond_wait(&notify->cond, &notify->lock);
-  DEBUG("-- marker woke\n");
-  res = MARK_NOTIFY_WOKE;
-  notify->pending = 0;
-
-done:
-  pthread_mutex_unlock(&notify->lock);
-  return res;
-}
-
-static void
-mark_notify_wake(struct mark_notify *notify) {
-  DEBUG("== notifying pending wake!\n");
-  pthread_mutex_lock(&notify->lock);
-  notify->pending = 1;
-  pthread_cond_signal(&notify->cond);
-  pthread_mutex_unlock(&notify->lock);
-  DEBUG("== notifying pending wake done\n");
-}
-
-// A mostly lock-free multi-producer, single consumer queue, largely
-// inspired by Rust's std::sync::channel.
-//
-// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
-
-struct mark_channel_message {
-  struct mark_channel_message * _Atomic next;
-  // Payload will be zero only for free messages, and for the sentinel
-  // message.
-  atomic_uintptr_t payload;
-};
-
-#define MARK_CHANNEL_WRITER_MESSAGE_COUNT ((size_t)1024)
-
-struct mark_channel {
-  union {
-    struct mark_channel_message* _Atomic head;
-    char head_padding[64];
-  };
-  union {
-    atomic_size_t length;
-    char length_padding[64];
-  };
-  struct mark_channel_message* tail;
-  struct mark_channel_message sentinel;
-
-  struct mark_notify notify;
-};
-
-struct mark_channel_writer {
-  struct mark_channel_message messages[MARK_CHANNEL_WRITER_MESSAGE_COUNT];
-  size_t next_message;
-
-  struct mark_channel *channel;
-};
-
-static void
-mark_channel_init(struct mark_channel *ch) {
-  memset(ch, 0, sizeof(*ch));
-  atomic_init(&ch->head, &ch->sentinel);
-  atomic_init(&ch->length, 0);
-  mark_notify_init(&ch->notify);
-  ch->tail = &ch->sentinel;
-}
-
-static void
-mark_channel_destroy(struct mark_channel *ch) {
-  mark_notify_destroy(&ch->notify);
-}
-
-static void
-mark_channel_push(struct mark_channel *ch, struct mark_channel_message *msg) {
-  ASSERT(msg->payload);
-  atomic_store_explicit(&msg->next, NULL, memory_order_relaxed);
-
-  struct mark_channel_message *prev =
-    atomic_exchange_explicit(&ch->head, msg, memory_order_acq_rel);
-
-  atomic_store_explicit(&prev->next, msg, memory_order_release);
-
-  size_t old_length =
-    atomic_fetch_add_explicit(&ch->length, 1, memory_order_relaxed);
-  if (old_length == 0)
-    mark_notify_wake(&ch->notify);
-}
-
-static uintptr_t
-mark_channel_try_pop(struct mark_channel *ch) {
-  struct mark_channel_message *tail = ch->tail;
-  struct mark_channel_message *next =
-    atomic_load_explicit(&tail->next, memory_order_acquire);
-
-  if (next) {
-    ch->tail = next;
-    uintptr_t payload =
-      atomic_load_explicit(&next->payload, memory_order_acquire);
-    ASSERT(payload != 0);
-    // Indicate to the writer that the old tail node can now be re-used.
-    // Note though that the new tail node is floating garbage; its
-    // payload has been popped but the node itself is still part of the
-    // queue.  Care has to be taken to ensure that any remaining queue
-    // entries are popped before the associated channel writer's
-    // messages are deallocated.
-    atomic_store_explicit(&tail->payload, 0, memory_order_release);
-    atomic_fetch_sub_explicit(&ch->length, 1, memory_order_relaxed);
-    return payload;
-  }
-
-  // if (atomic_load_explicit(&ch->head) == tail) return EMPTY else INCONSISTENT
-  return 0;
-}
-
-static uintptr_t
-mark_channel_pop(struct mark_channel *ch) {
-  while (1) {
-    uintptr_t ret = mark_channel_try_pop(ch);
-    if (ret)
-      return ret;
-
-    if (atomic_load_explicit(&ch->length, memory_order_relaxed) == 0) {
-      if (mark_notify_wait(&ch->notify) == MARK_NOTIFY_DONE)
-        return 0;
-    }
-  }
-}
-
-static void
-mark_channel_writer_init(struct mark_channel *ch,
-                         struct mark_channel_writer *writer) {
-  memset(writer, 0, sizeof(*writer));
-  writer->channel = ch;
-}
-
-static void
-mark_channel_write(struct mark_channel_writer *writer, uintptr_t payload) {
-  ASSERT(payload);
-  struct mark_channel_message *msg = &writer->messages[writer->next_message];
-  while (atomic_load_explicit(&msg->payload, memory_order_acquire) != 0)
-    sched_yield();
-  writer->next_message++;
-  if (writer->next_message == MARK_CHANNEL_WRITER_MESSAGE_COUNT)
-    writer->next_message = 0;
-  atomic_store_explicit(&msg->payload, payload, memory_order_release);
-  mark_channel_push(writer->channel, msg);
-}
-
-static void
-mark_channel_writer_activate(struct mark_channel_writer *writer) {
-  mark_notify_add_notifier(&writer->channel->notify);
-}
-static void
-mark_channel_writer_deactivate(struct mark_channel_writer *writer) {
-  mark_notify_remove_notifier(&writer->channel->notify);
-}
-
 enum mark_worker_state {
   MARK_WORKER_STOPPED,
   MARK_WORKER_IDLE,
@@ -489,27 +277,30 @@ enum mark_worker_state {
 
 struct mark_worker {
   struct context *cx;
+  size_t id;
+  size_t steal_id;
   pthread_t thread;
   enum mark_worker_state state;
   pthread_mutex_t lock;
   pthread_cond_t cond;
-  struct mark_channel_writer writer;
+  struct mark_deque deque;
 };
 
 #define MARK_WORKERS_MAX_COUNT 8
 
 struct marker {
-  struct mark_deque deque;
-  pthread_mutex_t deque_writer_lock;
-  struct mark_channel overflow;
   atomic_size_t active_markers;
   size_t worker_count;
+  atomic_size_t running_markers;
+  long count;
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
  struct mark_worker workers[MARK_WORKERS_MAX_COUNT];
 };
 
 struct local_marker {
   struct mark_worker *worker;
-  struct mark_deque *deque;
+  struct mark_deque *share_deque;
   struct context *cx;
   struct local_mark_queue local;
 };
@@ -519,15 +310,17 @@ static inline struct marker* context_marker(struct context *cx);
 
 static size_t number_of_current_processors(void) { return 1; }
 
-static void
+static int
 mark_worker_init(struct mark_worker *worker, struct context *cx,
-                 struct marker *marker) {
+                 struct marker *marker, size_t id) {
   worker->cx = cx;
+  worker->id = id;
+  worker->steal_id = 0;
   worker->thread = 0;
   worker->state = MARK_WORKER_STOPPED;
   pthread_mutex_init(&worker->lock, NULL);
   pthread_cond_init(&worker->cond, NULL);
-  mark_channel_writer_init(&marker->overflow, &worker->writer);
+  return mark_deque_init(&worker->deque);
 }
 
 static void mark_worker_mark(struct mark_worker *worker);
@@ -574,9 +367,10 @@ mark_worker_spawn(struct mark_worker *worker) {
 
 static void
 mark_worker_request_mark(struct mark_worker *worker) {
+  struct marker *marker = context_marker(worker->cx);
+
   pthread_mutex_lock(&worker->lock);
   ASSERT(worker->state == MARK_WORKER_IDLE);
-  mark_channel_writer_activate(&worker->writer);
   worker->state = MARK_WORKER_MARKING;
   pthread_cond_signal(&worker->cond);
   pthread_mutex_unlock(&worker->lock);
@@ -585,7 +379,14 @@
 static void
 mark_worker_finished_marking(struct mark_worker *worker) {
   // Signal controller that we are done with marking.
-  mark_channel_writer_deactivate(&worker->writer);
+  struct marker *marker = context_marker(worker->cx);
+
+  if (atomic_fetch_sub(&marker->running_markers, 1) == 1) {
+    pthread_mutex_lock(&marker->lock);
+    marker->count++;
+    pthread_cond_signal(&marker->cond);
+    pthread_mutex_unlock(&marker->lock);
+  }
 }
 
 static void
@@ -600,10 +401,11 @@ mark_worker_request_stop(struct mark_worker *worker) {
 static int
 marker_init(struct context *cx) {
   struct marker *marker = context_marker(cx);
-  if (!mark_deque_init(&marker->deque))
-    return 0;
-  pthread_mutex_init(&marker->deque_writer_lock, NULL);
-  mark_channel_init(&marker->overflow);
+  atomic_init(&marker->active_markers, 0);
+  atomic_init(&marker->running_markers, 0);
+  marker->count = 0;
+  pthread_mutex_init(&marker->lock, NULL);
+  pthread_cond_init(&marker->cond, NULL);
   size_t desired_worker_count = 0;
   if (getenv("GC_MARKERS"))
     desired_worker_count = atoi(getenv("GC_MARKERS"));
@@ -612,7 +414,8 @@ marker_init(struct context *cx) {
   if (desired_worker_count > MARK_WORKERS_MAX_COUNT)
     desired_worker_count = MARK_WORKERS_MAX_COUNT;
   for (size_t i = 0; i < desired_worker_count; i++) {
-    mark_worker_init(&marker->workers[i], cx, marker);
+    if (!mark_worker_init(&marker->workers[i], cx, marker, i))
+      break;
     if (mark_worker_spawn(&marker->workers[i]))
       marker->worker_count++;
     else
@@ -621,9 +424,15 @@
   return marker->worker_count > 0;
 }
 
-static void marker_prepare(struct context *cx) {}
+static void marker_prepare(struct context *cx) {
+  struct marker *marker = context_marker(cx);
+  for (size_t i = 0; i < marker->worker_count; i++)
+    marker->workers[i].steal_id = 0;
+}
 static void marker_release(struct context *cx) {
-  mark_deque_release(&context_marker(cx)->deque);
+  struct marker *marker = context_marker(cx);
+  for (size_t i = 0; i < marker->worker_count; i++)
+    mark_deque_release(&marker->workers[i].deque);
 }
 
 struct gcobj;
@@ -634,22 +443,9 @@ static inline int mark_object(struct context *cx,
 
 static inline void
 marker_share(struct local_marker *mark) {
-  struct marker *marker = context_marker(mark->cx);
-  DEBUG("marker %p: trying to share\n", mark->worker);
-  if (pthread_mutex_trylock(&marker->deque_writer_lock) != 0) {
-    DEBUG("marker %p: trylock failed\n", mark->worker);
-    if (local_mark_queue_full(&mark->local)) {
-      DEBUG("marker %p: forcing lock acquisition\n", mark->worker);
-      pthread_mutex_lock(&marker->deque_writer_lock);
-    } else
-      return;
-  }
-
-  DEBUG("marker %p: sharing\n", mark->worker);
+  DEBUG("marker #%zu: sharing\n", mark->worker->id);
   for (size_t i = 0; i < LOCAL_MARK_QUEUE_SHARE_AMOUNT; i++)
-    mark_deque_push(&marker->deque, local_mark_queue_pop(&mark->local));
-
-  pthread_mutex_unlock(&marker->deque_writer_lock);
+    mark_deque_push(mark->share_deque, local_mark_queue_pop(&mark->local));
 }
 
 static inline void
@@ -657,55 +453,114 @@ marker_visit(void **loc, void *mark_data) {
   struct local_marker *mark = mark_data;
   struct gcobj *obj = *loc;
   if (obj && mark_object(mark->cx, obj)) {
-    if (local_mark_queue_should_share(&mark->local))
+    if (local_mark_queue_full(&mark->local))
       marker_share(mark);
     local_mark_queue_push(&mark->local, (uintptr_t)obj);
   }
 }
 
 static uintptr_t
-mark_worker_steal(struct local_marker *mark) {
-  DEBUG("marker %p: trying to steal\n", mark->worker);
+marker_steal_from_worker(struct marker *marker, size_t id) {
+  ASSERT(id < marker->worker_count);
   while (1) {
-    uintptr_t addr = mark_deque_steal(mark->deque);
-    if (addr == mark_deque_empty) {
-      struct marker *marker = context_marker(mark->cx);
-      if (atomic_fetch_sub_explicit(&marker->active_markers, 1,
-                                    memory_order_relaxed) == 1) {
-        DEBUG("  ->> marker %p: DONE (no spinning) <<-\n", mark->worker);
-        return 0;
-      }
-      size_t spin_count = 0;
-      while (1) {
-        addr = mark_deque_steal(mark->deque);
-        if (addr != mark_deque_empty) {
-          DEBUG("marker %p: spinning got 0x%zx\n", mark->worker, addr);
-          atomic_fetch_add_explicit(&marker->active_markers, 1,
-                                    memory_order_relaxed);
-          break;
-        }
-        if (atomic_load_explicit(&marker->active_markers,
-                                 memory_order_relaxed) == 0) {
-          DEBUG("  ->> marker %p: DONE <<-\n", mark->worker);
-          return 0;
-        }
-        // spin
-        DEBUG("marker %p: spinning #%zu\n", mark->worker, spin_count);
-        if (spin_count < 10)
-          __builtin_ia32_pause();
-        else if (spin_count < 20)
-          sched_yield();
-        else if (spin_count < 40)
-          usleep(0);
-        else
-          usleep(1);
-        spin_count++;
-      }
-    }
-    DEBUG("marker %p: stealing got 0x%zx\n", mark->worker, addr);
-    if (addr == mark_deque_abort)
+    uintptr_t res = mark_deque_steal(&marker->workers[id].deque);
+    if (res == mark_deque_empty)
+      return 0;
+    if (res == mark_deque_abort)
       continue;
-    return addr;
+    return res;
+  }
+}
+
+static uintptr_t
+marker_can_steal_from_worker(struct marker *marker, size_t id) {
+  ASSERT(id < marker->worker_count);
+  return mark_deque_can_steal(&marker->workers[id].deque);
+}
+
+static uintptr_t
+mark_worker_steal_from_any(struct mark_worker *worker, struct marker *marker) {
+  size_t steal_id = worker->steal_id;
+  for (size_t i = 0; i < marker->worker_count; i++) {
+    steal_id = (steal_id + 1) % marker->worker_count;
+    DEBUG("marker #%zu: stealing from #%zu\n", worker->id, steal_id);
+    uintptr_t addr = marker_steal_from_worker(marker, steal_id);
+    if (addr) {
+      DEBUG("marker #%zu: stealing got 0x%zx\n", worker->id, addr);
+      worker->steal_id = steal_id;
+      return addr;
+    }
+  }
+  DEBUG("marker #%zu: failed to steal\n", worker->id);
+  return 0;
+}
+
+static int
+mark_worker_can_steal_from_any(struct mark_worker *worker, struct marker *marker) {
+  size_t steal_id = worker->steal_id;
+  DEBUG("marker #%zu: checking if any worker has tasks\n", worker->id);
+  for (size_t i = 0; i < marker->worker_count; i++) {
+    steal_id = (steal_id + 1) % marker->worker_count;
+    int res = marker_can_steal_from_worker(marker, steal_id);
+    if (res) {
+      DEBUG("marker #%zu: worker #%zu has tasks!\n", worker->id, steal_id);
+      worker->steal_id = steal_id;
+      return 1;
+    }
+  }
+  DEBUG("marker #%zu: nothing to steal\n", worker->id);
+  return 0;
+}
+
+static int
+mark_worker_check_termination(struct mark_worker *worker,
+                              struct marker *marker) {
+  // We went around all workers and nothing.  Enter termination phase.
+  if (atomic_fetch_sub_explicit(&marker->active_markers, 1,
+                                memory_order_relaxed) == 1) {
+    DEBUG("  ->> marker #%zu: DONE (no spinning) <<-\n", worker->id);
+    return 1;
+  }
+
+  size_t spin_count = 0;
+  while (1) {
+    if (mark_worker_can_steal_from_any(worker, marker)) {
+      atomic_fetch_add_explicit(&marker->active_markers, 1,
+                                memory_order_relaxed);
+      return 0;
+    }
+    if (atomic_load_explicit(&marker->active_markers,
+                             memory_order_relaxed) == 0) {
+      DEBUG("  ->> marker #%zu: DONE <<-\n", worker->id);
+      return 1;
+    }
+    // spin
+    DEBUG("marker #%zu: spinning #%zu\n", worker->id, spin_count);
+    if (spin_count < 10)
+      __builtin_ia32_pause();
+    else if (spin_count < 20)
+      sched_yield();
+    else if (spin_count < 40)
+      usleep(0);
+    else
+      usleep(1);
+    spin_count++;
+  }
+}
+
+static uintptr_t
+mark_worker_steal(struct local_marker *mark) {
+  struct marker *marker = context_marker(mark->cx);
+  struct mark_worker *worker = mark->worker;
+
+  while (1) {
+    DEBUG("marker #%zu: trying to steal\n", worker->id);
+    uintptr_t addr = mark_worker_steal_from_any(worker, marker);
+    if (addr)
+      return addr;
+
+    if (mark_worker_check_termination(worker, marker))
+      return 0;
   }
 }
 
@@ -713,12 +568,12 @@
 static void
 mark_worker_mark(struct mark_worker *worker) {
   struct local_marker mark;
   mark.worker = worker;
-  mark.deque = &context_marker(worker->cx)->deque;
+  mark.share_deque = &worker->deque;
   mark.cx = worker->cx;
   local_mark_queue_init(&mark.local);
   size_t n = 0;
-  DEBUG("marker %p: running mark loop\n", worker);
+  DEBUG("marker #%zu: running mark loop\n", worker->id);
   while (1) {
     uintptr_t addr;
     if (!local_mark_queue_empty(&mark.local)) {
@@ -731,7 +586,7 @@ mark_worker_mark(struct mark_worker *worker) {
     trace_one((struct gcobj*)addr, &mark);
     n++;
   }
-  DEBUG("marker %p: done marking, %zu objects traced\n", worker, n);
+  DEBUG("marker #%zu: done marking, %zu objects traced\n", worker->id, n);
 
   mark_worker_finished_marking(worker);
 }
@@ -739,51 +594,36 @@
 static inline void
 marker_visit_root(void **loc, struct context *cx) {
   struct gcobj *obj = *loc;
+  struct mark_deque *worker0_deque = &context_marker(cx)->workers[0].deque;
   if (obj && mark_object(cx, obj))
-    mark_deque_push(&context_marker(cx)->deque, (uintptr_t)obj);
+    mark_deque_push(worker0_deque, (uintptr_t)obj);
 }
 
 static inline void
 marker_trace(struct context *cx) {
   struct marker *marker = context_marker(cx);
 
+  pthread_mutex_lock(&marker->lock);
+  long mark_count = marker->count;
+  pthread_mutex_unlock(&marker->lock);
+
   DEBUG("starting trace; %zu workers\n", marker->worker_count);
-  while (1) {
-    DEBUG("waking workers\n");
-    atomic_store_explicit(&marker->active_markers, marker->worker_count,
-                          memory_order_release);
-    for (size_t i = 0; i < marker->worker_count; i++)
-      mark_worker_request_mark(&marker->workers[i]);
+  DEBUG("waking workers\n");
+  atomic_store_explicit(&marker->active_markers, marker->worker_count,
+                        memory_order_release);
+  atomic_store_explicit(&marker->running_markers, marker->worker_count,
+                        memory_order_release);
+  for (size_t i = 0; i < marker->worker_count; i++)
+    mark_worker_request_mark(&marker->workers[i]);
 
-    DEBUG("running controller loop\n");
-    size_t n = 0;
-    while (1) {
-      DEBUG("controller: popping\n");
-      uintptr_t addr = mark_channel_pop(&marker->overflow);
-      DEBUG("controller: popped 0x%zx\n", addr);
-      if (!addr)
-        break;
-      mark_deque_push(&marker->deque, addr);
-      DEBUG("controller: pushed to deque\n");
-      n++;
-    }
-    DEBUG("controller loop done, %zu objects sent for rebalancing\n", n);
+ DEBUG("waiting on markers\n"); + + pthread_mutex_lock(&marker->lock); + while (marker->count <= mark_count) + pthread_cond_wait(&marker->cond, &marker->lock); + pthread_mutex_unlock(&marker->lock); - // As in the ISMM'16 paper, it's possible that a worker decides to - // stop because the deque is empty, but actually there was an - // in-flight object in the mark channel that we hadn't been able to - // push yet. Loop in that case. - { - uintptr_t addr = mark_deque_try_pop(&marker->deque); - if (addr == mark_deque_empty) - break; - DEBUG("--> controller looping again due to slop\n"); - mark_deque_push(&marker->deque, addr); - } - } - ASSERT(atomic_load(&marker->overflow.length) == 0); - ASSERT(atomic_load(&marker->overflow.head) == marker->overflow.tail); DEBUG("trace finished\n"); } -#endif // SERIAL_MARK_H +#endif // PARALLEL_MARKER_H