Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 99dc40c5d7 Use `collapse_eg()` in broadcaster suite
Around the test embedded `trio.open_nursery()` calls as expected. Also
tidy up the various nursery var names.
2025-03-03 18:36:01 -05:00
Tyler Goodlet 749c43b034 Draft some eg collapsing helpers
Inside a new `.trionics._beg` and exposed from the subpkg ns in
anticipation of the `strict_exception_groups=False` being removed by
`trio` in py 3.15.

Notes,
- mk an embedded single-exc "extractor" using a `BaseExceptionGroup.exceptions` length
  check, when 1 return the lone child.
- use the above in a new `@acm`, async bc it's most likely to be composed in an
  `async with` tuple-style sequence block, called `collapse_eg()` which
  acts a one line "absorber" for when the above mentioned flag is no
  logner supported by `trio.open_nursery()`.

All untested atm fwiw.. but soon to be used in our test suite(s) likely!
2025-03-03 18:35:44 -05:00
Tyler Goodlet ad56d10c51 Fix docs tests with yet another loosie-goosie
So the KBI propagates up to the actor nursery scope and also avoid
running any `examples/multihost/` subdir scripts.
2025-03-03 17:55:07 -05:00
Tyler Goodlet 375c00da6e Another couple loose-ifies for discovery and advanced fault suites 2025-03-03 13:57:54 -05:00
Tyler Goodlet 0631a5b30b Add (masked) meta-debug-fixture for determining if `debug_mode` is set in harness.. 2025-03-03 12:32:25 -05:00
Tyler Goodlet eb910bb83f Various test tweaks related to 3.13 egs
Including changes like,
- loose eg flagging in various test emedded `trio.open_nursery()`s.
- changes to eg handling (like using `except*`).
- added `debug_mode` integration to tests that needed some REPLin
  in order to figure out appropriate updates.
2025-03-03 12:31:29 -05:00
Tyler Goodlet 4721c81d23 Go to loose egs in `Actor` root & service nurseries (for now..) 2025-03-03 12:20:33 -05:00
Tyler Goodlet 591b4640e1 Fix `roundtripped` ref error in `validate_payload_msg()` 2025-03-03 12:19:11 -05:00
Tyler Goodlet b44482b1c1 Hide `open_nursery()` frame by def 2025-03-03 12:18:10 -05:00
Tyler Goodlet 03f8864fd4 Moar sclang log fmting tweaks 2025-03-03 12:17:51 -05:00
25 changed files with 214 additions and 86 deletions

View File

@ -62,7 +62,9 @@ async def recv_and_spawn_net_killers(
await ctx.started() await ctx.started()
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
trio.open_nursery() as n, trio.open_nursery(
strict_exception_groups=False,
) as tn,
): ):
async for i in stream: async for i in stream:
print(f'child echoing {i}') print(f'child echoing {i}')
@ -77,11 +79,11 @@ async def recv_and_spawn_net_killers(
i >= break_ipc_after i >= break_ipc_after
): ):
broke_ipc = True broke_ipc = True
n.start_soon( tn.start_soon(
iter_ipc_stream, iter_ipc_stream,
stream, stream,
) )
n.start_soon( tn.start_soon(
partial( partial(
break_ipc_then_error, break_ipc_then_error,
stream=stream, stream=stream,

View File

@ -21,11 +21,13 @@ async def name_error():
async def main(): async def main():
"""Test breakpoint in a streaming actor. '''
""" Test breakpoint in a streaming actor.
'''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
# loglevel='cancel', loglevel='cancel',
# loglevel='devx', # loglevel='devx',
) as n: ) as n:

View File

@ -40,7 +40,7 @@ async def main():
""" """
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=True, debug_mode=True,
# loglevel='cancel', loglevel='devx',
) as n: ) as n:
# spawn both actors # spawn both actors

View File

@ -91,7 +91,7 @@ async def main() -> list[int]:
an: ActorNursery an: ActorNursery
async with tractor.open_nursery( async with tractor.open_nursery(
loglevel='cancel', loglevel='cancel',
debug_mode=True, # debug_mode=True,
) as an: ) as an:
seed = int(1e3) seed = int(1e3)

View File

@ -3,20 +3,18 @@ import trio
import tractor import tractor
async def sleepy_jane(): async def sleepy_jane() -> None:
uid = tractor.current_actor().uid uid: tuple = tractor.current_actor().uid
print(f'Yo i am actor {uid}') print(f'Yo i am actor {uid}')
await trio.sleep_forever() await trio.sleep_forever()
async def main(): async def main():
''' '''
Spawn a flat actor cluster, with one process per Spawn a flat actor cluster, with one process per detected core.
detected core.
''' '''
portal_map: dict[str, tractor.Portal] portal_map: dict[str, tractor.Portal]
results: dict[str, str]
# look at this hip new syntax! # look at this hip new syntax!
async with ( async with (
@ -25,11 +23,16 @@ async def main():
modules=[__name__] modules=[__name__]
) as portal_map, ) as portal_map,
trio.open_nursery() as n, trio.open_nursery(
strict_exception_groups=False,
) as tn,
): ):
for (name, portal) in portal_map.items(): for (name, portal) in portal_map.items():
n.start_soon(portal.run, sleepy_jane) tn.start_soon(
portal.run,
sleepy_jane,
)
await trio.sleep(0.5) await trio.sleep(0.5)
@ -41,4 +44,4 @@ if __name__ == '__main__':
try: try:
trio.run(main) trio.run(main)
except KeyboardInterrupt: except KeyboardInterrupt:
pass print('trio cancelled by KBI')

View File

@ -75,7 +75,10 @@ def pytest_configure(config):
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def debug_mode(request): def debug_mode(request):
return request.config.option.tractor_debug_mode debug_mode: bool = request.config.option.tractor_debug_mode
# if debug_mode:
# breakpoint()
return debug_mode
@pytest.fixture(scope='session', autouse=True) @pytest.fixture(scope='session', autouse=True)
@ -92,6 +95,12 @@ def spawn_backend(request) -> str:
return request.config.option.spawn_backend return request.config.option.spawn_backend
# @pytest.fixture(scope='function', autouse=True)
# def debug_enabled(request) -> str:
# from tractor import _state
# if _state._runtime_vars['_debug_mode']:
# breakpoint()
_ci_env: bool = os.environ.get('CI', False) _ci_env: bool = os.environ.get('CI', False)

View File

@ -309,10 +309,13 @@ def test_subactor_breakpoint(
child.expect(EOF) child.expect(EOF)
assert in_prompt_msg( assert in_prompt_msg(
child, child, [
['RemoteActorError:', 'MessagingError:',
'RemoteActorError:',
"('breakpoint_forever'", "('breakpoint_forever'",
'bdb.BdbQuit',] 'bdb.BdbQuit',
],
pause_on_false=True,
) )

View File

@ -3,7 +3,6 @@ Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
cancelacion?.. cancelacion?..
''' '''
import itertools
from functools import partial from functools import partial
from types import ModuleType from types import ModuleType
@ -230,13 +229,10 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper # get raw instance from pytest wrapper
value = excinfo.value value = excinfo.value
if isinstance(value, ExceptionGroup): if isinstance(value, ExceptionGroup):
value = next( excs = value.exceptions
itertools.dropwhile( assert len(excs) == 1
lambda exc: not isinstance(exc, expect_final_exc), final_exc = excs[0]
value.exceptions, assert isinstance(final_exc, expect_final_exc)
)
)
assert value
@tractor.context @tractor.context
@ -259,15 +255,16 @@ async def break_ipc_after_started(
def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages(): def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
''' '''
Verify that is a subactor's IPC goes down just after bringing up a stream Verify that is a subactor's IPC goes down just after bringing up
the parent can trigger a SIGINT and the child will be reaped out-of-IPC by a stream the parent can trigger a SIGINT and the child will be
the localhost process supervision machinery: aka "zombie lord". reaped out-of-IPC by the localhost process supervision machinery:
aka "zombie lord".
''' '''
async def main(): async def main():
with trio.fail_after(3): with trio.fail_after(3):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as an:
portal = await n.start_actor( portal = await an.start_actor(
'ipc_breaker', 'ipc_breaker',
enable_modules=[__name__], enable_modules=[__name__],
) )

View File

@ -307,7 +307,15 @@ async def inf_streamer(
async with ( async with (
ctx.open_stream() as stream, ctx.open_stream() as stream,
trio.open_nursery() as tn,
# XXX TODO, INTERESTING CASE!!
# - if we don't collapse the eg then the embedded
# `trio.EndOfChannel` doesn't propagate directly to the above
# .open_stream() parent, resulting in it also raising instead
# of gracefully absorbing as normal.. so how to handle?
trio.open_nursery(
strict_exception_groups=False,
) as tn,
): ):
async def close_stream_on_sentinel(): async def close_stream_on_sentinel():
async for msg in stream: async for msg in stream:

View File

@ -519,7 +519,9 @@ def test_cancel_via_SIGINT_other_task(
async def main(): async def main():
# should never timeout since SIGINT should cancel the current program # should never timeout since SIGINT should cancel the current program
with trio.fail_after(timeout): with trio.fail_after(timeout):
async with trio.open_nursery() as n: async with trio.open_nursery(
strict_exception_groups=False,
) as n:
await n.start(spawn_and_sleep_forever) await n.start(spawn_and_sleep_forever)
if 'mp' in spawn_backend: if 'mp' in spawn_backend:
time.sleep(0.1) time.sleep(0.1)
@ -612,6 +614,12 @@ def test_fast_graceful_cancel_when_spawn_task_in_soft_proc_wait_for_daemon(
nurse.start_soon(delayed_kbi) nurse.start_soon(delayed_kbi)
await p.run(do_nuthin) await p.run(do_nuthin)
# need to explicitly re-raise the lone kbi..now
except* KeyboardInterrupt as kbi_eg:
assert (len(excs := kbi_eg.exceptions) == 1)
raise excs[0]
finally: finally:
duration = time.time() - start duration = time.time() - start
if duration > timeout: if duration > timeout:

View File

@ -874,13 +874,13 @@ def chk_pld_type(
return roundtrip return roundtrip
def test_limit_msgspec(): def test_limit_msgspec(
debug_mode: bool,
):
async def main(): async def main():
async with tractor.open_root_actor( async with tractor.open_root_actor(
debug_mode=True debug_mode=debug_mode,
): ):
# ensure we can round-trip a boxing `PayloadMsg` # ensure we can round-trip a boxing `PayloadMsg`
assert chk_pld_type( assert chk_pld_type(
payload_spec=Any, payload_spec=Any,

View File

@ -95,8 +95,8 @@ async def trio_main(
# stash a "service nursery" as "actor local" (aka a Python global) # stash a "service nursery" as "actor local" (aka a Python global)
global _nursery global _nursery
n = _nursery tn = _nursery
assert n assert tn
async def consume_stream(): async def consume_stream():
async with wrapper_mngr() as stream: async with wrapper_mngr() as stream:
@ -104,10 +104,10 @@ async def trio_main(
print(msg) print(msg)
# run 2 tasks to ensure broadcaster chan use # run 2 tasks to ensure broadcaster chan use
n.start_soon(consume_stream) tn.start_soon(consume_stream)
n.start_soon(consume_stream) tn.start_soon(consume_stream)
n.start_soon(trio_sleep_and_err) tn.start_soon(trio_sleep_and_err)
await trio.sleep_forever() await trio.sleep_forever()
@ -119,8 +119,8 @@ async def open_actor_local_nursery(
global _nursery global _nursery
async with trio.open_nursery( async with trio.open_nursery(
strict_exception_groups=False, strict_exception_groups=False,
) as n: ) as tn:
_nursery = n _nursery = tn
await ctx.started() await ctx.started()
await trio.sleep(10) await trio.sleep(10)
# await trio.sleep(1) # await trio.sleep(1)
@ -134,7 +134,7 @@ async def open_actor_local_nursery(
# never yields back.. aka a scenario where the # never yields back.. aka a scenario where the
# ``tractor.context`` task IS NOT in the service n's cancel # ``tractor.context`` task IS NOT in the service n's cancel
# scope. # scope.
n.cancel_scope.cancel() tn.cancel_scope.cancel()
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -159,7 +159,7 @@ def test_actor_managed_trio_nursery_task_error_cancels_aio(
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
p = await n.start_actor( p = await n.start_actor(
'nursery_mngr', 'nursery_mngr',
infect_asyncio=asyncio_mode, infect_asyncio=asyncio_mode, # TODO, is this enabling debug mode?
enable_modules=[__name__], enable_modules=[__name__],
) )
async with ( async with (

View File

@ -181,7 +181,9 @@ async def spawn_and_check_registry(
try: try:
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
async with trio.open_nursery() as trion: async with trio.open_nursery(
strict_exception_groups=False,
) as trion:
portals = {} portals = {}
for i in range(3): for i in range(3):
@ -316,7 +318,9 @@ async def close_chans_before_nursery(
async with portal2.open_stream_from( async with portal2.open_stream_from(
stream_forever stream_forever
) as agen2: ) as agen2:
async with trio.open_nursery() as n: async with trio.open_nursery(
strict_exception_groups=False,
) as n:
n.start_soon(streamer, agen1) n.start_soon(streamer, agen1)
n.start_soon(cancel, use_signal, .5) n.start_soon(cancel, use_signal, .5)
try: try:

View File

@ -19,7 +19,7 @@ from tractor._testing import (
@pytest.fixture @pytest.fixture
def run_example_in_subproc( def run_example_in_subproc(
loglevel: str, loglevel: str,
testdir: pytest.Testdir, testdir: pytest.Pytester,
reg_addr: tuple[str, int], reg_addr: tuple[str, int],
): ):
@ -81,27 +81,37 @@ def run_example_in_subproc(
# walk yields: (dirpath, dirnames, filenames) # walk yields: (dirpath, dirnames, filenames)
[ [
(p[0], f) for p in os.walk(examples_dir()) for f in p[2] (p[0], f)
for p in os.walk(examples_dir())
for f in p[2]
if '__' not in f if (
'__' not in f
and f[0] != '_' and f[0] != '_'
and 'debugging' not in p[0] and 'debugging' not in p[0]
and 'integration' not in p[0] and 'integration' not in p[0]
and 'advanced_faults' not in p[0] and 'advanced_faults' not in p[0]
and 'multihost' not in p[0]
)
], ],
ids=lambda t: t[1], ids=lambda t: t[1],
) )
def test_example(run_example_in_subproc, example_script): def test_example(
"""Load and run scripts from this repo's ``examples/`` dir as a user run_example_in_subproc,
example_script,
):
'''
Load and run scripts from this repo's ``examples/`` dir as a user
would copy and pasing them into their editor. would copy and pasing them into their editor.
On windows a little more "finessing" is done to make On windows a little more "finessing" is done to make
``multiprocessing`` play nice: we copy the ``__main__.py`` into the ``multiprocessing`` play nice: we copy the ``__main__.py`` into the
test directory and invoke the script as a module with ``python -m test directory and invoke the script as a module with ``python -m
test_example``. test_example``.
"""
ex_file = os.path.join(*example_script) '''
ex_file: str = os.path.join(*example_script)
if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9): if 'rpc_bidir_streaming' in ex_file and sys.version_info < (3, 9):
pytest.skip("2-way streaming example requires py3.9 async with syntax") pytest.skip("2-way streaming example requires py3.9 async with syntax")
@ -127,7 +137,8 @@ def test_example(run_example_in_subproc, example_script):
# shouldn't eventually once we figure out what's # shouldn't eventually once we figure out what's
# a better way to be explicit about aio side # a better way to be explicit about aio side
# cancels? # cancels?
and 'asyncio.exceptions.CancelledError' not in last_error and
'asyncio.exceptions.CancelledError' not in last_error
): ):
raise Exception(errmsg) raise Exception(errmsg)

View File

@ -2,7 +2,9 @@
Broadcast channels for fan-out to local tasks. Broadcast channels for fan-out to local tasks.
""" """
from contextlib import asynccontextmanager from contextlib import (
asynccontextmanager as acm,
)
from functools import partial from functools import partial
from itertools import cycle from itertools import cycle
import time import time
@ -15,6 +17,7 @@ import tractor
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
Lagged, Lagged,
collapse_eg,
) )
@ -62,7 +65,7 @@ async def ensure_sequence(
break break
@asynccontextmanager @acm
async def open_sequence_streamer( async def open_sequence_streamer(
sequence: list[int], sequence: list[int],
@ -74,9 +77,9 @@ async def open_sequence_streamer(
async with tractor.open_nursery( async with tractor.open_nursery(
arbiter_addr=reg_addr, arbiter_addr=reg_addr,
start_method=start_method, start_method=start_method,
) as tn: ) as an:
portal = await tn.start_actor( portal = await an.start_actor(
'sequence_echoer', 'sequence_echoer',
enable_modules=[__name__], enable_modules=[__name__],
) )
@ -155,9 +158,12 @@ def test_consumer_and_parent_maybe_lag(
) as stream: ) as stream:
try: try:
async with trio.open_nursery() as n: async with (
collapse_eg(),
trio.open_nursery() as tn,
):
n.start_soon( tn.start_soon(
ensure_sequence, ensure_sequence,
stream, stream,
sequence.copy(), sequence.copy(),
@ -230,8 +236,8 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
) as stream: ) as stream:
async with trio.open_nursery() as n: async with trio.open_nursery() as tn:
n.start_soon( tn.start_soon(
ensure_sequence, ensure_sequence,
stream, stream,
sequence.copy(), sequence.copy(),
@ -371,13 +377,13 @@ def test_ensure_slow_consumers_lag_out(
f'on {lags}:{value}') f'on {lags}:{value}')
return return
async with trio.open_nursery() as nursery: async with trio.open_nursery() as tn:
for i in range(1, num_laggers): for i in range(1, num_laggers):
task_name = f'sub_{i}' task_name = f'sub_{i}'
laggers[task_name] = 0 laggers[task_name] = 0
nursery.start_soon( tn.start_soon(
partial( partial(
sub_and_print, sub_and_print,
delay=i*0.001, delay=i*0.001,
@ -497,6 +503,7 @@ def test_no_raise_on_lag():
# internals when the no raise flag is set. # internals when the no raise flag is set.
loglevel='warning', loglevel='warning',
), ),
collapse_eg(),
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
n.start_soon(slow) n.start_soon(slow)

View File

@ -101,6 +101,7 @@ def test_stashed_child_nursery(use_start_soon):
def test_acm_embedded_nursery_propagates_enter_err( def test_acm_embedded_nursery_propagates_enter_err(
canc_from_finally: bool, canc_from_finally: bool,
unmask_from_canc: bool, unmask_from_canc: bool,
debug_mode: bool,
): ):
''' '''
Demo how a masking `trio.Cancelled` could be handled by unmasking from the Demo how a masking `trio.Cancelled` could be handled by unmasking from the
@ -174,7 +175,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
await trio.lowlevel.checkpoint() await trio.lowlevel.checkpoint()
async def _main(): async def _main():
with tractor.devx.open_crash_handler() as bxerr: with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode,
) as bxerr:
assert not bxerr.value assert not bxerr.value
async with ( async with (

View File

@ -255,7 +255,7 @@ class MsgpackTCPStream(MsgTransport):
raise TransportClosed( raise TransportClosed(
message=( message=(
f'IPC transport already closed by peer\n' f'IPC transport already closed by peer\n'
f'x)> {type(trans_err)}\n' f'x]> {type(trans_err)}\n'
f' |_{self}\n' f' |_{self}\n'
), ),
loglevel=loglevel, loglevel=loglevel,
@ -273,7 +273,7 @@ class MsgpackTCPStream(MsgTransport):
raise TransportClosed( raise TransportClosed(
message=( message=(
f'IPC transport already manually closed locally?\n' f'IPC transport already manually closed locally?\n'
f'x)> {type(closure_err)} \n' f'x]> {type(closure_err)} \n'
f' |_{self}\n' f' |_{self}\n'
), ),
loglevel='error', loglevel='error',
@ -289,8 +289,8 @@ class MsgpackTCPStream(MsgTransport):
raise TransportClosed( raise TransportClosed(
message=( message=(
f'IPC transport already gracefully closed\n' f'IPC transport already gracefully closed\n'
f')>\n' f']>\n'
f'|_{self}\n' f' |_{self}\n'
), ),
loglevel='transport', loglevel='transport',
# cause=??? # handy or no? # cause=??? # handy or no?

View File

@ -852,7 +852,7 @@ async def try_ship_error_to_remote(
log.critical( log.critical(
'IPC transport failure -> ' 'IPC transport failure -> '
f'failed to ship error to {remote_descr}!\n\n' f'failed to ship error to {remote_descr}!\n\n'
f'{type(msg)!r}[{msg.boxed_type}] X=> {channel.uid}\n' f'{type(msg)!r}[{msg.boxed_type_str}] X=> {channel.uid}\n'
f'\n' f'\n'
# TODO: use `.msg.preetty_struct` for this! # TODO: use `.msg.preetty_struct` for this!
f'{msg}\n' f'{msg}\n'

View File

@ -1725,11 +1725,15 @@ async def async_main(
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in # cancellation steps have (mostly) occurred in
# a deterministic way. # a deterministic way.
async with trio.open_nursery() as root_nursery: async with trio.open_nursery(
strict_exception_groups=False,
) as root_nursery:
actor._root_n = root_nursery actor._root_n = root_nursery
assert actor._root_n assert actor._root_n
async with trio.open_nursery() as service_nursery: async with trio.open_nursery(
strict_exception_groups=False,
) as service_nursery:
# This nursery is used to handle all inbound # This nursery is used to handle all inbound
# connections to us such that if the TCP server # connections to us such that if the TCP server
# is killed, connections can continue to process # is killed, connections can continue to process

View File

@ -108,6 +108,7 @@ def is_main_process() -> bool:
return mp.current_process().name == 'MainProcess' return mp.current_process().name == 'MainProcess'
# TODO, more verby name?
def debug_mode() -> bool: def debug_mode() -> bool:
''' '''
Bool determining if "debug mode" is on which enables Bool determining if "debug mode" is on which enables

View File

@ -376,7 +376,7 @@ class MsgStream(trio.abc.Channel):
f'Stream self-closed by {self._ctx.side!r}-side before EoC\n' f'Stream self-closed by {self._ctx.side!r}-side before EoC\n'
# } bc a stream is a "scope"/msging-phase inside an IPC # } bc a stream is a "scope"/msging-phase inside an IPC
f'x}}>\n' f'x}}>\n'
f'|_{self}\n' f' |_{self}\n'
) )
log.cancel(message) log.cancel(message)
self._eoc = trio.EndOfChannel(message) self._eoc = trio.EndOfChannel(message)

View File

@ -571,7 +571,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
@acm @acm
# @api_frame # @api_frame
async def open_nursery( async def open_nursery(
hide_tb: bool = False, hide_tb: bool = True,
**kwargs, **kwargs,
# ^TODO, paramspec for `open_root_actor()` # ^TODO, paramspec for `open_root_actor()`

View File

@ -796,6 +796,7 @@ def validate_payload_msg(
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
codec: MsgCodec = current_codec() codec: MsgCodec = current_codec()
msg_bytes: bytes = codec.encode(pld_msg) msg_bytes: bytes = codec.encode(pld_msg)
roundtripped: Started|None = None
try: try:
roundtripped: Started = codec.decode(msg_bytes) roundtripped: Started = codec.decode(msg_bytes)
ctx: Context = getattr(ipc, 'ctx', ipc) ctx: Context = getattr(ipc, 'ctx', ipc)
@ -832,9 +833,13 @@ def validate_payload_msg(
verb_header='Trying to send ', verb_header='Trying to send ',
is_invalid_payload=True, is_invalid_payload=True,
) )
except BaseException: except BaseException as _be:
if not roundtripped:
raise verr
be = _be
__tracebackhide__: bool = False __tracebackhide__: bool = False
raise raise be
if not raise_mte: if not raise_mte:
return mte return mte

View File

@ -29,3 +29,6 @@ from ._broadcast import (
BroadcastReceiver as BroadcastReceiver, BroadcastReceiver as BroadcastReceiver,
Lagged as Lagged, Lagged as Lagged,
) )
from ._beg import (
collapse_eg as collapse_eg,
)

View File

@ -0,0 +1,58 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`BaseExceptionGroup` related utils and helpers pertaining to
first-class-`trio` from a historical perspective B)
'''
from contextlib import (
asynccontextmanager as acm,
)
def maybe_collapse_eg(
beg: BaseExceptionGroup,
) -> BaseException:
'''
If the input beg can collapse to a single non-eg sub-exception,
return it instead.
'''
if len(excs := beg.exceptions) == 1:
return excs[0]
return beg
@acm
async def collapse_eg():
'''
If `BaseExceptionGroup` raised in the body scope is
"collapse-able" (in the same way that
`trio.open_nursery(strict_exception_groups=False)` works) then
only raise the lone emedded non-eg in in place.
'''
try:
yield
except* BaseException as beg:
if (
exc := maybe_collapse_eg(beg)
) is not beg:
raise exc
raise beg