Compare commits

..

3 Commits

Author SHA1 Message Date
Tyler Goodlet 3a3fd36890 Fix msg-draining on `parent_never_opened_stream`!
Repairs a bug in `drain_to_final_msg()` where in the `Yield()` case
block we weren't guarding against the `ctx._stream is None` edge case
which should be treated a `continue`-draining (not a `break` or
attr-error!!) situation since the peer task maybe be continuing to send
`Yield` but has not yet sent an outcome msg (one of
`Return/Error/ContextCancelled`) to terminate the loop. Ensure we
explicitly warn about this case as well as `.cancel()` emit on a taskc.

Thanks again to @guille for discovering this!

Also add temporary `.info()`s around rxed `Return` msgs as part of
trying to debug a different bug discovered while updating the
context-semantics test suite (in a prior commit).
2025-03-11 14:31:53 -04:00
Tyler Goodlet f0561fc8c0 Extend ctx semantics suite for streaming edge cases!
Muchas grax to @guilledk for finding the first issue which kicked of
this further scrutiny of the `tractor.Context` and `MsgStream` semantics
test suite with a strange edge case where,
- if the parent opened and immediately closed a stream while the remote
  child task started and continued (without terminating) to send msgs
  the parent's `open_context().__aexit__()` would **not block** on the
  child to complete!
=> this was seemingly due to a bug discovered inside the
  `.msg._ops.drain_to_final_msg()` stream handling case logic where we
  are NOT checking if `Context._stream` is non-`None`!

As such this,
- extends the `test_caller_closes_ctx_after_callee_opens_stream` (now
  renamed, see below) to include cases for all combinations of the child
  and parent sending before receiving on the stream as well as all
  placements of `Context.cancel()` in the parent before, around and after
  the stream open.
- uses the new `expect_ctxc()` for expecting the taskc (`trio.Task`
  cancelled)` cases.
- also extends the `test_callee_closes_ctx_after_stream_open` (also
  renamed) to include the case where the parent sends a msg before it
  receives.
=> this case has unveiled yet-another-bug where somehow the underlying
  `MsgStream._rx_chan: trio.ReceiveMemoryChannel` is allowing the
  child's `Return[None]` msg be consumed and NOT in a place where it is
  correctly set as `Context._result` resulting in the parent hanging
  forever inside `._ops.drain_to_final_msg()`..

Alongside,
- start renaming using the new "remote-task-peer-side" semantics
  throughout the test module: "caller" -> "parent", "callee" -> "child".
2025-03-11 14:04:55 -04:00
Tyler Goodlet d1abe4da44 Deliver a `MaybeBoxedError` from `.expect_ctxc()`
Just like we do from the `.devx._debug.open_crash_handler()`, this
allows checking various attrs on the raised `ContextCancelled` much like
`with pytest.raises() as excinfo:`.
2025-03-10 18:17:31 -04:00
3 changed files with 188 additions and 64 deletions

View File

@ -443,7 +443,6 @@ def test_caller_cancels(
@tractor.context @tractor.context
async def close_ctx_immediately( async def close_ctx_immediately(
ctx: Context, ctx: Context,
) -> None: ) -> None:
@ -454,10 +453,21 @@ async def close_ctx_immediately(
async with ctx.open_stream(): async with ctx.open_stream():
pass pass
print('child returning!')
@pytest.mark.parametrize(
'parent_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@tractor_test @tractor_test
async def test_callee_closes_ctx_after_stream_open( async def test_child_exits_ctx_after_stream_open(
debug_mode: bool, debug_mode: bool,
parent_send_before_receive: bool,
): ):
''' '''
callee context closes without using stream. callee context closes without using stream.
@ -474,6 +484,15 @@ async def test_callee_closes_ctx_after_stream_open(
=> {'stop': True, 'cid': <str>} => {'stop': True, 'cid': <str>}
''' '''
timeout: float = (
0.5 if (
not debug_mode
# NOTE, for debugging final
# Return-consumed-n-discarded-ishue!
# and
# not parent_send_before_receive
) else 999
)
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
@ -482,7 +501,7 @@ async def test_callee_closes_ctx_after_stream_open(
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(0.5): with trio.fail_after(timeout):
async with portal.open_context( async with portal.open_context(
close_ctx_immediately, close_ctx_immediately,
@ -494,41 +513,56 @@ async def test_callee_closes_ctx_after_stream_open(
with trio.fail_after(0.4): with trio.fail_after(0.4):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
if parent_send_before_receive:
print('sending first msg from parent!')
await stream.send('yo')
# should fall through since ``StopAsyncIteration`` # should fall through since ``StopAsyncIteration``
# should be raised through translation of # should be raised through translation of
# a ``trio.EndOfChannel`` by # a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()`` # ``trio.abc.ReceiveChannel.__anext__()``
async for _ in stream: msg = 10
async for msg in stream:
# trigger failure if we DO NOT # trigger failure if we DO NOT
# get an EOC! # get an EOC!
assert 0 assert 0
else: else:
# never should get anythinig new from
# the underlying stream
assert msg == 10
# verify stream is now closed # verify stream is now closed
try: try:
with trio.fail_after(0.3): with trio.fail_after(0.3):
print('parent trying to `.receive()` on EoC stream!')
await stream.receive() await stream.receive()
assert 0, 'should have raised eoc!?'
except trio.EndOfChannel: except trio.EndOfChannel:
print('parent got EoC as expected!')
pass pass
# raise
# TODO: should be just raise the closed resource err # TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open # directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of # of a stream to the context (at least until a time of
# if/when we decide that's a good idea?) # if/when we decide that's a good idea?)
try: try:
with trio.fail_after(0.5): with trio.fail_after(timeout):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pass pass
except trio.ClosedResourceError: except trio.ClosedResourceError:
pass pass
# if ctx._rx_chan._state.data:
# await tractor.pause()
await portal.cancel_actor() await portal.cancel_actor()
@tractor.context @tractor.context
async def expect_cancelled( async def expect_cancelled(
ctx: Context, ctx: Context,
send_before_receive: bool = False,
) -> None: ) -> None:
global _state global _state
@ -538,6 +572,10 @@ async def expect_cancelled(
try: try:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
if send_before_receive:
await stream.send('yo')
async for msg in stream: async for msg in stream:
await stream.send(msg) # echo server await stream.send(msg) # echo server
@ -567,23 +605,46 @@ async def expect_cancelled(
assert 0, "callee wasn't cancelled !?" assert 0, "callee wasn't cancelled !?"
@pytest.mark.parametrize(
'child_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@pytest.mark.parametrize(
'rent_wait_for_msg',
[
False,
True,
],
ids=lambda item: f'rent_wait_for_msg={item}'
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'use_ctx_cancel_method', 'use_ctx_cancel_method',
[False, True], [
False,
'pre_stream',
'post_stream_open',
'post_stream_close',
],
ids=lambda item: f'use_ctx_cancel_method={item}'
) )
@tractor_test @tractor_test
async def test_caller_closes_ctx_after_callee_opens_stream( async def test_parent_exits_ctx_after_child_enters_stream(
use_ctx_cancel_method: bool, use_ctx_cancel_method: bool|str,
debug_mode: bool, debug_mode: bool,
rent_wait_for_msg: bool,
child_send_before_receive: bool,
): ):
''' '''
caller context closes without using/opening stream Parent-side of IPC context closes without sending on `MsgStream`.
''' '''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
root: Actor = current_actor() root: Actor = current_actor()
portal = await an.start_actor( portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
@ -592,26 +653,45 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
async with portal.open_context( async with portal.open_context(
expect_cancelled, expect_cancelled,
send_before_receive=child_send_before_receive,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
await portal.run(assert_state, value=True) await portal.run(assert_state, value=True)
# call `ctx.cancel()` explicitly # call `ctx.cancel()` explicitly
if use_ctx_cancel_method: if use_ctx_cancel_method == 'pre_stream':
await ctx.cancel() await ctx.cancel()
# NOTE: means the local side `ctx._scope` will # NOTE: means the local side `ctx._scope` will
# have been cancelled by an ctxc ack and thus # have been cancelled by an ctxc ack and thus
# `._scope.cancelled_caught` should be set. # `._scope.cancelled_caught` should be set.
try: async with (
async with ctx.open_stream() as stream: expect_ctxc(
async for msg in stream:
pass
except tractor.ContextCancelled as ctxc:
# XXX: the cause is US since we call # XXX: the cause is US since we call
# `Context.cancel()` just above! # `Context.cancel()` just above!
yay=True,
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
reraise=True,
) as maybe_ctxc,
):
async with ctx.open_stream() as stream:
if rent_wait_for_msg:
async for msg in stream:
print(f'PARENT rx: {msg!r}\n')
break
if use_ctx_cancel_method == 'post_stream_open':
await ctx.cancel()
if use_ctx_cancel_method == 'post_stream_close':
await ctx.cancel()
ctxc: tractor.ContextCancelled = maybe_ctxc.value
assert ( assert (
ctxc.canceller ctxc.canceller
== ==
@ -620,14 +700,6 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
root.uid root.uid
) )
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
else:
assert 0, "Should have context cancelled?"
# channel should still be up # channel should still be up
assert portal.channel.connected() assert portal.channel.connected()
@ -637,13 +709,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
value=False, value=False,
) )
# XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
# `.open_context()` before the child has returned,
# errored or been cancelled!
else: else:
try: try:
with trio.fail_after(0.2): with trio.fail_after(
await ctx.result() 0.5 # if not debug_mode else 999
):
res = await ctx.wait_for_result()
assert res is not tractor._context.Unresolved
assert 0, "Callee should have blocked!?" assert 0, "Callee should have blocked!?"
except trio.TooSlowError: except trio.TooSlowError:
# NO-OP -> since already called above # NO-OP -> since already triggered by
# `trio.fail_after()` above!
await ctx.cancel() await ctx.cancel()
# NOTE: local scope should have absorbed the cancellation since # NOTE: local scope should have absorbed the cancellation since
@ -683,7 +762,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
@tractor_test @tractor_test
async def test_multitask_caller_cancels_from_nonroot_task( async def test_multitask_parent_cancels_from_nonroot_task(
debug_mode: bool, debug_mode: bool,
): ):
async with tractor.open_nursery( async with tractor.open_nursery(
@ -735,7 +814,6 @@ async def test_multitask_caller_cancels_from_nonroot_task(
@tractor.context @tractor.context
async def cancel_self( async def cancel_self(
ctx: Context, ctx: Context,
) -> None: ) -> None:
@ -775,7 +853,7 @@ async def cancel_self(
@tractor_test @tractor_test
async def test_callee_cancels_before_started( async def test_child_cancels_before_started(
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -826,8 +904,7 @@ async def never_open_stream(
@tractor.context @tractor.context
async def keep_sending_from_callee( async def keep_sending_from_child(
ctx: Context, ctx: Context,
msg_buffer_size: int|None = None, msg_buffer_size: int|None = None,
@ -850,7 +927,7 @@ async def keep_sending_from_callee(
'overrun_by', 'overrun_by',
[ [
('caller', 1, never_open_stream), ('caller', 1, never_open_stream),
('callee', 0, keep_sending_from_callee), ('callee', 0, keep_sending_from_child),
], ],
ids=[ ids=[
('caller_1buf_never_open_stream'), ('caller_1buf_never_open_stream'),
@ -931,7 +1008,6 @@ def test_one_end_stream_not_opened(
@tractor.context @tractor.context
async def echo_back_sequence( async def echo_back_sequence(
ctx: Context, ctx: Context,
seq: list[int], seq: list[int],
wait_for_cancel: bool, wait_for_cancel: bool,

View File

@ -26,6 +26,9 @@ import os
import pathlib import pathlib
import tractor import tractor
from tractor.devx._debug import (
BoxedMaybeException,
)
from .pytest import ( from .pytest import (
tractor_test as tractor_test tractor_test as tractor_test
) )
@ -98,12 +101,13 @@ async def expect_ctxc(
''' '''
if yay: if yay:
try: try:
yield yield (maybe_exc := BoxedMaybeException())
raise RuntimeError('Never raised ctxc?') raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled: except tractor.ContextCancelled as ctxc:
maybe_exc.value = ctxc
if reraise: if reraise:
raise raise
else: else:
return return
else: else:
yield yield (maybe_exc := BoxedMaybeException())

View File

@ -186,10 +186,16 @@ class PldRx(Struct):
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait() ipc._rx_chan.receive_nowait()
) )
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld( return self.decode_pld(
msg, msg,
ipc=ipc, ipc=ipc,
@ -219,6 +225,13 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive() await ipc._rx_chan.receive()
) )
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld( return self.decode_pld(
msg=msg, msg=msg,
ipc=ipc, ipc=ipc,
@ -407,8 +420,6 @@ class PldRx(Struct):
__tracebackhide__: bool = False __tracebackhide__: bool = False
raise raise
dec_msg = decode_pld
async def recv_msg_w_pld( async def recv_msg_w_pld(
self, self,
ipc: Context|MsgStream, ipc: Context|MsgStream,
@ -422,12 +433,19 @@ class PldRx(Struct):
) -> tuple[MsgType, PayloadT]: ) -> tuple[MsgType, PayloadT]:
''' '''
Retrieve the next avail IPC msg, decode it's payload, and return Retrieve the next avail IPC msg, decode it's payload, and
the pair of refs. return the pair of refs.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive() msg: MsgType = await ipc._rx_chan.receive()
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs: if passthrough_non_pld_msgs:
match msg: match msg:
@ -444,6 +462,10 @@ class PldRx(Struct):
hide_tb=hide_tb, hide_tb=hide_tb,
**kwargs, **kwargs,
) )
# log.runtime(
# f'Delivering payload msg\n'
# f'{msg}\n'
# )
return msg, pld return msg, pld
@ -538,8 +560,8 @@ async def maybe_limit_plds(
async def drain_to_final_msg( async def drain_to_final_msg(
ctx: Context, ctx: Context,
hide_tb: bool = True,
msg_limit: int = 6, msg_limit: int = 6,
hide_tb: bool = True,
) -> tuple[ ) -> tuple[
Return|None, Return|None,
@ -568,8 +590,8 @@ async def drain_to_final_msg(
even after ctx closure and the `.open_context()` block exit. even after ctx closure and the `.open_context()` block exit.
''' '''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns raise_overrun: bool = not ctx._allow_overruns
parent_never_opened_stream: bool = ctx._stream is None
# wait for a final context result by collecting (but # wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
@ -578,7 +600,8 @@ async def drain_to_final_msg(
result_msg: Return|Error|None = None result_msg: Return|Error|None = None
while not ( while not (
ctx.maybe_error ctx.maybe_error
and not ctx._final_result_is_set() and
not ctx._final_result_is_set()
): ):
try: try:
# receive all msgs, scanning for either a final result # receive all msgs, scanning for either a final result
@ -631,6 +654,11 @@ async def drain_to_final_msg(
) )
__tracebackhide__: bool = False __tracebackhide__: bool = False
else:
log.cancel(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
@ -662,17 +690,24 @@ async def drain_to_final_msg(
case Yield(): case Yield():
pre_result_drained.append(msg) pre_result_drained.append(msg)
if ( if (
not parent_never_opened_stream
and (
(ctx._stream.closed (ctx._stream.closed
and (reason := 'stream was already closed') and
) (reason := 'stream was already closed')
or (ctx.cancel_acked ) or
and (reason := 'ctx cancelled other side') (ctx.cancel_acked
and
(reason := 'ctx cancelled other side')
) )
or (ctx._cancel_called or (ctx._cancel_called
and (reason := 'ctx called `.cancel()`') and
(reason := 'ctx called `.cancel()`')
) )
or (len(pre_result_drained) > msg_limit or (len(pre_result_drained) > msg_limit
and (reason := f'"yield" limit={msg_limit}') and
(reason := f'"yield" limit={msg_limit}')
)
) )
): ):
log.cancel( log.cancel(
@ -690,7 +725,7 @@ async def drain_to_final_msg(
# drain up to the `msg_limit` hoping to get # drain up to the `msg_limit` hoping to get
# a final result or error/ctxc. # a final result or error/ctxc.
else: else:
log.warning( report: str = (
'Ignoring "yield" msg during `ctx.result()` drain..\n' 'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n' f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n' f' |_{ctx._nsf}()\n\n'
@ -699,6 +734,14 @@ async def drain_to_final_msg(
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
if parent_never_opened_stream:
report = (
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
f'\n'
# f'{ctx}\n'
) + report
log.warning(report)
continue continue
# stream terminated, but no result yet.. # stream terminated, but no result yet..
@ -790,6 +833,7 @@ async def drain_to_final_msg(
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )
__tracebackhide__: bool = hide_tb
return ( return (
result_msg, result_msg,
pre_result_drained, pre_result_drained,