Compare commits
3 Commits
6a94a520d6
...
3a3fd36890
Author | SHA1 | Date |
---|---|---|
|
3a3fd36890 | |
|
f0561fc8c0 | |
|
d1abe4da44 |
|
@ -443,7 +443,6 @@ def test_caller_cancels(
|
|||
|
||||
@tractor.context
|
||||
async def close_ctx_immediately(
|
||||
|
||||
ctx: Context,
|
||||
|
||||
) -> None:
|
||||
|
@ -454,10 +453,21 @@ async def close_ctx_immediately(
|
|||
async with ctx.open_stream():
|
||||
pass
|
||||
|
||||
print('child returning!')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'parent_send_before_receive',
|
||||
[
|
||||
False,
|
||||
True,
|
||||
],
|
||||
ids=lambda item: f'child_send_before_receive={item}'
|
||||
)
|
||||
@tractor_test
|
||||
async def test_callee_closes_ctx_after_stream_open(
|
||||
async def test_child_exits_ctx_after_stream_open(
|
||||
debug_mode: bool,
|
||||
parent_send_before_receive: bool,
|
||||
):
|
||||
'''
|
||||
callee context closes without using stream.
|
||||
|
@ -474,6 +484,15 @@ async def test_callee_closes_ctx_after_stream_open(
|
|||
=> {'stop': True, 'cid': <str>}
|
||||
|
||||
'''
|
||||
timeout: float = (
|
||||
0.5 if (
|
||||
not debug_mode
|
||||
# NOTE, for debugging final
|
||||
# Return-consumed-n-discarded-ishue!
|
||||
# and
|
||||
# not parent_send_before_receive
|
||||
) else 999
|
||||
)
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
|
@ -482,7 +501,7 @@ async def test_callee_closes_ctx_after_stream_open(
|
|||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
with trio.fail_after(0.5):
|
||||
with trio.fail_after(timeout):
|
||||
async with portal.open_context(
|
||||
close_ctx_immediately,
|
||||
|
||||
|
@ -494,41 +513,56 @@ async def test_callee_closes_ctx_after_stream_open(
|
|||
|
||||
with trio.fail_after(0.4):
|
||||
async with ctx.open_stream() as stream:
|
||||
if parent_send_before_receive:
|
||||
print('sending first msg from parent!')
|
||||
await stream.send('yo')
|
||||
|
||||
# should fall through since ``StopAsyncIteration``
|
||||
# should be raised through translation of
|
||||
# a ``trio.EndOfChannel`` by
|
||||
# ``trio.abc.ReceiveChannel.__anext__()``
|
||||
async for _ in stream:
|
||||
msg = 10
|
||||
async for msg in stream:
|
||||
# trigger failure if we DO NOT
|
||||
# get an EOC!
|
||||
assert 0
|
||||
else:
|
||||
# never should get anythinig new from
|
||||
# the underlying stream
|
||||
assert msg == 10
|
||||
|
||||
# verify stream is now closed
|
||||
try:
|
||||
with trio.fail_after(0.3):
|
||||
print('parent trying to `.receive()` on EoC stream!')
|
||||
await stream.receive()
|
||||
assert 0, 'should have raised eoc!?'
|
||||
except trio.EndOfChannel:
|
||||
print('parent got EoC as expected!')
|
||||
pass
|
||||
# raise
|
||||
|
||||
# TODO: should be just raise the closed resource err
|
||||
# directly here to enforce not allowing a re-open
|
||||
# of a stream to the context (at least until a time of
|
||||
# if/when we decide that's a good idea?)
|
||||
try:
|
||||
with trio.fail_after(0.5):
|
||||
with trio.fail_after(timeout):
|
||||
async with ctx.open_stream() as stream:
|
||||
pass
|
||||
except trio.ClosedResourceError:
|
||||
pass
|
||||
|
||||
# if ctx._rx_chan._state.data:
|
||||
# await tractor.pause()
|
||||
|
||||
await portal.cancel_actor()
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def expect_cancelled(
|
||||
ctx: Context,
|
||||
send_before_receive: bool = False,
|
||||
|
||||
) -> None:
|
||||
global _state
|
||||
|
@ -538,6 +572,10 @@ async def expect_cancelled(
|
|||
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
if send_before_receive:
|
||||
await stream.send('yo')
|
||||
|
||||
async for msg in stream:
|
||||
await stream.send(msg) # echo server
|
||||
|
||||
|
@ -567,23 +605,46 @@ async def expect_cancelled(
|
|||
assert 0, "callee wasn't cancelled !?"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'child_send_before_receive',
|
||||
[
|
||||
False,
|
||||
True,
|
||||
],
|
||||
ids=lambda item: f'child_send_before_receive={item}'
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'rent_wait_for_msg',
|
||||
[
|
||||
False,
|
||||
True,
|
||||
],
|
||||
ids=lambda item: f'rent_wait_for_msg={item}'
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
'use_ctx_cancel_method',
|
||||
[False, True],
|
||||
[
|
||||
False,
|
||||
'pre_stream',
|
||||
'post_stream_open',
|
||||
'post_stream_close',
|
||||
],
|
||||
ids=lambda item: f'use_ctx_cancel_method={item}'
|
||||
)
|
||||
@tractor_test
|
||||
async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||
use_ctx_cancel_method: bool,
|
||||
async def test_parent_exits_ctx_after_child_enters_stream(
|
||||
use_ctx_cancel_method: bool|str,
|
||||
debug_mode: bool,
|
||||
rent_wait_for_msg: bool,
|
||||
child_send_before_receive: bool,
|
||||
):
|
||||
'''
|
||||
caller context closes without using/opening stream
|
||||
Parent-side of IPC context closes without sending on `MsgStream`.
|
||||
|
||||
'''
|
||||
async with tractor.open_nursery(
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
|
||||
root: Actor = current_actor()
|
||||
portal = await an.start_actor(
|
||||
'ctx_cancelled',
|
||||
|
@ -592,26 +653,45 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
|
||||
async with portal.open_context(
|
||||
expect_cancelled,
|
||||
send_before_receive=child_send_before_receive,
|
||||
) as (ctx, sent):
|
||||
assert sent is None
|
||||
|
||||
await portal.run(assert_state, value=True)
|
||||
|
||||
# call `ctx.cancel()` explicitly
|
||||
if use_ctx_cancel_method:
|
||||
if use_ctx_cancel_method == 'pre_stream':
|
||||
await ctx.cancel()
|
||||
|
||||
# NOTE: means the local side `ctx._scope` will
|
||||
# have been cancelled by an ctxc ack and thus
|
||||
# `._scope.cancelled_caught` should be set.
|
||||
try:
|
||||
async with ctx.open_stream() as stream:
|
||||
async for msg in stream:
|
||||
pass
|
||||
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
async with (
|
||||
expect_ctxc(
|
||||
# XXX: the cause is US since we call
|
||||
# `Context.cancel()` just above!
|
||||
yay=True,
|
||||
|
||||
# XXX: must be propagated to __aexit__
|
||||
# and should be silently absorbed there
|
||||
# since we called `.cancel()` just above ;)
|
||||
reraise=True,
|
||||
) as maybe_ctxc,
|
||||
):
|
||||
async with ctx.open_stream() as stream:
|
||||
|
||||
if rent_wait_for_msg:
|
||||
async for msg in stream:
|
||||
print(f'PARENT rx: {msg!r}\n')
|
||||
break
|
||||
|
||||
if use_ctx_cancel_method == 'post_stream_open':
|
||||
await ctx.cancel()
|
||||
|
||||
if use_ctx_cancel_method == 'post_stream_close':
|
||||
await ctx.cancel()
|
||||
|
||||
ctxc: tractor.ContextCancelled = maybe_ctxc.value
|
||||
assert (
|
||||
ctxc.canceller
|
||||
==
|
||||
|
@ -620,14 +700,6 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
root.uid
|
||||
)
|
||||
|
||||
# XXX: must be propagated to __aexit__
|
||||
# and should be silently absorbed there
|
||||
# since we called `.cancel()` just above ;)
|
||||
raise
|
||||
|
||||
else:
|
||||
assert 0, "Should have context cancelled?"
|
||||
|
||||
# channel should still be up
|
||||
assert portal.channel.connected()
|
||||
|
||||
|
@ -637,13 +709,20 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
value=False,
|
||||
)
|
||||
|
||||
# XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
|
||||
# `.open_context()` before the child has returned,
|
||||
# errored or been cancelled!
|
||||
else:
|
||||
try:
|
||||
with trio.fail_after(0.2):
|
||||
await ctx.result()
|
||||
with trio.fail_after(
|
||||
0.5 # if not debug_mode else 999
|
||||
):
|
||||
res = await ctx.wait_for_result()
|
||||
assert res is not tractor._context.Unresolved
|
||||
assert 0, "Callee should have blocked!?"
|
||||
except trio.TooSlowError:
|
||||
# NO-OP -> since already called above
|
||||
# NO-OP -> since already triggered by
|
||||
# `trio.fail_after()` above!
|
||||
await ctx.cancel()
|
||||
|
||||
# NOTE: local scope should have absorbed the cancellation since
|
||||
|
@ -683,7 +762,7 @@ async def test_caller_closes_ctx_after_callee_opens_stream(
|
|||
|
||||
|
||||
@tractor_test
|
||||
async def test_multitask_caller_cancels_from_nonroot_task(
|
||||
async def test_multitask_parent_cancels_from_nonroot_task(
|
||||
debug_mode: bool,
|
||||
):
|
||||
async with tractor.open_nursery(
|
||||
|
@ -735,7 +814,6 @@ async def test_multitask_caller_cancels_from_nonroot_task(
|
|||
|
||||
@tractor.context
|
||||
async def cancel_self(
|
||||
|
||||
ctx: Context,
|
||||
|
||||
) -> None:
|
||||
|
@ -775,7 +853,7 @@ async def cancel_self(
|
|||
|
||||
|
||||
@tractor_test
|
||||
async def test_callee_cancels_before_started(
|
||||
async def test_child_cancels_before_started(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
|
@ -826,8 +904,7 @@ async def never_open_stream(
|
|||
|
||||
|
||||
@tractor.context
|
||||
async def keep_sending_from_callee(
|
||||
|
||||
async def keep_sending_from_child(
|
||||
ctx: Context,
|
||||
msg_buffer_size: int|None = None,
|
||||
|
||||
|
@ -850,7 +927,7 @@ async def keep_sending_from_callee(
|
|||
'overrun_by',
|
||||
[
|
||||
('caller', 1, never_open_stream),
|
||||
('callee', 0, keep_sending_from_callee),
|
||||
('callee', 0, keep_sending_from_child),
|
||||
],
|
||||
ids=[
|
||||
('caller_1buf_never_open_stream'),
|
||||
|
@ -931,7 +1008,6 @@ def test_one_end_stream_not_opened(
|
|||
|
||||
@tractor.context
|
||||
async def echo_back_sequence(
|
||||
|
||||
ctx: Context,
|
||||
seq: list[int],
|
||||
wait_for_cancel: bool,
|
||||
|
|
|
@ -26,6 +26,9 @@ import os
|
|||
import pathlib
|
||||
|
||||
import tractor
|
||||
from tractor.devx._debug import (
|
||||
BoxedMaybeException,
|
||||
)
|
||||
from .pytest import (
|
||||
tractor_test as tractor_test
|
||||
)
|
||||
|
@ -98,12 +101,13 @@ async def expect_ctxc(
|
|||
'''
|
||||
if yay:
|
||||
try:
|
||||
yield
|
||||
yield (maybe_exc := BoxedMaybeException())
|
||||
raise RuntimeError('Never raised ctxc?')
|
||||
except tractor.ContextCancelled:
|
||||
except tractor.ContextCancelled as ctxc:
|
||||
maybe_exc.value = ctxc
|
||||
if reraise:
|
||||
raise
|
||||
else:
|
||||
return
|
||||
else:
|
||||
yield
|
||||
yield (maybe_exc := BoxedMaybeException())
|
||||
|
|
|
@ -186,10 +186,16 @@ class PldRx(Struct):
|
|||
msg: MsgType = (
|
||||
ipc_msg
|
||||
or
|
||||
|
||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||
ipc._rx_chan.receive_nowait()
|
||||
)
|
||||
if (
|
||||
type(msg) is Return
|
||||
):
|
||||
log.info(
|
||||
f'Rxed final result msg\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
return self.decode_pld(
|
||||
msg,
|
||||
ipc=ipc,
|
||||
|
@ -219,6 +225,13 @@ class PldRx(Struct):
|
|||
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||
await ipc._rx_chan.receive()
|
||||
)
|
||||
if (
|
||||
type(msg) is Return
|
||||
):
|
||||
log.info(
|
||||
f'Rxed final result msg\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
return self.decode_pld(
|
||||
msg=msg,
|
||||
ipc=ipc,
|
||||
|
@ -407,8 +420,6 @@ class PldRx(Struct):
|
|||
__tracebackhide__: bool = False
|
||||
raise
|
||||
|
||||
dec_msg = decode_pld
|
||||
|
||||
async def recv_msg_w_pld(
|
||||
self,
|
||||
ipc: Context|MsgStream,
|
||||
|
@ -422,12 +433,19 @@ class PldRx(Struct):
|
|||
|
||||
) -> tuple[MsgType, PayloadT]:
|
||||
'''
|
||||
Retrieve the next avail IPC msg, decode it's payload, and return
|
||||
the pair of refs.
|
||||
Retrieve the next avail IPC msg, decode it's payload, and
|
||||
return the pair of refs.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
msg: MsgType = await ipc._rx_chan.receive()
|
||||
if (
|
||||
type(msg) is Return
|
||||
):
|
||||
log.info(
|
||||
f'Rxed final result msg\n'
|
||||
f'{msg}\n'
|
||||
)
|
||||
|
||||
if passthrough_non_pld_msgs:
|
||||
match msg:
|
||||
|
@ -444,6 +462,10 @@ class PldRx(Struct):
|
|||
hide_tb=hide_tb,
|
||||
**kwargs,
|
||||
)
|
||||
# log.runtime(
|
||||
# f'Delivering payload msg\n'
|
||||
# f'{msg}\n'
|
||||
# )
|
||||
return msg, pld
|
||||
|
||||
|
||||
|
@ -538,8 +560,8 @@ async def maybe_limit_plds(
|
|||
async def drain_to_final_msg(
|
||||
ctx: Context,
|
||||
|
||||
hide_tb: bool = True,
|
||||
msg_limit: int = 6,
|
||||
hide_tb: bool = True,
|
||||
|
||||
) -> tuple[
|
||||
Return|None,
|
||||
|
@ -568,8 +590,8 @@ async def drain_to_final_msg(
|
|||
even after ctx closure and the `.open_context()` block exit.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
raise_overrun: bool = not ctx._allow_overruns
|
||||
parent_never_opened_stream: bool = ctx._stream is None
|
||||
|
||||
# wait for a final context result by collecting (but
|
||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||
|
@ -578,7 +600,8 @@ async def drain_to_final_msg(
|
|||
result_msg: Return|Error|None = None
|
||||
while not (
|
||||
ctx.maybe_error
|
||||
and not ctx._final_result_is_set()
|
||||
and
|
||||
not ctx._final_result_is_set()
|
||||
):
|
||||
try:
|
||||
# receive all msgs, scanning for either a final result
|
||||
|
@ -631,6 +654,11 @@ async def drain_to_final_msg(
|
|||
)
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
else:
|
||||
log.cancel(
|
||||
f'IPC ctx cancelled externally during result drain ?\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
# CASE 2: mask the local cancelled-error(s)
|
||||
# only when we are sure the remote error is
|
||||
# the source cause of this local task's
|
||||
|
@ -662,17 +690,24 @@ async def drain_to_final_msg(
|
|||
case Yield():
|
||||
pre_result_drained.append(msg)
|
||||
if (
|
||||
not parent_never_opened_stream
|
||||
and (
|
||||
(ctx._stream.closed
|
||||
and (reason := 'stream was already closed')
|
||||
)
|
||||
or (ctx.cancel_acked
|
||||
and (reason := 'ctx cancelled other side')
|
||||
and
|
||||
(reason := 'stream was already closed')
|
||||
) or
|
||||
(ctx.cancel_acked
|
||||
and
|
||||
(reason := 'ctx cancelled other side')
|
||||
)
|
||||
or (ctx._cancel_called
|
||||
and (reason := 'ctx called `.cancel()`')
|
||||
and
|
||||
(reason := 'ctx called `.cancel()`')
|
||||
)
|
||||
or (len(pre_result_drained) > msg_limit
|
||||
and (reason := f'"yield" limit={msg_limit}')
|
||||
and
|
||||
(reason := f'"yield" limit={msg_limit}')
|
||||
)
|
||||
)
|
||||
):
|
||||
log.cancel(
|
||||
|
@ -690,7 +725,7 @@ async def drain_to_final_msg(
|
|||
# drain up to the `msg_limit` hoping to get
|
||||
# a final result or error/ctxc.
|
||||
else:
|
||||
log.warning(
|
||||
report: str = (
|
||||
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
||||
f'<= {ctx.chan.uid}\n'
|
||||
f' |_{ctx._nsf}()\n\n'
|
||||
|
@ -699,6 +734,14 @@ async def drain_to_final_msg(
|
|||
|
||||
f'{pretty_struct.pformat(msg)}\n'
|
||||
)
|
||||
if parent_never_opened_stream:
|
||||
report = (
|
||||
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
|
||||
f'\n'
|
||||
# f'{ctx}\n'
|
||||
) + report
|
||||
|
||||
log.warning(report)
|
||||
continue
|
||||
|
||||
# stream terminated, but no result yet..
|
||||
|
@ -790,6 +833,7 @@ async def drain_to_final_msg(
|
|||
f'{ctx.outcome}\n'
|
||||
)
|
||||
|
||||
__tracebackhide__: bool = hide_tb
|
||||
return (
|
||||
result_msg,
|
||||
pre_result_drained,
|
||||
|
|
Loading…
Reference in New Issue