diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 4a5a270..ebd0616 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -161,7 +161,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr): async def main(): async with tractor.open_nursery() as n: - portal = await n.run_in_actor( + await n.run_in_actor( asyncio_actor, target='aio_cancel', expect_err='tractor.to_asyncio.AsyncioCancelled', @@ -321,5 +321,95 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr): assert excinfo.value.type == Exception -# def test_2way_reqresp(arb_addr): -# ... +@tractor.context +async def trio_to_aio_echo_server( + ctx: tractor.Context, +): + + async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, + ) -> None: + + to_trio.send_nowait('start') + + while True: + msg = await from_trio.get() + + # echo the msg back + to_trio.send_nowait(msg) + + # if we get the terminate sentinel + # break the echo loop + if msg is None: + print('breaking aio echo loop') + break + + async with to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + print(f'asyncio echoing {msg}') + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + if out is None: + try: + out = await chan.receive() + except trio.EndOfChannel: + break + else: + raise RuntimeError('aio channel never stopped?') + + +def test_echoserver_detailed_mechanics(arb_addr): + + async def main(): + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + async with ctx.open_stream() as stream: + for i in range(100): + await stream.send(i) + out = await stream.receive() + assert i == out + + # send terminate msg + await stream.send(None) + out = await stream.receive() + assert out is None + + if out is None: + # ensure the stream is stopped + # with trio.fail_after(0.1): + try: + await stream.receive() + except trio.EndOfChannel: + pass + else: + pytest.fail( + "stream wasn't stopped after sentinel?!") + + # TODO: the case where this blocks and + # is cancelled by kbi or out of task cancellation + await p.cancel_actor() + + trio.run(main)