diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index b0a1171..5d88920 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -109,7 +109,9 @@ async def asyncio_actor( except BaseException as err: if expect_err: - assert isinstance(err, error_type) + assert isinstance(err, error_type), ( + f'{type(err)} is not {error_type}?' + ) raise @@ -181,8 +183,8 @@ def test_trio_cancels_aio(reg_addr): with trio.move_on_after(1): # cancel the nursery shortly after boot - async with tractor.open_nursery() as n: - await n.run_in_actor( + async with tractor.open_nursery() as tn: + await tn.run_in_actor( asyncio_actor, target='aio_sleep_forever', expect_err='trio.Cancelled', @@ -202,22 +204,33 @@ async def trio_ctx( # this will block until the ``asyncio`` task sends a "first" # message. with trio.fail_after(2): - async with ( - trio.open_nursery() as n, + try: + async with ( + trio.open_nursery( + # TODO, for new `trio` / py3.13 + # strict_exception_groups=False, + ) as tn, + tractor.to_asyncio.open_channel_from( + sleep_and_err, + ) as (first, chan), + ): - tractor.to_asyncio.open_channel_from( - sleep_and_err, - ) as (first, chan), - ): + assert first == 'start' - assert first == 'start' + # spawn another asyncio task for the cuck of it. + tn.start_soon( + tractor.to_asyncio.run_task, + aio_sleep_forever, + ) + await trio.sleep_forever() - # spawn another asyncio task for the cuck of it. - n.start_soon( - tractor.to_asyncio.run_task, - aio_sleep_forever, - ) - await trio.sleep_forever() + # TODO, factor this into a `trionics.callapse()`? + except* BaseException as beg: + # await tractor.pause(shield=True) + if len(excs := beg.exceptions) == 1: + raise excs[0] + else: + raise @pytest.mark.parametrize( @@ -236,7 +249,6 @@ def test_context_spawns_aio_task_that_errors( ''' async def main(): - with trio.fail_after(2): async with tractor.open_nursery() as n: p = await n.start_actor( @@ -308,7 +320,9 @@ async def aio_cancel(): await aio_sleep_forever() -def test_aio_cancelled_from_aio_causes_trio_cancelled(reg_addr): +def test_aio_cancelled_from_aio_causes_trio_cancelled( + reg_addr: tuple, +): ''' When the `asyncio.Task` cancels itself the `trio` side cshould also cancel and teardown and relay the cancellation cross-process @@ -405,6 +419,7 @@ async def stream_from_aio( sequence=seq, expect_cancel=raise_err or exit_early, fail_early=aio_raise_err, + ) as (first, chan): assert first is True @@ -423,10 +438,15 @@ async def stream_from_aio( if raise_err: raise Exception elif exit_early: + print('`consume()` breaking early!\n') break + print('returning from `consume()`..\n') + + # run 2 tasks each pulling from + # the inter-task-channel with the 2nd + # using a fan-out `BroadcastReceiver`. if fan_out: - # start second task that get's the same stream value set. async with ( # NOTE: this has to come first to avoid @@ -436,11 +456,19 @@ async def stream_from_aio( trio.open_nursery() as n, ): + # start 2nd task that get's broadcast the same + # value set. n.start_soon(consume, br) await consume(chan) else: await consume(chan) + except BaseException as err: + import logging + log = logging.getLogger() + log.exception('aio-subactor errored!\n') + raise err + finally: if ( @@ -461,7 +489,8 @@ async def stream_from_aio( assert not fan_out assert pulled == expect[:51] - print('trio guest mode task completed!') + print('trio guest-mode task completed!') + assert chan._aio_task.done() @pytest.mark.parametrize( @@ -501,19 +530,37 @@ def test_trio_error_cancels_intertask_chan(reg_addr): excinfo.value.boxed_type is Exception -def test_trio_closes_early_and_channel_exits(reg_addr): +def test_trio_closes_early_and_channel_exits( + reg_addr: tuple[str, int], +): + ''' + Check that if the `trio`-task "exits early" on `async for`ing the + inter-task-channel (via a `break`) we exit silently from the + `open_channel_from()` block and get a final `Return[None]` msg. + + ''' async def main(): - async with tractor.open_nursery() as n: - portal = await n.run_in_actor( - stream_from_aio, - exit_early=True, - infect_asyncio=True, - ) - # should raise RAE diectly - await portal.result() + with trio.fail_after(2): + async with tractor.open_nursery( + # debug_mode=True, + # enable_stack_on_sig=True, + ) as n: + portal = await n.run_in_actor( + stream_from_aio, + exit_early=True, + infect_asyncio=True, + ) + # should raise RAE diectly + print('waiting on final infected subactor result..') + res: None = await portal.wait_for_result() + assert res is None + print('infected subactor returned result: {res!r}\n') # should be a quiet exit on a simple channel exit - trio.run(main) + trio.run( + main, + # strict_exception_groups=False, + ) def test_aio_errors_and_channel_propagates_and_closes(reg_addr): @@ -660,6 +707,7 @@ def test_echoserver_detailed_mechanics( ) def test_infected_root_actor( raise_error_mid_stream: bool|Exception, + # conftest wide loglevel: str, debug_mode: bool, @@ -670,36 +718,38 @@ def test_infected_root_actor( ''' async def _trio_main(): + with trio.fail_after(2): + first: str + chan: to_asyncio.LinkedTaskChannel + async with ( + tractor.open_root_actor( + debug_mode=debug_mode, + loglevel=loglevel, + ), + to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan), + ): + assert first == 'start' - first: str - chan: to_asyncio.LinkedTaskChannel - async with ( - tractor.open_root_actor( - debug_mode=debug_mode, - loglevel=loglevel, - ), - to_asyncio.open_channel_from( - aio_echo_server, - ) as (first, chan), - ): - assert first == 'start' + for i in range(1000): + await chan.send(i) + out = await chan.receive() + assert out == i + print(f'asyncio echoing {i}') - for i in range(1000): - await chan.send(i) - out = await chan.receive() - assert out == i - print(f'asyncio echoing {i}') + if raise_error_mid_stream and i == 500: + raise raise_error_mid_stream - if raise_error_mid_stream and i == 500: - raise raise_error_mid_stream - - if out is None: - try: - out = await chan.receive() - except trio.EndOfChannel: - break - else: - raise RuntimeError('aio channel never stopped?') + if out is None: + try: + out = await chan.receive() + except trio.EndOfChannel: + break + else: + raise RuntimeError( + 'aio channel never stopped?' + ) if raise_error_mid_stream: with pytest.raises(raise_error_mid_stream): @@ -947,6 +997,158 @@ def test_sigint_closes_lifetime_stack( trio.run(main) +async def sync_and_err( + # just signature placeholders for compat with + # ``to_asyncio.open_channel_from()`` + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, + ev: asyncio.Event, + +): + if to_trio: + to_trio.send_nowait('start') + + await ev.wait() + raise RuntimeError('asyncio-side') + + +@pytest.mark.parametrize( + 'aio_err_trigger', + [ + 'before_start_point', + 'after_trio_task_starts', + 'after_start_point', + ], + ids='aio_err_triggered={}'.format +) +def test_trio_prestarted_task_bubbles( + aio_err_trigger: str, + + # conftest wide + loglevel: str, + debug_mode: bool, +): + + async def pre_started_err( + raise_err: bool = False, + pre_sleep: float|None = None, + aio_trigger: asyncio.Event|None = None, + task_status=trio.TASK_STATUS_IGNORED, + ): + ''' + Maybe pre-started error then sleep. + + ''' + if pre_sleep is not None: + print(f'Sleeping from trio for {pre_sleep!r}s !') + await trio.sleep(pre_sleep) + + # signal aio-task to raise JUST AFTER this task + # starts but has not yet `.started()` + if aio_trigger: + print('Signalling aio-task to raise from `trio`!!') + aio_trigger.set() + + if raise_err: + print('Raising from trio!') + raise TypeError('trio-side') + + task_status.started() + await trio.sleep_forever() + + async def _trio_main(): + # with trio.fail_after(2): + with trio.fail_after(999): + first: str + chan: to_asyncio.LinkedTaskChannel + aio_ev = asyncio.Event() + + async with ( + tractor.open_root_actor( + debug_mode=False, + loglevel=loglevel, + ), + + # where we'll start a sub-task that errors BEFORE + # calling `.started()` such that the error should + # bubble before the guest run terminates! + trio.open_nursery() as tn, + + # THEN start an infect task which should error just + # after the trio-side's task does. + to_asyncio.open_channel_from( + partial( + sync_and_err, + ev=aio_ev, + ) + ) as (first, chan), + ): + + for i in range(5): + pre_sleep: float|None = None + raise_err: bool = False + last_iter: bool = (i == 4) + + if last_iter: + raise_err: bool = True + + # trigger aio task to error on next loop + # tick/checkpoint + if aio_err_trigger == 'before_start_point': + aio_ev.set() + + pre_sleep: float = 0 + + await tn.start( + pre_started_err, + raise_err, + pre_sleep, + (aio_ev if ( + aio_err_trigger == 'after_trio_task_starts' + and + last_iter + ) else None + ), + ) + + if ( + aio_err_trigger == 'after_start_point' + and + last_iter + ): + aio_ev.set() + + with pytest.raises( + expected_exception=ExceptionGroup, + ) as excinfo: + tractor.to_asyncio.run_as_asyncio_guest( + trio_main=_trio_main, + ) + + eg = excinfo.value + rte_eg, rest_eg = eg.split(RuntimeError) + + # ensure the trio-task's error bubbled despite the aio-side + # having (maybe) errored first. + if aio_err_trigger in ( + 'after_trio_task_starts', + 'after_start_point', + ): + assert len(errs := rest_eg.exceptions) == 1 + typerr = errs[0] + assert ( + type(typerr) is TypeError + and + 'trio-side' in typerr.args + ) + + # when aio errors BEFORE (last) trio task is scheduled, we should + # never see anythinb but the aio-side. + else: + assert len(rtes := rte_eg.exceptions) == 1 + assert 'asyncio-side' in rtes[0].args[0] + + # TODO: debug_mode tests once we get support for `asyncio`! # # -[ ] need tests to wrap both scripts: