author    Luke T. Shumaker <lukeshu@lukeshu.com>   2024-09-22 02:52:42 -0600
committer Luke T. Shumaker <lukeshu@lukeshu.com>   2024-09-22 02:52:42 -0600
commit    042220ed00c1b2642e8c3b7f68b858ec9c1e71d6 (patch)
tree      8b7daaa21c2d5f29f7cede38d07b6c0306071090
parent    e2a29828cdd5c68f70ee771bb9f73c4b03c59529 (diff)
finish(?) netio_posix
-rw-r--r--   netio.h         5
-rw-r--r--   netio_posix.c   195
-rw-r--r--   netio_uring.c   365
3 files changed, 183 insertions, 382 deletions
diff --git a/netio.h b/netio.h
--- a/netio.h
+++ b/netio.h
@@ -4,11 +4,10 @@
 #include <stdint.h>
 #include <stdbool.h>
 
-void netio_init(void);
-
+int netio_listen(uint16_t port);
 int netio_accept(int sock);
 int netio_read(int conn, void *buf, size_t count);
 int netio_write(int conn, void *buf, size_t count);
 int netio_close(int conn, bool rd, bool wr);
 
-#endef /* _NETIO_H_ */
+#endif /* _NETIO_H_ */
diff --git a/netio_posix.c b/netio_posix.c
index 0aa8277..57c46df 100644
--- a/netio_posix.c
+++ b/netio_posix.c
@@ -1,17 +1,149 @@
-#include <aio.h>
-#include <sys/socket.h>
+#define LINUX 1
+#define NUM_SOCKETS 1
+#define NUM_WORKERS 8
+#define _GNU_SOURCE
+#include <aio.h>        /* for struct aiocb, aio_read(), aio_write(), aio_error(), aio_return(), SIGEV_SIGNAL */
+#include <arpa/inet.h>  /* for htons() */
+#include <errno.h>      /* for errno, EAGAIN, EWOULDBLOCK, EINPROGRESS, EINVAL */
+#include <error.h>      /* for error() */
+#include <netinet/in.h> /* for struct sockaddr_in */
+#include <signal.h>     /* for siginfo_t, struct sigaction, sigaction(), SIGRTMIN, SIGRTMAX, SA_SIGINFO */
+#include <stdlib.h>     /* for shutdown(), SHUT_RD, SHUT_WR, SHUT_RDWR */
+#include <string.h>     /* for memset() */
+#include <sys/socket.h> /* for struct sockaddr, socket(), SOCK_* flags, setsockopt(), SOL_SOCKET, SO_REUSEADDR, bind(), listen(), accept() */
+#if LINUX
+# include <fcntl.h>     /* for fcntl(), F_SETFL, O_ASYNC, F_SETSIG */
+#endif
+
+#include "netio.h"
 #include "coroutine.h"
 
-/* http://davmac.org/davpage/linux/async-io.html */
-void netio_init(void) {
-	sigaction(TODO)
+/* I found the following post to be very helpful when writing this:
+ * http://davmac.org/davpage/linux/async-io.html */
+
+static int sigs_allocated = 0;
+static int sig_io = 0;
+#if LINUX
+static int sig_accept = 0;
+#endif
+
+struct netio_socket {
+	int fd;
+#if LINUX
+	cid_t accept_waiters[NUM_WORKERS];
+#endif
+};
+
+static struct netio_socket socket_table[NUM_SOCKETS] = {0};
+
+static void handle_sig_io(int sig __attribute__ ((unused)), siginfo_t *info, void *ucontext __attribute__ ((unused))) {
+	cr_unpause((cid_t)info->si_value.sival_int);
+}
+
+#if LINUX
+static void handle_sig_accept(int sig __attribute__ ((unused)), siginfo_t *info, void *ucontext __attribute__ ((unused))) {
+	struct netio_socket *sock = NULL;
+	for (int i = 0; sock == NULL && i < NUM_SOCKETS; i++)
+		if (info->si_fd == socket_table[i].fd)
+			sock = &socket_table[i];
+	if (!sock)
+		return;
+	for (int i = 0; i < NUM_WORKERS; i++)
+		if (sock->accept_waiters[i] > 0) {
+			cr_unpause(sock->accept_waiters[i]);
+			sock->accept_waiters[i] = 0;
+			return;
+		}
+}
+#endif
+
+static void _netio_init(void) {
+	struct sigaction action;
+
+	if (sig_io)
+		return;
+
+	sig_io = SIGRTMIN + (sigs_allocated++);
+	if (sig_io > SIGRTMAX)
+		error(1, 0, "SIGRTMAX exceeded");
+	memset(&action, 0, sizeof(action));
+	action.sa_flags = SA_SIGINFO;
+	action.sa_sigaction = handle_sig_io;
+	if (sigaction(sig_io, &action, NULL) < 0)
+		error(1, errno, "sigaction");
+
+#if LINUX
+	sig_accept = SIGRTMIN + (sigs_allocated++);
+	if (sig_accept > SIGRTMAX)
+		error(1, 0, "SIGRTMAX exceeded");
+	memset(&action, 0, sizeof(action));
+	action.sa_flags = SA_SIGINFO;
+	action.sa_sigaction = handle_sig_accept;
+	if (sigaction(sig_accept, &action, NULL) < 0)
+		error(1, errno, "sigaction");
+#endif
+}
+
+int netio_listen(uint16_t port) {
+	int handle;
+	struct netio_socket *sock;
+	union {
+		struct sockaddr_in in;
+		struct sockaddr    gen;
+	} addr;
+
+	_netio_init();
+
+	/* Allocate a handle out of socket_table. */
+	handle = -1;
+	for (int i = 0; handle < 0 && i < NUM_SOCKETS; i++)
+		if (socket_table[i].fd == 0)
+			handle = i;
+	if (handle < 0)
+		error(1, 0, "NUM_SOCKETS exceeded");
+	sock = &socket_table[handle];
+
+	/* Bind the socket. */
+	memset(&addr, 0, sizeof addr);
+	addr.in.sin_family = AF_INET;
+	addr.in.sin_port = htons(port);
+	sock->fd = socket(AF_INET, SOCK_STREAM | (LINUX ? 0 : SOCK_NONBLOCK), 0);
+	if (sock->fd < 0)
+		error(1, errno, "socket");
+	if (setsockopt(sock->fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0)
+		error(1, errno, "setsockopt");
+#if LINUX
+	if (fcntl(sock->fd, F_SETFL, O_ASYNC) < 0)
+		error(1, errno, "fcntl(F_SETFL)");
+	if (fcntl(sock->fd, F_SETSIG, sig_accept) < 0)
+		error(1, errno, "fcntl(F_SETSIG)");
+#endif
+	if (bind(sock->fd, &addr.gen, sizeof addr) < 0)
+		error(1, errno, "bind");
+	if (listen(sock->fd, NUM_WORKERS) < 0)
+		error(1, errno, "listen");
+
+	/* Return. */
+	return handle;
 }
 
-int netio_accept(int socket) {
-	/* AFAICT there is no good POSIX way to do this without busy-polling. */
+int netio_accept(int sock) {
+#if LINUX
+	int conn;
+	for (int i = 0; i < NUM_WORKERS; i++)
+		if (socket_table[sock].accept_waiters[i] == 0) {
+			socket_table[sock].accept_waiters[i] = cr_getcid();
+			break;
+		}
+	cr_pause_and_yield();
+	conn = accept(socket_table[sock].fd, NULL, NULL);
+	return conn < 0 ? -errno : conn;
+#else
+	/* AFAICT in pure POSIX there's no good way to do this that
+	 * isn't just busy-polling. */
 	for (;;) {
-		int conn = accept(socket, NULL, NULL);
+		int conn = accept(socket_table[sock].fd, NULL, NULL);
 		if (conn < 0) {
 			if (errno == EAGAIN || errno == EWOULDBLOCK) {
 				cr_yield();
@@ -21,19 +153,20 @@ int netio_accept(int socket) {
 		}
 		return conn;
 	}
+#endif
 }
 
 int netio_read(int conn, void *buf, size_t count) {
 	int r;
 	struct aiocb ctl_block = {
-		.aio_filedes = conn,
-		.aio_buf = buff,
+		.aio_fildes = conn,
+		.aio_buf = buf,
 		.aio_nbytes = count,
 		.aio_sigevent = {
 			.sigev_notify = SIGEV_SIGNAL,
-			.sigev_signo = SIGIO,
+			.sigev_signo = sig_io,
 			.sigev_value = {
-				.sigval_int = (int)cr_getcid(),
+				.sival_int = (int)cr_getcid(),
 			},
 		},
 	};
@@ -43,7 +176,41 @@ int netio_read(int conn, void *buf, size_t count) {
 
 	while ((r = aio_error(&ctl_block)) == EINPROGRESS)
 		cr_pause_and_yield();
-
+	return r ? -abs(r) : aio_return(&ctl_block);
+}
 
-
+int netio_write(int conn, void *buf, size_t count) {
+	int r;
+	struct aiocb ctl_block = {
+		.aio_fildes = conn,
+		.aio_buf = buf,
+		.aio_nbytes = count,
+		.aio_sigevent = {
+			.sigev_notify = SIGEV_SIGNAL,
+			.sigev_signo = sig_io,
+			.sigev_value = {
+				.sival_int = (int)cr_getcid(),
+			},
+		},
+	};
+
+	if (aio_write(&ctl_block) < 0)
+		return -errno;
+
+	while ((r = aio_error(&ctl_block)) == EINPROGRESS)
+		cr_pause_and_yield();
+	return r ? -abs(r) : aio_return(&ctl_block);
+}
+
+int netio_close(int conn, bool rd, bool wr) {
+	int how;
+	if (rd && wr)
+		how = SHUT_RDWR;
+	else if (rd && !wr)
+		how = SHUT_RD;
+	else if (!rd && wr)
+		how = SHUT_WR;
+	else
+		return -EINVAL;
+	return shutdown(conn, how) ? -errno : 0;
 }
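
The pattern netio_posix.c relies on above: every aio_read()/aio_write() names a realtime signal in .aio_sigevent and carries the waiting coroutine's id in sigev_value, so the single sig_io handler knows exactly which coroutine to cr_unpause(). Here is a minimal standalone sketch of that completion pattern with the coroutine runtime stripped out; the dummy token 42, the stdin fd, and the fixed choice of SIGRTMIN are illustrative assumptions, not part of the commit (link with -lrt on older glibc):

#include <aio.h>     /* aio_read(), aio_error(), aio_return() */
#include <errno.h>   /* EINPROGRESS */
#include <signal.h>  /* sigaction(), sigprocmask(), sigsuspend() */
#include <stdio.h>
#include <unistd.h>  /* STDIN_FILENO */

static volatile sig_atomic_t seen_token = -1;

static void on_io_done(int sig, siginfo_t *info, void *uctx) {
	(void)sig; (void)uctx;
	seen_token = info->si_value.sival_int; /* echoes back .aio_sigevent.sigev_value */
}

int main(void) {
	char buf[64];

	struct sigaction sa = {0};
	sa.sa_flags = SA_SIGINFO;
	sa.sa_sigaction = on_io_done;
	sigaction(SIGRTMIN, &sa, NULL);

	/* Block the completion signal so the aio_error()/sigsuspend()
	 * loop below cannot lose a wakeup. */
	sigset_t block, old;
	sigemptyset(&block);
	sigaddset(&block, SIGRTMIN);
	sigprocmask(SIG_BLOCK, &block, &old);

	struct aiocb cb = {
		.aio_fildes = STDIN_FILENO, /* illustrative; netio_posix.c uses a socket */
		.aio_buf    = buf,
		.aio_nbytes = sizeof(buf),
		.aio_sigevent = {
			.sigev_notify = SIGEV_SIGNAL,
			.sigev_signo  = SIGRTMIN,
			/* netio_posix.c stores the coroutine id here; 42 is a dummy token */
			.sigev_value  = { .sival_int = 42 },
		},
	};
	if (aio_read(&cb) < 0)
		return 1;
	while (aio_error(&cb) == EINPROGRESS)
		sigsuspend(&old); /* atomically unblock and wait for the signal */
	sigprocmask(SIG_SETMASK, &old, NULL);

	printf("token=%d, read %zd byte(s)\n", (int)seen_token, aio_return(&cb));
	return 0;
}

The sigprocmask()/sigsuspend() pair closes the check-then-wait race that a bare pause() would leave open; the commit instead parks the coroutine with cr_pause_and_yield() and wakes it from the handler with cr_unpause().
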
diff --git a/netio_uring.c b/netio_uring.c
deleted file mode 100644
index bf3f9c0..0000000
--- a/netio_uring.c
+++ /dev/null
@@ -1,365 +0,0 @@
-#include <linux/io_uring.h>
-#include <sys/mman.h>
-#include <assert.h>
-#include <error.h>
-#include <string.h>
-
-#include "coroutine.h"
-#include "netio.h"
-
-/* Config *********************************************************************/
-
-#define MAX_WORKERS 8
-
-/* Syscalls *******************************************************************/
-
-/* I don't want to pull in liburing, and glibc doesn't provide stubs
- * for these syscalls.
- *
- * For the signatures, search for `SYSCALL_DEFINE` in
- * `linux.git:io_uring/io_uring.c`.
- */
-
-static inline int io_uring_setup(uint32_t entries, struct io_uring_params *p) {
-	return syscall(__NR_io_uring_setup, entries, p);
-}
-static inline int io_uring_enter1(int fd, uint32_t to_submit, uint32_t min_complete, uint32_t flags,
-                                  sigset_t *sig) {
-	return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags & ~IORING_ENTER_EXT_ARG, sig);
-}
-static inline int io_uring_enter2(int fd, uint32_t to_submit, uint32_t min_complete, uint32_t flags,
-                                  struct io_uring_getevents_arg *argp, size_t argsz) {
-	return syscall(__NR_io_uring_enter, fd, to_submit, min_complete, flags | IORING_ENTER_EXT_ARG, argp, argsz);
-}
-static inline int io_uring_register(int fd, int opcode, void *arg, unsigned int nr_args) {
-	return syscall(__NR_io_uring_register, opcode, arg, nr_args);
-}
-
-/* Userspace component of io_uring ********************************************/
-
-/* I'm not too sure what the sematics around volatile-memory vs
- * __sync_synchronize() are, but this is the definition that
- * linux.git/tools/include/io_uring/mini_liburing.h uses. */
-#if defined(__x86_64__)
-# define memory_barrier() asm volatile ("":::"memory")
-#else
-# define memory_barrier() __sync_synchronize() /* GCC built-in */
-#endif
-
-/**
- * Submission Queue
- *
- * A proxy into the kernel's internal structures, which have been mmap()ed into
- * userspace. Each member is a pointer (which is what makes this a proxy)
- * because the order of the members kernel memory is not ABI-stable, and instead
- * we get offsets into the mmap()ed area from the io_uring_setup() syscall.
- *
- * Call memory_barrier() before reading a non-const value from the proxy, and
- * after writing to it.
- *
- * Exactly which kernel structure this is proxying into varies by kernel version
- * (I point this out so that you can more easily find the documentation for the
- * internal kernel structures):
- *
- *     fs/io_uring.c                  struct io_sq_ring;  kernel v5.1 - v5.3
- *     fs/io_uring.c                  struct io_rings;    kernel v5.4 - v5.19
- *     include/linux/io_uring_types.h struct io_rings;    kernel v6.0 +
- *
- * Despite the kernel having merged `io_sq_ring` and `io_cq_ring` into a single
- * monolithic structure in v5.4, I leave them separate here, because
- * conceptually they are better separate; they were merged purely for
- * performance reasons.
- *
- * I also include in a leading comment on each member with a kernel definition,
- * which will make the size passed to mmap() make sense.
- */
-struct my_io_sq_ring_proxy {
-	void   *ring_mmap;      /* in case we want to munmap() it */
-	size_t  ring_mmap_size; /* in case we want to munmap() it */
-	/* pointers into mmap(offset=IORING_OFF_SQ_RING) */
-	/* kernel def */        /* who-writes-it ; description */
-	/* u32 r.head; */       uint32_t *sq_head;     /* kernel     ; apply sq_mask to get a valid index */
-	/* u32 r.tail; */       uint32_t *sq_tail;     /* userspace  ; apply sq_mask to get a valid index */
-	/* u32 ring_mask; */    uint32_t *sq_mask;     /* kern-const ; TODO */
-	/* u32 ring_entries; */ uint32_t *sq_cap;      /* kern-const ; number of entries, always a power-of-2 */
-	/* u32 dropped; */      uint32_t *sq_dropped;  /* kernel     ; number of entries dropped because invalid index */
-	/* atomic_t flags; */   int      *sq_flags;    /* kernel     ; must use memory barrier before checking */
-	/* u32 array[]; */      uint32_t *sq_sqe_idxs; /* userspace  ; sq_cap-sized array of indexes into the sq_sqes array */
-
-	/* This is actually separate from `struct io_sq_ring`/`struct io_rings`. */
-	void   *entries_mmap;      /* in case we want to munmap() */
-	size_t  entries_mmap_size; /* in case we want to munmap() */
-	/* pointers into mmap(offset=IORING_OFF_SQES) */
-	/*                  who-writes-it ; description */
-	struct io_uring_sqe *sq_sqes; /* userspace ; sq_cap-sized array */
-
-	/* The structure of sq_sqe_idxs is as follows.  The
-	 * application writes a request into [sq_tail+1], then
-	 * increments sq_tail.  The kernel increments sq_head when it
-	 * has processed a request.
-	 *
-	 *                          <- sq_cap=8
-	 *     [ 7: uninitialized ]
-	 *     [ 6: uninitialized ]
-	 *     [ 5: uninitialized ] <- sq_tail=5
-	 *     [ 4: pending     T ]
-	 *     [ 3: pending     | ]
-	 *     [ 2: pending     H ] <- sq_head=2
-	 *     [ 1: finished      ]
-	 *     [ 0: finished      ]
-	 *
-	 * It may wrap-around like
-	 *
-	 *                          <- sq_cap=8
-	 *     [ 7: pending     | ]
-	 *     [ 6: pending     | ]
-	 *     [ 5: pending     | ]
-	 *     [ 4: pending     H ] <- sq_head=4
-	 *     [ 3: finished      ]
-	 *     [ 2: finished      ] <- sq_tail=10 (sq_tail%sq_cap = sq_tail&sq_mask = 2)
-	 *     [ 1: pending     T ]
-	 *     [ 0: pending     | ]
-	 *
-	 * When empty it looks like
-	 *                          <- sq_cap=8
-	 *     [ 7: uninitialized ]
-	 *     [ 6: uninitialized ]
-	 *     [ 5: uninitialized ]
-	 *     [ 4: uninitialized ]
-	 *     [ 3: uninitialized ]
-	 *     [ 2: uninitialized ]
-	 *     [ 1: finished    O ] <- sq_head=sq_tail=1
-	 *     [ 0: finished      ]
-	 */
-};
-
-/**
- * Completion Queue
- *
- * A proxy into the kernel's internal structure, which has been mmap()ed into
- * userspace. Each member is a pointer (which is what makes this a proxy)
- * because the order of the members kernel memory is not ABI-stable, and instead
- * we get offsets into the mmap()ed area from the io_uring_setup() syscall.
- *
- * Call memory_barrier() before reading a non-const value from the proxy, and
- * after writing to it.
- *
- * Exactly which kernel structure this is proxying into varies by kernel version
- * (I point this out so that you can more easily find the documentation for the
- * internal kernel structures):
- *
- *     fs/io_uring.c                  struct io_cq_ring;  kernel v5.1 - v5.3
- *     fs/io_uring.c                  struct io_rings;    kernel v5.4 - v5.19
- *     include/linux/io_uring_types.h struct io_rings;    kernel v6.0 +
- *
- * Despite the kernel having merged `io_sq_ring` and `io_cq_ring` into a single
- * monolithic structure in v5.4, I leave them separate here, because
- * conceptually they are better separate; they were merged purely for
- * performance reasons.
- *
- * I also include in a leading comment on each member with a kernel definition,
- * which will make the size passed to mmap() make sense.
- */
-struct my_io_cq_ring_proxy {
-	size_t  mmap_size; /* in case we want to munmap() it */
-	void   *mmap;      /* in case we want to munmap() it */
-	/* pointers into mmap(offset=IORING_OFF_CQ_RING) */
-	/* kernel def */                  /* who-writes-it ; description */
-	/* u32 r.head; */                 uint32_t *cq_head;     /* userspace  ; apply cq_mask to get a valid index */
-	/* u32 r.tail; */                 uint32_t *cq_tail;     /* kernel     ; apply cq_mask to get a valid index */
-	/* u32 ring_mask; */              uint32_t *cq_mask;     /* kern-const ; TODO */
-	/* u32 ring_entries; */           uint32_t *cq_cap;      /* kern-const ; number of entries, always a power-of-2 */
-	/* u32 flags; */                  uint32_t *cq_flags;    /* userspace  ; TODO */
-	/* u32 overflow; */               uint32_t *cq_overflow; /* kernel     ; TODO */
-	/* struct io_uring_cqe cqes[]; */ struct io_uring_cqe *cq_cqes; /* mostly-kernel ; cq_cap-sized array; userspace is allowed to modify pending entries */
-}
-
-struct my_uring {
-	int fd;
-
-	struct my_io_sq_ring_proxy kern_sq;
-	struct my_io_cq_ring_proxy kern_cq;
-
-	uint32_t user_sq_head;
-	uint32_t user_sq_tail;
-	uint32_t user_cq_head;
-	uint32_t user_cq_tail;
-};
-
-static int my_uring_deinit(struct my_uring *ring) {
-	if (ring->kern_cq.mmap &&
-	    ring->kern_cq.mmap != MAP_FAILED &&
-	    ring->kern_cq.mmap != ring->kern_sq.ring_mmap)
-		munmap(ring->ring->kern_cq.mmap, ring->kern_cq.mmap_size);
-	if (ring->kern_sq.entries_mmap &&
-	    ring->kern_sq.entries_mmap != MAP_FAILED)
-		munmap(ring->ring->kern_sq.entries_mmap, ring->kern_sq.entries_mmap_size);
-	if (ring->kern_sq.ring_mmap &&
-	    ring->kern_sq.ring_mmap != MAP_FAILED)
-		munmap(ring->ring->kern_sq.ring_mmap, ring->kern_sq.ring_mmap_size);
-	if (ring->fd >= 0)
-		close(ring->fd);
-	memset(ring, 0, sizeof(*ring));
-	ring->fd = -1;
-}
-
-static int my_uring_init(struct my_uring *ring, uint32_t num_entries) {
-	assert(ring);
-
-	memset(ring, 0, sizeof(*ring));
-
-	static struct io_uring_params params = { 0 };
-	ring->fd = io_uring_setup(num_entries, &params);
-	if (ring->fd < 0) {
-		error(0, ring->fd, "io_uring_setup");
-		my_uring_deinit(ring);
-		return -1;
-	}
-
-	ring->kern_sq.ring_mmap_size = params.sq_off.array + (params.sq_entries * sizeof(uint32_t));
-	ring->kern_sq.ring_mmap = mmap(NULL, ring->kern_sq.ring_mmap_size,
-	                               PROT_READ|PROT_WRITE,
-	                               MAP_SHARED|MAP_POPULATE,
-	                               ring->fd,
-	                               IORING_OFF_SQ_RING);
-	if (ring->kern_sq.ring_mmap == MAP_FAILED) {
-		error(0, errno, "mmap(SQ_RING)");
-		my_uring_deinit(ring);
-		return -1;
-	}
-	ring->kern_sq.sq_head     = ring->kern_sq.mmap + params.sq_off.head;
-	ring->kern_sq.sq_tail     = ring->kern_sq.mmap + params.sq_off.tail;
-	ring->kern_sq.sq_mask     = ring->kern_sq.mmap + params.sq_off.ring_mask;
-	ring->kern_sq.sq_cap      = ring->kern_sq.mmap + params.sq_off.ring_entries;
-	ring->kern_sq.sq_flags    = ring->kern_sq.mmap + params.sq_off.flags;
-	ring->kern_sq.sq_dropped  = ring->kern_sq.mmap + params.sq_off.dropped;
-	ring->kern_sq.sq_sqe_idxs = ring->kern_sq.mmap + params.sq_off.array;
-
-	ring->kern_sq.entries_mmap_size = params.sq_entries * sizeof(struct io_uring_sqe);
-	ring->kern_sq.entries_mmap = mmap(NULL, ring->kern_sq.entries_mmap_size,
-	                                  PROT_READ|PROT_WRITE,
-	                                  MAP_SHARED|MAP_POPULATE,
-	                                  ring->fd,
-	                                  IORING_OFF_SQES);
-	if (ring->kern_sq == MAP_FAILED) {
-		error(0, errno, "mmap(SQES)");
-		my_uring_deinit(ring);
-		return -1;
-	}
-	ring->kern_sq.sq_sqes = ring->kern_sq.entries_mmap;
-
-	if (params.features & IORING_FEAT_SINGLE_MMAP) {
-		/* Purely optional optimization that is possible since kernel v5.4. */
-		ring->cq.mmap_size = ring->kern_sq.ring_mmap_size;
-		ring->cq.mmap      = ring->kern_sq.ring_mmap;
-	} else {
-		ring->cq.mmap_size = params.cq_off.cqes + p->cq_entries * sizeof(struct io_uring_cqe);
-		ring->cq.mmap = mmap(NULL, ring->cq.mmap_size,
-		                     PROT_READ|PROT_WRITE,
-		                     MAP_SHARED|MAP_POPULATE,
-		                     ring->fd,
-		                     IORING_OFF_CQ_RING);
-		if (ring->cq.mmap == MAP_FAILED) {
-			error(0, errno, "mmap(CQ_RING)");
-			my_uring_deinit(ring);
-			return -1;
-		}
-	}
-	ring->cq.head     = ring->cq.mmap + params.cq_off.head;
-	ring->cq.tail     = ring->cq.mmap + params.cq_off.tail;
-	ring->cq.mask     = ring->cq.mmap + params.cq_off.mask;
-	ring->cq.cap      = ring->cq.mmap + params.cq_off.entries;
-	ring->cq.overflow = ring->cq.mmap + params.cq_off.overflow;
-	ring->cq.cqes     = ring->cq.mmap + params.cq_off.cqes;
-	ring->cq.flags    = ring->cq.mmap + params.cq_off.flags;
-
-	return 0;
-}
-
-/**
- * Obtain a Submission Queue Entry that is available to be written into.
- * Returns NULL if the queue is full.
- *
- * Once you have filled the sqe with your request, call my_uring_submit() to
- * submit it to the kernel, in a batch with any other sqe's you've gotten since
- * the last my_uring_submit() call.
- */
-static inline struct io_uring_sqe *my_uring_get_sqe(struct my_uring *ring) {
-	/* Because sq_cap is always a power-of-2, we don't ever have to apply `%
-	 * sq_cap` because `& sq_mask` does that for us, and UINT32_MAX being a
-	 * power-of-2 as well means overflow will never hurt us. */
-	if (ring->user_sq_tail + 1 - ring->user_sq_head > *ring->kern_sq.sq_cap)
-		return 1;
-	return &sq->kern_sq.sq_sqes[sq->user_sq_tail++ & *sq->kern_sq.sq_mask];
-}
-
-/**
- * Submit a batch of sqes (obtained from my_uring_get_sqe()) to the kernel.
- *
- * Returns the number of sqes successfully submitted, or -errno.
- */
-static inline int my_uring_submit(struct my_uring *ring) {
-	uint32_t read_khead, ktail_to_write, mask;
-	int ret;
-
-	if (ring->user_sq.sq_head == ring->user_sq.sq_tail)
-		/* nothing to do */
-		return 0;
-
-	mask = *ring->kern_sq.sq_mask;
-
-	memory_barrier(); /* to read sq_head */
-	read_khead = *ring->kern_sq.sq_head;
-
-	ktail_to_write = *ring->kern_sq.sq_tail;
-	while (ring->user_sq.sq_head != ring->user_sq.sq_tail && ktail_to_write + 1 != read_khead)
-		ring_kern_sq.sq_sqe_idxs[ktail_to_write++ & mask] = ring->user_sq.sq_head++ & mask;
-	memory_barrier(); /* wrote sq_sqe_idxs; must do this *before* the sq_tail write below */
-
-	*ring->kern_sq.sq_tail = ktail_to_write;
-	memory_barrier(); /* wrote sq_tail */
-
-	ret = io_uring_enter1(ring->fd, ktail_to_write - read_khead, 0, 0, NULL);
-	return ret < 0 ? -errno : ret;
-}
-
-/* netio implementation on top of that ****************************************/
-
-static struct my_uring netio_uring = { 0 };
-
-void netio_init(void) {
-	if (!netio_uring.fd)
-		if (my_uring_init(&netio_uring, num_entries) < 0)
-			exit(1);
-}
-
-/** Like accept4(2). */
-static inline int netio_accept4(int sock, struct sockaddr *addr, socklen_t *addrlen, int flags) {
-	struct io_uring_sqe *sqe = my_uring_get_sqe(&netio_uring);
-	if (!sqe)
-		return -ENOSR;
-
-	sqe.opcode = IORING_OP_ACCEPT;
-	sqe.fd = sock;
-	sqe.addr = (uint64_t)addr;
-	sqe.off = (uint64_t)addrlen;
-	sqe.accept_flags = flags;
-	sqe.len = 0; /* number of iovecs */
-
-	/* Submit that accept4() call. */
-	my_uring_submit(&netio_uring);
-}
-
-int netio_accept(int sock) {
-	return netio_accept4(sock, NULL, NULL, 0);
-}
-
-int netio_read(int conn, void *buf, size_t count) {
-}
-
-int netio_write(int conn, void *buf, size_t count) {
-}
-
-int netio_close(int conn, bool rd, bool wr) {
-}
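
The subtlest idea in the deleted file is the free-running head/tail indexing documented in my_io_sq_ring_proxy and used by my_uring_get_sqe(): the counters only ever increment, `& sq_mask` substitutes for `% sq_cap` because the capacity is a power of two, and uint32_t wraparound never corrupts the `tail - head` occupancy count. Below is a toy model of just that arithmetic; the names CAP, MASK, ring_push, and ring_pop are hypothetical, and it uses the conventional `tail - head == CAP` fullness test rather than the check in the deleted my_uring_get_sqe(), which appears buggy (it compares `tail + 1 - head > cap` and returns 1 rather than NULL):

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define CAP  8u          /* must be a power of two, like sq_cap */
#define MASK (CAP - 1u)  /* like *sq_mask */

static int      slots[CAP];
static uint32_t head = 0, tail = 0; /* free-running; masked only on use */

/* Reserve the next slot, in the spirit of my_uring_get_sqe(); NULL when full. */
static int *ring_push(void) {
	if (tail - head == CAP) /* CAP entries in flight; wraparound-safe */
		return NULL;
	return &slots[tail++ & MASK];
}

/* Consume the oldest slot, as the kernel would; NULL when empty. */
static int *ring_pop(void) {
	if (head == tail)
		return NULL;
	return &slots[head++ & MASK];
}

int main(void) {
	/* Start the counters near the wrap point to show overflow is harmless. */
	head = tail = UINT32_MAX - 2;
	for (int i = 0; i < 6; i++) {
		int *slot = ring_push();
		assert(slot);
		*slot = i;
	}
	for (int *slot; (slot = ring_pop()) != NULL; )
		printf("%d\n", *slot); /* prints 0..5 in order */
	assert(head == tail);
	return 0;
}

Because every power of two divides 2^32 exactly, the masked index stays consistent even while the raw counters wrap past UINT32_MAX, which the main() above demonstrates by starting both counters near the wrap point.
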