diff options
Diffstat (limited to 'lib9p/srv.c')
-rw-r--r-- | lib9p/srv.c | 134 |
1 files changed, 81 insertions, 53 deletions
diff --git a/lib9p/srv.c b/lib9p/srv.c index dae73ea..6ab2ab2 100644 --- a/lib9p/srv.c +++ b/lib9p/srv.c @@ -4,7 +4,6 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -#include <alloca.h> #include <inttypes.h> /* for PRI* */ #include <limits.h> /* for SSIZE_MAX, not set by newlib */ #include <stddef.h> /* for size_t */ @@ -17,6 +16,7 @@ #include <libcr/coroutine.h> #include <libcr_ipc/chan.h> #include <libcr_ipc/mutex.h> +#include <libmisc/alloc.h> #include <libmisc/assert.h> #include <libmisc/endian.h> #include <libmisc/map.h> @@ -45,16 +45,22 @@ static_assert(CONFIG_9P_SRV_MAX_HOSTMSG_SIZE <= SSIZE_MAX); bool lib9p_srv_flush_requested(struct lib9p_srv_ctx *ctx) { assert(ctx); - return cr_chan_can_send(&ctx->flushch); + return cr_chan_can_send(&ctx->flush_ch); } void lib9p_srv_acknowledge_flush(struct lib9p_srv_ctx *ctx) { assert(ctx); - assert(cr_chan_can_send(&ctx->flushch)); - lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_ECANCELED, "request canceled by flush"); - cr_chan_send(&ctx->flushch, true); + assert(cr_chan_can_send(&ctx->flush_ch)); + ctx->flush_acknowledged = true; } +#define req_debugf(fmt, ...) \ + debugf("cid=%zu: %s(tag=%"PRIu16"): " fmt, \ + cr_getcid(), \ + lib9p_msgtype_str(ctx->basectx.version, ctx->net_bytes[4]), \ + ctx->tag \ + __VA_OPT__(,) __VA_ARGS__) + /* structs ********************************************************************/ enum srv_filetype { @@ -450,8 +456,6 @@ void lib9p_srv_accept_and_read_loop(struct lib9p_srv *srv, lo_interface net_stre } } -static void handle_message(struct srv_req *ctx); - void lib9p_srv_read(struct lib9p_srv *srv, lo_interface net_stream_conn _conn) { assert(srv); assert(srv->rootdir); @@ -511,7 +515,7 @@ void lib9p_srv_read(struct lib9p_srv *srv, lo_interface net_stream_conn _conn) { /* Handle the message... */ if (req.net_bytes[4] == LIB9P_TYP_Tversion) /* ...in this coroutine for Tversion, */ - handle_message(&req); + lib9p_srv_worker(&req); else /* ...but usually in another coroutine. */ cr_rpc_send_req(&srv->_reqch, &req); @@ -577,14 +581,7 @@ void lib9p_srv_worker_loop(struct lib9p_srv *srv) { cr_rpc_send_resp(rpc_handle, 0); /* Process the request. **************************************/ - handle_message(&req); - - /* Release resources. ****************************************/ - while (cr_chan_can_send(&req.flushch)) - cr_chan_send(&req.flushch, false); - map_del(&req.parent_sess->reqs, req.tag); - if (req.parent_sess->closing && !map_len(&req.parent_sess->reqs)) - cr_unpause(req.parent_sess->parent_conn->reader); + lib9p_srv_worker(&req); } } @@ -616,35 +613,11 @@ _HANDLER_PROTO(swrite); typedef void (*tmessage_handler)(struct srv_req *, void *, void *); -static tmessage_handler tmessage_handlers[0x100] = { - [LIB9P_TYP_Tversion] = (tmessage_handler)handle_Tversion, - [LIB9P_TYP_Tauth] = (tmessage_handler)handle_Tauth, - [LIB9P_TYP_Tattach] = (tmessage_handler)handle_Tattach, - [LIB9P_TYP_Tflush] = (tmessage_handler)handle_Tflush, - [LIB9P_TYP_Twalk] = (tmessage_handler)handle_Twalk, - [LIB9P_TYP_Topen] = (tmessage_handler)handle_Topen, - [LIB9P_TYP_Tcreate] = (tmessage_handler)handle_Tcreate, - [LIB9P_TYP_Tread] = (tmessage_handler)handle_Tread, - [LIB9P_TYP_Twrite] = (tmessage_handler)handle_Twrite, - [LIB9P_TYP_Tclunk] = (tmessage_handler)handle_Tclunk, - [LIB9P_TYP_Tremove] = (tmessage_handler)handle_Tremove, - [LIB9P_TYP_Tstat] = (tmessage_handler)handle_Tstat, - [LIB9P_TYP_Twstat] = (tmessage_handler)handle_Twstat, -#if CONFIG_9P_ENABLE_9P2000_p9p - [LIB9P_TYP_Topenfd] = (tmessage_handler)handle_Topenfd, -#endif -#if CONFIG_9P_ENABLE_9P2000_e - [LIB9P_TYP_Tsession] = (tmessage_handler)handle_Tsession, - [LIB9P_TYP_Tsread] = (tmessage_handler)handle_Tsread, - [LIB9P_TYP_Tswrite] = (tmessage_handler)handle_Tswrite, -#endif -}; - -static void handle_message(struct srv_req *ctx) { +void lib9p_srv_worker(struct srv_req *ctx) { uint8_t *host_req = NULL; uint8_t host_resp[CONFIG_9P_SRV_MAX_HOSTMSG_SIZE]; - /* Unmarshal it. */ + /* Unmarshal it. *****************************************************/ ssize_t host_size = lib9p_Tmsg_validate(&ctx->basectx, ctx->net_bytes); if (host_size < 0) goto write; @@ -655,13 +628,45 @@ static void handle_message(struct srv_req *ctx) { &typ, host_req); srv_msglog(ctx, typ, host_req); - /* Handle it. */ - tmessage_handlers[typ](ctx, (void *)host_req, (void *)host_resp); + /* Handle it. ********************************************************/ + tmessage_handler handler; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wswitch-enum" + switch (typ) { + case LIB9P_TYP_Tversion: handler = (tmessage_handler)handle_Tversion; break; + case LIB9P_TYP_Tauth: handler = (tmessage_handler)handle_Tauth; break; + case LIB9P_TYP_Tattach: handler = (tmessage_handler)handle_Tattach; break; + case LIB9P_TYP_Tflush: handler = (tmessage_handler)handle_Tflush; break; + case LIB9P_TYP_Twalk: handler = (tmessage_handler)handle_Twalk; break; + case LIB9P_TYP_Topen: handler = (tmessage_handler)handle_Topen; break; + case LIB9P_TYP_Tcreate: handler = (tmessage_handler)handle_Tcreate; break; + case LIB9P_TYP_Tread: handler = (tmessage_handler)handle_Tread; break; + case LIB9P_TYP_Twrite: handler = (tmessage_handler)handle_Twrite; break; + case LIB9P_TYP_Tclunk: handler = (tmessage_handler)handle_Tclunk; break; + case LIB9P_TYP_Tremove: handler = (tmessage_handler)handle_Tremove; break; + case LIB9P_TYP_Tstat: handler = (tmessage_handler)handle_Tstat; break; + case LIB9P_TYP_Twstat: handler = (tmessage_handler)handle_Twstat; break; +#if CONFIG_9P_ENABLE_9P2000_p9p + case LIB9P_TYP_Topenfd: handler = (tmessage_handler)handle_Topenfd; break +#endif +#if CONFIG_9P_ENABLE_9P2000_e + case LIB9P_TYP_Tsession: handler = (tmessage_handler)handle_Tsession; break; + case LIB9P_TYP_Tsread: handler = (tmessage_handler)handle_Tsread; break; + case LIB9P_TYP_Tswrite: handler = (tmessage_handler)handle_Tswrite; break; +#endif + default: + assert_notreached("lib9p_Tmsg_validate() should have rejected unknown typ"); + } +#pragma GCC diagnostic pop + handler(ctx, (void *)host_req, (void *)host_resp); + /* Write the response. ***********************************************/ write: - if (lib9p_ctx_has_error(&ctx->basectx)) + if (lib9p_ctx_has_error(&ctx->basectx)) { srv_respond_error(ctx); - else { + } else if (ctx->flush_acknowledged) { + /* do nothing */ + } else { struct lib9p_Rmsg_send_buf net_resp; if (lib9p_Rmsg_marshal(&ctx->basectx, typ+1, host_resp, @@ -670,6 +675,16 @@ static void handle_message(struct srv_req *ctx) { srv_msglog(ctx, typ+1, &host_resp); srv_write_Rmsg(ctx, &net_resp); } + /* Release resources. ************************************************/ + map_del(&ctx->parent_sess->reqs, ctx->tag); + size_t nwaiters; + while ((nwaiters = cr_chan_num_waiters(&ctx->flush_ch))) { + cr_chan_send(&ctx->flush_ch, (nwaiters == 1) + ? _LIB9P_SRV_FLUSH_RFLUSH + : _LIB9P_SRV_FLUSH_SILENT); + } + if (ctx->parent_sess->closing && !map_len(&ctx->parent_sess->reqs)) + cr_unpause(ctx->parent_sess->parent_conn->reader); if (host_req) free(host_req); free(ctx->net_bytes); @@ -736,15 +751,15 @@ static void handle_Tversion(struct srv_req *ctx, if (map_len(&ctx->parent_sess->reqs)) { /* Flush all in-progress requests, and wait for them * to finish. */ - struct cr_select_arg *list = alloca(sizeof(struct cr_select_arg) * map_len(&ctx->parent_sess->reqs)); + struct cr_select_arg *args = stack_alloc(map_len(&ctx->parent_sess->reqs), struct cr_select_arg); while (map_len(&ctx->parent_sess->reqs)) { size_t i = 0; - bool flushed; MAP_FOREACH(&ctx->parent_sess->reqs, tag, reqpp) { - list[i] = CR_SELECT_RECV(&((*reqpp)->flushch), &flushed); + enum _lib9p_srv_flush_result flushed; + args[i++] = CR_SELECT_RECV(&((*reqpp)->flush_ch), &flushed); } assert(i == map_len(&ctx->parent_sess->reqs)); - cr_select_v(i, list); + cr_select_v(i, args); } } if (map_len(&ctx->parent_sess->fids)) { @@ -867,8 +882,21 @@ static void handle_Tflush(struct srv_req *ctx, srv_handler_common(ctx, req, resp); struct srv_req **oldreqp = map_load(&ctx->parent_sess->reqs, req->oldtag); - if (oldreqp) - cr_chan_recv(&((*oldreqp)->flushch)); + if (oldreqp) { + struct srv_req *oldreq = *oldreqp; + enum _lib9p_srv_flush_result res = _LIB9P_SRV_FLUSH_RFLUSH; + switch (cr_select_l(CR_SELECT_RECV(&oldreq->flush_ch, &res), + CR_SELECT_SEND(&ctx->flush_ch, &res))) { + case 0: /* original request returned */ + req_debugf("original request (tag=%"PRIu16") returned", req->oldtag); + ctx->flush_acknowledged = (res == _LIB9P_SRV_FLUSH_SILENT); + break; + case 1: /* flush itself got flushed */ + req_debugf("flush itself flushed"); + ctx->flush_acknowledged = true; + break; + } + } } static void handle_Twalk(struct srv_req *ctx, @@ -1146,7 +1174,7 @@ static void handle_Tread(struct srv_req *ctx, case SRV_FILETYPE_FILE: struct iovec iov; LO_CALL(fidinfo->file.io, pread, ctx, req->count, req->offset, &iov); - if (!lib9p_ctx_has_error(&ctx->basectx)) { + if (!lib9p_ctx_has_error(&ctx->basectx) && !ctx->flush_acknowledged) { resp->count = iov.iov_len; resp->data = iov.iov_base; if (resp->count > req->count) |