diff --git a/examples/debugging/root_self_cancelled_w_error.py b/examples/debugging/root_self_cancelled_w_error.py
new file mode 100644
index 00000000..b3c15288
--- /dev/null
+++ b/examples/debugging/root_self_cancelled_w_error.py
@@ -0,0 +1,35 @@
+import trio
+import tractor
+
+
+async def main():
+    async with tractor.open_root_actor(
+        debug_mode=True,
+        loglevel='cancel',
+    ) as _root:
+
+        # manually trigger self-cancellation and wait
+        # for it to fully trigger.
+        _root.cancel_soon()
+        await _root._cancel_complete.wait()
+        print('root cancelled')
+
+        # now ensure we can still use the REPL
+        try:
+            await tractor.pause()
+        except trio.Cancelled as _taskc:
+            assert (root_cs := _root._root_tn.cancel_scope).cancel_called
+            # NOTE^^ above logic but inside `open_root_actor()` and
+            # passed to the `shield=` expression is effectively what
+            # we're testing here!
+            await tractor.pause(shield=root_cs.cancel_called)
+
+        # XXX, if shield logic *is wrong* inside `open_root_actor()`'s
+        # crash-handler block this crash should never be interactable,
+        # instead `trio.Cancelled` would be bubbled up: the original
+        # BUG.
+        assert 0
+
+
+if __name__ == '__main__':
+    trio.run(main)
diff --git a/tests/devx/test_debugger.py b/tests/devx/test_debugger.py
index 6179ef01..cacab803 100644
--- a/tests/devx/test_debugger.py
+++ b/tests/devx/test_debugger.py
@@ -1,13 +1,13 @@
 """
 That "native" debug mode better work!
 
-All these tests can be understood (somewhat) by running the equivalent
-`examples/debugging/` scripts manually.
+All these tests can be understood (somewhat) by running the
+equivalent `examples/debugging/` scripts manually.
 
 TODO:
-    - none of these tests have been run successfully on windows yet but
-      there's been manual testing that verified it works.
-    - wonder if any of it'll work on OS X?
+  - none of these tests have been run successfully on windows yet but
+    there's been manual testing that verified it works.
+  - wonder if any of it'll work on OS X?
 
 """
 from __future__ import annotations
@@ -925,6 +925,7 @@ def test_post_mortem_api(
             # see note on why shielding.
             # maybe_raise_from_masking_exc(),
         ):
+            actor._root_tn = root_tn
             # `_runtime.async_main()` creates an internal nursery
             # and blocks here until any underlying actor(-process)
             # tree has terminated thereby conducting so called
@@ -523,6 +524,11 @@ async def open_root_actor(
                     err,
                     api_frame=inspect.currentframe(),
                     debug_filter=debug_filter,
+
+                    # XXX NOTE, required to debug root-actor
+                    # crashes under cancellation conditions; so
+                    # most of them!
+                    shield=root_tn.cancel_scope.cancel_called,
                 )
 
             if (
@@ -562,6 +568,7 @@ async def open_root_actor(
                 f'{op_nested_actor_repr}'
            )
             # XXX, THIS IS A *finally-footgun*!
+            # (also mentioned in with-block above)
             # -> though already shields iternally it can
             # taskc here and mask underlying errors raised in
             # the try-block above?
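For reference, the `shield=root_tn.cancel_scope.cancel_called` expression added above leans on a plain `trio` pattern: shield a nested scope only when the enclosing scope has already been cancelled, so a teardown/debug step (here, the crash-handler REPL) can hit a checkpoint without immediately re-raising `trio.Cancelled`. A minimal sketch of that pattern in vanilla `trio` follows; every name in it is illustrative and not part of this patch:

import trio


async def main():
    with trio.CancelScope() as outer_cs:
        outer_cs.cancel()  # stand-in for the root nursery's scope getting cancelled
        try:
            await trio.sleep(0)  # any checkpoint now raises `trio.Cancelled`
        except trio.Cancelled:
            # shield *only* when the enclosing scope was already cancelled,
            # mirroring `shield=root_tn.cancel_scope.cancel_called`; the
            # shielded region can then run debug/teardown steps to completion.
            with trio.CancelScope(shield=outer_cs.cancel_called):
                await trio.sleep(0.1)
            raise


trio.run(main)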
diff --git a/tractor/_rpc.py b/tractor/_rpc.py
index 5aee986d..573aa77b 100644
--- a/tractor/_rpc.py
+++ b/tractor/_rpc.py
@@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
 
    # RPC task bookeeping.
    # since RPC tasks are scheduled inside a flat
-    # `Actor._service_n`, we add "handles" to each such that
+    # `Actor._service_tn`, we add "handles" to each such that
    # they can be individually ccancelled.
    finally:
@@ -462,7 +462,7 @@ async def _invoke(
     connected IPC channel.
 
     This is the core "RPC" `trio.Task` scheduling machinery used to start every
-    remotely invoked function, normally in `Actor._service_n: Nursery`.
+    remotely invoked function, normally in `Actor._service_tn: Nursery`.
 
     '''
     __tracebackhide__: bool = hide_tb
@@ -936,7 +936,7 @@ async def process_messages(
     Receive (multiplexed) per-`Channel` RPC requests as msgs from
     remote processes; schedule target async funcs as local
-    `trio.Task`s inside the `Actor._service_n: Nursery`.
+    `trio.Task`s inside the `Actor._service_tn: Nursery`.
 
     Depending on msg type, non-`cmd` (task spawning/starting)
     request payloads (eg. `started`, `yield`, `return`, `error`)
@@ -961,7 +961,7 @@ async def process_messages(
 
     '''
     actor: Actor = _state.current_actor()
-    assert actor._service_n  # runtime state sanity
+    assert actor._service_tn  # runtime state sanity
 
     # TODO: once `trio` get's an "obvious way" for req/resp we
     # should use it?
@@ -1172,7 +1172,7 @@ async def process_messages(
                    start_status += '->( scheduling new task..\n'
                    log.runtime(start_status)
                    try:
-                        ctx: Context = await actor._service_n.start(
+                        ctx: Context = await actor._service_tn.start(
                            partial(
                                _invoke,
                                actor,
@@ -1312,7 +1312,7 @@ async def process_messages(
     ) as err:
 
         if nursery_cancelled_before_task:
-            sn: Nursery = actor._service_n
+            sn: Nursery = actor._service_tn
            assert sn and sn.cancel_scope.cancel_called  # sanity
            log.cancel(
                f'Service nursery cancelled before it handled {funcname}'
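The "handles" note in `_errors_relayed_via_ipc()` above refers to keeping a per-task cancel handle even though all RPC tasks run in one flat service nursery. A rough, hedged sketch of that bookkeeping pattern in plain `trio` (not tractor's actual implementation; all names are made up):

import trio


async def serve_one(task_id: str, handles: dict[str, trio.CancelScope]):
    # register a per-task "handle" so this task can be cancelled
    # individually even though it lives in a flat, shared nursery.
    with trio.CancelScope() as cs:
        handles[task_id] = cs
        try:
            await trio.sleep_forever()
        finally:
            handles.pop(task_id, None)


async def main():
    handles: dict[str, trio.CancelScope] = {}
    async with trio.open_nursery() as service_tn:
        service_tn.start_soon(serve_one, 'a', handles)
        service_tn.start_soon(serve_one, 'b', handles)
        await trio.sleep(0.1)              # let both tasks register
        handles['a'].cancel()              # cancel just one "RPC task"
        await trio.sleep(0.1)
        assert 'a' not in handles and 'b' in handles
        service_tn.cancel_scope.cancel()   # then tear down the whole nursery


trio.run(main)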
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index c9bd34bb..f18e0d61 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -35,6 +35,15 @@ for running all lower level spawning, supervision and msging layers:
   SC-transitive RPC via scheduling of `trio` tasks.
 - registration of newly spawned actors with the discovery sys.
 
+Glossary:
+--------
+ - tn: a `trio.Nursery` or "task nursery".
+ - an: an `ActorNursery` or "actor nursery".
+ - root: top/parent-most scope/task/process/actor (or other runtime
+   primitive) in a hierarchical tree.
+ - parent-ish: "higher-up" in the runtime-primitive hierarchy.
+ - child-ish: "lower-down" in the runtime-primitive hierarchy.
+
 '''
 from __future__ import annotations
 from contextlib import (
@@ -76,6 +85,7 @@ from tractor.msg import (
 )
 from .trionics import (
     collapse_eg,
+    maybe_open_nursery,
 )
 from .ipc import (
     Channel,
@@ -173,9 +183,11 @@ class Actor:
 
     msg_buffer_size: int = 2**6
 
-    # nursery placeholders filled in by `async_main()` after fork
-    _service_n: Nursery|None = None
-
+    # nursery placeholders filled in by `async_main()`,
+    # - after fork for subactors.
+    # - during boot for the root actor.
+    _root_tn: Nursery|None = None
+    _service_tn: Nursery|None = None
     _ipc_server: _server.IPCServer|None = None
 
     @property
@@ -1009,12 +1021,48 @@ class Actor:
         the RPC service nursery.
 
         '''
-        assert self._service_n
-        self._service_n.start_soon(
+        actor_repr: str = _pformat.nest_from_op(
+            input_op='>c(',
+            text=self.pformat(),
+            nest_indent=1,
+        )
+        log.cancel(
+            '`Actor.cancel_soon()` was called!\n'
+            f'>> scheduling `Actor.cancel()`\n'
+            f'{actor_repr}'
+        )
+        assert self._service_tn
+        self._service_tn.start_soon(
             self.cancel,
             None,  # self cancel all rpc tasks
         )
 
+        # schedule a "canceller task" in the `._root_tn` once the
+        # `._service_tn` is fully shut down; the task waits for child-ish
+        # scopes to fully exit then finally cancels its parent,
+        # root-most, scope.
+        async def cancel_root_tn_after_services():
+            log.runtime(
+                'Waiting on service-tn to cancel..\n'
+                f'c>)\n'
+                f'|_{self._service_tn.cancel_scope!r}\n'
+            )
+            await self._cancel_complete.wait()
+            log.cancel(
+                f'`._service_tn` cancelled\n'
+                f'>c)\n'
+                f'|_{self._service_tn.cancel_scope!r}\n'
+                f'\n'
+                f'>> cancelling `._root_tn`\n'
+                f'c>(\n'
+                f'  |_{self._root_tn.cancel_scope!r}\n'
+            )
+            self._root_tn.cancel_scope.cancel()
+
+        self._root_tn.start_soon(
+            cancel_root_tn_after_services
+        )
+
     @property
     def cancel_complete(self) -> bool:
         return self._cancel_complete.is_set()
@@ -1119,8 +1167,8 @@ class Actor:
             await ipc_server.wait_for_shutdown()
 
         # cancel all rpc tasks permanently
-        if self._service_n:
-            self._service_n.cancel_scope.cancel()
+        if self._service_tn:
+            self._service_tn.cancel_scope.cancel()
 
         log_meth(msg)
         self._cancel_complete.set()
@@ -1257,7 +1305,7 @@ class Actor:
         '''
         Cancel all ongoing RPC tasks owned/spawned for a given
         `parent_chan: Channel` or simply all tasks (inside
-        `._service_n`) when `parent_chan=None`.
+        `._service_tn`) when `parent_chan=None`.
 
         '''
         tasks: dict = self._rpc_tasks
@@ -1469,46 +1517,55 @@ async def async_main(
                 accept_addrs.append(addr.unwrap())
 
     assert accept_addrs
-    # The "root" nursery ensures the channel with the immediate
-    # parent is kept alive as a resilient service until
-    # cancellation steps have (mostly) occurred in
-    # a deterministic way.
+
+    ya_root_tn: bool = bool(actor._root_tn)
+    ya_service_tn: bool = bool(actor._service_tn)
+
+    # NOTE, a top-most "root" nursery in each actor-process
+    # enables a lifetime priority for the IPC-channel connection
+    # with a sub-actor's immediate parent. I.e. this connection
+    # is kept alive as a resilient service connection until all
+    # other machinery has exited and cancellation of all
+    # embedded/child scopes has completed. This helps ensure
+    # a deterministic (and thus "graceful")
+    # first-class-supervision style teardown where a parent actor
+    # (vs. say peers) is always the last to be contacted before
+    # disconnect.
     root_tn: trio.Nursery
     async with (
        collapse_eg(),
-        trio.open_nursery() as root_tn,
+        maybe_open_nursery(
+            nursery=actor._root_tn,
+        ) as root_tn,
    ):
-        # actor._root_n = root_tn
-        # assert actor._root_n
+        if ya_root_tn:
+            assert root_tn is actor._root_tn
+        else:
+            actor._root_tn = root_tn
 
        ipc_server: _server.IPCServer
        async with (
            collapse_eg(),
-            trio.open_nursery() as service_nursery,
+            maybe_open_nursery(
+                nursery=actor._service_tn,
+            ) as service_tn,
 
            _server.open_ipc_server(
-                parent_tn=service_nursery,
-                stream_handler_tn=service_nursery,
+                parent_tn=service_tn,  # ?TODO, why can't this be the root-tn
+                stream_handler_tn=service_tn,
            ) as ipc_server,
-            # ) as actor._ipc_server,
-            # ^TODO? prettier?
        ):
-            # This nursery is used to handle all inbound
-            # connections to us such that if the TCP server
-            # is killed, connections can continue to process
-            # in the background until this nursery is cancelled.
-            actor._service_n = service_nursery
+            if ya_service_tn:
+                assert service_tn is actor._service_tn
+            else:
+                # This nursery is used to handle all inbound
+                # connections to us such that if the TCP server
+                # is killed, connections can continue to process
+                # in the background until this nursery is cancelled.
+                actor._service_tn = service_tn
+
+            # set after allocate
            actor._ipc_server = ipc_server
-            assert (
-                actor._service_n
-                and (
-                    actor._service_n
-                    is
-                    actor._ipc_server._parent_tn
-                    is
-                    ipc_server._stream_handler_tn
-                )
-            )
 
            # load exposed/allowed RPC modules
            # XXX: do this **after** establishing a channel to the parent
@@ -1534,10 +1591,11 @@ async def async_main(
            # - root actor: the ``accept_addr`` passed to this method
 
            # TODO: why is this not with the root nursery?
+            # - see above that the `._service_tn` is what's used?
            try:
                eps: list = await ipc_server.listen_on(
                    accept_addrs=accept_addrs,
-                    stream_handler_nursery=service_nursery,
+                    stream_handler_nursery=service_tn,
                )
                log.runtime(
                    f'Booted IPC server\n'
                )
                assert (
                    (eps[0].listen_tn)
-                    is not service_nursery
+                    is not service_tn
                )
 
            except OSError as oserr:
@@ -1707,7 +1765,7 @@ async def async_main(
 
        # XXX TODO but hard XXX
        # we can't actually do this bc the debugger uses the
-        # _service_n to spawn the lock task, BUT, in theory if we had
+        # _service_tn to spawn the lock task, BUT, in theory if we had
        # the root nursery surround this finally block it might be
        # actually possible to debug THIS machinery in the same way
        # as user task code?
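The `maybe_open_nursery()` usage above (imported from `.trionics`) follows a reuse-or-allocate shape: yield the caller-provided nursery when one was already installed (e.g. by `open_root_actor()`), otherwise open and own a fresh one. A hedged sketch of that general pattern, not tractor's actual implementation:

from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def maybe_open_nursery(nursery: trio.Nursery | None = None):
    # reuse the caller-provided nursery if there is one, otherwise
    # open (and own the lifetime of) a fresh nursery for this block.
    if nursery is not None:
        yield nursery
    else:
        async with trio.open_nursery() as new_tn:
            yield new_tn


async def main():
    async with trio.open_nursery() as preallocated_tn:
        async with maybe_open_nursery(preallocated_tn) as tn:
            assert tn is preallocated_tn  # reused, not re-opened

    async with maybe_open_nursery() as tn:
        tn.start_soon(trio.sleep, 0)  # freshly allocated and owned here


trio.run(main)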
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 408e793c..8d3c2cf6 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -297,6 +297,23 @@ async def hard_kill(
    # zombies (as a feature) we ask the OS to do send in the
    # removal swad as the last resort.
    if cs.cancelled_caught:
+
+        # TODO? attempt at intermediary-rent-sub
+        # with child in debug lock?
+        # |_https://github.com/goodboy/tractor/issues/320
+        #
+        # if not is_root_process():
+        #     log.warning(
+        #         'Attempting to acquire debug-REPL-lock before zombie reap!'
+        #     )
+        #     with trio.CancelScope(shield=True):
+        #         async with debug.acquire_debug_lock(
+        #             subactor_uid=current_actor().uid,
+        #         ) as _ctx:
+        #             log.warning(
+        #                 'Acquired debug lock, child ready to be killed ??\n'
+        #             )
+
        # TODO: toss in the skynet-logo face as ascii art?
        log.critical(
            # 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
diff --git a/tractor/devx/debug/_post_mortem.py b/tractor/devx/debug/_post_mortem.py
index eca0cd98..32d10074 100644
--- a/tractor/devx/debug/_post_mortem.py
+++ b/tractor/devx/debug/_post_mortem.py
@@ -250,7 +250,7 @@ async def _maybe_enter_pm(
     *,
     tb: TracebackType|None = None,
     api_frame: FrameType|None = None,
-    hide_tb: bool = False,
+    hide_tb: bool = True,
 
     # only enter debugger REPL when returns `True`
     debug_filter: Callable[
diff --git a/tractor/devx/debug/_trace.py b/tractor/devx/debug/_trace.py
index 70d39325..a23d2e23 100644
--- a/tractor/devx/debug/_trace.py
+++ b/tractor/devx/debug/_trace.py
@@ -58,6 +58,7 @@ from tractor._context import Context
 from tractor import _state
 from tractor._exceptions import (
     NoRuntime,
+    InternalError,
 )
 from tractor._state import (
     current_actor,
@@ -79,6 +80,9 @@ from ._sigint import (
     sigint_shield as sigint_shield,
     _ctlc_ignore_header as _ctlc_ignore_header
 )
+from ..pformat import (
+    ppfmt,
+)
 
 if TYPE_CHECKING:
     from trio.lowlevel import Task
@@ -477,12 +481,12 @@ async def _pause(
    # we have to figure out how to avoid having the service nursery
    # cancel on this task start? I *think* this works below:
    # ```python
-    #   actor._service_n.cancel_scope.shield = shield
+    #   actor._service_tn.cancel_scope.shield = shield
    # ```
    # but not entirely sure if that's a sane way to implement it?
 
    # NOTE currently we spawn the lock request task inside this
-    # subactor's global `Actor._service_n` so that the
+    # subactor's global `Actor._service_tn` so that the
    # lifetime of the lock-request can outlive the current
    # `._pause()` scope while the user steps through their
    # application code and when they finally exit the
@@ -506,7 +510,7 @@ async def _pause(
                f'|_{task}\n'
            )
            with trio.CancelScope(shield=shield):
-                req_ctx: Context = await actor._service_n.start(
+                req_ctx: Context = await actor._service_tn.start(
                    partial(
                        request_root_stdio_lock,
                        actor_uid=actor.uid,
@@ -540,7 +544,7 @@ async def _pause(
        _repl_fail_report = None
 
    # when the actor is mid-runtime cancellation the
-    # `Actor._service_n` might get closed before we can spawn
+    # `Actor._service_tn` might get closed before we can spawn
    # the request task, so just ignore expected RTE.
    elif (
        isinstance(pause_err, RuntimeError)
@@ -985,7 +989,7 @@ def pause_from_sync(
            # that output and assign the `repl` created above!
            bg_task, _ = trio.from_thread.run(
                afn=partial(
-                    actor._service_n.start,
+                    actor._service_tn.start,
                    partial(
                        _pause_from_bg_root_thread,
                        behalf_of_thread=thread,
@@ -1153,9 +1157,10 @@ def pause_from_sync(
                'use_greenback',
                False,
            ):
-                raise RuntimeError(
-                    '`greenback` was never initialized in this actor!?\n\n'
-                    f'{_state._runtime_vars}\n'
+                raise InternalError(
+                    f'`greenback` was never initialized in this actor?\n'
+                    f'\n'
+                    f'{ppfmt(_state._runtime_vars)}\n'
                ) from rte
 
            raise
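The `pause_from_sync()` hunk above re-enters the `trio` loop from a foreign thread in order to schedule the background REPL task on the long-lived service nursery. A minimal, hedged sketch of that thread-to-loop scheduling shape in plain `trio` (names here are illustrative, not part of this patch):

from functools import partial

import trio


async def bg_task(task_status=trio.TASK_STATUS_IGNORED):
    task_status.started('repl-handle')  # stand-in for the bg REPL-task handle
    await trio.sleep_forever()


def sync_code(service_tn: trio.Nursery) -> str:
    # from a non-`trio` thread, schedule a task onto the long-lived
    # service nursery and block until it reports "started"; same shape
    # as the `trio.from_thread.run(afn=partial(service_tn.start, ...))`
    # call used above.
    return trio.from_thread.run(partial(service_tn.start, bg_task))


async def main():
    async with trio.open_nursery() as service_tn:
        handle = await trio.to_thread.run_sync(sync_code, service_tn)
        assert handle == 'repl-handle'
        service_tn.cancel_scope.cancel()


trio.run(main)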
diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py
index 46fde2fc..55374b0a 100644
--- a/tractor/ipc/_server.py
+++ b/tractor/ipc/_server.py
@@ -1001,7 +1001,11 @@ class Server(Struct):
            partial(
                _serve_ipc_eps,
                server=self,
-                stream_handler_tn=stream_handler_nursery,
+                stream_handler_tn=(
+                    stream_handler_nursery
+                    or
+                    self._stream_handler_tn
+                ),
                listen_addrs=accept_addrs,
            )
        )
@@ -1145,13 +1149,17 @@ async def open_ipc_server(
 
     async with maybe_open_nursery(
         nursery=parent_tn,
-    ) as rent_tn:
+    ) as parent_tn:
        no_more_peers = trio.Event()
        no_more_peers.set()
 
        ipc_server = IPCServer(
-            _parent_tn=rent_tn,
-            _stream_handler_tn=stream_handler_tn or rent_tn,
+            _parent_tn=parent_tn,
+            _stream_handler_tn=(
+                stream_handler_tn
+                or
+                parent_tn
+            ),
            _no_more_peers=no_more_peers,
        )
        try:
diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py
index 99f05852..3acfbeda 100644
--- a/tractor/trionics/_mngrs.py
+++ b/tractor/trionics/_mngrs.py
@@ -204,7 +204,7 @@ class _Cache:
     a kept-alive-while-in-use async resource.
 
     '''
-    service_n: Optional[trio.Nursery] = None
+    service_tn: Optional[trio.Nursery] = None
     locks: dict[Hashable, trio.Lock] = {}
     users: int = 0
     values: dict[Any, Any] = {}
@@ -294,15 +294,15 @@ async def maybe_open_context(
            f'task: {task}\n'
            f'task_tn: {task_tn}\n'
        )
-        service_n = tn
+        service_tn = tn
    else:
-        service_n: trio.Nursery = current_actor()._service_n
+        service_tn: trio.Nursery = current_actor()._service_tn
 
        # TODO: is there any way to allocate
        # a 'stays-open-till-last-task-finshed nursery?
-        # service_n: trio.Nursery
-        # async with maybe_open_nursery(_Cache.service_n) as service_n:
-        #     _Cache.service_n = service_n
+        # service_tn: trio.Nursery
+        # async with maybe_open_nursery(_Cache.service_tn) as service_tn:
+        #     _Cache.service_tn = service_tn
 
    cache_miss_ke: KeyError|None = None
    maybe_taskc: trio.Cancelled|None = None
@@ -324,8 +324,8 @@ async def maybe_open_context(
            mngr = acm_func(**kwargs)
            resources = _Cache.resources
            assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-            resources[ctx_key] = (service_n, trio.Event())
-            yielded: Any = await service_n.start(
+            resources[ctx_key] = (service_tn, trio.Event())
+            yielded: Any = await service_tn.start(
                _Cache.run_ctx,
                mngr,
                ctx_key,
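Finally, the `_Cache`/`maybe_open_context()` hunks rely on entering the cached async context manager from inside a long-lived service task nursery so the resource outlives the first caller's task. A rough sketch of that `tn.start()`-an-ACM pattern, assuming invented names and not tractor's actual `_Cache.run_ctx()`:

from contextlib import asynccontextmanager

import trio


@asynccontextmanager
async def expensive_resource():
    yield {'conn': object()}  # stand-in for a shared, cached resource


async def run_ctx(mngr, task_status=trio.TASK_STATUS_IGNORED):
    # enter the ACM inside the long-lived service nursery so its
    # lifetime is decoupled from the first caller's task scope.
    async with mngr as value:
        task_status.started(value)
        await trio.sleep_forever()


async def main():
    async with trio.open_nursery() as service_tn:
        value = await service_tn.start(run_ctx, expensive_resource())
        print('cached value:', value)
        # in the real cache this only happens once the *last* user exits:
        service_tn.cancel_scope.cancel()


trio.run(main)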