summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke T. Shumaker <lukeshu@lukeshu.com>2025-04-07 14:28:17 -0600
committerLuke T. Shumaker <lukeshu@lukeshu.com>2025-04-07 20:28:56 -0600
commita1a20d71f6d9f6b26893b0f2d641da47cd59e74e (patch)
treed6a692b3858f2c222dd8eeed371bf7ed4ec27db7
parent4d5a8b2f99be5e04954c5067080d1725af8c0ae7 (diff)
libcr_ipc: Add waitgrouplukeshu/waitgroup
-rw-r--r--libcr_ipc/CMakeLists.txt2
-rw-r--r--libcr_ipc/include/libcr_ipc/waitgroup.h65
-rw-r--r--libcr_ipc/tests/test_waitgroup.c111
-rw-r--r--libcr_ipc/waitgroup.c68
4 files changed, 246 insertions, 0 deletions
diff --git a/libcr_ipc/CMakeLists.txt b/libcr_ipc/CMakeLists.txt
index 7d249d7..1f0ad07 100644
--- a/libcr_ipc/CMakeLists.txt
+++ b/libcr_ipc/CMakeLists.txt
@@ -11,6 +11,7 @@ target_sources(libcr_ipc INTERFACE
mutex.c
rpc.c
sema.c
+ waitgroup.c
)
target_link_libraries(libcr_ipc INTERFACE
libcr
@@ -23,6 +24,7 @@ set(ipc_tests
rpc
select
sema
+ waitgroup
)
foreach(test IN LISTS ipc_tests)
add_lib_test(libcr_ipc "test_${test}")
diff --git a/libcr_ipc/include/libcr_ipc/waitgroup.h b/libcr_ipc/include/libcr_ipc/waitgroup.h
new file mode 100644
index 0000000..32369cf
--- /dev/null
+++ b/libcr_ipc/include/libcr_ipc/waitgroup.h
@@ -0,0 +1,65 @@
+/* libcr_ipc/waitgroup.h - Go-like WaitGroups for libcr
+ *
+ * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#ifndef _LIBCR_IPC_WAITGROUP_H_
+#define _LIBCR_IPC_WAITGROUP_H_
+
+#include <stdbool.h>
+
+#include <libmisc/private.h>
+
+#include <libcr_ipc/_linkedlist_pub.h>
+
+/**
+ * A cr_waitgroup_t is a fair read/write mutex.
+ *
+ * A waiting writer blocks any new readers; this ensures that the
+ * writer is able to eventually get the lock.
+ *
+ * None of the methods have `_from_intrhandler` variants because (1)
+ * an interrupt handler can't block, so it shouldn't ever lock a mutex
+ * because that can block; and (2) if it can't lock a mutex in the
+ * first place, then it has no business unlocking one.
+ */
+typedef struct {
+ BEGIN_PRIVATE(LIBCR_IPC_WAITGROUP_H);
+ int count;
+ bool unpausing;
+ _cr_ipc_sll_root waiters;
+ END_PRIVATE(LIBCR_IPC_WAITGROUP_H);
+} cr_waitgroup_t;
+
+/**
+ * Add delta (which may be negative) to the counter. If the counter
+ * becomes zero, all cr_waitgroup_wait() are unblocked. The counter
+ * must not become negative.
+ *
+ * @runs_in coroutine
+ * @cr_pauses never
+ * @cr_yields never
+ */
+void cr_waitgroup_add(cr_waitgroup_t *wg, int delta);
+
+/**
+ * Like cr_waitgroup_add(), but for use from an interrupt handler.
+ *
+ * @runs_in intrhandler
+ */
+void cr_waitgroup_add_from_intrhandler(cr_waitgroup_t *wg, int delta);
+
+#define cr_waitgroup_done(WG) cr_waitgroup_add(WG, -1)
+#define cr_waitgroup_done_from_intrhandler(WG) cr_waitgroup_add_from_intrhandler(WG, -1)
+
+/**
+ * Wait until the counter becomes zero. If the counter is already zero, returns immediately.
+ *
+ * @runs_in coroutine
+ * @cr_pauses maybe
+ * @cr_yields maybe
+ */
+void cr_waitgroup_wait(cr_waitgroup_t *wg);
+
+#endif /* _LIBCR_IPC_WAITGROUP_H_ */
diff --git a/libcr_ipc/tests/test_waitgroup.c b/libcr_ipc/tests/test_waitgroup.c
new file mode 100644
index 0000000..46c077d
--- /dev/null
+++ b/libcr_ipc/tests/test_waitgroup.c
@@ -0,0 +1,111 @@
+/* libcr_ipc/tests/test_waitgroup.c - Tests for <libcr_ipc/waitgroup.h>
+ *
+ * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#include <string.h>
+
+#include <libcr/coroutine.h>
+#include <libcr_ipc/waitgroup.h>
+
+#include "test.h"
+
+cr_waitgroup_t wg = {};
+int cnt = 0;
+
+COROUTINE cr1_worker(void *) {
+ cr_begin();
+
+ cnt++;
+ cr_waitgroup_done(&wg);
+
+ cr_end();
+}
+
+COROUTINE cr1_waiter(void *) {
+ cr_begin();
+
+ cr_waitgroup_wait(&wg);
+
+ test_assert(cnt == 10);
+
+ cr_end();
+}
+
+COROUTINE cr1_init(void *) {
+ cr_begin();
+
+ cr_waitgroup_add(&wg, 10);
+ coroutine_add("wait", cr1_waiter, NULL);
+ for (int i = 0; i < 10; i++)
+ coroutine_add("worker", cr1_worker, NULL);
+
+ cr_end();
+}
+
+COROUTINE cr2_waiter(void *) {
+ cr_begin();
+
+ cr_waitgroup_wait(&wg);
+ cr_waitgroup_add(&wg, 1);
+
+ cr_end();
+}
+
+COROUTINE cr2_init(void *) {
+ cr_begin();
+
+ cr_waitgroup_add(&wg, 1);
+ coroutine_add("waiter1", cr2_waiter, NULL);
+ coroutine_add("waiter2", cr2_waiter, NULL);
+ cr_yield();
+ cr_waitgroup_done(&wg);
+
+ cr_end();
+}
+
+char out[10] = {0};
+size_t len = 0;
+
+COROUTINE cr3_waiter(void *_ch) {
+ char ch = *(char *)_ch;
+ cr_begin();
+
+ cr_waitgroup_wait(&wg);
+ out[len++] = ch;
+
+ cr_end();
+}
+
+COROUTINE cr3_init(void *) {
+ cr_begin();
+
+ char ch;
+ cr_waitgroup_add(&wg, 1);
+ ch = 'a'; coroutine_add("wait-a", cr3_waiter, &ch);
+ ch = 'b'; coroutine_add("wait-b", cr3_waiter, &ch);
+ cr_yield();
+ ch = 'c'; coroutine_add("wait-c", cr3_waiter, &ch);
+ cr_waitgroup_done(&wg);
+
+ cr_end();
+}
+
+int main() {
+ printf("== test 1 =========================================\n");
+ coroutine_add("init", cr1_init, NULL);
+ coroutine_main();
+
+ printf("== test 2 =========================================\n");
+ coroutine_add("init", cr2_init, NULL);
+ coroutine_main();
+
+ printf("== test 3 =========================================\n");
+ coroutine_add("init", cr3_init, NULL);
+ coroutine_main();
+ test_assert(len == 3);
+ test_assert(strcmp(out, "abc") == 0);
+
+ return 0;
+}
diff --git a/libcr_ipc/waitgroup.c b/libcr_ipc/waitgroup.c
new file mode 100644
index 0000000..e76db01
--- /dev/null
+++ b/libcr_ipc/waitgroup.c
@@ -0,0 +1,68 @@
+/* libcr_ipc/waitgroup.c - Go-like WaitGroups for libcr
+ *
+ * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#include <libcr/coroutine.h> /* for cid_t, cr_* */
+
+#define IMPLEMENTATION_FOR_LIBCR_IPC_WAITGROUP_H YES
+#include <libcr_ipc/waitgroup.h>
+
+#include "_linkedlist.h"
+
+struct cr_waitgroup_waiter {
+ cr_ipc_sll_node;
+ cid_t cid;
+};
+
+void cr_waitgroup_wait(cr_waitgroup_t *wg) {
+ assert(wg);
+ cr_assert_in_coroutine();
+
+ struct cr_waitgroup_waiter self = {
+ .cid = cr_getcid(),
+ };
+ cr_ipc_sll_push_to_rear(&wg->waiters, &self);
+ if (wg->waiters.front != &self.cr_ipc_sll_node || wg->count > 0)
+ cr_pause_and_yield();
+ assert(wg->waiters.front == &self.cr_ipc_sll_node);
+
+ cr_ipc_sll_pop_from_front(&wg->waiters);
+ if (wg->waiters.front) {
+ assert(wg->unpausing);
+ cr_unpause(cr_ipc_sll_node_cast(struct cr_waitgroup_waiter, wg->waiters.front)->cid);
+ } else {
+ wg->unpausing = false;
+ }
+}
+
+void cr_waitgroup_add(cr_waitgroup_t *wg, int delta) {
+ assert(wg);
+ cr_assert_in_coroutine();
+
+ if (delta == 0)
+ return;
+ bool saved = cr_save_and_disable_interrupts();
+ wg->count += delta;
+ assert(wg->count >= 0);
+ if (wg->count == 0 && !wg->unpausing && wg->waiters.front) {
+ wg->unpausing = true;
+ cr_unpause(cr_ipc_sll_node_cast(struct cr_waitgroup_waiter, wg->waiters.front)->cid);
+ }
+ cr_restore_interrupts(saved);
+}
+
+void cr_waitgroup_add_from_intrhandler(cr_waitgroup_t *wg, int delta) {
+ assert(wg);
+ cr_assert_in_intrhandler();
+
+ if (delta == 0)
+ return;
+ wg->count += delta;
+ assert(wg->count >= 0);
+ if (wg->count == 0 && !wg->unpausing && wg->waiters.front) {
+ wg->unpausing = true;
+ cr_unpause_from_intrhandler(cr_ipc_sll_node_cast(struct cr_waitgroup_waiter, wg->waiters.front)->cid);
+ }
+}