Extend ctx semantics suite for streaming edge cases!
Muchas grax to @guilledk for finding the first issue which kicked of this further scrutiny of the `tractor.Context` and `MsgStream` semantics test suite with a strange edge case where, - if the parent opened and immediately closed a stream while the remote child task started and continued (without terminating) to send msgs the parent's `open_context().__aexit__()` would **not block** on the child to complete! => this was seemingly due to a bug discovered inside the `.msg._ops.drain_to_final_msg()` stream handling case logic where we are NOT checking if `Context._stream` is non-`None`! As such this, - extends the `test_caller_closes_ctx_after_callee_opens_stream` (now renamed, see below) to include cases for all combinations of the child and parent sending before receiving on the stream as well as all placements of `Context.cancel()` in the parent before, around and after the stream open. - uses the new `expect_ctxc()` for expecting the taskc (`trio.Task` cancelled)` cases. - also extends the `test_callee_closes_ctx_after_stream_open` (also renamed) to include the case where the parent sends a msg before it receives. => this case has unveiled yet-another-bug where somehow the underlying `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the child's `Return[None]` msg be consumed and NOT in a place where it is correctly set as `Context._result` resulting in the parent hanging forever inside `._ops.drain_to_final_msg()`.. Alongside, - start renaming using the new "remote-task-peer-side" semantics throughout the test module: "caller" -> "parent", "callee" -> "child".ext_type_plds_XPS_BACKUP
							parent
							
								
									d1abe4da44
								
							
						
					
					
						commit
						f0561fc8c0
					
				|  | @ -443,7 +443,6 @@ def test_caller_cancels( | |||
| 
 | ||||
| @tractor.context | ||||
| async def close_ctx_immediately( | ||||
| 
 | ||||
|     ctx: Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|  | @ -454,10 +453,21 @@ async def close_ctx_immediately( | |||
|     async with ctx.open_stream(): | ||||
|         pass | ||||
| 
 | ||||
|     print('child returning!') | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'parent_send_before_receive', | ||||
|     [ | ||||
|         False, | ||||
|         True, | ||||
|     ], | ||||
|     ids=lambda item: f'child_send_before_receive={item}' | ||||
| ) | ||||
| @tractor_test | ||||
| async def test_callee_closes_ctx_after_stream_open( | ||||
| async def test_child_exits_ctx_after_stream_open( | ||||
|     debug_mode: bool, | ||||
|     parent_send_before_receive: bool, | ||||
| ): | ||||
|     ''' | ||||
|     callee context closes without using stream. | ||||
|  | @ -474,6 +484,15 @@ async def test_callee_closes_ctx_after_stream_open( | |||
|     => {'stop': True, 'cid': <str>} | ||||
| 
 | ||||
|     ''' | ||||
|     timeout: float = ( | ||||
|         0.5 if ( | ||||
|             not debug_mode | ||||
|             # NOTE, for debugging final | ||||
|             # Return-consumed-n-discarded-ishue! | ||||
|             # and | ||||
|             # not parent_send_before_receive | ||||
|         ) else 999 | ||||
|     ) | ||||
|     async with tractor.open_nursery( | ||||
|         debug_mode=debug_mode, | ||||
|     ) as an: | ||||
|  | @ -482,7 +501,7 @@ async def test_callee_closes_ctx_after_stream_open( | |||
|             enable_modules=[__name__], | ||||
|         ) | ||||
| 
 | ||||
|         with trio.fail_after(0.5): | ||||
|         with trio.fail_after(timeout): | ||||
|             async with portal.open_context( | ||||
|                 close_ctx_immediately, | ||||
| 
 | ||||
|  | @ -494,41 +513,56 @@ async def test_callee_closes_ctx_after_stream_open( | |||
| 
 | ||||
|                 with trio.fail_after(0.4): | ||||
|                     async with ctx.open_stream() as stream: | ||||
|                         if parent_send_before_receive: | ||||
|                             print('sending first msg from parent!') | ||||
|                             await stream.send('yo') | ||||
| 
 | ||||
|                         # should fall through since ``StopAsyncIteration`` | ||||
|                         # should be raised through translation of | ||||
|                         # a ``trio.EndOfChannel`` by | ||||
|                         # ``trio.abc.ReceiveChannel.__anext__()`` | ||||
|                         async for _ in stream: | ||||
|                         msg = 10 | ||||
|                         async for msg in stream: | ||||
|                             # trigger failure if we DO NOT | ||||
|                             # get an EOC! | ||||
|                             assert 0 | ||||
|                         else: | ||||
|                             # never should get anythinig new from | ||||
|                             # the underlying stream | ||||
|                             assert msg == 10 | ||||
| 
 | ||||
|                             # verify stream is now closed | ||||
|                             try: | ||||
|                                 with trio.fail_after(0.3): | ||||
|                                     print('parent trying to `.receive()` on EoC stream!') | ||||
|                                     await stream.receive() | ||||
|                                     assert 0, 'should have raised eoc!?' | ||||
|                             except trio.EndOfChannel: | ||||
|                                 print('parent got EoC as expected!') | ||||
|                                 pass | ||||
|                                 # raise | ||||
| 
 | ||||
|                 # TODO: should be just raise the closed resource err | ||||
|                 # directly here to enforce not allowing a re-open | ||||
|                 # of a stream to the context (at least until a time of | ||||
|                 # if/when we decide that's a good idea?) | ||||
|                 try: | ||||
|                     with trio.fail_after(0.5): | ||||
|                     with trio.fail_after(timeout): | ||||
|                         async with ctx.open_stream() as stream: | ||||
|                             pass | ||||
|                 except trio.ClosedResourceError: | ||||
|                     pass | ||||
| 
 | ||||
|                 # if ctx._rx_chan._state.data: | ||||
|                 #     await tractor.pause() | ||||
| 
 | ||||
|         await portal.cancel_actor() | ||||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def expect_cancelled( | ||||
|     ctx: Context, | ||||
|     send_before_receive: bool = False, | ||||
| 
 | ||||
| ) -> None: | ||||
|     global _state | ||||
|  | @ -538,6 +572,10 @@ async def expect_cancelled( | |||
| 
 | ||||
|     try: | ||||
|         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|             if send_before_receive: | ||||
|                 await stream.send('yo') | ||||
| 
 | ||||
|             async for msg in stream: | ||||
|                 await stream.send(msg)  # echo server | ||||
| 
 | ||||
|  | @ -567,23 +605,46 @@ async def expect_cancelled( | |||
|         assert 0, "callee wasn't cancelled !?" | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'child_send_before_receive', | ||||
|     [ | ||||
|         False, | ||||
|         True, | ||||
|     ], | ||||
|     ids=lambda item: f'child_send_before_receive={item}' | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'rent_wait_for_msg', | ||||
|     [ | ||||
|         False, | ||||
|         True, | ||||
|     ], | ||||
|     ids=lambda item: f'rent_wait_for_msg={item}' | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'use_ctx_cancel_method', | ||||
|     [False, True], | ||||
|     [ | ||||
|         False, | ||||
|         'pre_stream', | ||||
|         'post_stream_open', | ||||
|         'post_stream_close', | ||||
|     ], | ||||
|     ids=lambda item: f'use_ctx_cancel_method={item}' | ||||
| ) | ||||
| @tractor_test | ||||
| async def test_caller_closes_ctx_after_callee_opens_stream( | ||||
|     use_ctx_cancel_method: bool, | ||||
| async def test_parent_exits_ctx_after_child_enters_stream( | ||||
|     use_ctx_cancel_method: bool|str, | ||||
|     debug_mode: bool, | ||||
|     rent_wait_for_msg: bool, | ||||
|     child_send_before_receive: bool, | ||||
| ): | ||||
|     ''' | ||||
|     caller context closes without using/opening stream | ||||
|     Parent-side of IPC context closes without sending on `MsgStream`. | ||||
| 
 | ||||
|     ''' | ||||
|     async with tractor.open_nursery( | ||||
|         debug_mode=debug_mode, | ||||
|     ) as an: | ||||
| 
 | ||||
|         root: Actor = current_actor() | ||||
|         portal = await an.start_actor( | ||||
|             'ctx_cancelled', | ||||
|  | @ -592,41 +653,52 @@ async def test_caller_closes_ctx_after_callee_opens_stream( | |||
| 
 | ||||
|         async with portal.open_context( | ||||
|             expect_cancelled, | ||||
|             send_before_receive=child_send_before_receive, | ||||
|         ) as (ctx, sent): | ||||
|             assert sent is None | ||||
| 
 | ||||
|             await portal.run(assert_state, value=True) | ||||
| 
 | ||||
|             # call `ctx.cancel()` explicitly | ||||
|             if use_ctx_cancel_method: | ||||
|             if use_ctx_cancel_method == 'pre_stream': | ||||
|                 await ctx.cancel() | ||||
| 
 | ||||
|                 # NOTE: means the local side `ctx._scope` will | ||||
|                 # have been cancelled by an ctxc ack and thus | ||||
|                 # `._scope.cancelled_caught` should be set. | ||||
|                 try: | ||||
|                 async with ( | ||||
|                     expect_ctxc( | ||||
|                         # XXX: the cause is US since we call | ||||
|                         # `Context.cancel()` just above! | ||||
|                         yay=True, | ||||
| 
 | ||||
|                         # XXX: must be propagated to __aexit__ | ||||
|                         # and should be silently absorbed there | ||||
|                         # since we called `.cancel()` just above ;) | ||||
|                         reraise=True, | ||||
|                     ) as maybe_ctxc, | ||||
|                 ): | ||||
|                     async with ctx.open_stream() as stream: | ||||
|                         async for msg in stream: | ||||
|                             pass | ||||
| 
 | ||||
|                 except tractor.ContextCancelled as ctxc: | ||||
|                     # XXX: the cause is US since we call | ||||
|                     # `Context.cancel()` just above! | ||||
|                     assert ( | ||||
|                         ctxc.canceller | ||||
|                         == | ||||
|                         current_actor().uid | ||||
|                         == | ||||
|                         root.uid | ||||
|                     ) | ||||
|                         if rent_wait_for_msg: | ||||
|                             async for msg in stream: | ||||
|                                 print(f'PARENT rx: {msg!r}\n') | ||||
|                                 break | ||||
| 
 | ||||
|                     # XXX: must be propagated to __aexit__ | ||||
|                     # and should be silently absorbed there | ||||
|                     # since we called `.cancel()` just above ;) | ||||
|                     raise | ||||
|                         if use_ctx_cancel_method == 'post_stream_open': | ||||
|                             await ctx.cancel() | ||||
| 
 | ||||
|                 else: | ||||
|                     assert 0, "Should have context cancelled?" | ||||
|                     if use_ctx_cancel_method == 'post_stream_close': | ||||
|                         await ctx.cancel() | ||||
| 
 | ||||
|                 ctxc: tractor.ContextCancelled = maybe_ctxc.value | ||||
|                 assert ( | ||||
|                     ctxc.canceller | ||||
|                     == | ||||
|                     current_actor().uid | ||||
|                     == | ||||
|                     root.uid | ||||
|                 ) | ||||
| 
 | ||||
|                 # channel should still be up | ||||
|                 assert portal.channel.connected() | ||||
|  | @ -637,13 +709,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream( | |||
|                     value=False, | ||||
|                 ) | ||||
| 
 | ||||
|             # XXX CHILD-BLOCKS case, we SHOULD NOT exit from the | ||||
|             # `.open_context()` before the child has returned, | ||||
|             # errored or been cancelled! | ||||
|             else: | ||||
|                 try: | ||||
|                     with trio.fail_after(0.2): | ||||
|                         await ctx.result() | ||||
|                     with trio.fail_after( | ||||
|                         0.5  # if not debug_mode else 999 | ||||
|                     ): | ||||
|                         res = await ctx.wait_for_result() | ||||
|                         assert res is not tractor._context.Unresolved | ||||
|                         assert 0, "Callee should have blocked!?" | ||||
|                 except trio.TooSlowError: | ||||
|                     # NO-OP -> since already called above | ||||
|                     # NO-OP -> since already triggered by | ||||
|                     # `trio.fail_after()` above! | ||||
|                     await ctx.cancel() | ||||
| 
 | ||||
|         # NOTE: local scope should have absorbed the cancellation since | ||||
|  | @ -683,7 +762,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream( | |||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_multitask_caller_cancels_from_nonroot_task( | ||||
| async def test_multitask_parent_cancels_from_nonroot_task( | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     async with tractor.open_nursery( | ||||
|  | @ -735,7 +814,6 @@ async def test_multitask_caller_cancels_from_nonroot_task( | |||
| 
 | ||||
| @tractor.context | ||||
| async def cancel_self( | ||||
| 
 | ||||
|     ctx: Context, | ||||
| 
 | ||||
| ) -> None: | ||||
|  | @ -775,7 +853,7 @@ async def cancel_self( | |||
| 
 | ||||
| 
 | ||||
| @tractor_test | ||||
| async def test_callee_cancels_before_started( | ||||
| async def test_child_cancels_before_started( | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|  | @ -826,8 +904,7 @@ async def never_open_stream( | |||
| 
 | ||||
| 
 | ||||
| @tractor.context | ||||
| async def keep_sending_from_callee( | ||||
| 
 | ||||
| async def keep_sending_from_child( | ||||
|     ctx:  Context, | ||||
|     msg_buffer_size: int|None = None, | ||||
| 
 | ||||
|  | @ -850,7 +927,7 @@ async def keep_sending_from_callee( | |||
|     'overrun_by', | ||||
|     [ | ||||
|         ('caller', 1, never_open_stream), | ||||
|         ('callee', 0, keep_sending_from_callee), | ||||
|         ('callee', 0, keep_sending_from_child), | ||||
|     ], | ||||
|     ids=[ | ||||
|          ('caller_1buf_never_open_stream'), | ||||
|  | @ -931,8 +1008,7 @@ def test_one_end_stream_not_opened( | |||
| 
 | ||||
| @tractor.context | ||||
| async def echo_back_sequence( | ||||
| 
 | ||||
|     ctx:  Context, | ||||
|     ctx: Context, | ||||
|     seq: list[int], | ||||
|     wait_for_cancel: bool, | ||||
|     allow_overruns_side: str, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue