diff --git a/examples/debugging/sync_bp.py b/examples/debugging/sync_bp.py index efa4e40..e265df4 100644 --- a/examples/debugging/sync_bp.py +++ b/examples/debugging/sync_bp.py @@ -1,15 +1,32 @@ +from functools import partial +import time +from threading import current_thread + import trio import tractor def sync_pause( - use_builtin: bool = True, + use_builtin: bool = False, error: bool = False, + hide_tb: bool = True, + pre_sleep: float|None = None, ): + if pre_sleep: + time.sleep(pre_sleep) + if use_builtin: - breakpoint(hide_tb=False) + print( + f'Entering `breakpoint()` from\n' + f'{current_thread()}\n' + ) + breakpoint(hide_tb=hide_tb) else: + print( + f'Entering `tractor.pause_from_sync()` from\n' + f'{current_thread()}@{tractor.current_actor().uid}\n' + ) tractor.pause_from_sync() if error: @@ -25,44 +42,114 @@ async def start_n_sync_pause( # sync to parent-side task await ctx.started() - print(f'entering SYNC PAUSE in {actor.uid}') + print(f'Entering `sync_pause()` in subactor: {actor.uid}\n') sync_pause() - print(f'back from SYNC PAUSE in {actor.uid}') + print(f'Exited `sync_pause()` in subactor: {actor.uid}\n') async def main() -> None: - async with tractor.open_nursery( - # NOTE: required for pausing from sync funcs - maybe_enable_greenback=True, - debug_mode=True, - ) as an: + async with ( + tractor.open_nursery( + # NOTE: required for pausing from sync funcs + maybe_enable_greenback=True, + debug_mode=True, + # loglevel='cancel', + ) as an, + trio.open_nursery() as tn, + ): + # just from root task + sync_pause() p: tractor.Portal = await an.start_actor( 'subactor', enable_modules=[__name__], # infect_asyncio=True, debug_mode=True, - loglevel='cancel', ) # TODO: 3 sub-actor usage cases: + # -[x] via a `.open_context()` # -[ ] via a `.run_in_actor()` call # -[ ] via a `.run()` - # -[ ] via a `.open_context()` - # + # -[ ] via a `.to_thread.run_sync()` in subactor async with p.open_context( start_n_sync_pause, ) as (ctx, first): assert first is None - await tractor.pause() - sync_pause() + # TODO: handle bg-thread-in-root-actor special cases! + # + # there are a couple very subtle situations possible here + # and they are likely to become more important as cpython + # moves to support no-GIL. + # + # Cases: + # 1. root-actor bg-threads that call `.pause_from_sync()` + # whilst an in-tree subactor also is using ` .pause()`. + # |_ since the root-actor bg thread can not + # `Lock._debug_lock.acquire_nowait()` without running + # a `trio.Task`, AND because the + # `PdbREPL.set_continue()` is called from that + # bg-thread, we can not `._debug_lock.release()` + # either! + # |_ this results in no actor-tree `Lock` being used + # on behalf of the bg-thread and thus the subactor's + # task and the thread trying to to use stdio + # simultaneously which results in the classic TTY + # clobbering! + # + # 2. mutiple sync-bg-threads that call + # `.pause_from_sync()` where one is scheduled via + # `Nursery.start_soon(to_thread.run_sync)` in a bg + # task. + # + # Due to the GIL, the threads never truly try to step + # through the REPL simultaneously, BUT their `logging` + # and traceback outputs are interleaved since the GIL + # (seemingly) on every REPL-input from the user + # switches threads.. + # + # Soo, the context switching semantics of the GIL + # result in a very confusing and messy interaction UX + # since eval and (tb) print output is NOT synced to + # each REPL-cycle (like we normally make it via + # a `.set_continue()` callback triggering the + # `Lock.release()`). Ideally we can solve this + # usability issue NOW because this will of course be + # that much more important when eventually there is no + # GIL! - # TODO: make this work!! - await trio.to_thread.run_sync( - sync_pause, - abandon_on_cancel=False, - ) + # XXX should cause double REPL entry and thus TTY + # clobbering due to case 1. above! + tn.start_soon( + partial( + trio.to_thread.run_sync, + partial( + sync_pause, + use_builtin=False, + # pre_sleep=0.5, + ), + abandon_on_cancel=True, + thread_name='start_soon_root_bg_thread', + ) + ) + + await tractor.pause() + + # XXX should cause double REPL entry and thus TTY + # clobbering due to case 2. above! + await trio.to_thread.run_sync( + partial( + sync_pause, + # NOTE this already works fine since in the new + # thread the `breakpoint()` built-in is never + # overloaded, thus NO locking is used, HOWEVER + # the case 2. from above still exists! + use_builtin=True, + ), + abandon_on_cancel=False, + thread_name='inline_root_bg_thread', + ) await ctx.cancel() diff --git a/tests/test_debugger.py b/tests/test_debugger.py index 72778bd..5f818a6 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -1073,6 +1073,8 @@ def test_pause_from_sync( ''' child = spawn('sync_bp') + + # first `sync_pause()` after nurseries open child.expect(PROMPT) assert_before( child, @@ -1087,43 +1089,70 @@ def test_pause_from_sync( do_ctlc(child) child.sendline('c') + + + # first `await tractor.pause()` inside `p.open_context()` body child.expect(PROMPT) - # XXX shouldn't see gb loaded again + # XXX shouldn't see gb loaded message with PDB loglevel! before = str(child.before.decode()) assert not in_prompt_msg( before, ['`greenback` portal opened!'], ) + # should be same root task assert_before( child, - [_pause_msg, "('root'",], + [ + _pause_msg, + " CancelScope|None: if not is_root_process(): @@ -223,6 +219,7 @@ class Lock: ctx_in_debug: Context|None = None req_handler_finished: trio.Event|None = None + _owned_by_root: bool = False _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() _blocked: set[ tuple[str, str] # `Actor.uid` for per actor @@ -231,23 +228,16 @@ class Lock: @classmethod def repr(cls) -> str: - - # both root and subs + lock_stats: trio.LockStatistics = cls._debug_lock.statistics() fields: str = ( - f'repl: {cls.repl}\n' + f'req_handler_finished: {cls.req_handler_finished}\n' + f'_blocked: {cls._blocked}\n\n' + f'_debug_lock: {cls._debug_lock}\n' + f'lock_stats: {lock_stats}\n' + f'ctx_in_debug: {cls.ctx_in_debug}\n' + ) - if is_root_process(): - lock_stats: trio.LockStatistics = cls._debug_lock.statistics() - fields += ( - f'req_handler_finished: {cls.req_handler_finished}\n' - - f'_blocked: {cls._blocked}\n\n' - f'_debug_lock: {cls._debug_lock}\n' - f'lock_stats: {lock_stats}\n' - - ) - body: str = textwrap.indent( fields, prefix=' |_', @@ -256,8 +246,6 @@ class Lock: f'<{cls.__name__}(\n' f'{body}' ')>\n\n' - - f'{cls.ctx_in_debug}\n' ) @classmethod @@ -266,7 +254,10 @@ class Lock: cls, force: bool = False, ): - message: str = 'TTY lock not held by any child\n' + if not cls._owned_by_root: + message: str = 'TTY lock not held by any child\n' + else: + message: str = 'TTY lock held in root-actor task\n' if not (is_trio_main := DebugStatus.is_main_trio_thread()): task: threading.Thread = threading.current_thread() @@ -279,8 +270,20 @@ class Lock: if ( lock.locked() and - owner is task - # ^-NOTE-^ if not will raise a RTE.. + ( + owner is task + # or + # cls._owned_by_root + ) + # ^-NOTE-^ if we do NOT ensure this, `trio` will + # raise a RTE when a non-owner tries to releasee the + # lock. + # + # Further we need to be extra pedantic about the + # correct task, greenback-spawned-task and/or thread + # being set to the `.repl_task` such that the above + # condition matches and we actually release the lock. + # This is particular of note from `.pause_from_sync()`! ): if not is_trio_main: trio.from_thread.run_sync( @@ -290,6 +293,10 @@ class Lock: cls._debug_lock.release() message: str = 'TTY lock released for child\n' + except RuntimeError as rte: + log.exception('Failed to release `Lock`?') + raise rte + finally: # IFF there are no more requesting tasks queued up fire, the # "tty-unlocked" event thereby alerting any monitors of the lock that @@ -305,7 +312,11 @@ class Lock: ): message += '-> No more child ctx tasks hold the TTY lock!\n' - elif req_handler_finished: + elif ( + req_handler_finished + and + lock.locked() + ): req_stats = req_handler_finished.statistics() message += ( f'-> A child ctx task still owns the `Lock` ??\n' @@ -315,9 +326,20 @@ class Lock: cls.ctx_in_debug = None + if ( + cls._owned_by_root + ): + if not lock.locked(): + cls._owned_by_root = False + else: + message += 'Lock still held by root actor task?!?\n' + lock.release() + + log.devx(message) + @classmethod @acm - async def acquire( + async def acquire_for_ctx( cls, ctx: Context, @@ -372,7 +394,7 @@ class Lock: ) # NOTE: critical section: this yield is unshielded! - + # # IF we received a cancel during the shielded lock entry of some # next-in-queue requesting task, then the resumption here will # result in that ``trio.Cancelled`` being raised to our caller @@ -384,7 +406,7 @@ class Lock: yield cls._debug_lock finally: - message :str = 'Exiting `Lock.acquire()` on behalf of sub-actor\n' + message :str = 'Exiting `Lock.acquire_for_ctx()` on behalf of sub-actor\n' if we_acquired: message += '-> TTY lock released by child\n' cls.release() @@ -468,11 +490,11 @@ async def lock_tty_for_child( # TODO: use `.msg._ops.maybe_limit_plds()` here instead so we # can merge into a single async with, with the - # `Lock.acquire()` enter below? + # `Lock.acquire_for_ctx()` enter below? # # enable the locking msgspec with apply_debug_pldec(): - async with Lock.acquire(ctx=ctx): + async with Lock.acquire_for_ctx(ctx=ctx): debug_lock_cs.shield = True log.devx( @@ -567,6 +589,11 @@ class DebugStatus: whenever a local task is an active REPL. ''' + # XXX local ref to the `pdbp.Pbp` instance, ONLY set in the + # actor-process that currently has activated a REPL i.e. it + # should be `None` (unset) in any other actor-process that does + # not yet have the `Lock` acquired via a root-actor debugger + # request. repl: PdbREPL|None = None # TODO: yet again this looks like a task outcome where we need @@ -1443,7 +1470,7 @@ class DebugRequestError(RuntimeError): async def _pause( - debug_func: Callable|None, + debug_func: Callable|partial|None, # NOTE: must be passed in the `.pause_from_sync()` case! repl: PdbREPL|None = None, @@ -1457,7 +1484,9 @@ async def _pause( # be no way to override it?.. # shield: bool = False, - hide_tb: bool = False, + hide_tb: bool = True, + called_from_sync: bool = False, + called_from_bg_thread: bool = False, task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, **debug_func_kwargs, @@ -1502,27 +1531,15 @@ async def _pause( # -[ ] factor out better, main reason for it is common logic for # both root and sub repl entry def _enter_repl_sync( - debug_func: Callable, + debug_func: partial[None], ) -> None: __tracebackhide__: bool = hide_tb + debug_func_name: str = ( + debug_func.func.__name__ if debug_func else 'None' + ) try: - # set local actor task to avoid recurrent - # entries/requests from the same local task (to the root - # process). - DebugStatus.repl_task = task - DebugStatus.repl = repl - - # TODO: do we want to support using this **just** for the - # locking / common code (prolly to help address #320)? - if debug_func is None: - task_status.started(DebugStatus) - - else: - log.warning( - 'Entering REPL for task fuck you!\n' - f'{task}\n' - ) + if debug_func: # block here one (at the appropriate frame *up*) where # ``breakpoint()`` was awaited and begin handling stdio. log.devx( @@ -1531,6 +1548,12 @@ async def _pause( f' |_{task}\n' ) + # set local actor task to avoid recurrent + # entries/requests from the same local task (to the root + # process). + DebugStatus.repl = repl + DebugStatus.repl_task = task + # invoke the low-level REPL activation routine which itself # should call into a `Pdb.set_trace()` of some sort. debug_func( @@ -1539,10 +1562,27 @@ async def _pause( **debug_func_kwargs, ) + # TODO: maybe invert this logic and instead + # do `assert debug_func is None` when + # `called_from_sync`? + else: + if ( + called_from_sync + # and + # is_root_process() + and + not DebugStatus.is_main_trio_thread() + ): + assert DebugStatus.repl_task is not task + + # TODO: do we want to support using this **just** for the + # locking / common code (prolly to help address #320)? + task_status.started(DebugStatus) + except trio.Cancelled: log.exception( - 'Cancelled during invoke of internal `debug_func = ' - f'{debug_func.func.__name__}`\n' + 'Cancelled during invoke of internal\n\n' + f'`debug_func = {debug_func_name}`\n' ) # XXX NOTE: DON'T release lock yet raise @@ -1550,8 +1590,8 @@ async def _pause( except BaseException: __tracebackhide__: bool = False log.exception( - 'Failed to invoke internal `debug_func = ' - f'{debug_func.func.__name__}`\n' + 'Failed to invoke internal\n\n' + f'`debug_func = {debug_func_name}`\n' ) # NOTE: OW this is ONLY called from the # `.set_continue/next` hooks! @@ -1597,34 +1637,56 @@ async def _pause( f'This root actor task is already within an active REPL session\n' f'Ignoring this re-entered `tractor.pause()`\n' f'task: {task.name}\n' - f'REPL: {Lock.repl}\n' # TODO: use `._frame_stack` scanner to find the @api_frame ) with trio.CancelScope(shield=shield): await trio.lowlevel.checkpoint() return - # XXX: since we need to enter pdb synchronously below, - # we have to release the lock manually from pdb completion - # callbacks. Can't think of a nicer way then this atm. + # must shield here to avoid hitting a `Cancelled` and + # a child getting stuck bc we clobbered the tty with trio.CancelScope(shield=shield): if Lock._debug_lock.locked(): - log.warning( - 'attempting to shield-acquire active TTY lock owned by\n' + + acq_prefix: str = 'shield-' if shield else '' + ctx_line: str = ( + 'lock owned by ctx\n\n' f'{ctx}' + ) if ctx else 'stale lock with no request ctx!?' + log.devx( + f'attempting to {acq_prefix}acquire active TTY ' + f'{ctx_line}' ) - # must shield here to avoid hitting a ``Cancelled`` and - # a child getting stuck bc we clobbered the tty - # with trio.CancelScope(shield=True): - await Lock._debug_lock.acquire() - else: - # may be cancelled + # XXX: since we need to enter pdb synchronously below, + # and we don't want to block the thread that starts + # stepping through the application thread, we later + # must `Lock._debug_lock.release()` manually from + # some `PdbREPL` completion callback(`.set_[continue/exit]()`). + # + # So, when `._pause()` is called from a (bg/non-trio) + # thread, special provisions are needed and we need + # to do the `.acquire()`/`.release()` calls from + # a common `trio.task` (due to internal impl of + # `FIFOLock`). Thus we do not acquire here and + # instead expect `.pause_from_sync()` to take care of + # this detail depending on the caller's (threading) + # usage. + # + # NOTE that this special case is ONLY required when + # using `.pause_from_sync()` from the root actor + # since OW a subactor will instead make an IPC + # request (in the branch below) to acquire the + # `Lock`-mutex and a common root-actor RPC task will + # take care of `._debug_lock` mgmt! + if not called_from_sync: await Lock._debug_lock.acquire() + Lock._owned_by_root = True # enter REPL from root, no TTY locking IPC ctx necessary + # since we can acquire the `Lock._debug_lock` directly in + # thread. _enter_repl_sync(debug_func) - return # next branch is mutex and for subactors # TODO: need a more robust check for the "root" actor elif ( @@ -1843,6 +1905,11 @@ def _set_trace( # called our API. caller_frame: FrameType = api_frame.f_back # type: ignore + # pretend this frame is the caller frame to show + # the entire call-stack all the way down to here. + if not hide_tb: + caller_frame: FrameType = inspect.currentframe() + # engage ze REPL # B~() repl.set_trace(frame=caller_frame) @@ -1850,7 +1917,7 @@ def _set_trace( async def pause( *, - hide_tb: bool = False, + hide_tb: bool = True, api_frame: FrameType|None = None, # TODO: figure out how to still make this work: @@ -1970,13 +2037,12 @@ async def maybe_init_greenback( # runtime aware version which takes care of all . def pause_from_sync( - hide_tb: bool = False, - # proxied to `_pause()` + hide_tb: bool = True, - **_pause_kwargs, - # for eg. + # proxy to `._pause()`, for ex: # shield: bool = False, # api_frame: FrameType|None = None, + **_pause_kwargs, ) -> None: @@ -2020,26 +2086,53 @@ def pause_from_sync( # noop: non-cancelled `.to_thread` # `trio.Cancelled`: cancelled `.to_thread` # + log.warning( + 'Engaging `.pause_from_sync()` from ANOTHER THREAD!' + ) + task: threading.Thread = threading.current_thread() + DebugStatus.repl_task: str = task + + # TODO: make root-actor bg thread usage work! + # if is_root_process(): + # async def _pause_from_sync_thread(): + # ... + # else: + # .. the below .. + trio.from_thread.run( partial( _pause, debug_func=None, repl=mdb, + hide_tb=hide_tb, + + # XXX to prevent `._pause()` for setting + # `DebugStatus.repl_task` to the gb task! + called_from_sync=True, + called_from_bg_thread=True, + **_pause_kwargs ), ) - task: threading.Thread = threading.current_thread() else: # we are presumably the `trio.run()` + main thread task: trio.Task = current_task() + DebugStatus.repl_task: str = task greenback.await_( _pause( debug_func=None, repl=mdb, + hide_tb=hide_tb, + called_from_sync=True, **_pause_kwargs, ) ) - DebugStatus.repl_task: str = current_task() + + if is_root_process(): + # Manually acquire since otherwise on release we'll + # get a RTE raised by `trio` due to ownership.. + Lock._debug_lock.acquire_nowait() + Lock._owned_by_root = True # TODO: ensure we aggressively make the user aware about # entering the global ``breakpoint()`` built-in from sync