forked from goodboy/tractor
Add IPC breakage on both parent and child side
With the new fancy `_pytest.pathlib.import_path()` we can do real parametrization of the example-script-module code and thus configure whether the child, parent, or both silently break the IPC connection. Parametrize the test for all the above mentioned cases as well as the case where the IPC never breaks but we still simulate the user hammering ctl-c / SIGINT to terminate the actor tree. Adjust expected errors based on each case and heavily document each of these.ipc_failure_while_streaming
parent
3a0817ff55
commit
6c35ba2cb6
|
@ -46,6 +46,7 @@ async def close_stream_and_error(
|
||||||
async def recv_and_spawn_net_killers(
|
async def recv_and_spawn_net_killers(
|
||||||
|
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
|
break_ipc: bool = False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -58,9 +59,16 @@ async def recv_and_spawn_net_killers(
|
||||||
ctx.open_stream() as stream,
|
ctx.open_stream() as stream,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
for i in range(100):
|
async for i in stream:
|
||||||
|
print(f'child echoing {i}')
|
||||||
await stream.send(i)
|
await stream.send(i)
|
||||||
if i > 80:
|
if (
|
||||||
|
break_ipc
|
||||||
|
and i > 500
|
||||||
|
):
|
||||||
|
'#################################\n'
|
||||||
|
'Simulating child-side IPC BREAK!\n'
|
||||||
|
'#################################'
|
||||||
n.start_soon(break_channel_silently_then_error, stream)
|
n.start_soon(break_channel_silently_then_error, stream)
|
||||||
n.start_soon(close_stream_and_error, stream)
|
n.start_soon(close_stream_and_error, stream)
|
||||||
|
|
||||||
|
@ -68,41 +76,71 @@ async def recv_and_spawn_net_killers(
|
||||||
async def main(
|
async def main(
|
||||||
debug_mode: bool = False,
|
debug_mode: bool = False,
|
||||||
start_method: str = 'trio',
|
start_method: str = 'trio',
|
||||||
|
break_parent_ipc: bool = False,
|
||||||
|
break_child_ipc: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
async with open_nursery(
|
async with (
|
||||||
|
open_nursery(
|
||||||
start_method=start_method,
|
start_method=start_method,
|
||||||
|
|
||||||
# NOTE: even debugger is used we shouldn't get
|
# NOTE: even debugger is used we shouldn't get
|
||||||
# a hang since it never engages due to broken IPC
|
# a hang since it never engages due to broken IPC
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
|
loglevel='warning',
|
||||||
|
|
||||||
) as n:
|
) as an,
|
||||||
portal = await n.start_actor(
|
):
|
||||||
|
portal = await an.start_actor(
|
||||||
'chitty_hijo',
|
'chitty_hijo',
|
||||||
enable_modules=[__name__],
|
enable_modules=[__name__],
|
||||||
)
|
)
|
||||||
|
|
||||||
async with portal.open_context(
|
async with portal.open_context(
|
||||||
recv_and_spawn_net_killers,
|
recv_and_spawn_net_killers,
|
||||||
|
break_ipc=break_child_ipc,
|
||||||
|
|
||||||
) as (ctx, sent):
|
) as (ctx, sent):
|
||||||
async with ctx.open_stream() as stream:
|
async with ctx.open_stream() as stream:
|
||||||
for i in range(100):
|
for i in range(1000):
|
||||||
|
|
||||||
|
if (
|
||||||
|
break_parent_ipc
|
||||||
|
and i > 100
|
||||||
|
):
|
||||||
|
print(
|
||||||
|
'#################################\n'
|
||||||
|
'Simulating parent-side IPC BREAK!\n'
|
||||||
|
'#################################'
|
||||||
|
)
|
||||||
|
await stream._ctx.chan.send(None)
|
||||||
|
|
||||||
# it actually breaks right here in the
|
# it actually breaks right here in the
|
||||||
# mp_spawn/forkserver backends and thus the zombie
|
# mp_spawn/forkserver backends and thus the zombie
|
||||||
# reaper never even kicks in?
|
# reaper never even kicks in?
|
||||||
|
print(f'parent sending {i}')
|
||||||
await stream.send(i)
|
await stream.send(i)
|
||||||
|
|
||||||
with trio.move_on_after(2) as cs:
|
with trio.move_on_after(2) as cs:
|
||||||
|
|
||||||
|
# NOTE: in the parent side IPC failure case this
|
||||||
|
# will raise an ``EndOfChannel`` after the child
|
||||||
|
# is killed and sends a stop msg back to it's
|
||||||
|
# caller/this-parent.
|
||||||
rx = await stream.receive()
|
rx = await stream.receive()
|
||||||
print(f'I a mad user and here is what i got {rx}')
|
|
||||||
|
print(f"I'm a happy user and echoed to me is {rx}")
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
# pretend to be a user seeing no streaming action
|
# pretend to be a user seeing no streaming action
|
||||||
# thinking it's a hang, and then hitting ctl-c..
|
# thinking it's a hang, and then hitting ctl-c..
|
||||||
print("YOO i'm a user anddd thingz hangin.. CTRL-C..")
|
print("YOO i'm a user anddd thingz hangin..")
|
||||||
|
|
||||||
|
print(
|
||||||
|
"YOO i'm mad send side dun but thingz hangin..\n"
|
||||||
|
'MASHING CTlR-C Ctl-c..'
|
||||||
|
)
|
||||||
raise KeyboardInterrupt
|
raise KeyboardInterrupt
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,8 @@ Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
|
||||||
cancelacion?..
|
cancelacion?..
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from _pytest.pathlib import import_path
|
from _pytest.pathlib import import_path
|
||||||
import trio
|
import trio
|
||||||
|
@ -15,11 +17,30 @@ from conftest import (
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'debug_mode',
|
'debug_mode',
|
||||||
[False, True],
|
[False, True],
|
||||||
ids=['debug_mode', 'no_debug_mode'],
|
ids=['no_debug_mode', 'debug_mode'],
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'ipc_break',
|
||||||
|
[
|
||||||
|
{},
|
||||||
|
{'break_parent_ipc': True},
|
||||||
|
{'break_child_ipc': True},
|
||||||
|
{
|
||||||
|
'break_child_ipc': True,
|
||||||
|
'break_parent_ipc': True,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
ids=[
|
||||||
|
'no_break',
|
||||||
|
'break_parent',
|
||||||
|
'break_child',
|
||||||
|
'break_both',
|
||||||
|
],
|
||||||
)
|
)
|
||||||
def test_child_breaks_ipc_channel_during_stream(
|
def test_child_breaks_ipc_channel_during_stream(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
spawn_backend: str,
|
spawn_backend: str,
|
||||||
|
ipc_break: dict | None,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Ensure we can (purposely) break IPC during streaming and it's still
|
Ensure we can (purposely) break IPC during streaming and it's still
|
||||||
|
@ -27,12 +48,12 @@ def test_child_breaks_ipc_channel_during_stream(
|
||||||
SIGINT.
|
SIGINT.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
expect_final_exc = KeyboardInterrupt
|
|
||||||
|
|
||||||
if spawn_backend != 'trio':
|
if spawn_backend != 'trio':
|
||||||
if debug_mode:
|
if debug_mode:
|
||||||
pytest.skip('`debug_mode` only supported on `trio` spawner')
|
pytest.skip('`debug_mode` only supported on `trio` spawner')
|
||||||
|
|
||||||
|
# non-`trio` spawners should never hit the hang condition that
|
||||||
|
# requires the user to do ctl-c to cancel the actor tree.
|
||||||
expect_final_exc = trio.ClosedResourceError
|
expect_final_exc = trio.ClosedResourceError
|
||||||
|
|
||||||
mod = import_path(
|
mod = import_path(
|
||||||
|
@ -40,9 +61,29 @@ def test_child_breaks_ipc_channel_during_stream(
|
||||||
root=examples_dir(),
|
root=examples_dir(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
expect_final_exc = KeyboardInterrupt
|
||||||
|
|
||||||
|
# when ONLY the child breaks we expect the parent to get a closed
|
||||||
|
# resource error on the next `MsgStream.receive()` and then fail out
|
||||||
|
# and cancel the child from there.
|
||||||
|
if 'break_child_ipc' in ipc_break:
|
||||||
|
expect_final_exc = trio.ClosedResourceError
|
||||||
|
|
||||||
|
# when the parent IPC side dies (even if the child's does as well)
|
||||||
|
# we expect the channel to be sent a stop msg from the child at some
|
||||||
|
# point which will signal the parent that the stream has been
|
||||||
|
# terminated.
|
||||||
|
# NOTE: when the parent breaks "after" the child you get the above
|
||||||
|
# case as well, but it's not worth testing right?
|
||||||
|
if 'break_parent_ipc' in ipc_break:
|
||||||
|
expect_final_exc = trio.EndOfChannel
|
||||||
|
|
||||||
with pytest.raises(expect_final_exc):
|
with pytest.raises(expect_final_exc):
|
||||||
trio.run(
|
trio.run(
|
||||||
|
partial(
|
||||||
mod.main,
|
mod.main,
|
||||||
debug_mode,
|
debug_mode=debug_mode,
|
||||||
spawn_backend,
|
start_method=spawn_backend,
|
||||||
|
**ipc_break,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue