diff options
author | Luke T. Shumaker <lukeshu@lukeshu.com> | 2024-09-22 00:56:20 -0600 |
---|---|---|
committer | Luke T. Shumaker <lukeshu@lukeshu.com> | 2024-09-22 00:56:20 -0600 |
commit | 8ae41297e21c96793ab1f2c8d1abcc5f274501e2 (patch) | |
tree | c2d8a94ed875c3739a2f730a2ea16dd802948615 | |
parent | b13be233849bfec6277638f6bf7261a4b709cedf (diff) |
wip netio
-rw-r--r-- | Makefile | 5 | ||||
-rw-r--r-- | netio.h | 14 | ||||
-rw-r--r-- | netio_posix.c | 49 | ||||
-rw-r--r-- | netio_uring.c | 365 |
4 files changed, 433 insertions, 0 deletions
@@ -1,3 +1,8 @@ +CFLAGS += -fno-split-stack +CFLAGS += -Wall -Wextra -Werror +CFLAGS += -g -O0 +LDFLAGS += -static + srv9p: srv9p.o coroutine.o net9p.o net9p_defs.h net9p_defs.c: net9p_defs.gen net9p_defs.txt @@ -0,0 +1,14 @@ +#ifndef _NETIO_H_ +#define _NETIO_H_ + +#include <stdint.h> +#include <stdbool.h> + +void netio_init(void); + +int netio_accept(int sock); +int netio_read(int conn, void *buf, size_t count); +int netio_write(int conn, void *buf, size_t count); +int netio_close(int conn, bool rd, bool wr); + +#endef /* _NETIO_H_ */ diff --git a/netio_posix.c b/netio_posix.c new file mode 100644 index 0000000..0aa8277 --- /dev/null +++ b/netio_posix.c @@ -0,0 +1,49 @@ +#include <aio.h> +#include <sys/socket.h> + +#include "coroutine.h" + +/* http://davmac.org/davpage/linux/async-io.html */ +void netio_init(void) { + sigaction(TODO) +} + +int netio_accept(int socket) { + /* AFAICT there is no good POSIX way to do this without busy-polling. */ + for (;;) { + int conn = accept(socket, NULL, NULL); + if (conn < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + cr_yield(); + continue; + } + return -errno; + } + return conn; + } +} + +int netio_read(int conn, void *buf, size_t count) { + int r; + struct aiocb ctl_block = { + .aio_filedes = conn, + .aio_buf = buff, + .aio_nbytes = count, + .aio_sigevent = { + .sigev_notify = SIGEV_SIGNAL, + .sigev_signo = SIGIO, + .sigev_value = { + .sigval_int = (int)cr_getcid(), + }, + }, + }; + + if (aio_read(&ctl_block) < 0) + return -errno; + + while ((r = aio_error(&ctl_block)) == EINPROGRESS) + cr_pause_and_yield(); + + + +} diff --git a/netio_uring.c b/netio_uring.c new file mode 100644 index 0000000..bf3f9c0 --- /dev/null +++ b/netio_uring.c @@ -0,0 +1,365 @@ +#include <linux/io_uring.h> +#include <sys/mman.h> +#include <assert.h> +#include <error.h> +#include <string.h> + +#include "coroutine.h" +#include "netio.h" + +/* Config *********************************************************************/ + +#define MAX_WORKERS 8 + +/* Syscalls *******************************************************************/ + +/* I don't want to pull in liburing, and glibc doesn't provide stubs + * for these syscalls. + * + * For the signatures, search for `SYSCALL_DEFINE` in + * `linux.git:io_uring/io_uring.c`. + */ + +static inline int io_uring_setup(uint32_t entries, struct io_uring_params *p) { + return syscall(__NR_io_uring_setup, entries, p); +} +static inline int io_uring_enter1(int fd, uint32_t to_submit, uint32_t min_complete, uint32_t flags, + sigset_t *sig) { + return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags & ~IORING_ENTER_EXT_ARG, sig); +} +static inline int io_uring_enter2(int fd, uint32_t to_submit, uint32_t min_complete, uint32_t flags, + struct io_uring_getevents_arg *argp, size_t argsz) { + return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags | IORING_ENTER_EXT_ARG, argp, argsz); +} +static inline int io_uring_register(int fd, int opcode, void *arg, unsigned int nr_args) { + return syscall(__NR_io_uring_register, opcode, arg, nr_args); +} + +/* Userspace component of io_uring ********************************************/ + +/* I'm not too sure what the sematics around volatile-memory vs + * __sync_synchronize() are, but this is the definition that + * linux.git/tools/include/io_uring/mini_liburing.h uses. */ +#if defined(__x86_64__) +# define memory_barrier() asm volatile ("":::"memory") +#else +# define memory_barrier() __sync_synchronize() /* GCC built-in */ +#endif + +/** + * Submission Queue + * + * A proxy into the kernel's internal structures, which have been mmap()ed into + * userspace. Each member is a pointer (which is what makes this a proxy) + * because the order of the members kernel memory is not ABI-stable, and instead + * we get offsets into the mmap()ed area from the io_uring_setup() syscall. + * + * Call memory_barrier() before reading a non-const value from the proxy, and + * after writing to it. + * + * Exactly which kernel structure this is proxying into varies by kernel version + * (I point this out so that you can more easily find the documentation for the + * internal kernel structures): + * + * fs/io_uring.c struct io_sq_ring; kernel v5.1 - v5.3 + * fs/io_uring.c struct io_rings; kernel v5.4 - v5.19 + * include/linux/io_uring_types.h struct io_rings; kernel v6.0 + + * + * Despite the kernel having merged `io_sq_ring` and `io_cq_ring` into a single + * monolithic structure in v5.4, I leave them separate here, because + * conceptually they are better separate; they were merged purely for + * performance reasons. + * + * I also include in a leading comment on each member with a kernel definition, + * which will make the size passed to mmap() make sense. + */ +struct my_io_sq_ring_proxy { + void *ring_mmap; /* in case we want to munmap() it */ + size_t ring_mmap_size; /* in case we want to munmap() it */ + /* pointers into mmap(offset=ORING_OFF_SQ_RING) */ + /* kernel def */ /* who-writes-it ; description */ + /* u32 r.head; */ uint32_t *sq_head; /* kernel ; apply sq_mask to get a valid index */ + /* u32 r.tail; */ uint32_t *sq_tail; /* userspace ; apply sq_mask to get a valid index */ + /* u32 ring_mask; */ uint32_t *sq_mask; /* kern-const ; TODO */ + /* u32 ring_entries; */ uint32_t *sq_cap; /* kern-const ; number of entries, always a power-of-2 */ + /* u32 dropped; */ uint32_t *sq_dropped; /* kernel ; number of entries dropped because invalid index */ + /* atomic_t flags; */ int *sq_flags; /* kernel ; must use memory barrier before checking */ + /* u32 array[]; */ uint32_t *sq_sqe_idxs; /* userspace ; sq_cap-sized array of indexes into the sq_sqes array */ + + /* This is actually separate from `struct io_sq_ring`/`struct io_rings`. */ + void *entries_mmap; /* in case we want to munmap() */ + size_t entries_mmap_size; /* in case we want to munmap() */ + /* pointers into mmap(offset=ORING_OFF_SQES) */ + /* who-writes-it ; description */ + struct io_uring_sqe *sq_sqes; /* userspace ; sq_cap-sized array */ + + /* The structure of sq_sqe_idxs is as follows. The + * application writes a request into [sq_tail+1], then + * increments sq_tail. The kernel increments sq_head when it + * has processed a request. + * + * <- sq_cap=8 + * [ 7: uninitialized ] + * [ 6: uninitialized ] + * [ 5: uninitialized ] <- sq_tail=5 + * [ 4: pending T ] + * [ 3: pending | ] + * [ 2: pending H ] <- sq_head=2 + * [ 1: finished ] + * [ 0: finished ] + * + * It may wrap-around like + * + * <- sq_cap=8 + * [ 7: pending | ] + * [ 6: pending | ] + * [ 5: pending | ] + * [ 4: pending H ] <- sq_head=4 + * [ 3: finished ] + * [ 2: finished ] <- sq_tail=10 (sq_tail%sq_cap = sq_tail&sq_mask = 2) + * [ 1: pending T ] + * [ 0: pending | ] + * + * When empty it looks like + * <- sq_cap=8 + * [ 7: uninitialized ] + * [ 6: uninitialized ] + * [ 5: uninitialized ] + * [ 4: uninitialized ] + * [ 3: uninitialized ] + * [ 2: uninitialized ] + * [ 1: finished O ] <- sq_head=sq_tail=1 + * [ 0: finished ] + */ +}; + +/** + * Completion Queue + * + * A proxy into the kernel's internal structure, which has been mmap()ed into + * userspace. Each member is a pointer (which is what makes this a proxy) + * because the order of the members kernel memory is not ABI-stable, and instead + * we get offsets into the mmap()ed area from the io_uring_setup() syscall. + * + * Call memory_barrier() before reading a non-const value from the proxy, and + * after writing to it. + * + * Exactly which kernel structure this is proxying into varies by kernel version + * (I point this out so that you can more easily find the documentation for the + * internal kernel structures): + * + * fs/io_uring.c struct io_cq_ring; kernel v5.1 - v5.3 + * fs/io_uring.c struct io_rings; kernel v5.4 - v5.19 + * include/linux/io_uring_types.h struct io_rings; kernel v6.0 + + * + * Despite the kernel having merged `io_sq_ring` and `io_cq_ring` into a single + * monolithic structure in v5.4, I leave them separate here, because + * conceptually they are better separate; they were merged purely for + * performance reasons. + * + * I also include in a leading comment on each member with a kernel definition, + * which will make the size passed to mmap() make sense. + */ +struct my_io_cq_ring_proxy { + size_t mmap_size; /* in case we want to munmap() it */ + void *mmap; /* in case we want to munmap() it */ + /* pointers into mmap(offset=ORING_OFF_CQ_RING) */ + /* kernel def */ /* who-writes-it ; description */ + /* u32 r.head; */ uint32_t *cq_head; /* userspace ; apply cq_mask to get a valid index */ + /* u32 r.tail; */ uint32_t *cq_tail; /* kernel ; apply cq_mask to get a valid index */ + /* u32 ring_mask; */ uint32_t *cq_mask; /* kern-const ; TODO */ + /* u32 ring_entries; */ uint32_t *cq_cap; /* kern-const ; number of entries, always a power-of-2 */ + /* u32 flags; */ uint32_t *cq_flags; /* userspace ; TODO */ + /* u32 overflow; */ uint32_t *cq_overflow; /* kernel ; TODO */ + /* struct io_uring_cqe cqes[]; */ struct io_uring_cqe *cq_cqes; /* mostly-kernel ; cq_cap-sized array; userspace is allowed to modify pending entries */ +} + +struct my_uring { + int fd; + + struct my_io_sq_ring_proxy kern_sq; + struct my_io_cq_ring_proxy kern_cq; + + uint32_t user_sq_head; + uint32_t user_sq_tail; + uint32_t user_cq_head; + uint32_t user_cq_tail; +}; + +static int my_uring_deinit(struct my_uring *ring) { + if (ring->kern_cq.mmap && + ring->kern_cq.mmap != MAP_FAILED && + ring->kern_cq.mmap != ring->kern_sq.ring_mmap) + munmap(ring->ring->kern_cq.mmap, ring->kern_cq.mmap_size); + if (ring->kern_sq.entries_mmap && + ring->kern_sq.entries_mmap != MAP_FAILED) + munmap(ring->ring->kern_sq.entries_mmap, ring->kern_sq.entries_mmap_size); + if (ring->kern_sq.ring_mmap && + ring->kern_sq.ring_mmap != MAP_FAILED) + munmap(ring->ring->kern_sq.ring_mmap, ring->kern_sq.ring_mmap_size); + if (ring->fd >= 0) + close(ring->fd); + memset(ring, 0, sizeof(*ring)); + ring->fd = -1; +} + +static int my_uring_init(struct my_uring *ring, uint32_t num_entries) { + assert(ring); + + memset(ring, 0, sizeof(*ring)); + + static struct io_uring_params params = { 0 }; + ring->fd = io_uring_setup(num_entries, ¶ms); + if (ring->fd < 0) { + error(0, ring->fd, "io_uring_setup"); + my_uring_deinit(ring); + return -1; + } + + ring->kern_sq.ring_mmap_size = params.sq_off.array + (params.sq_entries * sizeof(uint32_t)); + ring->kern_sq.ring_mmap = mmap(NULL, ring->kern_sq.ring_mmap_size, + PROT_READ|PROT_WRITE, + MAP_SHARED|MAP_POPULATE, + ring->fd, + IORING_OFF_SQ_RING); + if (ring->kern_sq.ring_mmap == MAP_FAILED) { + error(0, errno, "mmap(SQ_RING)"); + my_uring_deinit(ring); + return -1; + } + ring->kern_sq.sq_head = ring->kern_sq.mmap + params.sq_off.head; + ring->kern_sq.sq_tail = ring->kern_sq.mmap + params.sq_off.tail; + ring->kern_sq.sq_mask = ring->kern_sq.mmap + params.sq_off.ring_mask; + ring->kern_sq.sq_cap = ring->kern_sq.mmap + params.sq_off.ring_entries; + ring->kern_sq.sq_flags = ring->kern_sq.mmap + params.sq_off.flags; + ring->kern_sq.sq_dropped = ring->kern_sq.mmap + params.sq_off.dropped; + ring->kern_sq.sq_sqe_idxs = ring->kern_sq.mmap + params.sq_off.array; + + ring->kern_sq.entries_mmap_size = params.sq_entries * sizeof(struct io_uring_sqe); + ring->kern_sq.entries_mmap = mmap(NULL, ring->kern_sq.entries_mmap_size, + PROT_READ|PROT_WRITE, + MAP_SHARED|MAP_POPULATE, + ring->fd, + IORING_OFF_SQES); + if (ring->kern_sq == MAP_FAILED) { + error(0, errno, "mmap(SQES)"); + my_uring_deinit(ring); + return -1; + } + ring->kern_sq.sq_sqes = ring->kern_sq.entries_mmap; + + if (params.features & IORING_FEAT_SINGLE_MMAP) { + /* Purely optional optimization that is possible since kernel v5.4. */ + ring->cq.mmap_size = ring->kern_sq.ring_mmap_size; + ring->cq.mmap = ring->kern_sq.ring_mmap; + } else { + ring->cq.mmap_size = params.cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe); + ring->cq.mmap = mmap(NULL, ring->cq.mmap_size, + PROT_READ|PROT_WRITE, + MAP_SHARED|MAP_POPULATE, + ring->fd, + IORING_OFF_CQ_RING); + if (ring->cq.mmap == MAP_FAILED) { + error(0, errno, "mmap(CQ_RING)"); + my_uring_deinit(ring); + return -1; + } + } + ring->cq.head = ring->cq.mmap + params.cq_off.head; + ring->cq.tail = ring->cq.mmap + params.cq_off.tail; + ring->cq.mask = ring->cq.mmap + params.cq_off.mask; + ring->cq.cap = ring->cq.mmap + params.cq_off.entries; + ring->cq.overflow = ring->cq.mmap + params.cq_off.overflow; + ring->cq.cqes = ring->cq.mmap + params.cq_off.cqes; + ring->cq.flags = ring->cq.mmap + params.cq_off.flags; + + return 0; +} + +/** + * Obtain a Submission Queue Entry that is available to be written into. + * Returns NULL if the queue is full. + * + * Once you have filled the sqe with your request, call my_uring_submit() to + * submit it to the kernel, in a batch with any other sqe's you've gotten since + * the last my_uring_submit() call. + */ +static inline struct io_uring_sqe *my_uring_get_sqe(struct my_uring *ring) { + /* Because sq_cap is always a power-of-2, we don't ever have to apply `% + * sq_cap` because `& sq_mask` does that for us, and UINT32_MAX being a + * power-of-2 as well means overflow will never hurt us. */ + if (ring->user_sq_tail + 1 - ring->user_sq_head > *ring->kern_sq.sq_cap) + return 1; + return &sq->kern_sq.sq_sqes[sq->user_sq_tail++ & *sq->kern_sq.sq_mask]; +} + +/** + * Submit a batch of sqes (obtained from my_uring_get_sqe()) to the kernel. + * + * Returns the number of sqes successfully submitted, or -errno. + */ +static inline int my_uring_submit(struct my_uring *ring) { + uint32_t read_khead, ktail_to_write, mask; + int ret; + + if (ring->user_sq.sq_head == ring->user_sq.sq_tail) + /* nothing to do */ + return 0; + + mask = *ring->kern_sq.sq_mask; + + memory_barrier(); /* to read sq_head */ + read_khead = *ring->kern_sq.sq_head; + + ktail_to_write = *ring->kern_sq.sq_tail; + while (ring->user_sq.sq_head != ring->user_sq.sq_tail && ktail_to_write + 1 != read_khead) + ring_kern_sq.sq_sqe_idxs[ktail_to_write++ & mask] = ring->user_sq.sq_head++ & mask; + memory_barrier(); /* wrote sq_sqe_idxs; must do this *before* the sq_tail write below */ + + *ring->kern_sq.sq_tail = ktail_to_write; + memory_barrier(); /* wrote sq_tail */ + + ret = io_uring_enter1(ring->fd, ktail_to_write - read_khead, 0, 0, NULL); + return ret < 0 ? -errno : ret; +} + +/* netio implementation on top of that ****************************************/ + +static struct my_uring netio_uring = { 0 }; + +void netio_init(void) { + if (!netio_uring.fd) + if (my_uring_init(&netio_uring, num_entries) < 0) + exit(1); +} + +/** Like accept4(2). */ +static inline int netio_accept4(int sock, struct sockaddr *addr, socklen_t *addrlen, int flags) { + struct io_uring_sqe *sqe = my_uring_get_sqe(&netio_uring); + if (!sqe) + return -ENOSR; + + sqe.opcode = IORING_OP_ACCEPT; + sqe.fd = sock; + sqe.addr = (uint64_t)addr; + sqe.off = (uint64_t)addrlen; + sqe.accept_flags = flags; + sqe.len = 0; /* number of iovecs */ + + /* Submit that accept4() call. */ + my_uring_submit(&netio_uring); +} + +int netio_accept(int sock) { + return netio_accept4(sock, NULL, NULL, 0); +} + +int netio_read(int conn, void *buf, size_t count) { +} + +int netio_write(int conn, void *buf, size_t count) { +} + +int netio_close(int conn, bool rd, bool wr) { +} + |