From 85f1a66b418562c8e42c709cc1f3d6602328f582 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 Mar 2025 13:15:48 -0400 Subject: [PATCH] Complete rename to parent->child IPC ctx peers Now changed in all comments docs **and** test-code content such that we aren't using the "caller"->"callee" semantics anymore. --- tests/test_context_stream_semantics.py | 105 ++++++++++++------------- 1 file changed, 49 insertions(+), 56 deletions(-) diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index 29e99b2e..14cb9cc6 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -38,9 +38,9 @@ from tractor._testing import ( # - standard setup/teardown: # ``Portal.open_context()`` starts a new # remote task context in another actor. The target actor's task must -# call ``Context.started()`` to unblock this entry on the caller side. -# the callee task executes until complete and returns a final value -# which is delivered to the caller side and retreived via +# call ``Context.started()`` to unblock this entry on the parent side. +# the child task executes until complete and returns a final value +# which is delivered to the parent side and retreived via # ``Context.result()``. # - cancel termination: @@ -170,9 +170,9 @@ async def assert_state(value: bool): [False, ValueError, KeyboardInterrupt], ) @pytest.mark.parametrize( - 'callee_blocks_forever', + 'child_blocks_forever', [False, True], - ids=lambda item: f'callee_blocks_forever={item}' + ids=lambda item: f'child_blocks_forever={item}' ) @pytest.mark.parametrize( 'pointlessly_open_stream', @@ -181,7 +181,7 @@ async def assert_state(value: bool): ) def test_simple_context( error_parent, - callee_blocks_forever, + child_blocks_forever, pointlessly_open_stream, debug_mode: bool, ): @@ -204,13 +204,13 @@ def test_simple_context( portal.open_context( simple_setup_teardown, data=10, - block_forever=callee_blocks_forever, + block_forever=child_blocks_forever, ) as (ctx, sent), ): assert current_ipc_ctx() is ctx assert sent == 11 - if callee_blocks_forever: + if child_blocks_forever: await portal.run(assert_state, value=True) else: assert await ctx.result() == 'yo' @@ -220,7 +220,7 @@ def test_simple_context( if error_parent: raise error_parent - if callee_blocks_forever: + if child_blocks_forever: await ctx.cancel() else: # in this case the stream will send a @@ -259,9 +259,9 @@ def test_simple_context( @pytest.mark.parametrize( - 'callee_returns_early', + 'child_returns_early', [True, False], - ids=lambda item: f'callee_returns_early={item}' + ids=lambda item: f'child_returns_early={item}' ) @pytest.mark.parametrize( 'cancel_method', @@ -273,14 +273,14 @@ def test_simple_context( [True, False], ids=lambda item: f'chk_ctx_result_before_exit={item}' ) -def test_caller_cancels( +def test_parent_cancels( cancel_method: str, chk_ctx_result_before_exit: bool, - callee_returns_early: bool, + child_returns_early: bool, debug_mode: bool, ): ''' - Verify that when the opening side of a context (aka the caller) + Verify that when the opening side of a context (aka the parent) cancels that context, the ctx does not raise a cancelled when either calling `.result()` or on context exit. @@ -294,7 +294,7 @@ def test_caller_cancels( if ( cancel_method == 'portal' - and not callee_returns_early + and not child_returns_early ): try: res = await ctx.result() @@ -318,7 +318,7 @@ def test_caller_cancels( pytest.fail(f'should not have raised ctxc\n{ctxc}') # we actually get a result - if callee_returns_early: + if child_returns_early: assert res == 'yo' assert ctx.outcome is res assert ctx.maybe_error is None @@ -362,14 +362,14 @@ def test_caller_cancels( ) timeout: float = ( 0.5 - if not callee_returns_early + if not child_returns_early else 2 ) with trio.fail_after(timeout): async with ( expect_ctxc( yay=( - not callee_returns_early + not child_returns_early and cancel_method == 'portal' ) ), @@ -377,13 +377,13 @@ def test_caller_cancels( portal.open_context( simple_setup_teardown, data=10, - block_forever=not callee_returns_early, + block_forever=not child_returns_early, ) as (ctx, sent), ): - if callee_returns_early: + if child_returns_early: # ensure we block long enough before sending - # a cancel such that the callee has already + # a cancel such that the child has already # returned it's result. await trio.sleep(0.5) @@ -421,7 +421,7 @@ def test_caller_cancels( # which should in turn cause `ctx._scope` to # catch any cancellation? if ( - not callee_returns_early + not child_returns_early and cancel_method != 'portal' ): assert not ctx._scope.cancelled_caught @@ -430,11 +430,11 @@ def test_caller_cancels( # basic stream terminations: -# - callee context closes without using stream -# - caller context closes without using stream -# - caller context calls `Context.cancel()` while streaming -# is ongoing resulting in callee being cancelled -# - callee calls `Context.cancel()` while streaming and caller +# - child context closes without using stream +# - parent context closes without using stream +# - parent context calls `Context.cancel()` while streaming +# is ongoing resulting in child being cancelled +# - child calls `Context.cancel()` while streaming and parent # sees stream terminated in `RemoteActorError` # TODO: future possible features @@ -470,7 +470,7 @@ async def test_child_exits_ctx_after_stream_open( parent_send_before_receive: bool, ): ''' - callee context closes without using stream. + child context closes without using stream. This should result in a msg sequence |__ @@ -485,13 +485,7 @@ async def test_child_exits_ctx_after_stream_open( ''' timeout: float = ( - 0.5 if ( - not debug_mode - # NOTE, for debugging final - # Return-consumed-n-discarded-ishue! - # and - # not parent_send_before_receive - ) else 999 + 0.5 if not debug_mode else 999 ) async with tractor.open_nursery( debug_mode=debug_mode, @@ -602,7 +596,7 @@ async def expect_cancelled( raise else: - assert 0, "callee wasn't cancelled !?" + assert 0, "child wasn't cancelled !?" @pytest.mark.parametrize( @@ -857,7 +851,7 @@ async def test_child_cancels_before_started( debug_mode: bool, ): ''' - Callee calls `Context.cancel()` while streaming and caller + Callee calls `Context.cancel()` while streaming and parent sees stream terminated in `ContextCancelled`. ''' @@ -910,7 +904,7 @@ async def keep_sending_from_child( ) -> None: ''' - Send endlessly on the calleee stream. + Send endlessly on the child stream. ''' await ctx.started() @@ -918,7 +912,7 @@ async def keep_sending_from_child( msg_buffer_size=msg_buffer_size, ) as stream: for msg in count(): - print(f'callee sending {msg}') + print(f'child sending {msg}') await stream.send(msg) await trio.sleep(0.01) @@ -926,12 +920,12 @@ async def keep_sending_from_child( @pytest.mark.parametrize( 'overrun_by', [ - ('caller', 1, never_open_stream), - ('callee', 0, keep_sending_from_child), + ('parent', 1, never_open_stream), + ('child', 0, keep_sending_from_child), ], ids=[ - ('caller_1buf_never_open_stream'), - ('callee_0buf_keep_sending_from_callee'), + ('parent_1buf_never_open_stream'), + ('child_0buf_keep_sending_from_child'), ] ) def test_one_end_stream_not_opened( @@ -962,8 +956,7 @@ def test_one_end_stream_not_opened( ) as (ctx, sent): assert sent is None - if 'caller' in overrunner: - + if 'parent' in overrunner: async with ctx.open_stream() as stream: # itersend +1 msg more then the buffer size @@ -978,7 +971,7 @@ def test_one_end_stream_not_opened( await trio.sleep_forever() else: - # callee overruns caller case so we do nothing here + # child overruns parent case so we do nothing here await trio.sleep_forever() await portal.cancel_actor() @@ -986,19 +979,19 @@ def test_one_end_stream_not_opened( # 2 overrun cases and the no overrun case (which pushes right up to # the msg limit) if ( - overrunner == 'caller' + overrunner == 'parent' ): with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) assert excinfo.value.boxed_type == StreamOverrun - elif overrunner == 'callee': + elif overrunner == 'child': with pytest.raises(tractor.RemoteActorError) as excinfo: trio.run(main) # TODO: embedded remote errors so that we can verify the source - # error? the callee delivers an error which is an overrun + # error? the child delivers an error which is an overrun # wrapped in a remote actor error. assert excinfo.value.boxed_type == tractor.RemoteActorError @@ -1017,12 +1010,12 @@ async def echo_back_sequence( ) -> None: ''' - Send endlessly on the calleee stream using a small buffer size + Send endlessly on the child stream using a small buffer size setting on the contex to simulate backlogging that would normally cause overruns. ''' - # NOTE: ensure that if the caller is expecting to cancel this task + # NOTE: ensure that if the parent is expecting to cancel this task # that we stay echoing much longer then they are so we don't # return early instead of receive the cancel msg. total_batches: int = ( @@ -1072,18 +1065,18 @@ async def echo_back_sequence( if be_slow: await trio.sleep(0.05) - print('callee waiting on next') + print('child waiting on next') - print(f'callee echoing back latest batch\n{batch}') + print(f'child echoing back latest batch\n{batch}') for msg in batch: - print(f'callee sending msg\n{msg}') + print(f'child sending msg\n{msg}') await stream.send(msg) try: return 'yo' finally: print( - 'exiting callee with context:\n' + 'exiting child with context:\n' f'{pformat(ctx)}\n' ) @@ -1137,7 +1130,7 @@ def test_maybe_allow_overruns_stream( debug_mode=debug_mode, ) as an: portal = await an.start_actor( - 'callee_sends_forever', + 'child_sends_forever', enable_modules=[__name__], loglevel=loglevel, debug_mode=debug_mode,