Compare commits
No commits in common. "3a3fd36890720ef6b7de09d7f1718073efeb7eed" and "6a94a520d6c96e9312a1a14210b0047d01b6f8cc" have entirely different histories.
3a3fd36890
...
6a94a520d6
|
@ -443,6 +443,7 @@ def test_caller_cancels(
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def close_ctx_immediately(
|
async def close_ctx_immediately(
|
||||||
|
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -453,21 +454,10 @@ async def close_ctx_immediately(
|
||||||
async with ctx.open_stream():
|
async with ctx.open_stream():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
print('child returning!')
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'parent_send_before_receive',
|
|
||||||
[
|
|
||||||
False,
|
|
||||||
True,
|
|
||||||
],
|
|
||||||
ids=lambda item: f'child_send_before_receive={item}'
|
|
||||||
)
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_child_exits_ctx_after_stream_open(
|
async def test_callee_closes_ctx_after_stream_open(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
parent_send_before_receive: bool,
|
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
callee context closes without using stream.
|
callee context closes without using stream.
|
||||||
|
@ -484,15 +474,6 @@ async def test_child_exits_ctx_after_stream_open(
|
||||||
=> {'stop': True, 'cid': <str>}
|
=> {'stop': True, 'cid': <str>}
|
||||||
|
|
||||||
'''
|
'''
|
||||||
timeout: float = (
|
|
||||||
0.5 if (
|
|
||||||
not debug_mode
|
|
||||||
# NOTE, for debugging final
|
|
||||||
# Return-consumed-n-discarded-ishue!
|
|
||||||
# and
|
|
||||||
# not parent_send_before_receive
|
|
||||||
) else 999
|
|
||||||
)
|
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
) as an:
|
) as an:
|
||||||
|
@ -501,7 +482,7 @@ async def test_child_exits_ctx_after_stream_open(
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(0.5):
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
close_ctx_immediately,
|
close_ctx_immediately,
|
||||||
|
|
||||||
|
@ -513,56 +494,41 @@ async def test_child_exits_ctx_after_stream_open(
|
||||||
|
|
||||||
with trio.fail_after(0.4):
|
with trio.fail_after(0.4):
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
if parent_send_before_receive:
|
|
||||||
print('sending first msg from parent!')
|
|
||||||
await stream.send('yo')
|
|
||||||
|
|
||||||
# should fall through since ``StopAsyncIteration``
|
# should fall through since ``StopAsyncIteration``
|
||||||
# should be raised through translation of
|
# should be raised through translation of
|
||||||
# a ``trio.EndOfChannel`` by
|
# a ``trio.EndOfChannel`` by
|
||||||
# ``trio.abc.ReceiveChannel.__anext__()``
|
# ``trio.abc.ReceiveChannel.__anext__()``
|
||||||
msg = 10
|
async for _ in stream:
|
||||||
async for msg in stream:
|
|
||||||
# trigger failure if we DO NOT
|
# trigger failure if we DO NOT
|
||||||
# get an EOC!
|
# get an EOC!
|
||||||
assert 0
|
assert 0
|
||||||
else:
|
else:
|
||||||
# never should get anythinig new from
|
|
||||||
# the underlying stream
|
|
||||||
assert msg == 10
|
|
||||||
|
|
||||||
# verify stream is now closed
|
# verify stream is now closed
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(0.3):
|
with trio.fail_after(0.3):
|
||||||
print('parent trying to `.receive()` on EoC stream!')
|
|
||||||
await stream.receive()
|
await stream.receive()
|
||||||
assert 0, 'should have raised eoc!?'
|
|
||||||
except trio.EndOfChannel:
|
except trio.EndOfChannel:
|
||||||
print('parent got EoC as expected!')
|
|
||||||
pass
|
pass
|
||||||
# raise
|
|
||||||
|
|
||||||
# TODO: should be just raise the closed resource err
|
# TODO: should be just raise the closed resource err
|
||||||
# directly here to enforce not allowing a re-open
|
# directly here to enforce not allowing a re-open
|
||||||
# of a stream to the context (at least until a time of
|
# of a stream to the context (at least until a time of
|
||||||
# if/when we decide that's a good idea?)
|
# if/when we decide that's a good idea?)
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(0.5):
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
pass
|
pass
|
||||||
except trio.ClosedResourceError:
|
except trio.ClosedResourceError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
# if ctx._rx_chan._state.data:
|
|
||||||
# await tractor.pause()
|
|
||||||
|
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def expect_cancelled(
|
async def expect_cancelled(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
send_before_receive: bool = False,
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
global _state
|
global _state
|
||||||
|
@ -572,10 +538,6 @@ async def expect_cancelled(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
|
|
||||||
if send_before_receive:
|
|
||||||
await stream.send('yo')
|
|
||||||
|
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
await stream.send(msg) # echo server
|
await stream.send(msg) # echo server
|
||||||
|
|
||||||
|
@ -605,46 +567,23 @@ async def expect_cancelled(
|
||||||
assert 0, "callee wasn't cancelled !?"
|
assert 0, "callee wasn't cancelled !?"
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'child_send_before_receive',
|
|
||||||
[
|
|
||||||
False,
|
|
||||||
True,
|
|
||||||
],
|
|
||||||
ids=lambda item: f'child_send_before_receive={item}'
|
|
||||||
)
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'rent_wait_for_msg',
|
|
||||||
[
|
|
||||||
False,
|
|
||||||
True,
|
|
||||||
],
|
|
||||||
ids=lambda item: f'rent_wait_for_msg={item}'
|
|
||||||
)
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'use_ctx_cancel_method',
|
'use_ctx_cancel_method',
|
||||||
[
|
[False, True],
|
||||||
False,
|
|
||||||
'pre_stream',
|
|
||||||
'post_stream_open',
|
|
||||||
'post_stream_close',
|
|
||||||
],
|
|
||||||
ids=lambda item: f'use_ctx_cancel_method={item}'
|
|
||||||
)
|
)
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_parent_exits_ctx_after_child_enters_stream(
|
async def test_caller_closes_ctx_after_callee_opens_stream(
|
||||||
use_ctx_cancel_method: bool|str,
|
use_ctx_cancel_method: bool,
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
rent_wait_for_msg: bool,
|
|
||||||
child_send_before_receive: bool,
|
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Parent-side of IPC context closes without sending on `MsgStream`.
|
caller context closes without using/opening stream
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
) as an:
|
) as an:
|
||||||
|
|
||||||
root: Actor = current_actor()
|
root: Actor = current_actor()
|
||||||
portal = await an.start_actor(
|
portal = await an.start_actor(
|
||||||
'ctx_cancelled',
|
'ctx_cancelled',
|
||||||
|
@ -653,45 +592,26 @@ async def test_parent_exits_ctx_after_child_enters_stream(
|
||||||
|
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
expect_cancelled,
|
expect_cancelled,
|
||||||
send_before_receive=child_send_before_receive,
|
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
assert sent is None
|
assert sent is None
|
||||||
|
|
||||||
await portal.run(assert_state, value=True)
|
await portal.run(assert_state, value=True)
|
||||||
|
|
||||||
# call `ctx.cancel()` explicitly
|
# call `ctx.cancel()` explicitly
|
||||||
if use_ctx_cancel_method == 'pre_stream':
|
if use_ctx_cancel_method:
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
# NOTE: means the local side `ctx._scope` will
|
# NOTE: means the local side `ctx._scope` will
|
||||||
# have been cancelled by an ctxc ack and thus
|
# have been cancelled by an ctxc ack and thus
|
||||||
# `._scope.cancelled_caught` should be set.
|
# `._scope.cancelled_caught` should be set.
|
||||||
async with (
|
try:
|
||||||
expect_ctxc(
|
async with ctx.open_stream() as stream:
|
||||||
|
async for msg in stream:
|
||||||
|
pass
|
||||||
|
|
||||||
|
except tractor.ContextCancelled as ctxc:
|
||||||
# XXX: the cause is US since we call
|
# XXX: the cause is US since we call
|
||||||
# `Context.cancel()` just above!
|
# `Context.cancel()` just above!
|
||||||
yay=True,
|
|
||||||
|
|
||||||
# XXX: must be propagated to __aexit__
|
|
||||||
# and should be silently absorbed there
|
|
||||||
# since we called `.cancel()` just above ;)
|
|
||||||
reraise=True,
|
|
||||||
) as maybe_ctxc,
|
|
||||||
):
|
|
||||||
async with ctx.open_stream() as stream:
|
|
||||||
|
|
||||||
if rent_wait_for_msg:
|
|
||||||
async for msg in stream:
|
|
||||||
print(f'PARENT rx: {msg!r}\n')
|
|
||||||
break
|
|
||||||
|
|
||||||
if use_ctx_cancel_method == 'post_stream_open':
|
|
||||||
await ctx.cancel()
|
|
||||||
|
|
||||||
if use_ctx_cancel_method == 'post_stream_close':
|
|
||||||
await ctx.cancel()
|
|
||||||
|
|
||||||
ctxc: tractor.ContextCancelled = maybe_ctxc.value
|
|
||||||
assert (
|
assert (
|
||||||
ctxc.canceller
|
ctxc.canceller
|
||||||
==
|
==
|
||||||
|
@ -700,6 +620,14 @@ async def test_parent_exits_ctx_after_child_enters_stream(
|
||||||
root.uid
|
root.uid
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# XXX: must be propagated to __aexit__
|
||||||
|
# and should be silently absorbed there
|
||||||
|
# since we called `.cancel()` just above ;)
|
||||||
|
raise
|
||||||
|
|
||||||
|
else:
|
||||||
|
assert 0, "Should have context cancelled?"
|
||||||
|
|
||||||
# channel should still be up
|
# channel should still be up
|
||||||
assert portal.channel.connected()
|
assert portal.channel.connected()
|
||||||
|
|
||||||
|
@ -709,20 +637,13 @@ async def test_parent_exits_ctx_after_child_enters_stream(
|
||||||
value=False,
|
value=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX CHILD-BLOCKS case, we SHOULD NOT exit from the
|
|
||||||
# `.open_context()` before the child has returned,
|
|
||||||
# errored or been cancelled!
|
|
||||||
else:
|
else:
|
||||||
try:
|
try:
|
||||||
with trio.fail_after(
|
with trio.fail_after(0.2):
|
||||||
0.5 # if not debug_mode else 999
|
await ctx.result()
|
||||||
):
|
|
||||||
res = await ctx.wait_for_result()
|
|
||||||
assert res is not tractor._context.Unresolved
|
|
||||||
assert 0, "Callee should have blocked!?"
|
assert 0, "Callee should have blocked!?"
|
||||||
except trio.TooSlowError:
|
except trio.TooSlowError:
|
||||||
# NO-OP -> since already triggered by
|
# NO-OP -> since already called above
|
||||||
# `trio.fail_after()` above!
|
|
||||||
await ctx.cancel()
|
await ctx.cancel()
|
||||||
|
|
||||||
# NOTE: local scope should have absorbed the cancellation since
|
# NOTE: local scope should have absorbed the cancellation since
|
||||||
|
@ -762,7 +683,7 @@ async def test_parent_exits_ctx_after_child_enters_stream(
|
||||||
|
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_multitask_parent_cancels_from_nonroot_task(
|
async def test_multitask_caller_cancels_from_nonroot_task(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
@ -814,6 +735,7 @@ async def test_multitask_parent_cancels_from_nonroot_task(
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def cancel_self(
|
async def cancel_self(
|
||||||
|
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -853,7 +775,7 @@ async def cancel_self(
|
||||||
|
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_child_cancels_before_started(
|
async def test_callee_cancels_before_started(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
|
@ -904,7 +826,8 @@ async def never_open_stream(
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def keep_sending_from_child(
|
async def keep_sending_from_callee(
|
||||||
|
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
msg_buffer_size: int|None = None,
|
msg_buffer_size: int|None = None,
|
||||||
|
|
||||||
|
@ -927,7 +850,7 @@ async def keep_sending_from_child(
|
||||||
'overrun_by',
|
'overrun_by',
|
||||||
[
|
[
|
||||||
('caller', 1, never_open_stream),
|
('caller', 1, never_open_stream),
|
||||||
('callee', 0, keep_sending_from_child),
|
('callee', 0, keep_sending_from_callee),
|
||||||
],
|
],
|
||||||
ids=[
|
ids=[
|
||||||
('caller_1buf_never_open_stream'),
|
('caller_1buf_never_open_stream'),
|
||||||
|
@ -1008,6 +931,7 @@ def test_one_end_stream_not_opened(
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def echo_back_sequence(
|
async def echo_back_sequence(
|
||||||
|
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
seq: list[int],
|
seq: list[int],
|
||||||
wait_for_cancel: bool,
|
wait_for_cancel: bool,
|
||||||
|
|
|
@ -26,9 +26,6 @@ import os
|
||||||
import pathlib
|
import pathlib
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.devx._debug import (
|
|
||||||
BoxedMaybeException,
|
|
||||||
)
|
|
||||||
from .pytest import (
|
from .pytest import (
|
||||||
tractor_test as tractor_test
|
tractor_test as tractor_test
|
||||||
)
|
)
|
||||||
|
@ -101,13 +98,12 @@ async def expect_ctxc(
|
||||||
'''
|
'''
|
||||||
if yay:
|
if yay:
|
||||||
try:
|
try:
|
||||||
yield (maybe_exc := BoxedMaybeException())
|
yield
|
||||||
raise RuntimeError('Never raised ctxc?')
|
raise RuntimeError('Never raised ctxc?')
|
||||||
except tractor.ContextCancelled as ctxc:
|
except tractor.ContextCancelled:
|
||||||
maybe_exc.value = ctxc
|
|
||||||
if reraise:
|
if reraise:
|
||||||
raise
|
raise
|
||||||
else:
|
else:
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
yield (maybe_exc := BoxedMaybeException())
|
yield
|
||||||
|
|
|
@ -186,16 +186,10 @@ class PldRx(Struct):
|
||||||
msg: MsgType = (
|
msg: MsgType = (
|
||||||
ipc_msg
|
ipc_msg
|
||||||
or
|
or
|
||||||
|
|
||||||
# sync-rx msg from underlying IPC feeder (mem-)chan
|
# sync-rx msg from underlying IPC feeder (mem-)chan
|
||||||
ipc._rx_chan.receive_nowait()
|
ipc._rx_chan.receive_nowait()
|
||||||
)
|
)
|
||||||
if (
|
|
||||||
type(msg) is Return
|
|
||||||
):
|
|
||||||
log.info(
|
|
||||||
f'Rxed final result msg\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
return self.decode_pld(
|
return self.decode_pld(
|
||||||
msg,
|
msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
|
@ -225,13 +219,6 @@ class PldRx(Struct):
|
||||||
# async-rx msg from underlying IPC feeder (mem-)chan
|
# async-rx msg from underlying IPC feeder (mem-)chan
|
||||||
await ipc._rx_chan.receive()
|
await ipc._rx_chan.receive()
|
||||||
)
|
)
|
||||||
if (
|
|
||||||
type(msg) is Return
|
|
||||||
):
|
|
||||||
log.info(
|
|
||||||
f'Rxed final result msg\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
return self.decode_pld(
|
return self.decode_pld(
|
||||||
msg=msg,
|
msg=msg,
|
||||||
ipc=ipc,
|
ipc=ipc,
|
||||||
|
@ -420,6 +407,8 @@ class PldRx(Struct):
|
||||||
__tracebackhide__: bool = False
|
__tracebackhide__: bool = False
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
dec_msg = decode_pld
|
||||||
|
|
||||||
async def recv_msg_w_pld(
|
async def recv_msg_w_pld(
|
||||||
self,
|
self,
|
||||||
ipc: Context|MsgStream,
|
ipc: Context|MsgStream,
|
||||||
|
@ -433,19 +422,12 @@ class PldRx(Struct):
|
||||||
|
|
||||||
) -> tuple[MsgType, PayloadT]:
|
) -> tuple[MsgType, PayloadT]:
|
||||||
'''
|
'''
|
||||||
Retrieve the next avail IPC msg, decode it's payload, and
|
Retrieve the next avail IPC msg, decode it's payload, and return
|
||||||
return the pair of refs.
|
the pair of refs.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
msg: MsgType = await ipc._rx_chan.receive()
|
msg: MsgType = await ipc._rx_chan.receive()
|
||||||
if (
|
|
||||||
type(msg) is Return
|
|
||||||
):
|
|
||||||
log.info(
|
|
||||||
f'Rxed final result msg\n'
|
|
||||||
f'{msg}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
if passthrough_non_pld_msgs:
|
if passthrough_non_pld_msgs:
|
||||||
match msg:
|
match msg:
|
||||||
|
@ -462,10 +444,6 @@ class PldRx(Struct):
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
)
|
)
|
||||||
# log.runtime(
|
|
||||||
# f'Delivering payload msg\n'
|
|
||||||
# f'{msg}\n'
|
|
||||||
# )
|
|
||||||
return msg, pld
|
return msg, pld
|
||||||
|
|
||||||
|
|
||||||
|
@ -560,8 +538,8 @@ async def maybe_limit_plds(
|
||||||
async def drain_to_final_msg(
|
async def drain_to_final_msg(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
|
||||||
msg_limit: int = 6,
|
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
msg_limit: int = 6,
|
||||||
|
|
||||||
) -> tuple[
|
) -> tuple[
|
||||||
Return|None,
|
Return|None,
|
||||||
|
@ -590,8 +568,8 @@ async def drain_to_final_msg(
|
||||||
even after ctx closure and the `.open_context()` block exit.
|
even after ctx closure and the `.open_context()` block exit.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
__tracebackhide__: bool = hide_tb
|
||||||
raise_overrun: bool = not ctx._allow_overruns
|
raise_overrun: bool = not ctx._allow_overruns
|
||||||
parent_never_opened_stream: bool = ctx._stream is None
|
|
||||||
|
|
||||||
# wait for a final context result by collecting (but
|
# wait for a final context result by collecting (but
|
||||||
# basically ignoring) any bi-dir-stream msgs still in transit
|
# basically ignoring) any bi-dir-stream msgs still in transit
|
||||||
|
@ -600,8 +578,7 @@ async def drain_to_final_msg(
|
||||||
result_msg: Return|Error|None = None
|
result_msg: Return|Error|None = None
|
||||||
while not (
|
while not (
|
||||||
ctx.maybe_error
|
ctx.maybe_error
|
||||||
and
|
and not ctx._final_result_is_set()
|
||||||
not ctx._final_result_is_set()
|
|
||||||
):
|
):
|
||||||
try:
|
try:
|
||||||
# receive all msgs, scanning for either a final result
|
# receive all msgs, scanning for either a final result
|
||||||
|
@ -654,11 +631,6 @@ async def drain_to_final_msg(
|
||||||
)
|
)
|
||||||
__tracebackhide__: bool = False
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
else:
|
|
||||||
log.cancel(
|
|
||||||
f'IPC ctx cancelled externally during result drain ?\n'
|
|
||||||
f'{ctx}'
|
|
||||||
)
|
|
||||||
# CASE 2: mask the local cancelled-error(s)
|
# CASE 2: mask the local cancelled-error(s)
|
||||||
# only when we are sure the remote error is
|
# only when we are sure the remote error is
|
||||||
# the source cause of this local task's
|
# the source cause of this local task's
|
||||||
|
@ -690,24 +662,17 @@ async def drain_to_final_msg(
|
||||||
case Yield():
|
case Yield():
|
||||||
pre_result_drained.append(msg)
|
pre_result_drained.append(msg)
|
||||||
if (
|
if (
|
||||||
not parent_never_opened_stream
|
|
||||||
and (
|
|
||||||
(ctx._stream.closed
|
(ctx._stream.closed
|
||||||
and
|
and (reason := 'stream was already closed')
|
||||||
(reason := 'stream was already closed')
|
)
|
||||||
) or
|
or (ctx.cancel_acked
|
||||||
(ctx.cancel_acked
|
and (reason := 'ctx cancelled other side')
|
||||||
and
|
|
||||||
(reason := 'ctx cancelled other side')
|
|
||||||
)
|
)
|
||||||
or (ctx._cancel_called
|
or (ctx._cancel_called
|
||||||
and
|
and (reason := 'ctx called `.cancel()`')
|
||||||
(reason := 'ctx called `.cancel()`')
|
|
||||||
)
|
)
|
||||||
or (len(pre_result_drained) > msg_limit
|
or (len(pre_result_drained) > msg_limit
|
||||||
and
|
and (reason := f'"yield" limit={msg_limit}')
|
||||||
(reason := f'"yield" limit={msg_limit}')
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
|
@ -725,7 +690,7 @@ async def drain_to_final_msg(
|
||||||
# drain up to the `msg_limit` hoping to get
|
# drain up to the `msg_limit` hoping to get
|
||||||
# a final result or error/ctxc.
|
# a final result or error/ctxc.
|
||||||
else:
|
else:
|
||||||
report: str = (
|
log.warning(
|
||||||
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
'Ignoring "yield" msg during `ctx.result()` drain..\n'
|
||||||
f'<= {ctx.chan.uid}\n'
|
f'<= {ctx.chan.uid}\n'
|
||||||
f' |_{ctx._nsf}()\n\n'
|
f' |_{ctx._nsf}()\n\n'
|
||||||
|
@ -734,14 +699,6 @@ async def drain_to_final_msg(
|
||||||
|
|
||||||
f'{pretty_struct.pformat(msg)}\n'
|
f'{pretty_struct.pformat(msg)}\n'
|
||||||
)
|
)
|
||||||
if parent_never_opened_stream:
|
|
||||||
report = (
|
|
||||||
f'IPC ctx never opened stream on {ctx.side!r}-side!\n'
|
|
||||||
f'\n'
|
|
||||||
# f'{ctx}\n'
|
|
||||||
) + report
|
|
||||||
|
|
||||||
log.warning(report)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# stream terminated, but no result yet..
|
# stream terminated, but no result yet..
|
||||||
|
@ -833,7 +790,6 @@ async def drain_to_final_msg(
|
||||||
f'{ctx.outcome}\n'
|
f'{ctx.outcome}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
__tracebackhide__: bool = hide_tb
|
|
||||||
return (
|
return (
|
||||||
result_msg,
|
result_msg,
|
||||||
pre_result_drained,
|
pre_result_drained,
|
||||||
|
|
Loading…
Reference in New Issue