summaryrefslogtreecommitdiff
path: root/libcr_ipc
diff options
context:
space:
mode:
authorLuke T. Shumaker <lukeshu@lukeshu.com>2025-02-04 17:32:54 -0700
committerLuke T. Shumaker <lukeshu@lukeshu.com>2025-02-04 21:00:44 -0700
commit635539cd0da713729ec2852b3691dc9f73734e59 (patch)
tree2b8861092dd4315d187c8e38ea58f847bed59414 /libcr_ipc
parent04978d7abe02e7f7e9092a44bb4d0ac3ec6ad366 (diff)
libcr_ipc: Fix rpc.h
Diffstat (limited to 'libcr_ipc')
-rw-r--r--libcr_ipc/CMakeLists.txt4
-rw-r--r--libcr_ipc/include/libcr_ipc/_linkedlist.h7
-rw-r--r--libcr_ipc/include/libcr_ipc/rpc.h164
-rw-r--r--libcr_ipc/tests/test_rpc.c59
4 files changed, 179 insertions, 55 deletions
diff --git a/libcr_ipc/CMakeLists.txt b/libcr_ipc/CMakeLists.txt
index 3ab56ab..05e6fa3 100644
--- a/libcr_ipc/CMakeLists.txt
+++ b/libcr_ipc/CMakeLists.txt
@@ -1,6 +1,6 @@
# libcr_ipc/CMakeLists.txt - TODO
#
-# Copyright (C) 2024 Luke T. Shumaker <lukeshu@lukeshu.com>
+# Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com>
# SPDX-License-Identifier: AGPL-3.0-or-later
add_library(libcr_ipc INTERFACE)
@@ -14,7 +14,7 @@ set(ipc_tests
#select
#mutex
#owned_mutex
- #rpc
+ rpc
#sema
)
if (ENABLE_TESTS)
diff --git a/libcr_ipc/include/libcr_ipc/_linkedlist.h b/libcr_ipc/include/libcr_ipc/_linkedlist.h
index 67bff24..543e058 100644
--- a/libcr_ipc/include/libcr_ipc/_linkedlist.h
+++ b/libcr_ipc/include/libcr_ipc/_linkedlist.h
@@ -1,6 +1,6 @@
/* libcr_ipc/_linkedlist.h - Common low-level linked lists for use in libcr_ipc
*
- * Copyright (C) 2024 Luke T. Shumaker <lukeshu@lukeshu.com>
+ * Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com>
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
@@ -33,10 +33,9 @@ static inline void _cr_ipc_sll_push_to_rear(_cr_ipc_sll_root *root, _cr_ipc_sll_
node->rear = NULL;
if (root->rear)
root->rear->rear = node;
- else {
+ else
root->front = node;
- root->rear = node;
- }
+ root->rear = node;
}
static inline void _cr_ipc_sll_pop_from_front(_cr_ipc_sll_root *root) {
diff --git a/libcr_ipc/include/libcr_ipc/rpc.h b/libcr_ipc/include/libcr_ipc/rpc.h
index 80eee74..0ff8bbf 100644
--- a/libcr_ipc/include/libcr_ipc/rpc.h
+++ b/libcr_ipc/include/libcr_ipc/rpc.h
@@ -8,6 +8,7 @@
#define _LIBCR_IPC_RPC_H_
#include <stdbool.h> /* for bool */
+#include <string.h> /* for memcpy() */
#include <libcr/coroutine.h> /* for cid_t, cr_* */
@@ -34,6 +35,12 @@
* * _recv_req() and _send_resp().
* typedef ... NAME##_t;
*
+ * /**
+ * * A NAME##_req_t is handle that wraps a REQ_T and is used to return
+ * * the response RESP_T to the correct requester. `REQ_T req` is the
+ * * only public member.
+ * typedef struct { REQ_T req; ... } NAME##_req_t;
+ *
* methods:
*
* /**
@@ -89,71 +96,41 @@
#define CR_RPC_DECLARE(NAME, REQ_T, RESP_T) \
typedef struct { \
REQ_T req; \
- \
- RESP_T *_resp; \
+ \
+ RESP_T *_resp; /* where to write resp to */ \
cid_t _requester; \
} NAME##_req_t; \
- \
- struct _##NAME##_waiting_req { \
- _cr_ipc_sll_node; \
- REQ_T *req; \
- RESP_T *resp; \
- cid_t cid; \
- }; \
- \
- struct _##NAME##_waiting_resp { \
- _cr_ipc_sll_node; \
- cid_t cid; \
- }; \
- \
+ \
typedef struct { \
- _cr_ipc_sll_root waiting_reqs; \
- _cr_ipc_sll_root waiting_resps; \
+ struct _cr_rpc core; \
+ NAME##_req_t handle[0]; \
} NAME##_t; \
- \
+ \
static inline RESP_T NAME##_send_req(NAME##_t *ch, REQ_T req) { \
cr_assert_in_coroutine(); \
RESP_T resp; \
- struct _##NAME##_waiting_req self = { \
- .req = &req, \
- .resp = &resp, \
- .cid = cr_getcid(), \
- }; \
- _cr_ipc_sll_push_to_rear(&ch->waiting_reqs, &self); \
- if (ch->waiting_resps.front) { \
- cr_unpause(_cr_ipc_sll_node_cast(struct _##NAME##_waiting_resp, ch->waiting_resps.front)->cid); \
- _cr_ipc_sll_pop_from_front(&ch->waiting_resps); \
- } \
- cr_pause_and_yield(); \
+ _cr_rpc_send_req(&ch->core, \
+ &req, sizeof(req), \
+ &resp, sizeof(resp)); \
return resp; \
} \
- \
+ \
static inline NAME##_req_t NAME##_recv_req(NAME##_t *ch) { \
cr_assert_in_coroutine(); \
- if (!ch->waiting_reqs.front) { \
- struct _##NAME##_waiting_resp self = { \
- .cid = cr_getcid(), \
- }; \
- _cr_ipc_sll_push_to_rear(&ch->waiting_resps, &self); \
- cr_pause_and_yield(); \
- } \
- assert(ch->waiting_reqs.front); \
- struct _##NAME##_waiting_req *front_req = \
- _cr_ipc_sll_node_cast(struct _##NAME##_waiting_req, ch->waiting_reqs.front); \
- NAME##_req_t ret = { \
- .req = *(front_req->req), \
- ._resp = front_req->resp, \
- ._requester = front_req->cid, \
- }; \
- _cr_ipc_sll_pop_from_front(&ch->waiting_reqs); \
+ NAME##_req_t ret; \
+ _cr_rpc_recv_req(&ch->core, \
+ &ret.req, sizeof(ret.req), \
+ (void **)&ret._resp, \
+ &ret._requester); \
return ret; \
} \
- \
+ \
static inline bool NAME##_can_recv_req(NAME##_t *ch) { \
cr_assert_in_coroutine(); \
- return ch->waiting_reqs.front != NULL; \
+ return ch->core.waiters.front && \
+ ch->core.waiter_typ == _CR_RPC_REQUESTER; \
} \
- \
+ \
static inline void NAME##_send_resp(NAME##_req_t req, RESP_T resp) { \
cr_assert_in_coroutine(); \
*(req._resp) = resp; \
@@ -161,4 +138,93 @@
cr_yield(); \
}
+enum _cr_rpc_waiter_typ {
+ _CR_RPC_REQUESTER,
+ _CR_RPC_RESPONDER,
+};
+
+struct _cr_rpc_requester {
+ _cr_ipc_sll_node;
+ cid_t cid;
+ void *req_ptr; /* where to read req from */
+ void *resp_ptr; /* where to write resp to */
+};
+
+struct _cr_rpc_responder {
+ _cr_ipc_sll_node;
+ /* /* before enqueued | after dequeued */
+ /* /* -------------------+-------------------- */
+ cid_t cid; /* responder cid | requester cid */
+ void *ptr; /* where to write req | where to write resp */
+};
+
+struct _cr_rpc {
+ enum _cr_rpc_waiter_typ waiter_typ;
+ _cr_ipc_sll_root waiters;
+};
+
+static inline void _cr_rpc_send_req(struct _cr_rpc *ch,
+ void *req_ptr, size_t req_size,
+ void *resp_ptr, size_t resp_size)
+{
+ assert(ch);
+ assert(req_ptr);
+ assert(resp_ptr);
+
+ if (ch->waiters.front && ch->waiter_typ != _CR_RPC_REQUESTER) { /* fast-path (still blocks) */
+ struct _cr_rpc_responder *responder =
+ _cr_ipc_sll_node_cast(struct _cr_rpc_responder, ch->waiters.front);
+ _cr_ipc_sll_pop_from_front(&ch->waiters);
+ /* Copy the req to the responder's stack. */
+ memcpy(responder->ptr, req_ptr, req_size);
+ /* Notify the responder that we have done so. */
+ cr_unpause(responder->cid);
+ responder->cid = cr_getcid();
+ responder->ptr = resp_ptr;
+ /* Wait for the responder to set `*resp_ptr`. */
+ cr_pause_and_yield();
+ } else { /* blocking slow-path */
+ struct _cr_rpc_requester self = {
+ .cid = cr_getcid(),
+ .req_ptr = req_ptr,
+ .resp_ptr = resp_ptr,
+ };
+ _cr_ipc_sll_push_to_rear(&ch->waiters, &self);
+ /* Wait for a responder to both copy our req and sed
+ * `*resp_ptr`. */
+ cr_pause_and_yield();
+ }
+}
+
+static inline void _cr_rpc_recv_req(struct _cr_rpc *ch,
+ void *req_ptr, size_t req_size,
+ void **ret_resp_ptr,
+ cid_t *ret_requester)
+{
+ assert(ch);
+ assert(req_ptr);
+ assert(ret_resp_ptr);
+ assert(ret_requester);
+
+ if (ch->waiters.front && ch->waiter_typ != _CR_RPC_RESPONDER) { /* non-blocking fast-path */
+ struct _cr_rpc_requester *requester =
+ _cr_ipc_sll_node_cast(struct _cr_rpc_requester, ch->waiters.front);
+ _cr_ipc_sll_pop_from_front(&ch->waiters);
+
+ memcpy(req_ptr, requester->req_ptr, req_size);
+ *ret_requester = requester->cid;
+ *ret_resp_ptr = requester->resp_ptr;
+ } else { /* blocking slow-path */
+ struct _cr_rpc_responder self = {
+ .cid = cr_getcid(),
+ .ptr = req_ptr,
+ };
+ _cr_ipc_sll_push_to_rear(&ch->waiters, &self);
+ ch->waiter_typ = _CR_RPC_RESPONDER;
+ cr_pause_and_yield();
+ *ret_requester = self.cid;
+ *ret_resp_ptr = self.ptr;
+ }
+}
+
#endif /* _LIBCR_IPC_RPC_H_ */
diff --git a/libcr_ipc/tests/test_rpc.c b/libcr_ipc/tests/test_rpc.c
new file mode 100644
index 0000000..4aff5ca
--- /dev/null
+++ b/libcr_ipc/tests/test_rpc.c
@@ -0,0 +1,59 @@
+/* libcr_ipc/tests/test_rpc.c - Tests for <libcr_ipc/rpc.h>
+ *
+ * Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com>
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#include <libcr/coroutine.h>
+#include <libcr_ipc/rpc.h>
+
+#include "test.h"
+
+CR_RPC_DECLARE(intrpc, int, int)
+
+/* Test that the RPC is fair, have worker1 start waiting first, and
+ * ensure that it gets the first request. */
+
+COROUTINE cr_caller(void *_ch) {
+ intrpc_t *ch = _ch;
+ cr_begin();
+
+ int resp = intrpc_send_req(ch, 1);
+ test_assert(resp == 2);
+
+ resp = intrpc_send_req(ch, 3);
+ test_assert(resp == 4);
+
+ cr_exit();
+}
+
+COROUTINE cr_worker1(void *_ch) {
+ intrpc_t *ch = _ch;
+ cr_begin();
+
+ intrpc_req_t req = intrpc_recv_req(ch);
+ test_assert(req.req == 1);
+ intrpc_send_resp(req, 2);
+
+ cr_exit();
+}
+
+COROUTINE cr_worker2(void *_ch) {
+ intrpc_t *ch = _ch;
+ cr_begin();
+
+ intrpc_req_t req = intrpc_recv_req(ch);
+ test_assert(req.req == 3);
+ intrpc_send_resp(req, 4);
+
+ cr_exit();
+}
+
+int main() {
+ intrpc_t ch = {0};
+ coroutine_add("worker1", cr_worker1, &ch);
+ coroutine_add("caller", cr_caller, &ch);
+ coroutine_add("worker2", cr_worker2, &ch);
+ coroutine_main();
+ return 0;
+}