From b965d20cba464b468d6228a6d11053b9bb427d2b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 29 Mar 2019 19:10:56 -0400 Subject: [PATCH] Add stream func tests --- tests/test_streaming.py | 55 +++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 1ada466..4fc7e37 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -2,13 +2,29 @@ Streaming via async gen api """ import time +from functools import partial import trio import tractor import pytest -async def stream_seq(sequence): + +def test_must_define_ctx(): + + with pytest.raises(TypeError) as err: + @tractor.stream + async def no_ctx(): + pass + + assert "no_ctx must be `ctx: tractor.Context" in str(err.value) + + @tractor.stream + async def no_ctx(ctx): + pass + + +async def async_gen_stream(sequence): for i in sequence: yield i await trio.sleep(0.1) @@ -20,10 +36,23 @@ async def stream_seq(sequence): assert cs.cancelled_caught -async def stream_from_single_subactor(): +@tractor.stream +async def context_stream(ctx, sequence): + for i in sequence: + await ctx.send_yield(i) + await trio.sleep(0.1) + + # block indefinitely waiting to be cancelled by ``aclose()`` call + with trio.CancelScope() as cs: + await trio.sleep(float('inf')) + assert 0 + assert cs.cancelled_caught + + +async def stream_from_single_subactor(stream_func_name): """Verify we can spawn a daemon actor and retrieve streamed data. """ - async with tractor.find_actor('brokerd') as portals: + async with tractor.find_actor('streamerd') as portals: if not portals: # only one per host address, spawns an actor if None async with tractor.open_nursery() as nursery: @@ -36,37 +65,43 @@ async def stream_from_single_subactor(): seq = range(10) - agen = await portal.run( + stream = await portal.run( __name__, - 'stream_seq', # the func above + stream_func_name, # one of the funcs above sequence=list(seq), # has to be msgpack serializable ) # it'd sure be nice to have an asyncitertools here... iseq = iter(seq) ival = next(iseq) - async for val in agen: + async for val in stream: assert val == ival try: ival = next(iseq) except StopIteration: # should cancel far end task which will be # caught and no error is raised - await agen.aclose() + await stream.aclose() await trio.sleep(0.3) try: - await agen.__anext__() + await stream.__anext__() except StopAsyncIteration: # stop all spawned subactors await portal.cancel_actor() # await nursery.cancel() -def test_stream_from_single_subactor(arb_addr, start_method): +@pytest.mark.parametrize( + 'stream_func', ['async_gen_stream', 'context_stream'] +) +def test_stream_from_single_subactor(arb_addr, start_method, stream_func): """Verify streaming from a spawned async generator. """ tractor.run( - stream_from_single_subactor, + partial( + stream_from_single_subactor, + stream_func_name=stream_func, + ), arbiter_addr=arb_addr, start_method=start_method, )