diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py
index f3a709e0..c88e0dfe 100644
--- a/examples/advanced_faults/ipc_failure_during_stream.py
+++ b/examples/advanced_faults/ipc_failure_during_stream.py
@@ -16,6 +16,7 @@ from tractor import (
ContextCancelled,
MsgStream,
_testing,
+ trionics,
)
import trio
import pytest
@@ -62,9 +63,8 @@ async def recv_and_spawn_net_killers(
await ctx.started()
async with (
ctx.open_stream() as stream,
- trio.open_nursery(
- strict_exception_groups=False,
- ) as tn,
+ trionics.collapse_eg(),
+ trio.open_nursery() as tn,
):
async for i in stream:
print(f'child echoing {i}')
diff --git a/examples/quick_cluster.py b/examples/quick_cluster.py
index 2378a3cf..3fa4ca2a 100644
--- a/examples/quick_cluster.py
+++ b/examples/quick_cluster.py
@@ -23,9 +23,8 @@ async def main():
modules=[__name__]
) as portal_map,
- trio.open_nursery(
- strict_exception_groups=False,
- ) as tn,
+ tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn,
):
for (name, portal) in portal_map.items():
diff --git a/tests/devx/test_debugger.py b/tests/devx/test_debugger.py
index 975fbc03..6179ef01 100644
--- a/tests/devx/test_debugger.py
+++ b/tests/devx/test_debugger.py
@@ -317,7 +317,6 @@ def test_subactor_breakpoint(
assert in_prompt_msg(
child, [
- 'MessagingError:',
'RemoteActorError:',
"('breakpoint_forever'",
'bdb.BdbQuit',
diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py
index 64f24167..907a2196 100644
--- a/tests/test_advanced_streaming.py
+++ b/tests/test_advanced_streaming.py
@@ -313,9 +313,8 @@ async def inf_streamer(
# `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle?
- trio.open_nursery(
- strict_exception_groups=False,
- ) as tn,
+ tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn,
):
async def close_stream_on_sentinel():
async for msg in stream:
diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py
index ca14ae4b..27fd59d7 100644
--- a/tests/test_cancellation.py
+++ b/tests/test_cancellation.py
@@ -236,7 +236,10 @@ async def stream_forever():
async def test_cancel_infinite_streamer(start_method):
# stream for at most 1 seconds
- with trio.move_on_after(1) as cancel_scope:
+ with (
+ trio.fail_after(4),
+ trio.move_on_after(1) as cancel_scope
+ ):
async with tractor.open_nursery() as n:
portal = await n.start_actor(
'donny',
@@ -284,20 +287,32 @@ async def test_cancel_infinite_streamer(start_method):
],
)
@tractor_test
-async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
- """Verify a subset of failed subactors causes all others in
+async def test_some_cancels_all(
+ num_actors_and_errs: tuple,
+ start_method: str,
+ loglevel: str,
+):
+ '''
+ Verify a subset of failed subactors causes all others in
the nursery to be cancelled just like the strategy in trio.
This is the first and only supervisory strategy at the moment.
- """
- num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
+
+ '''
+ (
+ num_actors,
+ first_err,
+ err_type,
+ ria_func,
+ da_func,
+ ) = num_actors_and_errs
try:
- async with tractor.open_nursery() as n:
+ async with tractor.open_nursery() as an:
# spawn the same number of deamon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
- dactor_portals.append(await n.start_actor(
+ dactor_portals.append(await an.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
@@ -307,7 +322,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
- await n.run_in_actor(
+ await an.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
@@ -337,7 +352,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
# should error here with a ``RemoteActorError`` or ``MultiError``
- except first_err as err:
+ except first_err as _err:
+ err = _err
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
@@ -348,8 +364,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
- assert n.cancelled is True
- assert not n._children
+ assert an.cancelled is True
+ assert not an._children
else:
pytest.fail("Should have gotten a remote assertion error?")
@@ -519,10 +535,15 @@ def test_cancel_via_SIGINT_other_task(
async def main():
# should never timeout since SIGINT should cancel the current program
with trio.fail_after(timeout):
- async with trio.open_nursery(
- strict_exception_groups=False,
- ) as n:
- await n.start(spawn_and_sleep_forever)
+ async with (
+
+ # XXX ?TODO? why no work!?
+ # tractor.trionics.collapse_eg(),
+ trio.open_nursery(
+ strict_exception_groups=False,
+ ) as tn,
+ ):
+ await tn.start(spawn_and_sleep_forever)
if 'mp' in spawn_backend:
time.sleep(0.1)
os.kill(pid, signal.SIGINT)
@@ -533,38 +554,123 @@ def test_cancel_via_SIGINT_other_task(
async def spin_for(period=3):
"Sync sleep."
+ print(f'sync sleeping in sub-sub for {period}\n')
time.sleep(period)
-async def spawn():
- async with tractor.open_nursery() as tn:
- await tn.run_in_actor(
+async def spawn_sub_with_sync_blocking_task():
+ async with tractor.open_nursery() as an:
+ print('starting sync blocking subactor..\n')
+ await an.run_in_actor(
spin_for,
name='sleeper',
)
+ print('exiting first subactor layer..\n')
+@pytest.mark.parametrize(
+ 'man_cancel_outer',
+ [
+ False, # passes if delay != 2
+
+ # always causes an unexpected eg-w-embedded-assert-err?
+ pytest.param(True,
+ marks=pytest.mark.xfail(
+ reason=(
+ 'always causes an unexpected eg-w-embedded-assert-err?'
+ )
+ ),
+ ),
+ ],
+)
@no_windows
def test_cancel_while_childs_child_in_sync_sleep(
- loglevel,
- start_method,
- spawn_backend,
+ loglevel: str,
+ start_method: str,
+ spawn_backend: str,
+ debug_mode: bool,
+ reg_addr: tuple,
+ man_cancel_outer: bool,
):
- """Verify that a child cancelled while executing sync code is torn
+ '''
+ Verify that a child cancelled while executing sync code is torn
down even when that cancellation is triggered by the parent
2 nurseries "up".
- """
+
+ Though the grandchild should stay blocking its actor runtime, its
+ parent should issue a "zombie reaper" to hard kill it after
+ sufficient timeout.
+
+ '''
if start_method == 'forkserver':
pytest.skip("Forksever sux hard at resuming from sync sleep...")
async def main():
- with trio.fail_after(2):
- async with tractor.open_nursery() as tn:
- await tn.run_in_actor(
- spawn,
- name='spawn',
+ #
+ # XXX BIG TODO NOTE XXX
+ #
+ # it seems there's a strange race that can happen
+ # where where the fail-after will trigger outer scope
+ # .cancel() which then causes the inner scope to raise,
+ #
+ # BaseExceptionGroup('Exceptions from Trio nursery', [
+ # BaseExceptionGroup('Exceptions from Trio nursery',
+ # [
+ # Cancelled(),
+ # Cancelled(),
+ # ]
+ # ),
+ # AssertionError('assert 0')
+ # ])
+ #
+ # WHY THIS DOESN'T MAKE SENSE:
+ # ---------------------------
+ # - it should raise too-slow-error when too slow..
+ # * verified that using simple-cs and manually cancelling
+ # you get same outcome -> indicates that the fail-after
+ # can have its TooSlowError overriden!
+ # |_ to check this it's easy, simplly decrease the timeout
+ # as per the var below.
+ #
+ # - when using the manual simple-cs the outcome is different
+ # DESPITE the `assert 0` which means regardless of the
+ # inner scope effectively failing in the same way, the
+ # bubbling up **is NOT the same**.
+ #
+ # delays trigger diff outcomes..
+ # ---------------------------
+ # as seen by uncommenting various lines below there is from
+ # my POV an unexpected outcome due to the delay=2 case.
+ #
+ # delay = 1 # no AssertionError in eg, TooSlowError raised.
+ # delay = 2 # is AssertionError in eg AND no TooSlowError !?
+ delay = 4 # is AssertionError in eg AND no _cs cancellation.
+
+ with trio.fail_after(delay) as _cs:
+ # with trio.CancelScope() as cs:
+ # ^XXX^ can be used instead to see same outcome.
+
+ async with (
+ # tractor.trionics.collapse_eg(), # doesn't help
+ tractor.open_nursery(
+ hide_tb=False,
+ debug_mode=debug_mode,
+ registry_addrs=[reg_addr],
+ ) as an,
+ ):
+ await an.run_in_actor(
+ spawn_sub_with_sync_blocking_task,
+ name='sync_blocking_sub',
)
await trio.sleep(1)
+
+ if man_cancel_outer:
+ print('Cancelling manually in root')
+ _cs.cancel()
+
+ # trigger exc-srced taskc down
+ # the actor tree.
+ print('RAISING IN ROOT')
assert 0
with pytest.raises(AssertionError):
diff --git a/tests/test_child_manages_service_nursery.py b/tests/test_child_manages_service_nursery.py
index 540e9b2e..6379afc6 100644
--- a/tests/test_child_manages_service_nursery.py
+++ b/tests/test_child_manages_service_nursery.py
@@ -117,9 +117,10 @@ async def open_actor_local_nursery(
ctx: tractor.Context,
):
global _nursery
- async with trio.open_nursery(
- strict_exception_groups=False,
- ) as tn:
+ async with (
+ tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn
+ ):
_nursery = tn
await ctx.started()
await trio.sleep(10)
diff --git a/tests/test_clustering.py b/tests/test_clustering.py
index 92362b58..603b2eb4 100644
--- a/tests/test_clustering.py
+++ b/tests/test_clustering.py
@@ -13,26 +13,24 @@ MESSAGE = 'tractoring at full speed'
def test_empty_mngrs_input_raises() -> None:
async def main():
- with trio.fail_after(1):
+ with trio.fail_after(3):
async with (
open_actor_cluster(
modules=[__name__],
# NOTE: ensure we can passthrough runtime opts
- loglevel='info',
- # debug_mode=True,
+ loglevel='cancel',
+ debug_mode=False,
) as portals,
- gather_contexts(
- # NOTE: it's the use of inline-generator syntax
- # here that causes the empty input.
- mngrs=(
- p.open_context(worker) for p in portals.values()
- ),
- ),
+ gather_contexts(mngrs=()),
):
- assert 0
+ # should fail before this?
+ assert portals
+
+ # test should fail if we mk it here!
+ assert 0, 'Should have raised val-err !?'
with pytest.raises(ValueError):
trio.run(main)
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 65a76d08..453b1aa3 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -11,6 +11,7 @@ import psutil
import pytest
import subprocess
import tractor
+from tractor.trionics import collapse_eg
from tractor._testing import tractor_test
import trio
@@ -193,10 +194,10 @@ async def spawn_and_check_registry(
try:
async with tractor.open_nursery() as an:
- async with trio.open_nursery(
- strict_exception_groups=False,
- ) as trion:
-
+ async with (
+ collapse_eg(),
+ trio.open_nursery() as trion,
+ ):
portals = {}
for i in range(3):
name = f'a{i}'
@@ -338,11 +339,12 @@ async def close_chans_before_nursery(
async with portal2.open_stream_from(
stream_forever
) as agen2:
- async with trio.open_nursery(
- strict_exception_groups=False,
- ) as n:
- n.start_soon(streamer, agen1)
- n.start_soon(cancel, use_signal, .5)
+ async with (
+ collapse_eg(),
+ trio.open_nursery() as tn,
+ ):
+ tn.start_soon(streamer, agen1)
+ tn.start_soon(cancel, use_signal, .5)
try:
await streamer(agen2)
finally:
diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py
index 195ed4cd..edd7ee47 100644
--- a/tests/test_infected_asyncio.py
+++ b/tests/test_infected_asyncio.py
@@ -234,10 +234,8 @@ async def trio_ctx(
with trio.fail_after(1 + delay):
try:
async with (
- trio.open_nursery(
- # TODO, for new `trio` / py3.13
- # strict_exception_groups=False,
- ) as tn,
+ tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn,
tractor.to_asyncio.open_channel_from(
sleep_and_err,
) as (first, chan),
diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py
index 6092bca7..10cf3aed 100644
--- a/tests/test_legacy_one_way_streaming.py
+++ b/tests/test_legacy_one_way_streaming.py
@@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
@pytest.fixture(scope='module')
-def time_quad_ex(reg_addr, ci_env, spawn_backend):
+def time_quad_ex(
+ reg_addr: tuple,
+ ci_env: bool,
+ spawn_backend: str,
+):
if spawn_backend == 'mp':
- """no idea but the mp *nix runs are flaking out here often...
- """
+ '''
+ no idea but the mp *nix runs are flaking out here often...
+
+ '''
pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
@@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
return results, diff
-def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
- """This also serves as a kind of "we'd like to be this fast test"."""
+def test_a_quadruple_example(
+ time_quad_ex: tuple,
+ ci_env: bool,
+ spawn_backend: str,
+):
+ '''
+ This also serves as a kind of "we'd like to be this fast test".
+ '''
results, diff = time_quad_ex
assert results
- this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
+ this_fast = (
+ 6 if platform.system() in (
+ 'Windows',
+ 'Darwin',
+ )
+ else 3
+ )
assert diff < this_fast
diff --git a/tests/test_root_infect_asyncio.py b/tests/test_root_infect_asyncio.py
index 93deba13..78f9b2b4 100644
--- a/tests/test_root_infect_asyncio.py
+++ b/tests/test_root_infect_asyncio.py
@@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
await trio.sleep_forever()
async def _trio_main():
- # with trio.fail_after(2):
- with trio.fail_after(999):
+ with trio.fail_after(2 if not debug_mode else 999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
@@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
):
aio_ev.set()
- with pytest.raises(
- expected_exception=ExceptionGroup,
- ) as excinfo:
- tractor.to_asyncio.run_as_asyncio_guest(
- trio_main=_trio_main,
- )
-
- eg = excinfo.value
- rte_eg, rest_eg = eg.split(RuntimeError)
-
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
- assert len(errs := rest_eg.exceptions) == 1
- typerr = errs[0]
- assert (
- type(typerr) is TypeError
- and
- 'trio-side' in typerr.args
- )
+ patt: str = 'trio-side'
+ expect_exc = TypeError
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anythinb but the aio-side.
else:
- assert len(rtes := rte_eg.exceptions) == 1
- assert 'asyncio-side' in rtes[0].args[0]
+ patt: str = 'asyncio-side'
+ expect_exc = RuntimeError
+
+ with pytest.raises(expect_exc) as excinfo:
+ tractor.to_asyncio.run_as_asyncio_guest(
+ trio_main=_trio_main,
+ )
+
+ caught_exc = excinfo.value
+ assert patt in caught_exc.args
diff --git a/tests/test_trioisms.py b/tests/test_trioisms.py
index c68d75c1..ca1e6d55 100644
--- a/tests/test_trioisms.py
+++ b/tests/test_trioisms.py
@@ -8,6 +8,7 @@ from contextlib import (
)
import pytest
+from tractor.trionics import collapse_eg
import trio
from trio import TaskStatus
@@ -64,9 +65,8 @@ def test_stashed_child_nursery(use_start_soon):
async def main():
async with (
- trio.open_nursery(
- strict_exception_groups=False,
- ) as pn,
+ collapse_eg(),
+ trio.open_nursery() as pn,
):
cn = await pn.start(mk_child_nursery)
assert cn
@@ -197,10 +197,8 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
async with (
# XXX should ensure ONLY the KBI
# is relayed upward
- trionics.collapse_eg(),
- trio.open_nursery(
- # strict_exception_groups=False,
- ), # as tn,
+ collapse_eg(),
+ trio.open_nursery(), # as tn,
trionics.gather_contexts([
open_memchan(),
diff --git a/tractor/_clustering.py b/tractor/_clustering.py
index 46224d6f..dbb50304 100644
--- a/tractor/_clustering.py
+++ b/tractor/_clustering.py
@@ -55,10 +55,17 @@ async def open_actor_cluster(
raise ValueError(
'Number of names is {len(names)} but count it {count}')
- async with tractor.open_nursery(
- **runtime_kwargs,
- ) as an:
- async with trio.open_nursery() as n:
+ async with (
+ # tractor.trionics.collapse_eg(),
+ tractor.open_nursery(
+ **runtime_kwargs,
+ ) as an
+ ):
+ async with (
+ # tractor.trionics.collapse_eg(),
+ trio.open_nursery() as tn,
+ tractor.trionics.maybe_raise_from_masking_exc()
+ ):
uid = tractor.current_actor().uid
async def _start(name: str) -> None:
@@ -69,9 +76,8 @@ async def open_actor_cluster(
)
for name in names:
- n.start_soon(_start, name)
+ tn.start_soon(_start, name)
assert len(portals) == count
yield portals
-
await an.cancel(hard_kill=hard_kill)
diff --git a/tractor/_context.py b/tractor/_context.py
index 61994f98..7a2bf5c6 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -101,6 +101,9 @@ from ._state import (
debug_mode,
_ctxvar_Context,
)
+from .trionics import (
+ collapse_eg,
+)
# ------ - ------
if TYPE_CHECKING:
from ._portal import Portal
@@ -942,7 +945,7 @@ class Context:
self.cancel_called = True
header: str = (
- f'Cancelling ctx from {side.upper()}-side\n'
+ f'Cancelling ctx from {side!r}-side\n'
)
reminfo: str = (
# ' =>\n'
@@ -950,7 +953,7 @@ class Context:
f'\n'
f'c)=> {self.chan.uid}\n'
f' |_[{self.dst_maddr}\n'
- f' >>{self.repr_rpc}\n'
+ f' >> {self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
# TODO: pull msg-type from spec re #320
)
@@ -2025,10 +2028,8 @@ async def open_context_from_portal(
ctxc_from_callee: ContextCancelled|None = None
try:
async with (
- trio.open_nursery(
- strict_exception_groups=False,
- ) as tn,
-
+ collapse_eg(),
+ trio.open_nursery() as tn,
msgops.maybe_limit_plds(
ctx=ctx,
spec=ctx_meta.get('pld_spec'),
diff --git a/tractor/_discovery.py b/tractor/_discovery.py
index f6b3b585..a332ab73 100644
--- a/tractor/_discovery.py
+++ b/tractor/_discovery.py
@@ -28,7 +28,10 @@ from typing import (
from contextlib import asynccontextmanager as acm
from tractor.log import get_logger
-from .trionics import gather_contexts
+from .trionics import (
+ gather_contexts,
+ collapse_eg,
+)
from .ipc import _connect_chan, Channel
from ._addr import (
UnwrappedAddress,
@@ -87,7 +90,6 @@ async def get_registry(
yield regstr_ptl
-
@acm
async def get_root(
**kwargs,
@@ -253,9 +255,12 @@ async def find_actor(
for addr in registry_addrs
)
portals: list[Portal]
- async with gather_contexts(
- mngrs=maybe_portals,
- ) as portals:
+ async with (
+ collapse_eg(),
+ gather_contexts(
+ mngrs=maybe_portals,
+ ) as portals,
+ ):
# log.runtime(
# 'Gathered portals:\n'
# f'{portals}'
diff --git a/tractor/_portal.py b/tractor/_portal.py
index 659ddf6d..5729edd4 100644
--- a/tractor/_portal.py
+++ b/tractor/_portal.py
@@ -39,7 +39,10 @@ import warnings
import trio
-from .trionics import maybe_open_nursery
+from .trionics import (
+ maybe_open_nursery,
+ collapse_eg,
+)
from ._state import (
current_actor,
)
@@ -583,14 +586,13 @@ async def open_portal(
assert actor
was_connected: bool = False
- async with maybe_open_nursery(
- tn,
- shield=shield,
- strict_exception_groups=False,
- # ^XXX^ TODO? soo roll our own then ??
- # -> since we kinda want the "if only one `.exception` then
- # just raise that" interface?
- ) as tn:
+ async with (
+ collapse_eg(),
+ maybe_open_nursery(
+ tn,
+ shield=shield,
+ ) as tn,
+ ):
if not channel.connected():
await channel.connect()
diff --git a/tractor/_root.py b/tractor/_root.py
index 16d70b98..370798dd 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -37,13 +37,7 @@ import warnings
import trio
-from ._runtime import (
- Actor,
- Arbiter,
- # TODO: rename and make a non-actor subtype?
- # Arbiter as Registry,
- async_main,
-)
+from . import _runtime
from .devx import (
debug,
_frame_stack,
@@ -64,6 +58,7 @@ from ._addr import (
)
from .trionics import (
is_multi_cancelled,
+ collapse_eg,
)
from ._exceptions import (
RuntimeFailure,
@@ -102,7 +97,7 @@ async def maybe_block_bp(
):
logger.info(
f'Found `greenback` installed @ {maybe_mod}\n'
- 'Enabling `tractor.pause_from_sync()` support!\n'
+ f'Enabling `tractor.pause_from_sync()` support!\n'
)
os.environ['PYTHONBREAKPOINT'] = (
'tractor.devx.debug._sync_pause_from_builtin'
@@ -197,9 +192,13 @@ async def open_root_actor(
# read-only state to sublayers?
# extra_rt_vars: dict|None = None,
-) -> Actor:
+) -> _runtime.Actor:
'''
- Runtime init entry point for ``tractor``.
+ Initialize the `tractor` runtime by starting a "root actor" in
+ a parent-most Python process.
+
+ All (disjoint) actor-process-trees-as-programs are created via
+ this entrypoint.
'''
# XXX NEVER allow nested actor-trees!
@@ -397,7 +396,7 @@ async def open_root_actor(
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
)
- actor = Actor(
+ actor = _runtime.Actor(
name=name or 'anonymous',
uuid=mk_uuid(),
registry_addrs=ponged_addrs,
@@ -436,7 +435,8 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296
- actor = Arbiter(
+ # TODO: rename as `RootActor` or is that even necessary?
+ actor = _runtime.Arbiter(
name=name or 'registrar',
uuid=mk_uuid(),
registry_addrs=registry_addrs,
@@ -471,18 +471,21 @@ async def open_root_actor(
'-> Opening new registry @ '
+
'\n'.join(
- f'@{addr}' for addr in reg_addrs
+ f'{addr}' for addr in reg_addrs
)
)
logger.info(f'{report}\n')
- # start the actor runtime in a new task
- async with trio.open_nursery(
- strict_exception_groups=False,
- # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
- ) as nursery:
+ # start runtime in a bg sub-task, yield to caller.
+ async with (
+ collapse_eg(),
+ trio.open_nursery() as root_tn,
- # ``_runtime.async_main()`` creates an internal nursery
+ # XXX, finally-footgun below?
+ # -> see note on why shielding.
+ # maybe_raise_from_masking_exc(),
+ ):
+ # `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an
@@ -490,9 +493,9 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as
# well.
- await nursery.start(
+ await root_tn.start(
partial(
- async_main,
+ _runtime.async_main,
actor,
accept_addrs=trans_bind_addrs,
parent_addr=None
@@ -540,7 +543,7 @@ async def open_root_actor(
raise
finally:
- # NOTE: not sure if we'll ever need this but it's
+ # NOTE/TODO?, not sure if we'll ever need this but it's
# possibly better for even more determinism?
# logger.cancel(
# f'Waiting on {len(nurseries)} nurseries in root..')
diff --git a/tractor/_rpc.py b/tractor/_rpc.py
index 2bd4d6e3..eb1df2cc 100644
--- a/tractor/_rpc.py
+++ b/tractor/_rpc.py
@@ -765,7 +765,6 @@ async def _invoke(
BaseExceptionGroup,
BaseException,
trio.Cancelled,
-
) as _scope_err:
scope_err = _scope_err
if (
diff --git a/tractor/_runtime.py b/tractor/_runtime.py
index bc915b85..ef7a3018 100644
--- a/tractor/_runtime.py
+++ b/tractor/_runtime.py
@@ -74,6 +74,9 @@ from tractor.msg import (
pretty_struct,
types as msgtypes,
)
+from .trionics import (
+ collapse_eg,
+)
from .ipc import (
Channel,
# IPCServer, # causes cycles atm..
@@ -359,7 +362,7 @@ class Actor:
def pformat(
self,
- ds: str = ':',
+ ds: str = ': ',
indent: int = 0,
privates: bool = False,
) -> str:
@@ -1471,17 +1474,18 @@ async def async_main(
# parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in
# a deterministic way.
- async with trio.open_nursery(
- strict_exception_groups=False,
- ) as root_nursery:
- actor._root_n = root_nursery
+ root_tn: trio.Nursery
+ async with (
+ collapse_eg(),
+ trio.open_nursery() as root_tn,
+ ):
+ actor._root_n = root_tn
assert actor._root_n
ipc_server: _server.IPCServer
async with (
- trio.open_nursery(
- strict_exception_groups=False,
- ) as service_nursery,
+ collapse_eg(),
+ trio.open_nursery() as service_nursery,
_server.open_ipc_server(
parent_tn=service_nursery,
stream_handler_tn=service_nursery,
@@ -1605,7 +1609,7 @@ async def async_main(
# start processing parent requests until our channel
# server is 100% up and running.
if actor._parent_chan:
- await root_nursery.start(
+ await root_tn.start(
partial(
_rpc.process_messages,
chan=actor._parent_chan,
@@ -1756,9 +1760,7 @@ async def async_main(
f' {pformat(ipc_server._peers)}'
)
log.runtime(teardown_report)
- await ipc_server.wait_for_no_more_peers(
- shield=True,
- )
+ await ipc_server.wait_for_no_more_peers()
teardown_report += (
'-]> all peer channels are complete.\n'
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index 9fdad8ce..ec2b1864 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -44,6 +44,7 @@ from ._runtime import Actor
from ._portal import Portal
from .trionics import (
is_multi_cancelled,
+ collapse_eg,
)
from ._exceptions import (
ContextCancelled,
@@ -326,9 +327,10 @@ class ActorNursery:
server: IPCServer = self._actor.ipc_server
with trio.move_on_after(3) as cs:
- async with trio.open_nursery(
- strict_exception_groups=False,
- ) as tn:
+ async with (
+ collapse_eg(),
+ trio.open_nursery() as tn,
+ ):
subactor: Actor
proc: trio.Process
@@ -421,10 +423,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller
- async with trio.open_nursery(
- strict_exception_groups=False,
- # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
- ) as da_nursery:
+ async with (
+ collapse_eg(),
+ trio.open_nursery() as da_nursery,
+ ):
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
@@ -434,11 +436,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
- async with trio.open_nursery(
- strict_exception_groups=False,
- # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
- ) as ria_nursery:
-
+ async with (
+ collapse_eg(),
+ trio.open_nursery() as ria_nursery,
+ ):
an = ActorNursery(
actor,
ria_nursery,
diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py
index e857db19..46fde2fc 100644
--- a/tractor/ipc/_server.py
+++ b/tractor/ipc/_server.py
@@ -814,10 +814,14 @@ class Server(Struct):
async def wait_for_no_more_peers(
self,
- shield: bool = False,
+ # XXX, should this even be allowed?
+ # -> i've seen it cause hangs on teardown
+ # in `test_resource_cache.py`
+ # _shield: bool = False,
) -> None:
- with trio.CancelScope(shield=shield):
- await self._no_more_peers.wait()
+ await self._no_more_peers.wait()
+ # with trio.CancelScope(shield=_shield):
+ # await self._no_more_peers.wait()
async def wait_for_peer(
self,
diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py
index afd1f434..2e91aa30 100644
--- a/tractor/trionics/__init__.py
+++ b/tractor/trionics/__init__.py
@@ -31,7 +31,7 @@ from ._broadcast import (
)
from ._beg import (
collapse_eg as collapse_eg,
- maybe_collapse_eg as maybe_collapse_eg,
+ get_collapsed_eg as get_collapsed_eg,
is_multi_cancelled as is_multi_cancelled,
)
from ._taskc import (
diff --git a/tractor/trionics/_beg.py b/tractor/trionics/_beg.py
index ad10f3bf..f466ab3c 100644
--- a/tractor/trionics/_beg.py
+++ b/tractor/trionics/_beg.py
@@ -15,8 +15,9 @@
# along with this program. If not, see .
'''
-`BaseExceptionGroup` related utils and helpers pertaining to
-first-class-`trio` from a historical perspective B)
+`BaseExceptionGroup` utils and helpers pertaining to
+first-class-`trio` from a "historical" perspective, like "loose
+exception group" task-nurseries.
'''
from contextlib import (
@@ -24,27 +25,84 @@ from contextlib import (
)
from typing import (
Literal,
+ Type,
)
import trio
+# from trio._core._concat_tb import (
+# concat_tb,
+# )
-def maybe_collapse_eg(
- beg: BaseExceptionGroup,
+# XXX NOTE
+# taken verbatim from `trio._core._run` except,
+# - remove the NONSTRICT_EXCEPTIONGROUP_NOTE deprecation-note
+# guard-check; we know we want an explicit collapse.
+# - mask out tb rewriting in collapse case, i don't think it really
+# matters?
+#
+def collapse_exception_group(
+ excgroup: BaseExceptionGroup[BaseException],
) -> BaseException:
+ """Recursively collapse any single-exception groups into that single contained
+ exception.
+
+ """
+ exceptions = list(excgroup.exceptions)
+ modified = False
+ for i, exc in enumerate(exceptions):
+ if isinstance(exc, BaseExceptionGroup):
+ new_exc = collapse_exception_group(exc)
+ if new_exc is not exc:
+ modified = True
+ exceptions[i] = new_exc
+
+ if (
+ len(exceptions) == 1
+ and isinstance(excgroup, BaseExceptionGroup)
+
+ # XXX trio's loose-setting condition..
+ # and NONSTRICT_EXCEPTIONGROUP_NOTE in getattr(excgroup, "__notes__", ())
+ ):
+ # exceptions[0].__traceback__ = concat_tb(
+ # excgroup.__traceback__,
+ # exceptions[0].__traceback__,
+ # )
+ return exceptions[0]
+ elif modified:
+ return excgroup.derive(exceptions)
+ else:
+ return excgroup
+
+
+def get_collapsed_eg(
+ beg: BaseExceptionGroup,
+
+) -> BaseException|None:
'''
- If the input beg can collapse to a single non-eg sub-exception,
- return it instead.
+ If the input beg can collapse to a single sub-exception which is
+ itself **not** an eg, return it.
'''
- if len(excs := beg.exceptions) == 1:
- return excs[0]
+ maybe_exc = collapse_exception_group(beg)
+ if maybe_exc is beg:
+ return None
- return beg
+ return maybe_exc
@acm
-async def collapse_eg():
+async def collapse_eg(
+ hide_tb: bool = True,
+
+ # XXX, for ex. will always show begs containing single taskc
+ ignore: set[Type[BaseException]] = {
+ # trio.Cancelled,
+ },
+ add_notes: bool = True,
+
+ bp: bool = False,
+):
'''
If `BaseExceptionGroup` raised in the body scope is
"collapse-able" (in the same way that
@@ -52,15 +110,58 @@ async def collapse_eg():
only raise the lone emedded non-eg in in place.
'''
+ __tracebackhide__: bool = hide_tb
try:
yield
- except* BaseException as beg:
- if (
- exc := maybe_collapse_eg(beg)
- ) is not beg:
- raise exc
+ except BaseExceptionGroup as _beg:
+ beg = _beg
- raise beg
+ if (
+ bp
+ and
+ len(beg.exceptions) > 1
+ ):
+ import tractor
+ if tractor.current_actor(
+ err_on_no_runtime=False,
+ ):
+ await tractor.pause(shield=True)
+ else:
+ breakpoint()
+
+ if (
+ (exc := get_collapsed_eg(beg))
+ and
+ type(exc) not in ignore
+ ):
+
+ # TODO? report number of nested groups it was collapsed
+ # *from*?
+ if add_notes:
+ from_group_note: str = (
+ '( ^^^ this exc was collapsed from a group ^^^ )\n'
+ )
+ if (
+ from_group_note
+ not in
+ getattr(exc, "__notes__", ())
+ ):
+ exc.add_note(from_group_note)
+
+ # raise exc
+ # ^^ this will leave the orig beg tb above with the
+ # "during the handling of the following.."
+ # So, instead do..
+ #
+ if cause := exc.__cause__:
+ raise exc from cause
+ else:
+ # suppress "during handling of "
+ # output in tb/console.
+ raise exc from None
+
+ # keep original
+ raise # beg
def is_multi_cancelled(