Complete rename to parent->child IPC ctx peers
Now changed in all comments docs **and** test-code content such that we aren't using the "caller"->"callee" semantics anymore.ext_type_plds_XPS_BACKUP
							parent
							
								
									b6b001faad
								
							
						
					
					
						commit
						434577953a
					
				|  | @ -38,9 +38,9 @@ from tractor._testing import ( | |||
| # - standard setup/teardown: | ||||
| #   ``Portal.open_context()`` starts a new | ||||
| #   remote task context in another actor. The target actor's task must | ||||
| #   call ``Context.started()`` to unblock this entry on the caller side. | ||||
| #   the callee task executes until complete and returns a final value | ||||
| #   which is delivered to the caller side and retreived via | ||||
| #   call ``Context.started()`` to unblock this entry on the parent side. | ||||
| #   the child task executes until complete and returns a final value | ||||
| #   which is delivered to the parent side and retreived via | ||||
| #   ``Context.result()``. | ||||
| 
 | ||||
| # - cancel termination: | ||||
|  | @ -170,9 +170,9 @@ async def assert_state(value: bool): | |||
|     [False, ValueError, KeyboardInterrupt], | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'callee_blocks_forever', | ||||
|     'child_blocks_forever', | ||||
|     [False, True], | ||||
|     ids=lambda item: f'callee_blocks_forever={item}' | ||||
|     ids=lambda item: f'child_blocks_forever={item}' | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'pointlessly_open_stream', | ||||
|  | @ -181,7 +181,7 @@ async def assert_state(value: bool): | |||
| ) | ||||
| def test_simple_context( | ||||
|     error_parent, | ||||
|     callee_blocks_forever, | ||||
|     child_blocks_forever, | ||||
|     pointlessly_open_stream, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|  | @ -204,13 +204,13 @@ def test_simple_context( | |||
|                         portal.open_context( | ||||
|                             simple_setup_teardown, | ||||
|                             data=10, | ||||
|                             block_forever=callee_blocks_forever, | ||||
|                             block_forever=child_blocks_forever, | ||||
|                         ) as (ctx, sent), | ||||
|                     ): | ||||
|                         assert current_ipc_ctx() is ctx | ||||
|                         assert sent == 11 | ||||
| 
 | ||||
|                         if callee_blocks_forever: | ||||
|                         if child_blocks_forever: | ||||
|                             await portal.run(assert_state, value=True) | ||||
|                         else: | ||||
|                             assert await ctx.result() == 'yo' | ||||
|  | @ -220,7 +220,7 @@ def test_simple_context( | |||
|                                 if error_parent: | ||||
|                                     raise error_parent | ||||
| 
 | ||||
|                                 if callee_blocks_forever: | ||||
|                                 if child_blocks_forever: | ||||
|                                     await ctx.cancel() | ||||
|                                 else: | ||||
|                                     # in this case the stream will send a | ||||
|  | @ -259,9 +259,9 @@ def test_simple_context( | |||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|     'callee_returns_early', | ||||
|     'child_returns_early', | ||||
|     [True, False], | ||||
|     ids=lambda item: f'callee_returns_early={item}' | ||||
|     ids=lambda item: f'child_returns_early={item}' | ||||
| ) | ||||
| @pytest.mark.parametrize( | ||||
|     'cancel_method', | ||||
|  | @ -273,14 +273,14 @@ def test_simple_context( | |||
|     [True, False], | ||||
|     ids=lambda item: f'chk_ctx_result_before_exit={item}' | ||||
| ) | ||||
| def test_caller_cancels( | ||||
| def test_parent_cancels( | ||||
|     cancel_method: str, | ||||
|     chk_ctx_result_before_exit: bool, | ||||
|     callee_returns_early: bool, | ||||
|     child_returns_early: bool, | ||||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Verify that when the opening side of a context (aka the caller) | ||||
|     Verify that when the opening side of a context (aka the parent) | ||||
|     cancels that context, the ctx does not raise a cancelled when | ||||
|     either calling `.result()` or on context exit. | ||||
| 
 | ||||
|  | @ -294,7 +294,7 @@ def test_caller_cancels( | |||
| 
 | ||||
|         if ( | ||||
|             cancel_method == 'portal' | ||||
|             and not callee_returns_early | ||||
|             and not child_returns_early | ||||
|         ): | ||||
|             try: | ||||
|                 res = await ctx.result() | ||||
|  | @ -318,7 +318,7 @@ def test_caller_cancels( | |||
|                 pytest.fail(f'should not have raised ctxc\n{ctxc}') | ||||
| 
 | ||||
|         # we actually get a result | ||||
|         if callee_returns_early: | ||||
|         if child_returns_early: | ||||
|             assert res == 'yo' | ||||
|             assert ctx.outcome is res | ||||
|             assert ctx.maybe_error is None | ||||
|  | @ -362,14 +362,14 @@ def test_caller_cancels( | |||
|             ) | ||||
|             timeout: float = ( | ||||
|                 0.5 | ||||
|                 if not callee_returns_early | ||||
|                 if not child_returns_early | ||||
|                 else 2 | ||||
|             ) | ||||
|             with trio.fail_after(timeout): | ||||
|                 async with ( | ||||
|                     expect_ctxc( | ||||
|                         yay=( | ||||
|                             not callee_returns_early | ||||
|                             not child_returns_early | ||||
|                             and cancel_method == 'portal' | ||||
|                         ) | ||||
|                     ), | ||||
|  | @ -377,13 +377,13 @@ def test_caller_cancels( | |||
|                     portal.open_context( | ||||
|                         simple_setup_teardown, | ||||
|                         data=10, | ||||
|                         block_forever=not callee_returns_early, | ||||
|                         block_forever=not child_returns_early, | ||||
|                     ) as (ctx, sent), | ||||
|                 ): | ||||
| 
 | ||||
|                     if callee_returns_early: | ||||
|                     if child_returns_early: | ||||
|                         # ensure we block long enough before sending | ||||
|                         # a cancel such that the callee has already | ||||
|                         # a cancel such that the child has already | ||||
|                         # returned it's result. | ||||
|                         await trio.sleep(0.5) | ||||
| 
 | ||||
|  | @ -421,7 +421,7 @@ def test_caller_cancels( | |||
|             #   which should in turn cause `ctx._scope` to | ||||
|             # catch any cancellation? | ||||
|             if ( | ||||
|                 not callee_returns_early | ||||
|                 not child_returns_early | ||||
|                 and cancel_method != 'portal' | ||||
|             ): | ||||
|                 assert not ctx._scope.cancelled_caught | ||||
|  | @ -430,11 +430,11 @@ def test_caller_cancels( | |||
| 
 | ||||
| 
 | ||||
| # basic stream terminations: | ||||
| # - callee context closes without using stream | ||||
| # - caller context closes without using stream | ||||
| # - caller context calls `Context.cancel()` while streaming | ||||
| #   is ongoing resulting in callee being cancelled | ||||
| # - callee calls `Context.cancel()` while streaming and caller | ||||
| # - child context closes without using stream | ||||
| # - parent context closes without using stream | ||||
| # - parent context calls `Context.cancel()` while streaming | ||||
| #   is ongoing resulting in child being cancelled | ||||
| # - child calls `Context.cancel()` while streaming and parent | ||||
| #   sees stream terminated in `RemoteActorError` | ||||
| 
 | ||||
| # TODO: future possible features | ||||
|  | @ -470,7 +470,7 @@ async def test_child_exits_ctx_after_stream_open( | |||
|     parent_send_before_receive: bool, | ||||
| ): | ||||
|     ''' | ||||
|     callee context closes without using stream. | ||||
|     child context closes without using stream. | ||||
| 
 | ||||
|     This should result in a msg sequence | ||||
|     |_<root>_ | ||||
|  | @ -485,13 +485,7 @@ async def test_child_exits_ctx_after_stream_open( | |||
| 
 | ||||
|     ''' | ||||
|     timeout: float = ( | ||||
|         0.5 if ( | ||||
|             not debug_mode | ||||
|             # NOTE, for debugging final | ||||
|             # Return-consumed-n-discarded-ishue! | ||||
|             # and | ||||
|             # not parent_send_before_receive | ||||
|         ) else 999 | ||||
|         0.5 if not debug_mode else 999 | ||||
|     ) | ||||
|     async with tractor.open_nursery( | ||||
|         debug_mode=debug_mode, | ||||
|  | @ -602,7 +596,7 @@ async def expect_cancelled( | |||
|         raise | ||||
| 
 | ||||
|     else: | ||||
|         assert 0, "callee wasn't cancelled !?" | ||||
|         assert 0, "child wasn't cancelled !?" | ||||
| 
 | ||||
| 
 | ||||
| @pytest.mark.parametrize( | ||||
|  | @ -857,7 +851,7 @@ async def test_child_cancels_before_started( | |||
|     debug_mode: bool, | ||||
| ): | ||||
|     ''' | ||||
|     Callee calls `Context.cancel()` while streaming and caller | ||||
|     Callee calls `Context.cancel()` while streaming and parent | ||||
|     sees stream terminated in `ContextCancelled`. | ||||
| 
 | ||||
|     ''' | ||||
|  | @ -910,7 +904,7 @@ async def keep_sending_from_child( | |||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Send endlessly on the calleee stream. | ||||
|     Send endlessly on the child stream. | ||||
| 
 | ||||
|     ''' | ||||
|     await ctx.started() | ||||
|  | @ -918,7 +912,7 @@ async def keep_sending_from_child( | |||
|         msg_buffer_size=msg_buffer_size, | ||||
|     ) as stream: | ||||
|         for msg in count(): | ||||
|             print(f'callee sending {msg}') | ||||
|             print(f'child sending {msg}') | ||||
|             await stream.send(msg) | ||||
|             await trio.sleep(0.01) | ||||
| 
 | ||||
|  | @ -926,12 +920,12 @@ async def keep_sending_from_child( | |||
| @pytest.mark.parametrize( | ||||
|     'overrun_by', | ||||
|     [ | ||||
|         ('caller', 1, never_open_stream), | ||||
|         ('callee', 0, keep_sending_from_child), | ||||
|         ('parent', 1, never_open_stream), | ||||
|         ('child', 0, keep_sending_from_child), | ||||
|     ], | ||||
|     ids=[ | ||||
|          ('caller_1buf_never_open_stream'), | ||||
|          ('callee_0buf_keep_sending_from_callee'), | ||||
|          ('parent_1buf_never_open_stream'), | ||||
|          ('child_0buf_keep_sending_from_child'), | ||||
|     ] | ||||
| ) | ||||
| def test_one_end_stream_not_opened( | ||||
|  | @ -962,8 +956,7 @@ def test_one_end_stream_not_opened( | |||
|                 ) as (ctx, sent): | ||||
|                     assert sent is None | ||||
| 
 | ||||
|                     if 'caller' in overrunner: | ||||
| 
 | ||||
|                     if 'parent' in overrunner: | ||||
|                         async with ctx.open_stream() as stream: | ||||
| 
 | ||||
|                             # itersend +1 msg more then the buffer size | ||||
|  | @ -978,7 +971,7 @@ def test_one_end_stream_not_opened( | |||
|                                 await trio.sleep_forever() | ||||
| 
 | ||||
|                     else: | ||||
|                         # callee overruns caller case so we do nothing here | ||||
|                         # child overruns parent case so we do nothing here | ||||
|                         await trio.sleep_forever() | ||||
| 
 | ||||
|             await portal.cancel_actor() | ||||
|  | @ -986,19 +979,19 @@ def test_one_end_stream_not_opened( | |||
|     # 2 overrun cases and the no overrun case (which pushes right up to | ||||
|     # the msg limit) | ||||
|     if ( | ||||
|         overrunner == 'caller' | ||||
|         overrunner == 'parent' | ||||
|     ): | ||||
|         with pytest.raises(tractor.RemoteActorError) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         assert excinfo.value.boxed_type == StreamOverrun | ||||
| 
 | ||||
|     elif overrunner == 'callee': | ||||
|     elif overrunner == 'child': | ||||
|         with pytest.raises(tractor.RemoteActorError) as excinfo: | ||||
|             trio.run(main) | ||||
| 
 | ||||
|         # TODO: embedded remote errors so that we can verify the source | ||||
|         # error? the callee delivers an error which is an overrun | ||||
|         # error? the child delivers an error which is an overrun | ||||
|         # wrapped in a remote actor error. | ||||
|         assert excinfo.value.boxed_type == tractor.RemoteActorError | ||||
| 
 | ||||
|  | @ -1017,12 +1010,12 @@ async def echo_back_sequence( | |||
| 
 | ||||
| ) -> None: | ||||
|     ''' | ||||
|     Send endlessly on the calleee stream using a small buffer size | ||||
|     Send endlessly on the child stream using a small buffer size | ||||
|     setting on the contex to simulate backlogging that would normally | ||||
|     cause overruns. | ||||
| 
 | ||||
|     ''' | ||||
|     # NOTE: ensure that if the caller is expecting to cancel this task | ||||
|     # NOTE: ensure that if the parent is expecting to cancel this task | ||||
|     # that we stay echoing much longer then they are so we don't | ||||
|     # return early instead of receive the cancel msg. | ||||
|     total_batches: int = ( | ||||
|  | @ -1072,18 +1065,18 @@ async def echo_back_sequence( | |||
|                 if be_slow: | ||||
|                     await trio.sleep(0.05) | ||||
| 
 | ||||
|                 print('callee waiting on next') | ||||
|                 print('child waiting on next') | ||||
| 
 | ||||
|             print(f'callee echoing back latest batch\n{batch}') | ||||
|             print(f'child echoing back latest batch\n{batch}') | ||||
|             for msg in batch: | ||||
|                 print(f'callee sending msg\n{msg}') | ||||
|                 print(f'child sending msg\n{msg}') | ||||
|                 await stream.send(msg) | ||||
| 
 | ||||
|     try: | ||||
|         return 'yo' | ||||
|     finally: | ||||
|         print( | ||||
|             'exiting callee with context:\n' | ||||
|             'exiting child with context:\n' | ||||
|             f'{pformat(ctx)}\n' | ||||
|         ) | ||||
| 
 | ||||
|  | @ -1137,7 +1130,7 @@ def test_maybe_allow_overruns_stream( | |||
|             debug_mode=debug_mode, | ||||
|         ) as an: | ||||
|             portal = await an.start_actor( | ||||
|                 'callee_sends_forever', | ||||
|                 'child_sends_forever', | ||||
|                 enable_modules=[__name__], | ||||
|                 loglevel=loglevel, | ||||
|                 debug_mode=debug_mode, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue