Use collapser around root tn in `.async_main()`

Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
strict_egs_everywhere
Tyler Goodlet 2025-06-16 11:58:59 -04:00
parent 83f53fd0c5
commit 9295af929c
1 changed files with 19 additions and 10 deletions

View File

@ -74,6 +74,9 @@ from tractor.msg import (
pretty_struct, pretty_struct,
types as msgtypes, types as msgtypes,
) )
from .trionics import (
collapse_eg,
)
from .ipc import ( from .ipc import (
Channel, Channel,
# IPCServer, # causes cycles atm.. # IPCServer, # causes cycles atm..
@ -1054,6 +1057,7 @@ class Actor:
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_uid: tuple[str, str]|None, requesting_uid: tuple[str, str]|None,
# ^^TODO! use the `Aid` directly here!
ipc_msg: dict|None|bool = False, ipc_msg: dict|None|bool = False,
@ -1099,9 +1103,12 @@ class Actor:
log.cancel( log.cancel(
'Rxed cancel request for RPC task\n' 'Rxed cancel request for RPC task\n'
f'<=c) {requesting_uid}\n' f'{ctx._task!r} <=c) {requesting_uid}\n'
f' |_{ctx._task}\n' f'|_>> {ctx.repr_rpc}\n'
f' >> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# f'=> {ctx._task}\n' # f'=> {ctx._task}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n'
# f' |_ {ctx._task}\n\n' # f' |_ {ctx._task}\n\n'
@ -1386,10 +1393,12 @@ async def async_main(
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in # cancellation steps have (mostly) occurred in
# a deterministic way. # a deterministic way.
async with trio.open_nursery( root_tn: trio.Nursery
strict_exception_groups=False, async with (
) as root_nursery: collapse_eg(),
actor._root_n = root_nursery trio.open_nursery() as root_tn,
):
actor._root_n = root_tn
assert actor._root_n assert actor._root_n
ipc_server: _server.IPCServer ipc_server: _server.IPCServer
@ -1488,7 +1497,7 @@ async def async_main(
# their root actor over that channel. # their root actor over that channel.
if _state._runtime_vars['_is_root']: if _state._runtime_vars['_is_root']:
for addr in accept_addrs: for addr in accept_addrs:
waddr = wrap_address(addr) waddr: Address = wrap_address(addr)
if waddr == waddr.get_root(): if waddr == waddr.get_root():
_state._runtime_vars['_root_mailbox'] = addr _state._runtime_vars['_root_mailbox'] = addr
@ -1533,7 +1542,7 @@ async def async_main(
# start processing parent requests until our channel # start processing parent requests until our channel
# server is 100% up and running. # server is 100% up and running.
if actor._parent_chan: if actor._parent_chan:
await root_nursery.start( await root_tn.start(
partial( partial(
_rpc.process_messages, _rpc.process_messages,
chan=actor._parent_chan, chan=actor._parent_chan,