diff options
Diffstat (limited to 'gdb-helpers/libcr.py')
-rw-r--r-- | gdb-helpers/libcr.py | 493 |
1 files changed, 493 insertions, 0 deletions
diff --git a/gdb-helpers/libcr.py b/gdb-helpers/libcr.py new file mode 100644 index 0000000..6f95a81 --- /dev/null +++ b/gdb-helpers/libcr.py @@ -0,0 +1,493 @@ +# gdb-helpers/libcr.py - GDB helpers for libcr. +# +# Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com> +# SPDX-License-Identifier: AGPL-3.0-or-later + +import contextlib +import time +import typing + +import gdb # pylint: disable=import-error +import gdb.unwinder # pylint: disable=import-error + +# GDB helpers ################################################################## + +# https://sourceware.org/bugzilla/show_bug.cgi?id=32428 +gdb_bug_32428 = True + + +class _gdb_Locus(typing.Protocol): + @property + def frame_unwinders(self) -> list["gdb._Unwinder"]: ... + + +def gdb_unregister_unwinder( + locus: gdb.Objfile | gdb.Progspace | None, unwinder: "gdb._Unwinder" +) -> None: + _locus: _gdb_Locus = typing.cast(_gdb_Locus, gdb) if locus is None else locus + _locus.frame_unwinders.remove(unwinder) + gdb.invalidate_cached_frames() + + +def gdb_is_on_os() -> bool: + try: + gdb.execute("info proc", to_string=True) + return True + except gdb.error: + return False + + +class gdb_JmpBuf: + """Our own in-Python GDB-specific implementation of `jmp_buf`""" + + level: int + registers: dict[str, str] + + +def gdb_setjmp() -> gdb_JmpBuf: + """Our own in-Python GDB-specific implementation of `setjmp()`""" + buf = gdb_JmpBuf() + buf.level = gdb.selected_frame().level() + gdb.execute("select-frame level 0") + buf.registers = {} + for line in gdb.execute("info registers", to_string=True).split("\n"): + words = line.split(maxsplit=2) + if len(words) < 2: + continue + buf.registers[words[0]] = words[1] + gdb.execute(f"select-frame level {buf.level}") + return buf + + +def gdb_longjmp(buf: gdb_JmpBuf) -> None: + """Our own in-Python GDB-specific implementation of `longjmp()`""" + + gdb.execute("select-frame level 0") + + if ( + ("sp" in buf.registers) + and ("msp" in buf.registers) + and ("psp" in buf.registers) + and ("control" in buf.registers) + ): + # On ARM, 'sp' is an alias for either 'msp' or 'psp' + # (depending on 'control'&(1<<1)). We must set all 3 before + # fussing with 'xPSR' or frames, or GDB will get upset at us + # about "Invalid state". + gdb.execute(f"set $sp = {buf.registers['sp']}", to_string=True) + gdb.execute(f"set $msp = {buf.registers['msp']}") + gdb.execute(f"set $psp = {buf.registers['psp']}") + + for reg, val in buf.registers.items(): + gdb.execute(f"set ${reg} = {val}") + gdb.invalidate_cached_frames() + + gdb.execute(f"select-frame level {buf.level}") + + +# Core libcr functionality ##################################################### + + +class CrGlobals: + main: "CrMain" + coroutines: list["CrCoroutine"] + _breakpoint: "CrBreakpoint" + _known_threads: set[gdb.InferiorThread] + + def __init__(self) -> None: + num = int( + gdb.parse_and_eval("sizeof(coroutine_table)/sizeof(coroutine_table[0])") + ) + + self.main = CrMain(self) + self.coroutines = [CrCoroutine(self, i + 1) for i in range(num)] + + self._breakpoint = CrBreakpoint() + self._breakpoint.enabled = False + + self._known_threads = set() + + gdb.events.cont.connect(self._on_cont) + + def delete(self) -> None: + self.coroutines = [] + self._breakpoint.delete() + gdb.events.cont.disconnect(self._on_cont) + + def readjmp(self, env_ptr_expr: str) -> gdb_JmpBuf: + self._breakpoint.enabled = True + gdb.execute(f"call (void)cr_gdb_readjmp({env_ptr_expr})") + self._breakpoint.enabled = False + if gdb_is_on_os(): + gdb.execute("queue-signal SIGWINCH") + return self._breakpoint.env + + def _on_cont(self, event: gdb.Event) -> None: + cur_threads = set(gdb.selected_inferior().threads()) + if cur_threads - self._known_threads: + # Ignore thread creation events. + self._known_threads = cur_threads + return + if not self.coroutine_running.is_selected(): + if gdb_bug_32428: + print("Must return to running coroutine before continuing.") + print("Hit ^C twice then run:") + print(f" cr select {self.coroutine_running.cid}") + while True: + time.sleep(1) + assert self.coroutine_running.cont_env + gdb_longjmp(self.coroutine_running.cont_env) + self.main.cont_env = None + for cr in self.coroutines: + cr.cont_env = None + + def is_valid_cid(self, cid: int) -> bool: + return (0 < cid <= len(self.coroutines)) and ( + self.coroutines[cid - 1].state != self.CR_NONE + ) + + @property + def coroutine_running(self) -> "CrMain | CrCoroutine": + cid = int(gdb.parse_and_eval("coroutine_running")) + if not self.is_valid_cid(cid): + return self.main + return self.coroutines[cid - 1] + + @property + def coroutine_selected(self) -> "CrMain | CrCoroutine": + for cr in self.coroutines: + if cr.is_selected(): + return cr + return self.main + + @property + def CR_NONE(self) -> gdb.Value: + return gdb.parse_and_eval("CR_NONE") + + @property + def CR_RUNNING(self) -> gdb.Value: + return gdb.parse_and_eval("CR_RUNNING") + + def select(self, cr: "CrMain | CrCoroutine", level: int = -1) -> None: + self.coroutine_selected.cont_env = gdb_setjmp() + + if cr.cont_env: + gdb_longjmp(cr.cont_env) + else: + env: gdb_JmpBuf + if cr == self.coroutine_running: + assert False # cr.cont_env should have been set + match cr: + case CrMain(): + env = self.readjmp("&coroutine_main_env") + case CrCoroutine(): + if cr.state == self.CR_RUNNING: + env = self.readjmp("&coroutine_add_env") + else: + env = self.readjmp(f"&coroutine_table[{cr.cid-1}].env") + gdb_longjmp(env) + cr_select_top_frame() + + @contextlib.contextmanager + def with_selected(self, cr: "CrMain | CrCoroutine") -> typing.Iterator[None]: + saved_env = gdb_setjmp() + self.select(cr) + try: + yield + finally: + gdb_longjmp(saved_env) + + +class CrBreakpointUnwinder(gdb.unwinder.Unwinder): + """Used to temporarily disable unwinding so that + gdb/breakpoint.c:check_longjmp_breakpoint_for_call_dummy() doesn't + prematurely garbage collect the `call`-dummy-frame. + + """ + + def __init__(self) -> None: + super().__init__("cr_breakpoint_unwinder") + + # The .pyi is wrong; it says `Frame` instead of `PendingFrame`. + def __call__(self, pending_frame: gdb.PendingFrame) -> gdb.UnwindInfo | None: + # Stop unwinding with stop_reason=UNWIND_NO_SAVED_PC by + # returning an UnwindInfo that doesn't have + # `.add_saved_register("pc", ...)`. + return pending_frame.create_unwind_info( + gdb.unwinder.FrameId( + sp=pending_frame.read_register("sp"), + pc=pending_frame.read_register("pc"), + ) + ) + + +class CrBreakpoint(gdb.Breakpoint): + env: gdb_JmpBuf + _unwinder_locus: gdb.Objfile + _unwinder: CrBreakpointUnwinder + + def __init__(self) -> None: + self.env = gdb_JmpBuf() + + self._unwinder = CrBreakpointUnwinder() + readjmp_sym = gdb.lookup_global_symbol("cr_gdb_readjmp") + assert readjmp_sym + self._unwinder_locus = readjmp_sym.symtab.objfile + gdb.unwinder.register_unwinder(self._unwinder_locus, self._unwinder, True) + self._unwinder.enabled = False + + super().__init__( + function="cr_gdb_breakpoint", type=gdb.BP_BREAKPOINT, internal=True + ) + + @property + def enabled(self) -> bool: + return super().enabled + + @enabled.setter + def enabled(self, value: bool) -> None: + self._unwinder.enabled = value + # Use a dunder-call to avoid an infinite loop. + # pylint: disable=unnecessary-dunder-call + gdb.Breakpoint.enabled.__set__(self, value) # type: ignore + + def stop(self) -> bool: + assert self._unwinder.enabled + self._unwinder.enabled = False + self.env = gdb_setjmp() + self._unwinder.enabled = True + return False # don't stop + + def delete(self) -> None: + gdb_unregister_unwinder(self._unwinder_locus, self._unwinder) + super().delete() + + +def cr_select_top_frame() -> None: + gdb.execute("select-frame level 0") + base_frame = gdb.selected_frame() + while True: + fn = gdb.selected_frame().name() + if fn and (fn.startswith("cr_") or fn.startswith("_cr_")): + older = gdb.selected_frame().older() + if not older: + base_frame.select() + break + older.select() + else: + break + + +class CrMain: + cr_globals: CrGlobals + cont_env: gdb_JmpBuf | None + + def __init__(self, cr_globals: CrGlobals) -> None: + self.cr_globals = cr_globals + self.cont_env = None + + @property + def cid(self) -> int: + return 0 + + def is_selected(self) -> bool: + return not any(cr.is_selected() for cr in self.cr_globals.coroutines) + + +class CrCoroutine: + cr_globals: CrGlobals + cid: int + cont_env: gdb_JmpBuf | None + + def __init__(self, cr_globals: CrGlobals, cid: int) -> None: + self.cr_globals = cr_globals + self.cid = cid + self.cont_env = None + + @property + def state(self) -> gdb.Value: + return gdb.parse_and_eval(f"coroutine_table[{self.cid-1}].state") + + @property + def name(self) -> str: + bs: list[int] = [0] * int(gdb.parse_and_eval("sizeof(coroutine_table[0].name)")) + for i, _ in enumerate(bs): + bs[i] = int(gdb.parse_and_eval(f"coroutine_table[{self.cid-1}].name[{i}]")) + return bytes(bs).decode("UTF-8").split("\x00", maxsplit=1)[0] + + def is_selected(self) -> bool: + sp = int(gdb.parse_and_eval("$sp")) + lo = int(gdb.parse_and_eval(f"coroutine_table[{self.cid-1}].stack")) + hi = lo + int(gdb.parse_and_eval(f"coroutine_table[{self.cid-1}].stack_size")) + return lo <= sp < hi + + +# User-facing commands ######################################################### + + +class CrCommand(gdb.Command): + """Use this command for libcr coroutines.""" + + cr_globals: CrGlobals + + def __init__(self, cr_globals: CrGlobals) -> None: + self.cr_globals = cr_globals + gdb.Command.__init__(self, "cr", gdb.COMMAND_RUNNING, gdb.COMPLETE_NONE, True) + + def invoke(self, arg: str, from_tty: bool) -> None: + gdb.execute("help cr") + + +class CrListCommand(gdb.Command): + """List libcr coroutines. + Usage: cr list + + In the output: + - the 'R' marker indicates the currently-running coroutine + - the 'G' marker indicates the coroutine that GDB is viewing; this may be changed with `cr select` + """ + + cr_globals: CrGlobals + + def __init__(self, cr_globals: CrGlobals) -> None: + self.cr_globals = cr_globals + gdb.Command.__init__(self, "cr list", gdb.COMMAND_RUNNING, gdb.COMPLETE_NONE) + + def invoke(self, arg: str, from_tty: bool) -> None: + argv = gdb.string_to_argv(arg) + if len(argv) != 0: + raise gdb.GdbError("Usage: cr list") + + rows: list[tuple[str, str, str, str, str]] = [ + ("", "Id", "Name", "State", "Frame") + ] + for cid in range(len(self.cr_globals.coroutines) + 1): + cr: CrMain | CrCoroutine = ( + self.cr_globals.coroutines[cid - 1] if cid else self.cr_globals.main + ) + if isinstance(cr, CrCoroutine) and cr.state == self.cr_globals.CR_NONE: + continue + rows += [ + ( + "".join( + [ + "R" if cr == self.cr_globals.coroutine_running else " ", + "G" if cr.is_selected() else " ", + ] + ), + str(cr.cid), + repr(cr.name) if isinstance(cr, CrCoroutine) else "-", + str(cr.state) if isinstance(cr, CrCoroutine) else "-", + self._pretty_frame(cr, from_tty), + ) + ] + + widths: list[int] = [ + max(len(row[col]) for row in rows) for col in range(len(rows[0])) + ] + + def line(row: tuple[str, str, str, str, str]) -> str: + + def cell(col: int) -> str: + return row[col].ljust(widths[col]) + + return f"{cell(0)} {cell(1)} {cell(2)} {cell(3)} {row[4]}" + + maxline = 0 + if screenwidth := gdb.parameter("width"): + assert isinstance(screenwidth, int) + maxline = max(screenwidth, len(line(rows[0]))) + + for row in rows: + l = line(row) + if maxline and len(l) > maxline: + l = l[:maxline] + print(l) + + def _pretty_frame(self, cr: CrMain | CrCoroutine, from_tty: bool) -> str: + try: + with self.cr_globals.with_selected(cr): + saved_level = gdb.selected_frame().level() + cr_select_top_frame() + full = gdb.execute("frame", from_tty=from_tty, to_string=True) + gdb.execute(f"select-frame level {saved_level}") + except Exception as e: # pylint: disable=broad-exception-caught + full = "#0 err: " + str(e) + line = full.split("\n", maxsplit=1)[0] + return line.split(maxsplit=1)[1] + + +class CrSelectCommand(gdb.Command): + """Select the coroutine that GDB is viewing + Usage: cr select COROUTINE + COROUTINE is either a coroutine ID or coroutine name.""" + + cr_globals: CrGlobals + + def __init__(self, cr_globals: CrGlobals) -> None: + self.cr_globals = cr_globals + gdb.Command.__init__(self, "cr select", gdb.COMMAND_RUNNING, gdb.COMPLETE_NONE) + + def invoke(self, arg: str, from_tty: bool) -> None: + argv = gdb.string_to_argv(arg) + if len(argv) != 1: + raise gdb.GdbError("Usage: cr select COROUTINE") + cr = self._find(argv[0]) + self.cr_globals.select(cr) + gdb.execute("frame") + + def _find(self, name: str) -> CrMain | CrCoroutine: + if name.isnumeric(): + cid = int(name) + if cid == 0: + return self.cr_globals.main + if self.cr_globals.is_valid_cid(cid): + return self.cr_globals.coroutines[cid - 1] + crs: list[CrCoroutine] = [] + for cr in self.cr_globals.coroutines: + if cr.state != self.cr_globals.CR_NONE and cr.name == name: + crs += [cr] + match len(crs): + case 0: + raise gdb.GdbError(f"No such coroutine: {name!r}") + case 1: + return crs[0] + case _: + raise gdb.GdbError(f"Ambiguous name, must use Id: {name!r}") + + +# Wire it all in ############################################################### + +_cr_globals: CrGlobals | None = None + + +def cr_initialize() -> None: + global _cr_globals + if _cr_globals: + old = _cr_globals + new = CrGlobals() + new.main.cont_env = old.main.cont_env + for i in range(min(len(old.coroutines), len(new.coroutines))): + new.coroutines[i].cont_env = old.coroutines[i].cont_env + old.delete() + _cr_globals = new + else: + _cr_globals = CrGlobals() + CrCommand(_cr_globals) + CrListCommand(_cr_globals) + CrSelectCommand(_cr_globals) + + +def cr_on_new_objfile(event: gdb.Event) -> None: + if any( + objfile.lookup_global_symbol("cr_gdb_readjmp") for objfile in gdb.objfiles() + ): + print("Initializing libcr integration...") + cr_initialize() + gdb.events.new_objfile.disconnect(cr_on_new_objfile) + + +if _cr_globals: + cr_initialize() +else: + gdb.events.new_objfile.connect(cr_on_new_objfile) |