Add an asyncio echo server test

infect_asyncio
Tyler Goodlet 2021-11-25 17:10:22 -05:00
parent b69412a903
commit 2b9b29eb71
1 changed files with 93 additions and 3 deletions

View File

@ -161,7 +161,7 @@ def test_aio_cancelled_from_aio_causes_trio_cancelled(arb_addr):
async def main():
async with tractor.open_nursery() as n:
portal = await n.run_in_actor(
await n.run_in_actor(
asyncio_actor,
target='aio_cancel',
expect_err='tractor.to_asyncio.AsyncioCancelled',
@ -321,5 +321,95 @@ def test_aio_errors_and_channel_propagates_and_closes(arb_addr):
assert excinfo.value.type == Exception
# def test_2way_reqresp(arb_addr):
# ...
@tractor.context
async def trio_to_aio_echo_server(
ctx: tractor.Context,
):
async def aio_echo_server(
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:
to_trio.send_nowait('start')
while True:
msg = await from_trio.get()
# echo the msg back
to_trio.send_nowait(msg)
# if we get the terminate sentinel
# break the echo loop
if msg is None:
print('breaking aio echo loop')
break
async with to_asyncio.open_channel_from(
aio_echo_server,
) as (first, chan):
assert first == 'start'
await ctx.started(first)
async with ctx.open_stream() as stream:
async for msg in stream:
print(f'asyncio echoing {msg}')
await chan.send(msg)
out = await chan.receive()
# echo back to parent actor-task
await stream.send(out)
if out is None:
try:
out = await chan.receive()
except trio.EndOfChannel:
break
else:
raise RuntimeError('aio channel never stopped?')
def test_echoserver_detailed_mechanics(arb_addr):
async def main():
async with tractor.open_nursery() as n:
p = await n.start_actor(
'aio_server',
enable_modules=[__name__],
infect_asyncio=True,
)
async with p.open_context(
trio_to_aio_echo_server,
) as (ctx, first):
assert first == 'start'
async with ctx.open_stream() as stream:
for i in range(100):
await stream.send(i)
out = await stream.receive()
assert i == out
# send terminate msg
await stream.send(None)
out = await stream.receive()
assert out is None
if out is None:
# ensure the stream is stopped
# with trio.fail_after(0.1):
try:
await stream.receive()
except trio.EndOfChannel:
pass
else:
pytest.fail(
"stream wasn't stopped after sentinel?!")
# TODO: the case where this blocks and
# is cancelled by kbi or out of task cancellation
await p.cancel_actor()
trio.run(main)