From a9f06df3fb86d5f2ef33035babca187cf82b7289 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 19 Aug 2025 19:24:20 -0400
Subject: [PATCH] Heh, add back `Actor._root_tn`, it has purpose..

Turns out I didn't read my own internals docs/comments; despite it not
being used previously, this adds the real use case: a root, per-actor
scope which ensures parent comms are the last conc-thing to be
cancelled.

Also, the impl changes here make the test from 6410e45 (or wtv it's
rebased to) pass, i.e. we can support crash handling in the root actor
despite the root-tn having been (self) cancelled.

Superficial adjustments,
- rename `Actor._service_n` -> `._service_tn` everywhere.
- add asserts to `._runtime.async_main()` which ensure that any
  `.trionics.maybe_open_nursery()` calls against an optionally passed
  `._[root/service]_tn` allocate a nursery if one wasn't provided (the
  `._service_tn` case being an i-guess-prep-for-the-future-anti-pattern
  Bp).
- obvi adjust all internal usage to match the new naming.

Serious/real-use-case changes,
- add (back) an `Actor._root_tn` which sits a scope "above" the
  service-tn and is either,
  + assigned in `._runtime.async_main()` for sub-actors OR,
  + assigned in `._root.open_root_actor()` for the root actor.
  **THE primary reason** to keep this "upper" tn is that during a
  full-`Actor`-cancellation condition (more details below) we want to
  ensure that the IPC connection with a sub-actor's parent is **the
  last thing to be cancelled**; this is most simply implemented by
  ensuring that the `Actor._parent_chan: .ipc.Channel` is handled in an
  upper scope in `_rpc.process_messages()`-subtask terms.
- for the root actor this `root_tn` is allocated in the
  `.open_root_actor()` body and assigned as such.
- extend `Actor.cancel_soon()` to be cohesive with this entire teardown
  "policy" by scheduling a task in the `._root_tn` which,
  * waits for the `._service_tn` to complete and then,
  * cancels the `._root_tn.cancel_scope`,
  * includes "sclangy" console logging throughout.
---
 tractor/_root.py             |   1 +
 tractor/_rpc.py              |  12 ++--
 tractor/_runtime.py          | 107 ++++++++++++++++++++++++++---------
 tractor/devx/debug/_trace.py |  10 ++--
 tractor/trionics/_mngrs.py   |  16 +++---
 5 files changed, 100 insertions(+), 46 deletions(-)

diff --git a/tractor/_root.py b/tractor/_root.py
index 5ad1afb9..e496dfd5 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -488,6 +488,7 @@ async def open_root_actor(
         # -> see note on why shielding.
         # maybe_raise_from_masking_exc(),
     ):
+        actor._root_tn = root_tn
         # `_runtime.async_main()` creates an internal nursery
         # and blocks here until any underlying actor(-process)
         # tree has terminated thereby conducting so called
diff --git a/tractor/_rpc.py b/tractor/_rpc.py
index 5aee986d..573aa77b 100644
--- a/tractor/_rpc.py
+++ b/tractor/_rpc.py
@@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
 
         # RPC task bookeeping.
         # since RPC tasks are scheduled inside a flat
-        # `Actor._service_n`, we add "handles" to each such that
+        # `Actor._service_tn`, we add "handles" to each such that
         # they can be individually ccancelled.
         finally:
@@ -462,7 +462,7 @@ async def _invoke(
     connected IPC channel.
 
     This is the core "RPC" `trio.Task` scheduling machinery used to start every
-    remotely invoked function, normally in `Actor._service_n: Nursery`.
+    remotely invoked function, normally in `Actor._service_tn: Nursery`.
 
     '''
     __tracebackhide__: bool = hide_tb
@@ -936,7 +936,7 @@ async def process_messages(
 
     Receive (multiplexed) per-`Channel` RPC requests as msgs from remote
     processes; schedule target async funcs as local
-    `trio.Task`s inside the `Actor._service_n: Nursery`.
+    `trio.Task`s inside the `Actor._service_tn: Nursery`.
 
     Depending on msg type, non-`cmd` (task spawning/starting) request
     payloads (eg. `started`, `yield`, `return`, `error`)
@@ -961,7 +961,7 @@ async def process_messages(
 
     '''
    actor: Actor = _state.current_actor()
-    assert actor._service_n  # runtime state sanity
+    assert actor._service_tn  # runtime state sanity
 
     # TODO: once `trio` get's an "obvious way" for req/resp we
     # should use it?
@@ -1172,7 +1172,7 @@ async def process_messages(
                 start_status += '->( scheduling new task..\n'
                 log.runtime(start_status)
                 try:
-                    ctx: Context = await actor._service_n.start(
+                    ctx: Context = await actor._service_tn.start(
                         partial(
                             _invoke,
                             actor,
@@ -1312,7 +1312,7 @@ async def process_messages(
     ) as err:
 
         if nursery_cancelled_before_task:
-            sn: Nursery = actor._service_n
+            sn: Nursery = actor._service_tn
             assert sn and sn.cancel_scope.cancel_called  # sanity
             log.cancel(
                 f'Service nursery cancelled before it handled {funcname}'
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index c9bd34bb..c204f033 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -76,6 +76,7 @@ from tractor.msg import (
 )
 from .trionics import (
     collapse_eg,
+    maybe_open_nursery,
 )
 from .ipc import (
     Channel,
@@ -173,9 +174,11 @@ class Actor:
 
     msg_buffer_size: int = 2**6
 
-    # nursery placeholders filled in by `async_main()` after fork
-    _service_n: Nursery|None = None
-
+    # nursery placeholders filled in by `async_main()`,
+    # - after fork for subactors.
+    # - during boot for the root actor.
+    _root_tn: Nursery|None = None
+    _service_tn: Nursery|None = None
     _ipc_server: _server.IPCServer|None = None
 
     @property
@@ -1009,12 +1012,46 @@ class Actor:
         the RPC service nursery.
 
         '''
-        assert self._service_n
-        self._service_n.start_soon(
+        actor_repr: str = _pformat.nest_from_op(
+            input_op='>c(',
+            text=self.pformat(),
+            nest_indent=1,
+        )
+        log.cancel(
+            '`Actor.cancel_soon()` was called!\n'
+            f'>> scheduling `Actor.cancel()`\n'
+            f'{actor_repr}'
+        )
+        assert self._service_tn
+        self._service_tn.start_soon(
             self.cancel,
             None,  # self cancel all rpc tasks
         )
 
+        # schedule a root-tn canceller task once the service tn
+        # is fully shutdown.
+        async def cancel_root_tn_after_services():
+            log.runtime(
+                'Waiting on service-tn to cancel..\n'
+                f'c>)\n'
+                f'|_{self._service_tn.cancel_scope!r}\n'
+            )
+            await self._cancel_complete.wait()
+            log.cancel(
+                f'`._service_tn` cancelled\n'
+                f'>c)\n'
+                f'|_{self._service_tn.cancel_scope!r}\n'
+                f'\n'
+                f'>> cancelling `._root_tn`\n'
+                f'c>(\n'
+                f' |_{self._root_tn.cancel_scope!r}\n'
+            )
+            self._root_tn.cancel_scope.cancel()
+
+        self._root_tn.start_soon(
+            cancel_root_tn_after_services
+        )
+
     @property
     def cancel_complete(self) -> bool:
         return self._cancel_complete.is_set()
@@ -1119,8 +1156,8 @@ class Actor:
             await ipc_server.wait_for_shutdown()
 
         # cancel all rpc tasks permanently
-        if self._service_n:
-            self._service_n.cancel_scope.cancel()
+        if self._service_tn:
+            self._service_tn.cancel_scope.cancel()
 
         log_meth(msg)
         self._cancel_complete.set()
@@ -1257,7 +1294,7 @@ class Actor:
         '''
         Cancel all ongoing RPC tasks owned/spawned for a given
         `parent_chan: Channel` or simply all tasks (inside
-        `._service_n`) when `parent_chan=None`.
+        `._service_tn`) when `parent_chan=None`.
 
         '''
         tasks: dict = self._rpc_tasks
@@ -1469,40 +1506,56 @@ async def async_main(
             accept_addrs.append(addr.unwrap())
 
     assert accept_addrs
-    # The "root" nursery ensures the channel with the immediate
-    # parent is kept alive as a resilient service until
-    # cancellation steps have (mostly) occurred in
+
+    ya_root_tn: bool = bool(actor._root_tn)
+    ya_service_tn: bool = bool(actor._service_tn)
+
+    # NOTE, a top-most "root" task-nursery ensures the channel
+    # with the immediate parent is kept alive as a resilient
+    # service until cancellation steps have (mostly) occurred in
     # a deterministic way.
     root_tn: trio.Nursery
     async with (
         collapse_eg(),
-        trio.open_nursery() as root_tn,
+        maybe_open_nursery(
+            nursery=actor._root_tn,
+        ) as root_tn,
+        # trio.open_nursery() as root_tn,
     ):
-        # actor._root_n = root_tn
-        # assert actor._root_n
+        if ya_root_tn:
+            assert root_tn is actor._root_tn
+        else:
+            actor._root_tn = root_tn
 
         ipc_server: _server.IPCServer
         async with (
             collapse_eg(),
-            trio.open_nursery() as service_nursery,
+            maybe_open_nursery(
+                nursery=actor._service_tn,
+            ) as service_tn,
+            # trio.open_nursery() as service_tn,
 
             _server.open_ipc_server(
-                parent_tn=service_nursery,
-                stream_handler_tn=service_nursery,
+                parent_tn=service_tn,
+                stream_handler_tn=service_tn,
             ) as ipc_server,
             # ) as actor._ipc_server,
             # ^TODO? prettier?
         ):
-            # This nursery is used to handle all inbound
-            # connections to us such that if the TCP server
-            # is killed, connections can continue to process
-            # in the background until this nursery is cancelled.
-            actor._service_n = service_nursery
+            if ya_service_tn:
+                assert service_tn is actor._service_tn
+            else:
+                # This nursery is used to handle all inbound
+                # connections to us such that if the TCP server
+                # is killed, connections can continue to process
+                # in the background until this nursery is cancelled.
+                actor._service_tn = service_tn
+
             actor._ipc_server = ipc_server
             assert (
-                actor._service_n
+                actor._service_tn
                 and (
-                    actor._service_n
+                    actor._service_tn
                     is
                     actor._ipc_server._parent_tn
                     is
@@ -1537,7 +1590,7 @@ async def async_main(
             try:
                 eps: list = await ipc_server.listen_on(
                     accept_addrs=accept_addrs,
-                    stream_handler_nursery=service_nursery,
+                    stream_handler_nursery=service_tn,
                 )
                 log.runtime(
                     f'Booted IPC server\n'
@@ -1545,7 +1598,7 @@ async def async_main(
                 )
                 assert (
                     (eps[0].listen_tn)
-                    is not service_nursery
+                    is not service_tn
                 )
 
             except OSError as oserr:
@@ -1707,7 +1760,7 @@ async def async_main(
 
         # XXX TODO but hard XXX
         # we can't actually do this bc the debugger uses the
-        # _service_n to spawn the lock task, BUT, in theory if we had
+        # _service_tn to spawn the lock task, BUT, in theory if we had
         # the root nursery surround this finally block it might be
         # actually possible to debug THIS machinery in the same way
         # as user task code?
diff --git a/tractor/devx/debug/_trace.py b/tractor/devx/debug/_trace.py
index a7a1ce43..a23d2e23 100644
--- a/tractor/devx/debug/_trace.py
+++ b/tractor/devx/debug/_trace.py
@@ -481,12 +481,12 @@ async def _pause(
     # we have to figure out how to avoid having the service nursery
     # cancel on this task start? I *think* this works below:
     # ```python
-    # actor._service_n.cancel_scope.shield = shield
+    # actor._service_tn.cancel_scope.shield = shield
     # ```
     # but not entirely sure if that's a sane way to implement it?
 
     # NOTE currently we spawn the lock request task inside this
-    # subactor's global `Actor._service_n` so that the
+    # subactor's global `Actor._service_tn` so that the
     # lifetime of the lock-request can outlive the current
     # `._pause()` scope while the user steps through their
     # application code and when they finally exit the
@@ -510,7 +510,7 @@ async def _pause(
             f'|_{task}\n'
         )
         with trio.CancelScope(shield=shield):
-            req_ctx: Context = await actor._service_n.start(
+            req_ctx: Context = await actor._service_tn.start(
                 partial(
                     request_root_stdio_lock,
                     actor_uid=actor.uid,
@@ -544,7 +544,7 @@ async def _pause(
         _repl_fail_report = None
 
         # when the actor is mid-runtime cancellation the
-        # `Actor._service_n` might get closed before we can spawn
+        # `Actor._service_tn` might get closed before we can spawn
         # the request task, so just ignore expected RTE.
         elif (
             isinstance(pause_err, RuntimeError)
@@ -989,7 +989,7 @@ def pause_from_sync(
             # that output and assign the `repl` created above!
             bg_task, _ = trio.from_thread.run(
                 afn=partial(
-                    actor._service_n.start,
+                    actor._service_tn.start,
                     partial(
                         _pause_from_bg_root_thread,
                         behalf_of_thread=thread,
diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py
index 99f05852..3acfbeda 100644
--- a/tractor/trionics/_mngrs.py
+++ b/tractor/trionics/_mngrs.py
@@ -204,7 +204,7 @@ class _Cache:
     a kept-alive-while-in-use async resource.
 
     '''
-    service_n: Optional[trio.Nursery] = None
+    service_tn: Optional[trio.Nursery] = None
     locks: dict[Hashable, trio.Lock] = {}
     users: int = 0
     values: dict[Any, Any] = {}
@@ -294,15 +294,15 @@ async def maybe_open_context(
             f'task: {task}\n'
             f'task_tn: {task_tn}\n'
         )
-        service_n = tn
+        service_tn = tn
     else:
-        service_n: trio.Nursery = current_actor()._service_n
+        service_tn: trio.Nursery = current_actor()._service_tn
 
     # TODO: is there any way to allocate
     # a 'stays-open-till-last-task-finshed nursery?
-    # service_n: trio.Nursery
-    # async with maybe_open_nursery(_Cache.service_n) as service_n:
-    #     _Cache.service_n = service_n
+    # service_tn: trio.Nursery
+    # async with maybe_open_nursery(_Cache.service_tn) as service_tn:
+    #     _Cache.service_tn = service_tn
 
     cache_miss_ke: KeyError|None = None
     maybe_taskc: trio.Cancelled|None = None
@@ -324,8 +324,8 @@ async def maybe_open_context(
             mngr = acm_func(**kwargs)
             resources = _Cache.resources
             assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-            resources[ctx_key] = (service_n, trio.Event())
-            yielded: Any = await service_n.start(
+            resources[ctx_key] = (service_tn, trio.Event())
+            yielded: Any = await service_tn.start(
                 _Cache.run_ctx,
                 mngr,
                 ctx_key,
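
The gist of the new teardown ordering, as a minimal plain-`trio` sketch
(no tractor internals here; `parent_chan_task()`, `service_task()` and
`services_done` are illustrative stand-ins for `_rpc.process_messages()`
on the parent channel, scheduled RPC tasks and `Actor._cancel_complete`
respectively):

import trio


async def main():
    services_done = trio.Event()

    async def parent_chan_task():
        # stand-in for `_rpc.process_messages()` over the parent IPC
        # channel; the whole point is that this is cancelled LAST.
        await trio.sleep_forever()

    async def service_task():
        # stand-in for an RPC task living in the service nursery.
        await trio.sleep_forever()

    async def cancel_root_tn_after_services():
        # ~ the canceller task `Actor.cancel_soon()` now schedules in
        # `._root_tn`: wait for service teardown, then kill the root scope.
        await services_done.wait()
        root_tn.cancel_scope.cancel()

    async with trio.open_nursery() as root_tn:  # ~ `Actor._root_tn`
        root_tn.start_soon(parent_chan_task)
        root_tn.start_soon(cancel_root_tn_after_services)

        async with trio.open_nursery() as service_tn:  # ~ `Actor._service_tn`
            service_tn.start_soon(service_task)

            # emulate a self-cancel request: the service scope dies first..
            await trio.sleep(0.1)
            service_tn.cancel_scope.cancel()

        # ..all service tasks are gone by here, *then* the root scope
        # (and with it the parent-comms task) is told to go.
        services_done.set()


if __name__ == '__main__':
    trio.run(main)

Same shape as what the patch encodes: the root scope only dies once the
service scope has fully drained, so parent comms are the last conc-thing
to be cancelled.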