Merge pull request #390 from goodboy/strict_egs_everywhere

Strict egs everywhere: drop use of `strict_exception_groups=False` throughout!
main
Bd 2025-08-18 14:15:49 -04:00 committed by GitHub
commit b05abea51e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 426 additions and 192 deletions

View File

@ -16,6 +16,7 @@ from tractor import (
ContextCancelled, ContextCancelled,
MsgStream, MsgStream,
_testing, _testing,
trionics,
) )
import trio import trio
import pytest import pytest
@ -62,9 +63,8 @@ async def recv_and_spawn_net_killers(
await ctx.started() await ctx.started()
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
trio.open_nursery( trionics.collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as tn,
) as tn,
): ):
async for i in stream: async for i in stream:
print(f'child echoing {i}') print(f'child echoing {i}')

View File

@ -23,9 +23,8 @@ async def main():
modules=[__name__] modules=[__name__]
) as portal_map, ) as portal_map,
trio.open_nursery( tractor.trionics.collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as tn,
) as tn,
): ):
for (name, portal) in portal_map.items(): for (name, portal) in portal_map.items():

View File

@ -317,7 +317,6 @@ def test_subactor_breakpoint(
assert in_prompt_msg( assert in_prompt_msg(
child, [ child, [
'MessagingError:',
'RemoteActorError:', 'RemoteActorError:',
"('breakpoint_forever'", "('breakpoint_forever'",
'bdb.BdbQuit', 'bdb.BdbQuit',

View File

@ -313,9 +313,8 @@ async def inf_streamer(
# `trio.EndOfChannel` doesn't propagate directly to the above # `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead # .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle? # of gracefully absorbing as normal.. so how to handle?
trio.open_nursery( tractor.trionics.collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as tn,
) as tn,
): ):
async def close_stream_on_sentinel(): async def close_stream_on_sentinel():
async for msg in stream: async for msg in stream:

View File

@ -236,7 +236,10 @@ async def stream_forever():
async def test_cancel_infinite_streamer(start_method): async def test_cancel_infinite_streamer(start_method):
# stream for at most 1 seconds # stream for at most 1 seconds
with trio.move_on_after(1) as cancel_scope: with (
trio.fail_after(4),
trio.move_on_after(1) as cancel_scope
):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
'donny', 'donny',
@ -284,20 +287,32 @@ async def test_cancel_infinite_streamer(start_method):
], ],
) )
@tractor_test @tractor_test
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel): async def test_some_cancels_all(
"""Verify a subset of failed subactors causes all others in num_actors_and_errs: tuple,
start_method: str,
loglevel: str,
):
'''
Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio. the nursery to be cancelled just like the strategy in trio.
This is the first and only supervisory strategy at the moment. This is the first and only supervisory strategy at the moment.
"""
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs '''
(
num_actors,
first_err,
err_type,
ria_func,
da_func,
) = num_actors_and_errs
try: try:
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
# spawn the same number of deamon actors which should be cancelled # spawn the same number of deamon actors which should be cancelled
dactor_portals = [] dactor_portals = []
for i in range(num_actors): for i in range(num_actors):
dactor_portals.append(await n.start_actor( dactor_portals.append(await an.start_actor(
f'deamon_{i}', f'deamon_{i}',
enable_modules=[__name__], enable_modules=[__name__],
)) ))
@ -307,7 +322,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
for i in range(num_actors): for i in range(num_actors):
# start actor(s) that will fail immediately # start actor(s) that will fail immediately
riactor_portals.append( riactor_portals.append(
await n.run_in_actor( await an.run_in_actor(
func, func,
name=f'actor_{i}', name=f'actor_{i}',
**kwargs **kwargs
@ -337,7 +352,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError`` # should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as err: except first_err as _err:
err = _err
if isinstance(err, BaseExceptionGroup): if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors assert len(err.exceptions) == num_actors
for exc in err.exceptions: for exc in err.exceptions:
@ -348,8 +364,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
elif isinstance(err, tractor.RemoteActorError): elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type assert err.boxed_type == err_type
assert n.cancelled is True assert an.cancelled is True
assert not n._children assert not an._children
else: else:
pytest.fail("Should have gotten a remote assertion error?") pytest.fail("Should have gotten a remote assertion error?")
@ -519,10 +535,15 @@ def test_cancel_via_SIGINT_other_task(
async def main(): async def main():
# should never timeout since SIGINT should cancel the current program # should never timeout since SIGINT should cancel the current program
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with trio.open_nursery( async with (
# XXX ?TODO? why no work!?
# tractor.trionics.collapse_eg(),
trio.open_nursery(
strict_exception_groups=False, strict_exception_groups=False,
) as n: ) as tn,
await n.start(spawn_and_sleep_forever) ):
await tn.start(spawn_and_sleep_forever)
if 'mp' in spawn_backend: if 'mp' in spawn_backend:
time.sleep(0.1) time.sleep(0.1)
os.kill(pid, signal.SIGINT) os.kill(pid, signal.SIGINT)
@ -533,38 +554,123 @@ def test_cancel_via_SIGINT_other_task(
async def spin_for(period=3): async def spin_for(period=3):
"Sync sleep." "Sync sleep."
print(f'sync sleeping in sub-sub for {period}\n')
time.sleep(period) time.sleep(period)
async def spawn(): async def spawn_sub_with_sync_blocking_task():
async with tractor.open_nursery() as tn: async with tractor.open_nursery() as an:
await tn.run_in_actor( print('starting sync blocking subactor..\n')
await an.run_in_actor(
spin_for, spin_for,
name='sleeper', name='sleeper',
) )
print('exiting first subactor layer..\n')
@pytest.mark.parametrize(
'man_cancel_outer',
[
False, # passes if delay != 2
# always causes an unexpected eg-w-embedded-assert-err?
pytest.param(True,
marks=pytest.mark.xfail(
reason=(
'always causes an unexpected eg-w-embedded-assert-err?'
)
),
),
],
)
@no_windows @no_windows
def test_cancel_while_childs_child_in_sync_sleep( def test_cancel_while_childs_child_in_sync_sleep(
loglevel, loglevel: str,
start_method, start_method: str,
spawn_backend, spawn_backend: str,
debug_mode: bool,
reg_addr: tuple,
man_cancel_outer: bool,
): ):
"""Verify that a child cancelled while executing sync code is torn '''
Verify that a child cancelled while executing sync code is torn
down even when that cancellation is triggered by the parent down even when that cancellation is triggered by the parent
2 nurseries "up". 2 nurseries "up".
"""
Though the grandchild should stay blocking its actor runtime, its
parent should issue a "zombie reaper" to hard kill it after
sufficient timeout.
'''
if start_method == 'forkserver': if start_method == 'forkserver':
pytest.skip("Forksever sux hard at resuming from sync sleep...") pytest.skip("Forksever sux hard at resuming from sync sleep...")
async def main(): async def main():
with trio.fail_after(2): #
async with tractor.open_nursery() as tn: # XXX BIG TODO NOTE XXX
await tn.run_in_actor( #
spawn, # it seems there's a strange race that can happen
name='spawn', # where where the fail-after will trigger outer scope
# .cancel() which then causes the inner scope to raise,
#
# BaseExceptionGroup('Exceptions from Trio nursery', [
# BaseExceptionGroup('Exceptions from Trio nursery',
# [
# Cancelled(),
# Cancelled(),
# ]
# ),
# AssertionError('assert 0')
# ])
#
# WHY THIS DOESN'T MAKE SENSE:
# ---------------------------
# - it should raise too-slow-error when too slow..
# * verified that using simple-cs and manually cancelling
# you get same outcome -> indicates that the fail-after
# can have its TooSlowError overriden!
# |_ to check this it's easy, simplly decrease the timeout
# as per the var below.
#
# - when using the manual simple-cs the outcome is different
# DESPITE the `assert 0` which means regardless of the
# inner scope effectively failing in the same way, the
# bubbling up **is NOT the same**.
#
# delays trigger diff outcomes..
# ---------------------------
# as seen by uncommenting various lines below there is from
# my POV an unexpected outcome due to the delay=2 case.
#
# delay = 1 # no AssertionError in eg, TooSlowError raised.
# delay = 2 # is AssertionError in eg AND no TooSlowError !?
delay = 4 # is AssertionError in eg AND no _cs cancellation.
with trio.fail_after(delay) as _cs:
# with trio.CancelScope() as cs:
# ^XXX^ can be used instead to see same outcome.
async with (
# tractor.trionics.collapse_eg(), # doesn't help
tractor.open_nursery(
hide_tb=False,
debug_mode=debug_mode,
registry_addrs=[reg_addr],
) as an,
):
await an.run_in_actor(
spawn_sub_with_sync_blocking_task,
name='sync_blocking_sub',
) )
await trio.sleep(1) await trio.sleep(1)
if man_cancel_outer:
print('Cancelling manually in root')
_cs.cancel()
# trigger exc-srced taskc down
# the actor tree.
print('RAISING IN ROOT')
assert 0 assert 0
with pytest.raises(AssertionError): with pytest.raises(AssertionError):

View File

@ -117,9 +117,10 @@ async def open_actor_local_nursery(
ctx: tractor.Context, ctx: tractor.Context,
): ):
global _nursery global _nursery
async with trio.open_nursery( async with (
strict_exception_groups=False, tractor.trionics.collapse_eg(),
) as tn: trio.open_nursery() as tn
):
_nursery = tn _nursery = tn
await ctx.started() await ctx.started()
await trio.sleep(10) await trio.sleep(10)

View File

@ -13,26 +13,24 @@ MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None: def test_empty_mngrs_input_raises() -> None:
async def main(): async def main():
with trio.fail_after(1): with trio.fail_after(3):
async with ( async with (
open_actor_cluster( open_actor_cluster(
modules=[__name__], modules=[__name__],
# NOTE: ensure we can passthrough runtime opts # NOTE: ensure we can passthrough runtime opts
loglevel='info', loglevel='cancel',
# debug_mode=True, debug_mode=False,
) as portals, ) as portals,
gather_contexts( gather_contexts(mngrs=()),
# NOTE: it's the use of inline-generator syntax
# here that causes the empty input.
mngrs=(
p.open_context(worker) for p in portals.values()
),
),
): ):
assert 0 # should fail before this?
assert portals
# test should fail if we mk it here!
assert 0, 'Should have raised val-err !?'
with pytest.raises(ValueError): with pytest.raises(ValueError):
trio.run(main) trio.run(main)

View File

@ -11,6 +11,7 @@ import psutil
import pytest import pytest
import subprocess import subprocess
import tractor import tractor
from tractor.trionics import collapse_eg
from tractor._testing import tractor_test from tractor._testing import tractor_test
import trio import trio
@ -193,10 +194,10 @@ async def spawn_and_check_registry(
try: try:
async with tractor.open_nursery() as an: async with tractor.open_nursery() as an:
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
) as trion: trio.open_nursery() as trion,
):
portals = {} portals = {}
for i in range(3): for i in range(3):
name = f'a{i}' name = f'a{i}'
@ -338,11 +339,12 @@ async def close_chans_before_nursery(
async with portal2.open_stream_from( async with portal2.open_stream_from(
stream_forever stream_forever
) as agen2: ) as agen2:
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
) as n: trio.open_nursery() as tn,
n.start_soon(streamer, agen1) ):
n.start_soon(cancel, use_signal, .5) tn.start_soon(streamer, agen1)
tn.start_soon(cancel, use_signal, .5)
try: try:
await streamer(agen2) await streamer(agen2)
finally: finally:

View File

@ -234,10 +234,8 @@ async def trio_ctx(
with trio.fail_after(1 + delay): with trio.fail_after(1 + delay):
try: try:
async with ( async with (
trio.open_nursery( tractor.trionics.collapse_eg(),
# TODO, for new `trio` / py3.13 trio.open_nursery() as tn,
# strict_exception_groups=False,
) as tn,
tractor.to_asyncio.open_channel_from( tractor.to_asyncio.open_channel_from(
sleep_and_err, sleep_and_err,
) as (first, chan), ) as (first, chan),

View File

@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
@pytest.fixture(scope='module') @pytest.fixture(scope='module')
def time_quad_ex(reg_addr, ci_env, spawn_backend): def time_quad_ex(
reg_addr: tuple,
ci_env: bool,
spawn_backend: str,
):
if spawn_backend == 'mp': if spawn_backend == 'mp':
"""no idea but the mp *nix runs are flaking out here often... '''
""" no idea but the mp *nix runs are flaking out here often...
'''
pytest.skip("Test is too flaky on mp in CI") pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4 timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
return results, diff return results, diff
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend): def test_a_quadruple_example(
"""This also serves as a kind of "we'd like to be this fast test".""" time_quad_ex: tuple,
ci_env: bool,
spawn_backend: str,
):
'''
This also serves as a kind of "we'd like to be this fast test".
'''
results, diff = time_quad_ex results, diff = time_quad_ex
assert results assert results
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3 this_fast = (
6 if platform.system() in (
'Windows',
'Darwin',
)
else 3
)
assert diff < this_fast assert diff < this_fast

View File

@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
await trio.sleep_forever() await trio.sleep_forever()
async def _trio_main(): async def _trio_main():
# with trio.fail_after(2): with trio.fail_after(2 if not debug_mode else 999):
with trio.fail_after(999):
first: str first: str
chan: to_asyncio.LinkedTaskChannel chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event() aio_ev = asyncio.Event()
@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
): ):
aio_ev.set() aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side # ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first. # having (maybe) errored first.
if aio_err_trigger in ( if aio_err_trigger in (
'after_trio_task_starts', 'after_trio_task_starts',
'after_start_point', 'after_start_point',
): ):
assert len(errs := rest_eg.exceptions) == 1 patt: str = 'trio-side'
typerr = errs[0] expect_exc = TypeError
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
# when aio errors BEFORE (last) trio task is scheduled, we should # when aio errors BEFORE (last) trio task is scheduled, we should
# never see anythinb but the aio-side. # never see anythinb but the aio-side.
else: else:
assert len(rtes := rte_eg.exceptions) == 1 patt: str = 'asyncio-side'
assert 'asyncio-side' in rtes[0].args[0] expect_exc = RuntimeError
with pytest.raises(expect_exc) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
caught_exc = excinfo.value
assert patt in caught_exc.args

View File

@ -8,6 +8,7 @@ from contextlib import (
) )
import pytest import pytest
from tractor.trionics import collapse_eg
import trio import trio
from trio import TaskStatus from trio import TaskStatus
@ -64,9 +65,8 @@ def test_stashed_child_nursery(use_start_soon):
async def main(): async def main():
async with ( async with (
trio.open_nursery( collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as pn,
) as pn,
): ):
cn = await pn.start(mk_child_nursery) cn = await pn.start(mk_child_nursery)
assert cn assert cn
@ -197,10 +197,8 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
async with ( async with (
# XXX should ensure ONLY the KBI # XXX should ensure ONLY the KBI
# is relayed upward # is relayed upward
trionics.collapse_eg(), collapse_eg(),
trio.open_nursery( trio.open_nursery(), # as tn,
# strict_exception_groups=False,
), # as tn,
trionics.gather_contexts([ trionics.gather_contexts([
open_memchan(), open_memchan(),

View File

@ -55,10 +55,17 @@ async def open_actor_cluster(
raise ValueError( raise ValueError(
'Number of names is {len(names)} but count it {count}') 'Number of names is {len(names)} but count it {count}')
async with tractor.open_nursery( async with (
# tractor.trionics.collapse_eg(),
tractor.open_nursery(
**runtime_kwargs, **runtime_kwargs,
) as an: ) as an
async with trio.open_nursery() as n: ):
async with (
# tractor.trionics.collapse_eg(),
trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc()
):
uid = tractor.current_actor().uid uid = tractor.current_actor().uid
async def _start(name: str) -> None: async def _start(name: str) -> None:
@ -69,9 +76,8 @@ async def open_actor_cluster(
) )
for name in names: for name in names:
n.start_soon(_start, name) tn.start_soon(_start, name)
assert len(portals) == count assert len(portals) == count
yield portals yield portals
await an.cancel(hard_kill=hard_kill) await an.cancel(hard_kill=hard_kill)

View File

@ -101,6 +101,9 @@ from ._state import (
debug_mode, debug_mode,
_ctxvar_Context, _ctxvar_Context,
) )
from .trionics import (
collapse_eg,
)
# ------ - ------ # ------ - ------
if TYPE_CHECKING: if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
@ -942,7 +945,7 @@ class Context:
self.cancel_called = True self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx from {side.upper()}-side\n' f'Cancelling ctx from {side!r}-side\n'
) )
reminfo: str = ( reminfo: str = (
# ' =>\n' # ' =>\n'
@ -2025,10 +2028,8 @@ async def open_context_from_portal(
ctxc_from_callee: ContextCancelled|None = None ctxc_from_callee: ContextCancelled|None = None
try: try:
async with ( async with (
trio.open_nursery( collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as tn,
) as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
ctx=ctx, ctx=ctx,
spec=ctx_meta.get('pld_spec'), spec=ctx_meta.get('pld_spec'),

View File

@ -28,7 +28,10 @@ from typing import (
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from tractor.log import get_logger from tractor.log import get_logger
from .trionics import gather_contexts from .trionics import (
gather_contexts,
collapse_eg,
)
from .ipc import _connect_chan, Channel from .ipc import _connect_chan, Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
@ -87,7 +90,6 @@ async def get_registry(
yield regstr_ptl yield regstr_ptl
@acm @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
@ -253,9 +255,12 @@ async def find_actor(
for addr in registry_addrs for addr in registry_addrs
) )
portals: list[Portal] portals: list[Portal]
async with gather_contexts( async with (
collapse_eg(),
gather_contexts(
mngrs=maybe_portals, mngrs=maybe_portals,
) as portals: ) as portals,
):
# log.runtime( # log.runtime(
# 'Gathered portals:\n' # 'Gathered portals:\n'
# f'{portals}' # f'{portals}'

View File

@ -39,7 +39,10 @@ import warnings
import trio import trio
from .trionics import maybe_open_nursery from .trionics import (
maybe_open_nursery,
collapse_eg,
)
from ._state import ( from ._state import (
current_actor, current_actor,
) )
@ -583,14 +586,13 @@ async def open_portal(
assert actor assert actor
was_connected: bool = False was_connected: bool = False
async with maybe_open_nursery( async with (
collapse_eg(),
maybe_open_nursery(
tn, tn,
shield=shield, shield=shield,
strict_exception_groups=False, ) as tn,
# ^XXX^ TODO? soo roll our own then ?? ):
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn:
if not channel.connected(): if not channel.connected():
await channel.connect() await channel.connect()

View File

@ -37,13 +37,7 @@ import warnings
import trio import trio
from ._runtime import ( from . import _runtime
Actor,
Arbiter,
# TODO: rename and make a non-actor subtype?
# Arbiter as Registry,
async_main,
)
from .devx import ( from .devx import (
debug, debug,
_frame_stack, _frame_stack,
@ -64,6 +58,7 @@ from ._addr import (
) )
from .trionics import ( from .trionics import (
is_multi_cancelled, is_multi_cancelled,
collapse_eg,
) )
from ._exceptions import ( from ._exceptions import (
RuntimeFailure, RuntimeFailure,
@ -102,7 +97,7 @@ async def maybe_block_bp(
): ):
logger.info( logger.info(
f'Found `greenback` installed @ {maybe_mod}\n' f'Found `greenback` installed @ {maybe_mod}\n'
'Enabling `tractor.pause_from_sync()` support!\n' f'Enabling `tractor.pause_from_sync()` support!\n'
) )
os.environ['PYTHONBREAKPOINT'] = ( os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx.debug._sync_pause_from_builtin' 'tractor.devx.debug._sync_pause_from_builtin'
@ -197,9 +192,13 @@ async def open_root_actor(
# read-only state to sublayers? # read-only state to sublayers?
# extra_rt_vars: dict|None = None, # extra_rt_vars: dict|None = None,
) -> Actor: ) -> _runtime.Actor:
''' '''
Runtime init entry point for ``tractor``. Initialize the `tractor` runtime by starting a "root actor" in
a parent-most Python process.
All (disjoint) actor-process-trees-as-programs are created via
this entrypoint.
''' '''
# XXX NEVER allow nested actor-trees! # XXX NEVER allow nested actor-trees!
@ -397,7 +396,7 @@ async def open_root_actor(
f'Registry(s) seem(s) to exist @ {ponged_addrs}' f'Registry(s) seem(s) to exist @ {ponged_addrs}'
) )
actor = Actor( actor = _runtime.Actor(
name=name or 'anonymous', name=name or 'anonymous',
uuid=mk_uuid(), uuid=mk_uuid(),
registry_addrs=ponged_addrs, registry_addrs=ponged_addrs,
@ -436,7 +435,8 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/pull/348 # https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296 # https://github.com/goodboy/tractor/issues/296
actor = Arbiter( # TODO: rename as `RootActor` or is that even necessary?
actor = _runtime.Arbiter(
name=name or 'registrar', name=name or 'registrar',
uuid=mk_uuid(), uuid=mk_uuid(),
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
@ -471,18 +471,21 @@ async def open_root_actor(
'-> Opening new registry @ ' '-> Opening new registry @ '
+ +
'\n'.join( '\n'.join(
f'@{addr}' for addr in reg_addrs f'{addr}' for addr in reg_addrs
) )
) )
logger.info(f'{report}\n') logger.info(f'{report}\n')
# start the actor runtime in a new task # start runtime in a bg sub-task, yield to caller.
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as root_tn,
) as nursery:
# ``_runtime.async_main()`` creates an internal nursery # XXX, finally-footgun below?
# -> see note on why shielding.
# maybe_raise_from_masking_exc(),
):
# `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process) # and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called # tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an # "end-to-end" structured concurrency throughout an
@ -490,9 +493,9 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all # "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as # transitively spawned actors/processes must be as
# well. # well.
await nursery.start( await root_tn.start(
partial( partial(
async_main, _runtime.async_main,
actor, actor,
accept_addrs=trans_bind_addrs, accept_addrs=trans_bind_addrs,
parent_addr=None parent_addr=None
@ -540,7 +543,7 @@ async def open_root_actor(
raise raise
finally: finally:
# NOTE: not sure if we'll ever need this but it's # NOTE/TODO?, not sure if we'll ever need this but it's
# possibly better for even more determinism? # possibly better for even more determinism?
# logger.cancel( # logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..') # f'Waiting on {len(nurseries)} nurseries in root..')

View File

@ -765,7 +765,6 @@ async def _invoke(
BaseExceptionGroup, BaseExceptionGroup,
BaseException, BaseException,
trio.Cancelled, trio.Cancelled,
) as _scope_err: ) as _scope_err:
scope_err = _scope_err scope_err = _scope_err
if ( if (

View File

@ -74,6 +74,9 @@ from tractor.msg import (
pretty_struct, pretty_struct,
types as msgtypes, types as msgtypes,
) )
from .trionics import (
collapse_eg,
)
from .ipc import ( from .ipc import (
Channel, Channel,
# IPCServer, # causes cycles atm.. # IPCServer, # causes cycles atm..
@ -1471,17 +1474,18 @@ async def async_main(
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in # cancellation steps have (mostly) occurred in
# a deterministic way. # a deterministic way.
async with trio.open_nursery( root_tn: trio.Nursery
strict_exception_groups=False, async with (
) as root_nursery: collapse_eg(),
actor._root_n = root_nursery trio.open_nursery() as root_tn,
):
actor._root_n = root_tn
assert actor._root_n assert actor._root_n
ipc_server: _server.IPCServer ipc_server: _server.IPCServer
async with ( async with (
trio.open_nursery( collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as service_nursery,
) as service_nursery,
_server.open_ipc_server( _server.open_ipc_server(
parent_tn=service_nursery, parent_tn=service_nursery,
stream_handler_tn=service_nursery, stream_handler_tn=service_nursery,
@ -1605,7 +1609,7 @@ async def async_main(
# start processing parent requests until our channel # start processing parent requests until our channel
# server is 100% up and running. # server is 100% up and running.
if actor._parent_chan: if actor._parent_chan:
await root_nursery.start( await root_tn.start(
partial( partial(
_rpc.process_messages, _rpc.process_messages,
chan=actor._parent_chan, chan=actor._parent_chan,
@ -1756,9 +1760,7 @@ async def async_main(
f' {pformat(ipc_server._peers)}' f' {pformat(ipc_server._peers)}'
) )
log.runtime(teardown_report) log.runtime(teardown_report)
await ipc_server.wait_for_no_more_peers( await ipc_server.wait_for_no_more_peers()
shield=True,
)
teardown_report += ( teardown_report += (
'-]> all peer channels are complete.\n' '-]> all peer channels are complete.\n'

View File

@ -44,6 +44,7 @@ from ._runtime import Actor
from ._portal import Portal from ._portal import Portal
from .trionics import ( from .trionics import (
is_multi_cancelled, is_multi_cancelled,
collapse_eg,
) )
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
@ -326,9 +327,10 @@ class ActorNursery:
server: IPCServer = self._actor.ipc_server server: IPCServer = self._actor.ipc_server
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
) as tn: trio.open_nursery() as tn,
):
subactor: Actor subactor: Actor
proc: trio.Process proc: trio.Process
@ -421,10 +423,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# `ActorNursery.start_actor()`). # `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller # errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as da_nursery,
) as da_nursery: ):
try: try:
# This is the inner level "run in actor" nursery. It is # This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using # awaited first since actors spawned in this way (using
@ -434,11 +436,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# immediately raised for handling by a supervisor strategy. # immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards # As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified. # the above "daemon actor" nursery will be notified.
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as ria_nursery,
) as ria_nursery: ):
an = ActorNursery( an = ActorNursery(
actor, actor,
ria_nursery, ria_nursery,

View File

@ -814,10 +814,14 @@ class Server(Struct):
async def wait_for_no_more_peers( async def wait_for_no_more_peers(
self, self,
shield: bool = False, # XXX, should this even be allowed?
# -> i've seen it cause hangs on teardown
# in `test_resource_cache.py`
# _shield: bool = False,
) -> None: ) -> None:
with trio.CancelScope(shield=shield):
await self._no_more_peers.wait() await self._no_more_peers.wait()
# with trio.CancelScope(shield=_shield):
# await self._no_more_peers.wait()
async def wait_for_peer( async def wait_for_peer(
self, self,

View File

@ -31,7 +31,7 @@ from ._broadcast import (
) )
from ._beg import ( from ._beg import (
collapse_eg as collapse_eg, collapse_eg as collapse_eg,
maybe_collapse_eg as maybe_collapse_eg, get_collapsed_eg as get_collapsed_eg,
is_multi_cancelled as is_multi_cancelled, is_multi_cancelled as is_multi_cancelled,
) )
from ._taskc import ( from ._taskc import (

View File

@ -15,8 +15,9 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>. # along with this program. If not, see <https://www.gnu.org/licenses/>.
''' '''
`BaseExceptionGroup` related utils and helpers pertaining to `BaseExceptionGroup` utils and helpers pertaining to
first-class-`trio` from a historical perspective B) first-class-`trio` from a "historical" perspective, like "loose
exception group" task-nurseries.
''' '''
from contextlib import ( from contextlib import (
@ -24,27 +25,84 @@ from contextlib import (
) )
from typing import ( from typing import (
Literal, Literal,
Type,
) )
import trio import trio
# from trio._core._concat_tb import (
# concat_tb,
# )
def maybe_collapse_eg( # XXX NOTE
beg: BaseExceptionGroup, # taken verbatim from `trio._core._run` except,
# - remove the NONSTRICT_EXCEPTIONGROUP_NOTE deprecation-note
# guard-check; we know we want an explicit collapse.
# - mask out tb rewriting in collapse case, i don't think it really
# matters?
#
def collapse_exception_group(
excgroup: BaseExceptionGroup[BaseException],
) -> BaseException: ) -> BaseException:
"""Recursively collapse any single-exception groups into that single contained
exception.
"""
exceptions = list(excgroup.exceptions)
modified = False
for i, exc in enumerate(exceptions):
if isinstance(exc, BaseExceptionGroup):
new_exc = collapse_exception_group(exc)
if new_exc is not exc:
modified = True
exceptions[i] = new_exc
if (
len(exceptions) == 1
and isinstance(excgroup, BaseExceptionGroup)
# XXX trio's loose-setting condition..
# and NONSTRICT_EXCEPTIONGROUP_NOTE in getattr(excgroup, "__notes__", ())
):
# exceptions[0].__traceback__ = concat_tb(
# excgroup.__traceback__,
# exceptions[0].__traceback__,
# )
return exceptions[0]
elif modified:
return excgroup.derive(exceptions)
else:
return excgroup
def get_collapsed_eg(
beg: BaseExceptionGroup,
) -> BaseException|None:
''' '''
If the input beg can collapse to a single non-eg sub-exception, If the input beg can collapse to a single sub-exception which is
return it instead. itself **not** an eg, return it.
''' '''
if len(excs := beg.exceptions) == 1: maybe_exc = collapse_exception_group(beg)
return excs[0] if maybe_exc is beg:
return None
return beg return maybe_exc
@acm @acm
async def collapse_eg(): async def collapse_eg(
hide_tb: bool = True,
# XXX, for ex. will always show begs containing single taskc
ignore: set[Type[BaseException]] = {
# trio.Cancelled,
},
add_notes: bool = True,
bp: bool = False,
):
''' '''
If `BaseExceptionGroup` raised in the body scope is If `BaseExceptionGroup` raised in the body scope is
"collapse-able" (in the same way that "collapse-able" (in the same way that
@ -52,15 +110,58 @@ async def collapse_eg():
only raise the lone emedded non-eg in in place. only raise the lone emedded non-eg in in place.
''' '''
__tracebackhide__: bool = hide_tb
try: try:
yield yield
except* BaseException as beg: except BaseExceptionGroup as _beg:
if ( beg = _beg
exc := maybe_collapse_eg(beg)
) is not beg:
raise exc
raise beg if (
bp
and
len(beg.exceptions) > 1
):
import tractor
if tractor.current_actor(
err_on_no_runtime=False,
):
await tractor.pause(shield=True)
else:
breakpoint()
if (
(exc := get_collapsed_eg(beg))
and
type(exc) not in ignore
):
# TODO? report number of nested groups it was collapsed
# *from*?
if add_notes:
from_group_note: str = (
'( ^^^ this exc was collapsed from a group ^^^ )\n'
)
if (
from_group_note
not in
getattr(exc, "__notes__", ())
):
exc.add_note(from_group_note)
# raise exc
# ^^ this will leave the orig beg tb above with the
# "during the handling of <beg> the following.."
# So, instead do..
#
if cause := exc.__cause__:
raise exc from cause
else:
# suppress "during handling of <the beg>"
# output in tb/console.
raise exc from None
# keep original
raise # beg
def is_multi_cancelled( def is_multi_cancelled(