diff --git a/src/parallel-tracer.h b/src/parallel-tracer.h
index 09b636f3f..b88f1e792 100644
--- a/src/parallel-tracer.h
+++ b/src/parallel-tracer.h
@@ -319,7 +319,6 @@ struct trace_worker {
   pthread_t thread;
   enum trace_worker_state state;
   pthread_mutex_t lock;
-  pthread_cond_t cond;
   struct trace_deque deque;
 };
 
@@ -328,8 +327,7 @@ struct trace_worker {
 struct tracer {
   atomic_size_t active_tracers;
   size_t worker_count;
-  atomic_size_t running_tracers;
-  long count;
+  long epoch;
   pthread_mutex_t lock;
   pthread_cond_t cond;
   struct trace_worker workers[TRACE_WORKERS_MAX_COUNT];
@@ -351,9 +349,7 @@ trace_worker_init(struct trace_worker *worker, struct gc_heap *heap,
   worker->id = id;
   worker->steal_id = 0;
   worker->thread = 0;
-  worker->state = TRACE_WORKER_STOPPED;
   pthread_mutex_init(&worker->lock, NULL);
-  pthread_cond_init(&worker->cond, NULL);
   return trace_deque_init(&worker->deque);
 }
 
@@ -362,89 +358,46 @@ static void trace_worker_trace(struct trace_worker *worker);
 static void*
 trace_worker_thread(void *data) {
   struct trace_worker *worker = data;
+  struct tracer *tracer = heap_tracer(worker->heap);
+  long trace_epoch = 0;
 
   pthread_mutex_lock(&worker->lock);
   while (1) {
-    switch (worker->state) {
-    case TRACE_WORKER_IDLE:
-      pthread_cond_wait(&worker->cond, &worker->lock);
-      break;
-    case TRACE_WORKER_TRACING:
+    long epoch = atomic_load_explicit(&tracer->epoch, memory_order_acquire);
+    if (trace_epoch != epoch) {
+      trace_epoch = epoch;
       trace_worker_trace(worker);
-      worker->state = TRACE_WORKER_IDLE;
-      break;
-    case TRACE_WORKER_STOPPING:
-      worker->state = TRACE_WORKER_DEAD;
-      pthread_mutex_unlock(&worker->lock);
-      return NULL;
-    default:
-      GC_CRASH();
     }
+    pthread_cond_wait(&tracer->cond, &worker->lock);
   }
+  return NULL;
 }
 
 static int
 trace_worker_spawn(struct trace_worker *worker) {
-  pthread_mutex_lock(&worker->lock);
-  ASSERT(worker->state == TRACE_WORKER_STOPPED);
-  worker->state = TRACE_WORKER_IDLE;
-  pthread_mutex_unlock(&worker->lock);
-
   if (pthread_create(&worker->thread, NULL, trace_worker_thread, worker)) {
     perror("spawning tracer thread failed");
-    worker->state = TRACE_WORKER_STOPPED;
     return 0;
   }
 
   return 1;
 }
 
-static void
-trace_worker_request_trace(struct trace_worker *worker) {
-  struct tracer *tracer = heap_tracer(worker->heap);
-
-  pthread_mutex_lock(&worker->lock);
-  ASSERT(worker->state == TRACE_WORKER_IDLE);
-  worker->state = TRACE_WORKER_TRACING;
-  pthread_cond_signal(&worker->cond);
-  pthread_mutex_unlock(&worker->lock);
-}
-
-static void
-trace_worker_finished_tracing(struct trace_worker *worker) {
-  // Signal controller that we are done with tracing.
-  struct tracer *tracer = heap_tracer(worker->heap);
-
-  if (atomic_fetch_sub(&tracer->running_tracers, 1) == 1) {
-    pthread_mutex_lock(&tracer->lock);
-    tracer->count++;
-    pthread_cond_signal(&tracer->cond);
-    pthread_mutex_unlock(&tracer->lock);
-  }
-}
-
-static void
-trace_worker_request_stop(struct trace_worker *worker) {
-  pthread_mutex_lock(&worker->lock);
-  ASSERT(worker->state == TRACE_WORKER_IDLE);
-  worker->state = TRACE_WORKER_STOPPING;
-  pthread_cond_signal(&worker->cond);
-  pthread_mutex_unlock(&worker->lock);
-}
-
 static int
 tracer_init(struct gc_heap *heap, size_t parallelism) {
   struct tracer *tracer = heap_tracer(heap);
   atomic_init(&tracer->active_tracers, 0);
-  atomic_init(&tracer->running_tracers, 0);
-  tracer->count = 0;
+  tracer->epoch = 0;
   pthread_mutex_init(&tracer->lock, NULL);
   pthread_cond_init(&tracer->cond, NULL);
   size_t desired_worker_count = parallelism;
   ASSERT(desired_worker_count);
   if (desired_worker_count > TRACE_WORKERS_MAX_COUNT)
     desired_worker_count = TRACE_WORKERS_MAX_COUNT;
-  for (size_t i = 0; i < desired_worker_count; i++) {
+  if (!trace_worker_init(&tracer->workers[0], heap, tracer, 0))
+    return 0;
+  tracer->worker_count++;
+  for (size_t i = 1; i < desired_worker_count; i++) {
     if (!trace_worker_init(&tracer->workers[i], heap, tracer, i))
       break;
     if (trace_worker_spawn(&tracer->workers[i]))
@@ -452,7 +405,7 @@ tracer_init(struct gc_heap *heap, size_t parallelism) {
     else
       break;
   }
-  return tracer->worker_count > 0;
+  return 1;
 }
 
 static void tracer_prepare(struct gc_heap *heap) {
@@ -466,6 +419,24 @@ static void tracer_release(struct gc_heap *heap) {
     trace_deque_release(&tracer->workers[i].deque);
 }
 
+static inline void
+tracer_unpark_all_workers(struct tracer *tracer) {
+  long old_epoch =
+    atomic_fetch_add_explicit(&tracer->epoch, 1, memory_order_acq_rel);
+  long epoch = old_epoch + 1;
+  DEBUG("starting trace; %zu workers; epoch=%ld\n", tracer->worker_count,
+        epoch);
+  pthread_cond_broadcast(&tracer->cond);
+}
+
+static inline void
+tracer_maybe_unpark_workers(struct tracer *tracer) {
+  size_t active =
+    atomic_load_explicit(&tracer->active_tracers, memory_order_acquire);
+  if (active < tracer->worker_count)
+    tracer_unpark_all_workers(tracer);
+}
+
 static inline void tracer_visit(struct gc_edge edge, struct gc_heap *heap,
                                 void *trace_data) GC_ALWAYS_INLINE;
 static inline void tracer_enqueue(struct gc_ref ref, struct gc_heap *heap,
@@ -485,6 +456,7 @@ tracer_share(struct local_tracer *trace) {
     trace_deque_push_many(trace->share_deque, objv, count);
     to_share -= count;
   }
+  tracer_maybe_unpark_workers(heap_tracer(trace->worker->heap));
 }
 
 static inline void
@@ -501,16 +473,6 @@ tracer_visit(struct gc_edge edge, struct gc_heap *heap, void *trace_data) {
     tracer_enqueue(gc_edge_ref(edge), heap, trace_data);
 }
 
-static inline void
-tracer_visit_(struct gc_edge edge, struct gc_heap *heap, void *trace_data) {
-  if (trace_edge(heap, edge)) {
-    struct local_tracer *trace = trace_data;
-    if (local_trace_queue_full(&trace->local))
-      tracer_share(trace);
-    local_trace_queue_push(&trace->local, gc_edge_ref(edge));
-  }
-}
-
 static struct gc_ref
 tracer_steal_from_worker(struct tracer *tracer, size_t id) {
   ASSERT(id < tracer->worker_count);
@@ -556,28 +518,35 @@ trace_worker_can_steal_from_any(struct trace_worker *worker, struct tracer *trac
 }
 
 static int
-trace_worker_check_termination(struct trace_worker *worker,
-                               struct tracer *tracer) {
-  // We went around all workers and nothing.  Enter termination phase.
-  if (atomic_fetch_sub_explicit(&tracer->active_tracers, 1,
-                                memory_order_relaxed) == 1) {
-    DEBUG("  ->> tracer #%zu: DONE (no spinning) <<-\n", worker->id);
-    return 1;
-  }
+trace_worker_should_continue(struct trace_worker *worker) {
+  // Helper workers should park themselves immediately if they have no work.
+  if (worker->id != 0)
+    return 0;
+
+  struct tracer *tracer = heap_tracer(worker->heap);
 
   for (size_t spin_count = 0;; spin_count++) {
-    if (trace_worker_can_steal_from_any(worker, tracer)) {
-      atomic_fetch_add_explicit(&tracer->active_tracers, 1,
-                                memory_order_relaxed);
-      return 0;
-    }
     if (atomic_load_explicit(&tracer->active_tracers,
-                             memory_order_relaxed) == 0) {
-      DEBUG("  ->> tracer #%zu: DONE <<-\n", worker->id);
-      return 1;
+                             memory_order_acquire) == 1) {
+      // All trace workers have exited except us, the main worker.  We are
+      // probably done, but we need to synchronize to be sure that there is no
+      // work pending, for example if a worker had a spurious wakeup.  Skip
+      // worker 0 (the main worker).
+      size_t locked = 1;
+      while (locked < tracer->worker_count) {
+        if (pthread_mutex_trylock(&tracer->workers[locked].lock) == 0)
          locked++;
+        else
+          break;
+      }
+      int done = (locked == tracer->worker_count) &&
+        !trace_worker_can_steal_from_any(worker, tracer);
+      while (locked > 1)
+        pthread_mutex_unlock(&tracer->workers[--locked].lock);
+      return !done;
     }
     // spin
-    DEBUG("tracer #%zu: spinning #%zu\n", worker->id, spin_count);
+    DEBUG("checking for termination: spinning #%zu\n", spin_count);
     yield_for_spin(spin_count);
   }
 }
 
@@ -597,42 +566,46 @@ trace_worker_steal(struct local_tracer *trace) {
     return obj;
   }
 
-  while (1) {
-    DEBUG("tracer #%zu: trying to steal\n", worker->id);
-    struct gc_ref obj = trace_worker_steal_from_any(worker, tracer);
-    if (gc_ref_is_heap_object(obj))
-      return obj;
+  DEBUG("tracer #%zu: trying to steal\n", worker->id);
+  struct gc_ref obj = trace_worker_steal_from_any(worker, tracer);
+  if (gc_ref_is_heap_object(obj))
+    return obj;
 
-    if (trace_worker_check_termination(worker, tracer))
-      return gc_ref_null();
-  }
+  return gc_ref_null();
 }
 
 static void
 trace_worker_trace(struct trace_worker *worker) {
+  struct gc_heap *heap = worker->heap;
+  struct tracer *tracer = heap_tracer(heap);
+  atomic_fetch_add_explicit(&tracer->active_tracers, 1, memory_order_acq_rel);
+
   struct local_tracer trace;
   trace.worker = worker;
   trace.share_deque = &worker->deque;
-  struct gc_heap *heap = worker->heap;
   local_trace_queue_init(&trace.local);
 
   size_t n = 0;
   DEBUG("tracer #%zu: running trace loop\n", worker->id);
-  while (1) {
-    struct gc_ref ref;
-    if (!local_trace_queue_empty(&trace.local)) {
-      ref = local_trace_queue_pop(&trace.local);
-    } else {
-      ref = trace_worker_steal(&trace);
-      if (!gc_ref_is_heap_object(ref))
-        break;
+
+  do {
+    while (1) {
+      struct gc_ref ref;
+      if (!local_trace_queue_empty(&trace.local)) {
+        ref = local_trace_queue_pop(&trace.local);
+      } else {
+        ref = trace_worker_steal(&trace);
+        if (!gc_ref_is_heap_object(ref))
+          break;
+      }
+      trace_one(ref, heap, &trace);
+      n++;
    }
-    trace_one(ref, heap, &trace);
-    n++;
-  }
+  } while (trace_worker_should_continue(worker));
+
   DEBUG("tracer #%zu: done tracing, %zu objects traced\n", worker->id, n);
 
-  trace_worker_finished_tracing(worker);
+  atomic_fetch_sub_explicit(&tracer->active_tracers, 1, memory_order_acq_rel);
 }
 
 static inline void
@@ -652,25 +625,18 @@ static inline void
 tracer_trace(struct gc_heap *heap) {
   struct tracer *tracer = heap_tracer(heap);
 
-  pthread_mutex_lock(&tracer->lock);
-  long trace_count = tracer->count;
-  pthread_mutex_unlock(&tracer->lock);
-
-  DEBUG("starting trace; %zu workers\n", tracer->worker_count);
-  DEBUG("waking workers\n");
-  atomic_store_explicit(&tracer->active_tracers, tracer->worker_count,
-                        memory_order_release);
-  atomic_store_explicit(&tracer->running_tracers, tracer->worker_count,
-                        memory_order_release);
-  for (size_t i = 0; i < tracer->worker_count; i++)
-    trace_worker_request_trace(&tracer->workers[i]);
-
-  DEBUG("waiting on tracers\n");
+  ssize_t parallel_threshold =
+    LOCAL_TRACE_QUEUE_SIZE - LOCAL_TRACE_QUEUE_SHARE_AMOUNT;
+  if (trace_deque_size(&tracer->workers[0].deque) >= parallel_threshold) {
+    DEBUG("waking workers\n");
+    tracer_unpark_all_workers(tracer);
+  } else {
+    DEBUG("starting in local-only mode\n");
+  }
 
-  pthread_mutex_lock(&tracer->lock);
-  while (tracer->count <= trace_count)
-    pthread_cond_wait(&tracer->cond, &tracer->lock);
-  pthread_mutex_unlock(&tracer->lock);
+  trace_worker_trace(&tracer->workers[0]);
 
   DEBUG("trace finished\n");
 }
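Two aspects of the new design are worth spelling out.

First, the epoch protocol that replaces the per-worker state machine: each worker remembers the epoch of the last trace it ran, parks on the tracer-wide condition variable, and re-runs the trace loop whenever the tracer's epoch has advanced past its own. Spurious wakeups are harmless because the epoch comparison makes the loop idempotent. Below is a minimal standalone sketch of the pattern; the names (pool, worker_thread, start_trace, do_trace_work) are illustrative rather than taken from the patch, and for simplicity it guards the epoch with a single pool-wide mutex, whereas the patch pairs an atomic epoch with one mutex per worker so that the controller can later trylock each worker's mutex during termination detection (see the next note).

#include <pthread.h>
#include <stddef.h>

struct pool {
  long epoch;              // number of traces started; guarded by lock
  pthread_mutex_t lock;
  pthread_cond_t cond;     // broadcast on every epoch bump
};

static void do_trace_work(struct pool *p) { /* drain queues, steal, ... */ }

static void *worker_thread(void *data) {
  struct pool *p = data;
  long seen = 0;           // epoch of the last trace this worker ran
  pthread_mutex_lock(&p->lock);
  for (;;) {
    // Park until a new trace starts; a spurious wakeup just re-checks
    // the epoch and goes back to sleep.
    while (p->epoch == seen)
      pthread_cond_wait(&p->cond, &p->lock);
    seen = p->epoch;
    pthread_mutex_unlock(&p->lock);
    do_trace_work(p);      // trace without holding the lock
    pthread_mutex_lock(&p->lock);
  }
  return NULL;             // not reached; a real tracer would add a stop flag
}

static void start_trace(struct pool *p) {
  pthread_mutex_lock(&p->lock);
  p->epoch++;                        // announce a new trace...
  pthread_cond_broadcast(&p->cond);  // ...and unpark every worker
  pthread_mutex_unlock(&p->lock);
}

One consequence of this shape is that there is no explicit stop request any more: a worker that observes no epoch change simply parks again, so idle workers cost nothing between collections.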
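Second, termination detection. In the patch, a helper thread holds its worker->lock for its whole life except while blocked in pthread_cond_wait (unlike the simplified sketch above, which drops the lock while tracing), so a successful pthread_mutex_trylock on a helper's lock proves that the helper is parked. Once active_tracers drops to 1, the main worker trylocks every helper in turn; holding all those locks pins the helpers in their wait, so checking for remaining stealable work cannot race with a helper waking up and publishing more. A standalone sketch of that handshake, again with illustrative names (all_helpers_quiescent, any_stealable_work):

#include <pthread.h>
#include <stddef.h>

struct helper { pthread_mutex_t lock; /* deque, etc. */ };

// Returns nonzero iff every helper is provably parked and no stealable
// work remains.  Caller is the single non-helper (main) thread.
static int all_helpers_quiescent(struct helper *helpers, size_t n,
                                 int (*any_stealable_work)(void)) {
  size_t locked = 0;
  while (locked < n) {
    if (pthread_mutex_trylock(&helpers[locked].lock) == 0)
      locked++;        // parked: keep the lock to pin it there
    else
      break;           // this helper is awake, so tracing is not finished
  }
  // The emptiness check is only meaningful if every helper is pinned.
  int done = (locked == n) && !any_stealable_work();
  while (locked > 0)
    pthread_mutex_unlock(&helpers[--locked].lock);
  return done;
}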
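Finally, a note on the wakeup policy, which is now demand-driven. tracer_trace no longer unconditionally wakes the pool: it broadcasts only when worker 0's deque already holds at least LOCAL_TRACE_QUEUE_SIZE - LOCAL_TRACE_QUEUE_SHARE_AMOUNT entries, i.e. roughly enough roots that worker 0 would be sharing work anyway; otherwise worker 0 starts tracing alone in local-only mode. If the object graph then fans out, tracer_share calls tracer_maybe_unpark_workers the next time work spills into the shared deque, so helpers are woken once there is actually something for them to steal. Small collections can therefore complete without ever paying the broadcast-and-park round trip for the whole worker pool.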