diff options
Diffstat (limited to 'gdb-helpers/libcr.py')
-rw-r--r-- | gdb-helpers/libcr.py | 355 |
1 files changed, 261 insertions, 94 deletions
diff --git a/gdb-helpers/libcr.py b/gdb-helpers/libcr.py index 385b61a..c07b679 100644 --- a/gdb-helpers/libcr.py +++ b/gdb-helpers/libcr.py @@ -4,41 +4,55 @@ # SPDX-License-Identifier: AGPL-3.0-or-later import contextlib +import time import typing import gdb +import gdb.unwinder # GDB helpers ################################################################## -def gdb_argv_to_string(argv: list[str]) -> str: - """Reverse of gdb.string_to_argv()""" - # TODO: This is wrong - return " ".join(argv) +class _gdb_Locus(typing.Protocol): + @property + def frame_unwinders(self) -> list["gdb._Unwinder"]: ... + + +def gdb_unregister_unwinder( + locus: gdb.Objfile | gdb.Progspace | None, unwinder: "gdb._Unwinder" +) -> None: + _locus: _gdb_Locus = typing.cast(_gdb_Locus, gdb) if locus is None else locus + _locus.frame_unwinders.remove(unwinder) + gdb.invalidate_cached_frames() class gdb_JmpBuf: """Our own in-Python GDB-specific implementation of `jmp_buf`""" - frame: gdb.Frame + level: int registers: dict[str, str] def gdb_setjmp() -> gdb_JmpBuf: """Our own in-Python GDB-specific implementation of `setjmp()`""" buf = gdb_JmpBuf() - buf.frame = gdb.selected_frame() + buf.level = gdb.selected_frame().level() + gdb.execute("select-frame level 0") buf.registers = {} for line in gdb.execute("info registers", to_string=True).split("\n"): words = line.split(maxsplit=2) if len(words) < 2: continue buf.registers[words[0]] = words[1] + gdb.execute(f"select-frame level {buf.level}") return buf def gdb_longjmp(buf: gdb_JmpBuf) -> None: """Our own in-Python GDB-specific implementation of `longjmp()`""" + + gdb.execute("select-frame level 0") + if ( ("sp" in buf.registers) and ("msp" in buf.registers) @@ -52,9 +66,12 @@ def gdb_longjmp(buf: gdb_JmpBuf) -> None: gdb.execute(f"set $sp = {buf.registers['sp']}", to_string=True) gdb.execute(f"set $msp = {buf.registers['msp']}") gdb.execute(f"set $psp = {buf.registers['psp']}") + for reg, val in buf.registers.items(): gdb.execute(f"set ${reg} = {val}") - buf.frame.select() + gdb.invalidate_cached_frames() + + gdb.execute(f"select-frame level {buf.level}") # Core libcr functionality ##################################################### @@ -62,42 +79,167 @@ def gdb_longjmp(buf: gdb_JmpBuf) -> None: class CrGlobals: coroutines: list["CrCoroutine"] - breakpoints: list[gdb.Breakpoint] + _breakpoint: "CrBreakpoint" + _known_threads: set[gdb.InferiorThread] def __init__(self) -> None: num = int( gdb.parse_and_eval("sizeof(coroutine_table)/sizeof(coroutine_table[0])") ) + self.coroutines = [CrCoroutine(self, i + 1) for i in range(num)] - self.breakpoints = [] + + self._breakpoint = CrBreakpoint() + self._breakpoint.enabled = False + + self._known_threads = set() + + gdb.events.cont.connect(self._on_cont) def delete(self) -> None: self.coroutines = [] - for b in self.breakpoints: - b.delete() - self.breakpoints = [] + self._breakpoint.delete() + gdb.events.cont.disconnect(self._on_cont) + + def readjmp(self, env_ptr_expr: str) -> gdb_JmpBuf: + self._breakpoint.enabled = True + gdb.execute(f"call (void)cr_gdb_readjmp({env_ptr_expr})") + self._breakpoint.enabled = False + gdb.execute("queue-signal SIGWINCH") + return self._breakpoint.env + + def _on_cont(self, event: gdb.Event) -> None: + cur_threads = set(gdb.selected_inferior().threads()) + if cur_threads - self._known_threads: + # Ignore thread creation events. + self._known_threads = cur_threads + return + if self.coroutine_running: + if not self.coroutine_running.is_selected(): + if True: # https://sourceware.org/bugzilla/show_bug.cgi?id=32428 + print("Must return to running coroutine before continuing.") + print("Hit ^C twice then run:") + print(f" cr select {self.coroutine_running.id}") + while True: + time.sleep(1) + assert self.coroutine_running._cont_env + gdb_longjmp(self.coroutine_running._cont_env) + for cr in self.coroutines: + cr._cont_env = None def is_valid_cid(self, cid: int) -> bool: return 0 < cid and cid <= len(self.coroutines) @property - def coroutine_running(self) -> int: - return int(gdb.parse_and_eval("coroutine_running")) + def coroutine_running(self) -> "CrCoroutine | None": + cid = int(gdb.parse_and_eval("coroutine_running")) + if not self.is_valid_cid(cid): + return None + return self.coroutines[cid - 1] + + @property + def coroutine_selected(self) -> "CrCoroutine | None": + for cr in self.coroutines: + if cr.is_selected(): + return cr + return None @property def CR_NONE(self) -> gdb.Value: return gdb.parse_and_eval("CR_NONE") + @property + def CR_RUNNING(self) -> gdb.Value: + return gdb.parse_and_eval("CR_RUNNING") + + +class CrBreakpointUnwinder(gdb.unwinder.Unwinder): + """Used to temporarily disable unwinding so that + gdb/breakpoint.c:check_longjmp_breakpoint_for_call_dummy() doesn't + prematurely garbage collect the `call`-dummy-frame. + + """ + + def __init__(self) -> None: + super().__init__("cr_breakpoint_unwinder") + + # The .pyi is wrong; it says `Frame` instead of `PendingFrame`. + def __call__(self, pending_frame: gdb.PendingFrame) -> gdb.UnwindInfo | None: + # Stop unwinding with stop_reason=UNWIND_NO_SAVED_PC by + # returning an UnwindInfo that doesn't have + # `.add_saved_register("pc", ...)`. + return pending_frame.create_unwind_info( + gdb.unwinder.FrameId( + sp=pending_frame.read_register("sp"), + pc=pending_frame.read_register("pc"), + ) + ) + + +class CrBreakpoint(gdb.Breakpoint): + env: gdb_JmpBuf + _unwinder_locus: gdb.Objfile + _unwinder: CrBreakpointUnwinder + + def __init__(self) -> None: + self.env = gdb_JmpBuf() + + self._unwinder = CrBreakpointUnwinder() + readjmp_sym = gdb.lookup_global_symbol("cr_gdb_readjmp") + assert readjmp_sym + self._unwinder_locus = readjmp_sym.symtab.objfile + gdb.unwinder.register_unwinder(self._unwinder_locus, self._unwinder, True) + self._unwinder.enabled = False + + super().__init__( + function="cr_gdb_breakpoint", type=gdb.BP_BREAKPOINT, internal=True + ) + + @property + def enabled(self) -> bool: + return super().enabled + + @enabled.setter + def enabled(self, value: bool) -> None: + self._unwinder.enabled = value + gdb.Breakpoint.enabled.__set__(self, value) # type: ignore + + def stop(self) -> bool: + assert self._unwinder.enabled + self._unwinder.enabled = False + self.env = gdb_setjmp() + self._unwinder.enabled = True + return False # don't stop + + def delete(self) -> None: + gdb_unregister_unwinder(self._unwinder_locus, self._unwinder) + super().delete() + + +def cr_select_top_frame() -> None: + gdb.execute("select-frame level 0") + base_frame = gdb.selected_frame() + while True: + fn = gdb.selected_frame().name() + if fn and (fn.startswith("cr_") or fn.startswith("_cr_")): + older = gdb.selected_frame().older() + if not older: + base_frame.select() + break + older.select() + else: + break + class CrCoroutine: cr_globals: CrGlobals cid: int - env: gdb_JmpBuf | None + _cont_env: gdb_JmpBuf | None def __init__(self, cr_globals: CrGlobals, cid: int) -> None: self.cr_globals = cr_globals self.cid = cid - self.env = None + self._cont_env = None @property def id(self) -> int: @@ -114,50 +256,39 @@ class CrCoroutine: bs[i] = int(gdb.parse_and_eval(f"coroutine_table[{self.cid-1}].name[{i}]")) return bytes(bs).decode("UTF-8").split("\x00", maxsplit=1)[0] + def is_selected(self) -> bool: + sp = int(gdb.parse_and_eval("$sp")) + lo = int(gdb.parse_and_eval(f"coroutine_table[{self.id-1}].stack")) + hi = lo + int(gdb.parse_and_eval(f"coroutine_table[{self.id-1}].stack_size")) + return lo <= sp and sp < hi + + def select(self, level: int = -1) -> None: + if self.cr_globals.coroutine_selected: + self.cr_globals.coroutine_selected._cont_env = gdb_setjmp() + + if self._cont_env: + gdb_longjmp(self._cont_env) + else: + env: gdb_JmpBuf + if self == self.cr_globals.coroutine_running: + assert False # self._cont_env should have been set + elif self.state == self.cr_globals.CR_RUNNING: + env = self.cr_globals.readjmp("&coroutine_add_env") + else: + env = self.cr_globals.readjmp(f"&coroutine_table[{self.id-1}].env") + gdb_longjmp(env) + cr_select_top_frame() + @contextlib.contextmanager - def active(self) -> typing.Iterator[None]: + def with_selected(self) -> typing.Iterator[None]: saved_env = gdb_setjmp() - cr_env = self.env - if self.cid == self.cr_globals.coroutine_running: - cr_env = saved_env - if not cr_env: - raise gdb.GdbError( - f"GDB does not have a saved execution environment for coroutine {self.id}. " - + "This can happen if the coroutine has not run for as long as GDB has been attached." - ) - - gdb_longjmp(cr_env) + self.select() try: yield finally: gdb_longjmp(saved_env) -class CrSetJmpBreakpoint(gdb.Breakpoint): - cr_globals: CrGlobals - - def __init__(self, cr_globals: CrGlobals) -> None: - self.cr_globals = cr_globals - cr_globals.breakpoints += [self] - super().__init__( - function="_cr_plat_setjmp_pre", type=gdb.BP_BREAKPOINT, internal=True - ) - - def stop(self) -> bool: - if bool(gdb.parse_and_eval("env == &coroutine_add_env")): - cid = self.cr_globals.coroutine_running - elif bool(gdb.parse_and_eval("env == &coroutine_main_env")): - cid = 0 - else: - idx = gdb.parse_and_eval( - "( ((char*)env)-((char *)&coroutine_table)) / sizeof(coroutine_table[0])" - ) - cid = int(idx) + 1 - if self.cr_globals.is_valid_cid(cid): - self.cr_globals.coroutines[cid - 1].env = gdb_setjmp() - return False - - # User-facing commands ######################################################### @@ -176,7 +307,12 @@ class CrCommand(gdb.Command): class CrListCommand(gdb.Command): """List libcr coroutines. - Usage: cr list""" + Usage: cr list + + In the output: + - the 'R' marker indicates the currently-running coroutine + - the 'G' marker indicates the coroutine that GDB is viewing; this may be changed with `cr select` + """ cr_globals: CrGlobals @@ -189,59 +325,80 @@ class CrListCommand(gdb.Command): if len(argv) != 0: raise gdb.GdbError(f"Usage: cr list") - w_marker = 1 - w_id = max(len("Id"), len(str(len(self.cr_globals.coroutines)))) - w_name = max( - len("Name"), int(gdb.parse_and_eval("sizeof(coroutine_table[0].name)")) - ) - w_state = len("CR_INITIALIZING") - print( - f" {'Id'.ljust(w_id)} {'Name'.ljust(w_name)} {'State'.ljust(w_state)} Frame" - ) + rows: list[tuple[str, str, str, str, str]] = [ + ("", "Id", "Name", "State", "Frame") + ] for cr in self.cr_globals.coroutines: if cr.state == self.cr_globals.CR_NONE: continue - v_marker = "*" if cr.id == self.cr_globals.coroutine_running else "" - v_id = str(cr.id) - v_name = cr.name - v_state = str(cr.state) - v_frame = self._pretty_frame(cr, from_tty) - print( - f"{v_marker.ljust(w_marker)} {v_id.ljust(w_id)} {v_name.ljust(w_name)} {v_state.ljust(w_state)} {v_frame}" - ) + rows += [ + ( + "".join( + [ + "R" if cr == self.cr_globals.coroutine_running else " ", + "G" if cr.is_selected() else " ", + ] + ), + str(cr.id), + repr(cr.name), + str(cr.state), + self._pretty_frame(cr, from_tty), + ) + ] + + widths: list[int] = [ + max(len(row[col]) for row in rows) for col in range(len(rows[0])) + ] + + def line(row: tuple[str, str, str, str, str]) -> str: + + def cell(col: int) -> str: + return row[col].ljust(widths[col]) + + return f"{cell(0)} {cell(1)} {cell(2)} {cell(3)} {row[4]}" + + maxline = 0 + if screenwidth := gdb.parameter("width"): + assert isinstance(screenwidth, int) + maxline = max(screenwidth, len(line(rows[0]))) + + for row in rows: + l = line(row) + if maxline and len(l) > maxline: + l = l[:maxline] + print(l) def _pretty_frame(self, cr: CrCoroutine, from_tty: bool) -> str: try: - with cr.active(): + with cr.with_selected(): + saved_level = gdb.selected_frame().level() + cr_select_top_frame() full = gdb.execute("frame", from_tty=from_tty, to_string=True) + gdb.execute(f"select-frame level {saved_level}") except Exception as e: - full = "#1 err: " + str(e) + full = "#0 err: " + str(e) line = full.split("\n", maxsplit=1)[0] return line.split(maxsplit=1)[1] -class CrApplyCommand(gdb.Command): - """Apply a GDB command to libcr coroutines. - Usage: cr apply COROUTINE... -- COMMAND - COROUTINE is a space-separated list of coroutine IDs or names.""" +class CrSelectCommand(gdb.Command): + """Select the coroutine that GDB is viewing + Usage: cr select COROUTINE + COROUTINE is either a coroutine ID or coroutine name.""" cr_globals: CrGlobals def __init__(self, cr_globals: CrGlobals) -> None: self.cr_globals = cr_globals - gdb.Command.__init__(self, "cr apply", gdb.COMMAND_RUNNING, gdb.COMPLETE_NONE) + gdb.Command.__init__(self, "cr select", gdb.COMMAND_RUNNING, gdb.COMPLETE_NONE) def invoke(self, arg: str, from_tty: bool) -> None: argv = gdb.string_to_argv(arg) - if "--" not in argv: - raise gdb.GdbError("Usage: cr apply COROUTINE... -- COMMAND") - sep = argv.index("--") - crs = argv[:sep] - cmd = argv[sep + 1 :] - for spec in crs: - cr = self._find(spec) - with cr.active(): - gdb.execute(gdb_argv_to_string(cmd), from_tty=from_tty) + if len(argv) != 1: + raise gdb.GdbError("Usage: cr select COROUTINE") + cr = self._find(argv[0]) + cr.select() + gdb.execute("frame") def _find(self, name: str) -> CrCoroutine: if name.isnumeric(): @@ -266,25 +423,35 @@ class CrApplyCommand(gdb.Command): # Wire it all in ############################################################### +cr_globals: CrGlobals | None = None -def cr_initialize() -> None: - cr_globals = CrGlobals() - - CrSetJmpBreakpoint(cr_globals) +def cr_initialize() -> None: + global cr_globals + if cr_globals: + old = cr_globals + new = CrGlobals() + for i in range(min(len(old.coroutines), len(new.coroutines))): + new.coroutines[i]._cont_env = old.coroutines[i]._cont_env + old.delete() + cr_globals = new + else: + cr_globals = CrGlobals() CrCommand(cr_globals) CrListCommand(cr_globals) - CrApplyCommand(cr_globals) + CrSelectCommand(cr_globals) def cr_on_new_objfile(event: gdb.Event) -> None: if any( - objfile.lookup_static_symbol("_cr_plat_setjmp_pre") - for objfile in gdb.objfiles() + objfile.lookup_global_symbol("cr_gdb_readjmp") for objfile in gdb.objfiles() ): print("Initializing libcr integration...") cr_initialize() gdb.events.new_objfile.disconnect(cr_on_new_objfile) -gdb.events.new_objfile.connect(cr_on_new_objfile) +if cr_globals: + cr_initialize() +else: + gdb.events.new_objfile.connect(cr_on_new_objfile) |