forked from goodboy/tractor
				
			Add detailed ``@tractor.context`` cancellation/termination tests
							parent
							
								
									196dea80db
								
							
						
					
					
						commit
						7c5fd8ce9f
					
				| 
						 | 
				
			
			@ -6,20 +6,13 @@ import pytest
 | 
			
		|||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
 | 
			
		||||
# from conftest import tractor_test
 | 
			
		||||
from conftest import tractor_test
 | 
			
		||||
 | 
			
		||||
# TODO: test endofchannel semantics / cancellation / error cases:
 | 
			
		||||
# 3 possible outcomes:
 | 
			
		||||
# - normal termination: far end relays a stop message with
 | 
			
		||||
# final value as in async gen from ``return <val>``.
 | 
			
		||||
 | 
			
		||||
# possible outcomes:
 | 
			
		||||
# - normal termination: far end returns
 | 
			
		||||
# - premature close: far end relays a stop message to tear down stream
 | 
			
		||||
# - cancel: far end raises `ContextCancelled`
 | 
			
		||||
 | 
			
		||||
# future possible outcomes
 | 
			
		||||
# - restart request: far end raises `ContextRestart`
 | 
			
		||||
# the general stream semantics are
 | 
			
		||||
# - normal termination: far end relays a stop message which
 | 
			
		||||
# terminates an ongoing ``MsgStream`` iteration
 | 
			
		||||
# - cancel termination: context is cancelled on either side cancelling
 | 
			
		||||
#  the "linked" inter-actor task context
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
_state: bool = False
 | 
			
		||||
| 
						 | 
				
			
			@ -30,6 +23,7 @@ async def simple_setup_teardown(
 | 
			
		|||
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
    data: int,
 | 
			
		||||
    block_forever: bool = False,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -41,8 +35,11 @@ async def simple_setup_teardown(
 | 
			
		|||
    await ctx.started(data + 1)
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        # block until cancelled
 | 
			
		||||
        await trio.sleep_forever()
 | 
			
		||||
        if block_forever:
 | 
			
		||||
            # block until cancelled
 | 
			
		||||
            await trio.sleep_forever()
 | 
			
		||||
        else:
 | 
			
		||||
            return 'yo'
 | 
			
		||||
    finally:
 | 
			
		||||
        _state = False
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -56,7 +53,14 @@ async def assert_state(value: bool):
 | 
			
		|||
    'error_parent',
 | 
			
		||||
    [False, True],
 | 
			
		||||
)
 | 
			
		||||
def test_simple_context(error_parent):
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'callee_blocks_forever',
 | 
			
		||||
    [False, True],
 | 
			
		||||
)
 | 
			
		||||
def test_simple_context(
 | 
			
		||||
    error_parent,
 | 
			
		||||
    callee_blocks_forever,
 | 
			
		||||
):
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -70,11 +74,16 @@ def test_simple_context(error_parent):
 | 
			
		|||
            async with portal.open_context(
 | 
			
		||||
                simple_setup_teardown,
 | 
			
		||||
                data=10,
 | 
			
		||||
                block_forever=callee_blocks_forever,
 | 
			
		||||
            ) as (ctx, sent):
 | 
			
		||||
 | 
			
		||||
                assert sent == 11
 | 
			
		||||
 | 
			
		||||
                await portal.run(assert_state, value=True)
 | 
			
		||||
                if callee_blocks_forever:
 | 
			
		||||
                    await portal.run(assert_state, value=True)
 | 
			
		||||
                    await ctx.cancel()
 | 
			
		||||
                else:
 | 
			
		||||
                    assert await ctx.result() == 'yo'
 | 
			
		||||
 | 
			
		||||
            # after cancellation
 | 
			
		||||
            await portal.run(assert_state, value=False)
 | 
			
		||||
| 
						 | 
				
			
			@ -94,6 +103,281 @@ def test_simple_context(error_parent):
 | 
			
		|||
        trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# basic stream terminations:
 | 
			
		||||
# - callee context closes without using stream
 | 
			
		||||
# - caller context closes without using stream
 | 
			
		||||
# - caller context calls `Context.cancel()` while streaming
 | 
			
		||||
#   is ongoing resulting in callee being cancelled
 | 
			
		||||
# - callee calls `Context.cancel()` while streaming and caller
 | 
			
		||||
#   sees stream terminated in `RemoteActorError`
 | 
			
		||||
 | 
			
		||||
# TODO: future possible features
 | 
			
		||||
# - restart request: far end raises `ContextRestart`
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def close_ctx_immediately(
 | 
			
		||||
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
    global _state
 | 
			
		||||
 | 
			
		||||
    async with ctx.open_stream():
 | 
			
		||||
        pass
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor_test
 | 
			
		||||
async def test_callee_closes_ctx_after_stream_open():
 | 
			
		||||
    'callee context closes without using stream'
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
        portal = await n.start_actor(
 | 
			
		||||
            'fast_stream_closer',
 | 
			
		||||
            enable_modules=[__name__],
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        async with portal.open_context(
 | 
			
		||||
            close_ctx_immediately,
 | 
			
		||||
 | 
			
		||||
            # flag to avoid waiting the final result
 | 
			
		||||
            # cancel_on_exit=True,
 | 
			
		||||
 | 
			
		||||
        ) as (ctx, sent):
 | 
			
		||||
 | 
			
		||||
            assert sent is None
 | 
			
		||||
 | 
			
		||||
            with trio.fail_after(0.5):
 | 
			
		||||
                async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
                    # should fall through since ``StopAsyncIteration``
 | 
			
		||||
                    # should be raised through translation of
 | 
			
		||||
                    # a ``trio.EndOfChannel`` by
 | 
			
		||||
                    # ``trio.abc.ReceiveChannel.__anext__()``
 | 
			
		||||
                    async for _ in stream:
 | 
			
		||||
                        assert 0
 | 
			
		||||
                    else:
 | 
			
		||||
 | 
			
		||||
                        # verify stream is now closed
 | 
			
		||||
                        try:
 | 
			
		||||
                            await stream.receive()
 | 
			
		||||
                        except trio.EndOfChannel:
 | 
			
		||||
                            pass
 | 
			
		||||
 | 
			
		||||
            # TODO: should be just raise the closed resource err
 | 
			
		||||
            # directly here to enforce not allowing a re-open
 | 
			
		||||
            # of a stream to the context (at least until a time of
 | 
			
		||||
            # if/when we decide that's a good idea?)
 | 
			
		||||
            try:
 | 
			
		||||
                async with ctx.open_stream() as stream:
 | 
			
		||||
                    pass
 | 
			
		||||
            except trio.ClosedResourceError:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
        await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def expect_cancelled(
 | 
			
		||||
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    global _state
 | 
			
		||||
    _state = True
 | 
			
		||||
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
 | 
			
		||||
    try:
 | 
			
		||||
        async with ctx.open_stream() as stream:
 | 
			
		||||
            async for msg in stream:
 | 
			
		||||
                await stream.send(msg)  # echo server
 | 
			
		||||
 | 
			
		||||
    except trio.Cancelled:
 | 
			
		||||
        # expected case
 | 
			
		||||
        _state = False
 | 
			
		||||
        raise
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        assert 0, "Wasn't cancelled!?"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'use_ctx_cancel_method',
 | 
			
		||||
    [False, True],
 | 
			
		||||
)
 | 
			
		||||
@tractor_test
 | 
			
		||||
async def test_caller_closes_ctx_after_callee_opens_stream(
 | 
			
		||||
    use_ctx_cancel_method: bool,
 | 
			
		||||
):
 | 
			
		||||
    'caller context closes without using stream'
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
        portal = await n.start_actor(
 | 
			
		||||
            'ctx_cancelled',
 | 
			
		||||
            enable_modules=[__name__],
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        async with portal.open_context(
 | 
			
		||||
            expect_cancelled,
 | 
			
		||||
        ) as (ctx, sent):
 | 
			
		||||
            await portal.run(assert_state, value=True)
 | 
			
		||||
 | 
			
		||||
            assert sent is None
 | 
			
		||||
 | 
			
		||||
            # call cancel explicitly
 | 
			
		||||
            if use_ctx_cancel_method:
 | 
			
		||||
 | 
			
		||||
                await ctx.cancel()
 | 
			
		||||
 | 
			
		||||
                try:
 | 
			
		||||
                    async with ctx.open_stream() as stream:
 | 
			
		||||
                        async for msg in stream:
 | 
			
		||||
                            pass
 | 
			
		||||
 | 
			
		||||
                except tractor.ContextCancelled:
 | 
			
		||||
                    raise  # XXX: must be propagated to __aexit__
 | 
			
		||||
 | 
			
		||||
                else:
 | 
			
		||||
                    assert 0, "Should have context cancelled?"
 | 
			
		||||
 | 
			
		||||
                # channel should still be up
 | 
			
		||||
                assert portal.channel.connected()
 | 
			
		||||
 | 
			
		||||
                # ctx is closed here
 | 
			
		||||
                await portal.run(assert_state, value=False)
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
                try:
 | 
			
		||||
                    with trio.fail_after(0.2):
 | 
			
		||||
                        await ctx.result()
 | 
			
		||||
                        assert 0, "Callee should have blocked!?"
 | 
			
		||||
                except trio.TooSlowError:
 | 
			
		||||
                    await ctx.cancel()
 | 
			
		||||
        try:
 | 
			
		||||
            async with ctx.open_stream() as stream:
 | 
			
		||||
                async for msg in stream:
 | 
			
		||||
                    pass
 | 
			
		||||
        except trio.ClosedResourceError:
 | 
			
		||||
            pass
 | 
			
		||||
        else:
 | 
			
		||||
            assert 0, "Should have received closed resource error?"
 | 
			
		||||
 | 
			
		||||
        # ctx is closed here
 | 
			
		||||
        await portal.run(assert_state, value=False)
 | 
			
		||||
 | 
			
		||||
        # channel should not have been destroyed yet, only the
 | 
			
		||||
        # inter-actor-task context
 | 
			
		||||
        assert portal.channel.connected()
 | 
			
		||||
 | 
			
		||||
        # teardown the actor
 | 
			
		||||
        await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor_test
 | 
			
		||||
async def test_multitask_caller_cancels_from_nonroot_task():
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
        portal = await n.start_actor(
 | 
			
		||||
            'ctx_cancelled',
 | 
			
		||||
            enable_modules=[__name__],
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        async with portal.open_context(
 | 
			
		||||
            expect_cancelled,
 | 
			
		||||
        ) as (ctx, sent):
 | 
			
		||||
 | 
			
		||||
            await portal.run(assert_state, value=True)
 | 
			
		||||
            assert sent is None
 | 
			
		||||
 | 
			
		||||
            async with ctx.open_stream() as stream:
 | 
			
		||||
 | 
			
		||||
                async def send_msg_then_cancel():
 | 
			
		||||
                    await stream.send('yo')
 | 
			
		||||
                    await portal.run(assert_state, value=True)
 | 
			
		||||
                    await ctx.cancel()
 | 
			
		||||
                    await portal.run(assert_state, value=False)
 | 
			
		||||
 | 
			
		||||
                async with trio.open_nursery() as n:
 | 
			
		||||
                    n.start_soon(send_msg_then_cancel)
 | 
			
		||||
 | 
			
		||||
                    try:
 | 
			
		||||
                        async for msg in stream:
 | 
			
		||||
                            assert msg == 'yo'
 | 
			
		||||
 | 
			
		||||
                    except tractor.ContextCancelled:
 | 
			
		||||
                        raise  # XXX: must be propagated to __aexit__
 | 
			
		||||
 | 
			
		||||
                # channel should still be up
 | 
			
		||||
                assert portal.channel.connected()
 | 
			
		||||
 | 
			
		||||
                # ctx is closed here
 | 
			
		||||
                await portal.run(assert_state, value=False)
 | 
			
		||||
 | 
			
		||||
        # channel should not have been destroyed yet, only the
 | 
			
		||||
        # inter-actor-task context
 | 
			
		||||
        assert portal.channel.connected()
 | 
			
		||||
 | 
			
		||||
        # teardown the actor
 | 
			
		||||
        await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def cancel_self(
 | 
			
		||||
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    global _state
 | 
			
		||||
    _state = True
 | 
			
		||||
 | 
			
		||||
    await ctx.cancel()
 | 
			
		||||
    try:
 | 
			
		||||
        with trio.fail_after(0.1):
 | 
			
		||||
            await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
    except trio.Cancelled:
 | 
			
		||||
        raise
 | 
			
		||||
 | 
			
		||||
    except trio.TooSlowError:
 | 
			
		||||
        # should never get here
 | 
			
		||||
        assert 0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor_test
 | 
			
		||||
async def test_callee_cancels_before_started():
 | 
			
		||||
    '''callee calls `Context.cancel()` while streaming and caller
 | 
			
		||||
    sees stream terminated in `ContextCancelled`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async with tractor.open_nursery() as n:
 | 
			
		||||
 | 
			
		||||
        portal = await n.start_actor(
 | 
			
		||||
            'cancels_self',
 | 
			
		||||
            enable_modules=[__name__],
 | 
			
		||||
        )
 | 
			
		||||
        try:
 | 
			
		||||
 | 
			
		||||
            async with portal.open_context(
 | 
			
		||||
                cancel_self,
 | 
			
		||||
            ) as (ctx, sent):
 | 
			
		||||
                async with ctx.open_stream():
 | 
			
		||||
 | 
			
		||||
                    await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
        # raises a special cancel signal
 | 
			
		||||
        except tractor.ContextCancelled as ce:
 | 
			
		||||
            ce.type == trio.Cancelled
 | 
			
		||||
 | 
			
		||||
        # teardown the actor
 | 
			
		||||
        await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def simple_rpc(
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -207,6 +491,8 @@ def test_simple_rpc(server_func, use_async_for):
 | 
			
		|||
 | 
			
		||||
                # stream should terminate here
 | 
			
		||||
 | 
			
		||||
            # final context result(s) should be consumed here in __aexit__()
 | 
			
		||||
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue