From b22f7dcae042dae0a9d068021a76f2c818489d7d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 21 May 2024 09:19:56 -0400 Subject: [PATCH] Resolve remaining debug-request race causing hangs More or less by pedantically separating and managing root and subactor request syncing events to always be managed by the locking IPC context task-funcs: - for the root's "child"-side, `lock_tty_for_child()` directly creates and sets a new `Lock.req_handler_finished` inside a `finally:` - for the sub's "parent"-side, `request_root_stdio_lock()` does the same with a new `DebugStatus.req_finished` event and separates it from the `.repl_release` event (which indicates a "c" or "q" from user and thus exit of the REPL session) as well as sets a new `.req_task: trio.Task` to explicitly distinguish from the app-user-task that enters the REPL vs. the paired bg task used to request the global root's stdio mutex alongside it. - apply the `__pld_spec__` on "child"-side of the ctx using the new `Portal.open_context(pld_spec)` parameter support; drops use of any `ContextVar` malarky used prior for `PldRx` mgmt. - removing `Lock.no_remote_has_tty` since it was a nebulous name and from the prior "everything is in a `Lock`" design.. ------ - ------ More rigorous impl to handle various edge cases in `._pause()`: - rejig `_enter_repl_sync()` to wrap the `debug_func == None` case inside maybe-internal-error handler blocks. - better logic for recurrent vs. multi-task contention for REPL entry in subactors, by guarding using `DebugStatus.req_task` and by now waiting on the new `DebugStatus.req_finished` for the multi-task contention case. - even better internal error handling and reporting for when this code is hacked on and possibly broken ;p ------ - ------ Updates to `.pause_from_sync()` support: - add optional `actor`, `task` kwargs to `_set_trace()` to allow compat with the new explicit `debug_func` calling in `._pause()` and pass a `threading.Thread` for `task` in the `.to_thread()` usage case. - add an `except` block that tries to show the frame on any internal error. ------ - ------ Relatedly includes a buncha cleanups/simplifications somewhat in prep for some coming refinements (around `DebugStatus`): - use all the new attrs mentioned above as needed in the SIGINT shielder. - wait on `Lock.req_handler_finished` in `maybe_wait_for_debugger()`. - dropping a ton of masked legacy code left in during the recent reworks. - better comments, like on the use of `Context._scope` for shielding on the "child"-side to avoid the need to manage yet another cs. - add/change-to lotsa `log.devx()` level emissions for those infos which are handy while hacking on the debugger but not ideal/necessary to be user visible. - obvi add lotsa follow up todo notes! --- tractor/devx/_debug.py | 824 ++++++++++++++++++++++------------------- 1 file changed, 446 insertions(+), 378 deletions(-) diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py index 1e82122..877d2de 100644 --- a/tractor/devx/_debug.py +++ b/tractor/devx/_debug.py @@ -73,10 +73,10 @@ from tractor._state import ( debug_mode, current_ipc_ctx, ) -from .pformat import ( - # pformat_caller_frame, - pformat_cs, -) +# from .pformat import ( +# pformat_caller_frame, +# pformat_cs, +# ) if TYPE_CHECKING: from tractor._ipc import Channel @@ -190,8 +190,8 @@ class Lock: # a stale lock condition (eg. 
IPC failure with the locking # child ctx_in_debug: Context|None = None + req_handler_finished: trio.Event|None = None - no_remote_has_tty: trio.Event|None = None _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() _blocked: set[ tuple[str, str] # `Actor.uid` for per actor @@ -209,13 +209,12 @@ class Lock: if is_root_process(): lock_stats: trio.LockStatistics = cls._debug_lock.statistics() fields += ( - f'no_remote_has_tty: {cls.no_remote_has_tty}\n' + f'req_handler_finished: {cls.req_handler_finished}\n' + f'_blocked: {cls._blocked}\n\n' - - f'ctx_in_debug: {cls.ctx_in_debug}\n\n' - f'_debug_lock: {cls._debug_lock}\n' f'lock_stats: {lock_stats}\n' + ) body: str = textwrap.indent( @@ -225,7 +224,9 @@ class Lock: return ( f'<{cls.__name__}(\n' f'{body}' - ')>' + ')>\n\n' + + f'{cls.ctx_in_debug}\n' ) @classmethod @@ -234,16 +235,23 @@ class Lock: cls, force: bool = False, ): + message: str = 'TTY lock not held by any child\n' + + if not (is_trio_main := DebugStatus.is_main_trio_thread()): + task: threading.Thread = threading.current_thread() + else: + task: trio.Task = current_task() + try: lock: trio.StrictFIFOLock = cls._debug_lock owner: Task = lock.statistics().owner if ( lock.locked() and - owner is current_task() + owner is task # ^-NOTE-^ if not will raise a RTE.. ): - if not DebugStatus.is_main_trio_thread(): + if not is_trio_main: trio.from_thread.run_sync( cls._debug_lock.release ) @@ -251,45 +259,27 @@ class Lock: cls._debug_lock.release() message: str = 'TTY lock released for child\n' - else: - message: str = 'TTY lock not held by any child\n' - finally: # IFF there are no more requesting tasks queued up fire, the # "tty-unlocked" event thereby alerting any monitors of the lock that # we are now back in the "tty unlocked" state. This is basically # and edge triggered signal around an empty queue of sub-actor # tasks that may have tried to acquire the lock. - stats = cls._debug_lock.statistics() + lock_stats = cls._debug_lock.statistics() + req_handler_finished: trio.Event|None = Lock.req_handler_finished if ( - not stats.owner + not lock_stats.owner or force - # and cls.no_remote_has_tty is not None + and req_handler_finished is None ): message += '-> No more child ctx tasks hold the TTY lock!\n' - # set and release - if cls.no_remote_has_tty is not None: - cls.no_remote_has_tty.set() - cls.no_remote_has_tty = None - - # cls.remote_task_in_debug = None - - else: - message += ( - f'-> Not signalling `Lock.no_remote_has_tty` since it has value:{cls.no_remote_has_tty}\n' - ) - - else: - # wakeup any waiters since the lock was released - # (presumably) temporarily. 
- if no_remote_has_tty := cls.no_remote_has_tty: - no_remote_has_tty.set() - no_remote_has_tty = trio.Event() - + elif req_handler_finished: + req_stats = req_handler_finished.statistics() message += ( f'-> A child ctx task still owns the `Lock` ??\n' - f' |_owner task: {stats.owner}\n' + f' |_lock_stats: {lock_stats}\n' + f' |_req_stats: {req_stats}\n' ) cls.ctx_in_debug = None @@ -299,8 +289,6 @@ class Lock: async def acquire( cls, ctx: Context, - # subactor_uid: tuple[str, str], - # remote_task_uid: str, ) -> AsyncIterator[trio.StrictFIFOLock]: ''' @@ -328,7 +316,6 @@ class Lock: ) stats = cls._debug_lock.statistics() if owner := stats.owner: - # and cls.no_remote_has_tty is not None pre_msg += ( f'\n' f'`Lock` already held by local task?\n' @@ -347,12 +334,6 @@ class Lock: await cls._debug_lock.acquire() cls.ctx_in_debug = ctx we_acquired = True - if cls.no_remote_has_tty is None: - # mark the tty lock as being in use so that the runtime - # can try to avoid clobbering any connection from a child - # that's currently relying on it. - cls.no_remote_has_tty = trio.Event() - # cls.remote_task_in_debug = remote_task_uid log.runtime( f'TTY lock acquired for sub-actor\n' @@ -373,11 +354,7 @@ class Lock: finally: message :str = 'Exiting `Lock.acquire()` on behalf of sub-actor\n' - if ( - we_acquired - # and - # cls._debug_lock.locked() - ): + if we_acquired: message += '-> TTY lock released by child\n' cls.release() @@ -392,7 +369,6 @@ class Lock: @tractor.context async def lock_tty_for_child( - ctx: Context, subactor_task_uid: tuple[str, int], @@ -409,13 +385,11 @@ async def lock_tty_for_child( ''' subactor_uid: tuple[str, str] = ctx.chan.uid - # NOTE: we use the IPC ctx's cancel scope directly in order to - # ensure that on any transport failure, or cancellation request - # from the child we expect - # `Context._maybe_cancel_and_set_remote_error()` to cancel this - # scope despite the shielding we apply below. - debug_lock_cs: CancelScope = ctx._scope + # mark the tty lock as being in use so that the runtime + # can try to avoid clobbering any connection from a child + # that's currently relying on it. + we_finished = Lock.req_handler_finished = trio.Event() try: if ctx.cid in Lock._blocked: raise RuntimeError( @@ -437,18 +411,15 @@ async def lock_tty_for_child( f'remote task: {subactor_task_uid}\n' ) ctx._enter_debugger_on_cancel: bool = False - await ctx.cancel(f'Debug lock blocked for {subactor_uid}') - # TODO: remove right? - # return LockStatus( - # subactor_uid=subactor_uid, - # cid=ctx.cid, - # locked=False, - # ) + message: str = ( + f'Debug lock blocked for {subactor_uid}\n' + 'Cancelling debug request!\n' + ) + log.cancel(message) + await ctx.cancel() + raise DebugRequestError(message) - # TODO: when we get to true remote debugging - # this will deliver stdin data? - - log.debug( + log.devx( 'Subactor attempting to acquire TTY lock\n' f'root task: {root_task_name}\n' f'subactor_uid: {subactor_uid}\n' @@ -456,13 +427,33 @@ async def lock_tty_for_child( ) DebugStatus.shield_sigint() Lock._blocked.add(ctx.cid) - with ( - # enable the locking msgspec - apply_debug_pldec(), - ): + + # NOTE: we use the IPC ctx's cancel scope directly in order to + # ensure that on any transport failure, or cancellation request + # from the child we expect + # `Context._maybe_cancel_and_set_remote_error()` to cancel this + # scope despite the shielding we apply below. 
+ debug_lock_cs: CancelScope = ctx._scope + + # TODO: use `.msg._ops.maybe_limit_plds()` here instead so we + # can merge into a single async with, with the + # `Lock.acquire()` enter below? + # + # enable the locking msgspec + with apply_debug_pldec(): async with Lock.acquire(ctx=ctx): debug_lock_cs.shield = True + log.devx( + 'Subactor acquired debugger request lock!\n' + f'root task: {root_task_name}\n' + f'subactor_uid: {subactor_uid}\n' + f'remote task: {subactor_task_uid}\n\n' + + 'Sending `ctx.started(LockStatus)`..\n' + + ) + # indicate to child that we've locked stdio await ctx.started( LockStatus( @@ -472,7 +463,9 @@ async def lock_tty_for_child( ) ) - log.debug( f'Actor {subactor_uid} acquired TTY lock') + log.devx( + f'Actor {subactor_uid} acquired `Lock` via debugger request' + ) # wait for unlock pdb by child async with ctx.open_stream() as stream: @@ -480,14 +473,16 @@ async def lock_tty_for_child( # TODO: security around only releasing if # these match? - log.pdb( + log.devx( f'TTY lock released requested\n\n' f'{release_msg}\n' ) assert release_msg.cid == ctx.cid assert release_msg.subactor_uid == tuple(subactor_uid) - log.debug(f'Actor {subactor_uid} released TTY lock') + log.devx( + f'Actor {subactor_uid} released TTY lock' + ) return LockStatus( subactor_uid=subactor_uid, @@ -497,29 +492,33 @@ async def lock_tty_for_child( except BaseException as req_err: message: str = ( - 'Forcing `Lock.release()` since likely an internal error!\n' + 'Forcing `Lock.release()` for req-ctx since likely an ' + 'internal error!\n\n' + f'{ctx}' ) if isinstance(req_err, trio.Cancelled): - log.cancel( + message = ( 'Cancelled during root TTY-lock dialog?\n' + message ) else: - log.exception( + message = ( 'Errored during root TTY-lock dialog?\n' + message ) + log.exception(message) Lock.release(force=True) raise finally: Lock._blocked.remove(ctx.cid) - if (no_locker := Lock.no_remote_has_tty): - no_locker.set() + # wakeup any waiters since the lock was (presumably) + # released, possibly only temporarily. + we_finished.set() DebugStatus.unshield_sigint() @@ -538,14 +537,23 @@ class DebugStatus: ''' repl: PdbREPL|None = None + + # TODO: yet again this looks like a task outcome where we need + # to sync to the completion of one task (and get its result) + # being used everywhere for syncing.. + # -[ ] see if we can get our proto oco task-mngr to work for + # this? repl_task: Task|None = None + repl_release: trio.Event|None = None + + req_task: Task|None = None req_ctx: Context|None = None req_cs: CancelScope|None = None - repl_release: trio.Event|None = None req_finished: trio.Event|None = None - lock_status: LockStatus|None = None req_err: BaseException|None = None + lock_status: LockStatus|None = None + _orig_sigint_handler: Callable|None = None _trio_handler: ( Callable[[int, FrameType|None], Any] @@ -715,13 +723,13 @@ class DebugStatus: f'{cls.repl_task}\n' ) - # restore original sigint handler - cls.unshield_sigint() - # actor-local state, irrelevant for non-root. cls.repl_task = None cls.repl = None + # restore original sigint handler + cls.unshield_sigint() + class TractorConfig(pdbp.DefaultConfig): ''' @@ -814,17 +822,6 @@ class PdbREPL(pdbp.Pdb): ): Lock.release() - # TODO: special handling where we just want the next LOC and - # not to resume to the next pause/crash point? 
- # def set_next( - # self, - # frame: FrameType - # ) -> None: - # try: - # super().set_next(frame) - # finally: - # pdbp.set_trace() - # XXX NOTE: we only override this because apparently the stdlib pdb # bois likes to touch the SIGINT handler as much as i like to touch # my d$%&. @@ -855,6 +852,9 @@ class PdbREPL(pdbp.Pdb): return None +# TODO: prolly remove this and instead finally get our @context API +# supporting a msg/pld-spec via type annots as per, +# https://github.com/goodboy/tractor/issues/365 @cm def apply_debug_pldec() -> _codec.MsgCodec: ''' @@ -865,8 +865,9 @@ def apply_debug_pldec() -> _codec.MsgCodec: from tractor.msg import ( _ops as msgops, ) - orig_plrx: msgops.PldRx = msgops.current_pldrx() - orig_pldec: msgops.MsgDec = orig_plrx.pld_dec + cctx: Context = current_ipc_ctx() + rx: msgops.PldRx = cctx.pld_rx + orig_pldec: msgops.MsgDec = rx.pld_dec try: with msgops.limit_plds( @@ -875,9 +876,9 @@ def apply_debug_pldec() -> _codec.MsgCodec: assert ( debug_dec is - msgops.current_pldrx().pld_dec + rx.pld_dec ) - log.info( + log.runtime( 'Applied `.devx._debug` pld-spec\n\n' f'{debug_dec}\n' ) @@ -885,11 +886,9 @@ def apply_debug_pldec() -> _codec.MsgCodec: finally: assert ( - (plrx := msgops.current_pldrx()) is orig_plrx - and - plrx.pld_dec is orig_pldec + rx.pld_dec is orig_pldec ) - log.info( + log.runtime( 'Reverted to previous pld-spec\n\n' f'{orig_pldec}\n' ) @@ -898,7 +897,9 @@ def apply_debug_pldec() -> _codec.MsgCodec: async def request_root_stdio_lock( actor_uid: tuple[str, str], task_uid: tuple[str, int], - task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED + + shield: bool = False, + task_status: TaskStatus[CancelScope] = trio.TASK_STATUS_IGNORED, ): ''' Connect to the root actor of this process tree and RPC-invoke @@ -915,7 +916,7 @@ async def request_root_stdio_lock( ''' - log.pdb( + log.devx( 'Initing stdio-lock request task with root actor' ) # TODO: likely we can implement this mutex more generally as @@ -928,40 +929,22 @@ async def request_root_stdio_lock( # -[ ] technically we need a `RLock` since re-acquire should be a noop # - https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.RLock DebugStatus.req_finished = trio.Event() + DebugStatus.req_task = current_task() try: from tractor._discovery import get_root - from tractor.msg import _ops as msgops - debug_dec: msgops.MsgDec - with ( - # NOTE: we need this to ensure that this task exits - # BEFORE the REPl instance raises an error like - # `bdb.BdbQuit` directly, OW you get a trio cs stack - # corruption! - # Further, the since this task is spawned inside the - # `Context._scope_nursery: trio.Nursery`, once an RPC - # task errors that cs is cancel_called and so if we want - # to debug the TPC task that failed we need to shield - # against that expected `.cancel()` call and instead - # expect all of the `PdbREPL`.set_[continue/quit/]()` - # methods to unblock this task by setting the - # `.repl_release: # trio.Event`. - trio.CancelScope(shield=True) as req_cs, - - # NOTE: set it here in the locker request task bc it's - # possible for multiple such requests for the lock in any - # single sub-actor AND there will be a race between when the - # root locking task delivers the `Started(pld=LockStatus)` - # and when the REPL is actually entered by the requesting - # application task who called - # `.pause()`/`.post_mortem()`. 
- # - # SO, applying the pld-spec here means it is only applied to - # this IPC-ctx request task, NOT any other task(s) - # including the one that actually enters the REPL. This - # is oc desired bc ow the debugged task will msg-type-error. - # - apply_debug_pldec() as debug_dec, - ): + # NOTE: we need this to ensure that this task exits + # BEFORE the REPl instance raises an error like + # `bdb.BdbQuit` directly, OW you get a trio cs stack + # corruption! + # Further, the since this task is spawned inside the + # `Context._scope_nursery: trio.Nursery`, once an RPC + # task errors that cs is cancel_called and so if we want + # to debug the TPC task that failed we need to shield + # against that expected `.cancel()` call and instead + # expect all of the `PdbREPL`.set_[continue/quit/]()` + # methods to unblock this task by setting the + # `.repl_release: # trio.Event`. + with trio.CancelScope(shield=shield) as req_cs: # XXX: was orig for debugging cs stack corruption.. # log.info( # 'Request cancel-scope is:\n\n' @@ -972,46 +955,49 @@ async def request_root_stdio_lock( try: # TODO: merge into single async with ? async with get_root() as portal: - async with portal.open_context( lock_tty_for_child, subactor_task_uid=task_uid, + # NOTE: set it here in the locker request task bc it's + # possible for multiple such requests for the lock in any + # single sub-actor AND there will be a race between when the + # root locking task delivers the `Started(pld=LockStatus)` + # and when the REPL is actually entered by the requesting + # application task who called + # `.pause()`/`.post_mortem()`. + # + # SO, applying the pld-spec here means it is only applied to + # this IPC-ctx request task, NOT any other task(s) + # including the one that actually enters the REPL. This + # is oc desired bc ow the debugged task will msg-type-error. + pld_spec=__pld_spec__, + ) as (req_ctx, status): DebugStatus.req_ctx = req_ctx - - # sanity checks on pld-spec limit state - assert debug_dec - # curr_pldrx: msgops.PldRx = msgops.current_pldrx() - # assert ( - # curr_pldrx.pld_dec is debug_dec - # ) - - log.debug( + log.devx( 'Subactor locked TTY with msg\n\n' f'{status}\n' ) - # mk_pdb().set_trace() - try: - assert status.subactor_uid == actor_uid - assert status.cid - except AttributeError: - log.exception('failed pldspec asserts!') - raise + # try: + assert status.subactor_uid == actor_uid + assert status.cid + # except AttributeError: + # log.exception('failed pldspec asserts!') + # mk_pdb().set_trace() + # raise # set last rxed lock dialog status. DebugStatus.lock_status = status async with req_ctx.open_stream() as stream: - - assert DebugStatus.repl_release task_status.started(req_ctx) - # wait for local task to exit its - # `PdbREPL.interaction()`, call - # `DebugStatus.release()` and then - # unblock here. + # wait for local task to exit + # `PdbREPL.interaction()`, normally via + # a `DebugStatus.release()`call, and + # then unblock us here. await DebugStatus.repl_release.wait() await stream.send( LockRelease( @@ -1026,10 +1012,10 @@ async def request_root_stdio_lock( assert not status.locked DebugStatus.lock_status = status - log.pdb( + log.devx( 'TTY lock was released for subactor with msg\n\n' f'{status}\n\n' - f'Exitting {req_ctx.side!r}-side of locking req_ctx' + f'Exitting {req_ctx.side!r}-side of locking req_ctx\n' ) except ( @@ -1081,13 +1067,14 @@ async def request_root_stdio_lock( # ctl-c out of the currently hanging task! 
raise DebugRequestError( 'Failed to lock stdio from subactor IPC ctx!\n\n' - f'req_ctx: {req_ctx}\n' + f'req_ctx: {DebugStatus.req_ctx}\n' ) from req_err finally: - log.debug('Exiting debugger TTY lock request func from child') + log.devx('Exiting debugger TTY lock request func from child') # signal request task exit DebugStatus.req_finished.set() + DebugStatus.req_task = None def mk_pdb() -> PdbREPL: @@ -1321,31 +1308,40 @@ def shield_sigint_handler( DebugStatus.unshield_sigint() # do_cancel() - task: str|None = DebugStatus.repl_task + repl_task: str|None = DebugStatus.repl_task + req_task: str|None = DebugStatus.req_task if ( - task + repl_task and repl ): log.pdb( f'Ignoring SIGINT while local task using debug REPL\n' - f'|_{task}\n' + f'|_{repl_task}\n' f' |_{repl}\n' ) + elif req_task: + log.pdb( + f'Ignoring SIGINT while debug request task is open\n' + f'|_{req_task}\n' + ) else: msg: str = ( 'SIGINT shield handler still active BUT, \n\n' ) - if task is None: + if repl_task is None: msg += ( - f'- No local task claims to be in debug?\n' - f' |_{task}\n\n' + '- No local task claims to be in debug?\n' ) if repl is None: msg += ( - f'- No local REPL is currently active?\n' - f' |_{repl}\n\n' + '- No local REPL is currently active?\n' + ) + + if req_task is None: + msg += ( + '- No debug request task is active?\n' ) log.warning( @@ -1358,7 +1354,6 @@ def shield_sigint_handler( # XXX ensure that the reverted-to-handler actually is # able to rx what should have been **this** KBI ;) do_cancel() - # raise KeyboardInterrupt # TODO: how to handle the case of an intermediary-child actor # that **is not** marked in debug mode? See oustanding issue: @@ -1392,7 +1387,7 @@ def shield_sigint_handler( # https://github.com/prompt-toolkit/python-prompt-toolkit/blob/c2c6af8a0308f9e5d7c0e28cb8a02963fe0ce07a/prompt_toolkit/patch_stdout.py # XXX only for tracing this handler - # log.warning('exiting SIGINT') + log.devx('exiting SIGINT') _pause_msg: str = 'Attaching to pdb REPL in actor' @@ -1420,14 +1415,9 @@ async def _pause( # is always show in the debugger on entry.. and there seems to # be no way to override it?.. # - # shield: bool = False, - hide_tb: bool = True, - - # bc, `debug_func()`, `_enter_repl_sync()` and `_pause()` - # extra_frames_up_when_async: int = 3, - + shield: bool = False, + hide_tb: bool = False, task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, - **debug_func_kwargs, ) -> None: @@ -1452,6 +1442,87 @@ async def _pause( 'for infected `asyncio` mode!' ) from rte + if debug_func is not None: + debug_func = partial(debug_func) + + repl: PdbREPL = repl or mk_pdb() + + # XXX NOTE XXX set it here to avoid ctl-c from cancelling a debug + # request from a subactor BEFORE the REPL is entered by that + # process. + DebugStatus.shield_sigint() + + # TODO: move this into a `open_debug_request()` @acm? + # -[ ] prolly makes the most sense to do the request + # task spawn as part of an `@acm` api which delivers the + # `DebugRequest` instance and ensures encapsing all the + # pld-spec and debug-nursery? + # -[ ] maybe make this a `PdbREPL` method or mod func? + # -[ ] factor out better, main reason for it is common logic for + # both root and sub repl entry + def _enter_repl_sync( + debug_func: Callable, + ) -> None: + __tracebackhide__: bool = hide_tb + + try: + # set local actor task to avoid recurrent + # entries/requests from the same local task (to the root + # process). 
+ DebugStatus.repl_task = task + DebugStatus.repl = repl + + # TODO: do we want to support using this **just** for the + # locking / common code (prolly to help address #320)? + if debug_func is None: + task_status.started(DebugStatus) + + else: + log.warning( + 'Entering REPL for task fuck you!\n' + f'{task}\n' + ) + # block here one (at the appropriate frame *up*) where + # ``breakpoint()`` was awaited and begin handling stdio. + log.devx( + 'Entering sync world of the `pdb` REPL for task..\n' + f'{repl}\n' + f' |_{task}\n' + ) + + # invoke the low-level REPL activation routine which itself + # should call into a `Pdb.set_trace()` of some sort. + debug_func( + repl=repl, + hide_tb=hide_tb, + **debug_func_kwargs, + ) + + except trio.Cancelled: + log.exception( + 'Cancelled during invoke of internal `debug_func = ' + f'{debug_func.func.__name__}`\n' + ) + # XXX NOTE: DON'T release lock yet + raise + + except BaseException: + __tracebackhide__: bool = False + log.exception( + 'Failed to invoke internal `debug_func = ' + f'{debug_func.func.__name__}`\n' + ) + # NOTE: OW this is ONLY called from the + # `.set_continue/next` hooks! + DebugStatus.release(cancel_req_task=True) + + raise + + log.devx( + 'Entering `._pause()` for requesting task\n' + f'|_{task}\n' + ) + # TODO: this should be created as part of `DebugRequest()` init # which should instead be a one-shot-use singleton much like # the `PdbREPL`. @@ -1461,71 +1532,9 @@ async def _pause( DebugStatus.repl_release.is_set() ): DebugStatus.repl_release = trio.Event() - - if debug_func is not None: - debug_func = partial(debug_func) - - repl: PdbREPL = repl or mk_pdb() - - # TODO: maybe make this a `PdbREPL` method or mod func? - # -[ ] factor out better, main reason for it is common logic for - # both root and sub repl entry - def _enter_repl_sync( - debug_func: Callable, - ) -> None: - __tracebackhide__: bool = hide_tb - - # TODO: do we want to support using this **just** for the - # locking / common code (prolly to help address #320)? - # - if debug_func is None: - task_status.started(DebugStatus) - else: - # block here one (at the appropriate frame *up*) where - # ``breakpoint()`` was awaited and begin handling stdio. - log.debug('Entering sync world of the `pdb` REPL..') - - # XXX used by the SIGINT handler to check if - # THIS actor is in REPL interaction - try: - # TODO: move this into a `open_debug_request()` @acm? - # -[ ] prolly makes the most send to do the request - # task spawn as part of an `@acm` api which - # delivers the `DebugRequest` instance and ensures - # encapsing all the pld-spec and debug-nursery? - # - # set local actor task to avoid recurrent - # entries/requests from the same local task - # (to the root process). - DebugStatus.repl_task = task - DebugStatus.repl = repl - DebugStatus.shield_sigint() - - # enter `PdbREPL` specific method - debug_func( - repl=repl, - hide_tb=hide_tb, - **debug_func_kwargs, - ) - except trio.Cancelled: - log.exception( - 'Cancelled during invoke of internal `debug_func = ' - f'{debug_func.func.__name__}`\n' - ) - # NOTE: DON'T release lock yet - raise - - except BaseException: - __tracebackhide__: bool = False - log.exception( - 'Failed to invoke internal `debug_func = ' - f'{debug_func.func.__name__}`\n' - ) - # NOTE: OW this is ONLY called from the - # `.set_continue/next` hooks! 
- DebugStatus.release(cancel_req_task=True) - - raise + # ^-NOTE-^ this must be created BEFORE scheduling any subactor + # debug-req task since it needs to wait on it just after + # `.started()`-ing back its wrapping `.req_cs: CancelScope`. repl_err: BaseException|None = None try: @@ -1579,38 +1588,61 @@ async def _pause( not is_root_process() and actor._parent_chan # a connected child ): - if DebugStatus.repl_task: + repl_task: Task|None = DebugStatus.repl_task + req_task: Task|None = DebugStatus.req_task + if req_task: + log.warning( + f'Already an ongoing repl request?\n' + f'|_{req_task}\n\n' - # Recurrence entry case: this task already has the lock and - # is likely recurrently entering a breakpoint + f'REPL task is\n' + f'|_{repl_task}\n\n' + + ) + # Recurrent entry case. + # this task already has the lock and is likely + # recurrently entering a `.pause()`-point either bc, + # - someone is hacking on runtime internals and put + # one inside code that get's called on the way to + # this code, + # - a legit app task uses the 'next' command while in + # a REPL sesh, and actually enters another + # `.pause()` (in a loop or something). # - # NOTE: noop on recurrent entry case but we want to trigger - # a checkpoint to allow other actors error-propagate and - # potetially avoid infinite re-entries in some - # subactor that would otherwise not bubble until the - # next checkpoint was hit. + # XXX Any other cose is likely a bug. if ( - (repl_task := DebugStatus.repl_task) - and - repl_task is task + repl_task + ): + if repl_task is task: + log.warning( + f'{task.name}@{actor.uid} already has TTY lock\n' + f'ignoring..' + ) + await trio.lowlevel.checkpoint() + return + + else: + # if **this** actor is already in debug REPL we want + # to maintain actor-local-task mutex access, so block + # here waiting for the control to be released - this + # -> allows for recursive entries to `tractor.pause()` + log.warning( + f'{task}@{actor.uid} already has TTY lock\n' + f'waiting for release..' + ) + await DebugStatus.repl_release.wait() + await trio.sleep(0.1) + + elif ( + req_task ): log.warning( - f'{task.name}@{actor.uid} already has TTY lock\n' - f'ignoring..' - ) - await trio.lowlevel.checkpoint() - return + 'Local task already has active debug request\n' + f'|_{task}\n\n' - # if **this** actor is already in debug REPL we want - # to maintain actor-local-task mutex access, so block - # here waiting for the control to be released - this - # -> allows for recursive entries to `tractor.pause()` - log.warning( - f'{task.name}@{actor.uid} already has TTY lock\n' - f'waiting for release..' - ) - await DebugStatus.repl_release.wait() - await trio.sleep(0.1) + 'Waiting for previous request to complete..\n' + ) + await DebugStatus.req_finished.wait() # this **must** be awaited by the caller and is done using the # root nursery so that the debugger can continue to run without @@ -1642,16 +1674,23 @@ async def _pause( # -[ ] we probably only need to allocate the nursery when # we detect the runtime is already in debug mode. 
# - # ctx: Context = await curr_ctx._debug_tn.start( + curr_ctx: Context = current_ipc_ctx() + # req_ctx: Context = await curr_ctx._debug_tn.start( + log.devx( + 'Starting request task\n' + f'|_{task}\n' + ) req_ctx: Context = await actor._service_n.start( - request_root_stdio_lock, - actor.uid, - (task.name, id(task)), # task uuid (effectively) + partial( + request_root_stdio_lock, + actor_uid=actor.uid, + task_uid=(task.name, id(task)), # task uuid (effectively) + shield=shield, + ) ) # XXX sanity, our locker task should be the one which # entered a new IPC ctx with the root actor, NOT the one # that exists around the task calling into `._pause()`. - curr_ctx: Context = current_ipc_ctx() assert ( req_ctx is @@ -1665,8 +1704,8 @@ async def _pause( # TODO: prolly factor this plus the similar block from # `_enter_repl_sync()` into a common @cm? - except BaseException as repl_err: - if isinstance(repl_err, bdb.BdbQuit): + except BaseException as pause_err: + if isinstance(pause_err, bdb.BdbQuit): log.devx( 'REPL for pdb was quit!\n' ) @@ -1675,7 +1714,7 @@ async def _pause( # `Actor._service_n` might get closed before we can spawn # the request task, so just ignore expected RTE. elif ( - isinstance(repl_err, RuntimeError) + isinstance(pause_err, RuntimeError) and actor._cancel_called ): @@ -1698,13 +1737,22 @@ async def _pause( # sanity checks for ^ on request/status teardown assert DebugStatus.repl is None assert DebugStatus.repl_task is None - req_ctx: Context = DebugStatus.req_ctx - if req_ctx: - assert req_ctx._scope.cancel_called + + # sanity, for when hackin on all this? + if not isinstance(pause_err, trio.Cancelled): + req_ctx: Context = DebugStatus.req_ctx + if req_ctx: + # XXX, bc the child-task in root might cancel it? + # assert req_ctx._scope.cancel_called + assert req_ctx.maybe_error raise finally: + # set in finally block of func.. this can be synced-to + # eventually with a debug_nursery somehow? + # assert DebugStatus.req_task is None + # always show frame when request fails due to internal # failure in the above code (including an `BdbQuit`). if ( @@ -1721,9 +1769,15 @@ def _set_trace( # partial-ed in by `.pause()` api_frame: FrameType, + + # optionally passed in to provide support for + # `pause_from_sync()` where + actor: tractor.Actor|None = None, + task: trio.Task|None = None, ): __tracebackhide__: bool = hide_tb - actor: tractor.Actor = current_actor() + actor: tractor.Actor = actor or current_actor() + task: task or current_task() # else: # TODO: maybe print the actor supervion tree up to the @@ -1731,8 +1785,10 @@ def _set_trace( log.pdb( f'{_pause_msg}\n' '|\n' - # TODO: make an `Actor.__repr()__` - f'|_ {current_task()} @ {actor.uid}\n' + # TODO: more compact pformating? + # -[ ] make an `Actor.__repr()__` + # -[ ] should we use `log.pformat_task_uid()`? + f'|_ {task} @ {actor.uid}\n' ) # presuming the caller passed in the "api frame" # (the last frame before user code - like `.pause()`) @@ -1747,7 +1803,7 @@ def _set_trace( async def pause( *, - hide_tb: bool = True, + hide_tb: bool = False, api_frame: FrameType|None = None, # TODO: figure out how to still make this work: @@ -1798,8 +1854,7 @@ async def pause( _set_trace, api_frame=api_frame, ), - - # task_status=task_status, + shield=shield, **_pause_kwargs ) # XXX avoid cs stack corruption when `PdbREPL.interaction()` @@ -1867,88 +1922,97 @@ async def maybe_init_greenback( # normally by remapping python's builtin breakpoint() hook to this # runtime aware version which takes care of all . 
def pause_from_sync( + hide_tb: bool = False, + # proxied to `_pause()` + + **_pause_kwargs, + # for eg. + # shield: bool = False, + # api_frame: FrameType|None = None, + ) -> None: __tracebackhide__: bool = hide_tb - actor: tractor.Actor = current_actor( - err_on_no_runtime=False, - ) - log.debug( - f'{actor.uid}: JUST ENTERED `tractor.pause_from_sync()`' - f'|_{actor}\n' - ) - if not actor: - raise RuntimeError( - 'Not inside the `tractor`-runtime?\n' - '`tractor.pause_from_sync()` is not functional without a wrapping\n' - '- `async with tractor.open_nursery()` or,\n' - '- `async with tractor.open_root_actor()`\n' + try: + actor: tractor.Actor = current_actor( + err_on_no_runtime=False, ) - - # NOTE: once supported, remove this AND the one - # inside `._pause()`! - if actor.is_infected_aio(): - raise RuntimeError( - '`tractor.pause[_from_sync]()` not yet supported ' - 'for infected `asyncio` mode!' + log.debug( + f'{actor.uid}: JUST ENTERED `tractor.pause_from_sync()`' + f'|_{actor}\n' ) - - # raises on not-found by default - greenback: ModuleType = maybe_import_greenback() - mdb: PdbREPL = mk_pdb() - - # run async task which will lock out the root proc's TTY. - if not Lock.is_main_trio_thread(): - - # TODO: we could also check for a non-`.to_thread` context - # using `trio.from_thread.check_cancelled()` (says - # oremanj) wherein we get the following outputs: - # - # `RuntimeError`: non-`.to_thread` spawned thread - # noop: non-cancelled `.to_thread` - # `trio.Cancelled`: cancelled `.to_thread` - # - trio.from_thread.run( - partial( - pause, - debug_func=None, - pdb=mdb, - hide_tb=hide_tb, + if not actor: + raise RuntimeError( + 'Not inside the `tractor`-runtime?\n' + '`tractor.pause_from_sync()` is not functional without a wrapping\n' + '- `async with tractor.open_nursery()` or,\n' + '- `async with tractor.open_root_actor()`\n' ) - ) - # TODO: maybe the `trio.current_task()` id/name if avail? - DebugStatus.repl_task: str = str(threading.current_thread()) - else: # we are presumably the `trio.run()` + main thread - greenback.await_( - pause( - debug_func=None, - pdb=mdb, - hide_tb=hide_tb, + # NOTE: once supported, remove this AND the one + # inside `._pause()`! + if actor.is_infected_aio(): + raise RuntimeError( + '`tractor.pause[_from_sync]()` not yet supported ' + 'for infected `asyncio` mode!' ) + + # raises on not-found by default + greenback: ModuleType = maybe_import_greenback() + mdb: PdbREPL = mk_pdb() + + # run async task which will lock out the root proc's TTY. + if not DebugStatus.is_main_trio_thread(): + + # TODO: we could also check for a non-`.to_thread` context + # using `trio.from_thread.check_cancelled()` (says + # oremanj) wherein we get the following outputs: + # + # `RuntimeError`: non-`.to_thread` spawned thread + # noop: non-cancelled `.to_thread` + # `trio.Cancelled`: cancelled `.to_thread` + # + trio.from_thread.run( + partial( + _pause, + debug_func=None, + repl=mdb, + **_pause_kwargs + ), + ) + task: threading.Thread = threading.current_thread() + + else: # we are presumably the `trio.run()` + main thread + task: trio.Task = current_task() + greenback.await_( + _pause( + debug_func=None, + repl=mdb, + **_pause_kwargs, + ) + ) + DebugStatus.repl_task: str = current_task() + + # TODO: ensure we aggressively make the user aware about + # entering the global ``breakpoint()`` built-in from sync + # code? 
+ _set_trace( + api_frame=inspect.currentframe(), + repl=mdb, + hide_tb=hide_tb, + actor=actor, + task=task, ) - DebugStatus.repl_task: str = current_task() - - # TODO: ensure we aggressively make the user aware about - # entering the global ``breakpoint()`` built-in from sync - # code? - _set_trace( - api_frame=inspect.current_frame(), - actor=actor, - pdb=mdb, - hide_tb=hide_tb, - - # TODO? will we ever need it? - # -> the gb._await() won't be affected by cancellation? - # shield=shield, - ) - # LEGACY NOTE on next LOC's frame showing weirdness.. - # - # XXX NOTE XXX no other LOC can be here without it - # showing up in the REPL's last stack frame !?! - # -[ ] tried to use `@pdbp.hideframe` decoration but - # still doesn't work + # LEGACY NOTE on next LOC's frame showing weirdness.. + # + # XXX NOTE XXX no other LOC can be here without it + # showing up in the REPL's last stack frame !?! + # -[ ] tried to use `@pdbp.hideframe` decoration but + # still doesn't work + except BaseException as err: + __tracebackhide__: bool = False + raise err # NOTE prefer a new "pause" semantic since it better describes @@ -2135,6 +2199,7 @@ async def maybe_wait_for_debugger( child_in_debug: bool = False, header_msg: str = '', + _ll: str = 'devx', ) -> bool: # was locked and we polled? @@ -2144,6 +2209,7 @@ async def maybe_wait_for_debugger( ): return False + logmeth: Callable = getattr(log, _ll) msg: str = header_msg if ( @@ -2156,7 +2222,11 @@ async def maybe_wait_for_debugger( # Instead try to wait for pdb to be released before # tearing down. ctx_in_debug: Context|None = Lock.ctx_in_debug - in_debug: tuple[str, str]|None = ctx_in_debug.chan.uid if ctx_in_debug else None + in_debug: tuple[str, str]|None = ( + ctx_in_debug.chan.uid + if ctx_in_debug + else None + ) if in_debug == current_actor().uid: log.debug( msg @@ -2176,7 +2246,7 @@ async def maybe_wait_for_debugger( # XXX => but it doesn't seem to work.. # await trio.testing.wait_all_tasks_blocked(cushion=0) else: - log.debug( + logmeth( msg + 'Root immediately acquired debug TTY LOCK' @@ -2185,13 +2255,13 @@ async def maybe_wait_for_debugger( for istep in range(poll_steps): if ( - Lock.no_remote_has_tty is not None - and not Lock.no_remote_has_tty.is_set() + Lock.req_handler_finished is not None + and not Lock.req_handler_finished.is_set() and in_debug is not None ): # caller_frame_info: str = pformat_caller_frame() - log.debug( + logmeth( msg + '\nRoot is waiting on tty lock to release from\n\n' @@ -2202,7 +2272,7 @@ async def maybe_wait_for_debugger( Lock.get_locking_task_cs().cancel() with trio.CancelScope(shield=True): - await Lock.no_remote_has_tty.wait() + await Lock.req_handler_finished.wait() log.pdb( f'Subactor released debug lock\n' @@ -2214,11 +2284,11 @@ async def maybe_wait_for_debugger( if ( in_debug is None and ( - Lock.no_remote_has_tty is None - or Lock.no_remote_has_tty.is_set() + Lock.req_handler_finished is None + or Lock.req_handler_finished.is_set() ) ): - log.pdb( + logmeth( msg + 'Root acquired tty lock!' @@ -2226,13 +2296,11 @@ async def maybe_wait_for_debugger( break else: - # TODO: don't need this right? - # await trio.lowlevel.checkpoint() - - log.debug( + logmeth( 'Root polling for debug:\n' f'poll step: {istep}\n' - f'poll delya: {poll_delay}' + f'poll delya: {poll_delay}\n\n' + f'{Lock.repr()}\n' ) with CancelScope(shield=True): await trio.sleep(poll_delay)
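
For reference, a minimal self-contained sketch (not part of the patch) of the
two-event syncing pattern described above: the lock-handler task always signals
its own completion from a `finally:`, while the requester keeps the user-driven
`.repl_release` event separate from its own request-finished event. The
`Status`/`lock_handler()`/`request_lock()` names below are purely illustrative
stand-ins, not `tractor` APIs.

    import trio


    class Status:
        # set by the user/REPL side when the REPL session ends ('c'/'q')
        repl_release: trio.Event | None = None
        # set by the request task itself once its dialog fully exits
        req_finished: trio.Event | None = None
        # set by the (root-side) lock handler once it has torn down
        handler_finished: trio.Event | None = None


    async def lock_handler(task_status=trio.TASK_STATUS_IGNORED):
        # root-side: grant the "lock", then wait for the release signal
        Status.handler_finished = trio.Event()
        try:
            task_status.started('locked')
            # stand-in for waiting on the IPC `LockRelease` msg
            await Status.repl_release.wait()
        finally:
            # ALWAYS run, even on cancel/error, so waiters can't hang
            Status.handler_finished.set()


    async def request_lock(nursery: trio.Nursery):
        # sub-side request task; its finished-event is distinct from
        # `.repl_release` so queued requesters can sync on it.
        Status.req_finished = trio.Event()
        try:
            status = await nursery.start(lock_handler)
            print(f'lock status: {status}')
            # user exits the REPL session
            await Status.repl_release.wait()
        finally:
            Status.req_finished.set()


    async def main():
        Status.repl_release = trio.Event()
        async with trio.open_nursery() as n:
            n.start_soon(request_lock, n)
            await trio.sleep(0.1)              # pretend the REPL session runs
            Status.repl_release.set()          # "exit" the REPL
            await Status.req_finished.wait()   # contending tasks would block here
            await Status.handler_finished.wait()


    trio.run(main)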
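
Similarly, a simplified sketch of the sync->async dispatch that
`.pause_from_sync()` relies on: hop back into the `trio` loop via
`trio.from_thread.run()` when called from a `trio.to_thread.run_sync()` worker
thread, vs. resuming through a `greenback` portal on the main `trio` thread.
`do_async_work()`/`sync_entrypoint()` are hypothetical stand-ins for the
internal `_pause()` request machinery, and the thread-detection check shown is
just one possible approach (the real impl uses `DebugStatus.is_main_trio_thread()`).

    import threading
    import trio
    import greenback  # assumed installed; provides `ensure_portal()`/`await_()`


    async def do_async_work(caller: str) -> None:
        # stand-in for the async debug-request task
        await trio.sleep(0.05)
        print(f'async work done for {caller}')


    def sync_entrypoint() -> None:
        try:
            trio.lowlevel.current_task()   # raises outside the trio thread
            on_trio_thread = True
        except RuntimeError:
            on_trio_thread = False

        if on_trio_thread:
            # main thread: resume the loop through the greenback portal
            greenback.await_(do_async_work('main trio thread'))
        else:
            # `.to_thread` worker: re-enter the loop from this thread
            trio.from_thread.run(
                do_async_work,
                threading.current_thread().name,
            )


    async def main() -> None:
        await greenback.ensure_portal()    # required before `greenback.await_()`
        sync_entrypoint()                                # main-thread path
        await trio.to_thread.run_sync(sync_entrypoint)   # worker-thread path


    trio.run(main)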