diff options
author | Luke T. Shumaker <lukeshu@lukeshu.com> | 2024-10-06 11:09:09 -0600 |
---|---|---|
committer | Luke T. Shumaker <lukeshu@lukeshu.com> | 2024-10-06 11:09:09 -0600 |
commit | eabe7380e11468fbe8fa5b7bafdb4ce5b8a448ee (patch) | |
tree | 5b9c7483983eb4b4139869211d6c96cf78bbef36 | |
parent | 7ca16c45a2ecfbf415fffd384eb5efd32180c5da (diff) |
libnetio: Ugg, posix aio is trash
In concept, it's exactly what I need.
But
- glibc's implementation of it does not allow concurrent reads and writes
- it not required to work for fds that don't have absolute offsets (so
no pipes, no sockets)
So just spin up a pthread for each syscall. I hate it.
-rw-r--r-- | cmd/sbc_harness/config/config.h | 1 | ||||
-rw-r--r-- | libnetio/netio_posix.c | 319 |
2 files changed, 167 insertions, 153 deletions
diff --git a/cmd/sbc_harness/config/config.h b/cmd/sbc_harness/config/config.h index c5ba6d8..ff18b88 100644 --- a/cmd/sbc_harness/config/config.h +++ b/cmd/sbc_harness/config/config.h @@ -9,7 +9,6 @@ #endif #if defined(USE_CONFIG_NETIO_POSIX) && !defined(_CONFIG_H_NETIO_POSIX_) #define _CONFIG_H_NETIO_POSIX_ -# define CONFIG_NETIO_ISLINUX 1 /* can we use Linux-kernel-specific fcntls? */ # define CONFIG_NETIO_NUM_PORTS 1 #endif diff --git a/libnetio/netio_posix.c b/libnetio/netio_posix.c index dda30b0..62320f2 100644 --- a/libnetio/netio_posix.c +++ b/libnetio/netio_posix.c @@ -1,90 +1,84 @@ -#define _GNU_SOURCE -#include <aio.h> /* for struct aiocb, aio_read(), aio_write(), aio_error(), aio_return(), SIGEV_SIGNAL */ -#include <arpa/inet.h> /* for htons() */ -#include <errno.h> /* for errno, EAGAIN, EWOULDBLOCK, EINPROGRESS, EINVAL */ -#include <error.h> /* for error() */ +/* netio_posix.c - netio implementation for POSIX-ish systems + * (actually uses a few GNU extensions) + * + * Copyright (C) 2024 Luke T. Shumaker <lukeshu@lukeshu.com> + * SPDX-Licence-Identifier: AGPL-3.0-or-later + */ + +#define _GNU_SOURCE /* for pthread_sigqueue(3gnu) */ +/* misc */ +#include <assert.h> /* for assert() */ +#include <errno.h> /* for errno, EAGAIN, EINVAL */ +#include <error.h> /* for error(3gnu) */ +#include <stdlib.h> /* for abs(), shutdown(), SHUT_RD, SHUT_WR, SHUT_RDWR */ +#include <unistd.h> /* for read(), write() */ +/* net */ +#include <arpa/inet.h> /* for htons(3p) */ #include <netinet/in.h> /* for struct sockaddr_in */ -#include <signal.h> /* for siginfo_t, struct sigaction, sigaction(), SIGRTMIN, SIGRTMAX, SA_SIGINFO */ -#include <stdlib.h> /* for shutdown(), SHUT_RD, SHUT_WR, SHUT_RDWR */ -#include <string.h> /* for memset() */ #include <sys/socket.h> /* for struct sockaddr, socket(), SOCK_* flags, setsockopt(), SOL_SOCKET, SO_REUSEADDR, bind(), listen(), accept() */ -#include <unistd.h> /* for getpid() */ +/* async */ +#include <pthread.h> /* for pthread_* */ +#include <signal.h> /* for siginfo_t, struct sigaction, enum sigval, sigaction(), SIGRTMIN, SIGRTMAX, SA_SIGINFO */ -#define USE_CONFIG_NETIO_POSIX -#include "config.h" - -#if CONFIG_NETIO_ISLINUX -# include <fcntl.h> /* for fcntl(), F_SETFL, O_ASYNC, F_SETSIG */ -#endif +#include <libcr/coroutine.h> #include <libnetio/netio.h> -#include <libcr/coroutine.h> -#include <libcr_ipc/sema.h> -/* I found the following post to be very helpful when writing this: - * http://davmac.org/davpage/linux/async-io.html */ +/* common *********************************************************************/ -static int sigs_allocated = 0; -static int sig_io = 0; -#if CONFIG_NETIO_ISLINUX -static int sig_accept = 0; -#endif - -struct netio_socket { - int fd; -#if CONFIG_NETIO_ISLINUX - cr_sema_t accept_waiters; -#endif -}; +#define USE_CONFIG_NETIO_POSIX +#include "config.h" -static struct netio_socket socket_table[CONFIG_NETIO_NUM_PORTS] = {0}; +#define UNUSED(name) /* name __attribute__ ((unused)) */ -static void handle_sig_io(int sig __attribute__ ((unused)), siginfo_t *info, void *ucontext __attribute__ ((unused))) { - cr_unpause_from_intrhandler((cid_t)info->si_value.sival_int); -} +static int sig_io = 0; -#if CONFIG_NETIO_ISLINUX -static void handle_sig_accept(int sig __attribute__ ((unused)), siginfo_t *info, void *ucontext __attribute__ ((unused))) { - struct netio_socket *sock = NULL; - for (int i = 0; sock == NULL && i < CONFIG_NETIO_NUM_PORTS; i++) - if (info->si_fd == socket_table[i].fd) - sock = &socket_table[i]; - if (!sock) - return; - cr_sema_signal_from_intrhandler(&sock->accept_waiters); +static void handle_sig_io(int UNUSED(sig), siginfo_t *info, void *UNUSED(ucontext)) { + cr_unpause_from_intrhandler((cid_t)info->si_value.sival_int); } -#endif static void _netio_init(void) { - struct sigaction action; + struct sigaction action = {0}; if (sig_io) return; - sig_io = SIGRTMIN + (sigs_allocated++); + sig_io = SIGRTMIN; if (sig_io > SIGRTMAX) error(1, 0, "SIGRTMAX exceeded"); - memset(&action, 0, sizeof(action)); + action.sa_flags = SA_SIGINFO; action.sa_sigaction = handle_sig_io; if (sigaction(sig_io, &action, NULL) < 0) error(1, errno, "sigaction"); - -#if CONFIG_NETIO_ISLINUX - sig_accept = SIGRTMIN + (sigs_allocated++); - if (sig_accept > SIGRTMAX) - error(1, 0, "SIGRTMAX exceeded"); - memset(&action, 0, sizeof(action)); - action.sa_flags = SA_SIGINFO; - action.sa_sigaction = handle_sig_accept; - if (sigaction(sig_accept, &action, NULL) < 0) - error(1, errno, "sigaction"); -#endif } +#define WAKE_COROUTINE(args) do { \ + int r; \ + union sigval val = {0}; \ + val.sival_int = (int)((args)->cr_coroutine); \ + do { \ + r = pthread_sigqueue((args)->cr_thread, sig_io, val); \ + assert(r == 0 || r == EAGAIN); \ + } while (r == EAGAIN); \ + } while (0) + +#define RUN_PTHREAD(fn, args) do { \ + pthread_t thread; \ + int r; \ + r = pthread_create(&thread, NULL, fn, args); \ + if (r) \ + return -abs(r); \ + cr_pause_and_yield(); \ + r = pthread_join(thread, NULL); \ + if (r) \ + return -abs(r); \ + } while (0) + +/* listen() *******************************************************************/ + int netio_listen(uint16_t port) { - int handle; - struct netio_socket *sock; + int sockfd; union { struct sockaddr_in in; struct sockaddr gen; @@ -92,118 +86,139 @@ int netio_listen(uint16_t port) { _netio_init(); - /* Allocate a handle out of socket_table. */ - handle = -1; - for (int i = 0; handle < 0 && i < CONFIG_NETIO_NUM_PORTS; i++) - if (socket_table[i].fd == 0) - handle = i; - if (handle < 0) - error(1, 0, "CONFIG_NETIO_NUM_PORTS exceeded"); - sock = &socket_table[handle]; - - /* Bind the socket. */ addr.in.sin_family = AF_INET; addr.in.sin_port = htons(port); - sock->fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); - if (sock->fd < 0) + sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd < 0) error(1, errno, "socket"); - if (setsockopt(sock->fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) error(1, errno, "setsockopt"); -#if CONFIG_NETIO_ISLINUX - if (fcntl(sock->fd, F_SETFL, O_ASYNC) < 0) - error(1, errno, "fcntl(F_SETFL)"); - if (fcntl(sock->fd, F_SETSIG, sig_accept) < 0) - error(1, errno, "fcntl(F_SETSIG)"); - if (fcntl(sock->fd, F_SETOWN, getpid()) < 0) - error(1, errno, "fcntl(F_SETOWN)"); -#endif - if (bind(sock->fd, &addr.gen, sizeof addr) < 0) + if (bind(sockfd, &addr.gen, sizeof addr) < 0) error(1, errno, "bind"); - if (listen(sock->fd, CONFIG_NETIO_NUM_CONNS) < 0) + if (listen(sockfd, CONFIG_NETIO_NUM_CONNS) < 0) error(1, errno, "listen"); - /* Return. */ - return handle; + return sockfd; } +/* accept() *******************************************************************/ + +struct _pthread_accept_args { + pthread_t cr_thread; + cid_t cr_coroutine; + + int sockfd; + + int *ret; +}; + +void *_pthread_accept(void *_args) { + struct _pthread_accept_args *args = _args; + *(args->ret) = accept(args->sockfd, NULL, NULL); + if (*(args->ret) < 0) + *(args->ret) = -errno; + WAKE_COROUTINE(args); + return NULL; +}; + int netio_accept(int sock) { - /* AFAICT in pure POSIX there's no good way to do this that - * isn't just busy-polling. - * - * On Linux where we can get a signal to notify us when - * there's something to accept, we still do this non-blocking - * and check EAGAIN/EWOULDBLOCK in case the client timed out - * while waiting for us to accept(). */ - for (;;) { -#if CONFIG_NETIO_ISLINUX - cr_sema_wait(&socket_table[sock].accept_waiters); -#endif - int conn = accept(socket_table[sock].fd, NULL, NULL); - if (conn < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { -#if !CONFIG_NETIO_ISLINUX - cr_yield(); -#endif - continue; - } - return -errno; - } - return conn; - } + int ret; + struct _pthread_accept_args args = { + .cr_thread = pthread_self(), + .cr_coroutine = cr_getcid(), + .sockfd = sock, + .ret = &ret, + }; + RUN_PTHREAD(_pthread_accept, &args); + return ret; } +/* read() *********************************************************************/ + +struct _pthread_read_args { + pthread_t cr_thread; + cid_t cr_coroutine; + + int connfd; + void *buf; + size_t count; + + ssize_t *ret; +}; + +void *_pthread_read(void *_args) { + struct _pthread_read_args *args = _args; + *(args->ret) = read(args->connfd, args->buf, args->count); + if (*(args->ret) < 0) + *(args->ret) = -errno; + WAKE_COROUTINE(args); + return NULL; +}; + ssize_t netio_read(int conn, void *buf, size_t count) { - int r; - struct aiocb ctl_block = { - .aio_fildes = conn, - .aio_buf = buf, - .aio_nbytes = count, - .aio_sigevent = { - .sigev_notify = SIGEV_SIGNAL, - .sigev_signo = sig_io, - .sigev_value = { - .sival_int = (int)cr_getcid(), - }, - }, - }; + ssize_t ret; + struct _pthread_read_args args = { + .cr_thread = pthread_self(), + .cr_coroutine = cr_getcid(), - if (aio_read(&ctl_block) < 0) - return -errno; + .connfd = conn, + .buf = buf, + .count = count, - while ((r = aio_error(&ctl_block)) == EINPROGRESS) - cr_pause_and_yield(); - return r ? -abs(r) : aio_return(&ctl_block); + .ret = &ret, + }; + RUN_PTHREAD(_pthread_read, &args); + return ret; } -ssize_t netio_write(int conn, void *buf, size_t goal) { +/* write() ********************************************************************/ + +struct _pthread_write_args { + pthread_t cr_thread; + cid_t cr_coroutine; + + int connfd; + void *buf; + size_t count; + + ssize_t *ret; +}; + +void *_pthread_write(void *_args) { + struct _pthread_read_args *args = _args; size_t done = 0; - while (done < goal) { - int r; - struct aiocb ctl_block = { - .aio_fildes = conn, - .aio_buf = &(((uint8_t *)buf)[done]), - .aio_nbytes = goal-done, - .aio_sigevent = { - .sigev_notify = SIGEV_SIGNAL, - .sigev_signo = sig_io, - .sigev_value = { - .sival_int = (int)cr_getcid(), - }, - }, - }; - - if (aio_write(&ctl_block) < 0) - return -errno; - - while ((r = aio_error(&ctl_block)) == EINPROGRESS) - cr_pause_and_yield(); - if (r < 0) - return -abs(r); - done += aio_return(&ctl_block); + while (done < args->count) { + ssize_t r = write(args->connfd, args->buf, args->count); + if (r < 0) { + *(args->ret) = -errno; + break; + } + done += r; } - return done; + if (done == args->count) + *(args->ret) = done; + WAKE_COROUTINE(args); + return NULL; +}; + +ssize_t netio_write(int conn, void *buf, size_t count) { + ssize_t ret; + struct _pthread_write_args args = { + .cr_thread = pthread_self(), + .cr_coroutine = cr_getcid(), + + .connfd = conn, + .buf = buf, + .count = count, + + .ret = &ret, + }; + RUN_PTHREAD(_pthread_write, &args); + return ret; } +/* close() ********************************************************************/ + int netio_close(int conn, bool rd, bool wr) { int how; if (rd && wr) |