#ifndef PARALLEL_TRACER_H
#define PARALLEL_TRACER_H

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

#include "assert.h"
#include "debug.h"
#include "inline.h"
#include "spin.h"

// The Chase-Lev work-stealing deque, as initially described in "Dynamic
// Circular Work-Stealing Deque" (Chase and Lev, SPAA'05)
// (https://www.dre.vanderbilt.edu/~schmidt/PDF/work-stealing-dequeue.pdf)
// and improved with C11 atomics in "Correct and Efficient Work-Stealing
// for Weak Memory Models" (LĂȘ et al, PPoPP'13)
// (http://www.di.ens.fr/%7Ezappa/readings/ppopp13.pdf).

struct gcobj;

struct trace_buf {
  unsigned log_size;
  size_t size;
  struct gcobj **data;
};

// Min size: 8 kB on 64-bit systems, 4 kB on 32-bit.
#define trace_buf_min_log_size ((unsigned) 10)
// Max size: 2 GB on 64-bit systems, 1 GB on 32-bit.
#define trace_buf_max_log_size ((unsigned) 28)

static int
trace_buf_init(struct trace_buf *buf, unsigned log_size) {
  ASSERT(log_size >= trace_buf_min_log_size);
  ASSERT(log_size <= trace_buf_max_log_size);
  size_t size = (1 << log_size) * sizeof(struct gcobj *);
  void *mem = mmap(NULL, size, PROT_READ|PROT_WRITE,
                   MAP_PRIVATE|MAP_ANONYMOUS, -1, 0);
  if (mem == MAP_FAILED) {
    perror("Failed to allocate work-stealing deque buffer");
    DEBUG("Failed to allocate %zu bytes", size);
    return 0;
  }
  buf->log_size = log_size;
  buf->size = 1 << log_size;
  buf->data = mem;
  return 1;
}

static inline size_t
trace_buf_size(struct trace_buf *buf) {
  return buf->size;
}

static inline size_t
trace_buf_byte_size(struct trace_buf *buf) {
  return trace_buf_size(buf) * sizeof(struct gcobj *);
}

static void
trace_buf_release(struct trace_buf *buf) {
  if (buf->data)
    madvise(buf->data, trace_buf_byte_size(buf), MADV_DONTNEED);
}

static void
trace_buf_destroy(struct trace_buf *buf) {
  if (buf->data) {
    munmap(buf->data, trace_buf_byte_size(buf));
    buf->data = NULL;
    buf->log_size = 0;
    buf->size = 0;
  }
}

static inline struct gcobj *
trace_buf_get(struct trace_buf *buf, size_t i) {
  return atomic_load_explicit(&buf->data[i & (buf->size - 1)],
                              memory_order_relaxed);
}

static inline void
trace_buf_put(struct trace_buf *buf, size_t i, struct gcobj *o) {
  atomic_store_explicit(&buf->data[i & (buf->size - 1)], o,
                        memory_order_relaxed);
}

static inline int
trace_buf_grow(struct trace_buf *from, struct trace_buf *to,
               size_t b, size_t t) {
  if (from->log_size == trace_buf_max_log_size)
    return 0;
  if (!trace_buf_init(to, from->log_size + 1))
    return 0;
  for (size_t i = t; i < b; i++)
    trace_buf_put(to, i, trace_buf_get(from, i));
  return 1;
}

// The deque grows by switching to a larger buffer; "active" is the
// index of the buffer currently in use.
struct trace_deque {
  atomic_size_t bottom;
  atomic_size_t top;
  atomic_int active;
  struct trace_buf bufs[(trace_buf_max_log_size - trace_buf_min_log_size) + 1];
};

#define LOAD_RELAXED(loc) atomic_load_explicit(loc, memory_order_relaxed)
#define STORE_RELAXED(loc, o) atomic_store_explicit(loc, o, memory_order_relaxed)
#define LOAD_ACQUIRE(loc) atomic_load_explicit(loc, memory_order_acquire)
#define STORE_RELEASE(loc, o) atomic_store_explicit(loc, o, memory_order_release)
#define LOAD_CONSUME(loc) atomic_load_explicit(loc, memory_order_consume)

static int
trace_deque_init(struct trace_deque *q) {
  memset(q, 0, sizeof(*q));
  int ret = trace_buf_init(&q->bufs[0], trace_buf_min_log_size);
  // Note, this fence isn't in the paper, I added it out of caution.
  atomic_thread_fence(memory_order_release);
  return ret;
}

static void
trace_deque_release(struct trace_deque *q) {
  for (int i = LOAD_RELAXED(&q->active); i >= 0; i--)
    trace_buf_release(&q->bufs[i]);
}

static void
trace_deque_destroy(struct trace_deque *q) {
  for (int i = LOAD_RELAXED(&q->active); i >= 0; i--)
    trace_buf_destroy(&q->bufs[i]);
}

static int
trace_deque_grow(struct trace_deque *q, int cur, size_t b, size_t t) {
  if (!trace_buf_grow(&q->bufs[cur], &q->bufs[cur + 1], b, t)) {
    fprintf(stderr, "failed to grow deque!!\n");
    abort();
  }

  cur++;
  STORE_RELAXED(&q->active, cur);
  return cur;
}
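
// Deque operations.  trace_deque_push, trace_deque_push_many, and
// trace_deque_try_pop may only be called by the worker that owns the
// deque; they operate on the "bottom" end.  trace_deque_steal and
// trace_deque_can_steal may be called by any thread and operate on the
// "top" end.  The fences and the seq_cst compare-and-swap on "top"
// follow the LĂȘ et al. formulation referenced above.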
static void
trace_deque_push(struct trace_deque *q, struct gcobj *x) {
  size_t b = LOAD_RELAXED(&q->bottom);
  size_t t = LOAD_ACQUIRE(&q->top);
  int active = LOAD_RELAXED(&q->active);

  if (b - t > trace_buf_size(&q->bufs[active]) - 1) /* Full queue. */
    active = trace_deque_grow(q, active, b, t);

  trace_buf_put(&q->bufs[active], b, x);
  atomic_thread_fence(memory_order_release);
  STORE_RELAXED(&q->bottom, b + 1);
}

static void
trace_deque_push_many(struct trace_deque *q, struct gcobj **objv, size_t count) {
  size_t b = LOAD_RELAXED(&q->bottom);
  size_t t = LOAD_ACQUIRE(&q->top);
  int active = LOAD_RELAXED(&q->active);

  while (b - t > trace_buf_size(&q->bufs[active]) - count) /* Full queue. */
    active = trace_deque_grow(q, active, b, t);

  for (size_t i = 0; i < count; i++)
    trace_buf_put(&q->bufs[active], b + i, objv[i]);
  atomic_thread_fence(memory_order_release);
  STORE_RELAXED(&q->bottom, b + count);
}

static struct gcobj *
trace_deque_try_pop(struct trace_deque *q) {
  size_t b = LOAD_RELAXED(&q->bottom);
  b = b - 1;
  int active = LOAD_RELAXED(&q->active);
  STORE_RELAXED(&q->bottom, b);
  atomic_thread_fence(memory_order_seq_cst);
  size_t t = LOAD_RELAXED(&q->top);
  struct gcobj *x;
  if (t <= b) { // Non-empty queue.
    x = trace_buf_get(&q->bufs[active], b);
    if (t == b) { // Single last element in queue.
      if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                                                   memory_order_seq_cst,
                                                   memory_order_relaxed))
        // Failed race.
        x = NULL;
      STORE_RELAXED(&q->bottom, b + 1);
    }
  } else { // Empty queue.
    x = NULL;
    STORE_RELAXED(&q->bottom, b + 1);
  }
  return x;
}

static struct gcobj *
trace_deque_steal(struct trace_deque *q) {
  while (1) {
    size_t t = LOAD_ACQUIRE(&q->top);
    atomic_thread_fence(memory_order_seq_cst);
    size_t b = LOAD_ACQUIRE(&q->bottom);
    if (t >= b)
      return NULL;
    int active = LOAD_CONSUME(&q->active);
    struct gcobj *x = trace_buf_get(&q->bufs[active], t);
    if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                                                 memory_order_seq_cst,
                                                 memory_order_relaxed))
      // Failed race.
      continue;
    return x;
  }
}

static int
trace_deque_can_steal(struct trace_deque *q) {
  size_t t = LOAD_ACQUIRE(&q->top);
  atomic_thread_fence(memory_order_seq_cst);
  size_t b = LOAD_ACQUIRE(&q->bottom);
  return t < b;
}

#undef LOAD_RELAXED
#undef STORE_RELAXED
#undef LOAD_ACQUIRE
#undef STORE_RELEASE
#undef LOAD_CONSUME

#define LOCAL_TRACE_QUEUE_SIZE 1024
#define LOCAL_TRACE_QUEUE_MASK (LOCAL_TRACE_QUEUE_SIZE - 1)
#define LOCAL_TRACE_QUEUE_SHARE_AMOUNT (LOCAL_TRACE_QUEUE_SIZE * 3 / 4)

// A fixed-size, single-threaded ring buffer of objects to trace.  Each
// worker traces out of its local queue, overflowing onto its shared
// deque when the local queue fills up.
struct local_trace_queue {
  size_t read;
  size_t write;
  struct gcobj *data[LOCAL_TRACE_QUEUE_SIZE];
};

static inline void
local_trace_queue_init(struct local_trace_queue *q) {
  q->read = q->write = 0;
}
static inline void
local_trace_queue_poison(struct local_trace_queue *q) {
  q->read = 0;
  q->write = LOCAL_TRACE_QUEUE_SIZE;
}
static inline size_t
local_trace_queue_size(struct local_trace_queue *q) {
  return q->write - q->read;
}
static inline int
local_trace_queue_empty(struct local_trace_queue *q) {
  return local_trace_queue_size(q) == 0;
}
static inline int
local_trace_queue_full(struct local_trace_queue *q) {
  return local_trace_queue_size(q) >= LOCAL_TRACE_QUEUE_SIZE;
}
static inline void
local_trace_queue_push(struct local_trace_queue *q, struct gcobj *v) {
  q->data[q->write++ & LOCAL_TRACE_QUEUE_MASK] = v;
}
static inline struct gcobj *
local_trace_queue_pop(struct local_trace_queue *q) {
  return q->data[q->read++ & LOCAL_TRACE_QUEUE_MASK];
}
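
// Per-worker tracing threads.  A worker is spawned in the IDLE state
// and sits on its condition variable until the controller requests a
// trace (TRACING) or asks it to shut down (STOPPING -> DEAD).  All
// state transitions happen under the worker's lock.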
enum trace_worker_state {
  TRACE_WORKER_STOPPED,
  TRACE_WORKER_IDLE,
  TRACE_WORKER_TRACING,
  TRACE_WORKER_STOPPING,
  TRACE_WORKER_DEAD
};

struct heap;
struct trace_worker {
  struct heap *heap;
  size_t id;
  size_t steal_id;
  pthread_t thread;
  enum trace_worker_state state;
  pthread_mutex_t lock;
  pthread_cond_t cond;
  struct trace_deque deque;
};

#define TRACE_WORKERS_MAX_COUNT 8

struct tracer {
  atomic_size_t active_tracers;
  size_t worker_count;
  atomic_size_t running_tracers;
  long count;
  pthread_mutex_t lock;
  pthread_cond_t cond;
  struct trace_worker workers[TRACE_WORKERS_MAX_COUNT];
};

struct local_tracer {
  struct trace_worker *worker;
  struct trace_deque *share_deque;
  struct heap *heap;
  struct local_trace_queue local;
};

struct context;
static inline struct tracer* heap_tracer(struct heap *heap);

static size_t number_of_current_processors(void) { return 1; }

static int
trace_worker_init(struct trace_worker *worker, struct heap *heap,
                  struct tracer *tracer, size_t id) {
  worker->heap = heap;
  worker->id = id;
  worker->steal_id = 0;
  worker->thread = 0;
  worker->state = TRACE_WORKER_STOPPED;
  pthread_mutex_init(&worker->lock, NULL);
  pthread_cond_init(&worker->cond, NULL);
  return trace_deque_init(&worker->deque);
}

static void trace_worker_trace(struct trace_worker *worker);

static void*
trace_worker_thread(void *data) {
  struct trace_worker *worker = data;

  pthread_mutex_lock(&worker->lock);
  while (1) {
    switch (worker->state) {
    case TRACE_WORKER_IDLE:
      pthread_cond_wait(&worker->cond, &worker->lock);
      break;
    case TRACE_WORKER_TRACING:
      trace_worker_trace(worker);
      worker->state = TRACE_WORKER_IDLE;
      break;
    case TRACE_WORKER_STOPPING:
      worker->state = TRACE_WORKER_DEAD;
      pthread_mutex_unlock(&worker->lock);
      return NULL;
    default:
      abort();
    }
  }
}

static int
trace_worker_spawn(struct trace_worker *worker) {
  pthread_mutex_lock(&worker->lock);
  ASSERT(worker->state == TRACE_WORKER_STOPPED);
  worker->state = TRACE_WORKER_IDLE;
  pthread_mutex_unlock(&worker->lock);

  int err = pthread_create(&worker->thread, NULL, trace_worker_thread, worker);
  if (err) {
    // pthread_create reports failure via its return value, not errno.
    fprintf(stderr, "spawning tracer thread failed: %s\n", strerror(err));
    worker->state = TRACE_WORKER_STOPPED;
    return 0;
  }

  return 1;
}

static void
trace_worker_request_trace(struct trace_worker *worker) {
  pthread_mutex_lock(&worker->lock);
  ASSERT(worker->state == TRACE_WORKER_IDLE);
  worker->state = TRACE_WORKER_TRACING;
  pthread_cond_signal(&worker->cond);
  pthread_mutex_unlock(&worker->lock);
}
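
// Controller/worker handshake for one trace: tracer_trace() sets
// running_tracers to the worker count and wakes every worker; each
// worker calls trace_worker_finished_tracing() when it runs out of
// work, and the last one to finish bumps tracer->count and signals the
// controller's condition variable.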
static void
trace_worker_finished_tracing(struct trace_worker *worker) {
  // Signal controller that we are done with tracing.
  struct tracer *tracer = heap_tracer(worker->heap);

  if (atomic_fetch_sub(&tracer->running_tracers, 1) == 1) {
    pthread_mutex_lock(&tracer->lock);
    tracer->count++;
    pthread_cond_signal(&tracer->cond);
    pthread_mutex_unlock(&tracer->lock);
  }
}

static void
trace_worker_request_stop(struct trace_worker *worker) {
  pthread_mutex_lock(&worker->lock);
  ASSERT(worker->state == TRACE_WORKER_IDLE);
  worker->state = TRACE_WORKER_STOPPING;
  pthread_cond_signal(&worker->cond);
  pthread_mutex_unlock(&worker->lock);
}

static int
tracer_init(struct heap *heap) {
  struct tracer *tracer = heap_tracer(heap);
  atomic_init(&tracer->active_tracers, 0);
  atomic_init(&tracer->running_tracers, 0);
  tracer->count = 0;
  tracer->worker_count = 0;
  pthread_mutex_init(&tracer->lock, NULL);
  pthread_cond_init(&tracer->cond, NULL);
  size_t desired_worker_count = 0;
  if (getenv("GC_TRACERS"))
    desired_worker_count = atoi(getenv("GC_TRACERS"));
  if (desired_worker_count == 0)
    desired_worker_count = number_of_current_processors();
  if (desired_worker_count > TRACE_WORKERS_MAX_COUNT)
    desired_worker_count = TRACE_WORKERS_MAX_COUNT;
  for (size_t i = 0; i < desired_worker_count; i++) {
    if (!trace_worker_init(&tracer->workers[i], heap, tracer, i))
      break;
    if (trace_worker_spawn(&tracer->workers[i]))
      tracer->worker_count++;
    else
      break;
  }
  return tracer->worker_count > 0;
}

static void tracer_prepare(struct heap *heap) {
  struct tracer *tracer = heap_tracer(heap);
  for (size_t i = 0; i < tracer->worker_count; i++)
    tracer->workers[i].steal_id = 0;
}

static void tracer_release(struct heap *heap) {
  struct tracer *tracer = heap_tracer(heap);
  for (size_t i = 0; i < tracer->worker_count; i++)
    trace_deque_release(&tracer->workers[i].deque);
}

struct gcobj;
static inline void tracer_visit(struct gc_edge edge, void *trace_data) ALWAYS_INLINE;
static inline void trace_one(struct gcobj *obj, void *trace_data) ALWAYS_INLINE;
static inline int trace_edge(struct heap *heap, struct gc_edge edge) ALWAYS_INLINE;

static inline void
tracer_share(struct local_tracer *trace) {
  DEBUG("tracer #%zu: sharing\n", trace->worker->id);
  for (size_t i = 0; i < LOCAL_TRACE_QUEUE_SHARE_AMOUNT; i++)
    trace_deque_push(trace->share_deque, local_trace_queue_pop(&trace->local));
}

static inline void
tracer_visit(struct gc_edge edge, void *trace_data) {
  struct local_tracer *trace = trace_data;
  if (trace_edge(trace->heap, edge)) {
    if (local_trace_queue_full(&trace->local))
      tracer_share(trace);
    local_trace_queue_push(&trace->local, dereference_edge(edge));
  }
}

static struct gcobj *
tracer_steal_from_worker(struct tracer *tracer, size_t id) {
  ASSERT(id < tracer->worker_count);
  return trace_deque_steal(&tracer->workers[id].deque);
}

static int
tracer_can_steal_from_worker(struct tracer *tracer, size_t id) {
  ASSERT(id < tracer->worker_count);
  return trace_deque_can_steal(&tracer->workers[id].deque);
}

static struct gcobj *
trace_worker_steal_from_any(struct trace_worker *worker, struct tracer *tracer) {
  size_t steal_id = worker->steal_id;

  for (size_t i = 0; i < tracer->worker_count; i++) {
    steal_id = (steal_id + 1) % tracer->worker_count;
    DEBUG("tracer #%zu: stealing from #%zu\n", worker->id, steal_id);
    struct gcobj *obj = tracer_steal_from_worker(tracer, steal_id);
    if (obj) {
      DEBUG("tracer #%zu: stealing got %p\n", worker->id, obj);
      worker->steal_id = steal_id;
      return obj;
    }
  }
  DEBUG("tracer #%zu: failed to steal\n", worker->id);
  return NULL;
}
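
// Termination detection.  A worker that finds nothing to steal
// decrements active_tracers and then spins, re-checking the other
// workers' deques; if new work appears it re-increments the counter and
// resumes stealing, and once active_tracers drops to zero every worker
// agrees that the trace is complete.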
tasks\n", worker->id); for (size_t i = 0; i < tracer->worker_count; i++) { steal_id = (steal_id + 1) % tracer->worker_count; int res = tracer_can_steal_from_worker(tracer, steal_id); if (res) { DEBUG("tracer #%zu: worker #%zu has tasks!\n", worker->id, steal_id); worker->steal_id = steal_id; return 1; } } DEBUG("tracer #%zu: nothing to steal\n", worker->id); return 0; } static int trace_worker_check_termination(struct trace_worker *worker, struct tracer *tracer) { // We went around all workers and nothing. Enter termination phase. if (atomic_fetch_sub_explicit(&tracer->active_tracers, 1, memory_order_relaxed) == 1) { DEBUG(" ->> tracer #%zu: DONE (no spinning) <<-\n", worker->id); return 1; } for (size_t spin_count = 0;; spin_count++) { if (trace_worker_can_steal_from_any(worker, tracer)) { atomic_fetch_add_explicit(&tracer->active_tracers, 1, memory_order_relaxed); return 0; } if (atomic_load_explicit(&tracer->active_tracers, memory_order_relaxed) == 0) { DEBUG(" ->> tracer #%zu: DONE <<-\n", worker->id); return 1; } // spin DEBUG("tracer #%zu: spinning #%zu\n", worker->id, spin_count); yield_for_spin(spin_count); } } static struct gcobj * trace_worker_steal(struct local_tracer *trace) { struct tracer *tracer = heap_tracer(trace->heap); struct trace_worker *worker = trace->worker; while (1) { DEBUG("tracer #%zu: trying to steal\n", worker->id); struct gcobj *obj = trace_worker_steal_from_any(worker, tracer); if (obj) return obj; if (trace_worker_check_termination(worker, tracer)) return NULL; } } static void trace_worker_trace(struct trace_worker *worker) { struct local_tracer trace; trace.worker = worker; trace.share_deque = &worker->deque; trace.heap = worker->heap; local_trace_queue_init(&trace.local); size_t n = 0; DEBUG("tracer #%zu: running trace loop\n", worker->id); while (1) { struct gcobj * obj; if (!local_trace_queue_empty(&trace.local)) { obj = local_trace_queue_pop(&trace.local); } else { obj = trace_worker_steal(&trace); if (!obj) break; } trace_one(obj, &trace); n++; } DEBUG("tracer #%zu: done tracing, %zu objects traced\n", worker->id, n); trace_worker_finished_tracing(worker); } static inline void tracer_enqueue_root(struct tracer *tracer, struct gcobj *obj) { struct trace_deque *worker0_deque = &tracer->workers[0].deque; trace_deque_push(worker0_deque, obj); } static inline void tracer_enqueue_roots(struct tracer *tracer, struct gcobj **objv, size_t count) { struct trace_deque *worker0_deque = &tracer->workers[0].deque; trace_deque_push_many(worker0_deque, objv, count); } static inline void tracer_trace(struct heap *heap) { struct tracer *tracer = heap_tracer(heap); pthread_mutex_lock(&tracer->lock); long trace_count = tracer->count; pthread_mutex_unlock(&tracer->lock); DEBUG("starting trace; %zu workers\n", tracer->worker_count); DEBUG("waking workers\n"); atomic_store_explicit(&tracer->active_tracers, tracer->worker_count, memory_order_release); atomic_store_explicit(&tracer->running_tracers, tracer->worker_count, memory_order_release); for (size_t i = 0; i < tracer->worker_count; i++) trace_worker_request_trace(&tracer->workers[i]); DEBUG("waiting on tracers\n"); pthread_mutex_lock(&tracer->lock); while (tracer->count <= trace_count) pthread_cond_wait(&tracer->cond, &tracer->lock); pthread_mutex_unlock(&tracer->lock); DEBUG("trace finished\n"); } #endif // PARALLEL_TRACER_H