Add detailed ``@tractor.context`` cancellation/termination tests
							parent
							
								
									b63ccf0007
								
							
						
					
					
						commit
						11d471a4cd
					
				|  | @ -6,20 +6,13 @@ import pytest | ||||||
| import trio | import trio | ||||||
| import tractor | import tractor | ||||||
| 
 | 
 | ||||||
| # from conftest import tractor_test | from conftest import tractor_test | ||||||
| 
 | 
 | ||||||
| # TODO: test endofchannel semantics / cancellation / error cases: | # the general stream semantics are | ||||||
| # 3 possible outcomes: | # - normal termination: far end relays a stop message which | ||||||
| # - normal termination: far end relays a stop message with | # terminates an ongoing ``MsgStream`` iteration | ||||||
| # final value as in async gen from ``return <val>``. | # - cancel termination: context is cancelled on either side cancelling | ||||||
| 
 | #  the "linked" inter-actor task context | ||||||
| # possible outcomes: |  | ||||||
| # - normal termination: far end returns |  | ||||||
| # - premature close: far end relays a stop message to tear down stream |  | ||||||
| # - cancel: far end raises `ContextCancelled` |  | ||||||
| 
 |  | ||||||
| # future possible outcomes |  | ||||||
| # - restart request: far end raises `ContextRestart` |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| _state: bool = False | _state: bool = False | ||||||
|  | @ -30,6 +23,7 @@ async def simple_setup_teardown( | ||||||
| 
 | 
 | ||||||
|     ctx: tractor.Context, |     ctx: tractor.Context, | ||||||
|     data: int, |     data: int, | ||||||
|  |     block_forever: bool = False, | ||||||
| 
 | 
 | ||||||
| ) -> None: | ) -> None: | ||||||
| 
 | 
 | ||||||
|  | @ -41,8 +35,11 @@ async def simple_setup_teardown( | ||||||
|     await ctx.started(data + 1) |     await ctx.started(data + 1) | ||||||
| 
 | 
 | ||||||
|     try: |     try: | ||||||
|  |         if block_forever: | ||||||
|             # block until cancelled |             # block until cancelled | ||||||
|             await trio.sleep_forever() |             await trio.sleep_forever() | ||||||
|  |         else: | ||||||
|  |             return 'yo' | ||||||
|     finally: |     finally: | ||||||
|         _state = False |         _state = False | ||||||
| 
 | 
 | ||||||
|  | @ -56,7 +53,14 @@ async def assert_state(value: bool): | ||||||
|     'error_parent', |     'error_parent', | ||||||
|     [False, True], |     [False, True], | ||||||
| ) | ) | ||||||
| def test_simple_context(error_parent): | @pytest.mark.parametrize( | ||||||
|  |     'callee_blocks_forever', | ||||||
|  |     [False, True], | ||||||
|  | ) | ||||||
|  | def test_simple_context( | ||||||
|  |     error_parent, | ||||||
|  |     callee_blocks_forever, | ||||||
|  | ): | ||||||
| 
 | 
 | ||||||
|     async def main(): |     async def main(): | ||||||
| 
 | 
 | ||||||
|  | @ -70,11 +74,16 @@ def test_simple_context(error_parent): | ||||||
|             async with portal.open_context( |             async with portal.open_context( | ||||||
|                 simple_setup_teardown, |                 simple_setup_teardown, | ||||||
|                 data=10, |                 data=10, | ||||||
|  |                 block_forever=callee_blocks_forever, | ||||||
|             ) as (ctx, sent): |             ) as (ctx, sent): | ||||||
| 
 | 
 | ||||||
|                 assert sent == 11 |                 assert sent == 11 | ||||||
| 
 | 
 | ||||||
|  |                 if callee_blocks_forever: | ||||||
|                     await portal.run(assert_state, value=True) |                     await portal.run(assert_state, value=True) | ||||||
|  |                     await ctx.cancel() | ||||||
|  |                 else: | ||||||
|  |                     assert await ctx.result() == 'yo' | ||||||
| 
 | 
 | ||||||
|             # after cancellation |             # after cancellation | ||||||
|             await portal.run(assert_state, value=False) |             await portal.run(assert_state, value=False) | ||||||
|  | @ -94,6 +103,281 @@ def test_simple_context(error_parent): | ||||||
|         trio.run(main) |         trio.run(main) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  | # basic stream terminations: | ||||||
|  | # - callee context closes without using stream | ||||||
|  | # - caller context closes without using stream | ||||||
|  | # - caller context calls `Context.cancel()` while streaming | ||||||
|  | #   is ongoing resulting in callee being cancelled | ||||||
|  | # - callee calls `Context.cancel()` while streaming and caller | ||||||
|  | #   sees stream terminated in `RemoteActorError` | ||||||
|  | 
 | ||||||
|  | # TODO: future possible features | ||||||
|  | # - restart request: far end raises `ContextRestart` | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def close_ctx_immediately( | ||||||
|  | 
 | ||||||
|  |     ctx: tractor.Context, | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  | 
 | ||||||
|  |     await ctx.started() | ||||||
|  |     global _state | ||||||
|  | 
 | ||||||
|  |     async with ctx.open_stream(): | ||||||
|  |         pass | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor_test | ||||||
|  | async def test_callee_closes_ctx_after_stream_open(): | ||||||
|  |     'callee context closes without using stream' | ||||||
|  | 
 | ||||||
|  |     async with tractor.open_nursery() as n: | ||||||
|  | 
 | ||||||
|  |         portal = await n.start_actor( | ||||||
|  |             'fast_stream_closer', | ||||||
|  |             enable_modules=[__name__], | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         async with portal.open_context( | ||||||
|  |             close_ctx_immediately, | ||||||
|  | 
 | ||||||
|  |             # flag to avoid waiting the final result | ||||||
|  |             # cancel_on_exit=True, | ||||||
|  | 
 | ||||||
|  |         ) as (ctx, sent): | ||||||
|  | 
 | ||||||
|  |             assert sent is None | ||||||
|  | 
 | ||||||
|  |             with trio.fail_after(0.5): | ||||||
|  |                 async with ctx.open_stream() as stream: | ||||||
|  | 
 | ||||||
|  |                     # should fall through since ``StopAsyncIteration`` | ||||||
|  |                     # should be raised through translation of | ||||||
|  |                     # a ``trio.EndOfChannel`` by | ||||||
|  |                     # ``trio.abc.ReceiveChannel.__anext__()`` | ||||||
|  |                     async for _ in stream: | ||||||
|  |                         assert 0 | ||||||
|  |                     else: | ||||||
|  | 
 | ||||||
|  |                         # verify stream is now closed | ||||||
|  |                         try: | ||||||
|  |                             await stream.receive() | ||||||
|  |                         except trio.EndOfChannel: | ||||||
|  |                             pass | ||||||
|  | 
 | ||||||
|  |             # TODO: should be just raise the closed resource err | ||||||
|  |             # directly here to enforce not allowing a re-open | ||||||
|  |             # of a stream to the context (at least until a time of | ||||||
|  |             # if/when we decide that's a good idea?) | ||||||
|  |             try: | ||||||
|  |                 async with ctx.open_stream() as stream: | ||||||
|  |                     pass | ||||||
|  |             except trio.ClosedResourceError: | ||||||
|  |                 pass | ||||||
|  | 
 | ||||||
|  |         await portal.cancel_actor() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def expect_cancelled( | ||||||
|  | 
 | ||||||
|  |     ctx: tractor.Context, | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  |     global _state | ||||||
|  |     _state = True | ||||||
|  | 
 | ||||||
|  |     await ctx.started() | ||||||
|  | 
 | ||||||
|  |     try: | ||||||
|  |         async with ctx.open_stream() as stream: | ||||||
|  |             async for msg in stream: | ||||||
|  |                 await stream.send(msg)  # echo server | ||||||
|  | 
 | ||||||
|  |     except trio.Cancelled: | ||||||
|  |         # expected case | ||||||
|  |         _state = False | ||||||
|  |         raise | ||||||
|  | 
 | ||||||
|  |     else: | ||||||
|  |         assert 0, "Wasn't cancelled!?" | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @pytest.mark.parametrize( | ||||||
|  |     'use_ctx_cancel_method', | ||||||
|  |     [False, True], | ||||||
|  | ) | ||||||
|  | @tractor_test | ||||||
|  | async def test_caller_closes_ctx_after_callee_opens_stream( | ||||||
|  |     use_ctx_cancel_method: bool, | ||||||
|  | ): | ||||||
|  |     'caller context closes without using stream' | ||||||
|  | 
 | ||||||
|  |     async with tractor.open_nursery() as n: | ||||||
|  | 
 | ||||||
|  |         portal = await n.start_actor( | ||||||
|  |             'ctx_cancelled', | ||||||
|  |             enable_modules=[__name__], | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         async with portal.open_context( | ||||||
|  |             expect_cancelled, | ||||||
|  |         ) as (ctx, sent): | ||||||
|  |             await portal.run(assert_state, value=True) | ||||||
|  | 
 | ||||||
|  |             assert sent is None | ||||||
|  | 
 | ||||||
|  |             # call cancel explicitly | ||||||
|  |             if use_ctx_cancel_method: | ||||||
|  | 
 | ||||||
|  |                 await ctx.cancel() | ||||||
|  | 
 | ||||||
|  |                 try: | ||||||
|  |                     async with ctx.open_stream() as stream: | ||||||
|  |                         async for msg in stream: | ||||||
|  |                             pass | ||||||
|  | 
 | ||||||
|  |                 except tractor.ContextCancelled: | ||||||
|  |                     raise  # XXX: must be propagated to __aexit__ | ||||||
|  | 
 | ||||||
|  |                 else: | ||||||
|  |                     assert 0, "Should have context cancelled?" | ||||||
|  | 
 | ||||||
|  |                 # channel should still be up | ||||||
|  |                 assert portal.channel.connected() | ||||||
|  | 
 | ||||||
|  |                 # ctx is closed here | ||||||
|  |                 await portal.run(assert_state, value=False) | ||||||
|  | 
 | ||||||
|  |             else: | ||||||
|  |                 try: | ||||||
|  |                     with trio.fail_after(0.2): | ||||||
|  |                         await ctx.result() | ||||||
|  |                         assert 0, "Callee should have blocked!?" | ||||||
|  |                 except trio.TooSlowError: | ||||||
|  |                     await ctx.cancel() | ||||||
|  |         try: | ||||||
|  |             async with ctx.open_stream() as stream: | ||||||
|  |                 async for msg in stream: | ||||||
|  |                     pass | ||||||
|  |         except trio.ClosedResourceError: | ||||||
|  |             pass | ||||||
|  |         else: | ||||||
|  |             assert 0, "Should have received closed resource error?" | ||||||
|  | 
 | ||||||
|  |         # ctx is closed here | ||||||
|  |         await portal.run(assert_state, value=False) | ||||||
|  | 
 | ||||||
|  |         # channel should not have been destroyed yet, only the | ||||||
|  |         # inter-actor-task context | ||||||
|  |         assert portal.channel.connected() | ||||||
|  | 
 | ||||||
|  |         # teardown the actor | ||||||
|  |         await portal.cancel_actor() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor_test | ||||||
|  | async def test_multitask_caller_cancels_from_nonroot_task(): | ||||||
|  | 
 | ||||||
|  |     async with tractor.open_nursery() as n: | ||||||
|  | 
 | ||||||
|  |         portal = await n.start_actor( | ||||||
|  |             'ctx_cancelled', | ||||||
|  |             enable_modules=[__name__], | ||||||
|  |         ) | ||||||
|  | 
 | ||||||
|  |         async with portal.open_context( | ||||||
|  |             expect_cancelled, | ||||||
|  |         ) as (ctx, sent): | ||||||
|  | 
 | ||||||
|  |             await portal.run(assert_state, value=True) | ||||||
|  |             assert sent is None | ||||||
|  | 
 | ||||||
|  |             async with ctx.open_stream() as stream: | ||||||
|  | 
 | ||||||
|  |                 async def send_msg_then_cancel(): | ||||||
|  |                     await stream.send('yo') | ||||||
|  |                     await portal.run(assert_state, value=True) | ||||||
|  |                     await ctx.cancel() | ||||||
|  |                     await portal.run(assert_state, value=False) | ||||||
|  | 
 | ||||||
|  |                 async with trio.open_nursery() as n: | ||||||
|  |                     n.start_soon(send_msg_then_cancel) | ||||||
|  | 
 | ||||||
|  |                     try: | ||||||
|  |                         async for msg in stream: | ||||||
|  |                             assert msg == 'yo' | ||||||
|  | 
 | ||||||
|  |                     except tractor.ContextCancelled: | ||||||
|  |                         raise  # XXX: must be propagated to __aexit__ | ||||||
|  | 
 | ||||||
|  |                 # channel should still be up | ||||||
|  |                 assert portal.channel.connected() | ||||||
|  | 
 | ||||||
|  |                 # ctx is closed here | ||||||
|  |                 await portal.run(assert_state, value=False) | ||||||
|  | 
 | ||||||
|  |         # channel should not have been destroyed yet, only the | ||||||
|  |         # inter-actor-task context | ||||||
|  |         assert portal.channel.connected() | ||||||
|  | 
 | ||||||
|  |         # teardown the actor | ||||||
|  |         await portal.cancel_actor() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor.context | ||||||
|  | async def cancel_self( | ||||||
|  | 
 | ||||||
|  |     ctx: tractor.Context, | ||||||
|  | 
 | ||||||
|  | ) -> None: | ||||||
|  |     global _state | ||||||
|  |     _state = True | ||||||
|  | 
 | ||||||
|  |     await ctx.cancel() | ||||||
|  |     try: | ||||||
|  |         with trio.fail_after(0.1): | ||||||
|  |             await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |     except trio.Cancelled: | ||||||
|  |         raise | ||||||
|  | 
 | ||||||
|  |     except trio.TooSlowError: | ||||||
|  |         # should never get here | ||||||
|  |         assert 0 | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | @tractor_test | ||||||
|  | async def test_callee_cancels_before_started(): | ||||||
|  |     '''callee calls `Context.cancel()` while streaming and caller | ||||||
|  |     sees stream terminated in `ContextCancelled`. | ||||||
|  | 
 | ||||||
|  |     ''' | ||||||
|  |     async with tractor.open_nursery() as n: | ||||||
|  | 
 | ||||||
|  |         portal = await n.start_actor( | ||||||
|  |             'cancels_self', | ||||||
|  |             enable_modules=[__name__], | ||||||
|  |         ) | ||||||
|  |         try: | ||||||
|  | 
 | ||||||
|  |             async with portal.open_context( | ||||||
|  |                 cancel_self, | ||||||
|  |             ) as (ctx, sent): | ||||||
|  |                 async with ctx.open_stream(): | ||||||
|  | 
 | ||||||
|  |                     await trio.sleep_forever() | ||||||
|  | 
 | ||||||
|  |         # raises a special cancel signal | ||||||
|  |         except tractor.ContextCancelled as ce: | ||||||
|  |             ce.type == trio.Cancelled | ||||||
|  | 
 | ||||||
|  |         # teardown the actor | ||||||
|  |         await portal.cancel_actor() | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
| @tractor.context | @tractor.context | ||||||
| async def simple_rpc( | async def simple_rpc( | ||||||
| 
 | 
 | ||||||
|  | @ -207,6 +491,8 @@ def test_simple_rpc(server_func, use_async_for): | ||||||
| 
 | 
 | ||||||
|                 # stream should terminate here |                 # stream should terminate here | ||||||
| 
 | 
 | ||||||
|  |             # final context result(s) should be consumed here in __aexit__() | ||||||
|  | 
 | ||||||
|             await portal.cancel_actor() |             await portal.cancel_actor() | ||||||
| 
 | 
 | ||||||
|     trio.run(main) |     trio.run(main) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue