From 2be69bb9fba66ee04e04464bd73413fbc7f0a0d5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 14:00:09 -0400 Subject: [PATCH] Add a multi-task streaming test --- tests/test_advanced_streaming.py | 78 ++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 246fc1b..b5cf2b9 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -140,3 +140,81 @@ def test_dynamic_pub_sub(): trio.run(main) except trio.TooSlowError: pass + + +@tractor.context +async def one_task_streams_and_one_handles_reqresp( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + + async with ctx.open_stream() as stream: + + async def pingpong(): + '''Run a simple req/response service. + + ''' + async for msg in stream: + print('rpc server ping') + assert msg == 'ping' + print('rpc server pong') + await stream.send('pong') + + async with trio.open_nursery() as n: + n.start_soon(pingpong) + + for _ in itertools.count(): + await stream.send('yo') + await trio.sleep(0.01) + + +def test_reqresp_ontopof_streaming(): + '''Test a subactor that both streams with one task and + spawns another which handles a small requests-response + dialogue over the same bidir-stream. + + ''' + async def main(): + + with trio.move_on_after(2): + async with tractor.open_nursery() as n: + + # name of this actor will be same as target func + portal = await n.start_actor( + 'dual_tasks', + enable_modules=[__name__] + ) + + # flat to make sure we get at least one pong + got_pong: bool = False + + async with portal.open_context( + one_task_streams_and_one_handles_reqresp, + + ) as (ctx, first): + + assert first is None + + async with ctx.open_stream() as stream: + + await stream.send('ping') + + async for msg in stream: + print(f'client received: {msg}') + + assert msg in {'pong', 'yo'} + + if msg == 'pong': + got_pong = True + await stream.send('ping') + print('client sent ping') + + assert got_pong + + try: + trio.run(main) + except trio.TooSlowError: + pass