summaryrefslogtreecommitdiff
path: root/lib9p/srv.c
diff options
context:
space:
mode:
authorLuke T. Shumaker <lukeshu@lukeshu.com>2025-04-16 05:21:31 -0600
committerLuke T. Shumaker <lukeshu@lukeshu.com>2025-04-16 09:02:38 -0600
commita5061fa634af1e7011182e1c115151dd96af8393 (patch)
treee13794366a09d0480253076d458b413faa5680a5 /lib9p/srv.c
parentcf102e5e0ce3dfcba0a2dbb6c3d16883e0525b41 (diff)
lib9p_srv: Re-think flush semantics
Diffstat (limited to 'lib9p/srv.c')
-rw-r--r--lib9p/srv.c61
1 files changed, 43 insertions, 18 deletions
diff --git a/lib9p/srv.c b/lib9p/srv.c
index a681952..6ab2ab2 100644
--- a/lib9p/srv.c
+++ b/lib9p/srv.c
@@ -45,16 +45,22 @@ static_assert(CONFIG_9P_SRV_MAX_HOSTMSG_SIZE <= SSIZE_MAX);
bool lib9p_srv_flush_requested(struct lib9p_srv_ctx *ctx) {
assert(ctx);
- return cr_chan_can_send(&ctx->flushch);
+ return cr_chan_can_send(&ctx->flush_ch);
}
void lib9p_srv_acknowledge_flush(struct lib9p_srv_ctx *ctx) {
assert(ctx);
- assert(cr_chan_can_send(&ctx->flushch));
- lib9p_error(&ctx->basectx, LIB9P_ERRNO_L_ECANCELED, "request canceled by flush");
- cr_chan_send(&ctx->flushch, true);
+ assert(cr_chan_can_send(&ctx->flush_ch));
+ ctx->flush_acknowledged = true;
}
+#define req_debugf(fmt, ...) \
+ debugf("cid=%zu: %s(tag=%"PRIu16"): " fmt, \
+ cr_getcid(), \
+ lib9p_msgtype_str(ctx->basectx.version, ctx->net_bytes[4]), \
+ ctx->tag \
+ __VA_OPT__(,) __VA_ARGS__)
+
/* structs ********************************************************************/
enum srv_filetype {
@@ -656,9 +662,11 @@ void lib9p_srv_worker(struct srv_req *ctx) {
/* Write the response. ***********************************************/
write:
- if (lib9p_ctx_has_error(&ctx->basectx))
+ if (lib9p_ctx_has_error(&ctx->basectx)) {
srv_respond_error(ctx);
- else {
+ } else if (ctx->flush_acknowledged) {
+ /* do nothing */
+ } else {
struct lib9p_Rmsg_send_buf net_resp;
if (lib9p_Rmsg_marshal(&ctx->basectx,
typ+1, host_resp,
@@ -668,14 +676,18 @@ void lib9p_srv_worker(struct srv_req *ctx) {
srv_write_Rmsg(ctx, &net_resp);
}
/* Release resources. ************************************************/
- if (host_req)
- free(host_req);
- free(ctx->net_bytes);
- while (cr_chan_can_send(&ctx->flushch))
- cr_chan_send(&ctx->flushch, false);
map_del(&ctx->parent_sess->reqs, ctx->tag);
+ size_t nwaiters;
+ while ((nwaiters = cr_chan_num_waiters(&ctx->flush_ch))) {
+ cr_chan_send(&ctx->flush_ch, (nwaiters == 1)
+ ? _LIB9P_SRV_FLUSH_RFLUSH
+ : _LIB9P_SRV_FLUSH_SILENT);
+ }
if (ctx->parent_sess->closing && !map_len(&ctx->parent_sess->reqs))
cr_unpause(ctx->parent_sess->parent_conn->reader);
+ if (host_req)
+ free(host_req);
+ free(ctx->net_bytes);
}
/* handle_T* ******************************************************************/
@@ -739,15 +751,15 @@ static void handle_Tversion(struct srv_req *ctx,
if (map_len(&ctx->parent_sess->reqs)) {
/* Flush all in-progress requests, and wait for them
* to finish. */
- struct cr_select_arg *list = stack_alloc(map_len(&ctx->parent_sess->reqs), struct cr_select_arg);
+ struct cr_select_arg *args = stack_alloc(map_len(&ctx->parent_sess->reqs), struct cr_select_arg);
while (map_len(&ctx->parent_sess->reqs)) {
size_t i = 0;
- bool flushed;
MAP_FOREACH(&ctx->parent_sess->reqs, tag, reqpp) {
- list[i] = CR_SELECT_RECV(&((*reqpp)->flushch), &flushed);
+ enum _lib9p_srv_flush_result flushed;
+ args[i++] = CR_SELECT_RECV(&((*reqpp)->flush_ch), &flushed);
}
assert(i == map_len(&ctx->parent_sess->reqs));
- cr_select_v(i, list);
+ cr_select_v(i, args);
}
}
if (map_len(&ctx->parent_sess->fids)) {
@@ -870,8 +882,21 @@ static void handle_Tflush(struct srv_req *ctx,
srv_handler_common(ctx, req, resp);
struct srv_req **oldreqp = map_load(&ctx->parent_sess->reqs, req->oldtag);
- if (oldreqp)
- cr_chan_recv(&((*oldreqp)->flushch));
+ if (oldreqp) {
+ struct srv_req *oldreq = *oldreqp;
+ enum _lib9p_srv_flush_result res = _LIB9P_SRV_FLUSH_RFLUSH;
+ switch (cr_select_l(CR_SELECT_RECV(&oldreq->flush_ch, &res),
+ CR_SELECT_SEND(&ctx->flush_ch, &res))) {
+ case 0: /* original request returned */
+ req_debugf("original request (tag=%"PRIu16") returned", req->oldtag);
+ ctx->flush_acknowledged = (res == _LIB9P_SRV_FLUSH_SILENT);
+ break;
+ case 1: /* flush itself got flushed */
+ req_debugf("flush itself flushed");
+ ctx->flush_acknowledged = true;
+ break;
+ }
+ }
}
static void handle_Twalk(struct srv_req *ctx,
@@ -1149,7 +1174,7 @@ static void handle_Tread(struct srv_req *ctx,
case SRV_FILETYPE_FILE:
struct iovec iov;
LO_CALL(fidinfo->file.io, pread, ctx, req->count, req->offset, &iov);
- if (!lib9p_ctx_has_error(&ctx->basectx)) {
+ if (!lib9p_ctx_has_error(&ctx->basectx) && !ctx->flush_acknowledged) {
resp->count = iov.iov_len;
resp->data = iov.iov_base;
if (resp->count > req->count)