summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--CMakeLists.txt2
-rwxr-xr-xbuild-aux/stack.c.gen798
-rw-r--r--cmd/sbc_harness/main.c6
-rw-r--r--lib9p/tests/test_server/config/config.h3
4 files changed, 521 insertions, 288 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 483274d..f13226b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -78,7 +78,7 @@ function(add_stack_analysis arg_outfile arg_objlib_target)
)
add_custom_command(
OUTPUT "${arg_outfile}"
- COMMAND "${CMAKE_SOURCE_DIR}/build-aux/stack.c.gen" "${PICO_PLATFORM}" "${CMAKE_SOURCE_DIR}" "$<TARGET_OBJECTS:${arg_objlib_target}>" "$<PATH:ABSOLUTE_PATH,$<TARGET_PROPERTY:${arg_objlib_target},SOURCES>,${CMAKE_CURRENT_SOURCE_DIR}>" >"${arg_outfile}"
+ COMMAND "${CMAKE_SOURCE_DIR}/build-aux/stack.c.gen" "${PICO_PLATFORM}" "${CMAKE_SOURCE_DIR}" "$<TARGET_OBJECTS:${arg_objlib_target}>" >"${arg_outfile}"
COMMAND_EXPAND_LISTS
DEPENDS "$<TARGET_OBJECTS:${arg_objlib_target}>" "${CMAKE_SOURCE_DIR}/build-aux/stack.c.gen"
COMMENT "Calculating ${arg_objlib_target} required stack sizes"
diff --git a/build-aux/stack.c.gen b/build-aux/stack.c.gen
index 07d5cfc..2acce60 100755
--- a/build-aux/stack.c.gen
+++ b/build-aux/stack.c.gen
@@ -143,14 +143,18 @@ class AnalyzeResult(typing.NamedTuple):
dynamic: set[str]
+class Application(typing.Protocol):
+ def extra_nodes(self) -> typing.Collection[Node]: ...
+ def location_xform(self, loc: str) -> str: ...
+ def indirect_callees(self, elem: VCGElem) -> tuple[list[str], bool]: ...
+ def skip_call(self, chain: list[str], funcname: str) -> bool: ...
+
+
def analyze(
*,
- ci_fnames: list[str],
- extra_nodes: list[Node] = [],
+ ci_fnames: typing.Collection[str],
app_func_filters: dict[str, typing.Callable[[str], int]],
- app_location_xform: typing.Callable[[str], str],
- app_indirect_callees: typing.Callable[[VCGElem], list[str]],
- app_skip_call: typing.Callable[[list[str], str], bool],
+ app: Application,
cfg_max_call_depth: int,
) -> AnalyzeResult:
re_node_label = re.compile(
@@ -221,9 +225,10 @@ def analyze(
if caller not in graph:
raise ValueError(f"unknown caller: {caller}")
if callee == "__indirect_call":
- for callee in app_indirect_callees(elem):
+ callees, missing_ok = app.indirect_callees(elem)
+ for callee in callees:
if callee not in graph[caller].calls:
- graph[caller].calls[callee] = True
+ graph[caller].calls[callee] = missing_ok
else:
graph[caller].calls[callee] = False
case _:
@@ -234,7 +239,7 @@ def analyze(
for elem in parse_vcg(fh):
handle_elem(elem)
- for node in extra_nodes:
+ for node in app.extra_nodes():
if node.funcname in graph:
raise ValueError(f"duplicate node {repr(node.funcname)}")
graph[node.funcname] = node
@@ -267,12 +272,12 @@ def analyze(
nonlocal dbg
funcname = resolve_funcname(orig_funcname)
if not funcname:
- if app_skip_call(chain, orig_funcname):
+ if app.skip_call(chain, orig_funcname):
return 0
if not missing_ok:
missing.add(orig_funcname)
return 0
- if app_skip_call(chain, funcname):
+ if app.skip_call(chain, funcname):
return 0
if len(chain) == cfg_max_call_depth:
@@ -282,7 +287,7 @@ def analyze(
if dbg:
print(f"//dbg: {funcname}\t{node.nstatic}")
if node.usage_kind == "dynamic" or node.ndynamic > 0:
- dynamic.add(app_location_xform(funcname))
+ dynamic.add(app.location_xform(funcname))
return node.nstatic + max(
[
0,
@@ -301,7 +306,7 @@ def analyze(
for funcname in graph:
if cnt := grp_filter(funcname):
n = nstatic(funcname)
- rows[app_location_xform(funcname)] = n
+ rows[app.location_xform(funcname)] = n
if n > nmax:
nmax = n
nsum += cnt * n
@@ -311,12 +316,11 @@ def analyze(
################################################################################
-# Application-specific code
-
-re_location = re.compile(r"(?P<filename>.+):(?P<row>[0-9]+):(?P<col>[0-9]+)")
+# Mildly-application-specific code
def read_source(location: str) -> str:
+ re_location = re.compile(r"(?P<filename>.+):(?P<row>[0-9]+):(?P<col>[0-9]+)")
m = re_location.fullmatch(location)
if not m:
raise ValueError(f"unexpected label value {repr(location)}")
@@ -327,140 +331,311 @@ def read_source(location: str) -> str:
return fh.readlines()[row][col:].rstrip()
-def main(
- *,
- arg_pico_platform: str,
- arg_base_dir: str,
- arg_ci_fnames: list[str],
- arg_c_fnames: list[str],
-) -> None:
+def get_zero_or_one(
+ pred: typing.Callable[[str], bool], fnames: typing.Collection[str]
+) -> str | None:
+ count = sum(1 for fname in fnames if pred(fname))
+ assert count < 2
+ if count:
+ return next(fname for fname in fnames if pred(fname))
+ return None
- re_call_other = re.compile(r"(?P<func>[^(]+)\(.*")
- all_nodes: list[Node] = []
- hooks_is_intrhandler: list[typing.Callable[[str], bool]] = []
- hooks_indirect_callees: list[typing.Callable[[str, str], list[str] | None]] = []
- hooks_skip_call: list[typing.Callable[[list[str], str], bool]] = []
+re_call_other = re.compile(r"(?P<func>[^(]+)\(.*")
- # The sbc-harness codebase #######################################
- objcalls: dict[str, set[str]] = {}
- re_vtable_start = re.compile(r"_vtable\s*=\s*\{")
- re_vtable_entry = re.compile(r"^\s+\.(?P<meth>\S+)\s*=\s*(?P<impl>\S+),.*")
- for fname in c_fnames:
- with open(fname, "r") as fh:
- in_vtable = False
- for line in fh:
- line = line.rstrip()
- if in_vtable:
- if m := re_vtable_entry.fullmatch(line):
- meth = m.group("meth")
- impl = m.group("impl")
- if impl == "NULL":
- continue
- if m.group("meth") not in objcalls:
- objcalls[meth] = set()
- objcalls[meth].add(impl)
- if "}" in line:
- in_vtable = False
- elif re_vtable_start.search(line):
- in_vtable = True
-
- tmessage_handlers: set[str] | None = None
- if any(fname.endswith("lib9p/srv.c") for fname in c_fnames):
- srv_c = next(fname for fname in c_fnames if fname.endswith("lib9p/srv.c"))
- re_tmessage_handler = re.compile(
- r"^\s*\[LIB9P_TYP_T[^]]+\]\s*=\s*\(tmessage_handler\)\s*(?P<handler>\S+),\s*$"
- )
- tmessage_handlers = set()
- with open(srv_c, "r") as fh:
- for line in fh:
- line = line.rstrip()
- if m := re_tmessage_handler.fullmatch(line):
- tmessage_handlers.add(m.group("handler"))
+class Plugin(typing.Protocol):
+ def extra_nodes(self) -> typing.Collection[Node]: ...
+ def indirect_callees(
+ self, loc: str, line: str
+ ) -> tuple[list[str], bool] | None: ...
+ def skip_call(self, chain: list[str], call: str) -> bool: ...
- lib9p_msgs: set[str] = set()
- if any(fname.endswith("lib9p/9p.c") for fname in c_fnames):
- generated_c = next(
- fname for fname in c_fnames if fname.endswith("lib9p/9p.generated.c")
- )
- re_lib9p_msg_entry = re.compile(r"^\s*_MSG_(?:[A-Z]+)\((?P<typ>\S+)\),$")
- with open(generated_c, "r") as fh:
- for line in fh:
- line = line.rstrip()
- if m := re_lib9p_msg_entry.fullmatch(line):
- typ = m.group("typ")
- lib9p_msgs.add(typ)
- re_call_objcall = re.compile(r"LO_CALL\((?P<obj>[^,]+), (?P<meth>[^,)]+)[,)].*")
+class PluginApplication:
+ _location_xform: typing.Callable[[str], str]
+ _plugins: list[Plugin]
+
+ def __init__(
+ self, location_xform: typing.Callable[[str], str], plugins: list[Plugin]
+ ) -> None:
+ self._location_xform = location_xform
+ self._plugins = plugins
+
+ def extra_nodes(self) -> typing.Collection[Node]:
+ ret: list[Node] = []
+ for plugin in self._plugins:
+ ret.extend(plugin.extra_nodes())
+ return ret
+
+ def location_xform(self, loc: str) -> str:
+ return self._location_xform(loc)
+
+ def indirect_callees(self, elem: VCGElem) -> tuple[list[str], bool]:
+ loc = elem.attrs.get("label", "")
+ line = read_source(loc)
+
+ for plugin in self._plugins:
+ ret = plugin.indirect_callees(loc, line)
+ if ret is not None:
+ return ret
+
+ placeholder = "__indirect_call"
+ if m := re_call_other.fullmatch(line):
+ placeholder += ":" + m.group("func")
+ placeholder += " at " + self.location_xform(elem.attrs.get("label", ""))
+ return [placeholder], False
+
+ def skip_call(self, chain: list[str], funcname: str) -> bool:
+ for plugin in self._plugins:
+ if plugin.skip_call(chain, funcname):
+ return True
+ return False
+
+
+################################################################################
+# Application-specific code
+
+
+class AppPlugin:
+ def extra_nodes(self) -> typing.Collection[Node]:
+ return []
+
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ if "/3rd-party/" in loc:
+ return None
+ if "srv->auth" in line:
+ return [], False
+ if "srv->rootdir" in line:
+ return ["get_root"], False
+ return None
+
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ return False
+
+
+class LibObjPlugin:
+ objcalls: dict[str, set[str]] # method_name => {method_impls}
+
+ def __init__(self, arg_c_fnames: typing.Collection[str]) -> None:
+ ifaces: dict[str, set[str]] = {} # iface_name => {method_names}
+ re_comment = re.compile(r"/\*.*?\*/")
+ re_ws = re.compile(r"\s+")
+ re_lo_iface = re.compile(r"^\s*#\s*define\s+(?P<name>\S+)_LO_IFACE")
+ re_lo_func = re.compile(r"LO_FUNC *\([^,]*, *(?P<name>[^,) ]+) *[,)]")
+ for fname in arg_c_fnames:
+ with open(fname, "r") as fh:
+ while line := fh.readline():
+ if m := re_lo_iface.match(line):
+ iface_name = m.group("name")
+ if iface_name not in ifaces:
+ ifaces[iface_name] = set()
+ while line.endswith("\\\n"):
+ line += fh.readline()
+ line = line.replace("\\\n", " ")
+ line = re_comment.sub(" ", line)
+ line = re_ws.sub(" ", line)
+ for m2 in re_lo_func.finditer(line):
+ ifaces[iface_name].add(m2.group("name"))
+
+ implementations: dict[str, set[str]] = {} # iface_name => {impl_names}
+ for iface_name in ifaces:
+ implementations[iface_name] = set()
+ re_lo_implementation = re.compile(
+ r"^LO_IMPLEMENTATION_[HC]\s*\(\s*(?P<iface>[^, ]+)\s*,\s*(?P<impl_typ>[^,]+)\s*,\s*(?P<impl_name>[^, ]+)\s*[,)].*"
+ )
+ for fname in arg_c_fnames:
+ with open(fname, "r") as fh:
+ for line in fh:
+ line = line.strip()
+ if m := re_lo_implementation.match(line):
+ implementations[m.group("iface")].add(m.group("impl_name"))
+
+ objcalls: dict[str, set[str]] = {} # method_name => {method_impls}
+ for iface_name in ifaces:
+ for method_name in ifaces[iface_name]:
+ if method_name not in objcalls:
+ objcalls[method_name] = set()
+ for impl_name in implementations[iface_name]:
+ objcalls[method_name].add(impl_name + "_" + method_name)
+ self.objcalls = objcalls
+
+ def extra_nodes(self) -> typing.Collection[Node]:
+ return []
+
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ re_call_objcall = re.compile(r"LO_CALL\((?P<obj>[^,]+), (?P<meth>[^,)]+)[,)].*")
- def sbc_indirect_callees(loc: str, line: str) -> list[str] | None:
if "/3rd-party/" in loc:
return None
if m := re_call_objcall.fullmatch(line):
- if m.group("meth") in objcalls:
- return sorted(objcalls[m.group("meth")])
- return [f"__indirect_call:{m.group('obj')}.vtable->{m.group('meth')}"]
+ if m.group("meth") in self.objcalls:
+ return sorted(self.objcalls[m.group("meth")]), True
+ return [
+ f"__indirect_call:{m.group('obj')}.vtable->{m.group('meth')}"
+ ], False
+ return None
+
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ return False
+
+
+class LibHWPlugin:
+ def extra_nodes(self) -> typing.Collection[Node]:
+ return []
+
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ if "/3rd-party/" in loc:
+ return None
if "trigger->cb(trigger->cb_arg)" in line:
return [
"alarmclock_sleep_intrhandler",
"w5500_tcp_alarm_handler",
"w5500_udp_alarm_handler",
- ]
+ ], True
+ return None
+
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ return False
+
+
+class LibCRIPCPlugin:
+ def extra_nodes(self) -> typing.Collection[Node]:
+ return []
+
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ if "/3rd-party/" in loc:
+ return None
if "/chan.h:" in loc and "front->dequeue(" in line:
return [
"_cr_chan_dequeue",
"_cr_select_dequeue",
- ]
- if tmessage_handlers and "/srv.c:" in loc and "tmessage_handlers[typ](" in line:
- return sorted(tmessage_handlers)
- if lib9p_msgs and "/9p.c:" in loc:
+ ], True
+ return None
+
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ return False
+
+
+class Lib9PPlugin:
+ tmessage_handlers: set[str] | None
+ lib9p_msgs: set[str]
+ _CONFIG_9P_NUM_SOCKS: int | None
+ CONFIG_9P_SRV_MAX_REQS: int | None
+ CONFIG_9P_SRV_MAX_DEPTH: int | None
+
+ def __init__(self, arg_base_dir: str, arg_c_fnames: typing.Collection[str]) -> None:
+ # Find filenames #######################################################
+
+ def _is_config_h(fname: str) -> bool:
+ if not fname.startswith(arg_base_dir + "/"):
+ return False
+ suffix = fname[len(arg_base_dir) + 1 :]
+ if suffix.startswith("3rd-party/"):
+ return False
+ return suffix.endswith("/config.h")
+
+ config_h_fname = get_zero_or_one(_is_config_h, arg_c_fnames)
+
+ lib9p_srv_c_fname = get_zero_or_one(
+ lambda fname: fname.endswith("lib9p/srv.c"), arg_c_fnames
+ )
+
+ lib9p_generated_c_fname = get_zero_or_one(
+ lambda fname: fname.endswith("lib9p/9p.generated.c"), arg_c_fnames
+ )
+
+ # Read config ##########################################################
+
+ def config_h_get(varname: str) -> int | None:
+ if config_h_fname:
+ with open(config_h_fname, "r") as fh:
+ for line in fh:
+ line = line.rstrip()
+ if line.startswith("#define"):
+ parts = line.split()
+ if parts[1] == varname:
+ return int(parts[2])
+ return None
+
+ self._CONFIG_9P_NUM_SOCKS = config_h_get("_CONFIG_9P_NUM_SOCKS")
+ self.CONFIG_9P_SRV_MAX_REQS = config_h_get("CONFIG_9P_SRV_MAX_REQS")
+ self.CONFIG_9P_SRV_MAX_DEPTH = config_h_get("CONFIG_9P_SRV_MAX_DEPTH")
+
+ # Read sources #########################################################
+
+ tmessage_handlers: set[str] | None = None
+ if lib9p_srv_c_fname:
+ re_tmessage_handler = re.compile(
+ r"^\s*\[LIB9P_TYP_T[^]]+\]\s*=\s*\(tmessage_handler\)\s*(?P<handler>\S+),\s*$"
+ )
+ tmessage_handlers = set()
+ with open(lib9p_srv_c_fname, "r") as fh:
+ for line in fh:
+ line = line.rstrip()
+ if m := re_tmessage_handler.fullmatch(line):
+ tmessage_handlers.add(m.group("handler"))
+ self.tmessage_handlers = tmessage_handlers
+
+ lib9p_msgs: set[str] = set()
+ if lib9p_generated_c_fname:
+ re_lib9p_msg_entry = re.compile(r"^\s*_MSG_(?:[A-Z]+)\((?P<typ>\S+)\),$")
+ with open(lib9p_generated_c_fname, "r") as fh:
+ for line in fh:
+ line = line.rstrip()
+ if m := re_lib9p_msg_entry.fullmatch(line):
+ typ = m.group("typ")
+ lib9p_msgs.add(typ)
+ self.lib9p_msgs = lib9p_msgs
+
+ def thread_count(self, name: str) -> int:
+ assert self._CONFIG_9P_NUM_SOCKS
+ assert self.CONFIG_9P_SRV_MAX_REQS
+ if "read" in name:
+ return self._CONFIG_9P_NUM_SOCKS
+ elif "write" in name:
+ return self._CONFIG_9P_NUM_SOCKS * self.CONFIG_9P_SRV_MAX_REQS
+ return 1
+
+ def extra_nodes(self) -> typing.Collection[Node]:
+ return []
+
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ if "/3rd-party/" in loc:
+ return None
+ if (
+ self.tmessage_handlers
+ and "/srv.c:" in loc
+ and "tmessage_handlers[typ](" in line
+ ):
+ # Functions for disabled protocol extensions will be missing.
+ return sorted(self.tmessage_handlers), True
+ if self.lib9p_msgs and "/9p.c:" in loc:
for meth in ["validate", "unmarshal", "marshal"]:
if line.startswith(f"tentry.{meth}("):
- return sorted(f"{meth}_{msg}" for msg in lib9p_msgs)
+ # Functions for disabled protocol extensions will be missing.
+ return sorted(f"{meth}_{msg}" for msg in self.lib9p_msgs), True
return None
- hooks_indirect_callees += [sbc_indirect_callees]
-
- def sbc_is_thread(name: str) -> int:
- if name.endswith("_cr") and name != "lib9p_srv_read_cr":
- if "9p" in name:
- # TODO: FIXME: Sniff these numbers from config.h
- _CONFIG_9P_NUM_SOCKS = 3
- CONFIG_9P_SRV_MAX_REQS = 2
- if "read" in name:
- return _CONFIG_9P_NUM_SOCKS
- elif "write" in name:
- return _CONFIG_9P_NUM_SOCKS * CONFIG_9P_SRV_MAX_REQS
- return 1
- if name == "main":
- return True
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ if "lib9p/srv.c:srv_util_pathfree" in call:
+ assert isinstance(self.CONFIG_9P_SRV_MAX_DEPTH, int)
+ if len(chain) >= self.CONFIG_9P_SRV_MAX_DEPTH and all(
+ ("lib9p/srv.c:srv_util_pathfree" in c)
+ for c in chain[-self.CONFIG_9P_SRV_MAX_DEPTH :]
+ ):
+ return True
return False
- def sbc_is_intrhandler(name: str) -> bool:
- return name in [
- "rp2040_hwtimer_intrhandler",
- "_cr_gdb_intrhandler",
- "hostclock_handle_sig_alarm",
- "hostnet_handle_sig_io",
- ]
-
- hooks_is_intrhandler += [sbc_is_intrhandler]
- sbc_gpio_handlers = [
- "w5500_intrhandler",
- ]
+class LibMiscPlugin:
+ def extra_nodes(self) -> typing.Collection[Node]:
+ return []
- # 1=just root directory
- # 2=just files in root directory
- # 3=just 1 level of subdirectories
- # 4=just 2 levels of subdirectories
- # ...
- #
- # TODO: FIXME: Sniff this from config.h
- CONFIG_9P_SRV_MAX_DEPTH = 3
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ return None
- def sbc_skip_call(chain: list[str], call: str) -> bool:
+ def skip_call(self, chain: list[str], call: str) -> bool:
if (
len(chain) > 1
and chain[-1] == "__assert_msg_fail"
@@ -468,103 +643,85 @@ def main(
and "__assert_msg_fail" in chain[:-1]
):
return True
- if (
- len(chain) >= CONFIG_9P_SRV_MAX_DEPTH
- and "/srv.c:srv_util_pathfree" in call
- and all(
- ("/srv.c:srv_util_pathfree" in c)
- for c in chain[-CONFIG_9P_SRV_MAX_DEPTH:]
- )
- ):
- return True
return False
- hooks_skip_call += [sbc_skip_call]
-
- # pico-sdk #######################################################
- if arg_pico_platform == "rp2040":
-
- def pico_is_intrhandler(name: str) -> bool:
- return name in [
- "gpio_default_irq_handler",
- ]
+class PicoSDKPlugin:
+ app_gpio_handlers: typing.Collection[str]
- hooks_is_intrhandler += [pico_is_intrhandler]
+ def __init__(self, app_gpio_handlers: typing.Collection[str]) -> None:
+ self.app_gpio_handlers = app_gpio_handlers
- def pico_indirect_callees(loc: str, line: str) -> list[str] | None:
- if "/3rd-party/pico-sdk/" not in loc or "/3rd-party/pico-sdk/lib/" in loc:
- return None
- m = re_call_other.fullmatch(line)
- call: str | None = m.group("func") if m else None
-
- match call:
- case "connect_internal_flash_func":
- return ["rom_func_lookup(ROM_FUNC_CONNECT_INTERNAL_FLASH)"]
- case "flash_exit_xip_func":
- return ["rom_func_lookup(ROM_FUNC_FLASH_EXIT_XIP)"]
- case "flash_range_erase_func":
- return ["rom_func_lookup(ROM_FUNC_FLASH_RANGE_ERASE)"]
- case "flash_flush_cache_func":
- return ["rom_func_lookup(ROM_FUNC_FLASH_FLUSH_CACHE)"]
- case "rom_table_lookup":
- return ["rom_hword_as_ptr(BOOTROM_TABLE_LOOKUP_OFFSET)"]
- if "/flash.c:" in loc and "boot2_copyout" in line:
- return ["_stage2_boot"]
- if "/gpio.c:" in loc and call == "callback":
- return sbc_gpio_handlers
- if "/printf.c:" in loc:
- if call == "out":
- return [
- "_out_buffer",
- "_out_null",
- "_out_fct",
- ]
- if "->fct(" in line:
- return ["stdio_buffered_printer"]
- if "/stdio.c:" in loc:
- if call == "out_func":
- return [
- "stdio_out_chars_crlf",
- "stdio_out_chars_no_crlf",
- ]
- if call and (call.startswith("d->") or call.startswith("driver->")):
- _, meth = call.split("->", 1)
- match meth:
- case "out_chars":
- return ["stdio_uart_out_chars"]
- case "out_flush":
- return ["stdio_uart_out_flush"]
- case "in_chars":
- return ["stdio_uart_in_chars"]
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ if "/3rd-party/pico-sdk/" not in loc or "/3rd-party/pico-sdk/lib/" in loc:
return None
+ m = re_call_other.fullmatch(line)
+ call: str | None = m.group("func") if m else None
+
+ match call:
+ case "connect_internal_flash_func":
+ return ["rom_func_lookup(ROM_FUNC_CONNECT_INTERNAL_FLASH)"], False
+ case "flash_exit_xip_func":
+ return ["rom_func_lookup(ROM_FUNC_FLASH_EXIT_XIP)"], False
+ case "flash_range_erase_func":
+ return ["rom_func_lookup(ROM_FUNC_FLASH_RANGE_ERASE)"], False
+ case "flash_flush_cache_func":
+ return ["rom_func_lookup(ROM_FUNC_FLASH_FLUSH_CACHE)"], False
+ case "rom_table_lookup":
+ return ["rom_hword_as_ptr(BOOTROM_TABLE_LOOKUP_OFFSET)"], False
+ if "/flash.c:" in loc and "boot2_copyout" in line:
+ return ["_stage2_boot"], False
+ if "/gpio.c:" in loc and call == "callback":
+ return sorted(self.app_gpio_handlers), True
+ if "/printf.c:" in loc:
+ if call == "out":
+ return [
+ "_out_buffer",
+ "_out_null",
+ "_out_fct",
+ ], False
+ if "->fct(" in line:
+ return ["stdio_buffered_printer"], False
+ if "/stdio.c:" in loc:
+ if call == "out_func":
+ return [
+ "stdio_out_chars_crlf",
+ "stdio_out_chars_no_crlf",
+ ], False
+ if call and (call.startswith("d->") or call.startswith("driver->")):
+ _, meth = call.split("->", 1)
+ match meth:
+ case "out_chars":
+ return ["stdio_uart_out_chars"], False
+ case "out_flush":
+ return ["stdio_uart_out_flush"], False
+ case "in_chars":
+ return ["stdio_uart_in_chars"], False
+ return None
- hooks_indirect_callees += [pico_indirect_callees]
-
- def pico_skip_call(chain: list[str], call: str) -> bool:
- if call == "_out_buffer" or call == "_out_fct":
- last = ""
- for pcall in chain:
- if pcall in [
- "__wrap_sprintf",
- "__wrap_snprintf",
- "__wrap_vsnprintf",
- "vfctprintf",
- ]:
- last = pcall
- if last == "vfctprintf":
- return call != "_out_fct"
- else:
- return call == "_out_buffer"
- return False
-
- hooks_skip_call += [pico_skip_call]
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ if call == "_out_buffer" or call == "_out_fct":
+ last = ""
+ for pcall in chain:
+ if pcall in [
+ "__wrap_sprintf",
+ "__wrap_snprintf",
+ "__wrap_vsnprintf",
+ "vfctprintf",
+ ]:
+ last = pcall
+ if last == "vfctprintf":
+ return call != "_out_fct"
+ else:
+ return call == "_out_buffer"
+ return False
+ def extra_nodes(self) -> typing.Collection[Node]:
# src/rp2_common/hardware_divider/include/hardware/divider_helper.S
save_div_state_and_lr = 5 * 4
# src/rp2_common/pico_divider/divider_hardware.S
save_div_state_and_lr_64 = 5 * 4
- all_nodes += [
+ return [
# src/rp2_common/pico_int64_ops/pico_int64_ops_aeabi.S
synthetic_node("__aeabi_lmul", 4),
# src/rp2_common/pico_divider/divider_hardware.S
@@ -634,17 +791,29 @@ def main(
# synthetic_node("rom_hword_as_ptr(BOOTROM_TABLE_LOOKUP_OFFSET)", 0), # TODO
]
- # TinyUSB device #################################################
- if any(fname.endswith("/tinyusb/src/device/usbd.c") for fname in c_fnames):
- tusb_config_fname = (
- arg_base_dir + "/cmd/sbc_harness/config/tusb_config.h"
- ) # TODO: FIXME
+class TinyUSBDevicePlugin:
+ tud_drivers: dict[str, set[str]]
+
+ def __init__(self, arg_c_fnames: typing.Collection[str]) -> None:
+ usbd_c_fname = get_zero_or_one(
+ lambda fname: fname.endswith("/tinyusb/src/device/usbd.c"), arg_c_fnames
+ )
+
+ tusb_config_h_fname = get_zero_or_one(
+ lambda fname: fname.endswith("/tusb_config.h"), arg_c_fnames
+ )
+
+ if not usbd_c_fname:
+ self.tud_drivers = {}
+ return
+
+ assert tusb_config_h_fname
re_tud_class = re.compile(
r"^\s*#\s*define\s+(?P<k>CFG_TUD_(?:\S{3}|AUDIO|VIDEO|MIDI|VENDOR|USBTMC|DFU_RUNTIME|ECM_RNDIS))\s+(?P<v>\S+).*"
)
tusb_config: dict[str, bool] = {}
- with open(tusb_config_fname, "r") as fh:
+ with open(tusb_config_h_fname, "r") as fh:
in_table = False
for line in fh:
line = line.rstrip()
@@ -653,9 +822,6 @@ def main(
v = m.group("v")
tusb_config[k] = bool(int(v))
- usbd_fname = next(
- fname for fname in c_fnames if fname.endswith("/tinyusb/src/device/usbd.c")
- )
tud_drivers: dict[str, set[str]] = {}
re_tud_entry = re.compile(
r"^\s+\.(?P<meth>\S+)\s*=\s*(?P<impl>[a-zA-Z0-9_]+)(?:,.*)?"
@@ -663,7 +829,7 @@ def main(
re_tud_if1 = re.compile(r"^\s*#\s*if (\S+)\s*")
re_tud_if2 = re.compile(r"^\s*#\s*if (\S+)\s*\|\|\s*(\S+)\s*")
re_tud_endif = re.compile(r"^\s*#\s*endif\s*")
- with open(usbd_fname, "r") as fh:
+ with open(usbd_c_fname, "r") as fh:
in_table = False
enabled = True
for line in fh:
@@ -688,40 +854,41 @@ def main(
in_table = False
elif " _usbd_driver[] = {" in line:
in_table = True
+ self.tud_drivers = tud_drivers
- def tud_indirect_callees(loc: str, line: str) -> list[str] | None:
- if (
- "/tinyusb/" not in loc
- or "/tinyusb/src/host/" in loc
- or "_host.c:" in loc
- ):
- return None
- m = re_call_other.fullmatch(line)
- assert m
- call = m.group("func")
- if call == "_ctrl_xfer.complete_cb":
- return [
- # "process_test_mode_cb",
- "tud_vendor_control_xfer_cb",
- *sorted(tud_drivers["control_xfer_cb"]),
- ]
- elif call.startswith("driver->"):
- return sorted(tud_drivers[call[len("driver->") :]])
- elif call == "event.func_call.func":
- # callback from usb_defer_func()
- return []
+ def extra_nodes(self) -> typing.Collection[Node]:
+ return []
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ if "/tinyusb/" not in loc or "/tinyusb/src/host/" in loc or "_host.c:" in loc:
return None
+ m = re_call_other.fullmatch(line)
+ assert m
+ call = m.group("func")
+ if call == "_ctrl_xfer.complete_cb":
+ return [
+ # "process_test_mode_cb",
+ "tud_vendor_control_xfer_cb",
+ *sorted(self.tud_drivers["control_xfer_cb"]),
+ ], False
+ elif call.startswith("driver->"):
+ return sorted(self.tud_drivers[call[len("driver->") :]]), False
+ elif call == "event.func_call.func":
+ # callback from usb_defer_func()
+ return [], False
+
+ return None
- hooks_indirect_callees += [tud_indirect_callees]
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ return False
- # newlib #########################################################
- if arg_pico_platform == "rp2040":
+class NewlibPlugin:
+ def extra_nodes(self) -> typing.Collection[Node]:
# This is accurate to
# /usr/arm-none-eabi/lib/thumb/v6-m/nofp/libg.a as of
# Parabola's arm-none-eabi-newlib 4.5.0.20241231-1.
- all_nodes += [
+ return [
# malloc
synthetic_node("free", 8, {"_free_r"}),
synthetic_node("malloc", 8, {"_malloc_r"}),
@@ -751,18 +918,96 @@ def main(
synthetic_node("random", 8),
]
- # libgcc #########################################################
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ return None
- if arg_pico_platform == "rp2040":
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ return False
+
+
+class LibGCCPlugin:
+ def extra_nodes(self) -> typing.Collection[Node]:
# This is accurate to
# /usr/lib/gcc/arm-none-eabi/14.2.0/thumb/v6-m/nofp/libgcc.a
# as of Parabola's arm-none-eabi-gcc 14.2.0-1.
- all_nodes += [
+ return [
synthetic_node("__aeabi_idiv0", 0),
synthetic_node("__aeabi_ldiv0", 0),
synthetic_node("__aeabi_llsr", 0),
]
+ def indirect_callees(self, loc: str, line: str) -> tuple[list[str], bool] | None:
+ return None
+
+ def skip_call(self, chain: list[str], call: str) -> bool:
+ return False
+
+
+def main(
+ *,
+ arg_pico_platform: str,
+ arg_base_dir: str,
+ arg_ci_fnames: typing.Collection[str],
+ arg_c_fnames: typing.Collection[str],
+) -> None:
+
+ plugins: list[Plugin] = []
+ hooks_is_intrhandler: list[typing.Callable[[str], bool]] = []
+
+ # sbc-harness ####################################################
+
+ lib9p_plugin = Lib9PPlugin(arg_base_dir, arg_c_fnames)
+
+ sbc_gpio_handlers = [
+ "w5500_intrhandler",
+ ]
+
+ def sbc_is_thread(name: str) -> int:
+ if name.endswith("_cr") and name != "lib9p_srv_read_cr":
+ if "9p" in name:
+ return lib9p_plugin.thread_count(name)
+ return 1
+ if name == "main":
+ return 1
+ return 0
+
+ def sbc_is_intrhandler(name: str) -> bool:
+ return name in [
+ "rp2040_hwtimer_intrhandler",
+ "_cr_gdb_intrhandler",
+ "hostclock_handle_sig_alarm",
+ "hostnet_handle_sig_io",
+ ]
+
+ hooks_is_intrhandler += [sbc_is_intrhandler]
+
+ plugins += [
+ AppPlugin(),
+ LibObjPlugin(arg_c_fnames),
+ LibHWPlugin(),
+ LibCRIPCPlugin(),
+ lib9p_plugin,
+ LibMiscPlugin(),
+ ]
+
+ # pico-sdk #######################################################
+
+ if arg_pico_platform == "rp2040":
+
+ def pico_is_intrhandler(name: str) -> bool:
+ return name in [
+ "gpio_default_irq_handler",
+ ]
+
+ hooks_is_intrhandler += [pico_is_intrhandler]
+
+ plugins += [
+ PicoSDKPlugin(sbc_gpio_handlers),
+ TinyUSBDevicePlugin(arg_c_fnames),
+ NewlibPlugin(),
+ LibGCCPlugin(),
+ ]
+
# Tie it all together ############################################
def thread_filter(name: str) -> int:
@@ -787,38 +1032,14 @@ def main(
parts[0] = "./" + os.path.relpath(parts[0], arg_base_dir)
return ":".join(parts)
- def indirect_callees(elem: VCGElem) -> list[str]:
- loc = elem.attrs.get("label", "")
- line = read_source(loc)
-
- for hook in hooks_indirect_callees:
- ret = hook(loc, line)
- if ret is not None:
- return ret
-
- placeholder = "__indirect_call"
- if m := re_call_other.fullmatch(line):
- placeholder += ":" + m.group("func")
- placeholder += " at " + location_xform(elem.attrs.get("label", ""))
- return [placeholder]
-
- def skip_call(chain: list[str], call: str) -> bool:
- for hook in hooks_skip_call:
- if hook(chain, call):
- return True
- return False
-
result = analyze(
ci_fnames=arg_ci_fnames,
- extra_nodes=all_nodes,
app_func_filters={
"Threads": thread_filter,
"Interrupt handlers": intrhandler_filter,
"Misc": misc_filter,
},
- app_location_xform=location_xform,
- app_indirect_callees=indirect_callees,
- app_skip_call=skip_call,
+ app=PluginApplication(location_xform, plugins),
cfg_max_call_depth=100,
)
@@ -876,20 +1097,29 @@ def main(
if __name__ == "__main__":
- pico_platform = sys.argv[1]
- base_dir = sys.argv[2]
- fnames = sys.argv[3:]
-
- re_suffix = re.compile(r"\.c\.o(bj)?$")
- ci_fnames = [
- re_suffix.sub(".c.ci", fname) for fname in fnames if re_suffix.search(fname)
- ]
- c_fnames = [fname for fname in fnames if fname.endswith(".c")]
+ def _main() -> None:
+ pico_platform = sys.argv[1]
+ base_dir = sys.argv[2]
+ obj_fnames = set(sys.argv[3:])
+
+ re_c_obj_suffix = re.compile(r"\.c\.(?:o|obj)$")
+
+ c_fnames: set[str] = set()
+ ci_fnames: set[str] = set()
+ for obj_fname in obj_fnames:
+ if re_c_obj_suffix.search(obj_fname):
+ ci_fnames.add(re_c_obj_suffix.sub(".c.ci", obj_fname))
+ with open(obj_fname + ".d", "r") as fh:
+ c_fnames.update(
+ fh.read().replace("\\\n", " ").split(":")[-1].split()
+ )
+
+ main(
+ arg_pico_platform=pico_platform,
+ arg_base_dir=base_dir,
+ arg_ci_fnames=ci_fnames,
+ arg_c_fnames=c_fnames,
+ )
- main(
- arg_pico_platform=pico_platform,
- arg_base_dir=base_dir,
- arg_ci_fnames=ci_fnames,
- arg_c_fnames=c_fnames,
- )
+ _main()
diff --git a/cmd/sbc_harness/main.c b/cmd/sbc_harness/main.c
index 18a649e..6f1d0ca 100644
--- a/cmd/sbc_harness/main.c
+++ b/cmd/sbc_harness/main.c
@@ -138,8 +138,10 @@ static COROUTINE dhcp_cr(void *) {
static COROUTINE read9p_cr(void *) {
cr_begin();
- lib9p_srv_read_cr(&globals.srv,
- LO_CALL(lo_box_w5500_if_as_net_iface(&globals.dev_w5500), tcp_listen, CONFIG_9P_PORT));
+ lo_interface net_iface iface = lo_box_w5500_if_as_net_iface(&globals.dev_w5500);
+ lo_interface net_stream_listener listener = LO_CALL(iface, tcp_listen, CONFIG_9P_PORT);
+
+ lib9p_srv_read_cr(&globals.srv, listener);
cr_end();
}
diff --git a/lib9p/tests/test_server/config/config.h b/lib9p/tests/test_server/config/config.h
index 201cfd0..67960ca 100644
--- a/lib9p/tests/test_server/config/config.h
+++ b/lib9p/tests/test_server/config/config.h
@@ -7,7 +7,8 @@
#ifndef _CONFIG_H_
#define _CONFIG_H_
-#define CONFIG_SRV9P_NUM_CONNS 8
+#define _CONFIG_9P_NUM_SOCKS 8
+#define CONFIG_SRV9P_NUM_CONNS _CONFIG_9P_NUM_SOCKS
/* 9P *************************************************************************/