diff options
author | Luke T. Shumaker <lukeshu@lukeshu.com> | 2025-04-16 05:21:31 -0600 |
---|---|---|
committer | Luke T. Shumaker <lukeshu@lukeshu.com> | 2025-04-16 09:02:38 -0600 |
commit | a5061fa634af1e7011182e1c115151dd96af8393 (patch) | |
tree | e13794366a09d0480253076d458b413faa5680a5 /lib9p/srv.c | |
parent | cf102e5e0ce3dfcba0a2dbb6c3d16883e0525b41 (diff) |
lib9p_srv: Re-think flush semantics
Diffstat (limited to 'lib9p/srv.c')
-rw-r--r-- | lib9p/srv.c | 61 |
1 files changed, 43 insertions, 18 deletions
diff --git a/lib9p/srv.c b/lib9p/srv.c index a681952..6ab2ab2 100644 --- a/lib9p/srv.c +++ b/lib9p/srv.c @@ -45,16 +45,22 @@ static_assert(CONFIG_9P_SRV_MAX_HOSTMSG_SIZE <= SSIZE_MAX); bool lib9p_srv_flush_requested(struct lib9p_srv_ctx *ctx) { assert(ctx); - return cr_chan_can_send(&ctx->flushch); + return cr_chan_can_send(&ctx->flush_ch); } void lib9p_srv_acknowledge_flush(struct lib9p_srv_ctx *ctx) { assert(ctx); - assert(cr_chan_can_send(&ctx->flushch)); - lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_ECANCELED, "request canceled by flush"); - cr_chan_send(&ctx->flushch, true); + assert(cr_chan_can_send(&ctx->flush_ch)); + ctx->flush_acknowledged = true; } +#define req_debugf(fmt, ...) \ + debugf("cid=%zu: %s(tag=%"PRIu16"): " fmt, \ + cr_getcid(), \ + lib9p_msgtype_str(ctx->basectx.version, ctx->net_bytes[4]), \ + ctx->tag \ + __VA_OPT__(,) __VA_ARGS__) + /* structs ********************************************************************/ enum srv_filetype { @@ -656,9 +662,11 @@ void lib9p_srv_worker(struct srv_req *ctx) { /* Write the response. ***********************************************/ write: - if (lib9p_ctx_has_error(&ctx->basectx)) + if (lib9p_ctx_has_error(&ctx->basectx)) { srv_respond_error(ctx); - else { + } else if (ctx->flush_acknowledged) { + /* do nothing */ + } else { struct lib9p_Rmsg_send_buf net_resp; if (lib9p_Rmsg_marshal(&ctx->basectx, typ+1, host_resp, @@ -668,14 +676,18 @@ void lib9p_srv_worker(struct srv_req *ctx) { srv_write_Rmsg(ctx, &net_resp); } /* Release resources. ************************************************/ - if (host_req) - free(host_req); - free(ctx->net_bytes); - while (cr_chan_can_send(&ctx->flushch)) - cr_chan_send(&ctx->flushch, false); map_del(&ctx->parent_sess->reqs, ctx->tag); + size_t nwaiters; + while ((nwaiters = cr_chan_num_waiters(&ctx->flush_ch))) { + cr_chan_send(&ctx->flush_ch, (nwaiters == 1) + ? _LIB9P_SRV_FLUSH_RFLUSH + : _LIB9P_SRV_FLUSH_SILENT); + } if (ctx->parent_sess->closing && !map_len(&ctx->parent_sess->reqs)) cr_unpause(ctx->parent_sess->parent_conn->reader); + if (host_req) + free(host_req); + free(ctx->net_bytes); } /* handle_T* ******************************************************************/ @@ -739,15 +751,15 @@ static void handle_Tversion(struct srv_req *ctx, if (map_len(&ctx->parent_sess->reqs)) { /* Flush all in-progress requests, and wait for them * to finish. */ - struct cr_select_arg *list = stack_alloc(map_len(&ctx->parent_sess->reqs), struct cr_select_arg); + struct cr_select_arg *args = stack_alloc(map_len(&ctx->parent_sess->reqs), struct cr_select_arg); while (map_len(&ctx->parent_sess->reqs)) { size_t i = 0; - bool flushed; MAP_FOREACH(&ctx->parent_sess->reqs, tag, reqpp) { - list[i] = CR_SELECT_RECV(&((*reqpp)->flushch), &flushed); + enum _lib9p_srv_flush_result flushed; + args[i++] = CR_SELECT_RECV(&((*reqpp)->flush_ch), &flushed); } assert(i == map_len(&ctx->parent_sess->reqs)); - cr_select_v(i, list); + cr_select_v(i, args); } } if (map_len(&ctx->parent_sess->fids)) { @@ -870,8 +882,21 @@ static void handle_Tflush(struct srv_req *ctx, srv_handler_common(ctx, req, resp); struct srv_req **oldreqp = map_load(&ctx->parent_sess->reqs, req->oldtag); - if (oldreqp) - cr_chan_recv(&((*oldreqp)->flushch)); + if (oldreqp) { + struct srv_req *oldreq = *oldreqp; + enum _lib9p_srv_flush_result res = _LIB9P_SRV_FLUSH_RFLUSH; + switch (cr_select_l(CR_SELECT_RECV(&oldreq->flush_ch, &res), + CR_SELECT_SEND(&ctx->flush_ch, &res))) { + case 0: /* original request returned */ + req_debugf("original request (tag=%"PRIu16") returned", req->oldtag); + ctx->flush_acknowledged = (res == _LIB9P_SRV_FLUSH_SILENT); + break; + case 1: /* flush itself got flushed */ + req_debugf("flush itself flushed"); + ctx->flush_acknowledged = true; + break; + } + } } static void handle_Twalk(struct srv_req *ctx, @@ -1149,7 +1174,7 @@ static void handle_Tread(struct srv_req *ctx, case SRV_FILETYPE_FILE: struct iovec iov; LO_CALL(fidinfo->file.io, pread, ctx, req->count, req->offset, &iov); - if (!lib9p_ctx_has_error(&ctx->basectx)) { + if (!lib9p_ctx_has_error(&ctx->basectx) && !ctx->flush_acknowledged) { resp->count = iov.iov_len; resp->data = iov.iov_base; if (resp->count > req->count) |