Compare commits

..

No commits in common. "3a3fd36890720ef6b7de09d7f1718073efeb7eed" and "6a94a520d6c96e9312a1a14210b0047d01b6f8cc" have entirely different histories.

3 changed files with 64 additions and 188 deletions

View File

@ -443,6 +443,7 @@ def test_caller_cancels(
@tractor.context @tractor.context
async def close_ctx_immediately( async def close_ctx_immediately(
ctx: Context, ctx: Context,
) -> None: ) -> None:
@ -453,21 +454,10 @@ async def close_ctx_immediately(
async with ctx.open_stream(): async with ctx.open_stream():
pass pass
print('child returning!')
@pytest.mark.parametrize(
'parent_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@tractor_test @tractor_test
async def test_child_exits_ctx_after_stream_open( async def test_callee_closes_ctx_after_stream_open(
debug_mode: bool, debug_mode: bool,
parent_send_before_receive: bool,
): ):
''' '''
callee context closes without using stream. callee context closes without using stream.
@ -484,15 +474,6 @@ async def test_child_exits_ctx_after_stream_open(
=> {'stop': True, 'cid': <str>} => {'stop': True, 'cid': <str>}
''' '''
timeout: float = (
0.5 if (
not debug_mode
# NOTE, for debugging final
# Return-consumed-n-discarded-ishue!
# and
# not parent_send_before_receive
) else 999
)
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
@ -501,7 +482,7 @@ async def test_child_exits_ctx_after_stream_open(
enable_modules=[__name__], enable_modules=[__name__],
) )
with trio.fail_after(timeout): with trio.fail_after(0.5):
async with portal.open_context( async with portal.open_context(
close_ctx_immediately, close_ctx_immediately,
@ -513,56 +494,41 @@ async def test_child_exits_ctx_after_stream_open(
with trio.fail_after(0.4): with trio.fail_after(0.4):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
if parent_send_before_receive:
print('sending first msg from parent!')
await stream.send('yo')
# should fall through since ``StopAsyncIteration`` # should fall through since ``StopAsyncIteration``
# should be raised through translation of # should be raised through translation of
# a ``trio.EndOfChannel`` by # a ``trio.EndOfChannel`` by
# ``trio.abc.ReceiveChannel.__anext__()`` # ``trio.abc.ReceiveChannel.__anext__()``
msg = 10 async for _ in stream:
async for msg in stream:
# trigger failure if we DO NOT # trigger failure if we DO NOT
# get an EOC! # get an EOC!
assert 0 assert 0
else: else:
# never should get anythinig new from
# the underlying stream
assert msg == 10
# verify stream is now closed # verify stream is now closed
try: try:
with trio.fail_after(0.3): with trio.fail_after(0.3):
print('parent trying to `.receive()` on EoC stream!')
await stream.receive() await stream.receive()
assert 0, 'should have raised eoc!?'
except trio.EndOfChannel: except trio.EndOfChannel:
print('parent got EoC as expected!')
pass pass
# raise
# TODO: should be just raise the closed resource err # TODO: should be just raise the closed resource err
# directly here to enforce not allowing a re-open # directly here to enforce not allowing a re-open
# of a stream to the context (at least until a time of # of a stream to the context (at least until a time of
# if/when we decide that's a good idea?) # if/when we decide that's a good idea?)
try: try:
with trio.fail_after(timeout): with trio.fail_after(0.5):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
pass pass
except trio.ClosedResourceError: except trio.ClosedResourceError:
pass pass
# if ctx._rx_chan._state.data:
# await tractor.pause()
await portal.cancel_actor() await portal.cancel_actor()
@tractor.context @tractor.context
async def expect_cancelled( async def expect_cancelled(
ctx: Context, ctx: Context,
send_before_receive: bool = False,
) -> None: ) -> None:
global _state global _state
@ -572,10 +538,6 @@ async def expect_cancelled(
try: try:
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
if send_before_receive:
await stream.send('yo')
async for msg in stream: async for msg in stream:
await stream.send(msg) # echo server await stream.send(msg) # echo server
@ -605,46 +567,23 @@ async def expect_cancelled(
assert 0, "callee wasn't cancelled !?" assert 0, "callee wasn't cancelled !?"
@pytest.mark.parametrize(
'child_send_before_receive',
[
False,
True,
],
ids=lambda item: f'child_send_before_receive={item}'
)
@pytest.mark.parametrize(
'rent_wait_for_msg',
[
False,
True,
],
ids=lambda item: f'rent_wait_for_msg={item}'
)
@pytest.mark.parametrize( @pytest.mark.parametrize(
'use_ctx_cancel_method', 'use_ctx_cancel_method',
[ [False, True],
False,
'pre_stream',
'post_stream_open',
'post_stream_close',
],
ids=lambda item: f'use_ctx_cancel_method={item}'
) )
@tractor_test @tractor_test
async def test_parent_exits_ctx_after_child_enters_stream( async def test_caller_closes_ctx_after_callee_opens_stream(
use_ctx_cancel_method: bool|str, use_ctx_cancel_method: bool,
debug_mode: bool, debug_mode: bool,
rent_wait_for_msg: bool,
child_send_before_receive: bool,
): ):
''' '''
Parent-side of IPC context closes without sending on `MsgStream`. caller context closes without using/opening stream
''' '''
async with tractor.open_nursery( async with tractor.open_nursery(
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
root: Actor = current_actor() root: Actor = current_actor()
portal = await an.start_actor( portal = await an.start_actor(
'ctx_cancelled', 'ctx_cancelled',
@ -653,45 +592,26 @@ async def test_parent_exits_ctx_after_child_enters_stream(
async with portal.open_context( async with portal.open_context(
expect_cancelled, expect_cancelled,
send_before_receive=child_send_before_receive,
) as (ctx, sent): ) as (ctx, sent):
assert sent is None assert sent is None
await portal.run(assert_state, value=True) await portal.run(assert_state, value=True)
# call `ctx.cancel()` explicitly # call `ctx.cancel()` explicitly
if use_ctx_cancel_method == 'pre_stream': if use_ctx_cancel_method:
await ctx.cancel() await ctx.cancel()
# NOTE: means the local side `ctx._scope` will # NOTE: means the local side `ctx._scope` will
# have been cancelled by an ctxc ack and thus # have been cancelled by an ctxc ack and thus
# `._scope.cancelled_caught` should be set. # `._scope.cancelled_caught` should be set.
async with ( try:
expect_ctxc( async with ctx.open_stream() as stream:
async for msg in stream:
pass
except tractor.ContextCancelled as ctxc:
# XXX: the cause is US since we call # XXX: the cause is US since we call
# `Context.cancel()` just above! # `Context.cancel()` just above!
yay=True,
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
reraise=True,
) as maybe_ctxc,
):
async with ctx.open_stream() as stream:
if rent_wait_for_msg:
async for msg in stream:
print(f'PARENT rx: {msg!r}\n')
break
if use_ctx_cancel_method == 'post_stream_open':
await ctx.cancel()
if use_ctx_cancel_method == 'post_stream_close':
await ctx.cancel()
ctxc: tractor.ContextCancelled = maybe_ctxc.value
assert ( assert (
ctxc.canceller ctxc.canceller
== ==
@ -700,6 +620,14 @@ async def test_parent_exits_ctx_after_child_enters_stream(
root.uid root.uid
) )
# XXX: must be propagated to __aexit__
# and should be silently absorbed there
# since we called `.cancel()` just above ;)
raise
else:
assert 0, "Should have context cancelled?"
# channel should still be up # channel should still be up
assert portal.channel.connected() assert portal.channel.connected()
@ -709,20 +637,13 @@ async def test_parent_exits_ctx_after_child_enters_stream(
value=False, value=False,
) )
# XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
# `.open_context()` before the child has returned,
# errored or been cancelled!
else: else:
try: try:
with trio.fail_after( with trio.fail_after(0.2):
0.5 # if not debug_mode else 999 await ctx.result()
):
res = await ctx.wait_for_result()
assert res is not tractor._context.Unresolved
assert 0, "Callee should have blocked!?" assert 0, "Callee should have blocked!?"
except trio.TooSlowError: except trio.TooSlowError:
# NO-OP -> since already triggered by # NO-OP -> since already called above
# `trio.fail_after()` above!
await ctx.cancel() await ctx.cancel()
# NOTE: local scope should have absorbed the cancellation since # NOTE: local scope should have absorbed the cancellation since
@ -762,7 +683,7 @@ async def test_parent_exits_ctx_after_child_enters_stream(
@tractor_test @tractor_test
async def test_multitask_parent_cancels_from_nonroot_task( async def test_multitask_caller_cancels_from_nonroot_task(
debug_mode: bool, debug_mode: bool,
): ):
async with tractor.open_nursery( async with tractor.open_nursery(
@ -814,6 +735,7 @@ async def test_multitask_parent_cancels_from_nonroot_task(
@tractor.context @tractor.context
async def cancel_self( async def cancel_self(
ctx: Context, ctx: Context,
) -> None: ) -> None:
@ -853,7 +775,7 @@ async def cancel_self(
@tractor_test @tractor_test
async def test_child_cancels_before_started( async def test_callee_cancels_before_started(
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -904,7 +826,8 @@ async def never_open_stream(
@tractor.context @tractor.context
async def keep_sending_from_child( async def keep_sending_from_callee(
ctx: Context, ctx: Context,
msg_buffer_size: int|None = None, msg_buffer_size: int|None = None,
@ -927,7 +850,7 @@ async def keep_sending_from_child(
'overrun_by', 'overrun_by',
[ [
('caller', 1, never_open_stream), ('caller', 1, never_open_stream),
('callee', 0, keep_sending_from_child), ('callee', 0, keep_sending_from_callee),
], ],
ids=[ ids=[
('caller_1buf_never_open_stream'), ('caller_1buf_never_open_stream'),
@ -1008,6 +931,7 @@ def test_one_end_stream_not_opened(
@tractor.context @tractor.context
async def echo_back_sequence( async def echo_back_sequence(
ctx: Context, ctx: Context,
seq: list[int], seq: list[int],
wait_for_cancel: bool, wait_for_cancel: bool,

View File

@ -26,9 +26,6 @@ import os
import pathlib import pathlib
import tractor import tractor
from tractor.devx._debug import (
BoxedMaybeException,
)
from .pytest import ( from .pytest import (
tractor_test as tractor_test tractor_test as tractor_test
) )
@ -101,13 +98,12 @@ async def expect_ctxc(
''' '''
if yay: if yay:
try: try:
yield (maybe_exc := BoxedMaybeException()) yield
raise RuntimeError('Never raised ctxc?') raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled as ctxc: except tractor.ContextCancelled:
maybe_exc.value = ctxc
if reraise: if reraise:
raise raise
else: else:
return return
else: else:
yield (maybe_exc := BoxedMaybeException()) yield

View File

@ -186,16 +186,10 @@ class PldRx(Struct):
msg: MsgType = ( msg: MsgType = (
ipc_msg ipc_msg
or or
# sync-rx msg from underlying IPC feeder (mem-)chan # sync-rx msg from underlying IPC feeder (mem-)chan
ipc._rx_chan.receive_nowait() ipc._rx_chan.receive_nowait()
) )
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld( return self.decode_pld(
msg, msg,
ipc=ipc, ipc=ipc,
@ -225,13 +219,6 @@ class PldRx(Struct):
# async-rx msg from underlying IPC feeder (mem-)chan # async-rx msg from underlying IPC feeder (mem-)chan
await ipc._rx_chan.receive() await ipc._rx_chan.receive()
) )
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
return self.decode_pld( return self.decode_pld(
msg=msg, msg=msg,
ipc=ipc, ipc=ipc,
@ -420,6 +407,8 @@ class PldRx(Struct):
__tracebackhide__: bool = False __tracebackhide__: bool = False
raise raise
dec_msg = decode_pld
async def recv_msg_w_pld( async def recv_msg_w_pld(
self, self,
ipc: Context|MsgStream, ipc: Context|MsgStream,
@ -433,19 +422,12 @@ class PldRx(Struct):
) -> tuple[MsgType, PayloadT]: ) -> tuple[MsgType, PayloadT]:
''' '''
Retrieve the next avail IPC msg, decode it's payload, and Retrieve the next avail IPC msg, decode it's payload, and return
return the pair of refs. the pair of refs.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
msg: MsgType = await ipc._rx_chan.receive() msg: MsgType = await ipc._rx_chan.receive()
if (
type(msg) is Return
):
log.info(
f'Rxed final result msg\n'
f'{msg}\n'
)
if passthrough_non_pld_msgs: if passthrough_non_pld_msgs:
match msg: match msg:
@ -462,10 +444,6 @@ class PldRx(Struct):
hide_tb=hide_tb, hide_tb=hide_tb,
**kwargs, **kwargs,
) )
# log.runtime(
# f'Delivering payload msg\n'
# f'{msg}\n'
# )
return msg, pld return msg, pld
@ -560,8 +538,8 @@ async def maybe_limit_plds(
async def drain_to_final_msg( async def drain_to_final_msg(
ctx: Context, ctx: Context,
msg_limit: int = 6,
hide_tb: bool = True, hide_tb: bool = True,
msg_limit: int = 6,
) -> tuple[ ) -> tuple[
Return|None, Return|None,
@ -590,8 +568,8 @@ async def drain_to_final_msg(
even after ctx closure and the `.open_context()` block exit. even after ctx closure and the `.open_context()` block exit.
''' '''
__tracebackhide__: bool = hide_tb
raise_overrun: bool = not ctx._allow_overruns raise_overrun: bool = not ctx._allow_overruns
parent_never_opened_stream: bool = ctx._stream is None
# wait for a final context result by collecting (but # wait for a final context result by collecting (but
# basically ignoring) any bi-dir-stream msgs still in transit # basically ignoring) any bi-dir-stream msgs still in transit
@ -600,8 +578,7 @@ async def drain_to_final_msg(
result_msg: Return|Error|None = None result_msg: Return|Error|None = None
while not ( while not (
ctx.maybe_error ctx.maybe_error
and and not ctx._final_result_is_set()
not ctx._final_result_is_set()
): ):
try: try:
# receive all msgs, scanning for either a final result # receive all msgs, scanning for either a final result
@ -654,11 +631,6 @@ async def drain_to_final_msg(
) )
__tracebackhide__: bool = False __tracebackhide__: bool = False
else:
log.cancel(
f'IPC ctx cancelled externally during result drain ?\n'
f'{ctx}'
)
# CASE 2: mask the local cancelled-error(s) # CASE 2: mask the local cancelled-error(s)
# only when we are sure the remote error is # only when we are sure the remote error is
# the source cause of this local task's # the source cause of this local task's
@ -690,24 +662,17 @@ async def drain_to_final_msg(
case Yield(): case Yield():
pre_result_drained.append(msg) pre_result_drained.append(msg)
if ( if (
not parent_never_opened_stream
and (
(ctx._stream.closed (ctx._stream.closed
and and (reason := 'stream was already closed')
(reason := 'stream was already closed') )
) or or (ctx.cancel_acked
(ctx.cancel_acked and (reason := 'ctx cancelled other side')
and
(reason := 'ctx cancelled other side')
) )
or (ctx._cancel_called or (ctx._cancel_called
and and (reason := 'ctx called `.cancel()`')
(reason := 'ctx called `.cancel()`')
) )
or (len(pre_result_drained) > msg_limit or (len(pre_result_drained) > msg_limit
and and (reason := f'"yield" limit={msg_limit}')
(reason := f'"yield" limit={msg_limit}')
)
) )
): ):
log.cancel( log.cancel(
@ -725,7 +690,7 @@ async def drain_to_final_msg(
# drain up to the `msg_limit` hoping to get # drain up to the `msg_limit` hoping to get
# a final result or error/ctxc. # a final result or error/ctxc.
else: else:
report: str = ( log.warning(
'Ignoring "yield" msg during `ctx.result()` drain..\n' 'Ignoring "yield" msg during `ctx.result()` drain..\n'
f'<= {ctx.chan.uid}\n' f'<= {ctx.chan.uid}\n'
f' |_{ctx._nsf}()\n\n' f' |_{ctx._nsf}()\n\n'
@ -734,14 +699,6 @@ async def drain_to_final_msg(
f'{pretty_struct.pformat(msg)}\n' f'{pretty_struct.pformat(msg)}\n'
) )
if parent_never_opened_stream:
report = (
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
f'\n'
# f'{ctx}\n'
) + report
log.warning(report)
continue continue
# stream terminated, but no result yet.. # stream terminated, but no result yet..
@ -833,7 +790,6 @@ async def drain_to_final_msg(
f'{ctx.outcome}\n' f'{ctx.outcome}\n'
) )
__tracebackhide__: bool = hide_tb
return ( return (
result_msg, result_msg,
pre_result_drained, pre_result_drained,