
Hey parallel marking is finally an improvement??

Andy Wingo 2022-03-13 21:38:59 +01:00
parent 4d7041bfa9
commit fddd4d9416


@@ -1,5 +1,5 @@
-#ifndef SERIAL_TRACE_H
-#define SERIAL_TRACE_H
+#ifndef PARALLEL_MARKER_H
+#define PARALLEL_MARKER_H
 
 #include <pthread.h>
 #include <stdatomic.h>
@@ -215,16 +215,23 @@ mark_deque_steal(struct mark_deque *q) {
   return x;
 }
 
+static int
+mark_deque_can_steal(struct mark_deque *q) {
+  size_t t = LOAD_ACQUIRE(&q->top);
+  atomic_thread_fence(memory_order_seq_cst);
+  size_t b = LOAD_ACQUIRE(&q->bottom);
+  return t < b;
+}
+
 #undef LOAD_RELAXED
 #undef STORE_RELAXED
 #undef LOAD_ACQUIRE
 #undef STORE_RELEASE
 #undef LOAD_CONSUME
 
-#define LOCAL_MARK_QUEUE_SIZE 64
-#define LOCAL_MARK_QUEUE_MASK 63
-#define LOCAL_MARK_QUEUE_SHARE_THRESHOLD 48
-#define LOCAL_MARK_QUEUE_SHARE_AMOUNT 32
+#define LOCAL_MARK_QUEUE_SIZE 1024
+#define LOCAL_MARK_QUEUE_MASK (LOCAL_MARK_QUEUE_SIZE - 1)
+#define LOCAL_MARK_QUEUE_SHARE_AMOUNT (LOCAL_MARK_QUEUE_SIZE * 3 / 4)
 struct local_mark_queue {
   size_t read;
   size_t write;
@@ -248,10 +255,6 @@ local_mark_queue_empty(struct local_mark_queue *q) {
   return local_mark_queue_size(q) == 0;
 }
 static inline int
-local_mark_queue_should_share(struct local_mark_queue *q) {
-  return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SHARE_THRESHOLD;
-}
-static inline int
 local_mark_queue_full(struct local_mark_queue *q) {
   return local_mark_queue_size(q) >= LOCAL_MARK_QUEUE_SIZE;
 }
@@ -264,221 +267,6 @@ local_mark_queue_pop(struct local_mark_queue *q) {
   return q->data[q->read++ & LOCAL_MARK_QUEUE_MASK];
 }
 
-struct mark_notify {
-  size_t notifiers;
-  int pending;
-  pthread_mutex_t lock;
-  pthread_cond_t cond;
-};
-
-static void
-mark_notify_init(struct mark_notify *notify) {
-  notify->notifiers = 0;
-  notify->pending = 0;
-  pthread_mutex_init(&notify->lock, NULL);
-  pthread_cond_init(&notify->cond, NULL);
-}
-
-static void
-mark_notify_destroy(struct mark_notify *notify) {
-  pthread_mutex_destroy(&notify->lock);
-  pthread_cond_destroy(&notify->cond);
-}
-
-static void
-mark_notify_add_notifier(struct mark_notify *notify) {
-  pthread_mutex_lock(&notify->lock);
-  notify->notifiers++;
-  pthread_mutex_unlock(&notify->lock);
-}
-
-static void
-mark_notify_remove_notifier(struct mark_notify *notify) {
-  pthread_mutex_lock(&notify->lock);
-  notify->notifiers--;
-  if (notify->notifiers == 0)
-    pthread_cond_signal(&notify->cond);
-  pthread_mutex_unlock(&notify->lock);
-}
-
-enum mark_notify_status {
-  MARK_NOTIFY_DONE,
-  MARK_NOTIFY_WOKE
-};
-
-static enum mark_notify_status
-mark_notify_wait(struct mark_notify *notify) {
-  enum mark_notify_status res;
-
-  pthread_mutex_lock(&notify->lock);
-
-  if (notify->pending) {
-    res = MARK_NOTIFY_WOKE;
-    notify->pending = 0;
-    goto done;
-  }
-
-  if (notify->notifiers == 0) {
-    res = MARK_NOTIFY_DONE;
-    goto done;
-  }
-
-  // Spurious wakeup is OK.
-  DEBUG("-- marker waiting\n");
-  pthread_cond_wait(&notify->cond, &notify->lock);
-  DEBUG("-- marker woke\n");
-  res = MARK_NOTIFY_WOKE;
-  notify->pending = 0;
-
-done:
-  pthread_mutex_unlock(&notify->lock);
-  return res;
-}
-
-static void
-mark_notify_wake(struct mark_notify *notify) {
-  DEBUG("== notifying pending wake!\n");
-  pthread_mutex_lock(&notify->lock);
-  notify->pending = 1;
-  pthread_cond_signal(&notify->cond);
-  pthread_mutex_unlock(&notify->lock);
-  DEBUG("== notifying pending wake done\n");
-}
-
-// A mostly lock-free multi-producer, single consumer queue, largely
-// inspired by Rust's std::sync::channel.
-//
-// https://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue
-
-struct mark_channel_message {
-  struct mark_channel_message * _Atomic next;
-  // Payload will be zero only for free messages, and for the sentinel
-  // message.
-  atomic_uintptr_t payload;
-};
-
-#define MARK_CHANNEL_WRITER_MESSAGE_COUNT ((size_t)1024)
-
-struct mark_channel {
-  union {
-    struct mark_channel_message* _Atomic head;
-    char head_padding[64];
-  };
-
-  union {
-    atomic_size_t length;
-    char length_padding[64];
-  };
-
-  struct mark_channel_message* tail;
-  struct mark_channel_message sentinel;
-
-  struct mark_notify notify;
-};
-
-struct mark_channel_writer {
-  struct mark_channel_message messages[MARK_CHANNEL_WRITER_MESSAGE_COUNT];
-  size_t next_message;
-
-  struct mark_channel *channel;
-};
-
-static void
-mark_channel_init(struct mark_channel *ch) {
-  memset(ch, 0, sizeof(*ch));
-  atomic_init(&ch->head, &ch->sentinel);
-  atomic_init(&ch->length, 0);
-  mark_notify_init(&ch->notify);
-  ch->tail = &ch->sentinel;
-}
-
-static void
-mark_channel_destroy(struct mark_channel *ch) {
-  mark_notify_destroy(&ch->notify);
-}
-
-static void
-mark_channel_push(struct mark_channel *ch, struct mark_channel_message *msg) {
-  ASSERT(msg->payload);
-  atomic_store_explicit(&msg->next, NULL, memory_order_relaxed);
-
-  struct mark_channel_message *prev =
-    atomic_exchange_explicit(&ch->head, msg, memory_order_acq_rel);
-  atomic_store_explicit(&prev->next, msg, memory_order_release);
-
-  size_t old_length =
-    atomic_fetch_add_explicit(&ch->length, 1, memory_order_relaxed);
-  if (old_length == 0)
-    mark_notify_wake(&ch->notify);
-}
-
-static uintptr_t
-mark_channel_try_pop(struct mark_channel *ch) {
-  struct mark_channel_message *tail = ch->tail;
-  struct mark_channel_message *next =
-    atomic_load_explicit(&tail->next, memory_order_acquire);
-
-  if (next) {
-    ch->tail = next;
-    uintptr_t payload =
-      atomic_load_explicit(&next->payload, memory_order_acquire);
-    ASSERT(payload != 0);
-    // Indicate to the writer that the old tail node can now be re-used.
-    // Note though that the new tail node is floating garbage; its
-    // payload has been popped but the node itself is still part of the
-    // queue. Care has to be taken to ensure that any remaining queue
-    // entries are popped before the associated channel writer's
-    // messages are deallocated.
-    atomic_store_explicit(&tail->payload, 0, memory_order_release);
-    atomic_fetch_sub_explicit(&ch->length, 1, memory_order_relaxed);
-    return payload;
-  }
-
-  // if (atomic_load_explicit(&ch->head) == tail) return EMPTY else INCONSISTENT
-  return 0;
-}
-
-static uintptr_t
-mark_channel_pop(struct mark_channel *ch) {
-  while (1) {
-    uintptr_t ret = mark_channel_try_pop(ch);
-    if (ret)
-      return ret;
-    if (atomic_load_explicit(&ch->length, memory_order_relaxed) == 0) {
-      if (mark_notify_wait(&ch->notify) == MARK_NOTIFY_DONE)
-        return 0;
-    }
-  }
-}
-
-static void
-mark_channel_writer_init(struct mark_channel *ch,
-                         struct mark_channel_writer *writer) {
-  memset(writer, 0, sizeof(*writer));
-  writer->channel = ch;
-}
-
-static void
-mark_channel_write(struct mark_channel_writer *writer, uintptr_t payload) {
-  ASSERT(payload);
-  struct mark_channel_message *msg = &writer->messages[writer->next_message];
-  while (atomic_load_explicit(&msg->payload, memory_order_acquire) != 0)
-    sched_yield();
-  writer->next_message++;
-  if (writer->next_message == MARK_CHANNEL_WRITER_MESSAGE_COUNT)
-    writer->next_message = 0;
-  atomic_store_explicit(&msg->payload, payload, memory_order_release);
-  mark_channel_push(writer->channel, msg);
-}
-
-static void
-mark_channel_writer_activate(struct mark_channel_writer *writer) {
-  mark_notify_add_notifier(&writer->channel->notify);
-}
-
-static void
-mark_channel_writer_deactivate(struct mark_channel_writer *writer) {
-  mark_notify_remove_notifier(&writer->channel->notify);
-}
-
 enum mark_worker_state {
   MARK_WORKER_STOPPED,
   MARK_WORKER_IDLE,
@@ -489,27 +277,30 @@ enum mark_worker_state {
 struct mark_worker {
   struct context *cx;
+  size_t id;
+  size_t steal_id;
   pthread_t thread;
   enum mark_worker_state state;
   pthread_mutex_t lock;
   pthread_cond_t cond;
-  struct mark_channel_writer writer;
+  struct mark_deque deque;
 };
 
 #define MARK_WORKERS_MAX_COUNT 8
 
 struct marker {
-  struct mark_deque deque;
-  pthread_mutex_t deque_writer_lock;
-  struct mark_channel overflow;
   atomic_size_t active_markers;
   size_t worker_count;
+  atomic_size_t running_markers;
+  long count;
+  pthread_mutex_t lock;
+  pthread_cond_t cond;
   struct mark_worker workers[MARK_WORKERS_MAX_COUNT];
 };
 
 struct local_marker {
   struct mark_worker *worker;
-  struct mark_deque *deque;
+  struct mark_deque *share_deque;
   struct context *cx;
   struct local_mark_queue local;
 };
@@ -519,15 +310,17 @@ static inline struct marker* context_marker(struct context *cx);
 static size_t number_of_current_processors(void) { return 1; }
 
-static void
+static int
 mark_worker_init(struct mark_worker *worker, struct context *cx,
-                 struct marker *marker) {
+                 struct marker *marker, size_t id) {
   worker->cx = cx;
+  worker->id = id;
+  worker->steal_id = 0;
   worker->thread = 0;
   worker->state = MARK_WORKER_STOPPED;
   pthread_mutex_init(&worker->lock, NULL);
   pthread_cond_init(&worker->cond, NULL);
-  mark_channel_writer_init(&marker->overflow, &worker->writer);
+  return mark_deque_init(&worker->deque);
 }
 
 static void mark_worker_mark(struct mark_worker *worker);
@@ -574,9 +367,10 @@ mark_worker_spawn(struct mark_worker *worker) {
 
 static void
 mark_worker_request_mark(struct mark_worker *worker) {
+  struct marker *marker = context_marker(worker->cx);
+
   pthread_mutex_lock(&worker->lock);
   ASSERT(worker->state == MARK_WORKER_IDLE);
-  mark_channel_writer_activate(&worker->writer);
   worker->state = MARK_WORKER_MARKING;
   pthread_cond_signal(&worker->cond);
   pthread_mutex_unlock(&worker->lock);
@@ -585,7 +379,14 @@ mark_worker_request_mark(struct mark_worker *worker) {
 static void
 mark_worker_finished_marking(struct mark_worker *worker) {
   // Signal controller that we are done with marking.
-  mark_channel_writer_deactivate(&worker->writer);
+  struct marker *marker = context_marker(worker->cx);
+  if (atomic_fetch_sub(&marker->running_markers, 1) == 1) {
+    pthread_mutex_lock(&marker->lock);
+    marker->count++;
+    pthread_cond_signal(&marker->cond);
+    pthread_mutex_unlock(&marker->lock);
+  }
 }
 
 static void
@@ -600,10 +401,11 @@ mark_worker_request_stop(struct mark_worker *worker) {
 static int
 marker_init(struct context *cx) {
   struct marker *marker = context_marker(cx);
-  if (!mark_deque_init(&marker->deque))
-    return 0;
-  pthread_mutex_init(&marker->deque_writer_lock, NULL);
-  mark_channel_init(&marker->overflow);
+  atomic_init(&marker->active_markers, 0);
+  atomic_init(&marker->running_markers, 0);
+  marker->count = 0;
+  pthread_mutex_init(&marker->lock, NULL);
+  pthread_cond_init(&marker->cond, NULL);
   size_t desired_worker_count = 0;
   if (getenv("GC_MARKERS"))
     desired_worker_count = atoi(getenv("GC_MARKERS"));
@@ -612,7 +414,8 @@ marker_init(struct context *cx) {
   if (desired_worker_count > MARK_WORKERS_MAX_COUNT)
     desired_worker_count = MARK_WORKERS_MAX_COUNT;
   for (size_t i = 0; i < desired_worker_count; i++) {
-    mark_worker_init(&marker->workers[i], cx, marker);
+    if (!mark_worker_init(&marker->workers[i], cx, marker, i))
+      break;
     if (mark_worker_spawn(&marker->workers[i]))
       marker->worker_count++;
     else
@@ -621,9 +424,15 @@ marker_init(struct context *cx) {
   return marker->worker_count > 0;
 }
 
-static void marker_prepare(struct context *cx) {}
+static void marker_prepare(struct context *cx) {
+  struct marker *marker = context_marker(cx);
+  for (size_t i = 0; i < marker->worker_count; i++)
+    marker->workers[i].steal_id = 0;
+}
 static void marker_release(struct context *cx) {
-  mark_deque_release(&context_marker(cx)->deque);
+  struct marker *marker = context_marker(cx);
+  for (size_t i = 0; i < marker->worker_count; i++)
+    mark_deque_release(&marker->workers[i].deque);
 }
 
 struct gcobj;
@@ -634,22 +443,9 @@ static inline int mark_object(struct context *cx,
 
 static inline void
 marker_share(struct local_marker *mark) {
-  struct marker *marker = context_marker(mark->cx);
-  DEBUG("marker %p: trying to share\n", mark->worker);
-  if (pthread_mutex_trylock(&marker->deque_writer_lock) != 0) {
-    DEBUG("marker %p: trylock failed\n", mark->worker);
-    if (local_mark_queue_full(&mark->local)) {
-      DEBUG("marker %p: forcing lock acquisition\n", mark->worker);
-      pthread_mutex_lock(&marker->deque_writer_lock);
-    } else
-      return;
-  }
-  DEBUG("marker %p: sharing\n", mark->worker);
+  DEBUG("marker #%zu: sharing\n", mark->worker->id);
   for (size_t i = 0; i < LOCAL_MARK_QUEUE_SHARE_AMOUNT; i++)
-    mark_deque_push(&marker->deque, local_mark_queue_pop(&mark->local));
-  pthread_mutex_unlock(&marker->deque_writer_lock);
+    mark_deque_push(mark->share_deque, local_mark_queue_pop(&mark->local));
 }
 
 static inline void
@@ -657,55 +453,114 @@ marker_visit(void **loc, void *mark_data) {
   struct local_marker *mark = mark_data;
   struct gcobj *obj = *loc;
   if (obj && mark_object(mark->cx, obj)) {
-    if (local_mark_queue_should_share(&mark->local))
+    if (local_mark_queue_full(&mark->local))
       marker_share(mark);
     local_mark_queue_push(&mark->local, (uintptr_t)obj);
   }
 }
 
 static uintptr_t
-mark_worker_steal(struct local_marker *mark) {
-  DEBUG("marker %p: trying to steal\n", mark->worker);
-  while (1) {
-    uintptr_t addr = mark_deque_steal(mark->deque);
-    if (addr == mark_deque_empty) {
-      struct marker *marker = context_marker(mark->cx);
-      if (atomic_fetch_sub_explicit(&marker->active_markers, 1,
-                                    memory_order_relaxed) == 1) {
-        DEBUG("  ->> marker %p: DONE (no spinning) <<-\n", mark->worker);
-        return 0;
-      }
-      size_t spin_count = 0;
-      while (1) {
-        addr = mark_deque_steal(mark->deque);
-        if (addr != mark_deque_empty) {
-          DEBUG("marker %p: spinning got 0x%zx\n", mark->worker, addr);
-          atomic_fetch_add_explicit(&marker->active_markers, 1,
-                                    memory_order_relaxed);
-          break;
-        }
-        if (atomic_load_explicit(&marker->active_markers,
-                                 memory_order_relaxed) == 0) {
-          DEBUG("  ->> marker %p: DONE <<-\n", mark->worker);
-          return 0;
-        }
-        // spin
-        DEBUG("marker %p: spinning #%zu\n", mark->worker, spin_count);
-        if (spin_count < 10)
-          __builtin_ia32_pause();
-        else if (spin_count < 20)
-          sched_yield();
-        else if (spin_count < 40)
-          usleep(0);
-        else
-          usleep(1);
-        spin_count++;
-      }
-    }
-    DEBUG("marker %p: stealing got 0x%zx\n", mark->worker, addr);
-    if (addr == mark_deque_abort)
-      continue;
-    return addr;
+marker_steal_from_worker(struct marker *marker, size_t id) {
+  ASSERT(id < marker->worker_count);
+  while (1) {
+    uintptr_t res = mark_deque_steal(&marker->workers[id].deque);
+    if (res == mark_deque_empty)
+      return 0;
+    if (res == mark_deque_abort)
+      continue;
+    return res;
+  }
+}
+
+static uintptr_t
+marker_can_steal_from_worker(struct marker *marker, size_t id) {
+  ASSERT(id < marker->worker_count);
+  return mark_deque_can_steal(&marker->workers[id].deque);
+}
+
+static uintptr_t
+mark_worker_steal_from_any(struct mark_worker *worker, struct marker *marker) {
+  size_t steal_id = worker->steal_id;
+  for (size_t i = 0; i < marker->worker_count; i++) {
+    steal_id = (steal_id + 1) % marker->worker_count;
+    DEBUG("marker #%zu: stealing from #%zu\n", worker->id, steal_id);
+    uintptr_t addr = marker_steal_from_worker(marker, steal_id);
+    if (addr) {
+      DEBUG("marker #%zu: stealing got 0x%zx\n", worker->id, addr);
+      worker->steal_id = steal_id;
+      return addr;
+    }
+  }
+  DEBUG("marker #%zu: failed to steal\n", worker->id);
+  return 0;
+}
+
+static int
+mark_worker_can_steal_from_any(struct mark_worker *worker, struct marker *marker) {
+  size_t steal_id = worker->steal_id;
+  DEBUG("marker #%zu: checking if any worker has tasks\n", worker->id);
+  for (size_t i = 0; i < marker->worker_count; i++) {
+    steal_id = (steal_id + 1) % marker->worker_count;
+    int res = marker_can_steal_from_worker(marker, steal_id);
+    if (res) {
+      DEBUG("marker #%zu: worker #%zu has tasks!\n", worker->id, steal_id);
+      worker->steal_id = steal_id;
+      return 1;
+    }
+  }
+  DEBUG("marker #%zu: nothing to steal\n", worker->id);
+  return 0;
+}
+
+static int
+mark_worker_check_termination(struct mark_worker *worker,
+                              struct marker *marker) {
+  // We went around all workers and nothing. Enter termination phase.
+  if (atomic_fetch_sub_explicit(&marker->active_markers, 1,
+                                memory_order_relaxed) == 1) {
+    DEBUG("  ->> marker #%zu: DONE (no spinning) <<-\n", worker->id);
+    return 1;
+  }
+
+  size_t spin_count = 0;
+  while (1) {
+    if (mark_worker_can_steal_from_any(worker, marker)) {
+      atomic_fetch_add_explicit(&marker->active_markers, 1,
+                                memory_order_relaxed);
+      return 0;
+    }
+    if (atomic_load_explicit(&marker->active_markers,
+                             memory_order_relaxed) == 0) {
+      DEBUG("  ->> marker #%zu: DONE <<-\n", worker->id);
+      return 1;
+    }
+    // spin
+    DEBUG("marker #%zu: spinning #%zu\n", worker->id, spin_count);
+    if (spin_count < 10)
+      __builtin_ia32_pause();
+    else if (spin_count < 20)
+      sched_yield();
+    else if (spin_count < 40)
+      usleep(0);
+    else
+      usleep(1);
+    spin_count++;
+  }
+}
+
+static uintptr_t
+mark_worker_steal(struct local_marker *mark) {
+  struct marker *marker = context_marker(mark->cx);
+  struct mark_worker *worker = mark->worker;
+  while (1) {
+    DEBUG("marker #%zu: trying to steal\n", worker->id);
+    uintptr_t addr = mark_worker_steal_from_any(worker, marker);
+    if (addr)
+      return addr;
+    if (mark_worker_check_termination(worker, marker))
+      return 0;
   }
 }
@@ -713,12 +568,12 @@ static void
 mark_worker_mark(struct mark_worker *worker) {
   struct local_marker mark;
   mark.worker = worker;
-  mark.deque = &context_marker(worker->cx)->deque;
+  mark.share_deque = &worker->deque;
   mark.cx = worker->cx;
   local_mark_queue_init(&mark.local);
 
   size_t n = 0;
-  DEBUG("marker %p: running mark loop\n", worker);
+  DEBUG("marker #%zu: running mark loop\n", worker->id);
   while (1) {
     uintptr_t addr;
     if (!local_mark_queue_empty(&mark.local)) {
@@ -731,7 +586,7 @@ mark_worker_mark(struct mark_worker *worker) {
     trace_one((struct gcobj*)addr, &mark);
     n++;
   }
-  DEBUG("marker %p: done marking, %zu objects traced\n", worker, n);
+  DEBUG("marker #%zu: done marking, %zu objects traced\n", worker->id, n);
 
   mark_worker_finished_marking(worker);
 }
@@ -739,51 +594,36 @@
 static inline void
 marker_visit_root(void **loc, struct context *cx) {
   struct gcobj *obj = *loc;
+  struct mark_deque *worker0_deque = &context_marker(cx)->workers[0].deque;
   if (obj && mark_object(cx, obj))
-    mark_deque_push(&context_marker(cx)->deque, (uintptr_t)obj);
+    mark_deque_push(worker0_deque, (uintptr_t)obj);
 }
 
 static inline void
 marker_trace(struct context *cx) {
   struct marker *marker = context_marker(cx);
+
+  pthread_mutex_lock(&marker->lock);
+  long mark_count = marker->count;
+  pthread_mutex_unlock(&marker->lock);
+
   DEBUG("starting trace; %zu workers\n", marker->worker_count);
-  while (1) {
-    DEBUG("waking workers\n");
-    atomic_store_explicit(&marker->active_markers, marker->worker_count,
-                          memory_order_release);
-    for (size_t i = 0; i < marker->worker_count; i++)
-      mark_worker_request_mark(&marker->workers[i]);
-
-    DEBUG("running controller loop\n");
-    size_t n = 0;
-    while (1) {
-      DEBUG("controller: popping\n");
-      uintptr_t addr = mark_channel_pop(&marker->overflow);
-      DEBUG("controller: popped 0x%zx\n", addr);
-      if (!addr)
-        break;
-      mark_deque_push(&marker->deque, addr);
-      DEBUG("controller: pushed to deque\n");
-      n++;
-    }
-    DEBUG("controller loop done, %zu objects sent for rebalancing\n", n);
-
-    // As in the ISMM'16 paper, it's possible that a worker decides to
-    // stop because the deque is empty, but actually there was an
-    // in-flight object in the mark channel that we hadn't been able to
-    // push yet. Loop in that case.
-    {
-      uintptr_t addr = mark_deque_try_pop(&marker->deque);
-      if (addr == mark_deque_empty)
-        break;
-      DEBUG("--> controller looping again due to slop\n");
-      mark_deque_push(&marker->deque, addr);
-    }
-  }
-
-  ASSERT(atomic_load(&marker->overflow.length) == 0);
-  ASSERT(atomic_load(&marker->overflow.head) == marker->overflow.tail);
+  DEBUG("waking workers\n");
+  atomic_store_explicit(&marker->active_markers, marker->worker_count,
+                        memory_order_release);
+  atomic_store_explicit(&marker->running_markers, marker->worker_count,
+                        memory_order_release);
+  for (size_t i = 0; i < marker->worker_count; i++)
+    mark_worker_request_mark(&marker->workers[i]);
+
+  DEBUG("waiting on markers\n");
+
+  pthread_mutex_lock(&marker->lock);
+  while (marker->count <= mark_count)
+    pthread_cond_wait(&marker->cond, &marker->lock);
+  pthread_mutex_unlock(&marker->lock);
+
   DEBUG("trace finished\n");
 }
 
-#endif // SERIAL_MARK_H
+#endif // PARALLEL_MARKER_H
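
For orientation: the deque operations this diff builds on (mark_deque_push, mark_deque_steal, and the new mark_deque_can_steal) are defined above the first hunk and are not shown here. They follow the Chase-Lev work-stealing design: the owning worker pushes (and pops) at the bottom of its own deque, and thieves compete for the top, retrying when they lose a race. Below is a minimal single-file sketch of that interface using C11 atomics. All names, the fixed capacity, and the sentinel values are illustrative assumptions, not this file's actual code; the real mark_deque also has an owner-side pop and a growable buffer, both omitted here.

#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

// Sentinel return values, mirroring the roles of mark_deque_empty and
// mark_deque_abort in the diff. Payloads equal to these values cannot
// be stored; in the marker, payloads are object addresses, never 0 or 1.
#define DEQUE_EMPTY ((uintptr_t)0)
#define DEQUE_ABORT ((uintptr_t)1)
#define DEQUE_CAPACITY 1024  // fixed-size for the sketch; the real deque grows

struct deque {
  atomic_size_t top;     // next slot thieves steal from
  atomic_size_t bottom;  // next slot the owner pushes to
  uintptr_t buf[DEQUE_CAPACITY];
};

// Owner-side push: write the element, then publish it to thieves by
// bumping bottom with release ordering. (Overflow handling omitted.)
static void deque_push(struct deque *q, uintptr_t x) {
  size_t b = atomic_load_explicit(&q->bottom, memory_order_relaxed);
  q->buf[b % DEQUE_CAPACITY] = x;
  atomic_store_explicit(&q->bottom, b + 1, memory_order_release);
}

// Thief-side steal: read top, fence, read bottom, then race other
// thieves for the top element with a CAS. A lost race returns
// DEQUE_ABORT so the caller can retry, as marker_steal_from_worker
// does in the diff above.
static uintptr_t deque_steal(struct deque *q) {
  size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
  atomic_thread_fence(memory_order_seq_cst);
  size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
  if (t >= b)
    return DEQUE_EMPTY;
  uintptr_t x = q->buf[t % DEQUE_CAPACITY];
  if (!atomic_compare_exchange_strong_explicit(&q->top, &t, t + 1,
                                               memory_order_seq_cst,
                                               memory_order_relaxed))
    return DEQUE_ABORT;  // lost the race; caller retries
  return x;
}

// Cheap emptiness probe without taking anything, mirroring the
// mark_deque_can_steal added in the diff's second hunk.
static int deque_can_steal(struct deque *q) {
  size_t t = atomic_load_explicit(&q->top, memory_order_acquire);
  atomic_thread_fence(memory_order_seq_cst);
  size_t b = atomic_load_explicit(&q->bottom, memory_order_acquire);
  return t < b;
}

// Single-threaded smoke test of the interface (the interesting
// behavior is of course concurrent).
int main(void) {
  static struct deque q;
  atomic_init(&q.top, 0);
  atomic_init(&q.bottom, 0);
  deque_push(&q, 0x1234);
  printf("can_steal: %d\n", deque_can_steal(&q));     // 1
  printf("stole: 0x%zx\n", (size_t)deque_steal(&q));  // 0x1234
  printf("after: %zu\n", (size_t)deque_steal(&q));    // 0 == DEQUE_EMPTY
  return 0;
}

The seq_cst fence between the two loads matches the one visible in the diff's mark_deque_can_steal; it keeps the top/bottom snapshot coherent against concurrent pushes, which matters because mark_worker_check_termination above uses this probe to decide whether a worker may declare the trace finished.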