From 47b531a43a80b9355bae7eeacadca02b92afed6c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 Dec 2018 23:13:58 -0500 Subject: [PATCH] Add test to verify remote task cancellation --- tests/test_streaming.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 5691fbd..e25a000 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -13,6 +13,12 @@ async def stream_seq(sequence): yield i await trio.sleep(0.1) + # block indefinitely waiting to be cancelled by ``aclose()`` call + with trio.open_cancel_scope() as cs: + await trio.sleep(float('inf')) + assert 0 + assert cs.cancelled_caught + async def stream_from_single_subactor(): """Verify we can spawn a daemon actor and retrieve streamed data. @@ -37,17 +43,22 @@ async def stream_from_single_subactor(): ) # it'd sure be nice to have an asyncitertools here... iseq = iter(seq) + ival = next(iseq) async for val in agen: - assert val == next(iseq) - # TODO: test breaking the loop (should it kill the - # far end?) - # break - # terminate far-end async-gen - # await gen.asend(None) - # break + assert val == ival + try: + ival = next(iseq) + except StopIteration: + # should cancel far end task which will be + # caught and no error is raised + await agen.aclose() - # stop all spawned subactors - await portal.cancel_actor() + await trio.sleep(0.3) + try: + await agen.__anext__() + except StopAsyncIteration: + # stop all spawned subactors + await portal.cancel_actor() # await nursery.cancel()