diff options
author | Luke T. Shumaker <lukeshu@lukeshu.com> | 2025-04-07 14:28:17 -0600 |
---|---|---|
committer | Luke T. Shumaker <lukeshu@lukeshu.com> | 2025-04-07 20:28:56 -0600 |
commit | a1a20d71f6d9f6b26893b0f2d641da47cd59e74e (patch) | |
tree | d6a692b3858f2c222dd8eeed371bf7ed4ec27db7 | |
parent | 4d5a8b2f99be5e04954c5067080d1725af8c0ae7 (diff) |
libcr_ipc: Add waitgrouplukeshu/waitgroup
-rw-r--r-- | libcr_ipc/CMakeLists.txt | 2 | ||||
-rw-r--r-- | libcr_ipc/include/libcr_ipc/waitgroup.h | 65 | ||||
-rw-r--r-- | libcr_ipc/tests/test_waitgroup.c | 111 | ||||
-rw-r--r-- | libcr_ipc/waitgroup.c | 68 |
4 files changed, 246 insertions, 0 deletions
diff --git a/libcr_ipc/CMakeLists.txt b/libcr_ipc/CMakeLists.txt index 7d249d7..1f0ad07 100644 --- a/libcr_ipc/CMakeLists.txt +++ b/libcr_ipc/CMakeLists.txt @@ -11,6 +11,7 @@ target_sources(libcr_ipc INTERFACE mutex.c rpc.c sema.c + waitgroup.c ) target_link_libraries(libcr_ipc INTERFACE libcr @@ -23,6 +24,7 @@ set(ipc_tests rpc select sema + waitgroup ) foreach(test IN LISTS ipc_tests) add_lib_test(libcr_ipc "test_${test}") diff --git a/libcr_ipc/include/libcr_ipc/waitgroup.h b/libcr_ipc/include/libcr_ipc/waitgroup.h new file mode 100644 index 0000000..32369cf --- /dev/null +++ b/libcr_ipc/include/libcr_ipc/waitgroup.h @@ -0,0 +1,65 @@ +/* libcr_ipc/waitgroup.h - Go-like WaitGroups for libcr + * + * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#ifndef _LIBCR_IPC_WAITGROUP_H_ +#define _LIBCR_IPC_WAITGROUP_H_ + +#include <stdbool.h> + +#include <libmisc/private.h> + +#include <libcr_ipc/_linkedlist_pub.h> + +/** + * A cr_waitgroup_t is a fair read/write mutex. + * + * A waiting writer blocks any new readers; this ensures that the + * writer is able to eventually get the lock. + * + * None of the methods have `_from_intrhandler` variants because (1) + * an interrupt handler can't block, so it shouldn't ever lock a mutex + * because that can block; and (2) if it can't lock a mutex in the + * first place, then it has no business unlocking one. + */ +typedef struct { + BEGIN_PRIVATE(LIBCR_IPC_WAITGROUP_H); + int count; + bool unpausing; + _cr_ipc_sll_root waiters; + END_PRIVATE(LIBCR_IPC_WAITGROUP_H); +} cr_waitgroup_t; + +/** + * Add delta (which may be negative) to the counter. If the counter + * becomes zero, all cr_waitgroup_wait() are unblocked. The counter + * must not become negative. + * + * @runs_in coroutine + * @cr_pauses never + * @cr_yields never + */ +void cr_waitgroup_add(cr_waitgroup_t *wg, int delta); + +/** + * Like cr_waitgroup_add(), but for use from an interrupt handler. + * + * @runs_in intrhandler + */ +void cr_waitgroup_add_from_intrhandler(cr_waitgroup_t *wg, int delta); + +#define cr_waitgroup_done(WG) cr_waitgroup_add(WG, -1) +#define cr_waitgroup_done_from_intrhandler(WG) cr_waitgroup_add_from_intrhandler(WG, -1) + +/** + * Wait until the counter becomes zero. If the counter is already zero, returns immediately. + * + * @runs_in coroutine + * @cr_pauses maybe + * @cr_yields maybe + */ +void cr_waitgroup_wait(cr_waitgroup_t *wg); + +#endif /* _LIBCR_IPC_WAITGROUP_H_ */ diff --git a/libcr_ipc/tests/test_waitgroup.c b/libcr_ipc/tests/test_waitgroup.c new file mode 100644 index 0000000..46c077d --- /dev/null +++ b/libcr_ipc/tests/test_waitgroup.c @@ -0,0 +1,111 @@ +/* libcr_ipc/tests/test_waitgroup.c - Tests for <libcr_ipc/waitgroup.h> + * + * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include <string.h> + +#include <libcr/coroutine.h> +#include <libcr_ipc/waitgroup.h> + +#include "test.h" + +cr_waitgroup_t wg = {}; +int cnt = 0; + +COROUTINE cr1_worker(void *) { + cr_begin(); + + cnt++; + cr_waitgroup_done(&wg); + + cr_end(); +} + +COROUTINE cr1_waiter(void *) { + cr_begin(); + + cr_waitgroup_wait(&wg); + + test_assert(cnt == 10); + + cr_end(); +} + +COROUTINE cr1_init(void *) { + cr_begin(); + + cr_waitgroup_add(&wg, 10); + coroutine_add("wait", cr1_waiter, NULL); + for (int i = 0; i < 10; i++) + coroutine_add("worker", cr1_worker, NULL); + + cr_end(); +} + +COROUTINE cr2_waiter(void *) { + cr_begin(); + + cr_waitgroup_wait(&wg); + cr_waitgroup_add(&wg, 1); + + cr_end(); +} + +COROUTINE cr2_init(void *) { + cr_begin(); + + cr_waitgroup_add(&wg, 1); + coroutine_add("waiter1", cr2_waiter, NULL); + coroutine_add("waiter2", cr2_waiter, NULL); + cr_yield(); + cr_waitgroup_done(&wg); + + cr_end(); +} + +char out[10] = {0}; +size_t len = 0; + +COROUTINE cr3_waiter(void *_ch) { + char ch = *(char *)_ch; + cr_begin(); + + cr_waitgroup_wait(&wg); + out[len++] = ch; + + cr_end(); +} + +COROUTINE cr3_init(void *) { + cr_begin(); + + char ch; + cr_waitgroup_add(&wg, 1); + ch = 'a'; coroutine_add("wait-a", cr3_waiter, &ch); + ch = 'b'; coroutine_add("wait-b", cr3_waiter, &ch); + cr_yield(); + ch = 'c'; coroutine_add("wait-c", cr3_waiter, &ch); + cr_waitgroup_done(&wg); + + cr_end(); +} + +int main() { + printf("== test 1 =========================================\n"); + coroutine_add("init", cr1_init, NULL); + coroutine_main(); + + printf("== test 2 =========================================\n"); + coroutine_add("init", cr2_init, NULL); + coroutine_main(); + + printf("== test 3 =========================================\n"); + coroutine_add("init", cr3_init, NULL); + coroutine_main(); + test_assert(len == 3); + test_assert(strcmp(out, "abc") == 0); + + return 0; +} diff --git a/libcr_ipc/waitgroup.c b/libcr_ipc/waitgroup.c new file mode 100644 index 0000000..e76db01 --- /dev/null +++ b/libcr_ipc/waitgroup.c @@ -0,0 +1,68 @@ +/* libcr_ipc/waitgroup.c - Go-like WaitGroups for libcr + * + * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include <libcr/coroutine.h> /* for cid_t, cr_* */ + +#define IMPLEMENTATION_FOR_LIBCR_IPC_WAITGROUP_H YES +#include <libcr_ipc/waitgroup.h> + +#include "_linkedlist.h" + +struct cr_waitgroup_waiter { + cr_ipc_sll_node; + cid_t cid; +}; + +void cr_waitgroup_wait(cr_waitgroup_t *wg) { + assert(wg); + cr_assert_in_coroutine(); + + struct cr_waitgroup_waiter self = { + .cid = cr_getcid(), + }; + cr_ipc_sll_push_to_rear(&wg->waiters, &self); + if (wg->waiters.front != &self.cr_ipc_sll_node || wg->count > 0) + cr_pause_and_yield(); + assert(wg->waiters.front == &self.cr_ipc_sll_node); + + cr_ipc_sll_pop_from_front(&wg->waiters); + if (wg->waiters.front) { + assert(wg->unpausing); + cr_unpause(cr_ipc_sll_node_cast(struct cr_waitgroup_waiter, wg->waiters.front)->cid); + } else { + wg->unpausing = false; + } +} + +void cr_waitgroup_add(cr_waitgroup_t *wg, int delta) { + assert(wg); + cr_assert_in_coroutine(); + + if (delta == 0) + return; + bool saved = cr_save_and_disable_interrupts(); + wg->count += delta; + assert(wg->count >= 0); + if (wg->count == 0 && !wg->unpausing && wg->waiters.front) { + wg->unpausing = true; + cr_unpause(cr_ipc_sll_node_cast(struct cr_waitgroup_waiter, wg->waiters.front)->cid); + } + cr_restore_interrupts(saved); +} + +void cr_waitgroup_add_from_intrhandler(cr_waitgroup_t *wg, int delta) { + assert(wg); + cr_assert_in_intrhandler(); + + if (delta == 0) + return; + wg->count += delta; + assert(wg->count >= 0); + if (wg->count == 0 && !wg->unpausing && wg->waiters.front) { + wg->unpausing = true; + cr_unpause_from_intrhandler(cr_ipc_sll_node_cast(struct cr_waitgroup_waiter, wg->waiters.front)->cid); + } +} |