From 71cd4453190401573eb3dc0fb75e210b445461b5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Apr 2023 18:13:30 -0400 Subject: [PATCH] Add new set of context cancellation tests These will verify new changes to the runtime/messaging core which allows us to adopt an "ignore cancel if requested by us" style handling of `ContextCancelled` more like how `trio` does with `trio.Nursery.cancel_scope.cancel()`. We now expect a `ContextCancelled.canceller: tuple` which is set to the actor uid of the actor which requested the cancellation which eventually resulted in the remote error-msg. Also adds some experimental tweaks to the "backpressure" test which it turns out is very problematic in coordination with context cancellation since blocking on the feed mem chan to some task will block the ipc msg loop and thus handling of cancellation.. More to come to both the test and core to address this hopefully since right now this test is failing. --- tests/test_context_stream_semantics.py | 141 ++++++++++++++++++++----- 1 file changed, 116 insertions(+), 25 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index c92c440..f7c1e92 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -13,7 +13,10 @@ from typing import Optional import pytest import trio import tractor -from tractor._exceptions import StreamOverrun +from tractor._exceptions import ( + StreamOverrun, + ContextCancelled, +) from conftest import tractor_test @@ -91,7 +94,10 @@ async def not_started_but_stream_opened( @pytest.mark.parametrize( 'target', - [too_many_starteds, not_started_but_stream_opened], + [ + too_many_starteds, + not_started_but_stream_opened, + ], ids='misuse_type={}'.format, ) def test_started_misuse(target): @@ -228,6 +234,70 @@ def test_simple_context( trio.run(main) +@pytest.mark.parametrize( + 'cancel_method', + ['ctx', 'portal'], + ids=lambda item: f'cancel_method={item}' +) +@pytest.mark.parametrize( + 'result_before_exit', + [True, False], + ids=lambda item: f'result_before_exit={item}' +) +def test_caller_cancels( + cancel_method: str, + result_before_exit: bool, +): + ''' + Verify that when the opening side of a context (aka the caller) + cancels that context, the ctx does not raise a cancelled when + either calling `.result()` or on context exit. + + ''' + + async def check_canceller( + ctx: tractor.Context, + ) -> None: + # should not raise yet return the remote + # context cancelled error. + err = await ctx.result() + assert isinstance(err, ContextCancelled) + assert ( + tuple(err.canceller) + == + tractor.current_actor().uid + ) + + async def main(): + async with tractor.open_nursery() as nursery: + portal = await nursery.start_actor( + 'simple_context', + enable_modules=[__name__], + ) + with trio.fail_after(0.5): + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=True, + ) as (ctx, sent): + + if cancel_method == 'ctx': + await ctx.cancel() + else: + await portal.cancel_actor() + + if result_before_exit: + await check_canceller(ctx) + + if not result_before_exit: + await check_canceller(ctx) + + if cancel_method != 'portal': + await portal.cancel_actor() + + trio.run(main) + + # basic stream terminations: # - callee context closes without using stream # - caller context closes without using stream @@ -506,7 +576,6 @@ async def test_callee_cancels_before_started(): cancel_self, ) as (ctx, sent): async with ctx.open_stream(): - await trio.sleep_forever() # raises a special cancel signal @@ -610,7 +679,7 @@ def test_one_end_stream_not_opened(overrun_by): # 2 overrun cases and the no overrun case (which pushes right up to # the msg limit) - if overrunner == 'caller' or 'cance' in overrunner: + if overrunner == 'caller' or 'cancel' in overrunner: with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) @@ -634,7 +703,7 @@ async def echo_back_sequence( ctx: tractor.Context, seq: list[int], - msg_buffer_size: Optional[int] = None, + msg_buffer_size: int | None = None, ) -> None: ''' @@ -644,11 +713,13 @@ async def echo_back_sequence( await ctx.started() async with ctx.open_stream( msg_buffer_size=msg_buffer_size, + backpressure=True, ) as stream: seq = list(seq) # bleh, `msgpack`... count = 0 - while count < 3: + # while count < 10: + while True: batch = [] async for msg in stream: batch.append(msg) @@ -661,13 +732,17 @@ async def echo_back_sequence( count += 1 - return 'yo' + print("EXITING CALLEEE") + return 'yo' -def test_stream_backpressure(): +def test_stream_backpressure( + loglevel: str, +): ''' Demonstrate small overruns of each task back and forth - on a stream not raising any errors by default. + on a stream not raising any errors by default by setting + the ``backpressure=True``. ''' async def main(): @@ -675,16 +750,21 @@ def test_stream_backpressure(): portal = await n.start_actor( 'callee_sends_forever', enable_modules=[__name__], + loglevel=loglevel, ) seq = list(range(3)) async with portal.open_context( echo_back_sequence, seq=seq, - msg_buffer_size=1, ) as (ctx, sent): + assert sent is None - async with ctx.open_stream(msg_buffer_size=1) as stream: + async with ctx.open_stream( + msg_buffer_size=1, + backpressure=True, + # allow_overruns=True, + ) as stream: count = 0 while count < 3: for msg in seq: @@ -693,15 +773,25 @@ def test_stream_backpressure(): await trio.sleep(0.1) batch = [] + # with trio.move_on_after(1) as cs: async for msg in stream: + print(f'RX {msg}') batch.append(msg) if batch == seq: break count += 1 + # if cs.cancelled_caught: + # break + + # cancel the remote task + # print('SENDING ROOT SIDE CANCEL') + # await ctx.cancel() + # here the context should return - assert await ctx.result() == 'yo' + res = await ctx.result() + assert res == 'yo' # cancel the daemon await portal.cancel_actor() @@ -737,18 +827,18 @@ async def attach_to_sleep_forever(): finally: # XXX: previously this would trigger local # ``ContextCancelled`` to be received and raised in the - # local context overriding any local error due to - # logic inside ``_invoke()`` which checked for - # an error set on ``Context._error`` and raised it in - # under a cancellation scenario. - - # The problem is you can have a remote cancellation - # that is part of a local error and we shouldn't raise - # ``ContextCancelled`` **iff** we weren't the side of - # the context to initiate it, i.e. + # local context overriding any local error due to logic + # inside ``_invoke()`` which checked for an error set on + # ``Context._error`` and raised it in a cancellation + # scenario. + # ------ + # The problem is you can have a remote cancellation that + # is part of a local error and we shouldn't raise + # ``ContextCancelled`` **iff** we **were not** the side + # of the context to initiate it, i.e. # ``Context._cancel_called`` should **NOT** have been # set. The special logic to handle this case is now - # inside ``Context._may_raise_from_remote_msg()`` XD + # inside ``Context._maybe_raise_from_remote_msg()`` XD await peer_ctx.cancel() @@ -769,9 +859,10 @@ async def error_before_started( def test_do_not_swallow_error_before_started_by_remote_contextcancelled(): ''' - Verify that an error raised in a remote context which itself opens another - remote context, which it cancels, does not ovverride the original error that - caused the cancellation of the secondardy context. + Verify that an error raised in a remote context which itself opens + another remote context, which it cancels, does not ovverride the + original error that caused the cancellation of the secondardy + context. ''' async def main():