diff --git a/tests/test_streaming.py b/tests/test_streaming.py index baee54e..6f9d18c 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -79,33 +79,36 @@ async def stream_from_single_subactor( seq = range(10) - async with portal.open_stream_from( - stream_func, - sequence=list(seq), # has to be msgpack serializable - ) as stream: + with trio.fail_after(5): + async with portal.open_stream_from( + stream_func, + sequence=list(seq), # has to be msgpack serializable + ) as stream: - # it'd sure be nice to have an asyncitertools here... - iseq = iter(seq) - ival = next(iseq) + # it'd sure be nice to have an asyncitertools here... + iseq = iter(seq) + ival = next(iseq) - async for val in stream: - assert val == ival + async for val in stream: + assert val == ival + try: + ival = next(iseq) + except StopIteration: + # should cancel far end task which will be + # caught and no error is raised + await stream.aclose() + + await trio.sleep(0.3) + + # ensure EOC signalled-state translates + # XXX: not really sure this is correct, + # shouldn't it be a `ClosedResourceError`? try: - ival = next(iseq) - except StopIteration: - # should cancel far end task which will be - # caught and no error is raised - await stream.aclose() - - await trio.sleep(0.3) - - try: - await stream.__anext__() - except StopAsyncIteration: - # stop all spawned subactors - await portal.cancel_actor() - # await nursery.cancel() + await stream.__anext__() + except StopAsyncIteration: + # stop all spawned subactors + await portal.cancel_actor() @pytest.mark.parametrize(