diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index 809af50..a5d3738 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -46,6 +46,7 @@ async def close_stream_and_error( async def recv_and_spawn_net_killers( ctx: Context, + break_ipc: bool = False, **kwargs, ) -> None: @@ -58,9 +59,16 @@ async def recv_and_spawn_net_killers( ctx.open_stream() as stream, trio.open_nursery() as n, ): - for i in range(100): + async for i in stream: + print(f'child echoing {i}') await stream.send(i) - if i > 80: + if ( + break_ipc + and i > 500 + ): + '#################################\n' + 'Simulating child-side IPC BREAK!\n' + '#################################' n.start_soon(break_channel_silently_then_error, stream) n.start_soon(close_stream_and_error, stream) @@ -68,42 +76,72 @@ async def recv_and_spawn_net_killers( async def main( debug_mode: bool = False, start_method: str = 'trio', + break_parent_ipc: bool = False, + break_child_ipc: bool = False, ) -> None: - async with open_nursery( - start_method=start_method, + async with ( + open_nursery( + start_method=start_method, - # NOTE: even debugger is used we shouldn't get - # a hang since it never engages due to broken IPC - debug_mode=debug_mode, + # NOTE: even debugger is used we shouldn't get + # a hang since it never engages due to broken IPC + debug_mode=debug_mode, + loglevel='warning', - ) as n: - portal = await n.start_actor( + ) as an, + ): + portal = await an.start_actor( 'chitty_hijo', enable_modules=[__name__], ) async with portal.open_context( recv_and_spawn_net_killers, + break_ipc=break_child_ipc, + ) as (ctx, sent): async with ctx.open_stream() as stream: - for i in range(100): + for i in range(1000): + + if ( + break_parent_ipc + and i > 100 + ): + print( + '#################################\n' + 'Simulating parent-side IPC BREAK!\n' + '#################################' + ) + await stream._ctx.chan.send(None) # it actually breaks right here in the # mp_spawn/forkserver backends and thus the zombie # reaper never even kicks in? + print(f'parent sending {i}') await stream.send(i) with trio.move_on_after(2) as cs: + + # NOTE: in the parent side IPC failure case this + # will raise an ``EndOfChannel`` after the child + # is killed and sends a stop msg back to it's + # caller/this-parent. rx = await stream.receive() - print(f'I a mad user and here is what i got {rx}') + + print(f"I'm a happy user and echoed to me is {rx}") if cs.cancelled_caught: # pretend to be a user seeing no streaming action # thinking it's a hang, and then hitting ctl-c.. - print("YOO i'm a user anddd thingz hangin.. CTRL-C..") - raise KeyboardInterrupt + print("YOO i'm a user anddd thingz hangin..") + + print( + "YOO i'm mad send side dun but thingz hangin..\n" + 'MASHING CTlR-C Ctl-c..' + ) + raise KeyboardInterrupt if __name__ == '__main__': diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index 8294f2a..a5e4aa9 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -3,6 +3,8 @@ Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la cancelacion?.. ''' +from functools import partial + import pytest from _pytest.pathlib import import_path import trio @@ -15,11 +17,30 @@ from conftest import ( @pytest.mark.parametrize( 'debug_mode', [False, True], - ids=['debug_mode', 'no_debug_mode'], + ids=['no_debug_mode', 'debug_mode'], +) +@pytest.mark.parametrize( + 'ipc_break', + [ + {}, + {'break_parent_ipc': True}, + {'break_child_ipc': True}, + { + 'break_child_ipc': True, + 'break_parent_ipc': True, + }, + ], + ids=[ + 'no_break', + 'break_parent', + 'break_child', + 'break_both', + ], ) def test_child_breaks_ipc_channel_during_stream( debug_mode: bool, spawn_backend: str, + ipc_break: dict | None, ): ''' Ensure we can (purposely) break IPC during streaming and it's still @@ -27,12 +48,12 @@ def test_child_breaks_ipc_channel_during_stream( SIGINT. ''' - expect_final_exc = KeyboardInterrupt - if spawn_backend != 'trio': if debug_mode: pytest.skip('`debug_mode` only supported on `trio` spawner') + # non-`trio` spawners should never hit the hang condition that + # requires the user to do ctl-c to cancel the actor tree. expect_final_exc = trio.ClosedResourceError mod = import_path( @@ -40,9 +61,29 @@ def test_child_breaks_ipc_channel_during_stream( root=examples_dir(), ) + expect_final_exc = KeyboardInterrupt + + # when ONLY the child breaks we expect the parent to get a closed + # resource error on the next `MsgStream.receive()` and then fail out + # and cancel the child from there. + if 'break_child_ipc' in ipc_break: + expect_final_exc = trio.ClosedResourceError + + # when the parent IPC side dies (even if the child's does as well) + # we expect the channel to be sent a stop msg from the child at some + # point which will signal the parent that the stream has been + # terminated. + # NOTE: when the parent breaks "after" the child you get the above + # case as well, but it's not worth testing right? + if 'break_parent_ipc' in ipc_break: + expect_final_exc = trio.EndOfChannel + with pytest.raises(expect_final_exc): trio.run( - mod.main, - debug_mode, - spawn_backend, + partial( + mod.main, + debug_mode=debug_mode, + start_method=spawn_backend, + **ipc_break, + ) )