/* libcr_ipc/chan.c - Simple channels for libcr * * Copyright (C) 2024-2025 Luke T. Shumaker * SPDX-License-Identifier: AGPL-3.0-or-later */ #include /* for alloca() */ #include /* for memcpy() */ #include /* for cid_t, cr_* */ #include #include #include #include "_linkedlist.h" /* base channels **************************************************************/ struct cr_chan_waiter { cr_ipc_dll_node; cid_t cid; void *val_ptr; void (*dequeue)(void *, size_t); void *dequeue_arg1; size_t dequeue_arg2; }; void cr_chan_dequeue(void *_ch, size_t) { struct _cr_chan *ch = _ch; cr_ipc_dll_pop_from_front(&ch->waiters); } void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void *val_ptr, size_t val_size) { assert(ch); assert(val_ptr); if (ch->waiters.front && ch->waiter_typ != self_typ) { /* non-blocking fast-path */ /* Copy. */ struct cr_chan_waiter *front = cr_ipc_dll_node_cast(struct cr_chan_waiter, ch->waiters.front); if (self_typ == _CR_CHAN_SENDER) memcpy(front->val_ptr, val_ptr, val_size); else memcpy(val_ptr, front->val_ptr, val_size); cr_unpause(front->cid); front->dequeue(front->dequeue_arg1, front->dequeue_arg2); cr_yield(); } else { /* blocking slow-path */ struct cr_chan_waiter self = { .cid = cr_getcid(), .val_ptr = val_ptr, .dequeue = cr_chan_dequeue, .dequeue_arg1 = ch, }; cr_ipc_dll_push_to_rear(&ch->waiters, &self); ch->waiter_typ = self_typ; cr_pause_and_yield(); } } /* select *********************************************************************/ enum cr_select_class { CR_SELECT_CLASS_DEFAULT, CR_SELECT_CLASS_BLOCKING, CR_SELECT_CLASS_NONBLOCK, }; struct cr_select_waiters { size_t cnt; struct cr_select_arg *args; struct cr_chan_waiter *nodes; }; static inline enum cr_select_class cr_select_getclass(struct cr_select_arg arg) { switch (arg.op) { case _CR_SELECT_OP_RECV: if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_SENDER) return CR_SELECT_CLASS_NONBLOCK; else return CR_SELECT_CLASS_BLOCKING; case _CR_SELECT_OP_SEND: if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_RECVER) return CR_SELECT_CLASS_NONBLOCK; else return CR_SELECT_CLASS_BLOCKING; case _CR_SELECT_OP_DEFAULT: return CR_SELECT_CLASS_DEFAULT; default: assert_notreached("invalid arg.op"); } } void cr_select_dequeue(void *_waiters, size_t idx) { struct cr_select_waiters *waiters = _waiters; for (size_t i = 0; i < waiters->cnt; i++) cr_ipc_dll_remove(&(waiters->args[i].ch->waiters), &(waiters->nodes[i])); waiters->cnt = idx; } size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]) { size_t cnt_blocking = 0; size_t cnt_nonblock = 0; size_t cnt_default = 0; assert(arg_cnt); assert(arg_vec); cr_assert_in_coroutine(); for (size_t i = 0; i < arg_cnt; i++) { switch (cr_select_getclass(arg_vec[i])) { case CR_SELECT_CLASS_BLOCKING: cnt_blocking++; break; case CR_SELECT_CLASS_NONBLOCK: cnt_nonblock++; break; case CR_SELECT_CLASS_DEFAULT: cnt_default++; break; } } if (cnt_nonblock) { size_t choice = rand_uint63n(cnt_nonblock); for (size_t i = 0, seen = 0; i < arg_cnt; i++) { if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_NONBLOCK) { if (seen == choice) { _cr_chan_xfer(arg_vec[i].op == _CR_SELECT_OP_RECV ? _CR_CHAN_RECVER : _CR_CHAN_SENDER, arg_vec[i].ch, arg_vec[i].val_ptr, arg_vec[i].val_siz); return i; } seen++; } } assert_notreached("should have returned from inside for() loop"); } if (cnt_default) { for (size_t i = 0; i < arg_cnt; i++) if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_DEFAULT) return i; assert_notreached("should have returned from inside for() loop"); } struct cr_select_waiters waiters = { .cnt = arg_cnt, .args = arg_vec, .nodes = alloca(sizeof(struct cr_chan_waiter) * arg_cnt), }; for (size_t i = 0; i < arg_cnt; i++) { waiters.nodes[i] = (struct cr_chan_waiter){ .cid = cr_getcid(), .val_ptr = arg_vec[i].val_ptr, .dequeue = cr_select_dequeue, .dequeue_arg1 = &waiters, .dequeue_arg2 = i, }; cr_ipc_dll_push_to_rear(&arg_vec[i].ch->waiters, &waiters.nodes[i]); } cr_pause_and_yield(); return waiters.cnt; }