
Rework parallel tracing state machine

Instead of sending a message to each worker, we pthread_cond_broadcast
at the start.  Instead of having N worker threads, we have N-1 threads
and the main thread also does work.  Instead of termination being
detected by the worker threads, let the main thread detect it.  Avoid
parallelism if the mark stack is small enough, which can be the case for
ephemeron chains.  Let aux threads exit when they have no work instead
of spinning: sharing will start them up again.
Andy Wingo 2023-10-23 11:26:33 +02:00
parent 9cc12916a9
commit 3d2a12c684
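As a rough illustration of the first half of the new scheme (one pthread_cond_broadcast instead of a per-worker message, and helpers that simply park when they run out of work), here is a minimal, self-contained sketch of the wakeup protocol; the diff itself follows. The names worker_main, start_epoch and NHELPERS are invented for this example, and the sketch uses one shared mutex where the patch pairs the shared condition variable with per-worker locks.

/* Sketch only: helpers park on a shared condition variable; the
   coordinator bumps an epoch counter and wakes them all at once. */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

#define NHELPERS 3

static atomic_long epoch;                  /* bumped once per trace */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

static void* worker_main(void *data) {
  (void)data;
  long seen = 0;
  pthread_mutex_lock(&lock);
  for (;;) {
    /* Checking the epoch before parking means a broadcast issued before
       we reach pthread_cond_wait is not lost. */
    long current = atomic_load_explicit(&epoch, memory_order_acquire);
    if (current != seen) {
      seen = current;
      pthread_mutex_unlock(&lock);
      printf("helper: tracing epoch %ld\n", current);
      /* ... drain the local queue, steal, and fall through when there is
         no more work; a sharing worker will broadcast again ... */
      pthread_mutex_lock(&lock);
    }
    pthread_cond_wait(&cond, &lock);       /* park until the next epoch */
  }
  return NULL;
}

static void start_epoch(void) {
  atomic_fetch_add_explicit(&epoch, 1, memory_order_acq_rel);
  pthread_mutex_lock(&lock);
  pthread_cond_broadcast(&cond);           /* one wakeup for all helpers */
  pthread_mutex_unlock(&lock);
}

int main(void) {
  pthread_t helpers[NHELPERS];
  for (int i = 0; i < NHELPERS; i++)
    pthread_create(&helpers[i], NULL, worker_main, NULL);
  start_epoch();
  /* In the patch the calling thread is worker 0: it traces too and is
     the one that detects termination rather than waiting on helpers. */
  sleep(1);
  return 0;                                /* helpers die with the process */
}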


@@ -319,7 +319,6 @@ struct trace_worker {
pthread_t thread;
enum trace_worker_state state;
pthread_mutex_t lock;
pthread_cond_t cond;
struct trace_deque deque;
};
@@ -328,8 +327,7 @@ struct trace_worker {
struct tracer {
atomic_size_t active_tracers;
size_t worker_count;
atomic_size_t running_tracers;
long count;
long epoch;
pthread_mutex_t lock;
pthread_cond_t cond;
struct trace_worker workers[TRACE_WORKERS_MAX_COUNT];
@@ -351,9 +349,7 @@ trace_worker_init(struct trace_worker *worker, struct gc_heap *heap,
worker->id = id;
worker->steal_id = 0;
worker->thread = 0;
worker->state = TRACE_WORKER_STOPPED;
pthread_mutex_init(&worker->lock, NULL);
pthread_cond_init(&worker->cond, NULL);
return trace_deque_init(&worker->deque);
}
@@ -362,89 +358,46 @@ static void trace_worker_trace(struct trace_worker *worker);
static void*
trace_worker_thread(void *data) {
struct trace_worker *worker = data;
struct tracer *tracer = heap_tracer(worker->heap);
long trace_epoch = 0;
pthread_mutex_lock(&worker->lock);
while (1) {
switch (worker->state) {
case TRACE_WORKER_IDLE:
pthread_cond_wait(&worker->cond, &worker->lock);
break;
case TRACE_WORKER_TRACING:
long epoch = atomic_load_explicit(&tracer->epoch, memory_order_acquire);
if (trace_epoch != epoch) {
trace_epoch = epoch;
trace_worker_trace(worker);
worker->state = TRACE_WORKER_IDLE;
break;
case TRACE_WORKER_STOPPING:
worker->state = TRACE_WORKER_DEAD;
pthread_mutex_unlock(&worker->lock);
}
pthread_cond_wait(&tracer->cond, &worker->lock);
}
return NULL;
default:
GC_CRASH();
}
}
}
static int
trace_worker_spawn(struct trace_worker *worker) {
pthread_mutex_lock(&worker->lock);
ASSERT(worker->state == TRACE_WORKER_STOPPED);
worker->state = TRACE_WORKER_IDLE;
pthread_mutex_unlock(&worker->lock);
if (pthread_create(&worker->thread, NULL, trace_worker_thread, worker)) {
perror("spawning tracer thread failed");
worker->state = TRACE_WORKER_STOPPED;
return 0;
}
return 1;
}
static void
trace_worker_request_trace(struct trace_worker *worker) {
struct tracer *tracer = heap_tracer(worker->heap);
pthread_mutex_lock(&worker->lock);
ASSERT(worker->state == TRACE_WORKER_IDLE);
worker->state = TRACE_WORKER_TRACING;
pthread_cond_signal(&worker->cond);
pthread_mutex_unlock(&worker->lock);
}
static void
trace_worker_finished_tracing(struct trace_worker *worker) {
// Signal controller that we are done with tracing.
struct tracer *tracer = heap_tracer(worker->heap);
if (atomic_fetch_sub(&tracer->running_tracers, 1) == 1) {
pthread_mutex_lock(&tracer->lock);
tracer->count++;
pthread_cond_signal(&tracer->cond);
pthread_mutex_unlock(&tracer->lock);
}
}
static void
trace_worker_request_stop(struct trace_worker *worker) {
pthread_mutex_lock(&worker->lock);
ASSERT(worker->state == TRACE_WORKER_IDLE);
worker->state = TRACE_WORKER_STOPPING;
pthread_cond_signal(&worker->cond);
pthread_mutex_unlock(&worker->lock);
}
static int
tracer_init(struct gc_heap *heap, size_t parallelism) {
struct tracer *tracer = heap_tracer(heap);
atomic_init(&tracer->active_tracers, 0);
atomic_init(&tracer->running_tracers, 0);
tracer->count = 0;
tracer->epoch = 0;
pthread_mutex_init(&tracer->lock, NULL);
pthread_cond_init(&tracer->cond, NULL);
size_t desired_worker_count = parallelism;
ASSERT(desired_worker_count);
if (desired_worker_count > TRACE_WORKERS_MAX_COUNT)
desired_worker_count = TRACE_WORKERS_MAX_COUNT;
for (size_t i = 0; i < desired_worker_count; i++) {
if (!trace_worker_init(&tracer->workers[0], heap, tracer, 0))
return 0;
tracer->worker_count++;
for (size_t i = 1; i < desired_worker_count; i++) {
if (!trace_worker_init(&tracer->workers[i], heap, tracer, i))
break;
if (trace_worker_spawn(&tracer->workers[i]))
@@ -452,7 +405,7 @@ tracer_init(struct gc_heap *heap, size_t parallelism) {
else
break;
}
return tracer->worker_count > 0;
return 1;
}
static void tracer_prepare(struct gc_heap *heap) {
@@ -466,6 +419,24 @@ static void tracer_release(struct gc_heap *heap) {
trace_deque_release(&tracer->workers[i].deque);
}
static inline void
tracer_unpark_all_workers(struct tracer *tracer) {
long old_epoch =
atomic_fetch_add_explicit(&tracer->epoch, 1, memory_order_acq_rel);
long epoch = old_epoch + 1;
DEBUG("starting trace; %zu workers; epoch=%ld\n", tracer->worker_count,
epoch);
pthread_cond_broadcast(&tracer->cond);
}
static inline void
tracer_maybe_unpark_workers(struct tracer *tracer) {
size_t active =
atomic_load_explicit(&tracer->active_tracers, memory_order_acquire);
if (active < tracer->worker_count)
tracer_unpark_all_workers(tracer);
}
static inline void tracer_visit(struct gc_edge edge, struct gc_heap *heap,
void *trace_data) GC_ALWAYS_INLINE;
static inline void tracer_enqueue(struct gc_ref ref, struct gc_heap *heap,
@@ -485,6 +456,7 @@ tracer_share(struct local_tracer *trace) {
trace_deque_push_many(trace->share_deque, objv, count);
to_share -= count;
}
tracer_maybe_unpark_workers(heap_tracer(trace->worker->heap));
}
static inline void
@@ -501,16 +473,6 @@ tracer_visit(struct gc_edge edge, struct gc_heap *heap, void *trace_data) {
tracer_enqueue(gc_edge_ref(edge), heap, trace_data);
}
static inline void
tracer_visit_(struct gc_edge edge, struct gc_heap *heap, void *trace_data) {
if (trace_edge(heap, edge)) {
struct local_tracer *trace = trace_data;
if (local_trace_queue_full(&trace->local))
tracer_share(trace);
local_trace_queue_push(&trace->local, gc_edge_ref(edge));
}
}
static struct gc_ref
tracer_steal_from_worker(struct tracer *tracer, size_t id) {
ASSERT(id < tracer->worker_count);
@@ -556,28 +518,35 @@ trace_worker_can_steal_from_any(struct trace_worker *worker, struct tracer *trac
}
static int
trace_worker_check_termination(struct trace_worker *worker,
struct tracer *tracer) {
// We went around all workers and nothing. Enter termination phase.
if (atomic_fetch_sub_explicit(&tracer->active_tracers, 1,
memory_order_relaxed) == 1) {
DEBUG(" ->> tracer #%zu: DONE (no spinning) <<-\n", worker->id);
return 1;
}
trace_worker_should_continue(struct trace_worker *worker) {
// Helper workers should park themselves immediately if they have no work.
if (worker->id != 0)
return 0;
struct tracer *tracer = heap_tracer(worker->heap);
for (size_t spin_count = 0;; spin_count++) {
if (trace_worker_can_steal_from_any(worker, tracer)) {
atomic_fetch_add_explicit(&tracer->active_tracers, 1,
memory_order_relaxed);
return 0;
}
if (atomic_load_explicit(&tracer->active_tracers,
memory_order_relaxed) == 0) {
DEBUG(" ->> tracer #%zu: DONE <<-\n", worker->id);
return 1;
memory_order_acquire) == 1) {
// All trace workers have exited except us, the main worker. We are
// probably done, but we need to synchronize to be sure that there is no
// work pending, for example if a worker had a spurious wakeup. Skip
// worker 0 (the main worker).
size_t locked = 1;
while (locked < tracer->worker_count) {
if (pthread_mutex_trylock(&tracer->workers[locked].lock) == 0)
locked++;
else
break;
}
int done = (locked == tracer->worker_count) &&
!trace_worker_can_steal_from_any(worker, tracer);
while (locked > 1)
pthread_mutex_unlock(&tracer->workers[--locked].lock);
return !done;
}
// spin
DEBUG("tracer #%zu: spinning #%zu\n", worker->id, spin_count);
DEBUG("checking for termination: spinning #%zu\n", spin_count);
yield_for_spin(spin_count);
}
}
@@ -597,27 +566,29 @@ trace_worker_steal(struct local_tracer *trace) {
return obj;
}
while (1) {
DEBUG("tracer #%zu: trying to steal\n", worker->id);
struct gc_ref obj = trace_worker_steal_from_any(worker, tracer);
if (gc_ref_is_heap_object(obj))
return obj;
if (trace_worker_check_termination(worker, tracer))
return gc_ref_null();
}
}
static void
trace_worker_trace(struct trace_worker *worker) {
struct gc_heap *heap = worker->heap;
struct tracer *tracer = heap_tracer(heap);
atomic_fetch_add_explicit(&tracer->active_tracers, 1, memory_order_acq_rel);
struct local_tracer trace;
trace.worker = worker;
trace.share_deque = &worker->deque;
struct gc_heap *heap = worker->heap;
local_trace_queue_init(&trace.local);
size_t n = 0;
DEBUG("tracer #%zu: running trace loop\n", worker->id);
do {
while (1) {
struct gc_ref ref;
if (!local_trace_queue_empty(&trace.local)) {
@@ -630,9 +601,11 @@ trace_worker_trace(struct trace_worker *worker) {
trace_one(ref, heap, &trace);
n++;
}
} while (trace_worker_should_continue(worker));
DEBUG("tracer #%zu: done tracing, %zu objects traced\n", worker->id, n);
trace_worker_finished_tracing(worker);
atomic_fetch_sub_explicit(&tracer->active_tracers, 1, memory_order_acq_rel);
}
static inline void
@@ -652,25 +625,18 @@ static inline void
tracer_trace(struct gc_heap *heap) {
struct tracer *tracer = heap_tracer(heap);
pthread_mutex_lock(&tracer->lock);
long trace_count = tracer->count;
pthread_mutex_unlock(&tracer->lock);
DEBUG("starting trace; %zu workers\n", tracer->worker_count);
ssize_t parallel_threshold =
LOCAL_TRACE_QUEUE_SIZE - LOCAL_TRACE_QUEUE_SHARE_AMOUNT;
if (trace_deque_size(&tracer->workers[0].deque) >= parallel_threshold) {
DEBUG("waking workers\n");
atomic_store_explicit(&tracer->active_tracers, tracer->worker_count,
memory_order_release);
atomic_store_explicit(&tracer->running_tracers, tracer->worker_count,
memory_order_release);
for (size_t i = 0; i < tracer->worker_count; i++)
trace_worker_request_trace(&tracer->workers[i]);
tracer_unpark_all_workers(tracer);
} else {
DEBUG("starting in local-only mode\n");
}
DEBUG("waiting on tracers\n");
pthread_mutex_lock(&tracer->lock);
while (tracer->count <= trace_count)
pthread_cond_wait(&tracer->cond, &tracer->lock);
pthread_mutex_unlock(&tracer->lock);
trace_worker_trace(&tracer->workers[0]);
DEBUG("trace finished\n");
}
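The subtle part of the rework is the termination test in trace_worker_should_continue above. A helper thread holds its per-worker mutex whenever it is awake and releases it only inside pthread_cond_wait, so once active_tracers has dropped to one, the main worker (worker 0) can prove the helpers are parked by trylocking each of their mutexes; if it then also finds no stealable work, the trace is over, as the hunk's comment about spurious wakeups explains. Here is a standalone sketch of that quiescence check under the same locking discipline; all_helpers_quiescent and any_work_pending are names invented for this example, not the patch's.

/* Sketch only: try to prove every helper is parked, then check that no
   work is pending.  Assumes a helper holds helper_lock[i] whenever it is
   awake and drops it only while blocked in pthread_cond_wait. */
#include <pthread.h>
#include <stddef.h>

static int all_helpers_quiescent(pthread_mutex_t *helper_lock,
                                 size_t nhelpers,
                                 int (*any_work_pending)(void)) {
  size_t locked = 0;
  /* A failed trylock means that helper is still awake, so give up early. */
  while (locked < nhelpers) {
    if (pthread_mutex_trylock(&helper_lock[locked]) == 0)
      locked++;
    else
      break;
  }
  /* Only with every helper provably parked is the emptiness check
     conclusive; an awake helper could still publish more work. */
  int done = (locked == nhelpers) && !any_work_pending();
  while (locked > 0)
    pthread_mutex_unlock(&helper_lock[--locked]);
  return done;
}

If the test fails, only worker 0 keeps spinning and retrying (yield_for_spin); the helpers have already parked themselves, which is what keeps the check cheap.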