From 3dcbd43ecd77c28762b0595475893ff052c0444a Mon Sep 17 00:00:00 2001 From: "Luke T. Shumaker" Date: Mon, 14 Oct 2024 18:29:15 -0600 Subject: wip libnet rewrite --- CMakeLists.txt | 2 +- HACKING.md | 2 +- cmd/srv9p/CMakeLists.txt | 3 +- cmd/srv9p/gnet.c | 263 ++++++++++++++++++++++++++++++++++++++ cmd/srv9p/gnet.h | 29 +++++ cmd/srv9p/main.c | 26 ++-- lib9p/CMakeLists.txt | 1 + lib9p/include/lib9p/srv.h | 3 +- lib9p/srv.c | 31 +++-- libnet/CMakeLists.txt | 7 + libnet/include/libnet/libnet.h | 56 ++++++++ libnetio/CMakeLists.txt | 13 -- libnetio/include/libnetio/netio.h | 27 ---- libnetio/netio_posix.c | 238 ---------------------------------- 14 files changed, 394 insertions(+), 307 deletions(-) create mode 100644 cmd/srv9p/gnet.c create mode 100644 cmd/srv9p/gnet.h create mode 100644 libnet/CMakeLists.txt create mode 100644 libnet/include/libnet/libnet.h delete mode 100644 libnetio/CMakeLists.txt delete mode 100644 libnetio/include/libnetio/netio.h delete mode 100644 libnetio/netio_posix.c diff --git a/CMakeLists.txt b/CMakeLists.txt index ae76e60..5e11d52 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -17,7 +17,7 @@ add_compile_options(-Wall -Wextra -Werror) add_subdirectory(libcr) add_subdirectory(libcr_ipc) add_subdirectory(libusb) -add_subdirectory(libnetio) +add_subdirectory(libnet) add_subdirectory(lib9p) add_subdirectory(cmd/${_UBUILD_CMD}) diff --git a/HACKING.md b/HACKING.md index 1492820..920a87d 100644 --- a/HACKING.md +++ b/HACKING.md @@ -4,7 +4,7 @@ Source layout - cmd/srv9p - libcr - A simple coroutine (cooperative multitasking) library - libcr_ipc - IPC primatives for coroutines built on top of libcr - - libnetio - TODO + - libnet - TODO - lib9p - A simple 9P protocol server library - libusb - TODO diff --git a/cmd/srv9p/CMakeLists.txt b/cmd/srv9p/CMakeLists.txt index e9d4097..61bd5c2 100644 --- a/cmd/srv9p/CMakeLists.txt +++ b/cmd/srv9p/CMakeLists.txt @@ -11,13 +11,14 @@ set(static_srcs add_executable(srv9p main.c static9p.c + gnet.c ) target_include_directories(srv9p PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) target_include_directories(srv9p PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/config) target_link_libraries(srv9p libcr libcr_ipc - libnetio + libnet lib9p ) diff --git a/cmd/srv9p/gnet.c b/cmd/srv9p/gnet.c new file mode 100644 index 0000000..5072428 --- /dev/null +++ b/cmd/srv9p/gnet.c @@ -0,0 +1,263 @@ +/* gnet.c - libnet implementation for libcr + GNU libc + * + * Copyright (C) 2024 Luke T. Shumaker + * SPDX-Licence-Identifier: AGPL-3.0-or-later + */ + +#define _GNU_SOURCE /* for pthread_sigqueue(3gnu) */ +/* misc */ +#include /* for assert() */ +#include /* for errno, EAGAIN, EINVAL */ +#include /* for error(3gnu) */ +#include /* for abs(), shutdown(), SHUT_RD, SHUT_WR, SHUT_RDWR */ +#include /* for read(), write() */ +/* net */ +#include /* for htons(3p) */ +#include /* for struct sockaddr_in */ +#include /* for struct sockaddr, socket(), SOCK_* flags, setsockopt(), SOL_SOCKET, SO_REUSEADDR, bind(), listen(), accept() */ +/* async */ +#include /* for pthread_* */ +#include /* for siginfo_t, struct sigaction, enum sigval, sigaction(), SIGRTMIN, SIGRTMAX, SA_SIGINFO */ + +#include + +#include "gnet.h" + +/* common *********************************************************************/ + +#define UNUSED(name) /* name __attribute__ ((unused)) */ + +static int gnet_sig_io = 0; + +static void gnet_handle_sig_io(int UNUSED(sig), siginfo_t *info, void *UNUSED(ucontext)) { + cr_unpause_from_intrhandler((cid_t)info->si_value.sival_int); +} + +static void gnet_init(void) { + struct sigaction action = {0}; + + if (gnet_sig_io) + return; + + gnet_sig_io = SIGRTMIN; + if (gnet_sig_io > SIGRTMAX) + error(1, 0, "SIGRTMAX exceeded"); + + action.sa_flags = SA_SIGINFO; + action.sa_sigaction = gnet_handle_sig_io; + if (sigaction(gnet_sig_io, &action, NULL) < 0) + error(1, errno, "sigaction"); +} + +#define WAKE_COROUTINE(args) do { \ + int r; \ + union sigval val = {0}; \ + val.sival_int = (int)((args)->cr_coroutine); \ + do { \ + r = pthread_sigqueue((args)->cr_thread, gnet_sig_io, val); \ + assert(r == 0 || r == EAGAIN); \ + } while (r == EAGAIN); \ + } while (0) + +static inline bool RUN_PTHREAD(void *(*fn)(void *), void *args) { + pthread_t thread; + if (pthread_create(&thread, NULL, fn, args)) + return true; + cr_pause_and_yield(); + if (pthread_join(thread, NULL)) + return true; + return false; +} + +/* vtables ********************************************************************/ + +static struct libnet_conn *gnet_accept(struct libnet_listener *_listener); +static ssize_t gnet_read(struct libnet_conn *conn, void *buf, size_t count); +static ssize_t gnet_write(struct libnet_conn *conn, void *buf, size_t count); +static int gnet_close(struct libnet_conn *conn, bool rd, bool wr); + +static struct libnet_listener_vtable gnet_listener_vtable = { + .accept = gnet_accept, +}; + +static struct libnet_conn_vtable gnet_conn_vtable = { + .read = gnet_read, + .write = gnet_write, + .close = gnet_close, +}; + +/* listen() *******************************************************************/ + +void gnet_listener_init(struct gnet_listener *self, uint16_t port) { + int listenerfd; + union { + struct sockaddr_in in; + struct sockaddr gen; + } addr = { 0 }; + + gnet_init(); + + addr.in.sin_family = AF_INET; + addr.in.sin_port = htons(port); + listenerfd = socket(AF_INET, SOCK_STREAM, 0); + if (listenerfd < 0) + error(1, errno, "socket"); + if (setsockopt(listenerfd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) + error(1, errno, "setsockopt"); + if (bind(listenerfd, &addr.gen, sizeof addr) < 0) + error(1, errno, "bind"); + if (listen(listenerfd, 0) < 0) + error(1, errno, "listen"); + + self->vtable = &gnet_listener_vtable; + self->fd = listenerfd; +} + +/* accept() *******************************************************************/ + +struct _pthread_accept_args { + pthread_t cr_thread; + cid_t cr_coroutine; + + int listenerfd; + + int *ret_connfd; +}; + +static void *_pthread_accept(void *_args) { + struct _pthread_accept_args *args = _args; + *(args->ret_connfd) = accept(args->listenerfd, NULL, NULL); + if (*(args->ret_connfd) < 0) + *(args->ret_connfd) = -errno; + WAKE_COROUTINE(args); + return NULL; +}; + +static struct libnet_conn *gnet_accept(struct libnet_listener *_listener) { + struct gnet_listener *listener = (struct gnet_listener *)_listener; + assert(listener); + + int ret_connfd; + struct _pthread_accept_args args = { + .cr_thread = pthread_self(), + .cr_coroutine = cr_getcid(), + .listenerfd = listener->fd, + .ret_connfd = &ret_connfd, + }; + if (RUN_PTHREAD(_pthread_accept, &args)) + return NULL; + + listener->active_conn.vtable = &gnet_conn_vtable; + listener->active_conn.fd = ret_connfd; + return (struct libnet_conn *)&listener->active_conn; +} + +/* read() *********************************************************************/ + +struct _pthread_read_args { + pthread_t cr_thread; + cid_t cr_coroutine; + + int connfd; + void *buf; + size_t count; + + ssize_t *ret; +}; + +static void *_pthread_read(void *_args) { + struct _pthread_read_args *args = _args; + *(args->ret) = read(args->connfd, args->buf, args->count); + if (*(args->ret) < 0) + *(args->ret) = -errno; + WAKE_COROUTINE(args); + return NULL; +}; + +static ssize_t gnet_read(struct libnet_conn *_conn, void *buf, size_t count) { + struct gnet_conn *conn = (struct gnet_conn *)_conn; + assert(conn); + + ssize_t ret; + struct _pthread_read_args args = { + .cr_thread = pthread_self(), + .cr_coroutine = cr_getcid(), + + .connfd = conn->fd, + .buf = buf, + .count = count, + + .ret = &ret, + }; + if (RUN_PTHREAD(_pthread_read, &args)) + return -1; + return ret; +} + +/* write() ********************************************************************/ + +struct _pthread_write_args { + pthread_t cr_thread; + cid_t cr_coroutine; + + int connfd; + void *buf; + size_t count; + + ssize_t *ret; +}; + +static void *_pthread_write(void *_args) { + struct _pthread_read_args *args = _args; + size_t done = 0; + while (done < args->count) { + ssize_t r = write(args->connfd, args->buf, args->count); + if (r < 0) { + *(args->ret) = -errno; + break; + } + done += r; + } + if (done == args->count) + *(args->ret) = done; + WAKE_COROUTINE(args); + return NULL; +}; + +static ssize_t gnet_write(struct libnet_conn *_conn, void *buf, size_t count) { + struct gnet_conn *conn = (struct gnet_conn *)_conn; + assert(conn); + + ssize_t ret; + struct _pthread_write_args args = { + .cr_thread = pthread_self(), + .cr_coroutine = cr_getcid(), + + .connfd = conn->fd, + .buf = buf, + .count = count, + + .ret = &ret, + }; + if (RUN_PTHREAD(_pthread_write, &args)) + return -1; + return ret; +} + +/* close() ********************************************************************/ + +static int gnet_close(struct libnet_conn *_conn, bool rd, bool wr) { + struct gnet_conn *conn = (struct gnet_conn *)_conn; + assert(conn); + + int how; + if (rd && wr) + how = SHUT_RDWR; + else if (rd && !wr) + how = SHUT_RD; + else if (!rd && wr) + how = SHUT_WR; + else + return -EINVAL; + return shutdown(conn->fd, how) ? -errno : 0; +} diff --git a/cmd/srv9p/gnet.h b/cmd/srv9p/gnet.h new file mode 100644 index 0000000..a724128 --- /dev/null +++ b/cmd/srv9p/gnet.h @@ -0,0 +1,29 @@ +/* gnet.c - libnet implementation for libcr + GNU libc + * + * Copyright (C) 2024 Luke T. Shumaker + * SPDX-Licence-Identifier: AGPL-3.0-or-later + */ + +#ifndef _GNET_H_ +#define _GNET_H_ + +#include /* for uint16_6 */ + +#include + +struct gnet_conn { + struct libnet_conn_vtable *vtable; + + int fd; +}; + +struct gnet_listener { + struct libnet_listener_vtable *vtable; + + int fd; + struct gnet_conn active_conn; +}; + +void gnet_listener_init(struct gnet_listener *self, uint16_t port); + +#endif /* _GNET_H_ */ diff --git a/cmd/srv9p/main.c b/cmd/srv9p/main.c index 94a00f7..4170b5d 100644 --- a/cmd/srv9p/main.c +++ b/cmd/srv9p/main.c @@ -2,11 +2,12 @@ #include #include -#include +#include #include #include "static9p.h" #include "static.h" +#include "gnet.h" /* configuration **************************************************************/ @@ -78,19 +79,28 @@ static struct lib9p_srv_file *get_root(struct lib9p_srv_ctx *UNUSED(ctx), char * /* main ***********************************************************************/ -int main() { - int sock = netio_listen(9000); - if (sock < 0) - error(1, -sock, "netio_listen"); +static COROUTINE read_cr(void *_srv) { + struct lib9p_srv *srv = _srv; + assert(srv); + + cr_begin(); + + struct gnet_listener listener; + gnet_listener_init(&listener, 9000); + + lib9p_srv_read_cr(srv, (struct libnet_listener *)&listener); + cr_end(); +} + +int main() { struct lib9p_srv srv = { - .sockfd = sock, .rootdir = get_root, }; for (int i = 0; i < CONFIG_NETIO_NUM_CONNS; i++) - if (!coroutine_add(lib9p_srv_read_cr, &srv)) - error(1, 0, "coroutine_add(lib9p_srv_read_cr, &srv)"); + if (!coroutine_add(read_cr, &srv)) + error(1, 0, "coroutine_add(read_cr, &srv)"); for (int i = 0; i < 2*CONFIG_NETIO_NUM_CONNS; i++) if (!coroutine_add(lib9p_srv_write_cr, &srv)) error(1, 0, "coroutine_add(lib9p_srv_write_cr, &srv)"); diff --git a/lib9p/CMakeLists.txt b/lib9p/CMakeLists.txt index 3584d7a..63469d9 100644 --- a/lib9p/CMakeLists.txt +++ b/lib9p/CMakeLists.txt @@ -12,4 +12,5 @@ target_sources(lib9p INTERFACE ) target_link_libraries(lib9p INTERFACE libcr_ipc + libnet ) diff --git a/lib9p/include/lib9p/srv.h b/lib9p/include/lib9p/srv.h index 4bf957c..284888f 100644 --- a/lib9p/include/lib9p/srv.h +++ b/lib9p/include/lib9p/srv.h @@ -96,7 +96,6 @@ CR_RPC_DECLARE(_lib9p_srv_reqch, struct _lib9p_srv_req *, bool) struct lib9p_srv { /* Things you provide */ - int sockfd; void /*TODO*/ (*auth )(struct lib9p_srv_ctx *, char *treename); /* optional */ struct lib9p_srv_file *(*rootdir)(struct lib9p_srv_ctx *, char *treename); @@ -114,7 +113,7 @@ struct lib9p_srv { * @errno LINUX_ERANGE R-message does not fit into max_msg_size */ -COROUTINE lib9p_srv_read_cr(void *_srv); +__attribute__ ((noreturn)) void lib9p_srv_read_cr(struct lib9p_srv *srv, struct libnet_listener *listener); COROUTINE lib9p_srv_write_cr(void *_srv); #endif /* _LIB9P_SRV_H_ */ diff --git a/lib9p/srv.c b/lib9p/srv.c index a3e4335..282cd68 100644 --- a/lib9p/srv.c +++ b/lib9p/srv.c @@ -8,13 +8,15 @@ #include #include #include -#include +#include #include #include "internal.h" /* structs ********************************************************************/ +#define MCALL(o, m, ...) (o)->vtable->m(o __VA_OPT__(,) __VA_ARGS__) + #define FIDFLAG_OPEN_R (1<<0) #define FIDFLAG_OPEN_W (1<<1) #define FIDFLAG_RCLOSE (1<<2) @@ -51,7 +53,7 @@ struct _srv_fidinfo { struct _srv_conn { /* immutable */ struct lib9p_srv *parent_srv; - int fd; + struct libnet_conn *fd; cid_t reader; /* the lib9p_srv_read_cr() coroutine */ /* mutable */ cr_mutex_t writelock; @@ -143,7 +145,7 @@ static void respond_error(struct _lib9p_srv_req *req) { &host, req->net_bytes); cr_mutex_lock(&sess->parent_conn->writelock); - r = netio_write(sess->parent_conn->fd, + r = MCALL(sess->parent_conn->fd, write, req->net_bytes, decode_u32le(req->net_bytes)); cr_mutex_unlock(&sess->parent_conn->writelock); if (r < 0) @@ -152,12 +154,12 @@ static void respond_error(struct _lib9p_srv_req *req) { /* read coroutine *************************************************************/ -static bool read_at_least(int fd, uint8_t *buf, size_t goal, size_t *done) { +static bool read_at_least(struct libnet_conn *fd, uint8_t *buf, size_t goal, size_t *done) { assert(buf); assert(goal); assert(done); while (*done < goal) { - ssize_t r = netio_read(fd, &buf[*done], CONFIG_9P_MAX_MSG_SIZE - *done); + ssize_t r = MCALL(fd, read, &buf[*done], CONFIG_9P_MAX_MSG_SIZE - *done); if (r < 0) { nonrespond_errorf("read: %s", strerror(-r)); return true; @@ -173,24 +175,23 @@ static bool read_at_least(int fd, uint8_t *buf, size_t goal, size_t *done) { static void handle_message(struct _lib9p_srv_req *ctx); -COROUTINE lib9p_srv_read_cr(void *_srv) { +__attribute__ ((noreturn)) void lib9p_srv_read_cr(struct lib9p_srv *srv, struct libnet_listener *listener) { uint8_t buf[CONFIG_9P_MAX_MSG_SIZE]; - struct lib9p_srv *srv = _srv; assert(srv); assert(srv->rootdir); - cr_begin(); + assert(listener); uint32_t initial_rerror_overhead = rerror_overhead_for_version(0, buf); for (;;) { struct _srv_conn conn = { .parent_srv = srv, - .fd = netio_accept(srv->sockfd), + .fd = MCALL(listener, accept), .reader = cr_getcid(), }; - if (conn.fd < 0) { - nonrespond_errorf("accept: %s", strerror(-conn.fd)); + if (!conn.fd) { + nonrespond_errorf("accept: error"); continue; } @@ -246,16 +247,14 @@ COROUTINE lib9p_srv_read_cr(void *_srv) { _lib9p_srv_reqch_send_req(&srv->_reqch, &req); } close: - netio_close(conn.fd, true, sess.reqs.len == 0); + MCALL(conn.fd, close, true, sess.reqs.len == 0); if (sess.reqs.len) { sess.closing = true; cr_pause_and_yield(); assert(sess.reqs.len == 0); - netio_close(conn.fd, true, true); + MCALL(conn.fd, close, true, true); } } - - cr_end(); } /* write coroutine ************************************************************/ @@ -380,7 +379,7 @@ static void handle_message(struct _lib9p_srv_req *ctx) { goto write; cr_mutex_lock(&ctx->parent_sess->parent_conn->writelock); - netio_write(ctx->parent_sess->parent_conn->fd, + MCALL(ctx->parent_sess->parent_conn->fd, write, ctx->net_bytes, decode_u32le(ctx->net_bytes)); cr_mutex_unlock(&ctx->parent_sess->parent_conn->writelock); } diff --git a/libnet/CMakeLists.txt b/libnet/CMakeLists.txt new file mode 100644 index 0000000..32047f0 --- /dev/null +++ b/libnet/CMakeLists.txt @@ -0,0 +1,7 @@ +# libnet/CMakeLists.txt - Build script for libnet support library +# +# Copyright (C) 2024 Luke T. Shumaker +# SPDX-Licence-Identifier: AGPL-3.0-or-later + +add_library(libnet INTERFACE) +target_include_directories(libnet SYSTEM INTERFACE ${CMAKE_CURRENT_LIST_DIR}/include) diff --git a/libnet/include/libnet/libnet.h b/libnet/include/libnet/libnet.h new file mode 100644 index 0000000..cef13d9 --- /dev/null +++ b/libnet/include/libnet/libnet.h @@ -0,0 +1,56 @@ +#ifndef _NETIO_H_ +#define _NETIO_H_ + +#include /* for bool */ +#include /* for size_t */ +#include /* for ssize_t */ + +struct libnet_conn; +struct libnet_listener; + +struct libnet_listener_vtable { + /** + * It is invalid to accept() a new connection if an existing + * connection is still open. + */ + struct libnet_conn *(*accept)(struct libnet_listener *self); +}; + +struct libnet_conn_vtable { + /** + * Return bytes-read on success, 0 on EOF, -errno on error; a + * short read is *not* an error. + */ + ssize_t (*read)(struct libnet_conn *self, void *buf, size_t count); + + /** + * Return `count` on success, -errno on error; a short write *is* an + * error. + * + * Writes are *not* guaranteed to be atomic (as this would be + * expensive to implement), so if you have concurrent writers then you + * should arrange for a mutex to protect the connection. + */ + ssize_t (*write)(struct libnet_conn *self, void *buf, size_t count); + + /** + * Return 0 on success, -errno on error. + */ + int (*close)(struct libnet_conn *self, bool rd, bool wr); +}; + +struct libnet_listener { + struct libnet_listener_vtable *vtable; + + /* This is where your implementation data goes. */ + char data[0]; +}; + +struct libnet_conn { + struct libnet_conn_vtable *vtable; + + /* This is where your implementation data goes. */ + char data[0]; +}; + +#endif /* _NETIO_H_ */ diff --git a/libnetio/CMakeLists.txt b/libnetio/CMakeLists.txt deleted file mode 100644 index fcfecfd..0000000 --- a/libnetio/CMakeLists.txt +++ /dev/null @@ -1,13 +0,0 @@ -# libnetio/CMakeLists.txt - Build script for libnetio support library -# -# Copyright (C) 2024 Luke T. Shumaker -# SPDX-Licence-Identifier: AGPL-3.0-or-later - -add_library(libnetio INTERFACE) -target_sources(libnetio INTERFACE - netio_posix.c -) -target_link_libraries(libnetio INTERFACE - libcr_ipc -) -target_include_directories(libnetio SYSTEM INTERFACE ${CMAKE_CURRENT_LIST_DIR}/include) diff --git a/libnetio/include/libnetio/netio.h b/libnetio/include/libnetio/netio.h deleted file mode 100644 index 370d2ca..0000000 --- a/libnetio/include/libnetio/netio.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef _NETIO_H_ -#define _NETIO_H_ - -#include /* for bool */ -#include /* for uint16_t */ -#include /* for size_t */ -#include /* for ssize_t */ - -/** Return socket-fd on success, -errno on error. */ -int netio_listen(uint16_t port); -/** Return connection-fd on success, -errno on error. */ -int netio_accept(int sock); -/** Return bytes-read on success, 0 on EOF, -errno on error; a short read is *not* an error. */ -ssize_t netio_read(int conn, void *buf, size_t count); -/** - * Return `count` on success, -errno on error; a short write *is* an - * error. - * - * Writes are *not* guaranteed to be atomic (as this would be - * expensive to implement), so if you have concurrent writers then you - * should arrange for a mutex to protect the connection. - */ -ssize_t netio_write(int conn, void *buf, size_t count); -/** Return 0 on success, -errno on error. */ -int netio_close(int conn, bool rd, bool wr); - -#endif /* _NETIO_H_ */ diff --git a/libnetio/netio_posix.c b/libnetio/netio_posix.c deleted file mode 100644 index f3e9fe0..0000000 --- a/libnetio/netio_posix.c +++ /dev/null @@ -1,238 +0,0 @@ -/* netio_posix.c - netio implementation for POSIX-ish systems - * (actually uses a few GNU extensions) - * - * Copyright (C) 2024 Luke T. Shumaker - * SPDX-Licence-Identifier: AGPL-3.0-or-later - */ - -#define _GNU_SOURCE /* for pthread_sigqueue(3gnu) */ -/* misc */ -#include /* for assert() */ -#include /* for errno, EAGAIN, EINVAL */ -#include /* for error(3gnu) */ -#include /* for abs(), shutdown(), SHUT_RD, SHUT_WR, SHUT_RDWR */ -#include /* for read(), write() */ -/* net */ -#include /* for htons(3p) */ -#include /* for struct sockaddr_in */ -#include /* for struct sockaddr, socket(), SOCK_* flags, setsockopt(), SOL_SOCKET, SO_REUSEADDR, bind(), listen(), accept() */ -/* async */ -#include /* for pthread_* */ -#include /* for siginfo_t, struct sigaction, enum sigval, sigaction(), SIGRTMIN, SIGRTMAX, SA_SIGINFO */ - -#include - -#include - -/* configuration **************************************************************/ - -#include "config.h" - -#ifndef CONFIG_NETIO_NUM_CONNS -# error config.h must define CONFIG_NETIO_NUM_CONNS -#endif - -/* common *********************************************************************/ - -#define UNUSED(name) /* name __attribute__ ((unused)) */ - -static int sig_io = 0; - -static void handle_sig_io(int UNUSED(sig), siginfo_t *info, void *UNUSED(ucontext)) { - cr_unpause_from_intrhandler((cid_t)info->si_value.sival_int); -} - -static void _netio_init(void) { - struct sigaction action = {0}; - - if (sig_io) - return; - - sig_io = SIGRTMIN; - if (sig_io > SIGRTMAX) - error(1, 0, "SIGRTMAX exceeded"); - - action.sa_flags = SA_SIGINFO; - action.sa_sigaction = handle_sig_io; - if (sigaction(sig_io, &action, NULL) < 0) - error(1, errno, "sigaction"); -} - -#define WAKE_COROUTINE(args) do { \ - int r; \ - union sigval val = {0}; \ - val.sival_int = (int)((args)->cr_coroutine); \ - do { \ - r = pthread_sigqueue((args)->cr_thread, sig_io, val); \ - assert(r == 0 || r == EAGAIN); \ - } while (r == EAGAIN); \ - } while (0) - -#define RUN_PTHREAD(fn, args) do { \ - pthread_t thread; \ - int r; \ - r = pthread_create(&thread, NULL, fn, args); \ - if (r) \ - return -abs(r); \ - cr_pause_and_yield(); \ - r = pthread_join(thread, NULL); \ - if (r) \ - return -abs(r); \ - } while (0) - -/* listen() *******************************************************************/ - -int netio_listen(uint16_t port) { - int sockfd; - union { - struct sockaddr_in in; - struct sockaddr gen; - } addr = { 0 }; - - _netio_init(); - - addr.in.sin_family = AF_INET; - addr.in.sin_port = htons(port); - sockfd = socket(AF_INET, SOCK_STREAM, 0); - if (sockfd < 0) - error(1, errno, "socket"); - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &(int){1}, sizeof(int)) < 0) - error(1, errno, "setsockopt"); - if (bind(sockfd, &addr.gen, sizeof addr) < 0) - error(1, errno, "bind"); - if (listen(sockfd, CONFIG_NETIO_NUM_CONNS) < 0) - error(1, errno, "listen"); - - return sockfd; -} - -/* accept() *******************************************************************/ - -struct _pthread_accept_args { - pthread_t cr_thread; - cid_t cr_coroutine; - - int sockfd; - - int *ret; -}; - -void *_pthread_accept(void *_args) { - struct _pthread_accept_args *args = _args; - *(args->ret) = accept(args->sockfd, NULL, NULL); - if (*(args->ret) < 0) - *(args->ret) = -errno; - WAKE_COROUTINE(args); - return NULL; -}; - -int netio_accept(int sock) { - int ret; - struct _pthread_accept_args args = { - .cr_thread = pthread_self(), - .cr_coroutine = cr_getcid(), - .sockfd = sock, - .ret = &ret, - }; - RUN_PTHREAD(_pthread_accept, &args); - return ret; -} - -/* read() *********************************************************************/ - -struct _pthread_read_args { - pthread_t cr_thread; - cid_t cr_coroutine; - - int connfd; - void *buf; - size_t count; - - ssize_t *ret; -}; - -void *_pthread_read(void *_args) { - struct _pthread_read_args *args = _args; - *(args->ret) = read(args->connfd, args->buf, args->count); - if (*(args->ret) < 0) - *(args->ret) = -errno; - WAKE_COROUTINE(args); - return NULL; -}; - -ssize_t netio_read(int conn, void *buf, size_t count) { - ssize_t ret; - struct _pthread_read_args args = { - .cr_thread = pthread_self(), - .cr_coroutine = cr_getcid(), - - .connfd = conn, - .buf = buf, - .count = count, - - .ret = &ret, - }; - RUN_PTHREAD(_pthread_read, &args); - return ret; -} - -/* write() ********************************************************************/ - -struct _pthread_write_args { - pthread_t cr_thread; - cid_t cr_coroutine; - - int connfd; - void *buf; - size_t count; - - ssize_t *ret; -}; - -void *_pthread_write(void *_args) { - struct _pthread_read_args *args = _args; - size_t done = 0; - while (done < args->count) { - ssize_t r = write(args->connfd, args->buf, args->count); - if (r < 0) { - *(args->ret) = -errno; - break; - } - done += r; - } - if (done == args->count) - *(args->ret) = done; - WAKE_COROUTINE(args); - return NULL; -}; - -ssize_t netio_write(int conn, void *buf, size_t count) { - ssize_t ret; - struct _pthread_write_args args = { - .cr_thread = pthread_self(), - .cr_coroutine = cr_getcid(), - - .connfd = conn, - .buf = buf, - .count = count, - - .ret = &ret, - }; - RUN_PTHREAD(_pthread_write, &args); - return ret; -} - -/* close() ********************************************************************/ - -int netio_close(int conn, bool rd, bool wr) { - int how; - if (rd && wr) - how = SHUT_RDWR; - else if (rd && !wr) - how = SHUT_RD; - else if (!rd && wr) - how = SHUT_WR; - else - return -EINVAL; - return shutdown(conn, how) ? -errno : 0; -} -- cgit v1.2.3-2-g168b