Complete rename to parent->child IPC ctx peers
Now changed in all comments docs **and** test-code content such that we aren't using the "caller"->"callee" semantics anymore.
parent
fde6d84b9d
commit
85f1a66b41
|
@ -38,9 +38,9 @@ from tractor._testing import (
|
|||
# - standard setup/teardown:
|
||||
# ``Portal.open_context()`` starts a new
|
||||
# remote task context in another actor. The target actor's task must
|
||||
# call ``Context.started()`` to unblock this entry on the caller side.
|
||||
# the callee task executes until complete and returns a final value
|
||||
# which is delivered to the caller side and retreived via
|
||||
# call ``Context.started()`` to unblock this entry on the parent side.
|
||||
# the child task executes until complete and returns a final value
|
||||
# which is delivered to the parent side and retreived via
|
||||
# ``Context.result()``.
|
||||
|
||||
# - cancel termination:
|
||||
|
@ -170,9 +170,9 @@ async def assert_state(value: bool):
|
|||
[False, ValueError, KeyboardInterrupt],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'callee_blocks_forever',
|
||||
'child_blocks_forever',
|
||||
[False, True],
|
||||
ids=lambda item: f'callee_blocks_forever={item}'
|
||||
ids=lambda item: f'child_blocks_forever={item}'
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'pointlessly_open_stream',
|
||||
|
@ -181,7 +181,7 @@ async def assert_state(value: bool):
|
|||
)
|
||||
def test_simple_context(
|
||||
error_parent,
|
||||
callee_blocks_forever,
|
||||
child_blocks_forever,
|
||||
pointlessly_open_stream,
|
||||
debug_mode: bool,
|
||||
):
|
||||
|
@ -204,13 +204,13 @@ def test_simple_context(
|
|||
portal.open_context(
|
||||
simple_setup_teardown,
|
||||
data=10,
|
||||
block_forever=callee_blocks_forever,
|
||||
block_forever=child_blocks_forever,
|
||||
) as (ctx, sent),
|
||||
):
|
||||
assert current_ipc_ctx() is ctx
|
||||
assert sent == 11
|
||||
|
||||
if callee_blocks_forever:
|
||||
if child_blocks_forever:
|
||||
await portal.run(assert_state, value=True)
|
||||
else:
|
||||
assert await ctx.result() == 'yo'
|
||||
|
@ -220,7 +220,7 @@ def test_simple_context(
|
|||
if error_parent:
|
||||
raise error_parent
|
||||
|
||||
if callee_blocks_forever:
|
||||
if child_blocks_forever:
|
||||
await ctx.cancel()
|
||||
else:
|
||||
# in this case the stream will send a
|
||||
|
@ -259,9 +259,9 @@ def test_simple_context(
|
|||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'callee_returns_early',
|
||||
'child_returns_early',
|
||||
[True, False],
|
||||
ids=lambda item: f'callee_returns_early={item}'
|
||||
ids=lambda item: f'child_returns_early={item}'
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'cancel_method',
|
||||
|
@ -273,14 +273,14 @@ def test_simple_context(
|
|||
[True, False],
|
||||
ids=lambda item: f'chk_ctx_result_before_exit={item}'
|
||||
)
|
||||
def test_caller_cancels(
|
||||
def test_parent_cancels(
|
||||
cancel_method: str,
|
||||
chk_ctx_result_before_exit: bool,
|
||||
callee_returns_early: bool,
|
||||
child_returns_early: bool,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Verify that when the opening side of a context (aka the caller)
|
||||
Verify that when the opening side of a context (aka the parent)
|
||||
cancels that context, the ctx does not raise a cancelled when
|
||||
either calling `.result()` or on context exit.
|
||||
|
||||
|
@ -294,7 +294,7 @@ def test_caller_cancels(
|
|||
|
||||
if (
|
||||
cancel_method == 'portal'
|
||||
and not callee_returns_early
|
||||
and not child_returns_early
|
||||
):
|
||||
try:
|
||||
res = await ctx.result()
|
||||
|
@ -318,7 +318,7 @@ def test_caller_cancels(
|
|||
pytest.fail(f'should not have raised ctxc\n{ctxc}')
|
||||
|
||||
# we actually get a result
|
||||
if callee_returns_early:
|
||||
if child_returns_early:
|
||||
assert res == 'yo'
|
||||
assert ctx.outcome is res
|
||||
assert ctx.maybe_error is None
|
||||
|
@ -362,14 +362,14 @@ def test_caller_cancels(
|
|||
)
|
||||
timeout: float = (
|
||||
0.5
|
||||
if not callee_returns_early
|
||||
if not child_returns_early
|
||||
else 2
|
||||
)
|
||||
with trio.fail_after(timeout):
|
||||
async with (
|
||||
expect_ctxc(
|
||||
yay=(
|
||||
not callee_returns_early
|
||||
not child_returns_early
|
||||
and cancel_method == 'portal'
|
||||
)
|
||||
),
|
||||
|
@ -377,13 +377,13 @@ def test_caller_cancels(
|
|||
portal.open_context(
|
||||
simple_setup_teardown,
|
||||
data=10,
|
||||
block_forever=not callee_returns_early,
|
||||
block_forever=not child_returns_early,
|
||||
) as (ctx, sent),
|
||||
):
|
||||
|
||||
if callee_returns_early:
|
||||
if child_returns_early:
|
||||
# ensure we block long enough before sending
|
||||
# a cancel such that the callee has already
|
||||
# a cancel such that the child has already
|
||||
# returned it's result.
|
||||
await trio.sleep(0.5)
|
||||
|
||||
|
@ -421,7 +421,7 @@ def test_caller_cancels(
|
|||
# which should in turn cause `ctx._scope` to
|
||||
# catch any cancellation?
|
||||
if (
|
||||
not callee_returns_early
|
||||
not child_returns_early
|
||||
and cancel_method != 'portal'
|
||||
):
|
||||
assert not ctx._scope.cancelled_caught
|
||||
|
@ -430,11 +430,11 @@ def test_caller_cancels(
|
|||
|
||||
|
||||
# basic stream terminations:
|
||||
# - callee context closes without using stream
|
||||
# - caller context closes without using stream
|
||||
# - caller context calls `Context.cancel()` while streaming
|
||||
# is ongoing resulting in callee being cancelled
|
||||
# - callee calls `Context.cancel()` while streaming and caller
|
||||
# - child context closes without using stream
|
||||
# - parent context closes without using stream
|
||||
# - parent context calls `Context.cancel()` while streaming
|
||||
# is ongoing resulting in child being cancelled
|
||||
# - child calls `Context.cancel()` while streaming and parent
|
||||
# sees stream terminated in `RemoteActorError`
|
||||
|
||||
# TODO: future possible features
|
||||
|
@ -470,7 +470,7 @@ async def test_child_exits_ctx_after_stream_open(
|
|||
parent_send_before_receive: bool,
|
||||
):
|
||||
'''
|
||||
callee context closes without using stream.
|
||||
child context closes without using stream.
|
||||
|
||||
This should result in a msg sequence
|
||||
|_<root>_
|
||||
|
@ -485,13 +485,7 @@ async def test_child_exits_ctx_after_stream_open(
|
|||
|
||||
'''
|
||||
timeout: float = (
|
||||
0.5 if (
|
||||
not debug_mode
|
||||
# NOTE, for debugging final
|
||||
# Return-consumed-n-discarded-ishue!
|
||||
# and
|
||||
# not parent_send_before_receive
|
||||
) else 999
|
||||
0.5 if not debug_mode else 999
|
||||
)
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
|
@ -602,7 +596,7 @@ async def expect_cancelled(
|
|||
raise
|
||||
|
||||
else:
|
||||
assert 0, "callee wasn't cancelled !?"
|
||||
assert 0, "child wasn't cancelled !?"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
@ -857,7 +851,7 @@ async def test_child_cancels_before_started(
|
|||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Callee calls `Context.cancel()` while streaming and caller
|
||||
Callee calls `Context.cancel()` while streaming and parent
|
||||
sees stream terminated in `ContextCancelled`.
|
||||
|
||||
'''
|
||||
|
@ -910,7 +904,7 @@ async def keep_sending_from_child(
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Send endlessly on the calleee stream.
|
||||
Send endlessly on the child stream.
|
||||
|
||||
'''
|
||||
await ctx.started()
|
||||
|
@ -918,7 +912,7 @@ async def keep_sending_from_child(
|
|||
msg_buffer_size=msg_buffer_size,
|
||||
) as stream:
|
||||
for msg in count():
|
||||
print(f'callee sending {msg}')
|
||||
print(f'child sending {msg}')
|
||||
await stream.send(msg)
|
||||
await trio.sleep(0.01)
|
||||
|
||||
|
@ -926,12 +920,12 @@ async def keep_sending_from_child(
|
|||
@pytest.mark.parametrize(
|
||||
'overrun_by',
|
||||
[
|
||||
('caller', 1, never_open_stream),
|
||||
('callee', 0, keep_sending_from_child),
|
||||
('parent', 1, never_open_stream),
|
||||
('child', 0, keep_sending_from_child),
|
||||
],
|
||||
ids=[
|
||||
('caller_1buf_never_open_stream'),
|
||||
('callee_0buf_keep_sending_from_callee'),
|
||||
('parent_1buf_never_open_stream'),
|
||||
('child_0buf_keep_sending_from_child'),
|
||||
]
|
||||
)
|
||||
def test_one_end_stream_not_opened(
|
||||
|
@ -962,8 +956,7 @@ def test_one_end_stream_not_opened(
|
|||
) as (ctx, sent):
|
||||
assert sent is None
|
||||
|
||||
if 'caller' in overrunner:
|
||||
|
||||
if 'parent' in overrunner:
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
# itersend +1 msg more then the buffer size
|
||||
|
@ -978,7 +971,7 @@ def test_one_end_stream_not_opened(
|
|||
await trio.sleep_forever()
|
||||
|
||||
else:
|
||||
# callee overruns caller case so we do nothing here
|
||||
# child overruns parent case so we do nothing here
|
||||
await trio.sleep_forever()
|
||||
|
||||
await portal.cancel_actor()
|
||||
|
@ -986,19 +979,19 @@ def test_one_end_stream_not_opened(
|
|||
# 2 overrun cases and the no overrun case (which pushes right up to
|
||||
# the msg limit)
|
||||
if (
|
||||
overrunner == 'caller'
|
||||
overrunner == 'parent'
|
||||
):
|
||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
assert excinfo.value.boxed_type == StreamOverrun
|
||||
|
||||
elif overrunner == 'callee':
|
||||
elif overrunner == 'child':
|
||||
with pytest.raises(tractor.RemoteActorError) as excinfo:
|
||||
trio.run(main)
|
||||
|
||||
# TODO: embedded remote errors so that we can verify the source
|
||||
# error? the callee delivers an error which is an overrun
|
||||
# error? the child delivers an error which is an overrun
|
||||
# wrapped in a remote actor error.
|
||||
assert excinfo.value.boxed_type == tractor.RemoteActorError
|
||||
|
||||
|
@ -1017,12 +1010,12 @@ async def echo_back_sequence(
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Send endlessly on the calleee stream using a small buffer size
|
||||
Send endlessly on the child stream using a small buffer size
|
||||
setting on the contex to simulate backlogging that would normally
|
||||
cause overruns.
|
||||
|
||||
'''
|
||||
# NOTE: ensure that if the caller is expecting to cancel this task
|
||||
# NOTE: ensure that if the parent is expecting to cancel this task
|
||||
# that we stay echoing much longer then they are so we don't
|
||||
# return early instead of receive the cancel msg.
|
||||
total_batches: int = (
|
||||
|
@ -1072,18 +1065,18 @@ async def echo_back_sequence(
|
|||
if be_slow:
|
||||
await trio.sleep(0.05)
|
||||
|
||||
print('callee waiting on next')
|
||||
print('child waiting on next')
|
||||
|
||||
print(f'callee echoing back latest batch\n{batch}')
|
||||
print(f'child echoing back latest batch\n{batch}')
|
||||
for msg in batch:
|
||||
print(f'callee sending msg\n{msg}')
|
||||
print(f'child sending msg\n{msg}')
|
||||
await stream.send(msg)
|
||||
|
||||
try:
|
||||
return 'yo'
|
||||
finally:
|
||||
print(
|
||||
'exiting callee with context:\n'
|
||||
'exiting child with context:\n'
|
||||
f'{pformat(ctx)}\n'
|
||||
)
|
||||
|
||||
|
@ -1137,7 +1130,7 @@ def test_maybe_allow_overruns_stream(
|
|||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
portal = await an.start_actor(
|
||||
'callee_sends_forever',
|
||||
'child_sends_forever',
|
||||
enable_modules=[__name__],
|
||||
loglevel=loglevel,
|
||||
debug_mode=debug_mode,
|
||||
|
|
Loading…
Reference in New Issue