From b160e07ac1cb13bef89c248c555a00287c7574b9 Mon Sep 17 00:00:00 2001 From: "Luke T. Shumaker" Date: Sun, 6 Oct 2024 22:13:55 -0600 Subject: lib9p: wip srv --- lib9p/srv.c | 70 ++++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 24 deletions(-) (limited to 'lib9p') diff --git a/lib9p/srv.c b/lib9p/srv.c index ee28b38..7dca9ec 100644 --- a/lib9p/srv.c +++ b/lib9p/srv.c @@ -1,4 +1,5 @@ #include +#include #include /* for PRI* */ #include /* for fprintf(), stderr */ #include /* for strerror() */ @@ -6,6 +7,7 @@ #include #include #include +#include #include #include @@ -152,13 +154,14 @@ static bool read_at_least(int fd, uint8_t *buf, size_t goal, size_t *done) { return false; } -static bool handle_Tmessage(struct _lib9p_srv_req *ctx); +static void handle_message(struct _lib9p_srv_req *ctx); COROUTINE lib9p_srv_read_cr(void *_srv) { uint8_t buf[CONFIG_9P_MAX_MSG_SIZE]; struct lib9p_srv *srv = _srv; assert(srv); + assert(srv->rootdir); cr_begin(); uint32_t initial_rerror_overhead = rerror_overhead_for_version(0, buf); @@ -217,8 +220,13 @@ COROUTINE lib9p_srv_read_cr(void *_srv) { if (read_at_least(conn.fd, buf, goal, &done)) goto close; - /* Handle the message... in another coroutine. */ - _lib9p_srv_reqch_send_req(&srv->_reqch, &req); + /* Handle the message... */ + if (buf[4] == LIB9P_TYP_Tversion) + /* ...in this coroutine for Tversion, */ + handle_message(&req); + else + /* ...but usually in another coroutine. */ + _lib9p_srv_reqch_send_req(&srv->_reqch, &req); } close: netio_close(conn.fd, true, sess.reqs.len == 0); @@ -242,6 +250,7 @@ COROUTINE lib9p_srv_write_cr(void *_srv) { struct lib9p_srv *srv = _srv; assert(srv); + assert(srv->rootdir); cr_begin(); for (;;) { @@ -259,9 +268,11 @@ COROUTINE lib9p_srv_write_cr(void *_srv) { _lib9p_srv_reqch_send_resp(rpc_handle, 0); /* Process the request. **************************************/ - handle_Tmessage(&req); + handle_message(&req); /* Release resources. ****************************************/ + if (_lib9p_srv_flushch_can_send(&req.ctx._flushch)) + _lib9p_srv_flushch_send(&req.ctx._flushch, false); reqmap_del(&req.parent_sess->reqs, req.tag); if (req.parent_sess->closing && !req.parent_sess->reqs.len) cr_unpause(req.parent_sess->parent_conn->reader); @@ -312,7 +323,7 @@ static tmessage_handler tmessage_handlers[0x100] = { [LIB9P_TYP_Tswrite] = (tmessage_handler)handle_Tswrite, /* 9P2000.e */ }; -static bool handle_Tmessage(struct _lib9p_srv_req *ctx) { +static void handle_message(struct _lib9p_srv_req *ctx) { uint8_t host_req[CONFIG_9P_MAX_HOSTMSG_SIZE]; uint8_t host_resp[CONFIG_9P_MAX_HOSTMSG_SIZE]; @@ -340,10 +351,9 @@ static bool handle_Tmessage(struct _lib9p_srv_req *ctx) { tmessage_handlers[typ](ctx, (void *)host_req, (void *)host_resp); write: - if (lib9p_ctx_has_error(&ctx->ctx.basectx)) { + if (lib9p_ctx_has_error(&ctx->ctx.basectx)) respond_error(ctx); - return true; - } else { + else { if (lib9p_marshal(&ctx->ctx.basectx, typ+1, ctx->tag, host_resp, ctx->net_bytes)) goto write; @@ -352,7 +362,6 @@ static bool handle_Tmessage(struct _lib9p_srv_req *ctx) { netio_write(ctx->parent_sess->parent_conn->fd, ctx->net_bytes, decode_u32le(ctx->net_bytes)); cr_mutex_unlock(&ctx->parent_sess->parent_conn->writelock); - return false; } } @@ -385,14 +394,33 @@ static void handle_Tversion(struct _lib9p_srv_req *ctx, return; } - /* +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdiscarded-qualifiers" + resp->version.utf8 = lib9p_version_str(version); +#pragma GCC diagnostic pop + resp->version.len = strlen((char *)resp->version.utf8); + resp->max_msg_size = (CONFIG_9P_MAX_MSG_SIZE < req->max_msg_size) + ? CONFIG_9P_MAX_MSG_SIZE + : req->max_msg_size; + + /* Close the old session. */ if (ctx->parent_sess->reqs.len) { - ctx->parent_sess->closing = true; - // TODO: send flush events - cr_pause_and_yield(); - assert(ctx->parent_sess->reqs.len == 0); + /* Flush all in-progress requests, and wait for them + * to finish. */ + struct cr_select_arg *list = alloca(sizeof(struct cr_select_arg) * ctx->parent_sess->reqs.len); + while (ctx->parent_sess->reqs.len) { + uint16_t tag __attribute__((unused)); + struct _lib9p_srv_req **reqpp; + size_t i = 0; + bool flushed; + MAP_FOREACH(&ctx->parent_sess->reqs, tag, reqpp) { + list[i] = CR_SELECT_RECV(&((*reqpp)->ctx._flushch), &flushed); + } + cr_select_v(i, list); + } } if (ctx->parent_sess->fids.len) { + /* Close all FIDs. */ uint32_t fid; struct lib9p_srv_file *fptr; MAP_FOREACH(&ctx->parent_sess->fids, fid, fptr) { @@ -400,17 +428,11 @@ static void handle_Tversion(struct _lib9p_srv_req *ctx, fidmap_del(&ctx->parent_sess->fids, fid); } } - */ - // TODO: replace session with a new one? -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wdiscarded-qualifiers" - resp->version.utf8 = lib9p_version_str(version); -#pragma GCC diagnostic pop - resp->version.len = strlen((char *)resp->version.utf8); - resp->max_msg_size = (CONFIG_9P_MAX_MSG_SIZE < req->max_msg_size) - ? CONFIG_9P_MAX_MSG_SIZE - : req->max_msg_size; + /* Replace the old session with the new session. */ + ctx->parent_sess->version = version; + ctx->parent_sess->max_msg_size = resp->max_msg_size; + ctx->parent_sess->rerror_overhead = min_msg_size; } static void handle_Tauth(struct _lib9p_srv_req *ctx, -- cgit v1.2.3-2-g168b