diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index a5d3738..6728b8d 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -46,8 +46,7 @@ async def close_stream_and_error( async def recv_and_spawn_net_killers( ctx: Context, - break_ipc: bool = False, - **kwargs, + break_ipc_after: bool | int = False, ) -> None: ''' @@ -63,8 +62,8 @@ async def recv_and_spawn_net_killers( print(f'child echoing {i}') await stream.send(i) if ( - break_ipc - and i > 500 + break_ipc_after + and i > break_ipc_after ): '#################################\n' 'Simulating child-side IPC BREAK!\n' @@ -76,8 +75,12 @@ async def recv_and_spawn_net_killers( async def main( debug_mode: bool = False, start_method: str = 'trio', - break_parent_ipc: bool = False, - break_child_ipc: bool = False, + + # by default we break the parent IPC first (if configured to break + # at all), but this can be changed so the child does first (even if + # both are set to break). + break_parent_ipc_after: int | bool = False, + break_child_ipc_after: int | bool = False, ) -> None: @@ -99,15 +102,15 @@ async def main( async with portal.open_context( recv_and_spawn_net_killers, - break_ipc=break_child_ipc, + break_ipc_after=break_child_ipc_after, ) as (ctx, sent): async with ctx.open_stream() as stream: for i in range(1000): if ( - break_parent_ipc - and i > 100 + break_parent_ipc_after + and i > break_parent_ipc_after ): print( '#################################\n' diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index a5e4aa9..081bd0e 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -22,30 +22,56 @@ from conftest import ( @pytest.mark.parametrize( 'ipc_break', [ - {}, - {'break_parent_ipc': True}, - {'break_child_ipc': True}, + # no breaks { - 'break_child_ipc': True, - 'break_parent_ipc': True, + 'break_parent_ipc_after': False, + 'break_child_ipc_after': False, }, + + # only parent breaks + { + 'break_parent_ipc_after': 500, + 'break_child_ipc_after': False, + }, + + # only child breaks + { + 'break_parent_ipc_after': False, + 'break_child_ipc_after': 500, + }, + + # both: break parent first + { + 'break_parent_ipc_after': 500, + 'break_child_ipc_after': 800, + }, + # both: break child first + { + 'break_parent_ipc_after': 800, + 'break_child_ipc_after': 500, + }, + ], ids=[ 'no_break', 'break_parent', 'break_child', - 'break_both', + 'break_both_parent_first', + 'break_both_child_first', ], ) -def test_child_breaks_ipc_channel_during_stream( +def test_ipc_channel_break_during_stream( debug_mode: bool, spawn_backend: str, ipc_break: dict | None, ): ''' - Ensure we can (purposely) break IPC during streaming and it's still - possible for the (simulated) user to kill the actor tree using - SIGINT. + Ensure we can have an IPC channel break its connection during + streaming and it's still possible for the (simulated) user to kill + the actor tree using SIGINT. + + We also verify the type of connection error expected in the parent + depending on which side if the IPC breaks first. ''' if spawn_backend != 'trio': @@ -66,16 +92,49 @@ def test_child_breaks_ipc_channel_during_stream( # when ONLY the child breaks we expect the parent to get a closed # resource error on the next `MsgStream.receive()` and then fail out # and cancel the child from there. - if 'break_child_ipc' in ipc_break: + if ( + + # only child breaks + ( + ipc_break['break_child_ipc_after'] + and ipc_break['break_parent_ipc_after'] is False + ) + + # both break but, parent breaks first + or ( + ipc_break['break_child_ipc_after'] is not False + and ( + ipc_break['break_parent_ipc_after'] + > ipc_break['break_child_ipc_after'] + ) + ) + + ): expect_final_exc = trio.ClosedResourceError - # when the parent IPC side dies (even if the child's does as well) - # we expect the channel to be sent a stop msg from the child at some - # point which will signal the parent that the stream has been - # terminated. - # NOTE: when the parent breaks "after" the child you get the above - # case as well, but it's not worth testing right? - if 'break_parent_ipc' in ipc_break: + # when the parent IPC side dies (even if the child's does as well + # but the child fails BEFORE the parent) we expect the channel to be + # sent a stop msg from the child at some point which will signal the + # parent that the stream has been terminated. + # NOTE: when the parent breaks "after" the child you get this same + # case as well, the child breaks the IPC channel with a stop msg + # before any closure takes place. + elif ( + # only parent breaks + ( + ipc_break['break_parent_ipc_after'] + and ipc_break['break_child_ipc_after'] is False + ) + + # both break but, child breaks first + or ( + ipc_break['break_parent_ipc_after'] is not False + and ( + ipc_break['break_child_ipc_after'] + > ipc_break['break_parent_ipc_after'] + ) + ) + ): expect_final_exc = trio.EndOfChannel with pytest.raises(expect_final_exc):