Extend ctx semantics suite for streaming edge cases!

Muchas grax to @guilledk for finding the first issue which kicked of
this further scrutiny of the `tractor.Context` and `MsgStream` semantics
test suite with a strange edge case where,
- if the parent opened and immediately closed a stream while the remote
  child task started and continued (without terminating) to send msgs
  the parent's `open_context().__aexit__()` would **not block** on the
  child to complete!
=> this was seemingly due to a bug discovered inside the
  `.msg._ops.drain_to_final_msg()` stream handling case logic where we
  are NOT checking if `Context._stream` is non-`None`!

As such this,
- extends the `test_caller_closes_ctx_after_callee_opens_stream` (now
  renamed, see below) to include cases for all combinations of the child
  and parent sending before receiving on the stream as well as all
  placements of `Context.cancel()` in the parent before, around and after
  the stream open.
- uses the new `expect_ctxc()` for expecting the taskc (`trio.Task`
  cancelled)` cases.
- also extends the `test_callee_closes_ctx_after_stream_open` (also
  renamed) to include the case where the parent sends a msg before it
=> this case has unveiled yet-another-bug where somehow the underlying
  `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the
  child's `Return[None]` msg be consumed and NOT in a place where it is
  correctly set as `Context._result` resulting in the parent hanging
  forever inside `._ops.drain_to_final_msg()`..

- start renaming using the new "remote-task-peer-side" semantics
  throughout the test module: "caller" -> "parent", "callee" -> "child".
Tyler Goodlet 2025-03-11 14:04:55 -04:00
parent 9008b2a0d4
commit b13cd4f16b
1 changed files with 117 additions and 41 deletions

View File

@ -443,7 +443,6 @@ def test_caller_cancels(
async def close_ctx_immediately(
ctx: Context,
) -> None:
@ -454,10 +453,21 @@ async def close_ctx_immediately(
async with ctx.open_stream():
print('child returning!')
ids=lambda item: f'child_send_before_receive={item}'
async def test_callee_closes_ctx_after_stream_open(
async def test_child_exits_ctx_after_stream_open(
debug_mode: bool,
parent_send_before_receive: bool,
callee context closes without using stream.
@ -474,6 +484,15 @@ async def test_callee_closes_ctx_after_stream_open(
=> {'stop': True, 'cid': <str>}
timeout: float = (
0.5 if (
not debug_mode
# NOTE, for debugging final
# Return-consumed-n-discarded-ishue!
# and
# not parent_send_before_receive
) else 999
async with tractor.open_nursery(
) as an:
@ -482,7 +501,7 @@ async def test_callee_closes_ctx_after_stream_open(
with trio.fail_after(0.5):
with trio.fail_after(timeout):
async with portal.open_context(
@ -494,41 +513,56 @@ async def test_callee_closes_ctx_after_stream_open(
with trio.fail_after(0.4):
async with ctx.open_stream() as stream:
if parent_send_before_receive:
print('sending first msg from parent!')
await stream.send('yo')
# should fall through since ``StopAsyncIteration``
# should be raised through translation of
# a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream:
msg = 10
async for msg in stream:
# trigger failure if we DO NOT
# get an EOC!
assert 0
# never should get anythinig new from
# the underlying stream
assert msg == 10
# verify stream is now closed
with trio.fail_after(0.3):
print('parent trying to `.receive()` on EoC stream!')
await stream.receive()
assert 0, 'should have raised eoc!?'
except trio.EndOfChannel:
print('parent got EoC as expected!')
# raise
# TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of
# if/when we decide that's a good idea?)
with trio.fail_after(0.5):
with trio.fail_after(timeout):
async with ctx.open_stream() as stream:
except trio.ClosedResourceError:
# if ctx._rx_chan._state.data:
# await tractor.pause()
await portal.cancel_actor()
async def expect_cancelled(
ctx: Context,
send_before_receive: bool = False,
) -> None:
global _state
@ -538,6 +572,10 @@ async def expect_cancelled(
async with ctx.open_stream() as stream:
if send_before_receive:
await stream.send('yo')
async for msg in stream:
await stream.send(msg) # echo server
@ -567,23 +605,46 @@ async def expect_cancelled(
assert 0, "callee wasn't cancelled !?"
ids=lambda item: f'child_send_before_receive={item}'
ids=lambda item: f'rent_wait_for_msg={item}'
[False, True],
ids=lambda item: f'use_ctx_cancel_method={item}'
async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool,
async def test_parent_exits_ctx_after_child_enters_stream(
use_ctx_cancel_method: bool|str,
debug_mode: bool,
rent_wait_for_msg: bool,
child_send_before_receive: bool,
caller context closes without using/opening stream
Parent-side of IPC context closes without sending on `MsgStream`.
async with tractor.open_nursery(
) as an:
root: Actor = current_actor()
portal = await an.start_actor(
@ -592,26 +653,45 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context(
) as (ctx, sent):
assert sent is None
await portal.run(assert_state, value=True)
# call `ctx.cancel()` explicitly
if use_ctx_cancel_method:
if use_ctx_cancel_method == 'pre_stream':
await ctx.cancel()
# NOTE: means the local side `ctx._scope` will
# have been cancelled by an ctxc ack and thus
# `._scope.cancelled_caught` should be set.
async with ctx.open_stream() as stream:
async for msg in stream:
except tractor.ContextCancelled as ctxc:
async with (
# XXX: the cause is US since we call
# `Context.cancel()` just above!
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
) as maybe_ctxc,
async with ctx.open_stream() as stream:
if rent_wait_for_msg:
async for msg in stream:
print(f'PARENT rx: {msg!r}\n')
if use_ctx_cancel_method == 'post_stream_open':
await ctx.cancel()
if use_ctx_cancel_method == 'post_stream_close':
await ctx.cancel()
ctxc: tractor.ContextCancelled = maybe_ctxc.value
assert (
@ -620,14 +700,6 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
assert 0, "Should have context cancelled?"
# channel should still be up
assert portal.channel.connected()
@ -637,13 +709,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
# XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
# `.open_context()` before the child has returned,
# errored or been cancelled!
with trio.fail_after(0.2):
await ctx.result()
with trio.fail_after(
0.5 # if not debug_mode else 999
res = await ctx.wait_for_result()
assert res is not tractor._context.Unresolved
assert 0, "Callee should have blocked!?"
except trio.TooSlowError:
# NO-OP -> since already called above
# NO-OP -> since already triggered by
# `trio.fail_after()` above!
await ctx.cancel()
# NOTE: local scope should have absorbed the cancellation since
@ -683,7 +762,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async def test_multitask_caller_cancels_from_nonroot_task(
async def test_multitask_parent_cancels_from_nonroot_task(
debug_mode: bool,
async with tractor.open_nursery(
@ -735,7 +814,6 @@ async def test_multitask_caller_cancels_from_nonroot_task(
async def cancel_self(
ctx: Context,
) -> None:
@ -775,7 +853,7 @@ async def cancel_self(
async def test_callee_cancels_before_started(
async def test_child_cancels_before_started(
debug_mode: bool,
@ -826,8 +904,7 @@ async def never_open_stream(
async def keep_sending_from_callee(
async def keep_sending_from_child(
ctx: Context,
msg_buffer_size: int|None = None,
@ -850,7 +927,7 @@ async def keep_sending_from_callee(
('caller', 1, never_open_stream),
('callee', 0, keep_sending_from_callee),
('callee', 0, keep_sending_from_child),
@ -931,7 +1008,6 @@ def test_one_end_stream_not_opened(
async def echo_back_sequence(
ctx: Context,
seq: list[int],
wait_for_cancel: bool,