From 6dc5f4c914cf13cb60ecf2b6d93b84024e599be6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Jun 2025 11:58:59 -0400 Subject: [PATCH] Use collapser around root tn in `.async_main()` Seems to cause the following test suites to fail however.. - 'test_advanced_faults.py::test_ipc_channel_break_during_stream' - 'test_advanced_faults.py::test_ipc_channel_break_during_stream' - 'test_clustering.py::test_empty_mngrs_input_raises' Also tweak some ctxc request logging content. --- tractor/_runtime.py | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/tractor/_runtime.py b/tractor/_runtime.py index e7475662..2f9749a2 100644 --- a/tractor/_runtime.py +++ b/tractor/_runtime.py @@ -74,6 +74,9 @@ from tractor.msg import ( pretty_struct, types as msgtypes, ) +from .trionics import ( + collapse_eg, +) from .ipc import ( Channel, # IPCServer, # causes cycles atm.. @@ -345,7 +348,7 @@ class Actor: def pformat( self, - ds: str = ':', + ds: str = ': ', indent: int = 0, ) -> str: fields_sect_prefix: str = ' |_' @@ -1052,6 +1055,7 @@ class Actor: cid: str, parent_chan: Channel, requesting_uid: tuple[str, str]|None, + # ^^TODO! use the `Aid` directly here! ipc_msg: dict|None|bool = False, @@ -1097,9 +1101,12 @@ class Actor: log.cancel( 'Rxed cancel request for RPC task\n' - f'<=c) {requesting_uid}\n' - f' |_{ctx._task}\n' - f' >> {ctx.repr_rpc}\n' + f'{ctx._task!r} <=c) {requesting_uid}\n' + f'|_>> {ctx.repr_rpc}\n' + + # f'|_{ctx._task}\n' + # f' >> {ctx.repr_rpc}\n' + # f'=> {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n' # f' |_ {ctx._task}\n\n' @@ -1384,10 +1391,12 @@ async def async_main( # parent is kept alive as a resilient service until # cancellation steps have (mostly) occurred in # a deterministic way. - async with trio.open_nursery( - strict_exception_groups=False, - ) as root_nursery: - actor._root_n = root_nursery + root_tn: trio.Nursery + async with ( + collapse_eg(), + trio.open_nursery() as root_tn, + ): + actor._root_n = root_tn assert actor._root_n ipc_server: _server.IPCServer @@ -1533,7 +1542,7 @@ async def async_main( # start processing parent requests until our channel # server is 100% up and running. if actor._parent_chan: - await root_nursery.start( + await root_tn.start( partial( _rpc.process_messages, chan=actor._parent_chan,