Adjust advanced faults test(s) for absorbed EoCs

More or less just simplifies to not seeing the stream closure errors and
instead expecting KBIs from the simulated user who 'ctl-cs after hang'.

Toss in a little `stuff_hangin_ctlc()` to the script to wrap all that
and always check stream closure before sending the final KBI.
mv_to_new_trio_py3.11
Tyler Goodlet 2024-03-19 19:33:06 -04:00
parent 668016d37b
commit 8ab5e08830
2 changed files with 116 additions and 78 deletions

View File

@ -6,6 +6,7 @@ been an outage) and we want to ensure that despite being in debug mode
actor tree will eventually be cancelled without leaving any zombies. actor tree will eventually be cancelled without leaving any zombies.
''' '''
from contextlib import asynccontextmanager as acm
from functools import partial from functools import partial
from tractor import ( from tractor import (
@ -17,6 +18,7 @@ from tractor import (
_testing, _testing,
) )
import trio import trio
import pytest
async def break_ipc( async def break_ipc(
@ -41,6 +43,13 @@ async def break_ipc(
await stream.aclose() await stream.aclose()
method: str = method or def_method method: str = method or def_method
print(
'#################################\n'
'Simulating CHILD-side IPC BREAK!\n'
f'method: {method}\n'
f'pre `.aclose()`: {pre_close}\n'
'#################################\n'
)
match method: match method:
case 'trans_aclose': case 'trans_aclose':
@ -80,17 +89,17 @@ async def break_ipc_then_error(
break_ipc_with: str|None = None, break_ipc_with: str|None = None,
pre_close: bool = False, pre_close: bool = False,
): ):
async for msg in stream:
await stream.send(msg)
await break_ipc( await break_ipc(
stream=stream, stream=stream,
method=break_ipc_with, method=break_ipc_with,
pre_close=pre_close, pre_close=pre_close,
) )
async for msg in stream:
await stream.send(msg)
assert 0 assert 0
# async def close_stream_and_error(
async def iter_ipc_stream( async def iter_ipc_stream(
stream: MsgStream, stream: MsgStream,
break_ipc_with: str|None = None, break_ipc_with: str|None = None,
@ -99,20 +108,6 @@ async def iter_ipc_stream(
async for msg in stream: async for msg in stream:
await stream.send(msg) await stream.send(msg)
# wipe out channel right before raising
# await break_ipc(
# stream=stream,
# method=break_ipc_with,
# pre_close=pre_close,
# )
# send channel close msg at SC-prot level
#
# TODO: what should get raised here if anything?
# await stream.aclose()
# assert 0
@context @context
async def recv_and_spawn_net_killers( async def recv_and_spawn_net_killers(
@ -134,14 +129,16 @@ async def recv_and_spawn_net_killers(
async for i in stream: async for i in stream:
print(f'child echoing {i}') print(f'child echoing {i}')
await stream.send(i) await stream.send(i)
if ( if (
break_ipc_after break_ipc_after
and and
i > break_ipc_after i >= break_ipc_after
): ):
'#################################\n' n.start_soon(
'Simulating CHILD-side IPC BREAK!\n' iter_ipc_stream,
'#################################\n' stream,
)
n.start_soon( n.start_soon(
partial( partial(
break_ipc_then_error, break_ipc_then_error,
@ -149,10 +146,23 @@ async def recv_and_spawn_net_killers(
pre_close=pre_close, pre_close=pre_close,
) )
) )
n.start_soon(
iter_ipc_stream,
stream, @acm
async def stuff_hangin_ctlc(timeout: float = 1) -> None:
with trio.move_on_after(timeout) as cs:
yield timeout
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print(
f"i'm a user on the PARENT side and thingz hangin "
f'after timeout={timeout} ???\n\n'
'MASHING CTlR-C..!?\n'
) )
raise KeyboardInterrupt
async def main( async def main(
@ -169,9 +179,6 @@ async def main(
) -> None: ) -> None:
# from tractor._state import _runtime_vars as rtv
# rtv['_debug_mode'] = debug_mode
async with ( async with (
open_nursery( open_nursery(
start_method=start_method, start_method=start_method,
@ -190,10 +197,11 @@ async def main(
) )
async with ( async with (
stuff_hangin_ctlc(timeout=2) as timeout,
_testing.expect_ctxc( _testing.expect_ctxc(
yay=( yay=(
break_parent_ipc_after break_parent_ipc_after
or break_child_ipc_after, or break_child_ipc_after
), ),
# TODO: we CAN'T remove this right? # TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either # since we need the ctxc to bubble up from either
@ -205,12 +213,14 @@ async def main(
# and KBI in an eg? # and KBI in an eg?
reraise=True, reraise=True,
), ),
portal.open_context( portal.open_context(
recv_and_spawn_net_killers, recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after, break_ipc_after=break_child_ipc_after,
pre_close=pre_close, pre_close=pre_close,
) as (ctx, sent), ) as (ctx, sent),
): ):
rx_eoc: bool = False
ipc_break_sent: bool = False ipc_break_sent: bool = False
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
for i in range(1000): for i in range(1000):
@ -228,6 +238,7 @@ async def main(
'#################################\n' '#################################\n'
) )
# TODO: other methods? see break func above.
# await stream._ctx.chan.send(None) # await stream._ctx.chan.send(None)
# await stream._ctx.chan.transport.stream.send_eof() # await stream._ctx.chan.transport.stream.send_eof()
await stream._ctx.chan.transport.stream.aclose() await stream._ctx.chan.transport.stream.aclose()
@ -251,10 +262,12 @@ async def main(
# TODO: is this needed or no? # TODO: is this needed or no?
raise raise
timeout: int = 1 # timeout: int = 1
print(f'Entering `stream.receive()` with timeout={timeout}\n') # with trio.move_on_after(timeout) as cs:
with trio.move_on_after(timeout) as cs: async with stuff_hangin_ctlc() as timeout:
print(
f'PARENT `stream.receive()` with timeout={timeout}\n'
)
# NOTE: in the parent side IPC failure case this # NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child # will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's # is killed and sends a stop msg back to it's
@ -266,22 +279,29 @@ async def main(
f'{rx}\n' f'{rx}\n'
) )
except trio.EndOfChannel: except trio.EndOfChannel:
rx_eoc: bool = True
print('MsgStream got EoC for PARENT') print('MsgStream got EoC for PARENT')
raise raise
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print( print(
f"YOO i'm a PARENT user anddd thingz hangin..\n" 'Streaming finished and we got Eoc.\n'
f'after timeout={timeout}\n' 'Canceling `.open_context()` in root with\n'
'CTlR-C..'
) )
if rx_eoc:
assert stream.closed
try:
await stream.send(i)
pytest.fail('stream not closed?')
except (
trio.ClosedResourceError,
trio.EndOfChannel,
) as send_err:
if rx_eoc:
assert send_err is stream._eoc
else:
assert send_err is stream._closed
print(
"YOO i'm mad!\n"
'The send side is dun but thingz hangin..\n'
'MASHING CTlR-C Ctl-c..'
)
raise KeyboardInterrupt raise KeyboardInterrupt

View File

@ -85,8 +85,8 @@ def test_ipc_channel_break_during_stream(
''' '''
if spawn_backend != 'trio': if spawn_backend != 'trio':
# if debug_mode: if debug_mode:
# pytest.skip('`debug_mode` only supported on `trio` spawner') pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that # non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree. # requires the user to do ctl-c to cancel the actor tree.
@ -107,7 +107,10 @@ def test_ipc_channel_break_during_stream(
# AND we tell the child to call `MsgStream.aclose()`. # AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream and pre_aclose_msgstream
): ):
expect_final_exc = trio.EndOfChannel # expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
# gracefully!
expect_final_exc = KeyboardInterrupt
# NOTE when ONLY the child breaks or it breaks BEFORE the # NOTE when ONLY the child breaks or it breaks BEFORE the
# parent we expect the parent to get a closed resource error # parent we expect the parent to get a closed resource error
@ -120,11 +123,25 @@ def test_ipc_channel_break_during_stream(
and and
ipc_break['break_parent_ipc_after'] is False ipc_break['break_parent_ipc_after'] is False
): ):
expect_final_exc = trio.ClosedResourceError # NOTE: we DO NOT expect this any more since
# the child side's channel will be broken silently
# and nothing on the parent side will indicate this!
# expect_final_exc = trio.ClosedResourceError
# if child calls `MsgStream.aclose()` then expect EoC. # NOTE: child will send a 'stop' msg before it breaks
# the transport channel BUT, that will be absorbed by the
# `ctx.open_stream()` block and thus the `.open_context()`
# should hang, after which the test script simulates
# a user sending ctl-c by raising a KBI.
if pre_aclose_msgstream: if pre_aclose_msgstream:
expect_final_exc = trio.EndOfChannel expect_final_exc = KeyboardInterrupt
# XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC.
# ^ XXX not any more ^ since eoc is always absorbed
# gracefully and NOT bubbled to the `.open_context()`
# block!
# expect_final_exc = trio.EndOfChannel
# BOTH but, CHILD breaks FIRST # BOTH but, CHILD breaks FIRST
elif ( elif (
@ -134,12 +151,8 @@ def test_ipc_channel_break_during_stream(
> ipc_break['break_child_ipc_after'] > ipc_break['break_child_ipc_after']
) )
): ):
expect_final_exc = trio.ClosedResourceError
# child will send a 'stop' msg before it breaks
# the transport channel.
if pre_aclose_msgstream: if pre_aclose_msgstream:
expect_final_exc = trio.EndOfChannel expect_final_exc = KeyboardInterrupt
# NOTE when the parent IPC side dies (even if the child's does as well # NOTE when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we always expect the # but the child fails BEFORE the parent) we always expect the
@ -160,7 +173,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after'] is not False ipc_break['break_parent_ipc_after'] is not False
and ( and (
ipc_break['break_child_ipc_after'] ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after'] >
ipc_break['break_parent_ipc_after']
) )
): ):
expect_final_exc = trio.ClosedResourceError expect_final_exc = trio.ClosedResourceError
@ -224,6 +238,7 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
''' '''
async def main(): async def main():
with trio.fail_after(3):
async with tractor.open_nursery() as n: async with tractor.open_nursery() as n:
portal = await n.start_actor( portal = await n.start_actor(
'ipc_breaker', 'ipc_breaker',
@ -241,7 +256,10 @@ def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
print('parent waiting on context') print('parent waiting on context')
print('parent exited context') print(
'parent exited context\n'
'parent raising KBI..\n'
)
raise KeyboardInterrupt raise KeyboardInterrupt
with pytest.raises(KeyboardInterrupt): with pytest.raises(KeyboardInterrupt):