From 11d471a4cd72302bc47372d208bb12a3f382e6e6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 13 Jun 2021 20:25:36 -0400 Subject: [PATCH] Add detailed ``@tractor.context`` cancellation/termination tests --- tests/test_2way.py | 320 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 303 insertions(+), 17 deletions(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index fec0cae..716e2ce 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -6,20 +6,13 @@ import pytest import trio import tractor -# from conftest import tractor_test +from conftest import tractor_test -# TODO: test endofchannel semantics / cancellation / error cases: -# 3 possible outcomes: -# - normal termination: far end relays a stop message with -# final value as in async gen from ``return ``. - -# possible outcomes: -# - normal termination: far end returns -# - premature close: far end relays a stop message to tear down stream -# - cancel: far end raises `ContextCancelled` - -# future possible outcomes -# - restart request: far end raises `ContextRestart` +# the general stream semantics are +# - normal termination: far end relays a stop message which +# terminates an ongoing ``MsgStream`` iteration +# - cancel termination: context is cancelled on either side cancelling +# the "linked" inter-actor task context _state: bool = False @@ -30,6 +23,7 @@ async def simple_setup_teardown( ctx: tractor.Context, data: int, + block_forever: bool = False, ) -> None: @@ -41,8 +35,11 @@ async def simple_setup_teardown( await ctx.started(data + 1) try: - # block until cancelled - await trio.sleep_forever() + if block_forever: + # block until cancelled + await trio.sleep_forever() + else: + return 'yo' finally: _state = False @@ -56,7 +53,14 @@ async def assert_state(value: bool): 'error_parent', [False, True], ) -def test_simple_context(error_parent): +@pytest.mark.parametrize( + 'callee_blocks_forever', + [False, True], +) +def test_simple_context( + error_parent, + callee_blocks_forever, +): async def main(): @@ -70,11 +74,16 @@ def test_simple_context(error_parent): async with portal.open_context( simple_setup_teardown, data=10, + block_forever=callee_blocks_forever, ) as (ctx, sent): assert sent == 11 - await portal.run(assert_state, value=True) + if callee_blocks_forever: + await portal.run(assert_state, value=True) + await ctx.cancel() + else: + assert await ctx.result() == 'yo' # after cancellation await portal.run(assert_state, value=False) @@ -94,6 +103,281 @@ def test_simple_context(error_parent): trio.run(main) +# basic stream terminations: +# - callee context closes without using stream +# - caller context closes without using stream +# - caller context calls `Context.cancel()` while streaming +# is ongoing resulting in callee being cancelled +# - callee calls `Context.cancel()` while streaming and caller +# sees stream terminated in `RemoteActorError` + +# TODO: future possible features +# - restart request: far end raises `ContextRestart` + + +@tractor.context +async def close_ctx_immediately( + + ctx: tractor.Context, + +) -> None: + + await ctx.started() + global _state + + async with ctx.open_stream(): + pass + + +@tractor_test +async def test_callee_closes_ctx_after_stream_open(): + 'callee context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'fast_stream_closer', + enable_modules=[__name__], + ) + + async with portal.open_context( + close_ctx_immediately, + + # flag to avoid waiting the final result + # cancel_on_exit=True, + + ) as (ctx, sent): + + assert sent is None + + with trio.fail_after(0.5): + async with ctx.open_stream() as stream: + + # should fall through since ``StopAsyncIteration`` + # should be raised through translation of + # a ``trio.EndOfChannel`` by + # ``trio.abc.ReceiveChannel.__anext__()`` + async for _ in stream: + assert 0 + else: + + # verify stream is now closed + try: + await stream.receive() + except trio.EndOfChannel: + pass + + # TODO: should be just raise the closed resource err + # directly here to enforce not allowing a re-open + # of a stream to the context (at least until a time of + # if/when we decide that's a good idea?) + try: + async with ctx.open_stream() as stream: + pass + except trio.ClosedResourceError: + pass + + await portal.cancel_actor() + + +@tractor.context +async def expect_cancelled( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.started() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) # echo server + + except trio.Cancelled: + # expected case + _state = False + raise + + else: + assert 0, "Wasn't cancelled!?" + + +@pytest.mark.parametrize( + 'use_ctx_cancel_method', + [False, True], +) +@tractor_test +async def test_caller_closes_ctx_after_callee_opens_stream( + use_ctx_cancel_method: bool, +): + 'caller context closes without using stream' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + await portal.run(assert_state, value=True) + + assert sent is None + + # call cancel explicitly + if use_ctx_cancel_method: + + await ctx.cancel() + + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + else: + assert 0, "Should have context cancelled?" + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + else: + try: + with trio.fail_after(0.2): + await ctx.result() + assert 0, "Callee should have blocked!?" + except trio.TooSlowError: + await ctx.cancel() + try: + async with ctx.open_stream() as stream: + async for msg in stream: + pass + except trio.ClosedResourceError: + pass + else: + assert 0, "Should have received closed resource error?" + + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor_test +async def test_multitask_caller_cancels_from_nonroot_task(): + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'ctx_cancelled', + enable_modules=[__name__], + ) + + async with portal.open_context( + expect_cancelled, + ) as (ctx, sent): + + await portal.run(assert_state, value=True) + assert sent is None + + async with ctx.open_stream() as stream: + + async def send_msg_then_cancel(): + await stream.send('yo') + await portal.run(assert_state, value=True) + await ctx.cancel() + await portal.run(assert_state, value=False) + + async with trio.open_nursery() as n: + n.start_soon(send_msg_then_cancel) + + try: + async for msg in stream: + assert msg == 'yo' + + except tractor.ContextCancelled: + raise # XXX: must be propagated to __aexit__ + + # channel should still be up + assert portal.channel.connected() + + # ctx is closed here + await portal.run(assert_state, value=False) + + # channel should not have been destroyed yet, only the + # inter-actor-task context + assert portal.channel.connected() + + # teardown the actor + await portal.cancel_actor() + + +@tractor.context +async def cancel_self( + + ctx: tractor.Context, + +) -> None: + global _state + _state = True + + await ctx.cancel() + try: + with trio.fail_after(0.1): + await trio.sleep_forever() + + except trio.Cancelled: + raise + + except trio.TooSlowError: + # should never get here + assert 0 + + +@tractor_test +async def test_callee_cancels_before_started(): + '''callee calls `Context.cancel()` while streaming and caller + sees stream terminated in `ContextCancelled`. + + ''' + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + 'cancels_self', + enable_modules=[__name__], + ) + try: + + async with portal.open_context( + cancel_self, + ) as (ctx, sent): + async with ctx.open_stream(): + + await trio.sleep_forever() + + # raises a special cancel signal + except tractor.ContextCancelled as ce: + ce.type == trio.Cancelled + + # teardown the actor + await portal.cancel_actor() + + @tractor.context async def simple_rpc( @@ -207,6 +491,8 @@ def test_simple_rpc(server_func, use_async_for): # stream should terminate here + # final context result(s) should be consumed here in __aexit__() + await portal.cancel_actor() trio.run(main)