From 77a15ebf19927c5e8f15a7d93f003a6162f59797 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 18 Apr 2024 12:47:28 -0400
Subject: [PATCH] Use `DebugStatus` around subactor lock requests

Breaks out all the (sub)actor local conc primitives from `Lock` (which
is now only used in and by the root actor) such that there's an
explicit distinction between a task that's "consuming" the `Lock`
(remotely) vs. the root-side service tasks which do the actual acquire
on behalf of the requesters.

`DebugStatus` changeover deats:
------ - ------
- move all the actor-local vars over to `DebugStatus` including:
  - move `_trio_handler` and `_orig_sigint_handler`
  - `local_task_in_debug` now `repl_task`
  - `_debugger_request_cs` now `req_cs`
  - `local_pdb_complete` now `repl_release`
- drop all ^ fields from `Lock.repr()` obvi..
- move over the `.[un]shield_sigint()` and `.is_main_trio_thread()`
  methods.
- add some new attrs/meths:
  - `DebugStatus.repl` for the currently running `Pdb` in-actor
    singleton.
  - `.repr()` for pprint of state (like `Lock`).
- Note: even when a root-actor task is in REPL, the `DebugStatus` is
  still used for certain actor-local state mgmt, such as SIGINT
  handler shielding.
- obvi change all lock-requester code bits to now use a `DebugStatus`
  in their local actor-state instead of `Lock`, i.e. change usage from
  `Lock` in `._runtime` and `._root`.
- use the new `Lock.get_locking_task_cs()` API when checking for
  a sub-in-debug from `._runtime.Actor._stream_handler()`.

Unrelated to topic-at-hand tweaks:
------ - ------
- drop the commented bits about hiding `@[a]cm` stack frames from
  `_debug.pause()` and simplify to only one block with the `shield`
  passthrough, since we already solved the issue with cancel-scopes
  using `@pdbp.hideframe` B)
  - this includes all the extra logging about the extra frame for the
    user (good thing i put in that wasted effort back then eh..)
- put the `try/except BaseException` with `log.exception()` around the
  whole of `._pause()` to ensure we don't miss in-func errors which can
  cause hangs..
- allow passing in `portal: Portal` to `Actor.start_remote_task()` such
  that `Portal` task spawning methods are always denoted correctly in
  terms of `Context.side`.
- lotsa logging tweaks, decreasing a bit of noise from `.runtime()`s.
---
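[sketch] The state split in a nutshell; a minimal, hedged usage sketch
assuming only the attr names listed above (the `request_repl()` helper
is hypothetical, not part of this patch):

    import trio
    from tractor.devx._debug import DebugStatus, Lock

    async def request_repl(task) -> None:
        # hypothetical subactor-side requester: all conc primitives
        # now live on `DebugStatus` (was `Lock.local_*` fields)
        DebugStatus.repl_task = task             # was `Lock.local_task_in_debug`
        DebugStatus.repl_release = trio.Event()  # was `Lock.local_pdb_complete`
        DebugStatus.shield_sigint()
        try:
            # ... IPC dialog to the root which does the actual
            # `Lock._debug_lock.acquire()` on our behalf ...
            await DebugStatus.repl_release.wait()
        finally:
            DebugStatus.repl_task = None
            DebugStatus.unshield_sigint()
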
 tractor/_root.py       |   2 +-
 tractor/_runtime.py    | 101 ++++----
 tractor/devx/_debug.py | 573 +++++++++++++++++++----------------
 3 files changed, 322 insertions(+), 354 deletions(-)

diff --git a/tractor/_root.py b/tractor/_root.py
index 1964a06..afe91e7 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -135,7 +135,7 @@ async def open_root_actor(

     # attempt to retreive ``trio``'s sigint handler and stash it
     # on our debugger lock state.
-    _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
+    _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)

     # mark top most level process as root actor
     _state._runtime_vars['_is_root'] = True

diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index 4d90c59..72866d4 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -267,10 +267,13 @@ class Actor:
         self._listeners: list[trio.abc.Listener] = []
         self._parent_chan: Channel|None = None
         self._forkserver_info: tuple|None = None
+
+        # track each child/sub-actor in its locally
+        # supervising nursery
         self._actoruid2nursery: dict[
-            tuple[str, str],
+            tuple[str, str],  # sub-`Actor.uid`
             ActorNursery|None,
-        ] = {}  # type: ignore # noqa
+        ] = {}

         # when provided, init the registry addresses property from
         # input via the validator.
@@ -659,12 +662,18 @@ class Actor:

             # TODO: NEEEDS TO BE TESTED!
             # actually, no idea if this ever even enters.. XD
+            #
+            # XXX => YES IT DOES, when i was testing ctl-c
+            # from broken debug TTY locking due to
+            # msg-spec races on application using RunVar...
             pdb_user_uid: tuple = pdb_lock.global_actor_in_debug
             if (
                 pdb_user_uid
                 and local_nursery
             ):
-                entry: tuple|None = local_nursery._children.get(pdb_user_uid)
+                entry: tuple|None = local_nursery._children.get(
+                    tuple(pdb_user_uid)
+                )
                 if entry:
                     proc: trio.Process
                     _, proc, _ = entry
@@ -674,10 +683,10 @@ class Actor:
                         and poll() is None
                     ):
                         log.cancel(
-                            'Root actor reports no-more-peers, BUT '
+                            'Root actor reports no-more-peers, BUT\n'
                             'a DISCONNECTED child still has the debug '
-                            'lock!\n'
-                            f'root uid: {self.uid}\n'
+                            'lock!\n\n'
+                            # f'root uid: {self.uid}\n'
                             f'last disconnected child uid: {uid}\n'
                             f'locking child uid: {pdb_user_uid}\n'
                         )
@@ -703,9 +712,8 @@ class Actor:
             # if a now stale local task has the TTY lock still
             # we cancel it to allow servicing other requests for
             # the lock.
-            db_cs: trio.CancelScope|None = pdb_lock._root_local_task_cs_in_debug
             if (
-                db_cs
+                (db_cs := pdb_lock.get_locking_task_cs())
                 and not db_cs.cancel_called
                 and uid == pdb_user_uid
             ):
@@ -742,7 +750,7 @@ class Actor:
         except KeyError:
             log.warning(
                 'Ignoring invalid IPC ctx msg!\n\n'
-                f'<= sender: {uid}\n'
+                f'<= sender: {uid}\n\n'

                 # XXX don't need right since it's always in msg?
                 # f'=> cid: {cid}\n\n'
@@ -796,7 +804,7 @@ class Actor:
                 cid,
                 # side,
             )]
-            log.runtime(
+            log.debug(
                 f'Retreived cached IPC ctx for\n'
                 f'peer: {chan.uid}\n'
                 f'cid:{cid}\n'
@@ -835,10 +843,14 @@ class Actor:
         nsf: NamespacePath,
         kwargs: dict,

+        # determines `Context.side: str`
+        portal: Portal|None = None,
+
         # IPC channel config
         msg_buffer_size: int|None = None,
         allow_overruns: bool = False,
         load_nsf: bool = False,
+        ack_timeout: float = 3,

     ) -> Context:
         '''
@@ -863,10 +875,12 @@ class Actor:
             msg_buffer_size=msg_buffer_size,
             allow_overruns=allow_overruns,
         )
+        ctx._portal = portal
         if (
             'self' in nsf
-            or not load_nsf
+            or
+            not load_nsf
         ):
             ns, _, func = nsf.partition(':')
         else:
@@ -874,42 +888,29 @@ class Actor:
             # -[ ] but, how to do `self:`??
             ns, func = nsf.to_tuple()

+        msg = msgtypes.Start(
+            ns=ns,
+            func=func,
+            kwargs=kwargs,
+            uid=self.uid,
+            cid=cid,
+        )
         log.runtime(
-            'Sending cmd to\n'
-            f'peer: {chan.uid} => \n'
-            '\n'
-            f'=> {ns}.{func}({kwargs})\n'
+            'Sending RPC start msg\n\n'
+            f'=> peer: {chan.uid}\n'
+            f'  |_ {ns}.{func}({kwargs})\n'
         )
-        await chan.send(
-            msgtypes.Start(
-                ns=ns,
-                func=func,
-                kwargs=kwargs,
-                uid=self.uid,
-                cid=cid,
-            )
-        )
-        # {'cmd': (
-        #     ns,
-        #     func,
-        #     kwargs,
-        #     self.uid,
-        #     cid,
-        # )}
-        # )
+        await chan.send(msg)

-        # Wait on first response msg and validate; this should be
-        # immediate.
-        # first_msg: dict = await ctx._recv_chan.receive()
-        # functype: str = first_msg.get('functype')
-
-        first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
+        # NOTE wait on first `StartAck` response msg and validate;
+        # this should be immediate and does not (yet) wait for the
+        # remote child task to sync via `Context.started()`.
+        with trio.fail_after(ack_timeout):
+            first_msg: msgtypes.StartAck = await ctx._recv_chan.receive()
         try:
             functype: str = first_msg.functype
         except AttributeError:
             raise unpack_error(first_msg, chan)
-        # if 'error' in first_msg:
-        #     raise unpack_error(first_msg, chan)

         if functype not in (
             'asyncfunc',
             'asyncgen',
             'context',
         ):
             raise ValueError(
-                f'{first_msg} is an invalid response packet?'
+                f'Invalid `StartAck.functype: str = {first_msg!r}` ??'
             )

         ctx._remote_func_type = functype
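
[sketch] The `ack_timeout` guard added to `start_remote_task()` is the
stock `trio` bounded-wait idiom; standalone version (hypothetical
names, not this codebase's API):

    import trio

    async def recv_first_ack(recv_chan, ack_timeout: float = 3):
        # `trio.fail_after()` raises `trio.TooSlowError` if no ack
        # arrives before the deadline.
        try:
            with trio.fail_after(ack_timeout):
                return await recv_chan.receive()
        except trio.TooSlowError:
            raise RuntimeError('peer never sent a `StartAck`!?')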
@@ -1162,7 +1163,7 @@ class Actor:

         # kill any debugger request task to avoid deadlock
         # with the root actor in this tree
-        dbcs = _debug.Lock._debugger_request_cs
+        dbcs = _debug.DebugStatus.req_cs
         if dbcs is not None:
             msg += (
                 '>> Cancelling active debugger request..\n'
@@ -1237,9 +1238,9 @@ class Actor:
         except KeyError:
             # NOTE: during msging race conditions this will often
             # emit, some examples:
-            # - callee returns a result before cancel-msg/ctxc-raised
-            # - callee self raises ctxc before caller send request,
-            # - callee errors prior to cancel req.
+            # - child returns a result before cancel-msg/ctxc-raised
+            # - child self raises ctxc before parent send request,
+            # - child errors prior to cancel req.
             log.cancel(
                 'Cancel request invalid, RPC task already completed?\n\n'
                 f'<= canceller: {requesting_uid}\n\n'
@@ -1302,15 +1303,15 @@ class Actor:
         flow_info: str = (
             f'<= canceller: {requesting_uid}\n'
             f'=> ipc-parent: {parent_chan}\n'
-            f'  |_{ctx}\n'
+            f'|_{ctx}\n'
         )
         log.runtime(
-            'Waiting on RPC task to cancel\n'
+            'Waiting on RPC task to cancel\n\n'
             f'{flow_info}'
         )
         await is_complete.wait()
         log.runtime(
-            f'Sucessfully cancelled RPC task\n'
+            f'Successfully cancelled RPC task\n\n'
             f'{flow_info}'
         )
         return True
@@ -1536,8 +1537,8 @@ async def async_main(

     '''
     # attempt to retreive ``trio``'s sigint handler and stash it
-    # on our debugger lock state.
-    _debug.Lock._trio_handler = signal.getsignal(signal.SIGINT)
+    # on our debugger state.
+    _debug.DebugStatus._trio_handler = signal.getsignal(signal.SIGINT)

     is_registered: bool = False
     try:
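
[sketch] The handler stash above enables the later restore in
`.unshield_sigint()`; the generic stdlib dance looks like (plain
`signal`, no tractor):

    import signal

    _orig = signal.getsignal(signal.SIGINT)  # stash current (eg. `trio`'s) handler

    def _shielded(signum, frame):
        print('SIGINT shielded while a REPL is active..')

    signal.signal(signal.SIGINT, _shielded)  # install the shield
    # ... REPL interaction ...
    signal.signal(signal.SIGINT, _orig)      # restore on release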

diff --git a/tractor/devx/_debug.py b/tractor/devx/_debug.py
index 51e7437..e4ab7d8 100644
--- a/tractor/devx/_debug.py
+++ b/tractor/devx/_debug.py
@@ -160,12 +160,6 @@ class Lock:
     # placeholder for function to set a ``trio.Event`` on debugger exit
     # pdb_release_hook: Callable | None = None

-    _trio_handler: (
-        Callable[[int, FrameType|None], Any]
-        |int
-        | None
-    ) = None
-
     remote_task_in_debug: str|None = None

     @staticmethod
@@ -188,12 +182,6 @@ class Lock:

         Lock._locking_task_cs = cs

-    # SUBACTOR ONLY
-    # ------ - -------
-    local_task_in_debug: Task|None = None
-    _debugger_request_cs: trio.CancelScope|None = None
-    local_pdb_complete: trio.Event|None = None
-
     # ROOT ONLY
     # ------ - -------
     # the root-actor-ONLY singletons for,
@@ -214,16 +202,12 @@ class Lock:
     _debug_lock: trio.StrictFIFOLock = trio.StrictFIFOLock()
     _blocked: set[tuple[str, str]] = set()  # `Actor.uid` block list

-    # TODO: should go on `PbpREPL`?
-    _orig_sigint_handler: Callable | None = None
-
     @classmethod
     def repr(cls) -> str:

         # both root and subs
         fields: str = (
             f'repl: {cls.repl}\n'
-            f'local_repl_task: {cls.local_task_in_debug}\n'
         )

         if is_root_process():
@@ -238,12 +222,6 @@ class Lock:
                 f'_debug_lock: {cls._debug_lock}\n'
                 f'lock_stats: {lock_stats}\n'
             )
-        else:
-            fields += (
-                f'local_task_in_debug: {cls.local_task_in_debug}\n'
-                f'local_pdb_complete: {cls.local_pdb_complete}\n'
-                f'_debugger_request_cs: {cls._debugger_request_cs}\n'
-            )

         body: str = textwrap.indent(
             fields,
@@ -255,7 +233,101 @@ class Lock:
             ')>'
         )

-    # TODO: move to PdbREPL!
+    @classmethod
+    def release(cls):
+        try:
+            if not DebugStatus.is_main_trio_thread():
+                trio.from_thread.run_sync(
+                    cls._debug_lock.release
+                )
+            else:
+                cls._debug_lock.release()
+
+        except RuntimeError as rte:
+            # uhhh makes no sense but been seeing the non-owner
+            # release error even though this is definitely the task
+            # that locked?
+            owner = cls._debug_lock.statistics().owner
+            # if (
+            #     owner
+            #     and
+            #     cls.remote_task_in_debug is None
+            # ):
+            #     raise RuntimeError(
+            #         'Stale `Lock` detected, no remote task active!?\n'
+            #         f'|_{owner}\n'
+            #         # f'{Lock}'
+            #     ) from rte
+
+            if owner:
+                raise rte
+
+            # OW suppress, can't member why tho .. XD
+            # something somethin corrupts a cancel-scope
+            # somewhere..
+
+        try:
+            # sometimes the ``trio`` might already be terminated in
+            # which case this call will raise.
+            if DebugStatus.repl_release is not None:
+                DebugStatus.repl_release.set()
+
+        finally:
+            cls.repl = None
+            cls.global_actor_in_debug = None
+
+            # restore original sigint handler
+            DebugStatus.unshield_sigint()
+            # actor-local state, irrelevant for non-root.
+            DebugStatus.repl_task = None
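
[sketch] `Lock.release()` above bounces the actual release back into
the main `trio` thread when called from elsewhere; the same
cross-thread hop in isolation (using an `Event` to keep it simple):

    import trio

    async def main() -> None:
        done = trio.Event()

        def sync_work() -> None:
            # off the main trio thread: hop back in via the
            # `trio.from_thread` portal API
            trio.from_thread.run_sync(done.set)

        async with trio.open_nursery() as tn:
            tn.start_soon(trio.to_thread.run_sync, sync_work)
            await done.wait()  # set from the worker thread

    trio.run(main)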
+
+
+# TODO: actually use this instead throughout for subs!
+class DebugStatus:
+    '''
+    Singleton-state for debugging machinery in a subactor.
+
+    Composes conc primitives for syncing with a root actor to
+    acquire the tree-global (TTY) `Lock` such that only ever one
+    actor's task can have the REPL active at a given time.
+
+    Methods to shield the process' `SIGINT` handler are used
+    whenever a local task is an active REPL.
+
+    '''
+    repl: PdbREPL|None = None
+    repl_task: Task|None = None
+    req_cs: trio.CancelScope|None = None
+    repl_release: trio.Event|None = None
+
+    lock_status: LockStatus|None = None
+
+    _orig_sigint_handler: Callable | None = None
+    _trio_handler: (
+        Callable[[int, FrameType|None], Any]
+        |int
+        | None
+    ) = None
+
+    @classmethod
+    def repr(cls) -> str:
+        fields: str = (
+            f'repl: {cls.repl}\n'
+            f'repl_task: {cls.repl_task}\n'
+            f'repl_release: {cls.repl_release}\n'
+            f'req_cs: {cls.req_cs}\n'
+        )
+        body: str = textwrap.indent(
+            fields,
+            prefix=' |_',
+        )
+        return (
+            f'<{cls.__name__}(\n'
+            f'{body}'
+            ')>'
+        )
+
     @classmethod
     def shield_sigint(cls):
         '''
@@ -339,77 +411,6 @@ class Lock:
         # is not threading.main_thread()
         # )

-    @classmethod
-    def release(cls):
-        try:
-            if not cls.is_main_trio_thread():
-                trio.from_thread.run_sync(
-                    cls._debug_lock.release
-                )
-            else:
-                cls._debug_lock.release()
-
-        except RuntimeError as rte:
-            # uhhh makes no sense but been seeing the non-owner
-            # release error even though this is definitely the task
-            # that locked?
-            owner = cls._debug_lock.statistics().owner
-            # if (
-            #     owner
-            #     and
-            #     cls.remote_task_in_debug is None
-            # ):
-            #     raise RuntimeError(
-            #         'Stale `Lock` detected, no remote task active!?\n'
-            #         f'|_{owner}\n'
-            #         # f'{Lock}'
-            #     ) from rte
-
-            if owner:
-                raise rte
-
-            # OW suppress, can't member why tho .. XD
-            # something somethin corrupts a cancel-scope
-            # somewhere..
-
-        try:
-            # sometimes the ``trio`` might already be terminated in
-            # which case this call will raise.
-            if cls.local_pdb_complete is not None:
-                cls.local_pdb_complete.set()
-
-        finally:
-            # restore original sigint handler
-            cls.unshield_sigint()
-            cls.repl = None
-
-            # actor-local state, irrelevant for non-root.
-            cls.global_actor_in_debug = None
-            cls.local_task_in_debug = None
-
-
-# TODO: actually use this instead throughout for subs!
-class DebugStatus:
-    '''
-    Singleton-state for debugging machinery in a subactor.
-
-    Composes conc primitives for syncing with a root actor to
-    acquire the tree-global (TTY) `Lock` such that only ever one
-    actor's task can have the REPL active at a given time.
-
-    '''
-    repl: PdbREPL|None = None
-    lock_status: LockStatus|None = None
-
-    repl_task: Task|None = None
-    # local_task_in_debug: Task|None = None
-
-    req_cs: trio.CancelScope|None = None
-    # _debugger_request_cs: trio.CancelScope|None = None
-
-    repl_release: trio.Event|None = None
-    # local_pdb_complete: trio.Event|None = None
-

 class TractorConfig(pdbp.DefaultConfig):
     '''
@@ -445,6 +446,7 @@ class PdbREPL(pdbp.Pdb):

     status = DebugStatus
+
     # def preloop(self):
     #     print('IN PRELOOP')
     #     super().preloop()
@@ -660,16 +662,19 @@ async def lock_tty_for_child(
     highly reliable at releasing the mutex complete!

     '''
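
[sketch] The guard that follows is a plain "already requested" check
over a `set` of task uids; the generic form of the pattern (standalone
names, hypothetical):

    _blocked: set[tuple[str, int]] = set()

    def begin_request(task_uid: tuple[str, int]) -> None:
        if task_uid in _blocked:
            raise RuntimeError(
                f'Double lock request!?\n'
                f'task uid: {task_uid}\n'
            )
        _blocked.add(task_uid)

    def end_request(task_uid: tuple[str, int]) -> None:
        # always pair with `begin_request()` via try/finally
        _blocked.remove(task_uid)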
-
     req_task_uid: tuple = tuple(subactor_task_uid)
     if req_task_uid in Lock._blocked:
         raise RuntimeError(
+            f'Double lock request!?\n'
             f'The same remote task already has an active request for TTY lock ??\n\n'
             f'task uid: {req_task_uid}\n'
-            f'subactor uid: {subactor_uid}\n'
-        )
-
-    Lock._blocked.add(req_task_uid)
+            f'subactor uid: {subactor_uid}\n\n'
+
+            'This might mean that the requesting task '
+            'in `wait_for_parent_stdin_hijack()` may have crashed?\n'
+            'Consider that an internal bug exists given the TTY '
+            '`Lock`ing IPC dialog..\n'
+        )

     root_task_name: str = current_task().name
     if tuple(subactor_uid) in Lock._blocked:
@@ -695,8 +700,9 @@ async def lock_tty_for_child(
         f'subactor_uid: {subactor_uid}\n'
         f'remote task: {subactor_task_uid}\n'
     )
-    Lock.shield_sigint()
+    DebugStatus.shield_sigint()
     try:
+        Lock._blocked.add(req_task_uid)
         with (
             # NOTE: though a cs is created for every subactor lock
             # REQUEST in this ctx-child task, only the root-task
@@ -708,6 +714,9 @@ async def lock_tty_for_child(
             # used to do so!
             trio.CancelScope(shield=True) as debug_lock_cs,

+            # TODO: make this ONLY limit the pld_spec such that we
+            # can on-error-decode-`.pld: Raw` fields in
+            # `Context._deliver_msg()`?
             _codec.limit_msg_spec(
                 payload_spec=__msg_spec__,
             ) as codec,
@@ -763,8 +772,9 @@ async def lock_tty_for_child(

     finally:
         debug_lock_cs.cancel()
+        Lock._blocked.remove(req_task_uid)
         Lock.set_locking_task_cs(None)
-        Lock.unshield_sigint()
+        DebugStatus.unshield_sigint()


 @cm
@@ -817,7 +827,7 @@ async def wait_for_parent_stdin_hijack(
         trio.CancelScope(shield=True) as cs,
         apply_debug_codec(),
     ):
-        Lock._debugger_request_cs = cs
+        DebugStatus.req_cs = cs
         try:
             # TODO: merge into sync async with ?
             async with get_root() as portal:
@@ -829,7 +839,7 @@ async def wait_for_parent_stdin_hijack(
                 ) as (ctx, resp):

                     log.pdb(
-                        'Subactor locked TTY per msg\n'
+                        'Subactor locked TTY with msg\n\n'
                         f'{resp}\n'
                     )
                     assert resp.subactor_uid == actor_uid

                     async with ctx.open_stream() as stream:
                         try:  # to unblock local caller
-                            assert Lock.local_pdb_complete
+                            assert DebugStatus.repl_release
                             task_status.started(cs)

                             # wait for local task to exit and
                             # release the REPL
-                            await Lock.local_pdb_complete.wait()
+                            await DebugStatus.repl_release.wait()

                         finally:
                             await stream.send(
@@ -867,12 +877,12 @@ async def wait_for_parent_stdin_hijack(
         raise

     finally:
-        Lock.local_task_in_debug = None
+        DebugStatus.repl_task = None
         log.debug('Exiting debugger TTY lock request func from child')

     log.cancel('Reverting SIGINT handler!')
-    Lock.unshield_sigint()
+    DebugStatus.unshield_sigint()
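
[sketch] Condensed child-side flow of the locking dialog above
(abridged: payloads, retries and error paths elided; illustrative
only, not a working API):

    async def request_root_tty_lock(actor_uid, task_status):
        async with get_root() as portal:
            async with portal.open_context(
                lock_tty_for_child,
                subactor_uid=actor_uid,
            ) as (ctx, resp):
                async with ctx.open_stream() as stream:
                    assert DebugStatus.repl_release
                    task_status.started()                  # unblock local pauser
                    await DebugStatus.repl_release.wait()  # REPL exited
                    await stream.send(...)                 # tell root to release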

@@ -901,7 +911,7 @@ def mk_mpdb() -> PdbREPL:
    # in which case schedule the SIGINT shielding override
    # to in the main thread.
    # https://docs.python.org/3/library/signal.html#signals-and-threads
-    Lock.shield_sigint()
+    DebugStatus.shield_sigint()

    # XXX: These are the important flags mentioned in
    # https://github.com/python-trio/trio/issues/1155
@@ -1036,7 +1046,8 @@ def shield_sigint_handler(
             )
             log.warning(message)
-            Lock.unshield_sigint()
+            # Lock.unshield_sigint()
+            DebugStatus.unshield_sigint()
             case_handled = True

         else:
@@ -1064,7 +1075,7 @@ def shield_sigint_handler(
             if maybe_stale_lock_cs:
                 lock_cs.cancel()

-            Lock.unshield_sigint()
+            DebugStatus.unshield_sigint()
             case_handled = True

     # child actor that has locked the debugger
@@ -1086,11 +1097,11 @@ def shield_sigint_handler(
                 f'{uid_in_debug}\n'
                 'Allowing SIGINT propagation..'
             )
-            Lock.unshield_sigint()
+            DebugStatus.unshield_sigint()
             # do_cancel()
             case_handled = True

-        task: str|None = Lock.local_task_in_debug
+        task: str|None = DebugStatus.repl_task
         if (
             task
             and
@@ -1124,7 +1135,7 @@ def shield_sigint_handler(
                 + 'Reverting handler to `trio` default!\n'
             )
-            Lock.unshield_sigint()
+            DebugStatus.unshield_sigint()
             case_handled = True

             # XXX ensure that the reverted-to-handler actually is
@@ -1200,32 +1211,15 @@ def _set_trace(
             pdb
             and actor is not None
         )
-        # or shield
     ):
-        msg: str = _pause_msg
-        if shield:
-            # log.warning(
-            msg = (
-                '\n\n'
-                ' ------ - ------\n'
-                'Debugger invoked with `shield=True` so an extra\n'
-                '`trio.CancelScope.__exit__()` frame is shown..\n'
-                '\n'
-                'Try going up one frame to see your pause point!\n'
-                '\n'
-                ' SORRY we need to fix this!\n'
-                ' ------ - ------\n\n'
-            ) + msg
-
-        # pdbp.set_trace()
         # TODO: maybe print the actor supervion tree up to the
        # root here? Bo
+
        log.pdb(
-            f'{msg}\n'
+            f'{_pause_msg}\n'
            '|\n'
            # TODO: make an `Actor.__repr()__`
-            # f'|_ {current_task()} @ {actor.name}\n'
-            f'|_ {current_task()}\n'
+            f'|_ {current_task()} @ {actor.uid}\n'
        )
        # no f!#$&* idea, but when we're in async land
        # we need 2x frames up?
@@ -1286,11 +1280,11 @@ async def _pause(
    # task_name: str = task.name

    if (
-        not Lock.local_pdb_complete
+        not DebugStatus.repl_release
        or
-        Lock.local_pdb_complete.is_set()
+        DebugStatus.repl_release.is_set()
    ):
-        Lock.local_pdb_complete = trio.Event()
+        DebugStatus.repl_release = trio.Event()

    if debug_func is not None:
        debug_func = partial(debug_func)
@@ -1333,71 +1327,14 @@ async def _pause(
            Lock.release()
            raise

-    except BaseException:
-        log.exception(
-            'Failed to engage debugger via `_pause()` ??\n'
-        )
-        raise
+    try:
+        if is_root_process():

-    if is_root_process():
-
-        # we also wait in the root-parent for any child that
-        # may have the tty locked prior
-        # TODO: wait, what about multiple root tasks acquiring it though?
-        if Lock.global_actor_in_debug == actor.uid:
-            # re-entrant root process already has it: noop.
-            log.warning(
-                f'{task.name}@{actor.uid} already has TTY lock\n'
-                f'ignoring..'
-            )
-            await trio.lowlevel.checkpoint()
-            return
-
-        # XXX: since we need to enter pdb synchronously below,
-        # we have to release the lock manually from pdb completion
-        # callbacks. Can't think of a nicer way then this atm.
-        if Lock._debug_lock.locked():
-            log.warning(
-                'attempting to shield-acquire active TTY lock'
-                f' owned by {Lock.global_actor_in_debug}'
-            )
-
-            # must shield here to avoid hitting a ``Cancelled`` and
-            # a child getting stuck bc we clobbered the tty
-            with trio.CancelScope(shield=True):
-                await Lock._debug_lock.acquire()
-        else:
-            # may be cancelled
-            await Lock._debug_lock.acquire()
-
-        Lock.global_actor_in_debug = actor.uid
-        Lock.local_task_in_debug = task
-        Lock.repl = pdb
-
-        # enter REPL from root, no TTY locking IPC ctx necessary
-        _enter_repl_sync(debug_func)
-        return  # next branch is mutex and for subactors
-
-    # TODO: need a more robust check for the "root" actor
-    elif (
-        not is_root_process()
-        and actor._parent_chan  # a connected child
-    ):
-        if Lock.local_task_in_debug:
-
-            # Recurrence entry case: this task already has the lock and
-            # is likely recurrently entering a breakpoint
-            #
-            # NOTE: noop on recurrent entry case but we want to trigger
-            # a checkpoint to allow other actors error-propagate and
-            # potetially avoid infinite re-entries in some
-            # subactor that would otherwise not bubble until the
-            # next checkpoint was hit.
@@ -1405,79 +1342,137 @@ async def _pause(
-            if (
-                (repl_task := Lock.local_task_in_debug)
-                and
-                repl_task is task
-            ):
-                log.warning(
-                    f'{task.name}@{actor.uid} already has TTY lock\n'
-                    f'ignoring..'
-                )
-                await trio.lowlevel.checkpoint()
-                return
-
-            # if **this** actor is already in debug REPL we want
-            # to maintain actor-local-task mutex access, so block
-            # here waiting for the control to be released - this
-            # -> allows for recursive entries to `tractor.pause()`
-            log.warning(
-                f'{task.name}@{actor.uid} already has TTY lock\n'
-                f'waiting for release..'
-            )
-            await Lock.local_pdb_complete.wait()
-            await trio.sleep(0.1)
-
-        # mark local actor as "in debug mode" to avoid recurrent
-        # entries/requests to the root process
-        Lock.local_task_in_debug = task
-
-        # this **must** be awaited by the caller and is done using the
-        # root nursery so that the debugger can continue to run without
-        # being restricted by the scope of a new task nursery.
-
-        # TODO: if we want to debug a trio.Cancelled triggered exception
-        # we have to figure out how to avoid having the service nursery
-        # cancel on this task start? I *think* this works below:
-        # ```python
-        # actor._service_n.cancel_scope.shield = shield
-        # ```
-        # but not entirely sure if that's a sane way to implement it?
-
-        # NOTE: MUST it here bc multiple tasks are spawned by any
-        # one sub-actor AND there will be a race between when the
-        # root locking task delivers the `Started(pld=LockStatus)`
-        # and when the REPL is actually entered here. SO ensure
-        # the codec is set before either are run!
-        #
-        with (
-            # _codec.limit_msg_spec(
-            #     payload_spec=__msg_spec__,
-            # ) as debug_codec,
-            trio.CancelScope(shield=shield),
-        ):
-            # async with trio.open_nursery() as tn:
-            #     tn.cancel_scope.shield = True
-            try:
-                # cs: trio.CancelScope = await tn.start(
-                cs: trio.CancelScope = await actor._service_n.start(
-                    wait_for_parent_stdin_hijack,
-                    actor.uid,
-                    (task.name, id(task)),
-                )
-                # our locker task should be the one in ctx
-                # with the root actor
-                assert Lock._debugger_request_cs is cs
-
-                # XXX used by the SIGINT handler to check if
-                # THIS actor is in REPL interaction
-                Lock.repl = pdb
-
-            except RuntimeError:
-                Lock.release()
-
-                if actor._cancel_called:
-                    # service nursery won't be usable and we
-                    # don't want to lock up the root either way since
-                    # we're in (the midst of) cancellation.
+            # XXX: since we need to enter pdb synchronously below,
+            # we have to release the lock manually from pdb completion
+            # callbacks. Can't think of a nicer way than this atm.
+            if Lock._debug_lock.locked():
+                log.warning(
+                    'attempting to shield-acquire active TTY lock'
+                    f' owned by {Lock.global_actor_in_debug}'
+                )
+
+                # must shield here to avoid hitting a ``Cancelled`` and
+                # a child getting stuck bc we clobbered the tty
+                with trio.CancelScope(shield=True):
+                    await Lock._debug_lock.acquire()
+            else:
+                # may be cancelled
+                await Lock._debug_lock.acquire()
+
+            Lock.global_actor_in_debug = actor.uid
+            DebugStatus.repl_task = task
+            DebugStatus.repl = Lock.repl = pdb
+
+            # enter REPL from root, no TTY locking IPC ctx necessary
+            _enter_repl_sync(debug_func)
+            return  # next branch is mutex and for subactors
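
[sketch] The root-side acquire above in isolation: shield the
re-acquire when the mutex is already held so a `Cancelled` can't
clobber whichever child currently owns the TTY:

    import trio

    async def acquire_tty_lock(lock: trio.StrictFIFOLock) -> None:
        if lock.locked():
            # already held: shield the wait from cancellation
            with trio.CancelScope(shield=True):
                await lock.acquire()
        else:
            # uncontested: allow normal cancellation
            await lock.acquire()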
+
+        # TODO: need a more robust check for the "root" actor
+        elif (
+            not is_root_process()
+            and actor._parent_chan  # a connected child
+        ):
+            if DebugStatus.repl_task:
+
+                # Recurrence entry case: this task already has the lock and
+                # is likely recurrently entering a breakpoint
+                #
+                # NOTE: noop on recurrent entry case but we want to trigger
+                # a checkpoint to allow other actors error-propagate and
+                # potentially avoid infinite re-entries in some
+                # subactor that would otherwise not bubble until the
+                # next checkpoint was hit.
+                if (
+                    (repl_task := DebugStatus.repl_task)
+                    and
+                    repl_task is task
+                ):
+                    log.warning(
+                        f'{task.name}@{actor.uid} already has TTY lock\n'
+                        f'ignoring..'
+                    )
+                    await trio.lowlevel.checkpoint()
+                    return
+
+                # if **this** actor is already in debug REPL we want
+                # to maintain actor-local-task mutex access, so block
+                # here waiting for the control to be released - this
+                # -> allows for recursive entries to `tractor.pause()`
+                log.warning(
+                    f'{task.name}@{actor.uid} already has TTY lock\n'
+                    f'waiting for release..'
+                )
+                await DebugStatus.repl_release.wait()
+                await trio.sleep(0.1)
+
+            # mark local actor as "in debug mode" to avoid recurrent
+            # entries/requests to the root process
+            DebugStatus.repl_task = task
+
+            # this **must** be awaited by the caller and is done using the
+            # root nursery so that the debugger can continue to run without
+            # being restricted by the scope of a new task nursery.
+
+            # TODO: if we want to debug a trio.Cancelled triggered exception
+            # we have to figure out how to avoid having the service nursery
+            # cancel on this task start? I *think* this works below:
+            # ```python
+            # actor._service_n.cancel_scope.shield = shield
+            # ```
+            # but not entirely sure if that's a sane way to implement it?
+
+            # NOTE: MUST it here bc multiple tasks are spawned by any
+            # one sub-actor AND there will be a race between when the
+            # root locking task delivers the `Started(pld=LockStatus)`
+            # and when the REPL is actually entered here. SO ensure
+            # the codec is set before either are run!
+            #
+            with (
+                # _codec.limit_msg_spec(
+                #     payload_spec=__msg_spec__,
+                # ) as debug_codec,
+                trio.CancelScope(shield=shield),
+            ):
+                # async with trio.open_nursery() as tn:
+                #     tn.cancel_scope.shield = True
+                try:
+                    # cs: trio.CancelScope = await tn.start(
+                    cs: trio.CancelScope = await actor._service_n.start(
+                        wait_for_parent_stdin_hijack,
+                        actor.uid,
+                        (task.name, id(task)),
+                    )
+                    # our locker task should be the one in ctx
+                    # with the root actor
+                    assert DebugStatus.req_cs is cs
+
+                    # XXX used by the SIGINT handler to check if
+                    # THIS actor is in REPL interaction
+                    Lock.repl = pdb
+
+                except RuntimeError:
+                    Lock.release()
+
+                    if actor._cancel_called:
+                        # service nursery won't be usable and we
+                        # don't want to lock up the root either way since
+                        # we're in (the midst of) cancellation.
+                        return
+
+                    raise
+
+            # enter REPL
+
+            try:
+                _enter_repl_sync(debug_func)
+            finally:
+                DebugStatus.unshield_sigint()
+
+    except BaseException:
+        log.exception(
+            'Failed to engage debugger via `_pause()` ??\n'
+        )
+        raise


 # XXX: apparently we can't do this without showing this frame
@@ -1527,45 +1522,16 @@ async def pause(

     '''
     __tracebackhide__: bool = True

-    if shield:
-        # NOTE XXX: even hard coding this inside the `class CancelScope:`
-        # doesn't seem to work for me!?
-        # ^ XXX ^
+    with trio.CancelScope(
+        shield=shield,
+    ) as cs:

-        # def _exit(self, *args, **kwargs):
-        #     __tracebackhide__: bool = True
-        #     super().__exit__(*args, **kwargs)
-
-        # trio.CancelScope.__enter__.__tracebackhide__ = True
-        # trio.CancelScope.__exit__.__tracebackhide__ = True
-
-        # import types
-        # with trio.CancelScope(shield=shield) as cs:
-        #     cs.__exit__ = types.MethodType(_exit, cs)
-        #     cs.__exit__.__tracebackhide__ = True
-
-        # TODO: LOL, solved this with the `pdb.hideframe` stuff
-        # at top-of-mod.. so I guess we can just only use this
-        # block right?
-        with trio.CancelScope(
-            shield=shield,
-        ) as cs:
-            print(f'debug cs is {cs}\n')
-            # setattr(cs.__exit__.__func__, '__tracebackhide__', True)
-            # setattr(cs.__enter__.__func__, '__tracebackhide__', True)
-
-            # NOTE: so the caller can always cancel even if shielded
-            task_status.started(cs)
-            return await _pause(
-                debug_func=debug_func,
-                shield=True,
-                task_status=task_status,
-                **_pause_kwargs
-            )
-    else:
+        # NOTE: so the caller can always manually cancel even
+        # if shielded!
+        task_status.started(cs)
         return await _pause(
             debug_func=debug_func,
-            shield=False,
+            shield=shield,
             task_status=task_status,
             **_pause_kwargs
         )
@@ -1682,7 +1648,7 @@ def pause_from_sync(
                 )
             )
             # TODO: maybe the `trio.current_task()` id/name if avail?
-            Lock.local_task_in_debug: str = str(threading.current_thread())
+            DebugStatus.repl_task: str = str(threading.current_thread())
         else:  # we are presumably the `trio.run()` + main thread
             greenback.await_(
                 _pause(
                     debug_func=None,
@@ -1692,7 +1658,7 @@ def pause_from_sync(
                     hide_tb=hide_tb,
                 )
             )
-            Lock.local_task_in_debug: str = current_task()
+            DebugStatus.repl_task: str = current_task()

     # TODO: ensure we aggressively make the user aware about
     # entering the global ``breakpoint()`` built-in from sync
@@ -1754,7 +1720,8 @@ def _post_mortem(
     log.pdb(
         f'{_crash_msg}\n'
         '|\n'
-        f'|_ {current_task()}\n'
+        # f'|_ {current_task()}\n'
+        f'|_ {current_task()} @ {actor.uid}\n'

         # f'|_ @{actor.uid}\n'
         # TODO: make an `Actor.__repr()__`
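
[sketch] Since `pause()` now always hands its cancel scope back via
`task_status.started(cs)`, a caller can keep a cancel handle even for
a shielded pause; hedged usage sketch (assumes `pause` is spawned via
`nursery.start()` as the `task_status` param suggests):

    from functools import partial

    import trio
    import tractor

    async def checkpointed_pause() -> None:
        async with trio.open_nursery() as tn:
            # `pause()` reports its scope via `task_status.started()`
            cs: trio.CancelScope = await tn.start(
                partial(tractor.pause, shield=True)
            )
            await trio.sleep(3)
            cs.cancel()  # caller can still tear down the shielded REPL wait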