Compare commits
5 Commits
205382a39b
...
f8178df0fd
| Author | SHA1 | Date |
|---|---|---|
|
|
f8178df0fd | |
|
|
530160fa69 | |
|
|
b376eb0332 | |
|
|
7c5dd4d033 | |
|
|
cbdf1eb6db |
|
|
@ -603,3 +603,50 @@ def test_orphaned_subactor_sigint_cleanup_DRAFT(
|
|||
proc.wait(timeout=2.0)
|
||||
except subprocess.TimeoutExpired:
|
||||
pass
|
||||
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# regression guard: variant-2 (`subint_forkserver`) placeholder
|
||||
# MUST raise `NotImplementedError` today — guards against future
|
||||
# commits accidentally re-aliasing the key to the variant-1
|
||||
# coroutine (which was a transient state during the rename).
|
||||
# ----------------------------------------------------------------
|
||||
def test_subint_forkserver_key_errors_cleanly() -> None:
    '''
    Regression guard for the reserved `subint_forkserver`
    spawn-backend key.

    `--spawn-backend=subint_forkserver` is a placeholder for
    the eventual variant-2 (subint-isolated child runtime)
    backend, itself gated on jcrist/msgspec#1026 unblocking
    PEP 684 isolated-mode subints upstream.

    Until that lands the dispatch-table entry MUST raise
    `NotImplementedError` right away instead of silently
    aliasing to `main_thread_forkserver_proc`. The error text
    is additionally checked for both the working-backend
    pointer and the upstream-blocker ref, so an operator
    hitting the error has somewhere to go next.

    '''
    import asyncio
    from tractor.spawn._spawn import _methods

    stub = _methods['subint_forkserver']

    # positional args shaped like `main_thread_forkserver_proc`'s
    # signature; the stub raises before touching any of them so
    # placeholder values suffice.
    dummy_args: tuple = (
        'x', None, None, {}, [],
        ('127.0.0.1', 0), {},
    )
    with pytest.raises(NotImplementedError) as excinfo:
        asyncio.run(stub(*dummy_args))

    err_text: str = str(excinfo.value)

    # the message must redirect to the backend that actually works
    points_at_working_backend: bool = (
        'main_thread_forkserver' in err_text
    )
    assert points_at_working_backend, (
        f'stub error msg should redirect to the working '
        f'variant-1 backend; got: {err_text!r}'
    )

    # ..and must mention the upstream issue blocking variant-2
    names_upstream_blocker: bool = any(
        ref in err_text
        for ref in ('msgspec#1026', '1026')
    )
    assert names_upstream_blocker, (
        f'stub error msg should reference the upstream '
        f'blocker (jcrist/msgspec#1026); got: {err_text!r}'
    )
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ Advanced streaming patterns using bidirectional streams and contexts.
|
|||
from collections import Counter
|
||||
import itertools
|
||||
import platform
|
||||
from typing import Type
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
|
@ -106,61 +107,120 @@ async def consumer(
|
|||
print(f'{uid} got: {value}')
|
||||
|
||||
|
||||
# NOTE: deliberately NOT using `@pytest.mark.timeout(...)` —
|
||||
# both pytest-timeout enforcement modes break trio under
|
||||
# fork-based backends:
|
||||
#
|
||||
# - `method='signal'` (SIGALRM): the handler synchronously
|
||||
# raises `Failed` in trio's main thread mid-`epoll.poll()`,
|
||||
# leaves `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest
|
||||
# run got abandoned"), and EVERY subsequent `trio.run()`
|
||||
# in the same pytest process bails with
|
||||
# `RuntimeError: Attempted to call run() from inside a
|
||||
# run()` — session-wide poison.
|
||||
#
|
||||
# - `method='thread'`: calls `_thread.interrupt_main()`
|
||||
# raising `KeyboardInterrupt` into the main thread. Under
|
||||
# fork-based backends with mid-cascade fd-juggling the KBI
|
||||
# can escape trio's `KIManager` and bubble out of pytest
|
||||
# itself — kills the WHOLE session.
|
||||
#
|
||||
# Instead we use `trio.fail_after()` INSIDE `main()` below:
|
||||
# trio's own `Cancelled`/`TooSlowError` machinery handles the
|
||||
# timeout, cleanly unwinds the actor nursery's cancel
|
||||
# cascade, and only fails the single test (no cross-test
|
||||
# state corruption either way).
|
||||
#
|
||||
# `pyproject.toml`'s default `timeout = 200` is still a
|
||||
# last-resort safety net.
|
||||
@pytest.mark.parametrize(
|
||||
'expect_cancel_exc', [
|
||||
KeyboardInterrupt,
|
||||
trio.TooSlowError,
|
||||
],
|
||||
ids=lambda item:
|
||||
f'expect_user_exc_raised={item.__name__}'
|
||||
)
|
||||
def test_dynamic_pub_sub(
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
test_log: tractor.log.StackLevelAdapter,
|
||||
reap_subactors_per_test: int,
|
||||
expect_cancel_exc: Type[BaseException],
|
||||
):
|
||||
failed_to_raise_report: str = (
|
||||
f'Never got a {expect_cancel_exc!r} ??'
|
||||
)
|
||||
|
||||
global _registry
|
||||
|
||||
from multiprocessing import cpu_count
|
||||
cpus = cpu_count()
|
||||
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as n:
|
||||
# Hard safety cap via trio's own cancellation — see
|
||||
# the module-level NOTE on why we avoid `pytest-timeout`
|
||||
# for this test. Total expected runtime: ~1s spawn + 3s
|
||||
# sleep + ~1-2s cancel cascade ≈ 5-6s. 12s gives plenty
|
||||
# of headroom; if exceeded, trio raises `TooSlowError`
|
||||
# which the outer `try` block treats as a hang report
|
||||
# (or, if `expect_cancel_exc is trio.TooSlowError`, as
|
||||
# the test passing — either way, no global state
|
||||
# corruption).
|
||||
with trio.fail_after(12):
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
debug_mode=debug_mode,
|
||||
) as n:
|
||||
|
||||
# name of this actor will be same as target func
|
||||
await n.run_in_actor(publisher)
|
||||
# name of this actor will be same as target func
|
||||
await n.run_in_actor(publisher)
|
||||
|
||||
for i, sub in zip(
|
||||
range(cpus - 2),
|
||||
itertools.cycle(_registry.keys())
|
||||
):
|
||||
for i, sub in zip(
|
||||
range(cpus - 2),
|
||||
itertools.cycle(_registry.keys())
|
||||
):
|
||||
await n.run_in_actor(
|
||||
consumer,
|
||||
name=f'consumer_{sub}',
|
||||
subs=[sub],
|
||||
)
|
||||
|
||||
# make one dynamic subscriber
|
||||
await n.run_in_actor(
|
||||
consumer,
|
||||
name=f'consumer_{sub}',
|
||||
subs=[sub],
|
||||
name='consumer_dynamic',
|
||||
subs=list(_registry.keys()),
|
||||
)
|
||||
|
||||
# make one dynamic subscriber
|
||||
await n.run_in_actor(
|
||||
consumer,
|
||||
name='consumer_dynamic',
|
||||
subs=list(_registry.keys()),
|
||||
)
|
||||
|
||||
# block until cancelled by user
|
||||
with trio.fail_after(3):
|
||||
await trio.sleep_forever()
|
||||
# block until "cancelled by user"
|
||||
await trio.sleep(3)
|
||||
test_log.warning(
|
||||
f'Raising user cancel exc: '
|
||||
f'{expect_cancel_exc!r}'
|
||||
)
|
||||
raise expect_cancel_exc('simulate user cancel!')
|
||||
|
||||
try:
|
||||
trio.run(main)
|
||||
except (
|
||||
trio.TooSlowError,
|
||||
ExceptionGroup,
|
||||
) as err:
|
||||
if isinstance(err, ExceptionGroup):
|
||||
for suberr in err.exceptions:
|
||||
if isinstance(suberr, trio.TooSlowError):
|
||||
break
|
||||
else:
|
||||
pytest.fail('Never got a `TooSlowError` ?')
|
||||
pytest.fail(failed_to_raise_report)
|
||||
except expect_cancel_exc:
|
||||
# parent-side raised the user-cancel exc directly and
|
||||
# it propagated unwrapped; clean path.
|
||||
test_log.exception('Got user-cancel exc AS EXPECTED')
|
||||
except BaseExceptionGroup as err:
|
||||
# under fork-based backends the user-raised cancel
|
||||
# can race with subactor-side stream teardown
|
||||
# (`trio.EndOfChannel` from a publisher's `send()`
|
||||
# whose remote half got cut). The expected exc may
|
||||
# then be nested deeper in the group rather than at
|
||||
# the top level. `BaseExceptionGroup.split()` walks
|
||||
# the exc tree recursively (Python 3.11+).
|
||||
matched, _ = err.split(expect_cancel_exc)
|
||||
if matched is None:
|
||||
pytest.fail(failed_to_raise_report)
|
||||
|
||||
assert err
|
||||
test_log.exception('Timed out AS EXPECTED')
|
||||
test_log.exception('Got user-cancel exc AS EXPECTED')
|
||||
|
||||
|
||||
@tractor.context
|
||||
|
|
@ -292,7 +352,6 @@ def test_sigint_both_stream_types():
|
|||
resp = await stream.receive()
|
||||
assert resp == msg
|
||||
raise KeyboardInterrupt
|
||||
|
||||
try:
|
||||
trio.run(main)
|
||||
assert 0, "Didn't receive KBI!?"
|
||||
|
|
@ -362,7 +421,12 @@ async def inf_streamer(
|
|||
print('streamer exited .open_streamer() block')
|
||||
|
||||
|
||||
@pytest.mark.timeout(
|
||||
6,
|
||||
method='signal',
|
||||
)
|
||||
def test_local_task_fanout_from_stream(
|
||||
reg_addr: tuple,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
|
|
@ -427,4 +491,9 @@ def test_local_task_fanout_from_stream(
|
|||
|
||||
await p.cancel_actor()
|
||||
|
||||
trio.run(main)
|
||||
async def w_timeout():
|
||||
with trio.fail_after(6):
|
||||
await main()
|
||||
|
||||
# trio.run(main)
|
||||
trio.run(w_timeout)
|
||||
|
|
|
|||
|
|
@ -32,6 +32,22 @@ from tractor.trionics import BroadcastReceiver
|
|||
from tractor._testing import expect_ctxc
|
||||
|
||||
|
||||
# Per-test zombie-subactor reaper. Opt-in (NOT autouse) —
|
||||
# see `tractor._testing.pytest.reap_subactors_per_test`'s
|
||||
# docstring for the full rationale. This module specifically
|
||||
# needs it because tests like
|
||||
# `test_echoserver_detailed_mechanics[KeyboardInterrupt]`
|
||||
# and the `test_sigint_closes_lifetime_stack[*]` matrix have
|
||||
# been observed to hang past pytest's wall-clock under
|
||||
# `main_thread_forkserver`, leaving subactor forks that
|
||||
# squat on registrar resources and cascade-fail every
|
||||
# subsequent test (`test_inter_peer_cancellation`,
|
||||
# `test_legacy_one_way_streaming`, etc.).
|
||||
# module-wide opt-in to the per-test subactor reaper fixture
pytestmark = pytest.mark.usefixtures('reap_subactors_per_test')
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
scope='module',
|
||||
)
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ Might be eventually useful to expose as a util set from
|
|||
our `tractor.discovery` subsys?
|
||||
|
||||
'''
|
||||
import os
|
||||
import random
|
||||
from typing import (
|
||||
Type,
|
||||
|
|
@ -31,17 +32,28 @@ from tractor.discovery import _addr
|
|||
|
||||
def get_rando_addr(
|
||||
tpt_proto: str,
|
||||
*,
|
||||
|
||||
# choose random port at import time
|
||||
_rando_port: str = random.randint(1000, 9999)
|
||||
|
||||
) -> tuple[str, str|int]:
|
||||
'''
|
||||
Used to globally override the runtime to the
|
||||
per-test-session-dynamic addr so that all tests never conflict
|
||||
with any other actor tree using the default.
|
||||
|
||||
Cross-process isolation: TCP-port picks salt
|
||||
`random.randint()` with `os.getpid()` so two parallel
|
||||
pytest sessions (e.g. one running `--tpt-proto=tcp` and
|
||||
another `--tpt-proto=uds` concurrently) almost-never
|
||||
collide on the same port. Without the salt, the prior
|
||||
impl's import-time `random.randint(1000, 9999)` default
|
||||
arg was effectively a process-singleton with a 1/9000
|
||||
chance of cross-run collision per pair — and when it
|
||||
happened EVERY `reg_addr`-using test in BOTH runs would
|
||||
fight over the bind, cascading into a chain of
|
||||
"Address already in use" failures.
|
||||
|
||||
For UDS this concern doesn't apply: `UDSAddress.get_random()`
|
||||
already builds socket paths from `os.getpid()` so each
|
||||
pytest process gets its own socket-path namespace.
|
||||
|
||||
'''
|
||||
addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto]
|
||||
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
|
||||
|
|
@ -51,9 +63,21 @@ def get_rando_addr(
|
|||
testrun_reg_addr: tuple[str, int|str]
|
||||
match tpt_proto:
|
||||
case 'tcp':
|
||||
# Per-call randomness mixed with `os.getpid()` —
|
||||
# see the docstring above for the cross-process
|
||||
# isolation rationale. The mix means:
|
||||
# - within one pytest session, two calls return
|
||||
# distinct ports (good for tests that need a
|
||||
# second-different-reg-addr in one fn body, e.g.
|
||||
# `test_tpt_bind_addrs::bind-subset-reg`),
|
||||
# - across parallel pytest sessions, the pid bias
|
||||
# makes coincident port choices unlikely.
|
||||
port: int = 1000 + (
|
||||
random.randint(0, 8999) + os.getpid()
|
||||
) % 9000
|
||||
testrun_reg_addr = (
|
||||
addr_type.def_bindspace,
|
||||
_rando_port,
|
||||
port,
|
||||
)
|
||||
|
||||
# NOTE, file-name uniqueness (no-collisions) will be based on
|
||||
|
|
|
|||
|
|
@ -327,6 +327,55 @@ def _reap_orphaned_subactors():
|
|||
reap(pids, grace=3.0)
|
||||
|
||||
|
||||
@pytest.fixture
def reap_subactors_per_test() -> int:
    '''
    Function-scoped zombie-subactor reaper; strictly
    **opt-in**, never autouse.

    Yields the pid of the current process to the test, then
    on teardown looks up any descendant processes of that pid
    and, when some are found, reaps them with a 3s grace
    period.

    Why it exists: when a test's teardown fails to fully
    cancel its actor tree (e.g. an asyncio cancel-cascade
    times out under `main_thread_forkserver`, pytest hits its
    200s wall-clock and abandons), the leftover subactor
    lingers as a direct child of `pytest` and squats on
    whatever registrar port / UDS path / shm segment it had
    bound. Later tests wanting the same resource then fail,
    and with backends that bind a session-shared `reg_addr`
    EVERY following test in the suite cascades. The
    session-scoped sibling (`_reap_orphaned_subactors`) only
    runs at session end, which is too late to stop that
    cascade.

    Enable it for a whole (known-problematic) test module
    with:

    ```python
    pytestmark = pytest.mark.usefixtures(
        'reap_subactors_per_test',
    )
    ```

    or put the same `usefixtures` mark on a single test
    function. Keeping it non-autouse is deliberate: seeing
    the fixture on a module flags that module's teardown as
    known-leaky enough to contaminate siblings, which helps
    track down root causes instead of hiding them under
    blanket cleanup.

    '''
    import os

    # NOTE: the `-> int` annotation describes the value handed
    # to the test (this process' pid), not the generator object
    # this function produces.
    this_pid: int = os.getpid()
    yield this_pid

    # ---- teardown, runs once the requesting test is done ----
    from tractor._testing._reap import (
        find_descendants,
        reap,
    )
    # only invoke the reaper when something was actually left
    # behind by the test.
    if (leftover := find_descendants(this_pid)):
        reap(leftover, grace=3.0)
|
||||
|
||||
|
||||
@pytest.fixture(scope='session')
|
||||
def debug_mode(
|
||||
request: pytest.FixtureRequest,
|
||||
|
|
|
|||
Loading…
Reference in New Issue