diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index c92c440..f7c1e92 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -13,7 +13,10 @@ from typing import Optional import pytest import trio import tractor -from tractor._exceptions import StreamOverrun +from tractor._exceptions import ( + StreamOverrun, + ContextCancelled, +) from conftest import tractor_test @@ -91,7 +94,10 @@ async def not_started_but_stream_opened( @pytest.mark.parametrize( 'target', - [too_many_starteds, not_started_but_stream_opened], + [ + too_many_starteds, + not_started_but_stream_opened, + ], ids='misuse_type={}'.format, ) def test_started_misuse(target): @@ -228,6 +234,70 @@ def test_simple_context( trio.run(main) +@pytest.mark.parametrize( + 'cancel_method', + ['ctx', 'portal'], + ids=lambda item: f'cancel_method={item}' +) +@pytest.mark.parametrize( + 'result_before_exit', + [True, False], + ids=lambda item: f'result_before_exit={item}' +) +def test_caller_cancels( + cancel_method: str, + result_before_exit: bool, +): + ''' + Verify that when the opening side of a context (aka the caller) + cancels that context, the ctx does not raise a cancelled when + either calling `.result()` or on context exit. + + ''' + + async def check_canceller( + ctx: tractor.Context, + ) -> None: + # should not raise yet return the remote + # context cancelled error. + err = await ctx.result() + assert isinstance(err, ContextCancelled) + assert ( + tuple(err.canceller) + == + tractor.current_actor().uid + ) + + async def main(): + async with tractor.open_nursery() as nursery: + portal = await nursery.start_actor( + 'simple_context', + enable_modules=[__name__], + ) + with trio.fail_after(0.5): + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=True, + ) as (ctx, sent): + + if cancel_method == 'ctx': + await ctx.cancel() + else: + await portal.cancel_actor() + + if result_before_exit: + await check_canceller(ctx) + + if not result_before_exit: + await check_canceller(ctx) + + if cancel_method != 'portal': + await portal.cancel_actor() + + trio.run(main) + + # basic stream terminations: # - callee context closes without using stream # - caller context closes without using stream @@ -506,7 +576,6 @@ async def test_callee_cancels_before_started(): cancel_self, ) as (ctx, sent): async with ctx.open_stream(): - await trio.sleep_forever() # raises a special cancel signal @@ -610,7 +679,7 @@ def test_one_end_stream_not_opened(overrun_by): # 2 overrun cases and the no overrun case (which pushes right up to # the msg limit) - if overrunner == 'caller' or 'cance' in overrunner: + if overrunner == 'caller' or 'cancel' in overrunner: with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) @@ -634,7 +703,7 @@ async def echo_back_sequence( ctx: tractor.Context, seq: list[int], - msg_buffer_size: Optional[int] = None, + msg_buffer_size: int | None = None, ) -> None: ''' @@ -644,11 +713,13 @@ async def echo_back_sequence( await ctx.started() async with ctx.open_stream( msg_buffer_size=msg_buffer_size, + backpressure=True, ) as stream: seq = list(seq) # bleh, `msgpack`... count = 0 - while count < 3: + # while count < 10: + while True: batch = [] async for msg in stream: batch.append(msg) @@ -661,13 +732,17 @@ async def echo_back_sequence( count += 1 - return 'yo' + print("EXITING CALLEEE") + return 'yo' -def test_stream_backpressure(): +def test_stream_backpressure( + loglevel: str, +): ''' Demonstrate small overruns of each task back and forth - on a stream not raising any errors by default. + on a stream not raising any errors by default by setting + the ``backpressure=True``. ''' async def main(): @@ -675,16 +750,21 @@ def test_stream_backpressure(): portal = await n.start_actor( 'callee_sends_forever', enable_modules=[__name__], + loglevel=loglevel, ) seq = list(range(3)) async with portal.open_context( echo_back_sequence, seq=seq, - msg_buffer_size=1, ) as (ctx, sent): + assert sent is None - async with ctx.open_stream(msg_buffer_size=1) as stream: + async with ctx.open_stream( + msg_buffer_size=1, + backpressure=True, + # allow_overruns=True, + ) as stream: count = 0 while count < 3: for msg in seq: @@ -693,15 +773,25 @@ def test_stream_backpressure(): await trio.sleep(0.1) batch = [] + # with trio.move_on_after(1) as cs: async for msg in stream: + print(f'RX {msg}') batch.append(msg) if batch == seq: break count += 1 + # if cs.cancelled_caught: + # break + + # cancel the remote task + # print('SENDING ROOT SIDE CANCEL') + # await ctx.cancel() + # here the context should return - assert await ctx.result() == 'yo' + res = await ctx.result() + assert res == 'yo' # cancel the daemon await portal.cancel_actor() @@ -737,18 +827,18 @@ async def attach_to_sleep_forever(): finally: # XXX: previously this would trigger local # ``ContextCancelled`` to be received and raised in the - # local context overriding any local error due to - # logic inside ``_invoke()`` which checked for - # an error set on ``Context._error`` and raised it in - # under a cancellation scenario. - - # The problem is you can have a remote cancellation - # that is part of a local error and we shouldn't raise - # ``ContextCancelled`` **iff** we weren't the side of - # the context to initiate it, i.e. + # local context overriding any local error due to logic + # inside ``_invoke()`` which checked for an error set on + # ``Context._error`` and raised it in a cancellation + # scenario. + # ------ + # The problem is you can have a remote cancellation that + # is part of a local error and we shouldn't raise + # ``ContextCancelled`` **iff** we **were not** the side + # of the context to initiate it, i.e. # ``Context._cancel_called`` should **NOT** have been # set. The special logic to handle this case is now - # inside ``Context._may_raise_from_remote_msg()`` XD + # inside ``Context._maybe_raise_from_remote_msg()`` XD await peer_ctx.cancel() @@ -769,9 +859,10 @@ async def error_before_started( def test_do_not_swallow_error_before_started_by_remote_contextcancelled(): ''' - Verify that an error raised in a remote context which itself opens another - remote context, which it cancels, does not ovverride the original error that - caused the cancellation of the secondardy context. + Verify that an error raised in a remote context which itself opens + another remote context, which it cancels, does not ovverride the + original error that caused the cancellation of the secondardy + context. ''' async def main():