diff options
author | Luke T. Shumaker <lukeshu@lukeshu.com> | 2025-04-16 05:27:40 -0600 |
---|---|---|
committer | Luke T. Shumaker <lukeshu@lukeshu.com> | 2025-04-16 05:27:40 -0600 |
commit | fdc9b04c051c7a49dcc48866b5675e06b9a87abe (patch) | |
tree | 30008026f025696066024c8d32001cf938afb9cc | |
parent | 802ed1e3cd0252cafd1be2aada0addf4d3f7eb2e (diff) | |
parent | f56ae03840bb264163c4035eb72fbac938847638 (diff) |
Merge branch 'lukeshu/9p-fix-flush'
25 files changed, 572 insertions, 406 deletions
diff --git a/cmd/sbc_harness/config/config.h b/cmd/sbc_harness/config/config.h index 5367dbe..ca23462 100644 --- a/cmd/sbc_harness/config/config.h +++ b/cmd/sbc_harness/config/config.h @@ -39,6 +39,18 @@ #define CONFIG_9P_MAX_ERR_SIZE 128 /* 128 is what Plan 9 4e uses */ +#define CONFIG_9P_ENABLE_9P2000 1 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_u 1 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_e 0 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_L 0 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_p9p 0 /* bool */ + +/* 9P_SRV *********************************************************************/ + +#define CONFIG_9P_SRV_DEBUG 1 /* bool */ + +#define CONFIG_9P_SRV_MAX_MSG_SIZE ((4*1024)+24) + /** * This max-msg-size is sized so that a Twrite message can return * 8KiB of data. @@ -64,12 +76,6 @@ */ #define CONFIG_9P_SRV_MAX_HOSTMSG_SIZE CONFIG_9P_SRV_MAX_MSG_SIZE+16 -#define CONFIG_9P_ENABLE_9P2000 1 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_u 1 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_e 0 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_L 0 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_p9p 0 /* bool */ - /* DHCP ***********************************************************************/ #define CONFIG_DHCP_CAN_RECV_UNICAST_IP_WITHOUT_IP 0 /* bool */ diff --git a/cmd/sbc_harness/fs_harness_uptime_txt.c b/cmd/sbc_harness/fs_harness_uptime_txt.c index 1425bf9..dd5c681 100644 --- a/cmd/sbc_harness/fs_harness_uptime_txt.c +++ b/cmd/sbc_harness/fs_harness_uptime_txt.c @@ -5,10 +5,10 @@ */ #include <stdio.h> /* for snprintf() */ -#include <stdlib.h> /* for malloc(), free() */ #include <libhw/generic/alarmclock.h> #include <util9p/static.h> +#include <libmisc/alloc.h> /* for heap_alloc(), free() */ #include "fs_harness_uptime_txt.h" @@ -91,7 +91,7 @@ static lo_interface lib9p_srv_fio uptime_file_fopen(struct uptime_file *self, st assert(self); assert(ctx); - struct uptime_fio *ret = malloc(sizeof(struct uptime_fio)); + struct uptime_fio *ret = heap_alloc(1, struct uptime_fio); ret->parent = self; ret->buf_len = 0; diff --git a/lib9p/srv.c b/lib9p/srv.c index dae73ea..b14ae33 100644 --- a/lib9p/srv.c +++ b/lib9p/srv.c @@ -4,7 +4,6 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -#include <alloca.h> #include <inttypes.h> /* for PRI* */ #include <limits.h> /* for SSIZE_MAX, not set by newlib */ #include <stddef.h> /* for size_t */ @@ -17,6 +16,7 @@ #include <libcr/coroutine.h> #include <libcr_ipc/chan.h> #include <libcr_ipc/mutex.h> +#include <libmisc/alloc.h> #include <libmisc/assert.h> #include <libmisc/endian.h> #include <libmisc/map.h> @@ -43,17 +43,21 @@ static_assert(CONFIG_9P_SRV_MAX_HOSTMSG_SIZE <= SSIZE_MAX); /* context ********************************************************************/ -bool lib9p_srv_flush_requested(struct lib9p_srv_ctx *ctx) { +bool lib9p_srv_accept_flush(struct lib9p_srv_ctx *ctx) { assert(ctx); - return cr_chan_can_send(&ctx->flushch); + if (cr_chan_can_send(&ctx->flush_ch)) { + ctx->flush_observed = true; + return true; + } + return false; } -void lib9p_srv_acknowledge_flush(struct lib9p_srv_ctx *ctx) { - assert(ctx); - assert(cr_chan_can_send(&ctx->flushch)); - lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_ECANCELED, "request canceled by flush"); - cr_chan_send(&ctx->flushch, true); -} +#define req_debugf(fmt, ...) \ + debugf("cid=%zu: %s(tag=%"PRIu16"): " fmt, \ + cr_getcid(), \ + lib9p_msgtype_str(ctx->basectx.version, ctx->net_bytes[4]), \ + ctx->tag \ + __VA_OPT__(,) __VA_ARGS__) /* structs ********************************************************************/ @@ -450,8 +454,6 @@ void lib9p_srv_accept_and_read_loop(struct lib9p_srv *srv, lo_interface net_stre } } -static void handle_message(struct srv_req *ctx); - void lib9p_srv_read(struct lib9p_srv *srv, lo_interface net_stream_conn _conn) { assert(srv); assert(srv->rootdir); @@ -511,7 +513,7 @@ void lib9p_srv_read(struct lib9p_srv *srv, lo_interface net_stream_conn _conn) { /* Handle the message... */ if (req.net_bytes[4] == LIB9P_TYP_Tversion) /* ...in this coroutine for Tversion, */ - handle_message(&req); + lib9p_srv_worker(&req); else /* ...but usually in another coroutine. */ cr_rpc_send_req(&srv->_reqch, &req); @@ -577,14 +579,7 @@ void lib9p_srv_worker_loop(struct lib9p_srv *srv) { cr_rpc_send_resp(rpc_handle, 0); /* Process the request. **************************************/ - handle_message(&req); - - /* Release resources. ****************************************/ - while (cr_chan_can_send(&req.flushch)) - cr_chan_send(&req.flushch, false); - map_del(&req.parent_sess->reqs, req.tag); - if (req.parent_sess->closing && !map_len(&req.parent_sess->reqs)) - cr_unpause(req.parent_sess->parent_conn->reader); + lib9p_srv_worker(&req); } } @@ -616,35 +611,11 @@ _HANDLER_PROTO(swrite); typedef void (*tmessage_handler)(struct srv_req *, void *, void *); -static tmessage_handler tmessage_handlers[0x100] = { - [LIB9P_TYP_Tversion] = (tmessage_handler)handle_Tversion, - [LIB9P_TYP_Tauth] = (tmessage_handler)handle_Tauth, - [LIB9P_TYP_Tattach] = (tmessage_handler)handle_Tattach, - [LIB9P_TYP_Tflush] = (tmessage_handler)handle_Tflush, - [LIB9P_TYP_Twalk] = (tmessage_handler)handle_Twalk, - [LIB9P_TYP_Topen] = (tmessage_handler)handle_Topen, - [LIB9P_TYP_Tcreate] = (tmessage_handler)handle_Tcreate, - [LIB9P_TYP_Tread] = (tmessage_handler)handle_Tread, - [LIB9P_TYP_Twrite] = (tmessage_handler)handle_Twrite, - [LIB9P_TYP_Tclunk] = (tmessage_handler)handle_Tclunk, - [LIB9P_TYP_Tremove] = (tmessage_handler)handle_Tremove, - [LIB9P_TYP_Tstat] = (tmessage_handler)handle_Tstat, - [LIB9P_TYP_Twstat] = (tmessage_handler)handle_Twstat, -#if CONFIG_9P_ENABLE_9P2000_p9p - [LIB9P_TYP_Topenfd] = (tmessage_handler)handle_Topenfd, -#endif -#if CONFIG_9P_ENABLE_9P2000_e - [LIB9P_TYP_Tsession] = (tmessage_handler)handle_Tsession, - [LIB9P_TYP_Tsread] = (tmessage_handler)handle_Tsread, - [LIB9P_TYP_Tswrite] = (tmessage_handler)handle_Tswrite, -#endif -}; - -static void handle_message(struct srv_req *ctx) { +void lib9p_srv_worker(struct srv_req *ctx) { uint8_t *host_req = NULL; uint8_t host_resp[CONFIG_9P_SRV_MAX_HOSTMSG_SIZE]; - /* Unmarshal it. */ + /* Unmarshal it. *****************************************************/ ssize_t host_size = lib9p_Tmsg_validate(&ctx->basectx, ctx->net_bytes); if (host_size < 0) goto write; @@ -655,13 +626,45 @@ static void handle_message(struct srv_req *ctx) { &typ, host_req); srv_msglog(ctx, typ, host_req); - /* Handle it. */ - tmessage_handlers[typ](ctx, (void *)host_req, (void *)host_resp); + /* Handle it. ********************************************************/ + tmessage_handler handler; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wswitch-enum" + switch (typ) { + case LIB9P_TYP_Tversion: handler = (tmessage_handler)handle_Tversion; break; + case LIB9P_TYP_Tauth: handler = (tmessage_handler)handle_Tauth; break; + case LIB9P_TYP_Tattach: handler = (tmessage_handler)handle_Tattach; break; + case LIB9P_TYP_Tflush: handler = (tmessage_handler)handle_Tflush; break; + case LIB9P_TYP_Twalk: handler = (tmessage_handler)handle_Twalk; break; + case LIB9P_TYP_Topen: handler = (tmessage_handler)handle_Topen; break; + case LIB9P_TYP_Tcreate: handler = (tmessage_handler)handle_Tcreate; break; + case LIB9P_TYP_Tread: handler = (tmessage_handler)handle_Tread; break; + case LIB9P_TYP_Twrite: handler = (tmessage_handler)handle_Twrite; break; + case LIB9P_TYP_Tclunk: handler = (tmessage_handler)handle_Tclunk; break; + case LIB9P_TYP_Tremove: handler = (tmessage_handler)handle_Tremove; break; + case LIB9P_TYP_Tstat: handler = (tmessage_handler)handle_Tstat; break; + case LIB9P_TYP_Twstat: handler = (tmessage_handler)handle_Twstat; break; +#if CONFIG_9P_ENABLE_9P2000_p9p + case LIB9P_TYP_Topenfd: handler = (tmessage_handler)handle_Topenfd; break +#endif +#if CONFIG_9P_ENABLE_9P2000_e + case LIB9P_TYP_Tsession: handler = (tmessage_handler)handle_Tsession; break; + case LIB9P_TYP_Tsread: handler = (tmessage_handler)handle_Tsread; break; + case LIB9P_TYP_Tswrite: handler = (tmessage_handler)handle_Tswrite; break; +#endif + default: + assert_notreached("lib9p_Tmsg_validate() should have rejected unknown typ"); + } +#pragma GCC diagnostic pop + handler(ctx, (void *)host_req, (void *)host_resp); + /* Write the response. ***********************************************/ write: - if (lib9p_ctx_has_error(&ctx->basectx)) + if (lib9p_ctx_has_error(&ctx->basectx)) { srv_respond_error(ctx); - else { + } else if (ctx->flush_observed) { + /* do nothing */ + } else { struct lib9p_Rmsg_send_buf net_resp; if (lib9p_Rmsg_marshal(&ctx->basectx, typ+1, host_resp, @@ -670,6 +673,16 @@ static void handle_message(struct srv_req *ctx) { srv_msglog(ctx, typ+1, &host_resp); srv_write_Rmsg(ctx, &net_resp); } + /* Release resources. ************************************************/ + map_del(&ctx->parent_sess->reqs, ctx->tag); + size_t nwaiters; + while ((nwaiters = cr_chan_num_waiters(&ctx->flush_ch))) { + cr_chan_send(&ctx->flush_ch, (nwaiters == 1) + ? _LIB9P_SRV_FLUSH_RFLUSH + : _LIB9P_SRV_FLUSH_SILENT); + } + if (ctx->parent_sess->closing && !map_len(&ctx->parent_sess->reqs)) + cr_unpause(ctx->parent_sess->parent_conn->reader); if (host_req) free(host_req); free(ctx->net_bytes); @@ -736,15 +749,15 @@ static void handle_Tversion(struct srv_req *ctx, if (map_len(&ctx->parent_sess->reqs)) { /* Flush all in-progress requests, and wait for them * to finish. */ - struct cr_select_arg *list = alloca(sizeof(struct cr_select_arg) * map_len(&ctx->parent_sess->reqs)); + struct cr_select_arg *args = stack_alloc(map_len(&ctx->parent_sess->reqs), struct cr_select_arg); while (map_len(&ctx->parent_sess->reqs)) { size_t i = 0; - bool flushed; MAP_FOREACH(&ctx->parent_sess->reqs, tag, reqpp) { - list[i] = CR_SELECT_RECV(&((*reqpp)->flushch), &flushed); + enum _lib9p_srv_flush_result flushed; + args[i++] = CR_SELECT_RECV(&((*reqpp)->flush_ch), &flushed); } assert(i == map_len(&ctx->parent_sess->reqs)); - cr_select_v(i, list); + cr_select_v(i, args); } } if (map_len(&ctx->parent_sess->fids)) { @@ -867,8 +880,21 @@ static void handle_Tflush(struct srv_req *ctx, srv_handler_common(ctx, req, resp); struct srv_req **oldreqp = map_load(&ctx->parent_sess->reqs, req->oldtag); - if (oldreqp) - cr_chan_recv(&((*oldreqp)->flushch)); + if (oldreqp) { + struct srv_req *oldreq = *oldreqp; + enum _lib9p_srv_flush_result res = _LIB9P_SRV_FLUSH_RFLUSH; + switch (cr_select_l(CR_SELECT_RECV(&oldreq->flush_ch, &res), + CR_SELECT_SEND(&ctx->flush_ch, &res))) { + case 0: /* original request returned */ + req_debugf("original request (tag=%"PRIu16") returned", req->oldtag); + ctx->flush_observed = (res == _LIB9P_SRV_FLUSH_SILENT); + break; + case 1: /* flush itself got flushed */ + req_debugf("flush itself flushed"); + ctx->flush_observed = true; + break; + } + } } static void handle_Twalk(struct srv_req *ctx, @@ -1146,7 +1172,7 @@ static void handle_Tread(struct srv_req *ctx, case SRV_FILETYPE_FILE: struct iovec iov; LO_CALL(fidinfo->file.io, pread, ctx, req->count, req->offset, &iov); - if (!lib9p_ctx_has_error(&ctx->basectx)) { + if (!lib9p_ctx_has_error(&ctx->basectx) && !ctx->flush_observed) { resp->count = iov.iov_len; resp->data = iov.iov_base; if (resp->count > req->count) diff --git a/lib9p/srv_include/lib9p/srv.h b/lib9p/srv_include/lib9p/srv.h index db5be41..5c1614a 100644 --- a/lib9p/srv_include/lib9p/srv.h +++ b/lib9p/srv_include/lib9p/srv.h @@ -8,8 +8,8 @@ #define _LIB9P_SRV_H_ #include <libcr/coroutine.h> -#include <libcr_ipc/rpc.h> #include <libcr_ipc/chan.h> +#include <libcr_ipc/rpc.h> #include <libhw/generic/net.h> #include <libmisc/assert.h> #include <libmisc/private.h> @@ -19,8 +19,6 @@ /* context ********************************************************************/ -CR_CHAN_DECLARE(_lib9p_srv_flushch, bool); - struct lib9p_srv_authinfo { lib9p_nuid_t uid; struct lib9p_s uname; @@ -30,21 +28,27 @@ struct lib9p_srv_authinfo { END_PRIVATE(LIB9P_SRV_H); }; +enum _lib9p_srv_flush_result { + _LIB9P_SRV_FLUSH_RFLUSH, + _LIB9P_SRV_FLUSH_SILENT, +}; + +CR_CHAN_DECLARE(_lib9p_srv_flush_ch, enum _lib9p_srv_flush_result); + struct lib9p_srv_ctx { struct lib9p_ctx basectx; struct lib9p_srv_authinfo *authinfo; BEGIN_PRIVATE(LIB9P_SRV_H); - struct _lib9p_srv_sess *parent_sess; - lib9p_tag_t tag; - uint8_t *net_bytes; - _lib9p_srv_flushch_t flushch; + struct _lib9p_srv_sess *parent_sess; + lib9p_tag_t tag; + uint8_t *net_bytes; + _lib9p_srv_flush_ch_t flush_ch; + bool flush_observed; END_PRIVATE(LIB9P_SRV_H); }; -bool lib9p_srv_flush_requested(struct lib9p_srv_ctx *ctx); - -void lib9p_srv_acknowledge_flush(struct lib9p_srv_ctx *ctx); +bool lib9p_srv_accept_flush(struct lib9p_srv_ctx *ctx); /* interface definitions ******************************************************/ @@ -207,20 +211,32 @@ void lib9p_srv_accept_and_read_loop(struct lib9p_srv *srv, lo_interface net_stre * @errno L_EDOM Tversion specified an impossibly small max_msg_size * @errno L_EOPNOTSUPP T-message has an R-message type, or an unrecognized T-message type * @errno L_EBADMSG T-message has wrong size[4] for its content, or has invalid UTF-8 - * @errno L_ERANGE R-message does not fit into max_msg_size */ void lib9p_srv_read(struct lib9p_srv *srv, lo_interface net_stream_conn conn); - - /** - * In a loop, service requests to the `struct lib9p_srv *srv` argument - * that have been read by lib9p_srv_accept_and_read_loop() / - * lib9p_srv_read(). A "NULL" request causes the function to return. + * In a loop, call lib9p_srv_worker() to service requests to the + * `struct lib9p_srv *srv` argument that have been read by + * lib9p_srv_accept_and_read_loop() / lib9p_srv_read(). A "NULL" + * request causes the function to return. * * @param srv: The server configuration and state; has an associated * pool of lib9p_srv_accept_and_read_loop() coroutines. */ void lib9p_srv_worker_loop(struct lib9p_srv *srv); +/** + * You should probably not call this directly; you should probably use + * lib9p_srv_worker_loop(). + * + * Handle and send a response to a single request. + * + * @param req: The request to handle. + * + * Errors that this function itself may send to clients: + * + * @errno L_ERANGE R-message does not fit into max_msg_size + */ +void lib9p_srv_worker(struct lib9p_srv_ctx *req); + #endif /* _LIB9P_SRV_H_ */ diff --git a/lib9p/tests/test_server/CMakeLists.txt b/lib9p/tests/test_server/CMakeLists.txt index b659373..c61d344 100644 --- a/lib9p/tests/test_server/CMakeLists.txt +++ b/lib9p/tests/test_server/CMakeLists.txt @@ -9,8 +9,8 @@ if (PICO_PLATFORM STREQUAL "host") add_library(test_server_objs OBJECT main.c + fs_flush.c fs_shutdown.c - fs_slowread.c fs_whoami.c ) target_include_directories(test_server_objs PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/config) diff --git a/lib9p/tests/test_server/config/config.h b/lib9p/tests/test_server/config/config.h index d9cf008..f49894b 100644 --- a/lib9p/tests/test_server/config/config.h +++ b/lib9p/tests/test_server/config/config.h @@ -14,6 +14,16 @@ #define CONFIG_9P_MAX_ERR_SIZE 128 /* 128 is what Plan 9 4e uses */ +#define CONFIG_9P_ENABLE_9P2000 1 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_u 1 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_e 0 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_L 0 /* bool */ +#define CONFIG_9P_ENABLE_9P2000_p9p 0 /* bool */ + +/* 9P_SRV *********************************************************************/ + +#define CONFIG_9P_SRV_DEBUG 1 /* bool */ + /** * This max-msg-size is sized so that a Twrite message can return * 8KiB of data. @@ -39,12 +49,6 @@ */ #define CONFIG_9P_SRV_MAX_HOSTMSG_SIZE CONFIG_9P_SRV_MAX_MSG_SIZE+16 -#define CONFIG_9P_ENABLE_9P2000 1 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_u 1 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_e 0 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_L 0 /* bool */ -#define CONFIG_9P_ENABLE_9P2000_p9p 0 /* bool */ - /* COROUTINE ******************************************************************/ #define CONFIG_COROUTINE_STACK_SIZE_DEFAULT (32*1024) diff --git a/lib9p/tests/test_server/fs_flush.c b/lib9p/tests/test_server/fs_flush.c new file mode 100644 index 0000000..dbed3e7 --- /dev/null +++ b/lib9p/tests/test_server/fs_flush.c @@ -0,0 +1,139 @@ +/* lib9p/tests/test_server/fs_flush.c - flush-* API endpoints + * + * Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#include <libmisc/alloc.h> + +#define IMPLEMENTATION_FOR_LIB9P_SRV_H YES /* for ctx->flush_ch */ +#include "fs_flush.h" + +LO_IMPLEMENTATION_C(lib9p_srv_file, struct flush_file, flush_file, static); + +struct flush_fio { + struct flush_file *parent; +}; +LO_IMPLEMENTATION_H(lib9p_srv_fio, struct flush_fio, flush_fio); +LO_IMPLEMENTATION_C(lib9p_srv_fio, struct flush_fio, flush_fio, static); + +/* srv_file *******************************************************************/ + +static void flush_file_free(struct flush_file *self) { + assert(self); +} +static struct lib9p_qid flush_file_qid(struct flush_file *self) { + assert(self); + return (struct lib9p_qid){ + .type = LIB9P_QT_FILE, + .vers = 1, + .path = self->pathnum, + }; +} + +static struct lib9p_stat flush_file_stat(struct flush_file *self, struct lib9p_srv_ctx *ctx) { + assert(self); + assert(ctx); + return (struct lib9p_stat){ + .kern_type = 0, + .kern_dev = 0, + .file_qid = flush_file_qid(self), + .file_mode = 0444, + .file_atime = UTIL9P_ATIME, + .file_mtime = UTIL9P_MTIME, + .file_size = 6, + .file_name = lib9p_str(self->name), + .file_owner_uid = lib9p_str("root"), + .file_owner_gid = lib9p_str("root"), + .file_last_modified_uid = lib9p_str("root"), + .file_extension = lib9p_str(NULL), + .file_owner_n_uid = 0, + .file_owner_n_gid = 0, + .file_last_modified_n_uid = 0, + }; +} +static void flush_file_wstat(struct flush_file *self, struct lib9p_srv_ctx *ctx, struct lib9p_stat) { + assert(self); + assert(ctx); + lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_EROFS, "cannot wstat API file"); +} +static void flush_file_remove(struct flush_file *self, struct lib9p_srv_ctx *ctx) { + assert(self); + assert(ctx); + lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_EROFS, "cannot remove API file"); +} + +LIB9P_SRV_NOTDIR(struct flush_file, flush_file) + +static lo_interface lib9p_srv_fio flush_file_fopen(struct flush_file *self, struct lib9p_srv_ctx *ctx, bool, bool, bool) { + assert(self); + assert(ctx); + + struct flush_fio *ret = heap_alloc(1, struct flush_fio); + ret->parent = self; + + return lo_box_flush_fio_as_lib9p_srv_fio(ret); +} + +/* srv_fio ********************************************************************/ + +static void flush_fio_iofree(struct flush_fio *self) { + assert(self); + free(self); +} + +static struct lib9p_qid flush_fio_qid(struct flush_fio *self) { + assert(self); + return flush_file_qid(self->parent); +} + +static uint32_t flush_fio_iounit(struct flush_fio *self) { + assert(self); + return 0; +} + +static uint32_t flush_fio_pwrite(struct flush_fio *LM_UNUSED(self), + struct lib9p_srv_ctx *LM_UNUSED(ctx), + void *LM_UNUSED(buf), uint32_t LM_UNUSED(byte_count), + uint64_t LM_UNUSED(offset)) { + assert_notreached("not writable"); +} + +static void flush_fio_pread(struct flush_fio *self, struct lib9p_srv_ctx *ctx, + uint32_t byte_count, uint64_t LM_UNUSED(byte_offset), + struct iovec *ret) { + assert(self); + assert(ctx); + assert(ret); + + /* Wait for first Tflush */ + while (cr_chan_num_waiters(&ctx->flush_ch) == 0) + cr_yield(); + + /* Wait for the specified number of Tflush (may be higher *or* + * lower than 1; lower would mean that the first Tflush needs + * to be flushed itself). */ + while (cr_chan_num_waiters(&ctx->flush_ch) != self->parent->flush_cnt) + cr_yield(); + + /* Return */ + bool flushed; + switch (self->parent->flush_behavior) { + case FLUSH_READ: + *ret = (struct iovec){ + .iov_base = "Sloth\n", + .iov_len = 6 < byte_count ? 6 : byte_count, + }; + break; + case FLUSH_ERROR: + flushed = lib9p_srv_accept_flush(ctx); + assert(flushed); + lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_ECANCELED, "request canceled by flush"); + break; + case FLUSH_SILENT: + flushed = lib9p_srv_accept_flush(ctx); + assert(flushed); + break; + } + cr_yield(); +} diff --git a/lib9p/tests/test_server/fs_flush.h b/lib9p/tests/test_server/fs_flush.h new file mode 100644 index 0000000..a509c4a --- /dev/null +++ b/lib9p/tests/test_server/fs_flush.h @@ -0,0 +1,27 @@ +/* lib9p/tests/test_server/fs_flush.h - flush-* API endpoints + * + * Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#ifndef _LIB9P_TESTS_TEST_SERVER_FS_FLUSH_H_ +#define _LIB9P_TESTS_TEST_SERVER_FS_FLUSH_H_ + +#include <util9p/static.h> +#include <libhw/host_net.h> + +struct flush_file { + char *name; + uint64_t pathnum; + + unsigned int flush_cnt; + enum { + FLUSH_READ, + FLUSH_ERROR, + FLUSH_SILENT, + } flush_behavior; +}; +LO_IMPLEMENTATION_H(lib9p_srv_file, struct flush_file, flush_file); +#define lo_box_flush_file_as_lib9p_srv_file(obj) util9p_box(flush_file, obj) + +#endif /* _LIB9P_TESTS_TEST_SERVER_FS_FLUSH_H_ */ diff --git a/lib9p/tests/test_server/fs_shutdown.c b/lib9p/tests/test_server/fs_shutdown.c index e872b78..e7375ef 100644 --- a/lib9p/tests/test_server/fs_shutdown.c +++ b/lib9p/tests/test_server/fs_shutdown.c @@ -4,7 +4,7 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -#include <stdlib.h> +#include <libmisc/alloc.h> #include "fs_shutdown.h" @@ -68,7 +68,7 @@ static lo_interface lib9p_srv_fio shutdown_file_fopen(struct shutdown_file *self assert(self); assert(ctx); - struct shutdown_fio *ret = malloc(sizeof(struct shutdown_fio)); + struct shutdown_fio *ret = heap_alloc(1, struct shutdown_fio); ret->parent = self; return lo_box_shutdown_fio_as_lib9p_srv_fio(ret); diff --git a/lib9p/tests/test_server/fs_slowread.c b/lib9p/tests/test_server/fs_slowread.c deleted file mode 100644 index c94fba0..0000000 --- a/lib9p/tests/test_server/fs_slowread.c +++ /dev/null @@ -1,116 +0,0 @@ -/* lib9p/tests/test_server/fs_slowread.c - slowread API endpoint - * - * Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com> - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -#include <stdlib.h> - -#include "fs_slowread.h" - -LO_IMPLEMENTATION_C(lib9p_srv_file, struct slowread_file, slowread_file, static); - -struct slowread_fio { - struct slowread_file *parent; -}; -LO_IMPLEMENTATION_H(lib9p_srv_fio, struct slowread_fio, slowread_fio); -LO_IMPLEMENTATION_C(lib9p_srv_fio, struct slowread_fio, slowread_fio, static); - -/* srv_file *******************************************************************/ - -static void slowread_file_free(struct slowread_file *self) { - assert(self); -} -static struct lib9p_qid slowread_file_qid(struct slowread_file *self) { - assert(self); - return (struct lib9p_qid){ - .type = LIB9P_QT_FILE, - .vers = 1, - .path = self->pathnum, - }; -} - -static struct lib9p_stat slowread_file_stat(struct slowread_file *self, struct lib9p_srv_ctx *ctx) { - assert(self); - assert(ctx); - return (struct lib9p_stat){ - .kern_type = 0, - .kern_dev = 0, - .file_qid = slowread_file_qid(self), - .file_mode = 0444, - .file_atime = UTIL9P_ATIME, - .file_mtime = UTIL9P_MTIME, - .file_size = 6, - .file_name = lib9p_str(self->name), - .file_owner_uid = lib9p_str("root"), - .file_owner_gid = lib9p_str("root"), - .file_last_modified_uid = lib9p_str("root"), - .file_extension = lib9p_str(NULL), - .file_owner_n_uid = 0, - .file_owner_n_gid = 0, - .file_last_modified_n_uid = 0, - }; -} -static void slowread_file_wstat(struct slowread_file *self, struct lib9p_srv_ctx *ctx, struct lib9p_stat) { - assert(self); - assert(ctx); - lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_EROFS, "cannot wstat API file"); -} -static void slowread_file_remove(struct slowread_file *self, struct lib9p_srv_ctx *ctx) { - assert(self); - assert(ctx); - lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_EROFS, "cannot remove API file"); -} - -LIB9P_SRV_NOTDIR(struct slowread_file, slowread_file) - -static lo_interface lib9p_srv_fio slowread_file_fopen(struct slowread_file *self, struct lib9p_srv_ctx *ctx, bool, bool, bool) { - assert(self); - assert(ctx); - - struct slowread_fio *ret = malloc(sizeof(struct slowread_fio)); - ret->parent = self; - - return lo_box_slowread_fio_as_lib9p_srv_fio(ret); -} - -/* srv_fio ********************************************************************/ - -static void slowread_fio_iofree(struct slowread_fio *self) { - assert(self); - free(self); -} - -static struct lib9p_qid slowread_fio_qid(struct slowread_fio *self) { - assert(self); - return slowread_file_qid(self->parent); -} - -static uint32_t slowread_fio_iounit(struct slowread_fio *self) { - assert(self); - return 0; -} - -static uint32_t slowread_fio_pwrite(struct slowread_fio *LM_UNUSED(self), - struct lib9p_srv_ctx *LM_UNUSED(ctx), - void *LM_UNUSED(buf), uint32_t LM_UNUSED(byte_count), - uint64_t LM_UNUSED(offset)) { - assert_notreached("not writable"); -} -static void slowread_fio_pread(struct slowread_fio *self, struct lib9p_srv_ctx *ctx, - uint32_t byte_count, uint64_t LM_UNUSED(byte_offset), - struct iovec *ret) { - assert(self); - assert(ctx); - assert(ret); - - while (!lib9p_srv_flush_requested(ctx)) - cr_yield(); - if (self->parent->flushable) - lib9p_srv_acknowledge_flush(ctx); - else - *ret = (struct iovec){ - .iov_base = "Sloth\n", - .iov_len = 6 < byte_count ? 6 : byte_count, - }; -} diff --git a/lib9p/tests/test_server/fs_slowread.h b/lib9p/tests/test_server/fs_slowread.h deleted file mode 100644 index ef4b65f..0000000 --- a/lib9p/tests/test_server/fs_slowread.h +++ /dev/null @@ -1,22 +0,0 @@ -/* lib9p/tests/test_server/fs_slowread.h - slowread API endpoint - * - * Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com> - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -#ifndef _LIB9P_TESTS_TEST_SERVER_FS_SLOWREAD_H_ -#define _LIB9P_TESTS_TEST_SERVER_FS_SLOWREAD_H_ - -#include <util9p/static.h> -#include <libhw/host_net.h> - -struct slowread_file { - char *name; - uint64_t pathnum; - - bool flushable; -}; -LO_IMPLEMENTATION_H(lib9p_srv_file, struct slowread_file, slowread_file); -#define lo_box_slowread_file_as_lib9p_srv_file(obj) util9p_box(slowread_file, obj) - -#endif /* _LIB9P_TESTS_TEST_SERVER_FS_SLOWREAD_H_ */ diff --git a/lib9p/tests/test_server/fs_whoami.c b/lib9p/tests/test_server/fs_whoami.c index 560e31f..653ac4b 100644 --- a/lib9p/tests/test_server/fs_whoami.c +++ b/lib9p/tests/test_server/fs_whoami.c @@ -5,7 +5,9 @@ */ #include <stdio.h> /* for snprintf() */ -#include <stdlib.h> /* for malloc(), realloc(), free() */ +#include <stdlib.h> /* for realloc(), free() */ + +#include <libmisc/alloc.h> #include "fs_whoami.h" @@ -89,7 +91,7 @@ static lo_interface lib9p_srv_fio whoami_file_fopen(struct whoami_file *self, st assert(self); assert(ctx); - struct whoami_fio *ret = malloc(sizeof(struct whoami_fio)); + struct whoami_fio *ret = heap_alloc(1, struct whoami_fio); ret->parent = self; ret->buf_len = 0; ret->buf = NULL; diff --git a/lib9p/tests/test_server/main.c b/lib9p/tests/test_server/main.c index 0705747..d7819eb 100644 --- a/lib9p/tests/test_server/main.c +++ b/lib9p/tests/test_server/main.c @@ -19,8 +19,8 @@ #include <util9p/static.h> #include "static.h" +#include "fs_flush.h" #include "fs_shutdown.h" -#include "fs_slowread.h" #include "fs_whoami.h" /* configuration **************************************************************/ @@ -76,11 +76,12 @@ struct lib9p_srv_file root = API_FILE(5, "shutdown", shutdown, .listeners = globals.listeners, .nlisteners = LM_ARRAY_LEN(globals.listeners)), - API_FILE(6, "slowread", slowread, - .flushable = false), - API_FILE(7, "slowread-flushable", slowread, - .flushable = true), API_FILE(8, "whoami", whoami), + API_FILE(9, "flush-read", flush, .flush_cnt=1, .flush_behavior=FLUSH_READ), + API_FILE(10, "flush-error", flush, .flush_cnt=1, .flush_behavior=FLUSH_ERROR), + API_FILE(11, "flush-silent", flush, .flush_cnt=1, .flush_behavior=FLUSH_SILENT), + API_FILE(12, "flush-slowsilent", flush, .flush_cnt=2, .flush_behavior=FLUSH_SILENT), + API_FILE(13, "flush-slowread", flush, .flush_cnt=0, .flush_behavior=FLUSH_READ), ); static lo_interface lib9p_srv_file get_root(struct lib9p_srv_ctx *LM_UNUSED(ctx), struct lib9p_s LM_UNUSED(treename)) { diff --git a/lib9p/tests/testclient-p9p b/lib9p/tests/testclient-p9p index 9c9fb5e..9c9f9f2 100755 --- a/lib9p/tests/testclient-p9p +++ b/lib9p/tests/testclient-p9p @@ -25,9 +25,12 @@ out=$("${client[@]}" ls -l '') expect_lines \ 'd-r-xr-xr-x M 0 root root 0 Oct 7 2024 Documentation' \ '--r--r--r-- M 0 root root 166 Oct 7 2024 README.md' \ + '--r--r--r-- M 0 root root 6 Oct 7 2024 flush-error' \ + '--r--r--r-- M 0 root root 6 Oct 7 2024 flush-read' \ + '--r--r--r-- M 0 root root 6 Oct 7 2024 flush-silent' \ + '--r--r--r-- M 0 root root 6 Oct 7 2024 flush-slowread' \ + '--r--r--r-- M 0 root root 6 Oct 7 2024 flush-slowsilent' \ '---w--w--w- M 0 root root 0 Oct 7 2024 shutdown' \ - '--r--r--r-- M 0 root root 6 Oct 7 2024 slowread' \ - '--r--r--r-- M 0 root root 6 Oct 7 2024 slowread-flushable' \ '--r--r--r-- M 0 root root 9 Oct 7 2024 whoami' out=$("${client[@]}" ls -l 'Documentation/') diff --git a/lib9p/tests/testclient-p9p.explog b/lib9p/tests/testclient-p9p.explog index e5901d2..7f3953d 100644 --- a/lib9p/tests/testclient-p9p.explog +++ b/lib9p/tests/testclient-p9p.explog @@ -19,8 +19,8 @@ > Topen { tag=0 fid=1 mode=(MODE_READ) } < Ropen { tag=0 qid={ type=(DIR) vers=1 path=1 } iounit=0 } > Tread { tag=0 fid=1 offset=0 count=4096 } -< Rread { tag=0 count=428 data=<bytedata> } -> Tread { tag=0 fid=1 offset=428 count=4096 } +< Rread { tag=0 count=648 data=<bytedata> } +> Tread { tag=0 fid=1 offset=648 count=4096 } < Rread { tag=0 count=0 data="" } > Tclunk { tag=0 fid=1 } < Rclunk { tag=0 } diff --git a/lib9p/tests/testclient-sess.c b/lib9p/tests/testclient-sess.c index ded70d1..561c0c9 100644 --- a/lib9p/tests/testclient-sess.c +++ b/lib9p/tests/testclient-sess.c @@ -170,29 +170,62 @@ int main(int argc, char *argv[]) { recv9p(); /* Rattach */ /* flush, but original response comes back first */ - wname[0] = lib9p_str("slowread"); send9p(Twalk, .tag=0, .fid=0, .newfid=1, .nwname=1, .wname=wname); + wname[0] = lib9p_str("flush-read"); send9p(Twalk, .tag=0, .fid=0, .newfid=1, .nwname=1, .wname=wname); recv9p(); /* Rwalk */ send9p(Topen, .tag=0, .fid=1, .mode=LIB9P_O_MODE_READ); recv9p(); /* Ropen */ - send9p(Tread, .tag=1, .fid=1, .offset=0, .count=6); - send9p(Tflush, .tag=2, .oldtag=1); + send9p(Tread, .tag=0, .fid=1, .offset=0, .count=10); + send9p(Tflush, .tag=1, .oldtag=0); recv9p(); /* Rread */ recv9p(); /* Rflush */ /* flush, original request is aborted with error */ - wname[0] = lib9p_str("slowread-flushable"); send9p(Twalk, .tag=1, .fid=0, .newfid=2, .nwname=1, .wname=wname); + wname[0] = lib9p_str("flush-error"); send9p(Twalk, .tag=0, .fid=0, .newfid=2, .nwname=1, .wname=wname); recv9p(); /* Rwalk */ send9p(Topen, .tag=0, .fid=2, .mode=LIB9P_O_MODE_READ); recv9p(); /* Ropen */ - send9p(Tread, .tag=1, .fid=2, .offset=0, .count=6); - send9p(Tflush, .tag=2, .oldtag=1); + send9p(Tread, .tag=0, .fid=2, .offset=0, .count=10); + send9p(Tflush, .tag=1, .oldtag=0); recv9p(); /* Rerror */ recv9p(); /* Rflush */ + /* flush, original request is aborted without error */ + wname[0] = lib9p_str("flush-silent"); send9p(Twalk, .tag=0, .fid=0, .newfid=3, .nwname=1, .wname=wname); + recv9p(); /* Rwalk */ + send9p(Topen, .tag=0, .fid=3, .mode=LIB9P_O_MODE_READ); + recv9p(); /* Ropen */ + send9p(Tread, .tag=0, .fid=3, .offset=0, .count=10); + send9p(Tflush, .tag=1, .oldtag=0); + recv9p(); /* Rflush */ + + /* multiflush, original request is aborted without error */ + wname[0] = lib9p_str("flush-slowsilent"); send9p(Twalk, .tag=0, .fid=0, .newfid=4, .nwname=1, .wname=wname); + recv9p(); /* Rwalk */ + send9p(Topen, .tag=0, .fid=4, .mode=LIB9P_O_MODE_READ); + recv9p(); /* Ropen */ + send9p(Tread, .tag=0, .fid=4, .offset=0, .count=10); + send9p(Tflush, .tag=1, .oldtag=0); + send9p(Tflush, .tag=2, .oldtag=0); + recv9p(); /* Rflush */ + + /* flush, but flush is flushed */ + wname[0] = lib9p_str("flush-slowread"); send9p(Twalk, .tag=0, .fid=0, .newfid=5, .nwname=1, .wname=wname); + recv9p(); /* Rwalk */ + send9p(Topen, .tag=0, .fid=5, .mode=LIB9P_O_MODE_READ); + recv9p(); /* Ropen */ + send9p(Tread, .tag=0, .fid=5, .offset=0, .count=10); + send9p(Tflush, .tag=1, .oldtag=0); + send9p(Tflush, .tag=2, .oldtag=1); + recv9p(); /* Rflush */ + recv9p(); /* Rread */ + /* flush, unknown tag */ send9p(Tflush, .tag=0, .oldtag=99); recv9p(); /* Rflush */ + /* flushed by Tversion */ + send9p(Tread, .tag=0, .fid=3, .offset=0, .count=10); + /* shutdown ***********************************************************/ send9p(Tversion, .tag=0, .max_msg_size=(8*1024), .version=lib9p_str("9P2000")); recv9p(); /* Rversion */ diff --git a/lib9p/tests/testclient-sess.explog b/lib9p/tests/testclient-sess.explog index 3e2209a..74a2cd7 100644 --- a/lib9p/tests/testclient-sess.explog +++ b/lib9p/tests/testclient-sess.explog @@ -87,29 +87,62 @@ < Rattach { tag=0 qid={ type=(DIR) vers=1 path=1 } } # flush, but original response comes back first -> Twalk { tag=0 fid=0 newfid=1 nwname=1 wname=[ "slowread" ] } -< Rwalk { tag=0 nwqid=1 wqid=[ { type=(0) vers=1 path=6 } ] } +> Twalk { tag=0 fid=0 newfid=1 nwname=1 wname=[ "flush-read" ] } +< Rwalk { tag=0 nwqid=1 wqid=[ { type=(0) vers=1 path=9 } ] } > Topen { tag=0 fid=1 mode=(MODE_READ) } -< Ropen { tag=0 qid={ type=(0) vers=1 path=6 } iounit=0 } -> Tread { tag=1 fid=1 offset=0 count=6 } -> Tflush { tag=2 oldtag=1 } -< Rread { tag=1 count=6 data="Sloth\n" } +< Ropen { tag=0 qid={ type=(0) vers=1 path=9 } iounit=0 } +> Tread { tag=0 fid=1 offset=0 count=10 } +> Tflush { tag=1 oldtag=0 } +< Rread { tag=0 count=6 data="Sloth\n" } +< Rflush { tag=1 } + +# flush, original request is aborted with error +> Twalk { tag=0 fid=0 newfid=2 nwname=1 wname=[ "flush-error" ] } +< Rwalk { tag=0 nwqid=1 wqid=[ { type=(0) vers=1 path=10 } ] } +> Topen { tag=0 fid=2 mode=(MODE_READ) } +< Ropen { tag=0 qid={ type=(0) vers=1 path=10 } iounit=0 } +> Tread { tag=0 fid=2 offset=0 count=10 } +> Tflush { tag=1 oldtag=0 } +< Rerror { tag=0 errstr="request canceled by flush" errnum=L_ECANCELED } +< Rflush { tag=1 } + +# flush, original request is aborted without error +> Twalk { tag=0 fid=0 newfid=3 nwname=1 wname=[ "flush-silent" ] } +< Rwalk { tag=0 nwqid=1 wqid=[ { type=(0) vers=1 path=11 } ] } +> Topen { tag=0 fid=3 mode=(MODE_READ) } +< Ropen { tag=0 qid={ type=(0) vers=1 path=11 } iounit=0 } +> Tread { tag=0 fid=3 offset=0 count=10 } +> Tflush { tag=1 oldtag=0 } +< Rflush { tag=1 } + +# multiflush, original request is aborted without error +> Twalk { tag=0 fid=0 newfid=4 nwname=1 wname=[ "flush-slowsilent" ] } +< Rwalk { tag=0 nwqid=1 wqid=[ { type=(0) vers=1 path=12 } ] } +> Topen { tag=0 fid=4 mode=(MODE_READ) } +< Ropen { tag=0 qid={ type=(0) vers=1 path=12 } iounit=0 } +> Tread { tag=0 fid=4 offset=0 count=10 } +> Tflush { tag=1 oldtag=0 } +> Tflush { tag=2 oldtag=0 } < Rflush { tag=2 } -# flush, succeeds -> Twalk { tag=1 fid=0 newfid=2 nwname=1 wname=[ "slowread-flushable" ] } -< Rwalk { tag=1 nwqid=1 wqid=[ { type=(0) vers=1 path=7 } ] } -> Topen { tag=0 fid=2 mode=(MODE_READ) } -< Ropen { tag=0 qid={ type=(0) vers=1 path=7 } iounit=0 } -> Tread { tag=1 fid=2 offset=0 count=6 } +# flush, but flush is flushed +> Twalk { tag=0 fid=0 newfid=5 nwname=1 wname=[ "flush-slowread" ] } +< Rwalk { tag=0 nwqid=1 wqid=[ { type=(0) vers=1 path=13 } ] } +> Topen { tag=0 fid=5 mode=(MODE_READ) } +< Ropen { tag=0 qid={ type=(0) vers=1 path=13 } iounit=0 } +> Tread { tag=0 fid=5 offset=0 count=10 } +> Tflush { tag=1 oldtag=0 } > Tflush { tag=2 oldtag=1 } < Rflush { tag=2 } -< Rerror { tag=1 errstr="request canceled by flush" errnum=L_ECANCELED } +< Rread { tag=0 count=6 data="Sloth\n" } # flush, unknown tag > Tflush { tag=0 oldtag=99 } < Rflush { tag=0 } +# flushed by Tversion +> Tread { tag=0 fid=3 offset=0 count=10 } + # shutdown ##################################################################### > Tversion { tag=0 max_msg_size=8192 version="9P2000" } < Rversion { tag=0 max_msg_size=4120 version="9P2000" } diff --git a/libcr_ipc/chan.c b/libcr_ipc/chan.c index 6cbe890..b7ecfc8 100644 --- a/libcr_ipc/chan.c +++ b/libcr_ipc/chan.c @@ -4,60 +4,22 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -#include <alloca.h> /* for alloca() */ #include <string.h> /* for memcpy() */ #include <libcr/coroutine.h> /* for cid_t, cr_* */ #include <libmisc/assert.h> #include <libmisc/rand.h> +#define IMPLEMENTATION_FOR_LIBCR_IPC_CHAN_H YES #include <libcr_ipc/chan.h> -/* base channels **************************************************************/ - -struct cr_chan_waiter { +struct _cr_select_waiter { cid_t cid; - void *val_ptr; - void (*dequeue)(void *, size_t); - void *dequeue_arg1; - size_t dequeue_arg2; -}; -DLIST_DECLARE_NODE(_cr_chan_waiter_list, struct cr_chan_waiter); - -void cr_chan_dequeue(void *_ch, size_t) { - struct _cr_chan *ch = _ch; - dlist_pop_from_front(&ch->waiters); -} - -void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void *val_ptr, size_t val_size) { - assert(ch); - assert(val_ptr); + struct _cr_select_arg_list_node *arg_vec; + size_t arg_cnt; - if (ch->waiters.front && ch->waiter_typ != self_typ) { /* non-blocking fast-path */ - /* Copy. */ - struct _cr_chan_waiter_list_node *front = ch->waiters.front; - if (self_typ == _CR_CHAN_SENDER) - memcpy(front->val.val_ptr, val_ptr, val_size); - else - memcpy(val_ptr, front->val.val_ptr, val_size); - cr_unpause(front->val.cid); - front->val.dequeue(front->val.dequeue_arg1, - front->val.dequeue_arg2); - cr_yield(); - } else { /* blocking slow-path */ - struct _cr_chan_waiter_list_node self = { .val = { - .cid = cr_getcid(), - .val_ptr = val_ptr, - .dequeue = cr_chan_dequeue, - .dequeue_arg1 = ch, - }}; - dlist_push_to_rear(&ch->waiters, &self); - ch->waiter_typ = self_typ; - cr_pause_and_yield(); - } -} - -/* select *********************************************************************/ + size_t ret; +}; enum cr_select_class { CR_SELECT_CLASS_DEFAULT, @@ -65,40 +27,26 @@ enum cr_select_class { CR_SELECT_CLASS_NONBLOCK, }; -struct cr_select_waiters { - size_t cnt; - struct cr_select_arg *args; - struct _cr_chan_waiter_list_node *nodes; -}; - -static inline enum cr_select_class cr_select_getclass(struct cr_select_arg arg) { - switch (arg.op) { +static inline enum cr_select_class cr_select_getclass(struct _cr_select_arg *arg) { + switch (arg->op) { case _CR_SELECT_OP_RECV: - if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_SENDER) + if (arg->ch->waiters.front && arg->ch->waiters.front->val.op == _CR_SELECT_OP_SEND) return CR_SELECT_CLASS_NONBLOCK; else return CR_SELECT_CLASS_BLOCKING; case _CR_SELECT_OP_SEND: - if (arg.ch->waiters.front && arg.ch->waiter_typ == _CR_CHAN_RECVER) + if (arg->ch->waiters.front && arg->ch->waiters.front->val.op == _CR_SELECT_OP_RECV) return CR_SELECT_CLASS_NONBLOCK; else return CR_SELECT_CLASS_BLOCKING; case _CR_SELECT_OP_DEFAULT: return CR_SELECT_CLASS_DEFAULT; default: - assert_notreached("invalid arg.op"); + assert_notreached("invalid arg->op"); } } -void cr_select_dequeue(void *_waiters, size_t idx) { - struct cr_select_waiters *waiters = _waiters; - for (size_t i = 0; i < waiters->cnt; i++) - dlist_remove(&(waiters->args[i].ch->waiters), - &(waiters->nodes[i])); - waiters->cnt = idx; -} - -size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]) { +size_t cr_select_v(size_t arg_cnt, struct _cr_select_arg_list_node arg_vec[]) { size_t cnt_blocking = 0; size_t cnt_nonblock = 0; size_t cnt_default = 0; @@ -108,7 +56,7 @@ size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]) { cr_assert_in_coroutine(); for (size_t i = 0; i < arg_cnt; i++) { - switch (cr_select_getclass(arg_vec[i])) { + switch (cr_select_getclass(&arg_vec[i].val)) { case CR_SELECT_CLASS_BLOCKING: cnt_blocking++; break; @@ -122,46 +70,65 @@ size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]) { } if (cnt_nonblock) { - size_t choice = rand_uint63n(cnt_nonblock); - for (size_t i = 0, seen = 0; i < arg_cnt; i++) { - if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_NONBLOCK) { - if (seen == choice) { - _cr_chan_xfer(arg_vec[i].op == _CR_SELECT_OP_RECV - ? _CR_CHAN_RECVER - : _CR_CHAN_SENDER, - arg_vec[i].ch, - arg_vec[i].val_ptr, - arg_vec[i].val_siz); - return i; - } + size_t choice_among_nonblock = rand_uint63n(cnt_nonblock); + size_t choice_among_all = arg_cnt; + for (size_t i = 0, seen = 0; i < choice_among_all; i++) { + if (cr_select_getclass(&arg_vec[i].val) == CR_SELECT_CLASS_NONBLOCK) { + if (seen == choice_among_nonblock) + choice_among_all = i; seen++; } } - assert_notreached("should have returned from inside for() loop"); + assert(choice_among_all < arg_cnt); + + struct _cr_select_arg *this = &arg_vec[choice_among_all].val; + assert(this->ch->waiters.front); + struct _cr_select_arg *other = &this->ch->waiters.front->val; + assert(this->val_siz == other->val_siz); + assert(this->ch == other->ch); + switch (this->op) { + case _CR_SELECT_OP_SEND: + assert(other->op == _CR_SELECT_OP_RECV); + memcpy(other->val_ptr, this->val_ptr, this->val_siz); + break; + case _CR_SELECT_OP_RECV: + assert(other->op == _CR_SELECT_OP_SEND); + memcpy(this->val_ptr, other->val_ptr, this->val_siz); + break; + case _CR_SELECT_OP_DEFAULT: + assert_notreached("_CR_SELECT_OP_DEFAULT is not CR_SELECT_CLASS_NONBLOCK"); + } + struct _cr_select_waiter *waiter = other->waiter; + for (size_t i = 0; i < waiter->arg_cnt; i++) { + waiter->arg_vec[i].val.ch->nwaiters--; + dlist_remove(&waiter->arg_vec[i].val.ch->waiters, &waiter->arg_vec[i]); + if (&waiter->arg_vec[i].val == other) + waiter->ret = i; + } + cr_unpause(waiter->cid); + cr_yield(); + return choice_among_all; } if (cnt_default) { for (size_t i = 0; i < arg_cnt; i++) - if (cr_select_getclass(arg_vec[i]) == CR_SELECT_CLASS_DEFAULT) + if (cr_select_getclass(&arg_vec[i].val) == CR_SELECT_CLASS_DEFAULT) return i; assert_notreached("should have returned from inside for() loop"); } - struct cr_select_waiters waiters = { - .cnt = arg_cnt, - .args = arg_vec, - .nodes = alloca(sizeof(struct cr_chan_waiter) * arg_cnt), + assert(cnt_blocking && cnt_blocking == arg_cnt); + + struct _cr_select_waiter waiter = { + .cid = cr_getcid(), + .arg_vec = arg_vec, + .arg_cnt = arg_cnt, }; for (size_t i = 0; i < arg_cnt; i++) { - waiters.nodes[i] = (struct _cr_chan_waiter_list_node){ .val = { - .cid = cr_getcid(), - .val_ptr = arg_vec[i].val_ptr, - .dequeue = cr_select_dequeue, - .dequeue_arg1 = &waiters, - .dequeue_arg2 = i, - }}; - dlist_push_to_rear(&arg_vec[i].ch->waiters, &waiters.nodes[i]); + arg_vec[i].val.waiter = &waiter; + arg_vec[i].val.ch->nwaiters++; + dlist_push_to_rear(&arg_vec[i].val.ch->waiters, &arg_vec[i]); } cr_pause_and_yield(); - return waiters.cnt; + return waiter.ret; } diff --git a/libcr_ipc/include/libcr_ipc/chan.h b/libcr_ipc/include/libcr_ipc/chan.h index 5a87643..853b4ad 100644 --- a/libcr_ipc/include/libcr_ipc/chan.h +++ b/libcr_ipc/include/libcr_ipc/chan.h @@ -11,6 +11,7 @@ #include <stddef.h> /* for size_t */ #include <libmisc/linkedlist.h> /* for DLIST_DECLARE() */ +#include <libmisc/private.h> /* base channels **************************************************************/ @@ -44,11 +45,9 @@ * * void cr_chan_send(NAME##_t *ch, VAL_T val); */ -#define cr_chan_send(CH, VAL) do { \ - cr_assert_in_coroutine(); \ - typeof((CH)->val_typ[0]) _val_lvalue = VAL; \ - _cr_chan_xfer(_CR_CHAN_SENDER, &(CH)->core, \ - &_val_lvalue, sizeof(_val_lvalue)); \ +#define cr_chan_send(CH, VAL) do { \ + typeof((CH)->val_typ[0]) _val_lvalue = VAL; \ + (void)cr_select_l(CR_SELECT_SEND(CH, &_val_lvalue)); \ } while(0) /** @@ -60,12 +59,10 @@ * * VAL_T cr_chan_recv(NAME##_T ch); */ -#define cr_chan_recv(CH) ({ \ - cr_assert_in_coroutine(); \ - typeof((CH)->val_typ[0]) _val_lvalue; \ - _cr_chan_xfer(_CR_CHAN_RECVER, &(CH)->core, \ - &_val_lvalue, sizeof(_val_lvalue)); \ - _val_lvalue; \ +#define cr_chan_recv(CH) ({ \ + typeof((CH)->val_typ[0]) _val_lvalue; \ + (void)cr_select_l(CR_SELECT_RECV(CH, &_val_lvalue)); \ + _val_lvalue; \ }) /** @@ -78,10 +75,10 @@ * * bool cr_chan_can_send(NAME##_t *ch); */ -#define cr_chan_can_send(CH) ({ \ - cr_assert_in_coroutine(); \ - (bool)((CH)->core.waiters.front && \ - (CH)->core.waiter_typ == _CR_CHAN_RECVER); \ +#define cr_chan_can_send(CH) ({ \ + cr_assert_in_coroutine(); \ + (bool)((CH)->core.waiters.front && \ + (CH)->core.waiters.front->val.op == _CR_SELECT_OP_RECV); \ }) /** @@ -94,78 +91,101 @@ * * bool cr_chan_can_recv(NAME##_t *ch); */ -#define cr_chan_can_recv(CH) ({ \ - cr_assert_in_coroutine(); \ - (bool)((CH)->core.waiters.front && \ - (CH)->core.waiter_typ == _CR_CHAN_SENDER); \ +#define cr_chan_can_recv(CH) ({ \ + cr_assert_in_coroutine(); \ + (bool)((CH)->core.waiters.front && \ + (CH)->core.waiters.front->val.op == _CR_SELECT_OP_SEND); \ }) +/** + * cr_chan_num_waiters(ch) returns the number of coroutines currently + * blocked on the channel. + * + * @runs_in coroutine + * @cr_pauses never + * @cr_yields never + * + * size_t cr_chan_num_waiters(NAME##_t *ch); + */ +#define cr_chan_num_waiters(CH) ({ \ + cr_assert_in_coroutine(); \ + ((CH)->core.nwaiters); \ +}) -enum _cr_chan_waiter_typ { - _CR_CHAN_SENDER, - _CR_CHAN_RECVER, -}; - -DLIST_DECLARE(_cr_chan_waiter_list); - +DLIST_DECLARE(_cr_select_arg_list); struct _cr_chan { - enum _cr_chan_waiter_typ waiter_typ; - struct _cr_chan_waiter_list waiters; + struct _cr_select_arg_list waiters; + size_t nwaiters; }; -void _cr_chan_xfer(enum _cr_chan_waiter_typ self_typ, struct _cr_chan *ch, void *val_ptr, size_t val_size); - /* cr_select arguments ********************************************************/ /** * Do not populate cr_select_arg yourself; use the * CR_SELECT_{RECV,SEND,DEFAULT} macros. */ -struct cr_select_arg { +struct _cr_select_waiter; +struct _cr_select_arg { enum { _CR_SELECT_OP_RECV, _CR_SELECT_OP_SEND, _CR_SELECT_OP_DEFAULT, - } op; - struct _cr_chan *ch; - void *val_ptr; - size_t val_siz; + } op; + struct _cr_chan *ch; + void *val_ptr; + size_t val_siz; + BEGIN_PRIVATE(LIBCR_IPC_CHAN_H); + struct _cr_select_waiter *waiter; + END_PRIVATE(LIBCR_IPC_CHAN_H); }; +DLIST_DECLARE_NODE(_cr_select_arg_list, struct _cr_select_arg); +#define cr_select_arg _cr_select_arg_list_node -#define CR_SELECT_RECV(CH, VALP) ((struct cr_select_arg){ \ +#define CR_SELECT_RECV(CH, VALP) ((struct cr_select_arg){ .val = { \ .op = _CR_SELECT_OP_RECV, \ .ch = &((CH)->core), \ /* The _valp temporary variable is to get the compiler to check that \ * the types are compatible. */ \ .val_ptr = ({ typeof((CH)->val_typ[0]) *_valp = VALP; _valp; }), \ .val_siz = sizeof((CH)->val_typ[0]), \ -}) +}}) /* BUG: It's bogus that CR_SELECT_SEND takes VALP instead of VAL, but * since we need an address, taking VAL would introduce uncomfortable * questions about where VAL sits on the stack. */ -#define CR_SELECT_SEND(CH, VALP) ((struct cr_select_arg){ \ +#define CR_SELECT_SEND(CH, VALP) ((struct cr_select_arg){ .val = { \ .op = _CR_SELECT_OP_SEND, \ .ch = &((CH)->core), \ /* The _valp temporary variable is to get the compiler to check that \ * the types are compatible. */ \ .val_ptr = ({ typeof((CH)->val_typ[0]) *_valp = VALP; _valp; }), \ .val_siz = sizeof((CH)->val_typ[0]), \ -}) +}}) -#define CR_SELECT_DEFAULT ((struct cr_select_arg){ \ - .op = _CR_SELECT_OP_DEFAULT, \ -}) +#define CR_SELECT_DEFAULT ((struct cr_select_arg){ .val = { \ + .op = _CR_SELECT_OP_DEFAULT, \ +}}) /* cr_select_v(arg_cnt, arg_vec) **********************************************/ +/** + * @runs_in coroutine + * @cr_pauses maybe + * @cr_yields always + */ size_t cr_select_v(size_t arg_cnt, struct cr_select_arg arg_vec[]); /* cr_select_l(arg1, arg2, arg3, ...) ******************************************/ -#define cr_select_l(...) ({ \ - struct cr_select_arg _cr_select_args[] = { __VA_ARGS__ }; \ - cr_select_v(sizeof(_cr_select_args)/sizeof(_cr_select_args[0])); \ +/** + * @runs_in coroutine + * @cr_pauses maybe + * @cr_yields always + */ +#define cr_select_l(...) ({ \ + struct cr_select_arg _cr_select_args[] = { __VA_ARGS__ }; \ + cr_select_v(sizeof(_cr_select_args)/sizeof(_cr_select_args[0]), \ + _cr_select_args); \ }) #endif /* _LIBCR_IPC_CHAN_H_ */ diff --git a/libcr_ipc/tests/test_select.c b/libcr_ipc/tests/test_select.c index f0a71a3..3da1c78 100644 --- a/libcr_ipc/tests/test_select.c +++ b/libcr_ipc/tests/test_select.c @@ -53,9 +53,8 @@ COROUTINE cr_consumer(void *) { test_assert(ret_arg == 0); send = 890; - args[0] = CR_SELECT_SEND(&fch, &send); - args[1] = CR_SELECT_DEFAULT; - ret_arg = cr_select_v(2, args); + ret_arg = cr_select_l(CR_SELECT_SEND(&fch, &send), + CR_SELECT_DEFAULT); test_assert(ret_arg == 1); cr_end(); diff --git a/libhw_cr/host_net.c b/libhw_cr/host_net.c index 6ed6e46..ba634a6 100644 --- a/libhw_cr/host_net.c +++ b/libhw_cr/host_net.c @@ -19,6 +19,7 @@ #include <signal.h> /* for siginfo_t, struct sigaction, enum sigval, sigaction(), SA_SIGINFO */ #include <libcr/coroutine.h> +#include <libmisc/alloc.h> #include <libmisc/assert.h> #include <libmisc/macro.h> #include <libobj/obj.h> @@ -283,7 +284,7 @@ static void *hostnet_pthread_writev(void *_args) { struct hostnet_pthread_writev_args *args = _args; size_t count = 0; - struct iovec *iov = alloca(sizeof(struct iovec)*args->iovcnt); + struct iovec *iov = stack_alloc(args->iovcnt, struct iovec); for (int i = 0; i < args->iovcnt; i++) { iov[i] = args->iov[i]; count += args->iov[i].iov_len; diff --git a/libhw_cr/rp2040_hwspi.c b/libhw_cr/rp2040_hwspi.c index d181650..29a7bac 100644 --- a/libhw_cr/rp2040_hwspi.c +++ b/libhw_cr/rp2040_hwspi.c @@ -4,7 +4,6 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -#include <alloca.h> #include <inttypes.h> /* for PRIu{n} */ #include <hardware/clocks.h> /* for clock_get_hz() and clk_peri */ @@ -12,6 +11,7 @@ #include <hardware/spi.h> #include <libcr/coroutine.h> +#include <libmisc/alloc.h> #include <libmisc/assert.h> #define LOG_NAME RP2040_SPI @@ -198,8 +198,8 @@ static void rp2040_hwspi_readwritev(struct rp2040_hwspi *self, const struct dupl * happens, so the IRQ machinery doesn't need to be engaged * at all. */ - struct dma_alias1 *tx_data_blocks = alloca(sizeof(struct dma_alias1)*(pruned_iovcnt+1)); - struct dma_alias0 *rx_data_blocks = alloca(sizeof(struct dma_alias0)*(pruned_iovcnt+1)); + struct dma_alias1 *tx_data_blocks = stack_alloc(pruned_iovcnt+1, struct dma_alias1); + struct dma_alias0 *rx_data_blocks = stack_alloc(pruned_iovcnt+1, struct dma_alias0); static_assert(!DMA_IS_TRIGGER(typeof(tx_data_blocks[0]), ctrl)); static_assert(DMA_IS_TRIGGER(typeof(rx_data_blocks[0]), ctrl)); diff --git a/libhw_cr/w5500_ll.h b/libhw_cr/w5500_ll.h index 2506cd2..8b98f9d 100644 --- a/libhw_cr/w5500_ll.h +++ b/libhw_cr/w5500_ll.h @@ -10,10 +10,10 @@ #ifndef _LIBHW_CR_W5500_LL_H_ #define _LIBHW_CR_W5500_LL_H_ -#include <alloca.h> /* for alloca() */ #include <stdint.h> /* for uint{n}_t */ #include <string.h> /* for memcmp() */ +#include <libmisc/alloc.h> /* for stack_alloc() */ #include <libmisc/assert.h> /* for assert(), static_assert() */ #include <libmisc/endian.h> /* for uint16be_t */ @@ -94,7 +94,7 @@ w5500ll_writev( (block & CTL_MASK_BLOCK) | CTL_W | CTL_OM_VDM, }; int inner_cnt = 1+io_slice_cnt(iov, iovcnt, skip, max); - struct duplex_iovec *inner = alloca(sizeof(struct duplex_iovec)*inner_cnt); + struct duplex_iovec *inner = stack_alloc(inner_cnt, struct duplex_iovec); inner[0] = (struct duplex_iovec){ .iov_read_to = IOVEC_DISCARD, .iov_write_from = header, @@ -131,7 +131,7 @@ w5500ll_readv( (block & CTL_MASK_BLOCK) | CTL_R | CTL_OM_VDM, }; int inner_cnt = 1+io_slice_cnt(iov, iovcnt, 0, max); - struct duplex_iovec *inner = alloca(sizeof(struct duplex_iovec)*inner_cnt); + struct duplex_iovec *inner = stack_alloc(inner_cnt, struct duplex_iovec); inner[0] = (struct duplex_iovec){ .iov_read_to = IOVEC_DISCARD, .iov_write_from = header, diff --git a/libmisc/include/libmisc/alloc.h b/libmisc/include/libmisc/alloc.h new file mode 100644 index 0000000..afddbce --- /dev/null +++ b/libmisc/include/libmisc/alloc.h @@ -0,0 +1,26 @@ +/* libmisc/alloc.h - Type-safe wrappers around alloca and malloc + * + * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com> + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +#ifndef _LIBMISC_ALLOC_H_ +#define _LIBMISC_ALLOC_H_ + +#include <alloca.h> /* for alloca() */ +#include <stdlib.h> /* for calloc(), free() */ +#include <string.h> /* for memset() */ + +#define stack_alloc(N, TYP) ({ \ + size_t _size; \ + TYP *_ret = NULL; \ + if (!__builtin_mul_overflow(N, sizeof(TYP), &_size)) { \ + _ret = alloca(_size); \ + memset(_ret, 0, _size); \ + } \ + _ret; \ +}) + +#define heap_alloc(N, TYP) ((TYP *)calloc(N, sizeof(TYP))) + +#endif /* _LIBMISC_ALLOC_H_ */ diff --git a/libmisc/map.c b/libmisc/map.c index 7629c8c..cc34c16 100644 --- a/libmisc/map.c +++ b/libmisc/map.c @@ -8,6 +8,7 @@ #include <string.h> #include <libmisc/hash.h> +#include <libmisc/alloc.h> #include <libmisc/assert.h> #include <libmisc/map.h> @@ -63,7 +64,7 @@ static inline void _map_lookup(struct _map *m, void *keyp, static inline void _map_resize(struct _map *m, size_t new_nbuckets) { assert(m); assert(new_nbuckets); - struct _map_kv_list *new_buckets = calloc(new_nbuckets, sizeof(struct _map_kv_list)); + struct _map_kv_list *new_buckets = heap_alloc(new_nbuckets, struct _map_kv_list); for (size_t i = 0; i < m->nbuckets; i++) { while (m->buckets[i].front) { struct _map_kv_list_node *kv = m->buckets[i].front; |