diff options
Diffstat (limited to 'build-aux')
-rw-r--r-- | build-aux/measurestack/analyze.py | 225 | ||||
-rw-r--r-- | build-aux/measurestack/app_main.py | 2 | ||||
-rw-r--r-- | build-aux/measurestack/app_plugins.py | 131 | ||||
-rw-r--r-- | build-aux/measurestack/test_analyze.py | 55 | ||||
-rw-r--r-- | build-aux/measurestack/test_app_plugins.py | 379 | ||||
-rw-r--r-- | build-aux/measurestack/testutil.py | 131 | ||||
-rw-r--r-- | build-aux/measurestack/util.py | 4 |
7 files changed, 814 insertions, 113 deletions
diff --git a/build-aux/measurestack/analyze.py b/build-aux/measurestack/analyze.py index a93874f..67c44ce 100644 --- a/build-aux/measurestack/analyze.py +++ b/build-aux/measurestack/analyze.py @@ -3,24 +3,103 @@ # Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com> # SPDX-License-Identifier: AGPL-3.0-or-later +import random import re import sys import typing from . import vcg +# Whether to print "//dbg-cache:" on cache writes +dbg_cache = False +# Whether to print the graph in a /* comment */ before processing it +dbg_dumpgraph = False +# Whether to print "//dbg-nstatic:" lines that trace nstatic() execution +dbg_nstatic = False +# Whether to disable nstatic() caching (but does NOT disable any cache-related debug logging) +dbg_nocache = False +# Whether to sort things for consistently-ordered execution, or shuffle things to detect bugs +dbg_sort: typing.Literal["unsorted", "sorted", "shuffled"] = "unsorted" + # pylint: disable=unused-variable __all__ = [ "BaseName", "QName", "UsageKind", "Node", + "maybe_sorted", "AnalyzeResultVal", "AnalyzeResultGroup", "AnalyzeResult", "analyze", ] + +def dumps(x: typing.Any, depth: int = 0, compact: bool = False) -> str: + match x: + case int() | str() | None: + return repr(x) + case dict(): + if len(x) == 0: + return "{}" + ret = "{" + if not compact: + ret += "\n" + for k, v in x.items(): + if not compact: + ret += "\t" * (depth + 1) + ret += dumps(k, depth + 1, True) + ret += ":" + if not compact: + ret += " " + ret += dumps(v, depth + 1, compact) + ret += "," + if not compact: + ret += "\n" + if not compact: + ret += "\t" * depth + ret += "}" + return ret + case list(): + if len(x) == 0: + return "[]" + ret = "[" + if not compact: + ret += "\n" + for v in x: + if not compact: + ret += "\t" * (depth + 1) + ret += dumps(v, depth + 1, compact) + ret += "," + if not compact: + ret += "\n" + if not compact: + ret += "\t" * depth + ret += "]" + return ret + case set(): + if len(x) == 0: + return "set()" + ret = "{" + if not compact: + ret += "\n" + for v in x: + if not compact: + ret += "\t" * (depth + 1) + ret += dumps(v, depth + 1, compact) + ret += "," + if not compact: + ret += "\n" + if not compact: + ret += "\t" * depth + ret += "}" + return ret + case _: + if hasattr(x, "__dict__"): + return f"{x.__class__.__name__}(*{dumps(x.__dict__, depth, compact)})" + return f"TODO({x.__class__.__name__})" + + # types ######################################################################## @@ -152,28 +231,39 @@ class AnalyzeResult(typing.NamedTuple): class SkipModel(typing.NamedTuple): """Running the skipmodel calls `.fn(chain, ...)` with the chain - consisting of the last `.nchain` items (if .nchain is an int), or - the chain starting with the *last* occurance of `.nchain` (if - .nchain is a collection). If the chain is not that long or does - not contain a member of the collection, then .fn is not called and - the call is *not* skipped. + consisting of the last few items of the input chain. + + If `.nchain` is an int: + + - the chain is the last `.nchain` items or the input chain. If + the input chain is not that long, then `.fn` is not called and + the call is *not* skipped. + If `.nchain` is a collection: + + - the chain starts with the *last* occurance of `.nchain` in the + input chain. If the input chain does not contain a member of + the collection, then .fn is called with an empty chain. """ nchain: int | typing.Collection[BaseName] - fn: typing.Callable[[typing.Sequence[QName], QName], bool] - - def __call__(self, chain: typing.Sequence[QName], call: QName) -> tuple[bool, int]: - if isinstance(self.nchain, int): - if len(chain) >= self.nchain: - _chain = chain[-self.nchain :] - return self.fn(_chain, call), len(_chain) - else: - for i in reversed(range(len(chain))): - if chain[i].base() in self.nchain: - _chain = chain[i - 1 :] - return self.fn(_chain, call), len(_chain) - return False, 0 + fn: typing.Callable[[typing.Sequence[QName], Node, QName], bool] + + def __call__( + self, chain: typing.Sequence[QName], node: Node, call: QName + ) -> tuple[bool, int]: + match self.nchain: + case int(): + if len(chain) >= self.nchain: + _chain = chain[-self.nchain :] + return self.fn(_chain, node, call), len(_chain) + 1 + return False, 0 + case _: + for i in reversed(range(len(chain))): + if chain[i].base() in self.nchain: + _chain = chain[i:] + return self.fn(_chain, node, call), len(_chain) + 1 + return self.fn([], node, call), 1 class Application(typing.Protocol): @@ -235,6 +325,39 @@ class _Graph: return self._resolve_cache[funcname] +if typing.TYPE_CHECKING: + from _typeshed import SupportsRichComparisonT as _T_sortable + +_T = typing.TypeVar("_T") + + +@typing.overload +def maybe_sorted( + unsorted: typing.Iterable["_T_sortable"], /, *, key: None = None +) -> typing.Iterable["_T_sortable"]: ... +@typing.overload +def maybe_sorted( + unsorted: typing.Iterable[_T], /, *, key: typing.Callable[[_T], "_T_sortable"] +) -> typing.Iterable[_T]: ... + + +def maybe_sorted( + unsorted: typing.Iterable[_T], + /, + *, + key: typing.Callable[[_T], "_T_sortable"] | None = None, +) -> typing.Iterable[_T]: + match dbg_sort: + case "unsorted": + return unsorted + case "sorted": + return sorted(unsorted, key=key) # type: ignore + case "shuffled": + ret = [*unsorted] + random.shuffle(ret) + return ret + + def _make_graph( ci_fnames: typing.Collection[str], app: Application, @@ -297,7 +420,7 @@ def _make_graph( raise ValueError(f"unknown caller: {caller}") if callee == QName("__indirect_call"): callees, missing_ok = app.indirect_callees(elem) - for callee in callees: + for callee in maybe_sorted(callees): if callee not in graph[caller].calls: graph[caller].calls[callee] = missing_ok else: @@ -305,12 +428,15 @@ def _make_graph( case _: raise ValueError(f"unknown elem type {elem.typ!r}") - for ci_fname in ci_fnames: + for ci_fname in maybe_sorted(ci_fnames): with open(ci_fname, "r", encoding="utf-8") as fh: for elem in vcg.parse_vcg(fh): handle_elem(elem) - for node in app.extra_nodes(): + def sort_key(node: Node) -> QName: + return node.funcname + + for node in maybe_sorted(app.extra_nodes(), key=sort_key): if node.funcname in graph: raise ValueError(f"duplicate node {node.funcname}") graph[node.funcname] = node @@ -332,33 +458,33 @@ def analyze( cfg_max_call_depth: int, ) -> AnalyzeResult: graphdata = _make_graph(ci_fnames, app) + if dbg_dumpgraph: + print(f"/* {dumps(graphdata)} */") missing: set[QName] = set() dynamic: set[QName] = set() included_funcs: set[QName] = set() - dbg = False - track_inclusion: bool = True skipmodels = app.skipmodels() for name, model in skipmodels.items(): - if isinstance(model.nchain, int): - assert model.nchain > 1 - else: + if not isinstance(model.nchain, int): assert len(model.nchain) > 0 _nstatic_cache: dict[QName, int] = {} def _nstatic(chain: list[QName], funcname: QName) -> tuple[int, int]: - nonlocal dbg nonlocal track_inclusion assert funcname in graphdata.graph + def putdbg(msg: str) -> None: + print(f"//dbg-nstatic: {'- '*len(chain)}{msg}") + node = graphdata.graph[funcname] - if dbg: - print(f"//dbg: {'- '*len(chain)}{funcname}\t{node.nstatic}") + if dbg_nstatic: + putdbg(f"{funcname}\t{node.nstatic}") if node.usage_kind == "dynamic" or node.ndynamic > 0: dynamic.add(funcname) if track_inclusion: @@ -378,37 +504,52 @@ def analyze( call_qname = graphdata.resolve_funcname(call_orig_qname) if not call_qname: if skipmodel: - skip, _ = skipmodel(chain, call_orig_qname) + skip, _ = skipmodel(chain[:-1], node, call_orig_qname) if skip: - if dbg: - print( - f"//dbg: {'- '*len(chain)}{call_orig_qname}\tskip missing" - ) + if dbg_nstatic: + putdbg(f"{call_orig_qname}\tskip missing") continue if not call_missing_ok: missing.add(call_orig_qname) - if dbg: - print(f"//dbg: {'- '*len(chain)}{call_orig_qname}\tmissing") + if dbg_nstatic: + putdbg(f"{call_orig_qname}\tmissing") continue # 2. Skip if skipmodel: - skip, skip_nchain = skipmodel(chain, call_qname) + skip, skip_nchain = skipmodel(chain[:-1], node, call_qname) max_call_nchain = max(max_call_nchain, skip_nchain) if skip: - if dbg: - print(f"//dbg: {'- '*len(chain)}{call_qname}\tskip") + if dbg_nstatic: + putdbg(f"{call_qname}\tskip") continue # 3. Call - if skip_nchain == 0 and call_qname in _nstatic_cache: - max_call_nstatic = max(max_call_nstatic, _nstatic_cache[call_qname]) + if ( + (not dbg_nocache) + and skip_nchain == 0 + and call_qname in _nstatic_cache + ): + call_nstatic = _nstatic_cache[call_qname] + if dbg_nstatic: + putdbg(f"{call_qname}\ttotal={call_nstatic} (cache-read)") + max_call_nstatic = max(max_call_nstatic, call_nstatic) else: call_nstatic, call_nchain = _nstatic(chain, call_qname) max_call_nstatic = max(max_call_nstatic, call_nstatic) max_call_nchain = max(max_call_nchain, call_nchain) if skip_nchain == 0 and call_nchain == 0: - _nstatic_cache[call_qname] = call_nstatic + if dbg_nstatic: + putdbg(f"{call_qname}\ttotal={call_nstatic} (cache-write)") + if call_qname not in _nstatic_cache: + if dbg_cache: + print(f"//dbg-cache: {call_qname} = {call_nstatic}") + _nstatic_cache[call_qname] = call_nstatic + else: + assert dbg_nocache + assert _nstatic_cache[call_qname] == call_nstatic + elif dbg_nstatic: + putdbg(f"{call_qname}\ttotal={call_nstatic} (do-not-cache)") chain.pop() return node.nstatic + max_call_nstatic, max(0, max_call_nchain - 1) diff --git a/build-aux/measurestack/app_main.py b/build-aux/measurestack/app_main.py index c670325..f705876 100644 --- a/build-aux/measurestack/app_main.py +++ b/build-aux/measurestack/app_main.py @@ -48,7 +48,7 @@ def main( plugins += [ app_plugins.CmdPlugin(), libmisc_plugin, - app_plugins.PicoFmtPlugin(arg_pico_platform), + app_plugins.PicoFmtPlugin(arg_pico_platform, lib9p_plugin.formatters), app_plugins.LibHWPlugin(arg_pico_platform, libmisc_plugin), app_plugins.LibCRPlugin(), app_plugins.LibCRIPCPlugin(), diff --git a/build-aux/measurestack/app_plugins.py b/build-aux/measurestack/app_plugins.py index ae2dba9..e365f82 100644 --- a/build-aux/measurestack/app_plugins.py +++ b/build-aux/measurestack/app_plugins.py @@ -55,17 +55,16 @@ class CmdPlugin: return {} -re_comment = re.compile(r"/\*.*?\*/") -re_ws = re.compile(r"\s+") -re_lo_iface = re.compile(r"^\s*#\s*define\s+(?P<name>\S+)_LO_IFACE") -re_lo_func = re.compile(r"LO_FUNC *\([^,]*, *(?P<name>[^,) ]+) *[,)]") -re_lo_implementation = re.compile( - r"^LO_IMPLEMENTATION_[HC]\s*\(\s*(?P<iface>[^, ]+)\s*,\s*(?P<impl_typ>[^,]+)\s*,\s*(?P<impl_name>[^, ]+)\s*[,)].*" -) -re_call_objcall = re.compile(r"LO_CALL\((?P<obj>[^,]+), (?P<meth>[^,)]+)[,)].*") - - class LibMiscPlugin: + re_comment = re.compile(r"/\*.*?\*/") + re_ws = re.compile(r"\s+") + re_lo_iface = re.compile(r"^\s*#\s*define\s+(?P<name>\S+)_LO_IFACE") + re_lo_func = re.compile(r"LO_FUNC *\([^,]*, *(?P<name>[^,) ]+) *[,)]") + re_lo_implementation = re.compile( + r"^LO_IMPLEMENTATION_[HC]\s*\(\s*(?P<iface>[^, ]+)\s*,\s*(?P<impl_typ>[^,]+)\s*,\s*(?P<impl_name>[^, ]+)\s*[,)].*" + ) + re_call_objcall = re.compile(r"LO_CALL\((?P<obj>[^,]+), (?P<meth>[^,)]+)[,)].*") + objcalls: dict[str, set[QName]] # method_name => {method_impls} def __init__(self, arg_c_fnames: typing.Collection[str]) -> None: @@ -73,16 +72,16 @@ class LibMiscPlugin: for fname in arg_c_fnames: with open(fname, "r", encoding="utf-8") as fh: while line := fh.readline(): - if m := re_lo_iface.match(line): + if m := self.re_lo_iface.match(line): iface_name = m.group("name") if iface_name not in ifaces: ifaces[iface_name] = set() while line.endswith("\\\n"): line += fh.readline() line = line.replace("\\\n", " ") - line = re_comment.sub(" ", line) - line = re_ws.sub(" ", line) - for m2 in re_lo_func.finditer(line): + line = self.re_comment.sub(" ", line) + line = self.re_ws.sub(" ", line) + for m2 in self.re_lo_func.finditer(line): ifaces[iface_name].add(m2.group("name")) implementations: dict[str, set[str]] = {} # iface_name => {impl_names} @@ -92,7 +91,7 @@ class LibMiscPlugin: with open(fname, "r", encoding="utf-8") as fh: for line in fh: line = line.strip() - if m := re_lo_implementation.match(line): + if m := self.re_lo_implementation.match(line): implementations[m.group("iface")].add(m.group("impl_name")) objcalls: dict[str, set[QName]] = {} # method_name => {method_impls} @@ -121,7 +120,7 @@ class LibMiscPlugin: ) -> tuple[typing.Collection[QName], bool] | None: if "/3rd-party/" in loc: return None - if m := re_call_objcall.fullmatch(line): + if m := self.re_call_objcall.fullmatch(line): if m.group("meth") in self.objcalls: return self.objcalls[m.group("meth")], False return [ @@ -137,11 +136,11 @@ class LibMiscPlugin: } def _skipmodel___assert_msg_fail( - self, chain: typing.Sequence[QName], call: QName + self, chain: typing.Sequence[QName], node: Node, call: QName ) -> bool: if call.base() in [BaseName("__lm_printf"), BaseName("__lm_light_printf")]: return any( - c.base() == BaseName("__assert_msg_fail") for c in reversed(chain[:-1]) + c.base() == BaseName("__assert_msg_fail") for c in reversed(chain) ) return False @@ -268,19 +267,18 @@ class LibCRIPCPlugin: return {} -re_tmessage_handler = re.compile( - r"^\s*\[LIB9P_TYP_T[^]]+\]\s*=\s*\(tmessage_handler\)\s*(?P<handler>\S+),\s*$" -) -re_lib9p_msg_entry = re.compile(r"^\s*_MSG_(?:[A-Z]+)\((?P<typ>\S+)\),$") -re_lib9p_caller = re.compile( - r"^lib9p_(?P<grp>[TR])msg_(?P<meth>validate|unmarshal|marshal)$" -) -re_lib9p_callee = re.compile( - r"^(?P<meth>validate|unmarshal|marshal)_(?P<msg>(?P<grp>[TR]).*)$" -) - - class Lib9PPlugin: + re_tmessage_handler = re.compile( + r"^\s*\[LIB9P_TYP_T[^]]+\]\s*=\s*\(tmessage_handler\)\s*(?P<handler>\S+),\s*$" + ) + re_lib9p_msg_entry = re.compile(r"^\s*_MSG_(?:[A-Z]+)\((?P<typ>\S+)\),$") + re_lib9p_caller = re.compile( + r"^lib9p_(?P<grp>[TR])msg_(?P<meth>validate|unmarshal|marshal)$" + ) + re_lib9p_callee = re.compile( + r"^(?P<meth>validate|unmarshal|marshal)_(?P<msg>(?P<grp>[TR]).*)$" + ) + tmessage_handlers: set[QName] | None lib9p_msgs: set[str] _CONFIG_9P_MAX_CONNS: int | None @@ -344,7 +342,7 @@ class Lib9PPlugin: with open(lib9p_srv_c_fname, "r", encoding="utf-8") as fh: for line in fh: line = line.rstrip() - if m := re_tmessage_handler.fullmatch(line): + if m := self.re_tmessage_handler.fullmatch(line): tmessage_handlers.add(QName(m.group("handler"))) self.tmessage_handlers = tmessage_handlers @@ -353,7 +351,7 @@ class Lib9PPlugin: with open(lib9p_generated_c_fname, "r", encoding="utf-8") as fh: for line in fh: line = line.rstrip() - if m := re_lib9p_msg_entry.fullmatch(line): + if m := self.re_lib9p_msg_entry.fullmatch(line): typ = m.group("typ") lib9p_msgs.add(typ) self.lib9p_msgs = lib9p_msgs @@ -401,46 +399,39 @@ class Lib9PPlugin: def skipmodels(self) -> dict[BaseName, analyze.SkipModel]: ret: dict[BaseName, analyze.SkipModel] = { BaseName("_lib9p_validate"): analyze.SkipModel( - 2, + 1, self._skipmodel__lib9p_validate_unmarshal_marshal, ), BaseName("_lib9p_unmarshal"): analyze.SkipModel( - 2, + 1, self._skipmodel__lib9p_validate_unmarshal_marshal, ), BaseName("_lib9p_marshal"): analyze.SkipModel( - 2, + 1, self._skipmodel__lib9p_validate_unmarshal_marshal, ), - BaseName("_vfctprintf"): analyze.SkipModel( - self.formatters, self._skipmodel__vfctprintf - ), } return ret def _skipmodel__lib9p_validate_unmarshal_marshal( - self, chain: typing.Sequence[QName], call: QName + self, chain: typing.Sequence[QName], node: Node, call: QName ) -> bool: - m_caller = re_lib9p_caller.fullmatch(str(chain[-2].base())) + m_caller = self.re_lib9p_caller.fullmatch(str(chain[-1].base())) assert m_caller - m_callee = re_lib9p_callee.fullmatch(str(call.base())) + m_callee = self.re_lib9p_callee.fullmatch(str(call.base())) if not m_callee: return False return m_caller.group("grp") != m_callee.group("grp") - def _skipmodel__vfctprintf( - self, chain: typing.Sequence[QName], call: QName - ) -> bool: - if call.base() == BaseName("libfmt_conv_formatter"): - return any(c.base() in self.formatters for c in chain) - return False - class PicoFmtPlugin: known_fct: dict[BaseName, BaseName] + wont_call_v: typing.Collection[BaseName] - def __init__(self, arg_pico_platform: str) -> None: + def __init__( + self, arg_pico_platform: str, wont_call_v: typing.Collection[BaseName] + ) -> None: self.known_fct = { # pico_fmt BaseName("fmt_vsnprintf"): BaseName("_out_buffer"), @@ -464,6 +455,7 @@ class PicoFmtPlugin: BaseName("__lm_light_printf"): BaseName("libfmt_libc_fct"), } ) + self.wont_call_v = set([*self.known_fct.values(), *wont_call_v]) def is_intrhandler(self, name: QName) -> bool: return False @@ -508,11 +500,14 @@ class PicoFmtPlugin: BaseName("fmt_state_putchar"): analyze.SkipModel( self.known_fct.keys(), self._skipmodel_fmt_state_putchar ), + BaseName("_vfctprintf"): analyze.SkipModel( + self.wont_call_v, self._skipmodel__vfctprintf + ), } return ret def _skipmodel_fmt_state_putchar( - self, chain: typing.Sequence[QName], call: QName + self, chain: typing.Sequence[QName], node: Node, call: QName ) -> bool: if call.base() in self.known_fct.values(): fct: BaseName | None = None @@ -523,6 +518,13 @@ class PicoFmtPlugin: return True return False + def _skipmodel__vfctprintf( + self, chain: typing.Sequence[QName], node: Node, call: QName + ) -> bool: + if call.base() == BaseName("libfmt_conv_formatter"): + return any(c.base() in self.wont_call_v for c in chain) + return False + class PicoSDKPlugin: get_init_array: typing.Callable[[], typing.Collection[QName]] @@ -763,16 +765,17 @@ class PicoSDKPlugin: return ret -re_tud_class = re.compile( - r"^\s*#\s*define\s+(?P<k>CFG_TUD_(?:\S{3}|AUDIO|VIDEO|MIDI|VENDOR|USBTMC|DFU_RUNTIME|ECM_RNDIS))\s+(?P<v>\S+).*" -) -re_tud_entry = re.compile(r"^\s+\.(?P<meth>\S+)\s*=\s*(?P<impl>[a-zA-Z0-9_]+)(?:,.*)?") -re_tud_if1 = re.compile(r"^\s*#\s*if (\S+)\s*") -re_tud_if2 = re.compile(r"^\s*#\s*if (\S+)\s*\|\|\s*(\S+)\s*") -re_tud_endif = re.compile(r"^\s*#\s*endif\s*") - - class TinyUSBDevicePlugin: + re_tud_class = re.compile( + r"^\s*#\s*define\s+(?P<k>CFG_TUD_(?:\S{3}|AUDIO|VIDEO|MIDI|VENDOR|USBTMC|DFU_RUNTIME|ECM_RNDIS))\s+(?P<v>\S+).*" + ) + re_tud_entry = re.compile( + r"^\s+\.(?P<meth>\S+)\s*=\s*(?P<impl>[a-zA-Z0-9_]+)(?:,.*)?" + ) + re_tud_if1 = re.compile(r"^\s*#\s*if (\S+)\s*") + re_tud_if2 = re.compile(r"^\s*#\s*if (\S+)\s*\|\|\s*(\S+)\s*") + re_tud_endif = re.compile(r"^\s*#\s*endif\s*") + tud_drivers: dict[str, set[QName]] # method_name => {method_impls} def __init__(self, arg_c_fnames: typing.Collection[str]) -> None: @@ -794,7 +797,7 @@ class TinyUSBDevicePlugin: in_table = False for line in fh: line = line.rstrip() - if m := re_tud_class.fullmatch(line): + if m := self.re_tud_class.fullmatch(line): k = m.group("k") v = m.group("v") tusb_config[k] = bool(int(v)) @@ -806,13 +809,13 @@ class TinyUSBDevicePlugin: for line in fh: line = line.rstrip() if in_table: - if m := re_tud_if1.fullmatch(line): + if m := self.re_tud_if1.fullmatch(line): enabled = tusb_config[m.group(1)] - elif m := re_tud_if2.fullmatch(line): + elif m := self.re_tud_if2.fullmatch(line): enabled = tusb_config[m.group(1)] or tusb_config[m.group(2)] - elif re_tud_endif.fullmatch(line): + elif self.re_tud_endif.fullmatch(line): enabled = True - if m := re_tud_entry.fullmatch(line): + if m := self.re_tud_entry.fullmatch(line): meth = m.group("meth") impl = m.group("impl") if meth == "name" or not enabled: diff --git a/build-aux/measurestack/test_analyze.py b/build-aux/measurestack/test_analyze.py index ff1732d..df205e8 100644 --- a/build-aux/measurestack/test_analyze.py +++ b/build-aux/measurestack/test_analyze.py @@ -5,17 +5,20 @@ # pylint: disable=unused-variable +import re +import typing + import pytest -from .analyze import BaseName, QName +from . import analyze, testutil, util def test_name_base() -> None: - assert QName("foo.c:bar.1").base() == BaseName("bar") + assert analyze.QName("foo.c:bar.1").base() == analyze.BaseName("bar") def test_name_pretty() -> None: - name = QName("foo.c:bar.1") + name = analyze.QName("foo.c:bar.1") assert f"{name}" == "QName('foo.c:bar.1')" assert f"{name.base()}" == "BaseName('bar')" assert f"{[name]}" == "[QName('foo.c:bar.1')]" @@ -23,7 +26,7 @@ def test_name_pretty() -> None: def test_name_eq() -> None: - name = QName("foo.c:bar.1") + name = analyze.QName("foo.c:bar.1") with pytest.raises(AssertionError) as e: if name == "foo": pass @@ -32,3 +35,47 @@ def test_name_eq() -> None: if name.base() == "foo": pass assert "comparing BaseName with str" in str(e) + + +def test_max_call_depth() -> None: + graph: typing.Sequence[tuple[str, typing.Collection[str]]] = [ + ("a", {"b"}), # 1 + ("b", {"c"}), # 2 + ("c", {"d"}), # 3 + ("d", {"e"}), # 4 + ("e", {}), # 5 + ] + + testcases: dict[int, bool] = { + 1: True, + 2: True, + 3: True, + 4: True, + 5: False, + 6: False, + 7: False, + } + + def test_filter(name: analyze.QName) -> tuple[int, bool]: + if str(name.base()) in ["a"]: + return 1, True + return 0, False + + def doit(depth: int, graph_plugin: util.Plugin) -> None: + analyze.analyze( + ci_fnames=[], + app_func_filters={"Main": test_filter}, + app=util.PluginApplication(testutil.nop_location_xform, [graph_plugin]), + cfg_max_call_depth=depth, + ) + + pat = re.compile("^max call depth exceeded: ") + + for depth, should_fail in testcases.items(): + graph_plugin = testutil.GraphProviderPlugin(depth, graph) + + if should_fail: + with pytest.raises(ValueError, match=pat): + doit(depth, graph_plugin) + else: + doit(depth, graph_plugin) diff --git a/build-aux/measurestack/test_app_plugins.py b/build-aux/measurestack/test_app_plugins.py new file mode 100644 index 0000000..da8be65 --- /dev/null +++ b/build-aux/measurestack/test_app_plugins.py @@ -0,0 +1,379 @@ +# build-aux/measurestack/test_app_plugins.py - Tests for app_plugins.py +# +# Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com> +# SPDX-License-Identifier: AGPL-3.0-or-later + +# pylint: disable=unused-variable + +import typing + +from . import analyze, app_plugins, testutil, util +from .analyze import BaseName, Node, QName, SkipModel + + +def test_assert_msg_fail() -> None: + # 1 2 3 4 5 6 7 <= call_depth + # - main() + # - __assert_msg_fail() * + # - __lm_light_printf() + # - fmt_vfctprintf() + # - stdio_putchar() + # - __assert_msg_fail() ** + # - __lm_abort() + # - stdio_flush() (inconsequential) + # - __lm_abort() (inconsequential) + max_call_depth = 7 + exp = [ + "main", + "__assert_msg_fail", + "__lm_light_printf", + "fmt_vfctprintf", + "stdio_putchar", + "__assert_msg_fail", + "__lm_abort", + ] + graph: typing.Sequence[tuple[str, typing.Collection[str]]] = [ + # main.c + ("main", {"__assert_msg_fail"}), + # assert.c + ("__assert_msg_fail", {"__lm_light_printf", "__lm_abort"}), + # intercept.c / libfmt/libmisc.c + ("__lm_abort", {}), + ("__lm_light_printf", {"fmt_vfctprintf", "stdio_flush"}), + ("stdio_flush", {}), + ("stdio_putchar", {"__assert_msg_fail"}), + # printf.c + ("fmt_vfctprintf", {"stdio_putchar"}), + ] + graph_plugin = testutil.GraphProviderPlugin(max_call_depth, graph) + + class SkipPlugin(testutil.NopPlugin): + def skipmodels(self) -> dict[BaseName, SkipModel]: + models = app_plugins.LibMiscPlugin(arg_c_fnames=[]).skipmodels() + assert BaseName("__assert_msg_fail") in models + orig_model = models[BaseName("__assert_msg_fail")] + + def wrapped_model_fn( + chain: typing.Sequence[QName], node: Node, call: QName + ) -> bool: + dbgstr = ( + ("=>".join(str(c) for c in [*chain, node.funcname])) + + "=?=>" + + str(call) + ) + assert dbgstr in [ + "__assert_msg_fail=?=>__lm_light_printf", + "__assert_msg_fail=?=>__lm_abort", + "__assert_msg_fail=>__lm_light_printf=>fmt_vfctprintf=>stdio_putchar=>__assert_msg_fail=?=>__lm_light_printf", + "__assert_msg_fail=>__lm_light_printf=>fmt_vfctprintf=>stdio_putchar=>__assert_msg_fail=?=>__lm_abort", + ] + return orig_model.fn(chain, node, call) + + models[BaseName("__assert_msg_fail")] = SkipModel( + orig_model.nchain, wrapped_model_fn + ) + return models + + def test_filter(name: QName) -> tuple[int, bool]: + if name.base() == BaseName("main"): + return 1, True + return 0, False + + result = analyze.analyze( + ci_fnames=[], + app_func_filters={ + "Main": test_filter, + }, + app=util.PluginApplication( + testutil.nop_location_xform, [graph_plugin, SkipPlugin()] + ), + cfg_max_call_depth=max_call_depth, + ) + + graph_plugin.assert_nstatic(result.groups["Main"].rows[QName("main")].nstatic, exp) + + +def test_fct() -> None: + # 1. | a + | b + | c + |* + # 2. | fmt_vsnprintf + | vprintf + | __lm_light_printf + |* + # 3. | fmt_vfctprintf + | fmt_vfctprintf + | fmt_vfctprintf + | + # 4. | fmt_state_putchar + | fmt_state_putchar + | fmt_state_putchar + | + # 5. | _out_buffer + | stdio_buffered_printer + | libfmt_light_fct + |* + # 6. | | __assert_msg_fail + | __assert_msg_fail + | + # 7. | | a. __lm_light_printf + | a. __lm_light_printf + | + # 8. | | a. fmt_vfctprintf + | a. fmt_vfctprintf + | + # 9. | | a. fmt_state_putchar + | a. fmt_state_putchar + | + # 10. | | a. libfmt_light_fct + | a. libfmt_light_fct + | + # 11. | | a. __assert_msg_fail + | a. __assert_msg_fail + | + # 12. | | a. __lm_abort + | a. __lm_abort + | + # 7. | | b. __lm_abort | b. __lm_abort | + max_call_depth = 12 + exp_a = ["a", "fmt_vsnprintf", "fmt_vfctprintf", "fmt_state_putchar", "_out_buffer"] + exp_b = [ + "b", + "__wrap_vprintf", + "fmt_vfctprintf", + "fmt_state_putchar", + "stdio_buffered_printer", + "__assert_msg_fail", + "__lm_light_printf", + "fmt_vfctprintf", + "fmt_state_putchar", + "libfmt_light_fct", + "__assert_msg_fail", + "__lm_abort", + ] + exp_c = [ + "c", + "__lm_light_printf", + "fmt_vfctprintf", + "fmt_state_putchar", + "libfmt_light_fct", + "__assert_msg_fail", + "__lm_light_printf", + "fmt_vfctprintf", + "fmt_state_putchar", + "libfmt_light_fct", + "__assert_msg_fail", + "__lm_abort", + ] + graph: typing.Sequence[tuple[str, typing.Collection[str]]] = [ + # main.c + ("a", {"fmt_vsnprintf"}), # _out_buffer + ("b", {"vprintf"}), # stdio_buffered_printer + ("c", {"__lm_light_printf"}), # libfmt_light_printf + # wrappers + ("fmt_vsnprintf", {"fmt_vfctprintf"}), + ("__wrap_vprintf", {"fmt_vfctprintf"}), + ("__lm_light_printf", {"fmt_vfctprintf"}), + # printf.c + ("fmt_vfctprintf", {"fmt_state_putchar"}), + ( + "fmt_state_putchar", + {"_out_buffer", "stdio_buffered_printer", "libfmt_light_fct"}, + ), + # fcts + ("_out_buffer", {}), + ("stdio_buffered_printer", {"__assert_msg_fail"}), + ("libfmt_light_fct", {"__assert_msg_fail"}), + # assert.c + ("__assert_msg_fail", {"__lm_light_printf", "__lm_abort"}), + # intercept.c / libfmt/libmisc.c + ("__lm_abort", {}), + ] + graph_plugin = testutil.GraphProviderPlugin(max_call_depth, graph) + + plugins: list[util.Plugin] = [ + graph_plugin, + app_plugins.LibMiscPlugin(arg_c_fnames=[]), + # fmt_vsnprintf => fct=_out_buffer + # if rp2040: + # __wrap_vprintf => fct=stdio_buffered_printer + # stdio_vprintf => fct=stdio_buffered_printer + # __lm_light_printf => fct=libfmt_light_fct + # if host: + # __lm_printf => fct=libfmt_libc_fct + # __lm_light_printf => fct=libfmt_libc_fct + app_plugins.PicoFmtPlugin("rp2040", []), + ] + + def test_filter(name: QName) -> tuple[int, bool]: + if str(name.base()) in ["a", "b", "c"]: + return 1, True + return 0, False + + result = analyze.analyze( + ci_fnames=[], + app_func_filters={ + "Main": test_filter, + }, + app=util.PluginApplication(testutil.nop_location_xform, plugins), + cfg_max_call_depth=max_call_depth, + ) + + graph_plugin.assert_nstatic(result.groups["Main"].rows[QName("a")].nstatic, exp_a) + graph_plugin.assert_nstatic(result.groups["Main"].rows[QName("b")].nstatic, exp_b) + graph_plugin.assert_nstatic(result.groups["Main"].rows[QName("c")].nstatic, exp_c) + + +def test_assert_formatter() -> None: + # _________________________________________________________ + # | | + # | | + # | main | + # | | | + # | _ __wrap_vprintf | + # | / \ | _______________ | + # | | fmt_vfctprintf / \ | + # | | \ fmt_state_printf | | + # | | \____ ____/ | | + # | | \ / | | + # | | _vfctprintf | | + # | | ____/ \____ ^ | + # | | / ?<---snip | | + # | | conv_builtin \ | | + # | | | libfmt_conv_formatter | | + # | | | | | | + # | ^ \ lib9p_msg_Rread_format | | + # | | \ _____________/ | \______/ | + # | | \ / \ | + # | | fmt_state_putchar \ | + # | | ?<-?<--------snip | | + # | | / \_________ | | + # | | / \ | | + # | | stdio_buffered_printer \ | | + # | | \ libfmt_light_fct | | + # | | \ | / | + # | | \_______ | ________/ | + # | | \ | / | + # | | __assert_msg_fail | + # | | ___/ \____ | + # | | snip--->? \ | + # | | / \ | + # | | __lm_light_printf \ | + # | \____________/ __lm_abort | + # | | + # |_________________________________________________________| + # + graph: typing.Sequence[tuple[str, typing.Collection[str]]] = [ + ("main", {"vprintf"}), + ("__wrap_vprintf", {"fmt_vfctprintf"}), + ("fmt_vfctprintf", {"_vfctprintf"}), + ("fmt_state_printf", {"_vfctprintf"}), + ("_vfctprintf", {"conv_builtin", "libfmt_conv_formatter"}), + ("conv_builtin", {"fmt_state_putchar"}), + ("libfmt_conv_formatter", {"lib9p_msg_Rread_format"}), + ( + "lib9p_msg_Rread_format", + {"fmt_state_putchar", "__assert_msg_fail", "fmt_state_printf"}, + ), + ("fmt_state_putchar", {"stdio_buffered_printer", "libfmt_light_fct"}), + ("stdio_buffered_printer", {"__assert_msg_fail"}), + ("libfmt_light_fct", {"__assert_msg_fail"}), + ("__assert_msg_fail", {"__lm_light_printf", "__lm_abort"}), + ("__lm_light_printf", {"fmt_vfctprintf"}), + ("__lm_abort", {}), + ] + + # fct-determining wrappers have their callees marked with "|": + # + # 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <= call_depth + # - main() ; + + # - __wrap__vprintf() ; + + # |- fmt_vfctprintf() ; + + # | - _vfctprintf() ; + + # | - conv_builtin() ; + # | - fmt_state_putchar() ; + # | - stdio_buffered_printer() ; + # | - __assert_msg_fail() ; + # | - __lm_light_printf() ; + # | |- fmt_vfctprintf() ; + # | | - _vfctprintf() ; + # | | - conv_builtin() ; + # | | - fmt_state_putchar() ; + # | | - stdio_buffered_printer() ; skip (wrong fct) + # | | - libfmt_light_fct() ; + # | | - __assert_msg_fail() ; + # | | - __lm_light_printf() ; skip (nested __assert_msg_fail) + # | | - __lm_abort() ; + # | | - libfmt_conv_formatter() ; skip (fct won't use %v) + # | - __lm_abort() ; + # | - libfmt_light_fct() ; skip (wrong fct) + # | - libfmt_conv_formatter() ; + + # | - lib9p_msg_Rread_format() ; + + # | - fmt_state_putchar() ; + # | - stdio_buffered_printer() ; + # | - __assert_msg_fail() ; + # | - __lm_light_printf() ; + # | |- fmt_vfctprintf() ; + # | | - _vfctprintf() ; + # | | - conv_builtin() ; + # | | - fmt_state_putchar() ; + # | | - stdio_buffered_printer() ; skip (wrong fct) + # | | - libfmt_light_fct() ; + # | | - __assert_msg_fail() ; + # | | - __lm_light_printf() ; skip (neseted __assert_msg_fail) + # | | - __lm_abort() ; + # | | - libfmt_conv_formatter() ; skip (fct won't use %v) + # | - __lm_abort() ; + # | - libfmt_light_fct() ; skip (wrong fct) + # | - __assert_msg_fail() ; + # | - __lm_light_printf() ; + # | |- fmt_vfctprintf() ; + # | | - _vfctprintf() ; + # | | - conv_builtin() ; + # | | - fmt_state_putchar() ; + # | | - stdio_buffered_printer() ; skip (wrong fct) + # | | - libfmt_light_fct() ; + # | | - __assert_msg_fail() ; + # | | - __lm_light_printf() ; skip (nested__assert_msg_fail) + # | | - __lm_abort() ; + # | | - libfmt_conv_formatter() ; skip (formatter won't use %v) + # | - __lm_abort() ; + # | - fmt_state_printf() ; + + # | - _vfctprintf() ; + + # | - conv_builtin() ; + + # | - fmt_state_putchar() ; + + # | - stdio_buffered_printer() ; + + # | - __assert_msg_fail() ; + + # | - __lm_light_printf() ; + + # | |- fmt_vfctprintf() ; + + # | | - _vfctprintf() ; + + # | | - conv_builtin() ; + + # | | - fmt_state_putchar() ; + + # | | - stdio_buffered_printer() ; skip (wrong fct) + # | | - libfmt_light_fct() ; + + # | | - __assert_msg_fail() ; + + # | | - __lm_light_printf() ; skip (neseted __assert_msg_fail) + # | | - __lm_abort() ; + + # | | - libfmt_conv_formatter() ; skip (fct won't use %v) + # | - __lm_abort() ; + # | - libfmt_light_fct() ; skip (wrong fct) + # | - libfmt_conv_formatter() ; skip (formatter won't use %v) + max_call_depth = 20 + exp = [ + "main", + "__wrap_vprintf", + "fmt_vfctprintf", + "_vfctprintf", + "libfmt_conv_formatter", + "lib9p_msg_Rread_format", + "fmt_state_printf", + "_vfctprintf", + "conv_builtin", + "fmt_state_putchar", + "stdio_buffered_printer", + "__assert_msg_fail", + "__lm_light_printf", + "fmt_vfctprintf", + "_vfctprintf", + "conv_builtin", + "fmt_state_putchar", + "libfmt_light_fct", + "__assert_msg_fail", + "__lm_abort", + ] + + graph_plugin = testutil.GraphProviderPlugin(max_call_depth, graph) + + plugins: list[util.Plugin] = [ + graph_plugin, + app_plugins.LibMiscPlugin(arg_c_fnames=[]), + app_plugins.PicoFmtPlugin("rp2040", [BaseName("lib9p_msg_Rread_format")]), + ] + + def test_filter(name: QName) -> tuple[int, bool]: + if name.base() == BaseName("main"): + return 1, True + return 0, False + + result = analyze.analyze( + ci_fnames=[], + app_func_filters={ + "Main": test_filter, + }, + app=util.PluginApplication(testutil.nop_location_xform, plugins), + cfg_max_call_depth=max_call_depth, + ) + + graph_plugin.assert_nstatic(result.groups["Main"].rows[QName("main")].nstatic, exp) diff --git a/build-aux/measurestack/testutil.py b/build-aux/measurestack/testutil.py new file mode 100644 index 0000000..751e57f --- /dev/null +++ b/build-aux/measurestack/testutil.py @@ -0,0 +1,131 @@ +# build-aux/measurestack/testutil.py - Utilities for writing tests +# +# Copyright (C) 2025 Luke T. Shumaker <lukeshu@lukeshu.com> +# SPDX-License-Identifier: AGPL-3.0-or-later + +import typing + +from . import analyze, util + +# pylint: disable=unused-variable +__all__ = [ + "aprime_gen", + "aprime_decompose", + "NopPlugin", + "GraphProviderPlugin", + "nop_location_xform", +] + + +def aprime_gen(l: int, n: int) -> typing.Sequence[int]: + """Return an `l`-length sequence of nonnegative + integers such that any `n`-length-or-shorter combination of + members with repeats allowed can be uniquely identified by its + sum. + + (If that were "product" instead of "sum", the obvious solution + would be the first `l` primes.) + + """ + seq = [1] + while len(seq) < l: + x = seq[-1] * n + 1 + seq.append(x) + return seq + + +def aprime_decompose( + aprimes: typing.Sequence[int], tot: int +) -> tuple[typing.Collection[int], typing.Collection[int]]: + ret_idx = [] + ret_val = [] + while tot: + idx = max(i for i in range(len(aprimes)) if aprimes[i] <= tot) + val = aprimes[idx] + ret_idx.append(idx) + ret_val.append(val) + tot -= val + return ret_idx, ret_val + + +class NopPlugin: + def is_intrhandler(self, name: analyze.QName) -> bool: + return False + + def init_array(self) -> typing.Collection[analyze.QName]: + return [] + + def extra_includes(self) -> typing.Collection[analyze.BaseName]: + return [] + + def indirect_callees( + self, loc: str, line: str + ) -> tuple[typing.Collection[analyze.QName], bool] | None: + return None + + def skipmodels(self) -> dict[analyze.BaseName, analyze.SkipModel]: + return {} + + def extra_nodes(self) -> typing.Collection[analyze.Node]: + return [] + + +class GraphProviderPlugin(NopPlugin): + _nodes: typing.Sequence[analyze.Node] + + def __init__( + self, + max_call_depth: int, + graph: typing.Sequence[tuple[str, typing.Collection[str]]], + ) -> None: + seq = aprime_gen(len(graph), max_call_depth) + nodes: list[analyze.Node] = [] + for i, (name, calls) in enumerate(graph): + nodes.append(util.synthetic_node(name, seq[i], calls)) + assert ( + len(graph) + == len(nodes) + == len(set(n.nstatic for n in nodes)) + == len(set(str(n.funcname.base()) for n in nodes)) + ) + self._nodes = nodes + + def extra_nodes(self) -> typing.Collection[analyze.Node]: + return self._nodes + + def decode_nstatic(self, tot: int) -> typing.Collection[str]: + idxs, _ = aprime_decompose([n.nstatic for n in self._nodes], tot) + return [str(self._nodes[i].funcname.base()) for i in idxs] + + def encode_nstatic(self, calls: typing.Collection[str]) -> int: + tot = 0 + d: dict[str, int] = {} + for node in self._nodes: + d[str(node.funcname.base())] = node.nstatic + print(d) + for call in calls: + tot += d[call] + return tot + + def sorted_calls(self, calls: typing.Collection[str]) -> typing.Sequence[str]: + d: dict[str, int] = {} + for node in self._nodes: + d[str(node.funcname.base())] = node.nstatic + + def k(call: str) -> int: + return d[call] + + return sorted(calls, key=k) + + def assert_nstatic(self, act_tot: int, exp_calls: typing.Collection[str]) -> None: + exp_tot = self.encode_nstatic(exp_calls) + if act_tot != exp_tot: + act_str = f"{act_tot}: {self.sorted_calls(self.decode_nstatic(act_tot))}" + exp_str = f"{exp_tot}: {self.sorted_calls(exp_calls)}" + assert ( + False + ), f"act:{act_tot} != exp:{exp_tot}\n\t-exp = {exp_str}\n\t+act = {act_str}" + + +def nop_location_xform(loc: str) -> str: + return loc diff --git a/build-aux/measurestack/util.py b/build-aux/measurestack/util.py index 47b2617..0af3d02 100644 --- a/build-aux/measurestack/util.py +++ b/build-aux/measurestack/util.py @@ -7,7 +7,7 @@ import re import typing from . import analyze, vcg -from .analyze import BaseName, Node, QName +from .analyze import BaseName, Node, QName, maybe_sorted # pylint: disable=unused-variable __all__ = [ @@ -32,7 +32,7 @@ def synthetic_node( n.nstatic = nstatic n.ndynamic = 0 - n.calls = dict((QName(c), False) for c in calls) + n.calls = dict((QName(c), False) for c in maybe_sorted(calls)) return n |