summaryrefslogtreecommitdiff
path: root/libcr_ipc/chan.c
diff options
context:
space:
mode:
Diffstat (limited to 'libcr_ipc/chan.c')
-rw-r--r--libcr_ipc/chan.c141
1 files changed, 135 insertions, 6 deletions
diff --git a/libcr_ipc/chan.c b/libcr_ipc/chan.c
index 7dd1132..12d2ec2 100644
--- a/libcr_ipc/chan.c
+++ b/libcr_ipc/chan.c
@@ -4,11 +4,31 @@
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
+#include <alloca.h> /* for alloca() */
+#include <string.h> /* for memcpy() */
+
+#include <libcr/coroutine.h> /* for cid_t, cr_* */
+#include <libmisc/assert.h>
+#include <libmisc/rand.h>
+
#include <libcr_ipc/chan.h>
-void _cr_chan_dequeue(void *_ch, size_t) {
+#include "_linkedlist.h"
+
+/* base channels **************************************************************/
+
+struct cr_chan_waiter {
+ cr_ipc_dll_node;
+ cid_t cid;
+ void *val_ptr;
+ void (*dequeue)(void *, size_t);
+ void *dequeue_arg1;
+ size_t dequeue_arg2;
+};
+
+void cr_chan_dequeue(void *_ch, size_t) {
struct _cr_chan *ch = _ch;
- _cr_ipc_dll_pop_from_front(&ch->waiters);
+ cr_ipc_dll_pop_from_front(&ch->waiters);
}
void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void *val_ptr, size_t val_size) {
@@ -17,7 +37,7 @@ void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void
if (ch->waiters.front && ch->waiter_typ != self_typ) { /* non-blocking fast-path */
/* Copy. */
- struct _cr_chan_waiter *front = _cr_ipc_dll_node_cast(struct _cr_chan_waiter, ch->waiters.front);
+ struct cr_chan_waiter *front = cr_ipc_dll_node_cast(struct cr_chan_waiter, ch->waiters.front);
if (self_typ == _CR_CHAN_SENDER)
memcpy(front->val_ptr, val_ptr, val_size);
else
@@ -27,14 +47,123 @@ void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void
front->dequeue_arg2);
cr_yield();
} else { /* blocking slow-path */
- struct _cr_chan_waiter self = {
+ struct cr_chan_waiter self = {
.cid = cr_getcid(),
.val_ptr = val_ptr,
- .dequeue = _cr_chan_dequeue,
+ .dequeue = cr_chan_dequeue,
.dequeue_arg1 = ch,
};
- _cr_ipc_dll_push_to_rear(&ch->waiters, &self);
+ cr_ipc_dll_push_to_rear(&ch->waiters, &self);
ch->waiter_typ = self_typ;
cr_pause_and_yield();
}
}
+
+/* select *********************************************************************/
+
+enum cr_select_class {
+ CR_SELECT_CLASS_DEFAULT,
+ CR_SELECT_CLASS_BLOCKING,
+ CR_SELECT_CLASS_NONBLOCK,
+};
+
+struct cr_select_waiters {
+ size_t cnt;
+ struct cr_select_arg *args;
+ struct cr_chan_waiter *nodes;
+};
+
+static inline enum cr_select_class cr_select_getclass(struct cr_select_arg arg) {
+ switch (arg.op) {
+ case _CR_SELECT_OP_RECV:
+ if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_SENDER)
+ return CR_SELECT_CLASS_NONBLOCK;
+ else
+ return CR_SELECT_CLASS_BLOCKING;
+ case _CR_SELECT_OP_SEND:
+ if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_RECVER)
+ return CR_SELECT_CLASS_NONBLOCK;
+ else
+ return CR_SELECT_CLASS_BLOCKING;
+ case _CR_SELECT_OP_DEFAULT:
+ return CR_SELECT_CLASS_DEFAULT;
+ default:
+ assert_notreached("invalid arg.op");
+ }
+}
+
+void cr_select_dequeue(void *_waiters, size_t idx) {
+ struct cr_select_waiters *waiters = _waiters;
+ for (size_t i = 0; i < waiters->cnt; i++)
+ cr_ipc_dll_remove(&(waiters->args[i].ch->waiters),
+ &(waiters->nodes[i]));
+ waiters->cnt = idx;
+}
+
+size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]) {
+ size_t cnt_blocking = 0;
+ size_t cnt_nonblock = 0;
+ size_t cnt_default = 0;
+
+ assert(arg_cnt);
+ assert(arg_vec);
+ cr_assert_in_coroutine();
+
+ for (size_t i = 0; i < arg_cnt; i++) {
+ switch (cr_select_getclass(arg_vec[i])) {
+ case CR_SELECT_CLASS_BLOCKING:
+ cnt_blocking++;
+ break;
+ case CR_SELECT_CLASS_NONBLOCK:
+ cnt_nonblock++;
+ break;
+ case CR_SELECT_CLASS_DEFAULT:
+ cnt_default++;
+ break;
+ }
+ }
+
+ if (cnt_nonblock) {
+ size_t choice = rand_uint63n(cnt_nonblock);
+ for (size_t i = 0, seen = 0; i < arg_cnt; i++) {
+ if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_NONBLOCK) {
+ if (seen == choice) {
+ _cr_chan_xfer(arg_vec[i].op == _CR_SELECT_OP_RECV
+ ? _CR_CHAN_RECVER
+ : _CR_CHAN_SENDER,
+ arg_vec[i].ch,
+ arg_vec[i].val_ptr,
+ arg_vec[i].val_siz);
+ return i;
+ }
+ seen++;
+ }
+ }
+ assert_notreached("should have returned from inside for() loop");
+ }
+
+ if (cnt_default) {
+ for (size_t i = 0; i < arg_cnt; i++)
+ if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_DEFAULT)
+ return i;
+ assert_notreached("should have returned from inside for() loop");
+ }
+
+ struct cr_select_waiters waiters = {
+ .cnt = arg_cnt,
+ .args = arg_vec,
+ .nodes = alloca(sizeof(struct cr_chan_waiter) * arg_cnt),
+ };
+ for (size_t i = 0; i < arg_cnt; i++) {
+ waiters.nodes[i] = (struct cr_chan_waiter){
+ .cid = cr_getcid(),
+ .val_ptr = arg_vec[i].val_ptr,
+ .dequeue = cr_select_dequeue,
+ .dequeue_arg1 = &waiters,
+ .dequeue_arg2 = i,
+ };
+ cr_ipc_dll_push_to_rear(&arg_vec[i].ch->waiters, &waiters.nodes[i]);
+ }
+ cr_pause_and_yield();
+ return waiters.cnt;
+}