From b463841019255ab801616b9c4744e6fd8c0b3247 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Dec 2021 13:48:41 -0500 Subject: [PATCH] Add infected `asyncio` echo server example --- examples/infected_asyncio_echo_server.py | 90 ++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 examples/infected_asyncio_echo_server.py diff --git a/examples/infected_asyncio_echo_server.py b/examples/infected_asyncio_echo_server.py new file mode 100644 index 0000000..aeda42c --- /dev/null +++ b/examples/infected_asyncio_echo_server.py @@ -0,0 +1,90 @@ +''' +An SC compliant infected ``asyncio`` echo server. + +''' +import asyncio +from statistics import mean +import time + +import trio +import tractor + + +async def aio_echo_server( + to_trio: trio.MemorySendChannel, + from_trio: asyncio.Queue, +) -> None: + + # a first message must be sent **from** this ``asyncio`` + # task or the ``trio`` side will never unblock from + # ``tractor.to_asyncio.open_channel_from():`` + to_trio.send_nowait('start') + + # XXX: this uses an ``from_trio: asyncio.Queue`` currently but we + # should probably offer something better. + while True: + # echo the msg back + to_trio.send_nowait(await from_trio.get()) + + +@tractor.context +async def trio_to_aio_echo_server( + ctx: tractor.Context, +): + # this will block until the ``asyncio`` task sends a "first" + # message. + async with tractor.to_asyncio.open_channel_from( + aio_echo_server, + ) as (first, chan): + + assert first == 'start' + await ctx.started(first) + + async with ctx.open_stream() as stream: + + async for msg in stream: + await chan.send(msg) + + out = await chan.receive() + # echo back to parent actor-task + await stream.send(out) + + +async def main(): + + async with tractor.open_nursery() as n: + p = await n.start_actor( + 'aio_server', + enable_modules=[__name__], + infect_asyncio=True, + ) + async with p.open_context( + trio_to_aio_echo_server, + ) as (ctx, first): + + assert first == 'start' + + count = 0 + async with ctx.open_stream() as stream: + + delays = [] + send = time.time() + + await stream.send(count) + async for msg in stream: + recv = time.time() + delays.append(recv - send) + assert msg == count + count += 1 + send = time.time() + await stream.send(count) + + if count >= 1e3: + break + + print(f'mean round trip rate (Hz): {1/mean(delays)}') + await p.cancel_actor() + + +if __name__ == '__main__': + trio.run(main)