diff options
Diffstat (limited to 'gdb-helpers/libcr.py')
-rw-r--r-- | gdb-helpers/libcr.py | 191 |
1 files changed, 109 insertions, 82 deletions
diff --git a/gdb-helpers/libcr.py b/gdb-helpers/libcr.py index 3ffafce..6f95a81 100644 --- a/gdb-helpers/libcr.py +++ b/gdb-helpers/libcr.py @@ -1,17 +1,20 @@ # gdb-helpers/libcr.py - GDB helpers for libcr. # -# Copyright (C) 2024 Luke T. Shumaker <lukeshu@lukeshu.com> +# Copyright (C) 2024-2025 Luke T. Shumaker <lukeshu@lukeshu.com> # SPDX-License-Identifier: AGPL-3.0-or-later import contextlib import time import typing -import gdb -import gdb.unwinder +import gdb # pylint: disable=import-error +import gdb.unwinder # pylint: disable=import-error # GDB helpers ################################################################## +# https://sourceware.org/bugzilla/show_bug.cgi?id=32428 +gdb_bug_32428 = True + class _gdb_Locus(typing.Protocol): @property @@ -86,6 +89,7 @@ def gdb_longjmp(buf: gdb_JmpBuf) -> None: class CrGlobals: + main: "CrMain" coroutines: list["CrCoroutine"] _breakpoint: "CrBreakpoint" _known_threads: set[gdb.InferiorThread] @@ -95,6 +99,7 @@ class CrGlobals: gdb.parse_and_eval("sizeof(coroutine_table)/sizeof(coroutine_table[0])") ) + self.main = CrMain(self) self.coroutines = [CrCoroutine(self, i + 1) for i in range(num)] self._breakpoint = CrBreakpoint() @@ -123,35 +128,37 @@ class CrGlobals: # Ignore thread creation events. self._known_threads = cur_threads return - if self.coroutine_running: - if not self.coroutine_running.is_selected(): - if True: # https://sourceware.org/bugzilla/show_bug.cgi?id=32428 - print("Must return to running coroutine before continuing.") - print("Hit ^C twice then run:") - print(f" cr select {self.coroutine_running.id}") - while True: - time.sleep(1) - assert self.coroutine_running._cont_env - gdb_longjmp(self.coroutine_running._cont_env) + if not self.coroutine_running.is_selected(): + if gdb_bug_32428: + print("Must return to running coroutine before continuing.") + print("Hit ^C twice then run:") + print(f" cr select {self.coroutine_running.cid}") + while True: + time.sleep(1) + assert self.coroutine_running.cont_env + gdb_longjmp(self.coroutine_running.cont_env) + self.main.cont_env = None for cr in self.coroutines: - cr._cont_env = None + cr.cont_env = None def is_valid_cid(self, cid: int) -> bool: - return 0 < cid and cid <= len(self.coroutines) + return (0 < cid <= len(self.coroutines)) and ( + self.coroutines[cid - 1].state != self.CR_NONE + ) @property - def coroutine_running(self) -> "CrCoroutine | None": + def coroutine_running(self) -> "CrMain | CrCoroutine": cid = int(gdb.parse_and_eval("coroutine_running")) if not self.is_valid_cid(cid): - return None + return self.main return self.coroutines[cid - 1] @property - def coroutine_selected(self) -> "CrCoroutine | None": + def coroutine_selected(self) -> "CrMain | CrCoroutine": for cr in self.coroutines: if cr.is_selected(): return cr - return None + return self.main @property def CR_NONE(self) -> gdb.Value: @@ -161,6 +168,35 @@ class CrGlobals: def CR_RUNNING(self) -> gdb.Value: return gdb.parse_and_eval("CR_RUNNING") + def select(self, cr: "CrMain | CrCoroutine", level: int = -1) -> None: + self.coroutine_selected.cont_env = gdb_setjmp() + + if cr.cont_env: + gdb_longjmp(cr.cont_env) + else: + env: gdb_JmpBuf + if cr == self.coroutine_running: + assert False # cr.cont_env should have been set + match cr: + case CrMain(): + env = self.readjmp("&coroutine_main_env") + case CrCoroutine(): + if cr.state == self.CR_RUNNING: + env = self.readjmp("&coroutine_add_env") + else: + env = self.readjmp(f"&coroutine_table[{cr.cid-1}].env") + gdb_longjmp(env) + cr_select_top_frame() + + @contextlib.contextmanager + def with_selected(self, cr: "CrMain | CrCoroutine") -> typing.Iterator[None]: + saved_env = gdb_setjmp() + self.select(cr) + try: + yield + finally: + gdb_longjmp(saved_env) + class CrBreakpointUnwinder(gdb.unwinder.Unwinder): """Used to temporarily disable unwinding so that @@ -211,6 +247,8 @@ class CrBreakpoint(gdb.Breakpoint): @enabled.setter def enabled(self, value: bool) -> None: self._unwinder.enabled = value + # Use a dunder-call to avoid an infinite loop. + # pylint: disable=unnecessary-dunder-call gdb.Breakpoint.enabled.__set__(self, value) # type: ignore def stop(self) -> bool: @@ -240,19 +278,31 @@ def cr_select_top_frame() -> None: break +class CrMain: + cr_globals: CrGlobals + cont_env: gdb_JmpBuf | None + + def __init__(self, cr_globals: CrGlobals) -> None: + self.cr_globals = cr_globals + self.cont_env = None + + @property + def cid(self) -> int: + return 0 + + def is_selected(self) -> bool: + return not any(cr.is_selected() for cr in self.cr_globals.coroutines) + + class CrCoroutine: cr_globals: CrGlobals cid: int - _cont_env: gdb_JmpBuf | None + cont_env: gdb_JmpBuf | None def __init__(self, cr_globals: CrGlobals, cid: int) -> None: self.cr_globals = cr_globals self.cid = cid - self._cont_env = None - - @property - def id(self) -> int: - return self.cid + self.cont_env = None @property def state(self) -> gdb.Value: @@ -267,35 +317,9 @@ class CrCoroutine: def is_selected(self) -> bool: sp = int(gdb.parse_and_eval("$sp")) - lo = int(gdb.parse_and_eval(f"coroutine_table[{self.id-1}].stack")) - hi = lo + int(gdb.parse_and_eval(f"coroutine_table[{self.id-1}].stack_size")) - return lo <= sp and sp < hi - - def select(self, level: int = -1) -> None: - if self.cr_globals.coroutine_selected: - self.cr_globals.coroutine_selected._cont_env = gdb_setjmp() - - if self._cont_env: - gdb_longjmp(self._cont_env) - else: - env: gdb_JmpBuf - if self == self.cr_globals.coroutine_running: - assert False # self._cont_env should have been set - elif self.state == self.cr_globals.CR_RUNNING: - env = self.cr_globals.readjmp("&coroutine_add_env") - else: - env = self.cr_globals.readjmp(f"&coroutine_table[{self.id-1}].env") - gdb_longjmp(env) - cr_select_top_frame() - - @contextlib.contextmanager - def with_selected(self) -> typing.Iterator[None]: - saved_env = gdb_setjmp() - self.select() - try: - yield - finally: - gdb_longjmp(saved_env) + lo = int(gdb.parse_and_eval(f"coroutine_table[{self.cid-1}].stack")) + hi = lo + int(gdb.parse_and_eval(f"coroutine_table[{self.cid-1}].stack_size")) + return lo <= sp < hi # User-facing commands ######################################################### @@ -332,13 +356,16 @@ class CrListCommand(gdb.Command): def invoke(self, arg: str, from_tty: bool) -> None: argv = gdb.string_to_argv(arg) if len(argv) != 0: - raise gdb.GdbError(f"Usage: cr list") + raise gdb.GdbError("Usage: cr list") rows: list[tuple[str, str, str, str, str]] = [ ("", "Id", "Name", "State", "Frame") ] - for cr in self.cr_globals.coroutines: - if cr.state == self.cr_globals.CR_NONE: + for cid in range(len(self.cr_globals.coroutines) + 1): + cr: CrMain | CrCoroutine = ( + self.cr_globals.coroutines[cid - 1] if cid else self.cr_globals.main + ) + if isinstance(cr, CrCoroutine) and cr.state == self.cr_globals.CR_NONE: continue rows += [ ( @@ -348,9 +375,9 @@ class CrListCommand(gdb.Command): "G" if cr.is_selected() else " ", ] ), - str(cr.id), - repr(cr.name), - str(cr.state), + str(cr.cid), + repr(cr.name) if isinstance(cr, CrCoroutine) else "-", + str(cr.state) if isinstance(cr, CrCoroutine) else "-", self._pretty_frame(cr, from_tty), ) ] @@ -377,14 +404,14 @@ class CrListCommand(gdb.Command): l = l[:maxline] print(l) - def _pretty_frame(self, cr: CrCoroutine, from_tty: bool) -> str: + def _pretty_frame(self, cr: CrMain | CrCoroutine, from_tty: bool) -> str: try: - with cr.with_selected(): + with self.cr_globals.with_selected(cr): saved_level = gdb.selected_frame().level() cr_select_top_frame() full = gdb.execute("frame", from_tty=from_tty, to_string=True) gdb.execute(f"select-frame level {saved_level}") - except Exception as e: + except Exception as e: # pylint: disable=broad-exception-caught full = "#0 err: " + str(e) line = full.split("\n", maxsplit=1)[0] return line.split(maxsplit=1)[1] @@ -406,16 +433,15 @@ class CrSelectCommand(gdb.Command): if len(argv) != 1: raise gdb.GdbError("Usage: cr select COROUTINE") cr = self._find(argv[0]) - cr.select() + self.cr_globals.select(cr) gdb.execute("frame") - def _find(self, name: str) -> CrCoroutine: + def _find(self, name: str) -> CrMain | CrCoroutine: if name.isnumeric(): cid = int(name) - if ( - self.cr_globals.is_valid_cid(cid) - and self.cr_globals.coroutines[cid - 1].state != self.cr_globals.CR_NONE - ): + if cid == 0: + return self.cr_globals.main + if self.cr_globals.is_valid_cid(cid): return self.cr_globals.coroutines[cid - 1] crs: list[CrCoroutine] = [] for cr in self.cr_globals.coroutines: @@ -423,32 +449,33 @@ class CrSelectCommand(gdb.Command): crs += [cr] match len(crs): case 0: - raise gdb.GdbError(f"No such coroutine: {repr(name)}") + raise gdb.GdbError(f"No such coroutine: {name!r}") case 1: return crs[0] case _: - raise gdb.GdbError(f"Ambiguous name, must use Id: {repr(name)}") + raise gdb.GdbError(f"Ambiguous name, must use Id: {name!r}") # Wire it all in ############################################################### -cr_globals: CrGlobals | None = None +_cr_globals: CrGlobals | None = None def cr_initialize() -> None: - global cr_globals - if cr_globals: - old = cr_globals + global _cr_globals + if _cr_globals: + old = _cr_globals new = CrGlobals() + new.main.cont_env = old.main.cont_env for i in range(min(len(old.coroutines), len(new.coroutines))): - new.coroutines[i]._cont_env = old.coroutines[i]._cont_env + new.coroutines[i].cont_env = old.coroutines[i].cont_env old.delete() - cr_globals = new + _cr_globals = new else: - cr_globals = CrGlobals() - CrCommand(cr_globals) - CrListCommand(cr_globals) - CrSelectCommand(cr_globals) + _cr_globals = CrGlobals() + CrCommand(_cr_globals) + CrListCommand(_cr_globals) + CrSelectCommand(_cr_globals) def cr_on_new_objfile(event: gdb.Event) -> None: @@ -460,7 +487,7 @@ def cr_on_new_objfile(event: gdb.Event) -> None: gdb.events.new_objfile.disconnect(cr_on_new_objfile) -if cr_globals: +if _cr_globals: cr_initialize() else: gdb.events.new_objfile.connect(cr_on_new_objfile) |