diff options
Diffstat (limited to 'libcr_ipc/chan.c')
-rw-r--r-- | libcr_ipc/chan.c | 141 |
1 files changed, 135 insertions, 6 deletions
diff --git a/libcr_ipc/chan.c b/libcr_ipc/chan.c index 7dd1132..12d2ec2 100644 --- a/libcr_ipc/chan.c +++ b/libcr_ipc/chan.c @@ -4,11 +4,31 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ +#include <alloca.h> /* for alloca() */ +#include <string.h> /* for memcpy() */ + +#include <libcr/coroutine.h> /* for cid_t, cr_* */ +#include <libmisc/assert.h> +#include <libmisc/rand.h> + #include <libcr_ipc/chan.h> -void _cr_chan_dequeue(void *_ch, size_t) { +#include "_linkedlist.h" + +/* base channels **************************************************************/ + +struct cr_chan_waiter { + cr_ipc_dll_node; + cid_t cid; + void *val_ptr; + void (*dequeue)(void *, size_t); + void *dequeue_arg1; + size_t dequeue_arg2; +}; + +void cr_chan_dequeue(void *_ch, size_t) { struct _cr_chan *ch = _ch; - _cr_ipc_dll_pop_from_front(&ch->waiters); + cr_ipc_dll_pop_from_front(&ch->waiters); } void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void *val_ptr, size_t val_size) { @@ -17,7 +37,7 @@ void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void if (ch->waiters.front && ch->waiter_typ != self_typ) { /* non-blocking fast-path */ /* Copy. */ - struct _cr_chan_waiter *front = _cr_ipc_dll_node_cast(struct _cr_chan_waiter, ch->waiters.front); + struct cr_chan_waiter *front = cr_ipc_dll_node_cast(struct cr_chan_waiter, ch->waiters.front); if (self_typ == _CR_CHAN_SENDER) memcpy(front->val_ptr, val_ptr, val_size); else @@ -27,14 +47,123 @@ void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void front->dequeue_arg2); cr_yield(); } else { /* blocking slow-path */ - struct _cr_chan_waiter self = { + struct cr_chan_waiter self = { .cid = cr_getcid(), .val_ptr = val_ptr, - .dequeue = _cr_chan_dequeue, + .dequeue = cr_chan_dequeue, .dequeue_arg1 = ch, }; - _cr_ipc_dll_push_to_rear(&ch->waiters, &self); + cr_ipc_dll_push_to_rear(&ch->waiters, &self); ch->waiter_typ = self_typ; cr_pause_and_yield(); } } + +/* select *********************************************************************/ + +enum cr_select_class { + CR_SELECT_CLASS_DEFAULT, + CR_SELECT_CLASS_BLOCKING, + CR_SELECT_CLASS_NONBLOCK, +}; + +struct cr_select_waiters { + size_t cnt; + struct cr_select_arg *args; + struct cr_chan_waiter *nodes; +}; + +static inline enum cr_select_class cr_select_getclass(struct cr_select_arg arg) { + switch (arg.op) { + case _CR_SELECT_OP_RECV: + if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_SENDER) + return CR_SELECT_CLASS_NONBLOCK; + else + return CR_SELECT_CLASS_BLOCKING; + case _CR_SELECT_OP_SEND: + if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_RECVER) + return CR_SELECT_CLASS_NONBLOCK; + else + return CR_SELECT_CLASS_BLOCKING; + case _CR_SELECT_OP_DEFAULT: + return CR_SELECT_CLASS_DEFAULT; + default: + assert_notreached("invalid arg.op"); + } +} + +void cr_select_dequeue(void *_waiters, size_t idx) { + struct cr_select_waiters *waiters = _waiters; + for (size_t i = 0; i < waiters->cnt; i++) + cr_ipc_dll_remove(&(waiters->args[i].ch->waiters), + &(waiters->nodes[i])); + waiters->cnt = idx; +} + +size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]) { + size_t cnt_blocking = 0; + size_t cnt_nonblock = 0; + size_t cnt_default = 0; + + assert(arg_cnt); + assert(arg_vec); + cr_assert_in_coroutine(); + + for (size_t i = 0; i < arg_cnt; i++) { + switch (cr_select_getclass(arg_vec[i])) { + case CR_SELECT_CLASS_BLOCKING: + cnt_blocking++; + break; + case CR_SELECT_CLASS_NONBLOCK: + cnt_nonblock++; + break; + case CR_SELECT_CLASS_DEFAULT: + cnt_default++; + break; + } + } + + if (cnt_nonblock) { + size_t choice = rand_uint63n(cnt_nonblock); + for (size_t i = 0, seen = 0; i < arg_cnt; i++) { + if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_NONBLOCK) { + if (seen == choice) { + _cr_chan_xfer(arg_vec[i].op == _CR_SELECT_OP_RECV + ? _CR_CHAN_RECVER + : _CR_CHAN_SENDER, + arg_vec[i].ch, + arg_vec[i].val_ptr, + arg_vec[i].val_siz); + return i; + } + seen++; + } + } + assert_notreached("should have returned from inside for() loop"); + } + + if (cnt_default) { + for (size_t i = 0; i < arg_cnt; i++) + if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_DEFAULT) + return i; + assert_notreached("should have returned from inside for() loop"); + } + + struct cr_select_waiters waiters = { + .cnt = arg_cnt, + .args = arg_vec, + .nodes = alloca(sizeof(struct cr_chan_waiter) * arg_cnt), + }; + for (size_t i = 0; i < arg_cnt; i++) { + waiters.nodes[i] = (struct cr_chan_waiter){ + .cid = cr_getcid(), + .val_ptr = arg_vec[i].val_ptr, + .dequeue = cr_select_dequeue, + .dequeue_arg1 = &waiters, + .dequeue_arg2 = i, + }; + cr_ipc_dll_push_to_rear(&arg_vec[i].ch->waiters, &waiters.nodes[i]); + } + cr_pause_and_yield(); + return waiters.cnt; +} |