forked from goodboy/tractor
249 lines
6.9 KiB
Python
249 lines
6.9 KiB
Python
'''
|
|
Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
|
|
cancelacion?..
|
|
|
|
'''
|
|
import itertools
|
|
from functools import partial
|
|
from types import ModuleType
|
|
|
|
import pytest
|
|
from _pytest.pathlib import import_path
|
|
import trio
|
|
import tractor
|
|
from tractor._testing import (
|
|
examples_dir,
|
|
)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
'pre_aclose_msgstream',
|
|
[
|
|
False,
|
|
True,
|
|
],
|
|
ids=[
|
|
'no_msgstream_aclose',
|
|
'pre_aclose_msgstream',
|
|
],
|
|
)
|
|
@pytest.mark.parametrize(
|
|
'ipc_break',
|
|
[
|
|
# no breaks
|
|
{
|
|
'break_parent_ipc_after': False,
|
|
'break_child_ipc_after': False,
|
|
},
|
|
|
|
# only parent breaks
|
|
{
|
|
'break_parent_ipc_after': 500,
|
|
'break_child_ipc_after': False,
|
|
},
|
|
|
|
# only child breaks
|
|
{
|
|
'break_parent_ipc_after': False,
|
|
'break_child_ipc_after': 500,
|
|
},
|
|
|
|
# both: break parent first
|
|
{
|
|
'break_parent_ipc_after': 500,
|
|
'break_child_ipc_after': 800,
|
|
},
|
|
# both: break child first
|
|
{
|
|
'break_parent_ipc_after': 800,
|
|
'break_child_ipc_after': 500,
|
|
},
|
|
|
|
],
|
|
ids=[
|
|
'no_break',
|
|
'break_parent',
|
|
'break_child',
|
|
'break_both_parent_first',
|
|
'break_both_child_first',
|
|
],
|
|
)
|
|
def test_ipc_channel_break_during_stream(
|
|
debug_mode: bool,
|
|
loglevel: str,
|
|
spawn_backend: str,
|
|
ipc_break: dict|None,
|
|
pre_aclose_msgstream: bool,
|
|
):
|
|
'''
|
|
Ensure we can have an IPC channel break its connection during
|
|
streaming and it's still possible for the (simulated) user to kill
|
|
the actor tree using SIGINT.
|
|
|
|
We also verify the type of connection error expected in the parent
|
|
depending on which side if the IPC breaks first.
|
|
|
|
'''
|
|
if spawn_backend != 'trio':
|
|
# if debug_mode:
|
|
# pytest.skip('`debug_mode` only supported on `trio` spawner')
|
|
|
|
# non-`trio` spawners should never hit the hang condition that
|
|
# requires the user to do ctl-c to cancel the actor tree.
|
|
expect_final_exc = trio.ClosedResourceError
|
|
|
|
mod: ModuleType = import_path(
|
|
examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py',
|
|
root=examples_dir(),
|
|
)
|
|
|
|
# by def we expect KBI from user after a simulated "hang
|
|
# period" wherein the user eventually hits ctl-c to kill the
|
|
# root-actor tree.
|
|
expect_final_exc: BaseException = KeyboardInterrupt
|
|
if (
|
|
# only expect EoC if trans is broken on the child side,
|
|
ipc_break['break_child_ipc_after'] is not False
|
|
# AND we tell the child to call `MsgStream.aclose()`.
|
|
and pre_aclose_msgstream
|
|
):
|
|
expect_final_exc = trio.EndOfChannel
|
|
|
|
# NOTE when ONLY the child breaks or it breaks BEFORE the
|
|
# parent we expect the parent to get a closed resource error
|
|
# on the next `MsgStream.receive()` and then fail out and
|
|
# cancel the child from there.
|
|
#
|
|
# ONLY CHILD breaks
|
|
if (
|
|
ipc_break['break_child_ipc_after']
|
|
and
|
|
ipc_break['break_parent_ipc_after'] is False
|
|
):
|
|
expect_final_exc = trio.ClosedResourceError
|
|
|
|
# if child calls `MsgStream.aclose()` then expect EoC.
|
|
if pre_aclose_msgstream:
|
|
expect_final_exc = trio.EndOfChannel
|
|
|
|
# BOTH but, CHILD breaks FIRST
|
|
elif (
|
|
ipc_break['break_child_ipc_after'] is not False
|
|
and (
|
|
ipc_break['break_parent_ipc_after']
|
|
> ipc_break['break_child_ipc_after']
|
|
)
|
|
):
|
|
expect_final_exc = trio.ClosedResourceError
|
|
|
|
# child will send a 'stop' msg before it breaks
|
|
# the transport channel.
|
|
if pre_aclose_msgstream:
|
|
expect_final_exc = trio.EndOfChannel
|
|
|
|
# NOTE when the parent IPC side dies (even if the child's does as well
|
|
# but the child fails BEFORE the parent) we always expect the
|
|
# IPC layer to raise a closed-resource, NEVER do we expect
|
|
# a stop msg since the parent-side ctx apis will error out
|
|
# IMMEDIATELY before the child ever sends any 'stop' msg.
|
|
#
|
|
# ONLY PARENT breaks
|
|
elif (
|
|
ipc_break['break_parent_ipc_after']
|
|
and
|
|
ipc_break['break_child_ipc_after'] is False
|
|
):
|
|
expect_final_exc = trio.ClosedResourceError
|
|
|
|
# BOTH but, PARENT breaks FIRST
|
|
elif (
|
|
ipc_break['break_parent_ipc_after'] is not False
|
|
and (
|
|
ipc_break['break_child_ipc_after']
|
|
> ipc_break['break_parent_ipc_after']
|
|
)
|
|
):
|
|
expect_final_exc = trio.ClosedResourceError
|
|
|
|
with pytest.raises(
|
|
expected_exception=(
|
|
expect_final_exc,
|
|
ExceptionGroup,
|
|
),
|
|
) as excinfo:
|
|
try:
|
|
trio.run(
|
|
partial(
|
|
mod.main,
|
|
debug_mode=debug_mode,
|
|
start_method=spawn_backend,
|
|
loglevel=loglevel,
|
|
pre_close=pre_aclose_msgstream,
|
|
**ipc_break,
|
|
)
|
|
)
|
|
except KeyboardInterrupt as kbi:
|
|
_err = kbi
|
|
if expect_final_exc is not KeyboardInterrupt:
|
|
pytest.fail(
|
|
'Rxed unexpected KBI !?\n'
|
|
f'{repr(kbi)}'
|
|
)
|
|
|
|
raise
|
|
|
|
# get raw instance from pytest wrapper
|
|
value = excinfo.value
|
|
if isinstance(value, ExceptionGroup):
|
|
value = next(
|
|
itertools.dropwhile(
|
|
lambda exc: not isinstance(exc, expect_final_exc),
|
|
value.exceptions,
|
|
)
|
|
)
|
|
assert value
|
|
|
|
|
|
@tractor.context
|
|
async def break_ipc_after_started(
|
|
ctx: tractor.Context,
|
|
) -> None:
|
|
await ctx.started()
|
|
async with ctx.open_stream() as stream:
|
|
await stream.aclose()
|
|
await trio.sleep(0.2)
|
|
await ctx.chan.send(None)
|
|
print('child broke IPC and terminating')
|
|
|
|
|
|
def test_stream_closed_right_after_ipc_break_and_zombie_lord_engages():
|
|
'''
|
|
Verify that is a subactor's IPC goes down just after bringing up a stream
|
|
the parent can trigger a SIGINT and the child will be reaped out-of-IPC by
|
|
the localhost process supervision machinery: aka "zombie lord".
|
|
|
|
'''
|
|
async def main():
|
|
async with tractor.open_nursery() as n:
|
|
portal = await n.start_actor(
|
|
'ipc_breaker',
|
|
enable_modules=[__name__],
|
|
)
|
|
|
|
with trio.move_on_after(1):
|
|
async with (
|
|
portal.open_context(
|
|
break_ipc_after_started
|
|
) as (ctx, sent),
|
|
):
|
|
async with ctx.open_stream():
|
|
await trio.sleep(0.5)
|
|
|
|
print('parent waiting on context')
|
|
|
|
print('parent exited context')
|
|
raise KeyboardInterrupt
|
|
|
|
with pytest.raises(KeyboardInterrupt):
|
|
trio.run(main)
|