Add parent vs. child cancels first cases

ipc_failure_while_streaming
Tyler Goodlet 2023-01-28 16:44:35 -05:00
parent 6c35ba2cb6
commit e34823aab4
2 changed files with 89 additions and 27 deletions

View File

@ -46,8 +46,7 @@ async def close_stream_and_error(
async def recv_and_spawn_net_killers( async def recv_and_spawn_net_killers(
ctx: Context, ctx: Context,
break_ipc: bool = False, break_ipc_after: bool | int = False,
**kwargs,
) -> None: ) -> None:
''' '''
@ -63,8 +62,8 @@ async def recv_and_spawn_net_killers(
print(f'child echoing {i}') print(f'child echoing {i}')
await stream.send(i) await stream.send(i)
if ( if (
break_ipc break_ipc_after
and i > 500 and i > break_ipc_after
): ):
'#################################\n' '#################################\n'
'Simulating child-side IPC BREAK!\n' 'Simulating child-side IPC BREAK!\n'
@ -76,8 +75,12 @@ async def recv_and_spawn_net_killers(
async def main( async def main(
debug_mode: bool = False, debug_mode: bool = False,
start_method: str = 'trio', start_method: str = 'trio',
break_parent_ipc: bool = False,
break_child_ipc: bool = False, # by default we break the parent IPC first (if configured to break
# at all), but this can be changed so the child does first (even if
# both are set to break).
break_parent_ipc_after: int | bool = False,
break_child_ipc_after: int | bool = False,
) -> None: ) -> None:
@ -99,15 +102,15 @@ async def main(
async with portal.open_context( async with portal.open_context(
recv_and_spawn_net_killers, recv_and_spawn_net_killers,
break_ipc=break_child_ipc, break_ipc_after=break_child_ipc_after,
) as (ctx, sent): ) as (ctx, sent):
async with ctx.open_stream() as stream: async with ctx.open_stream() as stream:
for i in range(1000): for i in range(1000):
if ( if (
break_parent_ipc break_parent_ipc_after
and i > 100 and i > break_parent_ipc_after
): ):
print( print(
'#################################\n' '#################################\n'

View File

@ -22,30 +22,56 @@ from conftest import (
@pytest.mark.parametrize( @pytest.mark.parametrize(
'ipc_break', 'ipc_break',
[ [
{}, # no breaks
{'break_parent_ipc': True},
{'break_child_ipc': True},
{ {
'break_child_ipc': True, 'break_parent_ipc_after': False,
'break_parent_ipc': True, 'break_child_ipc_after': False,
}, },
# only parent breaks
{
'break_parent_ipc_after': 500,
'break_child_ipc_after': False,
},
# only child breaks
{
'break_parent_ipc_after': False,
'break_child_ipc_after': 500,
},
# both: break parent first
{
'break_parent_ipc_after': 500,
'break_child_ipc_after': 800,
},
# both: break child first
{
'break_parent_ipc_after': 800,
'break_child_ipc_after': 500,
},
], ],
ids=[ ids=[
'no_break', 'no_break',
'break_parent', 'break_parent',
'break_child', 'break_child',
'break_both', 'break_both_parent_first',
'break_both_child_first',
], ],
) )
def test_child_breaks_ipc_channel_during_stream( def test_ipc_channel_break_during_stream(
debug_mode: bool, debug_mode: bool,
spawn_backend: str, spawn_backend: str,
ipc_break: dict | None, ipc_break: dict | None,
): ):
''' '''
Ensure we can (purposely) break IPC during streaming and it's still Ensure we can have an IPC channel break its connection during
possible for the (simulated) user to kill the actor tree using streaming and it's still possible for the (simulated) user to kill
SIGINT. the actor tree using SIGINT.
We also verify the type of connection error expected in the parent
depending on which side if the IPC breaks first.
''' '''
if spawn_backend != 'trio': if spawn_backend != 'trio':
@ -66,16 +92,49 @@ def test_child_breaks_ipc_channel_during_stream(
# when ONLY the child breaks we expect the parent to get a closed # when ONLY the child breaks we expect the parent to get a closed
# resource error on the next `MsgStream.receive()` and then fail out # resource error on the next `MsgStream.receive()` and then fail out
# and cancel the child from there. # and cancel the child from there.
if 'break_child_ipc' in ipc_break: if (
# only child breaks
(
ipc_break['break_child_ipc_after']
and ipc_break['break_parent_ipc_after'] is False
)
# both break but, parent breaks first
or (
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after']
)
)
):
expect_final_exc = trio.ClosedResourceError expect_final_exc = trio.ClosedResourceError
# when the parent IPC side dies (even if the child's does as well) # when the parent IPC side dies (even if the child's does as well
# we expect the channel to be sent a stop msg from the child at some # but the child fails BEFORE the parent) we expect the channel to be
# point which will signal the parent that the stream has been # sent a stop msg from the child at some point which will signal the
# terminated. # parent that the stream has been terminated.
# NOTE: when the parent breaks "after" the child you get the above # NOTE: when the parent breaks "after" the child you get this same
# case as well, but it's not worth testing right? # case as well, the child breaks the IPC channel with a stop msg
if 'break_parent_ipc' in ipc_break: # before any closure takes place.
elif (
# only parent breaks
(
ipc_break['break_parent_ipc_after']
and ipc_break['break_child_ipc_after'] is False
)
# both break but, child breaks first
or (
ipc_break['break_parent_ipc_after'] is not False
and (
ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after']
)
)
):
expect_final_exc = trio.EndOfChannel expect_final_exc = trio.EndOfChannel
with pytest.raises(expect_final_exc): with pytest.raises(expect_final_exc):