diff --git a/tractor/_root.py b/tractor/_root.py
index 5ad1afb9..e496dfd5 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -488,6 +488,7 @@ async def open_root_actor(
         # -> see note on why shielding.
         # maybe_raise_from_masking_exc(),
     ):
+        actor._root_tn = root_tn
         # `_runtime.async_main()` creates an internal nursery
         # and blocks here until any underlying actor(-process)
         # tree has terminated thereby conducting so called
diff --git a/tractor/_rpc.py b/tractor/_rpc.py
index 5aee986d..573aa77b 100644
--- a/tractor/_rpc.py
+++ b/tractor/_rpc.py
@@ -384,7 +384,7 @@ async def _errors_relayed_via_ipc(
 
     # RPC task bookeeping.
     # since RPC tasks are scheduled inside a flat
-    # `Actor._service_n`, we add "handles" to each such that
+    # `Actor._service_tn`, we add "handles" to each such that
     # they can be individually ccancelled.
 
     finally:
@@ -462,7 +462,7 @@ async def _invoke(
     connected IPC channel.
 
     This is the core "RPC" `trio.Task` scheduling machinery used to start every
-    remotely invoked function, normally in `Actor._service_n: Nursery`.
+    remotely invoked function, normally in `Actor._service_tn: Nursery`.
 
     '''
    __tracebackhide__: bool = hide_tb
@@ -936,7 +936,7 @@ async def process_messages(
 
     Receive (multiplexed) per-`Channel` RPC requests as msgs from
     remote processes; schedule target async funcs as local
-    `trio.Task`s inside the `Actor._service_n: Nursery`.
+    `trio.Task`s inside the `Actor._service_tn: Nursery`.
 
     Depending on msg type, non-`cmd` (task spawning/starting) request
     payloads (eg. `started`, `yield`, `return`, `error`)
@@ -961,7 +961,7 @@
 
     '''
     actor: Actor = _state.current_actor()
-    assert actor._service_n  # runtime state sanity
+    assert actor._service_tn  # runtime state sanity
 
     # TODO: once `trio` get's an "obvious way" for req/resp we
     # should use it?
@@ -1172,7 +1172,7 @@
                 start_status += '->( scheduling new task..\n'
                 log.runtime(start_status)
                 try:
-                    ctx: Context = await actor._service_n.start(
+                    ctx: Context = await actor._service_tn.start(
                         partial(
                             _invoke,
                             actor,
@@ -1312,7 +1312,7 @@
     ) as err:
 
         if nursery_cancelled_before_task:
-            sn: Nursery = actor._service_n
+            sn: Nursery = actor._service_tn
             assert sn and sn.cancel_scope.cancel_called  # sanity
             log.cancel(
                 f'Service nursery cancelled before it handled {funcname}'
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index c9bd34bb..c204f033 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -76,6 +76,7 @@ from tractor.msg import (
 )
 from .trionics import (
     collapse_eg,
+    maybe_open_nursery,
 )
 from .ipc import (
     Channel,
@@ -173,9 +174,11 @@ class Actor:
 
     msg_buffer_size: int = 2**6
 
-    # nursery placeholders filled in by `async_main()` after fork
-    _service_n: Nursery|None = None
-
+    # nursery placeholders filled in by `async_main()`,
+    # - after fork for subactors.
+    # - during boot for the root actor.
+    _root_tn: Nursery|None = None
+    _service_tn: Nursery|None = None
     _ipc_server: _server.IPCServer|None = None
 
     @property
@@ -1009,12 +1012,46 @@ class Actor:
         the RPC service nursery.
 
         '''
-        assert self._service_n
-        self._service_n.start_soon(
+        actor_repr: str = _pformat.nest_from_op(
+            input_op='>c(',
+            text=self.pformat(),
+            nest_indent=1,
+        )
+        log.cancel(
+            'Actor.cancel_soon()` was called!\n'
+            f'>> scheduling `Actor.cancel()`\n'
+            f'{actor_repr}'
+        )
+        assert self._service_tn
+        self._service_tn.start_soon(
             self.cancel,
             None,  # self cancel all rpc tasks
         )
 
+        # schedule a root-tn canceller task once the service tn
+        # is fully shutdown.
+        async def cancel_root_tn_after_services():
+            log.runtime(
+                'Waiting on service-tn to cancel..\n'
+                f'c>)\n'
+                f'|_{self._service_tn.cancel_scope!r}\n'
+            )
+            await self._cancel_complete.wait()
+            log.cancel(
+                f'`._service_tn` cancelled\n'
+                f'>c)\n'
+                f'|_{self._service_tn.cancel_scope!r}\n'
+                f'\n'
+                f'>> cancelling `._root_tn`\n'
+                f'c>(\n'
+                f' |_{self._root_tn.cancel_scope!r}\n'
+            )
+            self._root_tn.cancel_scope.cancel()
+
+        self._root_tn.start_soon(
+            cancel_root_tn_after_services
+        )
+
     @property
     def cancel_complete(self) -> bool:
         return self._cancel_complete.is_set()
@@ -1119,8 +1156,8 @@ class Actor:
             await ipc_server.wait_for_shutdown()
 
         # cancel all rpc tasks permanently
-        if self._service_n:
-            self._service_n.cancel_scope.cancel()
+        if self._service_tn:
+            self._service_tn.cancel_scope.cancel()
 
         log_meth(msg)
         self._cancel_complete.set()
@@ -1257,7 +1294,7 @@ class Actor:
         '''
         Cancel all ongoing RPC tasks owned/spawned for a given
         `parent_chan: Channel` or simply all tasks (inside
-        `._service_n`) when `parent_chan=None`.
+        `._service_tn`) when `parent_chan=None`.
 
         '''
         tasks: dict = self._rpc_tasks
@@ -1469,40 +1506,56 @@ async def async_main(
             accept_addrs.append(addr.unwrap())
 
     assert accept_addrs
-    # The "root" nursery ensures the channel with the immediate
-    # parent is kept alive as a resilient service until
-    # cancellation steps have (mostly) occurred in
+
+    ya_root_tn: bool = bool(actor._root_tn)
+    ya_service_tn: bool = bool(actor._service_tn)
+
+    # NOTE, a top-most "root" task-nursery ensures the channel
+    # with the immediate parent is kept alive as a resilient
+    # service until cancellation steps have (mostly) occurred in
     # a deterministic way.
     root_tn: trio.Nursery
     async with (
         collapse_eg(),
-        trio.open_nursery() as root_tn,
+        maybe_open_nursery(
+            nursery=actor._root_tn,
+        ) as root_tn,
+        # trio.open_nursery() as root_tn,
     ):
-        # actor._root_n = root_tn
-        # assert actor._root_n
+        if ya_root_tn:
+            assert root_tn is actor._root_tn
+        else:
+            actor._root_tn = root_tn
 
         ipc_server: _server.IPCServer
         async with (
             collapse_eg(),
-            trio.open_nursery() as service_nursery,
+            maybe_open_nursery(
+                nursery=actor._service_tn,
+            ) as service_tn,
+            # trio.open_nursery() as service_tn,
             _server.open_ipc_server(
-                parent_tn=service_nursery,
-                stream_handler_tn=service_nursery,
+                parent_tn=service_tn,
+                stream_handler_tn=service_tn,
             ) as ipc_server,
             # ) as actor._ipc_server,
             # ^TODO? prettier?
 
         ):
-            # This nursery is used to handle all inbound
-            # connections to us such that if the TCP server
-            # is killed, connections can continue to process
-            # in the background until this nursery is cancelled.
-            actor._service_n = service_nursery
+            if ya_service_tn:
+                assert service_tn is actor._service_tn
+            else:
+                # This nursery is used to handle all inbound
+                # connections to us such that if the TCP server
+                # is killed, connections can continue to process
+                # in the background until this nursery is cancelled.
+                actor._service_tn = service_tn
+
             actor._ipc_server = ipc_server
             assert (
-                actor._service_n
+                actor._service_tn
                 and (
-                    actor._service_n
+                    actor._service_tn
                     is
                     actor._ipc_server._parent_tn
                     is
@@ -1537,7 +1590,7 @@
             try:
                 eps: list = await ipc_server.listen_on(
                     accept_addrs=accept_addrs,
-                    stream_handler_nursery=service_nursery,
+                    stream_handler_nursery=service_tn,
                 )
                 log.runtime(
                     f'Booted IPC server\n'
@@ -1545,7 +1598,7 @@
                 )
                 assert (
                     (eps[0].listen_tn)
-                    is not service_nursery
+                    is not service_tn
                 )
 
             except OSError as oserr:
@@ -1707,7 +1760,7 @@
 
             # XXX TODO but hard XXX
             # we can't actually do this bc the debugger uses the
-            # _service_n to spawn the lock task, BUT, in theory if we had
+            # _service_tn to spawn the lock task, BUT, in theory if we had
             # the root nursery surround this finally block it might be
             # actually possible to debug THIS machinery in the same way
             # as user task code?
diff --git a/tractor/devx/debug/_trace.py b/tractor/devx/debug/_trace.py
index a7a1ce43..a23d2e23 100644
--- a/tractor/devx/debug/_trace.py
+++ b/tractor/devx/debug/_trace.py
@@ -481,12 +481,12 @@ async def _pause(
        # we have to figure out how to avoid having the service nursery
        # cancel on this task start? I *think* this works below:
        # ```python
-        # actor._service_n.cancel_scope.shield = shield
+        # actor._service_tn.cancel_scope.shield = shield
        # ```
        # but not entirely sure if that's a sane way to implement it?
 
        # NOTE currently we spawn the lock request task inside this
-        # subactor's global `Actor._service_n` so that the
+        # subactor's global `Actor._service_tn` so that the
        # lifetime of the lock-request can outlive the current
        # `._pause()` scope while the user steps through their
        # application code and when they finally exit the
@@ -510,7 +510,7 @@
                f'|_{task}\n'
            )
            with trio.CancelScope(shield=shield):
-                req_ctx: Context = await actor._service_n.start(
+                req_ctx: Context = await actor._service_tn.start(
                    partial(
                        request_root_stdio_lock,
                        actor_uid=actor.uid,
@@ -544,7 +544,7 @@
            _repl_fail_report = None
 
        # when the actor is mid-runtime cancellation the
-        # `Actor._service_n` might get closed before we can spawn
+        # `Actor._service_tn` might get closed before we can spawn
        # the request task, so just ignore expected RTE.
        elif (
            isinstance(pause_err, RuntimeError)
@@ -989,7 +989,7 @@
            # that output and assign the `repl` created above!
            bg_task, _ = trio.from_thread.run(
                afn=partial(
-                    actor._service_n.start,
+                    actor._service_tn.start,
                    partial(
                        _pause_from_bg_root_thread,
                        behalf_of_thread=thread,
diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py
index 99f05852..3acfbeda 100644
--- a/tractor/trionics/_mngrs.py
+++ b/tractor/trionics/_mngrs.py
@@ -204,7 +204,7 @@
    a kept-alive-while-in-use async resource.
 
    '''
-    service_n: Optional[trio.Nursery] = None
+    service_tn: Optional[trio.Nursery] = None
    locks: dict[Hashable, trio.Lock] = {}
    users: int = 0
    values: dict[Any, Any] = {}
@@ -294,15 +294,15 @@
            f'task: {task}\n'
            f'task_tn: {task_tn}\n'
        )
-        service_n = tn
+        service_tn = tn
    else:
-        service_n: trio.Nursery = current_actor()._service_n
+        service_tn: trio.Nursery = current_actor()._service_tn
 
    # TODO: is there any way to allocate
    # a 'stays-open-till-last-task-finshed nursery?
-    # service_n: trio.Nursery
-    # async with maybe_open_nursery(_Cache.service_n) as service_n:
-    # _Cache.service_n = service_n
+    # service_tn: trio.Nursery
+    # async with maybe_open_nursery(_Cache.service_tn) as service_tn:
+    # _Cache.service_tn = service_tn
 
    cache_miss_ke: KeyError|None = None
    maybe_taskc: trio.Cancelled|None = None
@@ -324,8 +324,8 @@
            mngr = acm_func(**kwargs)
            resources = _Cache.resources
            assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
-            resources[ctx_key] = (service_n, trio.Event())
-            yielded: Any = await service_n.start(
+            resources[ctx_key] = (service_tn, trio.Event())
+            yielded: Any = await service_tn.start(
                _Cache.run_ctx,
                mngr,
                ctx_key,
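
For reference, the adopt-or-create behaviour which `async_main()` now gets from
`maybe_open_nursery()` can be sketched roughly as below. This is a minimal
illustration of the assumed semantics (re-use a caller-provided nursery when one
is passed, otherwise open and own a fresh one), not the actual
`tractor.trionics` implementation:

    from contextlib import asynccontextmanager as acm
    import trio

    @acm
    async def maybe_open_nursery(
        nursery: trio.Nursery|None = None,
    ):
        # adopt a nursery pre-allocated by the caller, eg. the root
        # actor's `._root_tn` assigned inside `open_root_actor()`..
        if nursery is not None:
            yield nursery
        # ..otherwise open (and own) a fresh one for this scope.
        else:
            async with trio.open_nursery() as tn:
                yield tn

Under that assumption the `ya_root_tn`/`ya_service_tn` flags simply record
whether `async_main()` adopted a pre-set nursery or allocated its own, and
`Actor.cancel_soon()` can rely on `._root_tn` outliving `._service_tn`: the
service nursery is cancelled first and, once `._cancel_complete` is set, the
root nursery is cancelled after it.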