1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
/* libcr_ipc/chan.c - Simple channels for libcr
*
* Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <alloca.h> /* for alloca() */
#include <string.h> /* for memcpy() */
#include <libcr/coroutine.h> /* for cid_t, cr_* */
#include <libmisc/assert.h>
#include <libmisc/rand.h>
#include <libcr_ipc/chan.h>
/* base channels **************************************************************/
struct cr_chan_waiter {
lm_dll_node;
cid_t cid;
void *val_ptr;
void (*dequeue)(void *, size_t);
void *dequeue_arg1;
size_t dequeue_arg2;
};
void cr_chan_dequeue(void *_ch, size_t) {
struct _cr_chan *ch = _ch;
lm_dll_pop_from_front(&ch->waiters);
}
void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void *val_ptr, size_t val_size) {
assert(ch);
assert(val_ptr);
if (ch->waiters.front && ch->waiter_typ != self_typ) { /* non-blocking fast-path */
/* Copy. */
struct cr_chan_waiter *front = lm_dll_node_cast(struct cr_chan_waiter, ch->waiters.front);
if (self_typ == _CR_CHAN_SENDER)
memcpy(front->val_ptr, val_ptr, val_size);
else
memcpy(val_ptr, front->val_ptr, val_size);
cr_unpause(front->cid);
front->dequeue(front->dequeue_arg1,
front->dequeue_arg2);
cr_yield();
} else { /* blocking slow-path */
struct cr_chan_waiter self = {
.cid = cr_getcid(),
.val_ptr = val_ptr,
.dequeue = cr_chan_dequeue,
.dequeue_arg1 = ch,
};
lm_dll_push_to_rear(&ch->waiters, &self);
ch->waiter_typ = self_typ;
cr_pause_and_yield();
}
}
/* select *********************************************************************/
enum cr_select_class {
CR_SELECT_CLASS_DEFAULT,
CR_SELECT_CLASS_BLOCKING,
CR_SELECT_CLASS_NONBLOCK,
};
struct cr_select_waiters {
size_t cnt;
struct cr_select_arg *args;
struct cr_chan_waiter *nodes;
};
static inline enum cr_select_class cr_select_getclass(struct cr_select_arg arg) {
switch (arg.op) {
case _CR_SELECT_OP_RECV:
if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_SENDER)
return CR_SELECT_CLASS_NONBLOCK;
else
return CR_SELECT_CLASS_BLOCKING;
case _CR_SELECT_OP_SEND:
if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_RECVER)
return CR_SELECT_CLASS_NONBLOCK;
else
return CR_SELECT_CLASS_BLOCKING;
case _CR_SELECT_OP_DEFAULT:
return CR_SELECT_CLASS_DEFAULT;
default:
assert_notreached("invalid arg.op");
}
}
void cr_select_dequeue(void *_waiters, size_t idx) {
struct cr_select_waiters *waiters = _waiters;
for (size_t i = 0; i < waiters->cnt; i++)
lm_dll_remove(&(waiters->args[i].ch->waiters),
&(waiters->nodes[i]));
waiters->cnt = idx;
}
size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]) {
size_t cnt_blocking = 0;
size_t cnt_nonblock = 0;
size_t cnt_default = 0;
assert(arg_cnt);
assert(arg_vec);
cr_assert_in_coroutine();
for (size_t i = 0; i < arg_cnt; i++) {
switch (cr_select_getclass(arg_vec[i])) {
case CR_SELECT_CLASS_BLOCKING:
cnt_blocking++;
break;
case CR_SELECT_CLASS_NONBLOCK:
cnt_nonblock++;
break;
case CR_SELECT_CLASS_DEFAULT:
cnt_default++;
break;
}
}
if (cnt_nonblock) {
size_t choice = rand_uint63n(cnt_nonblock);
for (size_t i = 0, seen = 0; i < arg_cnt; i++) {
if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_NONBLOCK) {
if (seen == choice) {
_cr_chan_xfer(arg_vec[i].op == _CR_SELECT_OP_RECV
? _CR_CHAN_RECVER
: _CR_CHAN_SENDER,
arg_vec[i].ch,
arg_vec[i].val_ptr,
arg_vec[i].val_siz);
return i;
}
seen++;
}
}
assert_notreached("should have returned from inside for() loop");
}
if (cnt_default) {
for (size_t i = 0; i < arg_cnt; i++)
if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_DEFAULT)
return i;
assert_notreached("should have returned from inside for() loop");
}
struct cr_select_waiters waiters = {
.cnt = arg_cnt,
.args = arg_vec,
.nodes = alloca(sizeof(struct cr_chan_waiter) * arg_cnt),
};
for (size_t i = 0; i < arg_cnt; i++) {
waiters.nodes[i] = (struct cr_chan_waiter){
.cid = cr_getcid(),
.val_ptr = arg_vec[i].val_ptr,
.dequeue = cr_select_dequeue,
.dequeue_arg1 = &waiters,
.dequeue_arg2 = i,
};
lm_dll_push_to_rear(&arg_vec[i].ch->waiters, &waiters.nodes[i]);
}
cr_pause_and_yield();
return waiters.cnt;
}
|