Add an inter-leaved-task error test

Trying to replicate cases where errors are raised in both `trio` and
`asyncio` tasks independently (at least in `.to_asyncio` API terms) with
a new `test_trio_prestarted_task_bubbles` that generates 3 cases inside
a `@acm` calls stack composing a `trio.Nursery` with
a `to_asyncio.open_channel_from()` call where a set of `trio` tasks are
started in a loop using `.start()` with various exc raising sequences,
- the aio task raising *before* the last `trio` task spawns.
- the aio task raising just after the last trio task spawns, but before
  it starts.
- after the last trio task `.start()` call returns control to the
  parent - but (for now) did not error.

TODO, still more cases to discover as i'm still fighting a `modden` bug
of this sort atm..

Other,
- tweak some other tests to have timeouts since some recent hangs were
  found..
- started mucking with py3.13 and thus adjustments for strict egs in
  some tests; full patchset to test suite likely coming soon!
hilevel_serman
Tyler Goodlet 2025-01-09 08:59:30 -05:00
parent 89fc072ca0
commit 924eff2985
1 changed files with 259 additions and 57 deletions

View File

@ -109,7 +109,9 @@ async def asyncio_actor(
except BaseException as err: except BaseException as err:
if expect_err: if expect_err:
assert isinstance(err, error_type) assert isinstance(err, error_type), (
f'{type(err)} is not {error_type}?'
)
raise raise
@ -181,8 +183,8 @@ def test_trio_cancels_aio(reg_addr):
with trio.move_on_after(1): with trio.move_on_after(1):
# cancel the nursery shortly after boot # cancel the nursery shortly after boot
async with tractor.open_nursery() as n: async with tractor.open_nursery() as tn:
await n.run_in_actor( await tn.run_in_actor(
asyncio_actor, asyncio_actor,
target='aio_sleep_forever', target='aio_sleep_forever',
expect_err='trio.Cancelled', expect_err='trio.Cancelled',
@ -202,22 +204,33 @@ async def trio_ctx(
# this will block until the ``asyncio`` task sends a "first" # this will block until the ``asyncio`` task sends a "first"
# message. # message.
with trio.fail_after(2): with trio.fail_after(2):
async with ( try:
trio.open_nursery() as n, async with (
trio.open_nursery(
# TODO, for new `trio` / py3.13
# strict_exception_groups=False,
) as tn,
tractor.to_asyncio.open_channel_from(
sleep_and_err,
) as (first, chan),
):
tractor.to_asyncio.open_channel_from( assert first == 'start'
sleep_and_err,
) as (first, chan),
):
assert first == 'start' # spawn another asyncio task for the cuck of it.
tn.start_soon(
tractor.to_asyncio.run_task,
aio_sleep_forever,
)
await trio.sleep_forever()
# spawn another asyncio task for the cuck of it. # TODO, factor this into a `trionics.callapse()`?
n.start_soon( except* BaseException as beg:
tractor.to_asyncio.run_task, # await tractor.pause(shield=True)
aio_sleep_forever, if len(excs := beg.exceptions) == 1:
) raise excs[0]
await trio.sleep_forever() else:
raise
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -236,7 +249,6 @@ def test_context_spawns_aio_task_that_errors(
''' '''
async def main(): async def main():
with trio.fail_after(2): with trio.fail_after(2):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
p = await n.start_actor( p = await n.start_actor(
@ -308,7 +320,9 @@ async def aio_cancel():
await aio_sleep_forever() await aio_sleep_forever()
def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): def test_aio_cancelled_from_aio_causes_trio_cancelled(
reg_addr: tuple,
):
''' '''
When the `asyncio.Task` cancels itself the `trio` side cshould When the `asyncio.Task` cancels itself the `trio` side cshould
also cancel and teardown and relay the cancellation cross-process also cancel and teardown and relay the cancellation cross-process
@ -405,6 +419,7 @@ async def stream_from_aio(
sequence=seq, sequence=seq,
expect_cancel=raise_err or exit_early, expect_cancel=raise_err or exit_early,
fail_early=aio_raise_err, fail_early=aio_raise_err,
) as (first, chan): ) as (first, chan):
assert first is True assert first is True
@ -423,10 +438,15 @@ async def stream_from_aio(
if raise_err: if raise_err:
raise Exception raise Exception
elif exit_early: elif exit_early:
print('`consume()` breaking early!\n')
break break
print('returning from `consume()`..\n')
# run 2 tasks each pulling from
# the inter-task-channel with the 2nd
# using a fan-out `BroadcastReceiver`.
if fan_out: if fan_out:
# start second task that get's the same stream value set.
async with ( async with (
# NOTE: this has to come first to avoid # NOTE: this has to come first to avoid
@ -436,11 +456,19 @@ async def stream_from_aio(
trio.open_nursery() as n, trio.open_nursery() as n,
): ):
# start 2nd task that get's broadcast the same
# value set.
n.start_soon(consume, br) n.start_soon(consume, br)
await consume(chan) await consume(chan)
else: else:
await consume(chan) await consume(chan)
except BaseException as err:
import logging
log = logging.getLogger()
log.exception('aio-subactor errored!\n')
raise err
finally: finally:
if ( if (
@ -461,7 +489,8 @@ async def stream_from_aio(
assert not fan_out assert not fan_out
assert pulled == expect[:51] assert pulled == expect[:51]
print('trio guest mode task completed!') print('trio guest-mode task completed!')
assert chan._aio_task.done()
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -501,19 +530,37 @@ def test_trio_error_cancels_intertask_chan(reg_addr):
excinfo.value.boxed_type is Exception excinfo.value.boxed_type is Exception
def test_trio_closes_early_and_channel_exits(reg_addr): def test_trio_closes_early_and_channel_exits(
reg_addr: tuple[str, int],
):
'''
Check that if the `trio`-task "exits early" on `async for`ing the
inter-task-channel (via a `break`) we exit silently from the
`open_channel_from()` block and get a final `Return[None]` msg.
'''
async def main(): async def main():
async with tractor.open_nursery() as n: with trio.fail_after(2):
portal = await n.run_in_actor( async with tractor.open_nursery(
stream_from_aio, # debug_mode=True,
exit_early=True, # enable_stack_on_sig=True,
infect_asyncio=True, ) as n:
) portal = await n.run_in_actor(
# should raise RAE diectly stream_from_aio,
await portal.result() exit_early=True,
infect_asyncio=True,
)
# should raise RAE diectly
print('waiting on final infected subactor result..')
res: None = await portal.wait_for_result()
assert res is None
print('infected subactor returned result: {res!r}\n')
# should be a quiet exit on a simple channel exit # should be a quiet exit on a simple channel exit
trio.run(main) trio.run(
main,
# strict_exception_groups=False,
)
def test_aio_errors_and_channel_propagates_and_closes(reg_addr): def test_aio_errors_and_channel_propagates_and_closes(reg_addr):
@ -660,6 +707,7 @@ def test_echoserver_detailed_mechanics(
) )
def test_infected_root_actor( def test_infected_root_actor(
raise_error_mid_stream: bool|Exception, raise_error_mid_stream: bool|Exception,
# conftest wide # conftest wide
loglevel: str, loglevel: str,
debug_mode: bool, debug_mode: bool,
@ -670,36 +718,38 @@ def test_infected_root_actor(
''' '''
async def _trio_main(): async def _trio_main():
with trio.fail_after(2):
first: str
chan: to_asyncio.LinkedTaskChannel
async with (
tractor.open_root_actor(
debug_mode=debug_mode,
loglevel=loglevel,
),
to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan),
):
assert first == 'start'
first: str for i in range(1000):
chan: to_asyncio.LinkedTaskChannel await chan.send(i)
async with ( out = await chan.receive()
tractor.open_root_actor( assert out == i
debug_mode=debug_mode, print(f'asyncio echoing {i}')
loglevel=loglevel,
),
to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan),
):
assert first == 'start'
for i in range(1000): if raise_error_mid_stream and i == 500:
await chan.send(i) raise raise_error_mid_stream
out = await chan.receive()
assert out == i
print(f'asyncio echoing {i}')
if raise_error_mid_stream and i == 500: if out is None:
raise raise_error_mid_stream try:
out = await chan.receive()
if out is None: except trio.EndOfChannel:
try: break
out = await chan.receive() else:
except trio.EndOfChannel: raise RuntimeError(
break 'aio channel never stopped?'
else: )
raise RuntimeError('aio channel never stopped?')
if raise_error_mid_stream: if raise_error_mid_stream:
with pytest.raises(raise_error_mid_stream): with pytest.raises(raise_error_mid_stream):
@ -947,6 +997,158 @@ def test_sigint_closes_lifetime_stack(
trio.run(main) trio.run(main)
async def sync_and_err(
# just signature placeholders for compat with
# ``to_asyncio.open_channel_from()``
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
ev: asyncio.Event,
):
if to_trio:
to_trio.send_nowait('start')
await ev.wait()
raise RuntimeError('asyncio-side')
@pytest.mark.parametrize(
'aio_err_trigger',
[
'before_start_point',
'after_trio_task_starts',
'after_start_point',
],
ids='aio_err_triggered={}'.format
)
def test_trio_prestarted_task_bubbles(
aio_err_trigger: str,
# conftest wide
loglevel: str,
debug_mode: bool,
):
async def pre_started_err(
raise_err: bool = False,
pre_sleep: float|None = None,
aio_trigger: asyncio.Event|None = None,
task_status=trio.TASK_STATUS_IGNORED,
):
'''
Maybe pre-started error then sleep.
'''
if pre_sleep is not None:
print(f'Sleeping from trio for {pre_sleep!r}s !')
await trio.sleep(pre_sleep)
# signal aio-task to raise JUST AFTER this task
# starts but has not yet `.started()`
if aio_trigger:
print('Signalling aio-task to raise from `trio`!!')
aio_trigger.set()
if raise_err:
print('Raising from trio!')
raise TypeError('trio-side')
task_status.started()
await trio.sleep_forever()
async def _trio_main():
# with trio.fail_after(2):
with trio.fail_after(999):
first: str
chan: to_asyncio.LinkedTaskChannel
aio_ev = asyncio.Event()
async with (
tractor.open_root_actor(
debug_mode=False,
loglevel=loglevel,
),
# where we'll start a sub-task that errors BEFORE
# calling `.started()` such that the error should
# bubble before the guest run terminates!
trio.open_nursery() as tn,
# THEN start an infect task which should error just
# after the trio-side's task does.
to_asyncio.open_channel_from(
partial(
sync_and_err,
ev=aio_ev,
)
) as (first, chan),
):
for i in range(5):
pre_sleep: float|None = None
raise_err: bool = False
last_iter: bool = (i == 4)
if last_iter:
raise_err: bool = True
# trigger aio task to error on next loop
# tick/checkpoint
if aio_err_trigger == 'before_start_point':
aio_ev.set()
pre_sleep: float = 0
await tn.start(
pre_started_err,
raise_err,
pre_sleep,
(aio_ev if (
aio_err_trigger == 'after_trio_task_starts'
and
last_iter
) else None
),
)
if (
aio_err_trigger == 'after_start_point'
and
last_iter
):
aio_ev.set()
with pytest.raises(
expected_exception=ExceptionGroup,
) as excinfo:
tractor.to_asyncio.run_as_asyncio_guest(
trio_main=_trio_main,
)
eg = excinfo.value
rte_eg, rest_eg = eg.split(RuntimeError)
# ensure the trio-task's error bubbled despite the aio-side
# having (maybe) errored first.
if aio_err_trigger in (
'after_trio_task_starts',
'after_start_point',
):
assert len(errs := rest_eg.exceptions) == 1
typerr = errs[0]
assert (
type(typerr) is TypeError
and
'trio-side' in typerr.args
)
# when aio errors BEFORE (last) trio task is scheduled, we should
# never see anythinb but the aio-side.
else:
assert len(rtes := rte_eg.exceptions) == 1
assert 'asyncio-side' in rtes[0].args[0]
# TODO: debug_mode tests once we get support for `asyncio`! # TODO: debug_mode tests once we get support for `asyncio`!
# #
# -[ ] need tests to wrap both scripts: # -[ ] need tests to wrap both scripts: