From 408a74784e7c549db0d1f177137819867e5b9ad6 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 6 Jun 2024 16:14:58 -0400
Subject: [PATCH] Catch `.pause_from_sync()` in root bg thread bugs!

Originally discovered while using `tractor.pause_from_sync()` from the
`i3ipc` client running in a bg-thread that uses `asyncio` inside
`modden`.

Turns out we definitely aren't correctly handling `.pause_from_sync()`
from the root actor when called from a `trio.to_thread.run_sync()`
bg thread:
- root-actor bg threads which can't `Lock._debug_lock.acquire()` since
  they aren't in `trio.Task`s.
- even if scheduled via `.to_thread.run_sync(_debug._pause)` the
  acquirer won't be the task/thread which calls `Lock.release()` from
  `PdbREPL` hooks; this results in a RTE raised by `trio`..
- multiple threads will step on each other's stdio since cpython's GIL
  seems to ctx switch threads on every input from the user to the REPL
  loop..

Reproduce via reworking our example and test so that they catch and
fail for all edge cases:
- rework the `/examples/debugging/sync_bp.py` example to demonstrate
  the above issues, namely the stdio clobbering in the REPL when
  multiple threads and/or a subactor try to debug simultaneously.
  |_ run one thread using a task nursery to ensure it runs conc with
     the nursery's parent task.
  |_ ensure the bg threads run conc with a subactor usage of
     `.pause_from_sync()`.
  |_ gravely detail all the special cases inside a TODO comment.
  |_ add some control flags to `sync_pause()` helper and don't use
     `breakpoint()` by default.
- extend and adjust `test_debugger.test_pause_from_sync` to match (and
  thus currently fail) by ensuring exclusive `PdbREPL` attachment when
  the 2 bg root-actor threads are concurrently interacting alongside
  the subactor:
  |_ should only see one of the `_pause_msg` logs at a time for either
     one of the threads or the subactor.
  |_ ensure each attaches (in no particular order) before expecting the
     script to exit.

Impl adjustments to `.devx._debug`:
- drop `Lock.repl`, no longer used.
- add `Lock._owned_by_root: bool` for the `.ctx_in_debug == None`
  root-actor-task active case.
- always `log.exception()` for any `._debug_lock.release()` ownership
  RTE emitted by `trio`, like we used to..
- add special `Lock.release()` log message for the stale lock but
  `._owned_by_root == True` case; oh yeah and actually
  `log.devx(message)`..
- rename `Lock.acquire()` -> `.acquire_for_ctx()` since it's only ever
  used from subactor IPC usage; well that and for local root-task
  usage we should prolly add a `.acquire_from_root_task()`?
- buncha `._pause()` impl improvements:
  |_ type `._pause()`'s `debug_func` as a `partial` as well.
  |_ offer `called_from_sync: bool` and `called_from_bg_thread: bool`
     for the special case handling when called from `.pause_from_sync()`
  |_ only set `DebugStatus.repl/repl_task` when `debug_func != None`
     (OW ensure the `.repl_task` is not the current one).
  |_ handle error logging even when `debug_func is None`..
  |_ lotsa detailed commentary around root-actor-bg-thread special
     cases.
- when `._set_trace(hide_tb=False)` do
  `pdbp.set_trace(frame=currentframe())` so the `._debug` internal
  frames are always included.
- by default always hide tracebacks for `.pause[_from_sync]()`
  internals.
- improve `.pause_from_sync()` to avoid root-bg-thread crashes:
  |_ pass new `called_from_xxx_` flags and ensure
     `DebugStatus.repl_task` is actually set to the
     `threading.current_thread()` when needed.
  |_ manually call `Lock._debug_lock.acquire_nowait()` for the non-bg
     thread case.
  |_ TODO: still need to implement the bg-thread case using a bg
     `trio.Task`-in-thread with an `trio.Event` set by thread REPL
     exit.
---
 examples/debugging/sync_bp.py | 125 +++++++++++++++---
 tests/test_debugger.py        |  71 +++++++---
 tractor/devx/_debug.py        | 241 +++++++++++++++++++++++-----------
 3 files changed, 323 insertions(+), 114 deletions(-)

diff --git a/examples/debugging/sync_bp.py b/examples/debugging/sync_bp.py
index efa4e40..e265df4 100644
--- a/examples/debugging/sync_bp.py
+++ b/examples/debugging/sync_bp.py
@@ -1,15 +1,32 @@
+from functools import partial
+import time
+from threading import current_thread
+
 import trio
 import tractor


 def sync_pause(
-    use_builtin: bool = True,
+    use_builtin: bool = False,
     error: bool = False,
+    hide_tb: bool = True,
+    pre_sleep: float|None = None,
 ):
+    if pre_sleep:
+        time.sleep(pre_sleep)
+
     if use_builtin:
-        breakpoint(hide_tb=False)
+        print(
+            f'Entering `breakpoint()` from\n'
+            f'{current_thread()}\n'
+        )
+        breakpoint(hide_tb=hide_tb)
     else:
+        print(
+            f'Entering `tractor.pause_from_sync()` from\n'
+            f'{current_thread()}@{tractor.current_actor().uid}\n'
+        )
         tractor.pause_from_sync()

     if error:
@@ -25,44 +42,114 @@ async def start_n_sync_pause(
     # sync to parent-side task
     await ctx.started()

-    print(f'entering SYNC PAUSE in {actor.uid}')
+    print(f'Entering `sync_pause()` in subactor: {actor.uid}\n')
     sync_pause()
-    print(f'back from SYNC PAUSE in {actor.uid}')
+    print(f'Exited `sync_pause()` in subactor: {actor.uid}\n')


 async def main() -> None:
-    async with tractor.open_nursery(
-        # NOTE: required for pausing from sync funcs
-        maybe_enable_greenback=True,
-        debug_mode=True,
-    ) as an:
+    async with (
+        tractor.open_nursery(
+            # NOTE: required for pausing from sync funcs
+            maybe_enable_greenback=True,
+            debug_mode=True,
+            # loglevel='cancel',
+        ) as an,
+        trio.open_nursery() as tn,
+    ):
+        # just from root task
+        sync_pause()

         p: tractor.Portal = await an.start_actor(
             'subactor',
             enable_modules=[__name__],
             # infect_asyncio=True,
             debug_mode=True,
-            loglevel='cancel',
         )

         # TODO: 3 sub-actor usage cases:
+        # -[x] via a `.open_context()`
         # -[ ] via a `.run_in_actor()` call
         # -[ ] via a `.run()`
-        # -[ ] via a `.open_context()`
-        #
+        # -[ ] via a `.to_thread.run_sync()` in subactor
         async with p.open_context(
             start_n_sync_pause,
         ) as (ctx, first):
             assert first is None

-            await tractor.pause()
-            sync_pause()
+            # TODO: handle bg-thread-in-root-actor special cases!
+            #
+            # there are a couple very subtle situations possible here
+            # and they are likely to become more important as cpython
+            # moves to support no-GIL.
+            #
+            # Cases:
+            # 1. root-actor bg-threads that call `.pause_from_sync()`
+            #    whilst an in-tree subactor also is using `.pause()`.
+            #    |_ since the root-actor bg thread can not
+            #       `Lock._debug_lock.acquire_nowait()` without running
+            #       a `trio.Task`, AND because the
+            #       `PdbREPL.set_continue()` is called from that
+            #       bg-thread, we can not `._debug_lock.release()`
+            #       either!
+            #    |_ this results in no actor-tree `Lock` being used
+            #       on behalf of the bg-thread and thus the subactor's
+            #       task and the thread trying to use stdio
+            #       simultaneously which results in the classic TTY
+            #       clobbering!
+            #
+            # 2. multiple sync-bg-threads that call
+            #    `.pause_from_sync()` where one is scheduled via
+            #    `Nursery.start_soon(to_thread.run_sync)` in a bg
+            #    task.
+            #
+            # Due to the GIL, the threads never truly try to step
+            # through the REPL simultaneously, BUT their `logging`
+            # and traceback outputs are interleaved since the GIL
+            # (seemingly) switches threads on every REPL-input from
+            # the user..
+            #
+            # Soo, the context switching semantics of the GIL
+            # result in a very confusing and messy interaction UX
+            # since eval and (tb) print output is NOT synced to
+            # each REPL-cycle (like we normally make it via
+            # a `.set_continue()` callback triggering the
+            # `Lock.release()`). Ideally we can solve this
+            # usability issue NOW because this will of course be
+            # that much more important when eventually there is no
+            # GIL!

-            # TODO: make this work!!
-            await trio.to_thread.run_sync(
-                sync_pause,
-                abandon_on_cancel=False,
-            )
+            # XXX should cause double REPL entry and thus TTY
+            # clobbering due to case 1. above!
+            tn.start_soon(
+                partial(
+                    trio.to_thread.run_sync,
+                    partial(
+                        sync_pause,
+                        use_builtin=False,
+                        # pre_sleep=0.5,
+                    ),
+                    abandon_on_cancel=True,
+                    thread_name='start_soon_root_bg_thread',
+                )
+            )
+
+            await tractor.pause()
+
+            # XXX should cause double REPL entry and thus TTY
+            # clobbering due to case 2. above!
+            await trio.to_thread.run_sync(
+                partial(
+                    sync_pause,
+                    # NOTE this already works fine since in the new
+                    # thread the `breakpoint()` built-in is never
+                    # overloaded, thus NO locking is used, HOWEVER
+                    # the case 2. from above still exists!
+                    use_builtin=True,
+                ),
+                abandon_on_cancel=False,
+                thread_name='inline_root_bg_thread',
+            )

         await ctx.cancel()

diff --git a/tests/test_debugger.py b/tests/test_debugger.py
index 72778bd..5f818a6 100644
--- a/tests/test_debugger.py
+++ b/tests/test_debugger.py
@@ -1073,6 +1073,8 @@ def test_pause_from_sync(
     '''
     child = spawn('sync_bp')
+
+    # first `sync_pause()` after nurseries open
     child.expect(PROMPT)
     assert_before(
         child,
@@ -1087,43 +1089,70 @@ def test_pause_from_sync(
         do_ctlc(child)

     child.sendline('c')
+
+
+    # first `await tractor.pause()` inside `p.open_context()` body
     child.expect(PROMPT)

-    # XXX shouldn't see gb loaded again
+    # XXX shouldn't see gb loaded message with PDB loglevel!
     before = str(child.before.decode())
     assert not in_prompt_msg(
         before,
         ['`greenback` portal opened!'],
     )

+    # should be same root task
     assert_before(
         child,
-        [_pause_msg, "('root'",],
+        [
+            _pause_msg,
+            "
 CancelScope|None:
     if not is_root_process():

@@ -223,6 +219,7 @@ class Lock:
     ctx_in_debug: Context|None = None
     req_handler_finished: trio.Event|None = None
+    _owned_by_root: bool = False
     _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
     _blocked: set[
         tuple[str, str]  # `Actor.uid` for per actor
@@ -231,23 +228,16 @@ class Lock:

     @classmethod
     def repr(cls) -> str:
-
-        # both root and subs
+        lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
         fields: str = (
-            f'repl: {cls.repl}\n'
+            f'req_handler_finished: {cls.req_handler_finished}\n'
+
+            f'_blocked: {cls._blocked}\n\n'
+            f'_debug_lock: {cls._debug_lock}\n'
+            f'lock_stats: {lock_stats}\n'
+
+            f'ctx_in_debug: {cls.ctx_in_debug}\n'
+        )
-        if is_root_process():
-            lock_stats: trio.LockStatistics = cls._debug_lock.statistics()
-            fields += (
-                f'req_handler_finished: {cls.req_handler_finished}\n'
-
-                f'_blocked: {cls._blocked}\n\n'
-                f'_debug_lock: {cls._debug_lock}\n'
-                f'lock_stats: {lock_stats}\n'
-
-            )
-
         body: str = textwrap.indent(
             fields,
             prefix=' |_',
@@ -256,8 +246,6 @@ class Lock:
             f'<{cls.__name__}(\n'
             f'{body}'
             ')>\n\n'
-
-            f'{cls.ctx_in_debug}\n'
         )

     @classmethod
@@ -266,7 +254,10 @@
         cls,
         force: bool = False,
     ):
-        message: str = 'TTY lock not held by any child\n'
+        if not cls._owned_by_root:
+            message: str = 'TTY lock not held by any child\n'
+        else:
+            message: str = 'TTY lock held in root-actor task\n'

         if not (is_trio_main := DebugStatus.is_main_trio_thread()):
             task: threading.Thread = threading.current_thread()
@@ -279,8 +270,20 @@
             if (
                 lock.locked()
                 and
-                owner is task
-                # ^-NOTE-^ if not will raise a RTE..
+                (
+                    owner is task
+                    # or
+                    # cls._owned_by_root
+                )
+                # ^-NOTE-^ if we do NOT ensure this, `trio` will
+                # raise a RTE when a non-owner tries to release the
+                # lock.
+                #
+                # Further we need to be extra pedantic about the
+                # correct task, greenback-spawned-task and/or thread
+                # being set to the `.repl_task` such that the above
+                # condition matches and we actually release the lock.
+                # This is of particular note for `.pause_from_sync()`!
             ):
                 if not is_trio_main:
                     trio.from_thread.run_sync(
@@ -290,6 +293,10 @@
                     cls._debug_lock.release()
                     message: str = 'TTY lock released for child\n'

+        except RuntimeError as rte:
+            log.exception('Failed to release `Lock`?')
+            raise rte
+
         finally:
             # IFF there are no more requesting tasks queued up, fire the
             # "tty-unlocked" event thereby alerting any monitors of the lock that
@@ -305,7 +312,11 @@
             ):
                 message += '-> No more child ctx tasks hold the TTY lock!\n'

-            elif req_handler_finished:
+            elif (
+                req_handler_finished
+                and
+                lock.locked()
+            ):
                 req_stats = req_handler_finished.statistics()
                 message += (
                     f'-> A child ctx task still owns the `Lock` ??\n'
@@ -315,9 +326,20 @@
             cls.ctx_in_debug = None

+            if (
+                cls._owned_by_root
+            ):
+                if not lock.locked():
+                    cls._owned_by_root = False
+                else:
+                    message += 'Lock still held by root actor task?!?\n'
+                    lock.release()
+
+            log.devx(message)
+
     @classmethod
     @acm
-    async def acquire(
+    async def acquire_for_ctx(
         cls,
         ctx: Context,

@@ -372,7 +394,7 @@
         )

         # NOTE: critical section: this yield is unshielded!
-
+        #
         # IF we received a cancel during the shielded lock entry of some
         # next-in-queue requesting task, then the resumption here will
         # result in that ``trio.Cancelled`` being raised to our caller
         # here!
         #
         # in this case the ``finally:`` below should trigger
         # and the surrounding caller side context should cancel
         # the requesting ctx.
         #
         yield cls._debug_lock

     finally:
-        message :str = 'Exiting `Lock.acquire()` on behalf of sub-actor\n'
+        message :str = 'Exiting `Lock.acquire_for_ctx()` on behalf of sub-actor\n'
         if we_acquired:
             message += '-> TTY lock released by child\n'
             cls.release()

@@ -468,11 +490,11 @@ async def lock_tty_for_child(

     # TODO: use `.msg._ops.maybe_limit_plds()` here instead so we
     # can merge into a single async with, with the
-    # `Lock.acquire()` enter below?
+    # `Lock.acquire_for_ctx()` enter below?
     #
     # enable the locking msgspec
     with apply_debug_pldec():
-        async with Lock.acquire(ctx=ctx):
+        async with Lock.acquire_for_ctx(ctx=ctx):
             debug_lock_cs.shield = True

             log.devx(

@@ -567,6 +589,11 @@ class DebugStatus:
     whenever a local task is an active REPL.

     '''
+    # XXX local ref to the `pdbp.Pdb` instance, ONLY set in the
+    # actor-process that currently has activated a REPL i.e. it
+    # should be `None` (unset) in any other actor-process that does
+    # not yet have the `Lock` acquired via a root-actor debugger
+    # request.
     repl: PdbREPL|None = None

     # TODO: yet again this looks like a task outcome where we need
@@ -1443,7 +1470,7 @@ class DebugRequestError(RuntimeError):


 async def _pause(
-    debug_func: Callable|None,
+    debug_func: Callable|partial|None,

     # NOTE: must be passed in the `.pause_from_sync()` case!
     repl: PdbREPL|None = None,
@@ -1457,7 +1484,9 @@ async def _pause(
     # be no way to override it?..
     # shield: bool = False,

-    hide_tb: bool = False,
+    hide_tb: bool = True,
+    called_from_sync: bool = False,
+    called_from_bg_thread: bool = False,
     task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,

     **debug_func_kwargs,
@@ -1502,27 +1531,15 @@ async def _pause(
     # -[ ] factor out better, main reason for it is common logic for
     # both root and sub repl entry
     def _enter_repl_sync(
-        debug_func: Callable,
+        debug_func: partial[None],
     ) -> None:
         __tracebackhide__: bool = hide_tb
+        debug_func_name: str = (
+            debug_func.func.__name__ if debug_func else 'None'
+        )
         try:
-            # set local actor task to avoid recurrent
-            # entries/requests from the same local task (to the root
-            # process).
-            DebugStatus.repl_task = task
-            DebugStatus.repl = repl
-
-            # TODO: do we want to support using this **just** for the
-            # locking / common code (prolly to help address #320)?
-            if debug_func is None:
-                task_status.started(DebugStatus)
-
-            else:
-                log.warning(
-                    'Entering REPL for task fuck you!\n'
-                    f'{task}\n'
-                )
+            if debug_func:
                 # block here one (at the appropriate frame *up*) where
                 # ``breakpoint()`` was awaited and begin handling stdio.
                 log.devx(
@@ -1531,6 +1548,12 @@ async def _pause(
                     f' |_{task}\n'
                 )

+                # set local actor task to avoid recurrent
+                # entries/requests from the same local task (to the root
+                # process).
+                DebugStatus.repl = repl
+                DebugStatus.repl_task = task
+
                 # invoke the low-level REPL activation routine which itself
                 # should call into a `Pdb.set_trace()` of some sort.
                 debug_func(
@@ -1539,10 +1562,27 @@ async def _pause(
                     **debug_func_kwargs,
                 )

+            # TODO: maybe invert this logic and instead
+            # do `assert debug_func is None` when
+            # `called_from_sync`?
+            else:
+                if (
+                    called_from_sync
+                    # and
+                    # is_root_process()
+                    and
+                    not DebugStatus.is_main_trio_thread()
+                ):
+                    assert DebugStatus.repl_task is not task
+
+                # TODO: do we want to support using this **just** for the
+                # locking / common code (prolly to help address #320)?
+                task_status.started(DebugStatus)
+
         except trio.Cancelled:
             log.exception(
-                'Cancelled during invoke of internal `debug_func = '
-                f'{debug_func.func.__name__}`\n'
+                'Cancelled during invoke of internal\n\n'
+                f'`debug_func = {debug_func_name}`\n'
             )
             # XXX NOTE: DON'T release lock yet
             raise
@@ -1550,8 +1590,8 @@ async def _pause(
         except BaseException:
             __tracebackhide__: bool = False
             log.exception(
-                'Failed to invoke internal `debug_func = '
-                f'{debug_func.func.__name__}`\n'
+                'Failed to invoke internal\n\n'
+                f'`debug_func = {debug_func_name}`\n'
             )
             # NOTE: OW this is ONLY called from the
             # `.set_continue/next` hooks!
@@ -1597,34 +1637,56 @@ async def _pause(
                 f'This root actor task is already within an active REPL session\n'
                 f'Ignoring this re-entered `tractor.pause()`\n'
                 f'task: {task.name}\n'
-                f'REPL: {Lock.repl}\n'
                 # TODO: use `._frame_stack` scanner to find the @api_frame
             )
             with trio.CancelScope(shield=shield):
                 await trio.lowlevel.checkpoint()
             return

-        # XXX: since we need to enter pdb synchronously below,
-        # we have to release the lock manually from pdb completion
-        # callbacks. Can't think of a nicer way then this atm.
+        # must shield here to avoid hitting a `Cancelled` and
+        # a child getting stuck bc we clobbered the tty
         with trio.CancelScope(shield=shield):
             if Lock._debug_lock.locked():
-                log.warning(
-                    'attempting to shield-acquire active TTY lock owned by\n'
+
+                acq_prefix: str = 'shield-' if shield else ''
+                ctx_line: str = (
+                    'lock owned by ctx\n\n'
+                    f'{ctx}'
+                ) if ctx else 'stale lock with no request ctx!?'
+                log.devx(
+                    f'attempting to {acq_prefix}acquire active TTY '
+                    f'{ctx_line}'
                 )

-                # must shield here to avoid hitting a ``Cancelled`` and
-                # a child getting stuck bc we clobbered the tty
-                # with trio.CancelScope(shield=True):
-                await Lock._debug_lock.acquire()
-            else:
-                # may be cancelled
+            # XXX: since we need to enter pdb synchronously below,
+            # and we don't want to block the thread that starts
+            # stepping through the application thread, we later
+            # must `Lock._debug_lock.release()` manually from
+            # some `PdbREPL` completion callback (`.set_[continue/exit]()`).
+            #
+            # So, when `._pause()` is called from a (bg/non-trio)
+            # thread, special provisions are needed and we need
+            # to do the `.acquire()`/`.release()` calls from
+            # a common `trio.task` (due to internal impl of
+            # `FIFOLock`). Thus we do not acquire here and
+            # instead expect `.pause_from_sync()` to take care of
+            # this detail depending on the caller's (threading)
+            # usage.
+            #
+            # NOTE that this special case is ONLY required when
+            # using `.pause_from_sync()` from the root actor
+            # since OW a subactor will instead make an IPC
+            # request (in the branch below) to acquire the
+            # `Lock`-mutex and a common root-actor RPC task will
+            # take care of `._debug_lock` mgmt!
+            if not called_from_sync:
                 await Lock._debug_lock.acquire()
+                Lock._owned_by_root = True

             # enter REPL from root, no TTY locking IPC ctx necessary
+            # since we can acquire the `Lock._debug_lock` directly in
+            # thread.
             _enter_repl_sync(debug_func)
-            return  # next branch is mutex and for subactors

     # TODO: need a more robust check for the "root" actor
     elif (

@@ -1843,6 +1905,11 @@ def _set_trace(
     # called our API.
     caller_frame: FrameType = api_frame.f_back  # type: ignore

+    # pretend this frame is the caller frame to show
+    # the entire call-stack all the way down to here.
+    if not hide_tb:
+        caller_frame: FrameType = inspect.currentframe()
+
     # engage ze REPL
     # B~()
     repl.set_trace(frame=caller_frame)
@@ -1850,7 +1917,7 @@
 async def pause(
     *,
-    hide_tb: bool = False,
+    hide_tb: bool = True,
     api_frame: FrameType|None = None,

     # TODO: figure out how to still make this work:

@@ -1970,13 +2037,12 @@ async def maybe_init_greenback(
 # runtime aware version which takes care of all .
 def pause_from_sync(
-    hide_tb: bool = False,
-    # proxied to `_pause()`
+    hide_tb: bool = True,

-    **_pause_kwargs,
-    # for eg.
+    # proxy to `._pause()`, for ex:
     # shield: bool = False,
     # api_frame: FrameType|None = None,
+    **_pause_kwargs,

 ) -> None:

@@ -2020,26 +2086,53 @@ def pause_from_sync(
             # noop: non-cancelled `.to_thread`
             # `trio.Cancelled`: cancelled `.to_thread`
             #
+            log.warning(
+                'Engaging `.pause_from_sync()` from ANOTHER THREAD!'
+            )
+            task: threading.Thread = threading.current_thread()
+            DebugStatus.repl_task: threading.Thread = task
+
+            # TODO: make root-actor bg thread usage work!
+            # if is_root_process():
+            #     async def _pause_from_sync_thread():
+            #         ...
+            # else:
+            #     .. the below ..
+
             trio.from_thread.run(
                 partial(
                     _pause,
                     debug_func=None,
                     repl=mdb,
+                    hide_tb=hide_tb,
+
+                    # XXX to prevent `._pause()` from setting
+                    # `DebugStatus.repl_task` to the gb task!
+                    called_from_sync=True,
+                    called_from_bg_thread=True,
+
                     **_pause_kwargs
                 ),
            )
-            task: threading.Thread = threading.current_thread()

        else:  # we are presumably the `trio.run()` + main thread
            task: trio.Task = current_task()
+           DebugStatus.repl_task: trio.Task = task
            greenback.await_(
                _pause(
                    debug_func=None,
                    repl=mdb,
+                   hide_tb=hide_tb,
+                   called_from_sync=True,
                    **_pause_kwargs,
                )
            )
-           DebugStatus.repl_task: str = current_task()
+
+           if is_root_process():
+               # Manually acquire since otherwise on release we'll
+               # get a RTE raised by `trio` due to ownership..
+               Lock._debug_lock.acquire_nowait()
+               Lock._owned_by_root = True

         # TODO: ensure we aggressively make the user aware about
         # entering the global ``breakpoint()`` built-in from sync
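
For reference, the ownership RTE called out above is plain `trio` lock
semantics and reproduces entirely outside of `tractor`: releasing
a `trio.StrictFIFOLock` from any task other than the acquirer raises
`RuntimeError`. A minimal sketch, illustrative only and not part of
this patch:

    import trio

    async def main():
        lock = trio.StrictFIFOLock()

        async def acquirer(task_status=trio.TASK_STATUS_IGNORED):
            await lock.acquire()  # this task becomes the lock's owner
            task_status.started()
            await trio.sleep_forever()

        async with trio.open_nursery() as tn:
            await tn.start(acquirer)
            try:
                lock.release()  # non-owner release -> RuntimeError
            except RuntimeError as rte:
                print(f'ownership RTE from `trio`: {rte}')
            tn.cancel_scope.cancel()

    trio.run(main)

This is exactly why a `PdbREPL.set_continue()` hook running in a bg
thread (or greenback task) can't directly `._debug_lock.release()` on
behalf of whichever task acquired it.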
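
The remaining TODO (the bg-thread case) suggests keeping both the
`.acquire()` and `.release()` inside a single `trio.Task` and having
the thread-side REPL only signal its exit via a `trio.Event`. A hedged
mock-up of that design, where `lock_holder()` and `bg_thread_repl()`
are hypothetical stand-ins rather than `._debug` internals:

    import trio

    async def main():
        lock = trio.StrictFIFOLock()
        repl_done = trio.Event()

        async def lock_holder():
            await lock.acquire()    # acquire..
            await repl_done.wait()  # ..wait for thread-side REPL exit..
            lock.release()          # ..release from the SAME task: no RTE

        def bg_thread_repl():
            # stand-in for a `PdbREPL` session in a bg thread; on exit
            # it only pings the trio side instead of touching the lock.
            trio.from_thread.run_sync(repl_done.set)

        async with trio.open_nursery() as tn:
            tn.start_soon(lock_holder)
            await trio.to_thread.run_sync(bg_thread_repl)

    trio.run(main)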
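
And the GIL-driven interleaving described in case 2. can be
approximated with plain threads writing to stdout; a trivial stand-in
(line-level interleaving only, no REPL involved):

    import threading
    import time

    def chatty(name: str):
        for i in range(3):
            print(f'{name}: line {i}')  # output interleaves across threads
            time.sleep(0.01)

    threads = [
        threading.Thread(target=chatty, args=(name,))
        for name in ('bg_thread_1', 'bg_thread_2')
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()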