summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLuke T. Shumaker <lukeshu@lukeshu.com>2024-10-06 11:09:09 -0600
committerLuke T. Shumaker <lukeshu@lukeshu.com>2024-10-06 11:09:09 -0600
commiteabe7380e11468fbe8fa5b7bafdb4ce5b8a448ee (patch)
tree5b9c7483983eb4b4139869211d6c96cf78bbef36
parent7ca16c45a2ecfbf415fffd384eb5efd32180c5da (diff)
libnetio: Ugg, posix aio is trash
In concept, it's exactly what I need. But - glibc's implementation of it does not allow concurrent reads and writes - it not required to work for fds that don't have absolute offsets (so no pipes, no sockets) So just spin up a pthread for each syscall. I hate it.
-rw-r--r--cmd/sbc_harness/config/config.h1
-rw-r--r--libnetio/netio_posix.c319
2 files changed, 167 insertions, 153 deletions
diff --git a/cmd/sbc_harness/config/config.h b/cmd/sbc_harness/config/config.h
index c5ba6d8..ff18b88 100644
--- a/cmd/sbc_harness/config/config.h
+++ b/cmd/sbc_harness/config/config.h
@@ -9,7 +9,6 @@
#endif
#if defined(USE_CONFIG_NETIO_POSIX) && !defined(_CONFIG_H_NETIO_POSIX_)
#define _CONFIG_H_NETIO_POSIX_
-# define CONFIG_NETIO_ISLINUX 1 /* can we use Linux-kernel-specific fcntls? */
# define CONFIG_NETIO_NUM_PORTS 1
#endif
diff --git a/libnetio/netio_posix.c b/libnetio/netio_posix.c
index dda30b0..62320f2 100644
--- a/libnetio/netio_posix.c
+++ b/libnetio/netio_posix.c
@@ -1,90 +1,84 @@
-#define _GNU_SOURCE
-#include <aio.h> /* for struct aiocb, aio_read(), aio_write(), aio_error(), aio_return(), SIGEV_SIGNAL */
-#include <arpa/inet.h> /* for htons() */
-#include <errno.h> /* for errno, EAGAIN, EWOULDBLOCK, EINPROGRESS, EINVAL */
-#include <error.h> /* for error() */
+/* netio_posix.c - netio implementation for POSIX-ish systems
+ * (actually uses a few GNU extensions)
+ *
+ * Copyright (C) 2024 Luke T. Shumaker <lukeshu@lukeshu.com>
+ * SPDX-Licence-Identifier: AGPL-3.0-or-later
+ */
+
+#define _GNU_SOURCE /* for pthread_sigqueue(3gnu) */
+/* misc */
+#include <assert.h> /* for assert() */
+#include <errno.h> /* for errno, EAGAIN, EINVAL */
+#include <error.h> /* for error(3gnu) */
+#include <stdlib.h> /* for abs(), shutdown(), SHUT_RD, SHUT_WR, SHUT_RDWR */
+#include <unistd.h> /* for read(), write() */
+/* net */
+#include <arpa/inet.h> /* for htons(3p) */
#include <netinet/in.h> /* for struct sockaddr_in */
-#include <signal.h> /* for siginfo_t, struct sigaction, sigaction(), SIGRTMIN, SIGRTMAX, SA_SIGINFO */
-#include <stdlib.h> /* for shutdown(), SHUT_RD, SHUT_WR, SHUT_RDWR */
-#include <string.h> /* for memset() */
#include <sys/socket.h> /* for struct sockaddr, socket(), SOCK_* flags, setsockopt(), SOL_SOCKET, SO_REUSEADDR, bind(), listen(), accept() */
-#include <unistd.h> /* for getpid() */
+/* async */
+#include <pthread.h> /* for pthread_* */
+#include <signal.h> /* for siginfo_t, struct sigaction, enum sigval, sigaction(), SIGRTMIN, SIGRTMAX, SA_SIGINFO */
-#define USE_CONFIG_NETIO_POSIX
-#include "config.h"
-
-#if CONFIG_NETIO_ISLINUX
-# include <fcntl.h> /* for fcntl(), F_SETFL, O_ASYNC, F_SETSIG */
-#endif
+#include <libcr/coroutine.h>
#include <libnetio/netio.h>
-#include <libcr/coroutine.h>
-#include <libcr_ipc/sema.h>
-/* I found the following post to be very helpful when writing this:
- * http://davmac.org/davpage/linux/async-io.html */
+/* common *********************************************************************/
-static int sigs_allocated = 0;
-static int sig_io = 0;
-#if CONFIG_NETIO_ISLINUX
-static int sig_accept = 0;
-#endif
-
-struct netio_socket {
- int fd;
-#if CONFIG_NETIO_ISLINUX
- cr_sema_t accept_waiters;
-#endif
-};
+#define USE_CONFIG_NETIO_POSIX
+#include "config.h"
-static struct netio_socket socket_table[CONFIG_NETIO_NUM_PORTS] = {0};
+#define UNUSED(name) /* name __attribute__ ((unused)) */
-static void handle_sig_io(int sig __attribute__ ((unused)), siginfo_t *info, void *ucontext __attribute__ ((unused))) {
- cr_unpause_from_intrhandler((cid_t)info->si_value.sival_int);
-}
+static int sig_io = 0;
-#if CONFIG_NETIO_ISLINUX
-static void handle_sig_accept(int sig __attribute__ ((unused)), siginfo_t *info, void *ucontext __attribute__ ((unused))) {
- struct netio_socket *sock = NULL;
- for (int i = 0; sock == NULL && i < CONFIG_NETIO_NUM_PORTS; i++)
- if (info->si_fd == socket_table[i].fd)
- sock = &socket_table[i];
- if (!sock)
- return;
- cr_sema_signal_from_intrhandler(&sock->accept_waiters);
+static void handle_sig_io(int UNUSED(sig), siginfo_t *info, void *UNUSED(ucontext)) {
+ cr_unpause_from_intrhandler((cid_t)info->si_value.sival_int);
}
-#endif
static void _netio_init(void) {
- struct sigaction action;
+ struct sigaction action = {0};
if (sig_io)
return;
- sig_io = SIGRTMIN + (sigs_allocated++);
+ sig_io = SIGRTMIN;
if (sig_io > SIGRTMAX)
error(1, 0, "SIGRTMAX exceeded");
- memset(&action, 0, sizeof(action));
+
action.sa_flags = SA_SIGINFO;
action.sa_sigaction = handle_sig_io;
if (sigaction(sig_io, &action, NULL) < 0)
error(1, errno, "sigaction");
-
-#if CONFIG_NETIO_ISLINUX
- sig_accept = SIGRTMIN + (sigs_allocated++);
- if (sig_accept > SIGRTMAX)
- error(1, 0, "SIGRTMAX exceeded");
- memset(&action, 0, sizeof(action));
- action.sa_flags = SA_SIGINFO;
- action.sa_sigaction = handle_sig_accept;
- if (sigaction(sig_accept, &action, NULL) < 0)
- error(1, errno, "sigaction");
-#endif
}
+#define WAKE_COROUTINE(args) do { \
+ int r; \
+ union sigval val = {0}; \
+ val.sival_int = (int)((args)->cr_coroutine); \
+ do { \
+ r = pthread_sigqueue((args)->cr_thread, sig_io, val); \
+ assert(r == 0 || r == EAGAIN); \
+ } while (r == EAGAIN); \
+ } while (0)
+
+#define RUN_PTHREAD(fn, args) do { \
+ pthread_t thread; \
+ int r; \
+ r = pthread_create(&thread, NULL, fn, args); \
+ if (r) \
+ return -abs(r); \
+ cr_pause_and_yield(); \
+ r = pthread_join(thread, NULL); \
+ if (r) \
+ return -abs(r); \
+ } while (0)
+
+/* listen() *******************************************************************/
+
int netio_listen(uint16_t port) {
- int handle;
- struct netio_socket *sock;
+ int sockfd;
union {
struct sockaddr_in in;
struct sockaddr gen;
@@ -92,118 +86,139 @@ int netio_listen(uint16_t port) {
_netio_init();
- /* Allocate a handle out of socket_table. */
- handle = -1;
- for (int i = 0; handle < 0 && i < CONFIG_NETIO_NUM_PORTS; i++)
- if (socket_table[i].fd == 0)
- handle = i;
- if (handle < 0)
- error(1, 0, "CONFIG_NETIO_NUM_PORTS exceeded");
- sock = &socket_table[handle];
-
- /* Bind the socket. */
addr.in.sin_family = AF_INET;
addr.in.sin_port = htons(port);
- sock->fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
- if (sock->fd < 0)
+ sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ if (sockfd < 0)
error(1, errno, "socket");
- if (setsockopt(sock->fd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0)
+ if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0)
error(1, errno, "setsockopt");
-#if CONFIG_NETIO_ISLINUX
- if (fcntl(sock->fd, F_SETFL, O_ASYNC) < 0)
- error(1, errno, "fcntl(F_SETFL)");
- if (fcntl(sock->fd, F_SETSIG, sig_accept) < 0)
- error(1, errno, "fcntl(F_SETSIG)");
- if (fcntl(sock->fd, F_SETOWN, getpid()) < 0)
- error(1, errno, "fcntl(F_SETOWN)");
-#endif
- if (bind(sock->fd, &addr.gen, sizeof addr) < 0)
+ if (bind(sockfd, &addr.gen, sizeof addr) < 0)
error(1, errno, "bind");
- if (listen(sock->fd, CONFIG_NETIO_NUM_CONNS) < 0)
+ if (listen(sockfd, CONFIG_NETIO_NUM_CONNS) < 0)
error(1, errno, "listen");
- /* Return. */
- return handle;
+ return sockfd;
}
+/* accept() *******************************************************************/
+
+struct _pthread_accept_args {
+ pthread_t cr_thread;
+ cid_t cr_coroutine;
+
+ int sockfd;
+
+ int *ret;
+};
+
+void *_pthread_accept(void *_args) {
+ struct _pthread_accept_args *args = _args;
+ *(args->ret) = accept(args->sockfd, NULL, NULL);
+ if (*(args->ret) < 0)
+ *(args->ret) = -errno;
+ WAKE_COROUTINE(args);
+ return NULL;
+};
+
int netio_accept(int sock) {
- /* AFAICT in pure POSIX there's no good way to do this that
- * isn't just busy-polling.
- *
- * On Linux where we can get a signal to notify us when
- * there's something to accept, we still do this non-blocking
- * and check EAGAIN/EWOULDBLOCK in case the client timed out
- * while waiting for us to accept(). */
- for (;;) {
-#if CONFIG_NETIO_ISLINUX
- cr_sema_wait(&socket_table[sock].accept_waiters);
-#endif
- int conn = accept(socket_table[sock].fd, NULL, NULL);
- if (conn < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
-#if !CONFIG_NETIO_ISLINUX
- cr_yield();
-#endif
- continue;
- }
- return -errno;
- }
- return conn;
- }
+ int ret;
+ struct _pthread_accept_args args = {
+ .cr_thread = pthread_self(),
+ .cr_coroutine = cr_getcid(),
+ .sockfd = sock,
+ .ret = &ret,
+ };
+ RUN_PTHREAD(_pthread_accept, &args);
+ return ret;
}
+/* read() *********************************************************************/
+
+struct _pthread_read_args {
+ pthread_t cr_thread;
+ cid_t cr_coroutine;
+
+ int connfd;
+ void *buf;
+ size_t count;
+
+ ssize_t *ret;
+};
+
+void *_pthread_read(void *_args) {
+ struct _pthread_read_args *args = _args;
+ *(args->ret) = read(args->connfd, args->buf, args->count);
+ if (*(args->ret) < 0)
+ *(args->ret) = -errno;
+ WAKE_COROUTINE(args);
+ return NULL;
+};
+
ssize_t netio_read(int conn, void *buf, size_t count) {
- int r;
- struct aiocb ctl_block = {
- .aio_fildes = conn,
- .aio_buf = buf,
- .aio_nbytes = count,
- .aio_sigevent = {
- .sigev_notify = SIGEV_SIGNAL,
- .sigev_signo = sig_io,
- .sigev_value = {
- .sival_int = (int)cr_getcid(),
- },
- },
- };
+ ssize_t ret;
+ struct _pthread_read_args args = {
+ .cr_thread = pthread_self(),
+ .cr_coroutine = cr_getcid(),
- if (aio_read(&ctl_block) < 0)
- return -errno;
+ .connfd = conn,
+ .buf = buf,
+ .count = count,
- while ((r = aio_error(&ctl_block)) == EINPROGRESS)
- cr_pause_and_yield();
- return r ? -abs(r) : aio_return(&ctl_block);
+ .ret = &ret,
+ };
+ RUN_PTHREAD(_pthread_read, &args);
+ return ret;
}
-ssize_t netio_write(int conn, void *buf, size_t goal) {
+/* write() ********************************************************************/
+
+struct _pthread_write_args {
+ pthread_t cr_thread;
+ cid_t cr_coroutine;
+
+ int connfd;
+ void *buf;
+ size_t count;
+
+ ssize_t *ret;
+};
+
+void *_pthread_write(void *_args) {
+ struct _pthread_read_args *args = _args;
size_t done = 0;
- while (done < goal) {
- int r;
- struct aiocb ctl_block = {
- .aio_fildes = conn,
- .aio_buf = &(((uint8_t *)buf)[done]),
- .aio_nbytes = goal-done,
- .aio_sigevent = {
- .sigev_notify = SIGEV_SIGNAL,
- .sigev_signo = sig_io,
- .sigev_value = {
- .sival_int = (int)cr_getcid(),
- },
- },
- };
-
- if (aio_write(&ctl_block) < 0)
- return -errno;
-
- while ((r = aio_error(&ctl_block)) == EINPROGRESS)
- cr_pause_and_yield();
- if (r < 0)
- return -abs(r);
- done += aio_return(&ctl_block);
+ while (done < args->count) {
+ ssize_t r = write(args->connfd, args->buf, args->count);
+ if (r < 0) {
+ *(args->ret) = -errno;
+ break;
+ }
+ done += r;
}
- return done;
+ if (done == args->count)
+ *(args->ret) = done;
+ WAKE_COROUTINE(args);
+ return NULL;
+};
+
+ssize_t netio_write(int conn, void *buf, size_t count) {
+ ssize_t ret;
+ struct _pthread_write_args args = {
+ .cr_thread = pthread_self(),
+ .cr_coroutine = cr_getcid(),
+
+ .connfd = conn,
+ .buf = buf,
+ .count = count,
+
+ .ret = &ret,
+ };
+ RUN_PTHREAD(_pthread_write, &args);
+ return ret;
}
+/* close() ********************************************************************/
+
int netio_close(int conn, bool rd, bool wr) {
int how;
if (rd && wr)