Use collapser around root tn in `.async_main()`

Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
enable_tpts
Tyler Goodlet 2025-06-16 11:58:59 -04:00
parent 5f6240939f
commit 6dc5f4c914
1 changed files with 18 additions and 9 deletions

View File

@ -74,6 +74,9 @@ from tractor.msg import (
pretty_struct,
types as msgtypes,
)
from .trionics import (
collapse_eg,
)
from .ipc import (
Channel,
# IPCServer, # causes cycles atm..
@ -345,7 +348,7 @@ class Actor:
def pformat(
self,
ds: str = ':',
ds: str = ': ',
indent: int = 0,
) -> str:
fields_sect_prefix: str = ' |_'
@ -1052,6 +1055,7 @@ class Actor:
cid: str,
parent_chan: Channel,
requesting_uid: tuple[str, str]|None,
# ^^TODO! use the `Aid` directly here!
ipc_msg: dict|None|bool = False,
@ -1097,9 +1101,12 @@ class Actor:
log.cancel(
'Rxed cancel request for RPC task\n'
f'<=c) {requesting_uid}\n'
f' |_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
f'{ctx._task!r} <=c) {requesting_uid}\n'
f'|_>> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# f'=> {ctx._task}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n'
# f' |_ {ctx._task}\n\n'
@ -1384,10 +1391,12 @@ async def async_main(
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
# a deterministic way.
async with trio.open_nursery(
strict_exception_groups=False,
) as root_nursery:
actor._root_n = root_nursery
root_tn: trio.Nursery
async with (
collapse_eg(),
trio.open_nursery() as root_tn,
):
actor._root_n = root_tn
assert actor._root_n
ipc_server: _server.IPCServer
@ -1533,7 +1542,7 @@ async def async_main(
# start processing parent requests until our channel
# server is 100% up and running.
if actor._parent_chan:
await root_nursery.start(
await root_tn.start(
partial(
_rpc.process_messages,
chan=actor._parent_chan,