From 9295af929cddc0e752b5410e6f49f4494c30c49c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Jun 2025 11:58:59 -0400 Subject: [PATCH] Use collapser around root tn in `.async_main()` Seems to cause the following test suites to fail however.. - 'test_advanced_faults.py::test_ipc_channel_break_during_stream' - 'test_advanced_faults.py::test_ipc_channel_break_during_stream' - 'test_clustering.py::test_empty_mngrs_input_raises' Also tweak some ctxc request logging content. --- tractor/_runtime.py | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index 51bdf92f..1e51a469 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -74,6 +74,9 @@ from tractor.msg import ( pretty_struct, types as msgtypes, ) +from .trionics import ( + collapse_eg, +) from .ipc import ( Channel, # IPCServer, # causes cycles atm.. @@ -345,7 +348,7 @@ class Actor: def pformat( self, - ds: str = ':', + ds: str = ': ', indent: int = 0, ) -> str: fields_sect_prefix: str = ' |_' @@ -1054,6 +1057,7 @@ class Actor: cid: str, parent_chan: Channel, requesting_uid: tuple[str, str]|None, + # ^^TODO! use the `Aid` directly here! ipc_msg: dict|None|bool = False, @@ -1099,9 +1103,12 @@ class Actor: log.cancel( 'Rxed cancel request for RPC task\n' - f'<=c) {requesting_uid}\n' - f' |_{ctx._task}\n' - f' >> {ctx.repr_rpc}\n' + f'{ctx._task!r} <=c) {requesting_uid}\n' + f'|_>> {ctx.repr_rpc}\n' + + # f'|_{ctx._task}\n' + # f' >> {ctx.repr_rpc}\n' + # f'=> {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n' # f' |_ {ctx._task}\n\n' @@ -1386,10 +1393,12 @@ async def async_main( # parent is kept alive as a resilient service until # cancellation steps have (mostly) occurred in # a deterministic way. - async with trio.open_nursery( - strict_exception_groups=False, - ) as root_nursery: - actor._root_n = root_nursery + root_tn: trio.Nursery + async with ( + collapse_eg(), + trio.open_nursery() as root_tn, + ): + actor._root_n = root_tn assert actor._root_n ipc_server: _server.IPCServer @@ -1488,7 +1497,7 @@ async def async_main( # their root actor over that channel. if _state._runtime_vars['_is_root']: for addr in accept_addrs: - waddr = wrap_address(addr) + waddr: Address = wrap_address(addr) if waddr == waddr.get_root(): _state._runtime_vars['_root_mailbox'] = addr @@ -1533,7 +1542,7 @@ async def async_main( # start processing parent requests until our channel # server is 100% up and running. if actor._parent_chan: - await root_nursery.start( + await root_tn.start( partial( _rpc.process_messages, chan=actor._parent_chan,