From ad2567dd735dc462e0812dfe6d5ee74ff17adb81 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Nov 2021 13:27:16 -0500 Subject: [PATCH] Add first set of interloop streaming tests --- tests/test_infected_asyncio.py | 141 ++++++++++++++++++++++++++------- 1 file changed, 113 insertions(+), 28 deletions(-) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index c219a4b..78003f7 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -134,8 +134,6 @@ def test_trio_cancels_aio(arb_addr): # cancel the nursery shortly after boot async with tractor.open_nursery() as n: - # debug_mode=True - # ) as n: portal = await n.run_in_actor( asyncio_actor, target='sleep_forever', @@ -177,7 +175,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): trio.run(main) -# TODO: +# TODO: verify open_channel_from will fail on this.. async def no_to_trio_in_args(): pass @@ -186,33 +184,77 @@ async def push_from_aio_task( sequence: Iterable, to_trio: trio.abc.SendChannel, + expect_cancel: False, + fail_early: bool, ) -> None: - for i in range(100): - print(f'asyncio sending {i}') - to_trio.send_nowait(i) - await asyncio.sleep(0.001) - print(f'asyncio streamer complete!') + try: + # sync caller ctx manager + to_trio.send_nowait(True) + + for i in sequence: + print(f'asyncio sending {i}') + to_trio.send_nowait(i) + await asyncio.sleep(0.001) + + if i == 50 and fail_early: + raise Exception + + print(f'asyncio streamer complete!') + + except asyncio.CancelledError: + if not expect_cancel: + pytest.fail("aio task was cancelled unexpectedly") + raise + else: + if expect_cancel: + pytest.fail("aio task wasn't cancelled as expected!?") -async def stream_from_aio(): +async def stream_from_aio( + + exit_early: bool = False, + raise_err: bool = False, + aio_raise_err: bool = False, + +) -> None: seq = range(100) expect = list(seq) - async with to_asyncio.open_channel_from( - push_from_aio_task, - sequence=seq, - ) as (first, chan): + try: + pulled = [] - pulled = [first] - async for value in chan: - print(f'trio received {value}') - pulled.append(value) + async with to_asyncio.open_channel_from( + push_from_aio_task, + sequence=seq, + expect_cancel=raise_err or exit_early, + fail_early=aio_raise_err, + ) as (first, chan): - assert pulled == expect + assert first is True - print('trio guest mode task completed!') + async for value in chan: + print(f'trio received {value}') + pulled.append(value) + + if value == 50: + if raise_err: + raise Exception + elif exit_early: + break + finally: + + if ( + not raise_err and + not exit_early and + not aio_raise_err + ): + assert pulled == expect + else: + assert pulled == expect[:51] + + print('trio guest mode task completed!') def test_basic_interloop_channel_stream(arb_addr): @@ -227,14 +269,57 @@ def test_basic_interloop_channel_stream(arb_addr): trio.run(main) +# TODO: parametrize the above test and avoid the duplication here? +def test_trio_error_cancels_intertask_chan(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + raise_err=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() -# def test_trio_error_cancels_intertask_chan(arb_addr): -# ... - - -# def test_trio_cancels_and_channel_exits(arb_addr): -# ... - - -# def test_aio_errors_and_channel_propagates(arb_addr): + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure boxed error is correct + assert excinfo.value.type == Exception + + +def test_trio_closes_early_and_channel_exits(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + exit_early=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() + + # should be a quiet exit on a simple channel exit + trio.run(main) + + +def test_aio_errors_and_channel_propagates_and_closes(arb_addr): + async def main(): + async with tractor.open_nursery() as n: + portal = await n.run_in_actor( + stream_from_aio, + aio_raise_err=True, + infect_asyncio=True, + ) + # should trigger remote actor error + await portal.result() + + with pytest.raises(RemoteActorError) as excinfo: + trio.run(main) + + # ensure boxed error is correct + assert excinfo.value.type == Exception + + +# def test_2way_reqresp(arb_addr): # ...