Harden `test_infected_asyncio` for fork spawners
Deats,
- `test_echoserver_detailed_mechanics`: add `is_forking_spawner` param, wrap `main()` in `fa_main()` with per-backend `trio.fail_after` (4s fork / 1s trio) to cap cancel-cascade teardown that compounds under forkserver.
- `test_sigint_closes_lifetime_stack`: swap `start_method` param for `is_forking_spawner`, pre-init `tmp_file`/`ctx` to `None` so KBI firing before `open_context` body doesn't `UnboundLocalError`, add `pytest.fail` guard for the spawn-time IPC race case, arm `signal.alarm` AFK-safety cap (10s) under fork backends.

Also,
- `pytestmark`: add `track_orphaned_uds_per_test` + `detect_runaway_subactors_per_test` fixtures.
- `delay()`: hardcode `return 1e3` at top (debug override still in place).

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

subint_forkserver_backend
parent
b10011a36e
commit
7ee0dc2e8f
|
|
@ -45,6 +45,12 @@ from tractor._testing import expect_ctxc
|
||||||
# `test_legacy_one_way_streaming`, etc.).
|
# `test_legacy_one_way_streaming`, etc.).
|
||||||
pytestmark = pytest.mark.usefixtures(
|
pytestmark = pytest.mark.usefixtures(
|
||||||
'reap_subactors_per_test',
|
'reap_subactors_per_test',
|
||||||
|
# NOTE, asyncio cancel cascade has historically
|
||||||
|
# triggered both UDS sockfile leaks (SIGKILL path)
|
||||||
|
# AND the trio `WakeupSocketpair.drain()` busy-loop
|
||||||
|
# — see `test_aio_simple_error`'s history.
|
||||||
|
'track_orphaned_uds_per_test',
|
||||||
|
'detect_runaway_subactors_per_test',
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -52,6 +58,7 @@ pytestmark = pytest.mark.usefixtures(
|
||||||
scope='module',
|
scope='module',
|
||||||
)
|
)
|
||||||
def delay(debug_mode: bool) -> int:
|
def delay(debug_mode: bool) -> int:
|
||||||
|
return 1e3
|
||||||
if debug_mode:
|
if debug_mode:
|
||||||
return 999
|
return 999
|
||||||
else:
|
else:
|
||||||
|
|
@ -826,13 +833,19 @@ async def trio_to_aio_echo_server(
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
'raise_error_mid_stream',
|
'raise_error_mid_stream',
|
||||||
[False, Exception, KeyboardInterrupt],
|
[
|
||||||
|
False,
|
||||||
|
Exception,
|
||||||
|
KeyboardInterrupt,
|
||||||
|
],
|
||||||
ids='raise_error={}'.format,
|
ids='raise_error={}'.format,
|
||||||
)
|
)
|
||||||
def test_echoserver_detailed_mechanics(
|
def test_echoserver_detailed_mechanics(
|
||||||
reg_addr: tuple[str, int],
|
reg_addr: tuple[str, int],
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
raise_error_mid_stream,
|
raise_error_mid_stream,
|
||||||
|
|
||||||
|
is_forking_spawner: bool,
|
||||||
):
|
):
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
|
@ -880,12 +893,34 @@ def test_echoserver_detailed_mechanics(
|
||||||
# is cancelled by kbi or out of task cancellation
|
# is cancelled by kbi or out of task cancellation
|
||||||
await p.cancel_actor()
|
await p.cancel_actor()
|
||||||
|
|
||||||
|
# NOTE: under fork-based backends the cancel-cascade
|
||||||
|
# path is structurally slower than `trio`'s subproc-exec
|
||||||
|
# (per-spawn forkserver-handshake compounds during
|
||||||
|
# teardown). Bump the cap so cross-test contamination
|
||||||
|
# doesn't flake this — see
|
||||||
|
# `ai/conc-anal/cancel_cascade_too_slow_under_main_thread_forkserver_issue.md`.
|
||||||
|
timeout: float = (
|
||||||
|
999 if tractor.debug_mode()
|
||||||
|
else 4 if is_forking_spawner
|
||||||
|
else 1
|
||||||
|
)
|
||||||
|
with_timeout: bool = (
|
||||||
|
True
|
||||||
|
# False
|
||||||
|
)
|
||||||
|
async def fa_main():
|
||||||
|
if with_timeout:
|
||||||
|
with trio.fail_after(timeout):
|
||||||
|
await main()
|
||||||
|
else:
|
||||||
|
await main()
|
||||||
|
|
||||||
if raise_error_mid_stream:
|
if raise_error_mid_stream:
|
||||||
with pytest.raises(raise_error_mid_stream):
|
with pytest.raises(raise_error_mid_stream):
|
||||||
trio.run(main)
|
trio.run(fa_main)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
trio.run(main)
|
trio.run(fa_main)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
@ -1038,7 +1073,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
bg_aio_task: bool,
|
bg_aio_task: bool,
|
||||||
trio_side_is_shielded: bool,
|
trio_side_is_shielded: bool,
|
||||||
send_sigint_to: str,
|
send_sigint_to: str,
|
||||||
start_method: str,
|
is_forking_spawner: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Ensure that an infected child can use the `Actor.lifetime_stack`
|
Ensure that an infected child can use the `Actor.lifetime_stack`
|
||||||
|
|
@ -1053,6 +1088,14 @@ def test_sigint_closes_lifetime_stack(
|
||||||
if debug_mode
|
if debug_mode
|
||||||
else 1
|
else 1
|
||||||
)
|
)
|
||||||
|
# pre-init so the `except (KeyboardInterrupt, ContextCancelled)`
|
||||||
|
# handler below doesn't `UnboundLocalError` if KBI fires BEFORE
|
||||||
|
# we ever enter the `as (ctx, first)` body (e.g. when
|
||||||
|
# `p.open_context().__aenter__` is hung waiting for the
|
||||||
|
# subactor's `StartAck` due to a fork-child IPC race —
|
||||||
|
# see `dynamic_pub_sub_spawn_time_transport_close_under_mtf_issue.md`).
|
||||||
|
tmp_file: Path|None = None
|
||||||
|
ctx: tractor.Context|None = None
|
||||||
try:
|
try:
|
||||||
an: tractor.ActorNursery
|
an: tractor.ActorNursery
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
|
|
@ -1078,7 +1121,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
) as (ctx, first):
|
) as (ctx, first):
|
||||||
|
|
||||||
path_str, cpid = first
|
path_str, cpid = first
|
||||||
tmp_file: Path = Path(path_str)
|
tmp_file = Path(path_str)
|
||||||
assert tmp_file.exists()
|
assert tmp_file.exists()
|
||||||
|
|
||||||
# XXX originally to simulate what (hopefully)
|
# XXX originally to simulate what (hopefully)
|
||||||
|
|
@ -1129,7 +1172,7 @@ def test_sigint_closes_lifetime_stack(
|
||||||
if (
|
if (
|
||||||
send_sigint_to == 'child'
|
send_sigint_to == 'child'
|
||||||
and
|
and
|
||||||
start_method == 'main_thread_forkserver'
|
is_forking_spawner
|
||||||
):
|
):
|
||||||
pytest.xfail(
|
pytest.xfail(
|
||||||
reason=(
|
reason=(
|
||||||
|
|
@ -1156,6 +1199,21 @@ def test_sigint_closes_lifetime_stack(
|
||||||
KeyboardInterrupt,
|
KeyboardInterrupt,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
):
|
):
|
||||||
|
# If we got here BEFORE entering the ctx body (e.g.
|
||||||
|
# spawn-time IPC race hung `open_context.__aenter__` and
|
||||||
|
# the AFK-guard `signal.alarm` fired KBI from outside the
|
||||||
|
# trio loop), `tmp_file`/`ctx` are still `None` — surface
|
||||||
|
# that fact directly instead of `UnboundLocalError`.
|
||||||
|
if tmp_file is None:
|
||||||
|
pytest.fail(
|
||||||
|
'KBI/ctxc fired BEFORE `p.open_context()` returned '
|
||||||
|
"the child's `started` value — likely fork-child "
|
||||||
|
'IPC race; see '
|
||||||
|
'`ai/conc-anal/'
|
||||||
|
'dynamic_pub_sub_spawn_time_transport_close_'
|
||||||
|
'under_mtf_issue.md`'
|
||||||
|
)
|
||||||
|
|
||||||
# XXX CASE 2: without the bug fixed, in the
|
# XXX CASE 2: without the bug fixed, in the
|
||||||
# KBI-raised-in-parent case, the actor teardown should
|
# KBI-raised-in-parent case, the actor teardown should
|
||||||
# never get run (silently abandoned by `asyncio`..) and
|
# never get run (silently abandoned by `asyncio`..) and
|
||||||
|
|
@ -1163,7 +1221,32 @@ def test_sigint_closes_lifetime_stack(
|
||||||
assert not tmp_file.exists()
|
assert not tmp_file.exists()
|
||||||
assert ctx.maybe_error
|
assert ctx.maybe_error
|
||||||
|
|
||||||
trio.run(main)
|
# outer signal-based AFK-safety guard. mirrors the pattern in
|
||||||
|
# `tests/test_advanced_streaming.py::test_dynamic_pub_sub`: when
|
||||||
|
# the in-band trio cancel path doesn't fire (e.g. parent is
|
||||||
|
# parked in a shielded `await` inside actor-nursery teardown, or
|
||||||
|
# `open_context.__aenter__` hangs waiting for a child's
|
||||||
|
# `StartAck` that never comes), `signal.alarm` raises KBI in the
|
||||||
|
# main thread regardless of trio's scope state. This caps the
|
||||||
|
# absolute wall-clock so an AFK run can't sit for an hour on a
|
||||||
|
# forkserver-launchpad-contamination hang. Only armed under fork-
|
||||||
|
# based backends since the bug class is MTF-specific.
|
||||||
|
_AFK_CAP_S: int = (
|
||||||
|
999 if debug_mode
|
||||||
|
else 10
|
||||||
|
)
|
||||||
|
armed_alarm: bool = (
|
||||||
|
not debug_mode
|
||||||
|
and
|
||||||
|
is_forking_spawner
|
||||||
|
)
|
||||||
|
if armed_alarm:
|
||||||
|
signal.alarm(_AFK_CAP_S)
|
||||||
|
try:
|
||||||
|
trio.run(main)
|
||||||
|
finally:
|
||||||
|
if armed_alarm:
|
||||||
|
signal.alarm(0)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue