Compare commits

..

No commits in common. "f8178df0fdc0c93ea484f66cb7483bd6a347cd2c" and "205382a39be6ecfc602c5b8a69f4cbba25348721" have entirely different histories.

5 changed files with 42 additions and 247 deletions

View File

@ -603,50 +603,3 @@ def test_orphaned_subactor_sigint_cleanup_DRAFT(
proc.wait(timeout=2.0) proc.wait(timeout=2.0)
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
pass pass
# ----------------------------------------------------------------
# regression guard: variant-2 (`subint_forkserver`) placeholder
# MUST raise `NotImplementedError` today — guards against future
# commits accidentally re-aliasing the key to the variant-1
# coroutine (which was a transient state during the rename).
# ----------------------------------------------------------------
def test_subint_forkserver_key_errors_cleanly() -> None:
    '''
    Regression guard for the reserved `subint_forkserver`
    spawn-backend key.

    The key is a placeholder for the eventual variant-2
    (subint-isolated child runtime) backend, which is gated on
    jcrist/msgspec#1026 unblocking PEP 684 isolated-mode
    subints upstream. While that is pending, the dispatch
    entry must fail fast with `NotImplementedError` instead of
    silently aliasing to `main_thread_forkserver_proc`.

    Beyond the exc type, the message is checked for both the
    working variant-1 backend name and the upstream-blocker
    ref so that whoever hits the error knows where to go next.

    '''
    import asyncio
    from tractor.spawn._spawn import _methods

    spawn_stub = _methods['subint_forkserver']

    # positional dummies laid out like
    # `main_thread_forkserver_proc`'s signature; the stub is
    # expected to raise before it ever touches them.
    dummy_args: tuple = (
        'x', None, None, {}, [],
        ('127.0.0.1', 0), {},
    )

    with pytest.raises(NotImplementedError) as excinfo:
        asyncio.run(spawn_stub(*dummy_args))

    msg: str = str(excinfo.value)

    # pointer to the backend that actually works today
    redirects_to_variant_1: bool = 'main_thread_forkserver' in msg
    assert redirects_to_variant_1, (
        f'stub error msg should redirect to the working '
        f'variant-1 backend; got: {msg!r}'
    )

    # reference to the upstream issue blocking variant-2
    names_upstream_blocker: bool = (
        'msgspec#1026' in msg
        or
        '1026' in msg
    )
    assert names_upstream_blocker, (
        f'stub error msg should reference the upstream '
        f'blocker (jcrist/msgspec#1026); got: {msg!r}'
    )

View File

@ -5,7 +5,6 @@ Advanced streaming patterns using bidirectional streams and contexts.
from collections import Counter from collections import Counter
import itertools import itertools
import platform import platform
from typing import Type
import pytest import pytest
import trio import trio
@ -107,120 +106,61 @@ async def consumer(
print(f'{uid} got: {value}') print(f'{uid} got: {value}')
# NOTE: deliberately NOT using `@pytest.mark.timeout(...)` —
# both pytest-timeout enforcement modes break trio under
# fork-based backends:
#
# - `method='signal'` (SIGALRM): the handler synchronously
# raises `Failed` in trio's main thread mid-`epoll.poll()`,
# leaves `GLOBAL_RUN_CONTEXT` half-installed ("Trio guest
# run got abandoned"), and EVERY subsequent `trio.run()`
# in the same pytest process bails with
# `RuntimeError: Attempted to call run() from inside a
# run()` — session-wide poison.
#
# - `method='thread'`: calls `_thread.interrupt_main()`
# raising `KeyboardInterrupt` into the main thread. Under
# fork-based backends with mid-cascade fd-juggling the KBI
# can escape trio's `KIManager` and bubble out of pytest
# itself — kills the WHOLE session.
#
# Instead we use `trio.fail_after()` INSIDE `main()` below:
# trio's own `Cancelled`/`TooSlowError` machinery handles the
# timeout, cleanly unwinds the actor nursery's cancel
# cascade, and only fails the single test (no cross-test
# state corruption either way).
#
# `pyproject.toml`'s default `timeout = 200` is still a
# last-resort safety net.
@pytest.mark.parametrize(
'expect_cancel_exc', [
KeyboardInterrupt,
trio.TooSlowError,
],
ids=lambda item:
f'expect_user_exc_raised={item.__name__}'
)
def test_dynamic_pub_sub( def test_dynamic_pub_sub(
reg_addr: tuple, reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
test_log: tractor.log.StackLevelAdapter, test_log: tractor.log.StackLevelAdapter,
reap_subactors_per_test: int,
expect_cancel_exc: Type[BaseException],
): ):
failed_to_raise_report: str = (
f'Never got a {expect_cancel_exc!r} ??'
)
global _registry global _registry
from multiprocessing import cpu_count from multiprocessing import cpu_count
cpus = cpu_count() cpus = cpu_count()
async def main(): async def main():
# Hard safety cap via trio's own cancellation — see async with tractor.open_nursery(
# the module-level NOTE on why we avoid `pytest-timeout` registry_addrs=[reg_addr],
# for this test. Total expected runtime: ~1s spawn + 3s debug_mode=debug_mode,
# sleep + ~1-2s cancel cascade ≈ 5-6s. 12s gives plenty ) as n:
# of headroom; if exceeded, trio raises `TooSlowError`
# which the outer `try` block treats as a hang report
# (or, if `expect_cancel_exc is trio.TooSlowError`, as
# the test passing — either way, no global state
# corruption).
with trio.fail_after(12):
async with tractor.open_nursery(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as n:
# name of this actor will be same as target func # name of this actor will be same as target func
await n.run_in_actor(publisher) await n.run_in_actor(publisher)
for i, sub in zip( for i, sub in zip(
range(cpus - 2), range(cpus - 2),
itertools.cycle(_registry.keys()) itertools.cycle(_registry.keys())
): ):
await n.run_in_actor(
consumer,
name=f'consumer_{sub}',
subs=[sub],
)
# make one dynamic subscriber
await n.run_in_actor( await n.run_in_actor(
consumer, consumer,
name='consumer_dynamic', name=f'consumer_{sub}',
subs=list(_registry.keys()), subs=[sub],
) )
# block until "cancelled by user" # make one dynamic subscriber
await trio.sleep(3) await n.run_in_actor(
test_log.warning( consumer,
f'Raising user cancel exc: ' name='consumer_dynamic',
f'{expect_cancel_exc!r}' subs=list(_registry.keys()),
) )
raise expect_cancel_exc('simulate user cancel!')
# block until cancelled by user
with trio.fail_after(3):
await trio.sleep_forever()
try: try:
trio.run(main) trio.run(main)
pytest.fail(failed_to_raise_report) except (
except expect_cancel_exc: trio.TooSlowError,
# parent-side raised the user-cancel exc directly and ExceptionGroup,
# it propagated unwrapped; clean path. ) as err:
test_log.exception('Got user-cancel exc AS EXPECTED') if isinstance(err, ExceptionGroup):
except BaseExceptionGroup as err: for suberr in err.exceptions:
# under fork-based backends the user-raised cancel if isinstance(suberr, trio.TooSlowError):
# can race with subactor-side stream teardown break
# (`trio.EndOfChannel` from a publisher's `send()` else:
# whose remote half got cut). The expected exc may pytest.fail('Never got a `TooSlowError` ?')
# then be nested deeper in the group rather than at
# the top level. `BaseExceptionGroup.split()` walks
# the exc tree recursively (Python 3.11+).
matched, _ = err.split(expect_cancel_exc)
if matched is None:
pytest.fail(failed_to_raise_report)
test_log.exception('Got user-cancel exc AS EXPECTED') assert err
test_log.exception('Timed out AS EXPECTED')
@tractor.context @tractor.context
@ -352,6 +292,7 @@ def test_sigint_both_stream_types():
resp = await stream.receive() resp = await stream.receive()
assert resp == msg assert resp == msg
raise KeyboardInterrupt raise KeyboardInterrupt
try: try:
trio.run(main) trio.run(main)
assert 0, "Didn't receive KBI!?" assert 0, "Didn't receive KBI!?"
@ -421,12 +362,7 @@ async def inf_streamer(
print('streamer exited .open_streamer() block') print('streamer exited .open_streamer() block')
@pytest.mark.timeout(
6,
method='signal',
)
def test_local_task_fanout_from_stream( def test_local_task_fanout_from_stream(
reg_addr: tuple,
debug_mode: bool, debug_mode: bool,
): ):
''' '''
@ -491,9 +427,4 @@ def test_local_task_fanout_from_stream(
await p.cancel_actor() await p.cancel_actor()
async def w_timeout(): trio.run(main)
with trio.fail_after(6):
await main()
# trio.run(main)
trio.run(w_timeout)

View File

@ -32,22 +32,6 @@ from tractor.trionics import BroadcastReceiver
from tractor._testing import expect_ctxc from tractor._testing import expect_ctxc
# Per-test zombie-subactor reaper, applied to every test in
# this module through the module-level `pytestmark` below.
# Opt-in (NOT autouse) — see
# `tractor._testing.pytest.reap_subactors_per_test`'s
# docstring for the full rationale.
#
# This module specifically needs it because tests like
# `test_echoserver_detailed_mechanics[KeyboardInterrupt]`
# and the `test_sigint_closes_lifetime_stack[*]` matrix have
# been observed to hang past pytest's wall-clock under
# `main_thread_forkserver`, leaving subactor forks that
# squat on registrar resources and cascade-fail every
# subsequent test (`test_inter_peer_cancellation`,
# `test_legacy_one_way_streaming`, etc.).
pytestmark = pytest.mark.usefixtures(
    'reap_subactors_per_test',
)
@pytest.fixture( @pytest.fixture(
scope='module', scope='module',
) )

View File

@ -22,7 +22,6 @@ Might be eventually useful to expose as a util set from
our `tractor.discovery` subsys? our `tractor.discovery` subsys?
''' '''
import os
import random import random
from typing import ( from typing import (
Type, Type,
@ -32,28 +31,17 @@ from tractor.discovery import _addr
def get_rando_addr( def get_rando_addr(
tpt_proto: str, tpt_proto: str,
*,
# choose random port at import time
_rando_port: str = random.randint(1000, 9999)
) -> tuple[str, str|int]: ) -> tuple[str, str|int]:
''' '''
Used to globally override the runtime to the Used to globally override the runtime to the
per-test-session-dynamic addr so that all tests never conflict per-test-session-dynamic addr so that all tests never conflict
with any other actor tree using the default. with any other actor tree using the default.
Cross-process isolation: TCP-port picks salt
`random.randint()` with `os.getpid()` so two parallel
pytest sessions (e.g. one running `--tpt-proto=tcp` and
another `--tpt-proto=uds` concurrently) almost-never
collide on the same port. Without the salt, the prior
impl's import-time `random.randint(1000, 9999)` default
arg was effectively a process-singleton with a 1/9000
chance of cross-run collision per pair and when it
happened EVERY `reg_addr`-using test in BOTH runs would
fight over the bind, cascading into a chain of
"Address already in use" failures.
For UDS this concern doesn't apply: `UDSAddress.get_random()`
already builds socket paths from `os.getpid()` so each
pytest process gets its own socket-path namespace.
''' '''
addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto] addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto] def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
@ -63,21 +51,9 @@ def get_rando_addr(
testrun_reg_addr: tuple[str, int|str] testrun_reg_addr: tuple[str, int|str]
match tpt_proto: match tpt_proto:
case 'tcp': case 'tcp':
# Per-call randomness mixed with `os.getpid()` —
# see the docstring above for the cross-process
# isolation rationale. The mix means:
# - within one pytest session, two calls return
# distinct ports (good for tests that need a
# second-different-reg-addr in one fn body, e.g.
# `test_tpt_bind_addrs::bind-subset-reg`),
# - across parallel pytest sessions, the pid bias
# makes coincident port choices unlikely.
port: int = 1000 + (
random.randint(0, 8999) + os.getpid()
) % 9000
testrun_reg_addr = ( testrun_reg_addr = (
addr_type.def_bindspace, addr_type.def_bindspace,
port, _rando_port,
) )
# NOTE, file-name uniqueness (no-collisions) will be based on # NOTE, file-name uniqueness (no-collisions) will be based on

View File

@ -327,55 +327,6 @@ def _reap_orphaned_subactors():
reap(pids, grace=3.0) reap(pids, grace=3.0)
@pytest.fixture
def reap_subactors_per_test() -> int:
    '''
    Function-scoped zombie-subactor reaper; **opt-in**, NOT
    autouse.

    Yields the pid of the current (pytest) process to the test
    and, once the test is done, reaps any descendant processes
    still found under that pid.

    Why: when a test's teardown fails to fully cancel its
    actor tree (e.g. an asyncio cancel-cascade times out under
    `main_thread_forkserver`, pytest hits its 200s wall-clock
    and abandons), the leftover subactor lingers as a direct
    child of `pytest` and squats on whatever registrar port /
    UDS path / shm segment it had bound. Later tests that need
    the same resource then fail, and with backends that bind a
    session-shared `reg_addr` EVERY following test cascades.
    The session-scoped sibling (`_reap_orphaned_subactors`)
    only runs at session end, too late to stop that cascade.

    Apply at module level on known-problematic test files,

    ```python
    pytestmark = pytest.mark.usefixtures(
        'reap_subactors_per_test',
    )
    ```

    or per-test via the same `usefixtures` mark on a single
    function. Staying non-autouse is deliberate: the fixture's
    presence marks a module whose teardown is known-leaky
    enough to contaminate siblings, which keeps root causes
    visible instead of buried under blanket cleanup.

    '''
    # NOTE(review): annotated `-> int` but this is a generator
    # fixture; `int` is the type of the value yielded to tests.
    import os

    pytest_pid: int = os.getpid()
    yield pytest_pid

    # teardown: import is deferred until after the test body
    # has finished, matching when the reap actually happens.
    from tractor._testing._reap import (
        find_descendants,
        reap,
    )
    if leftover := find_descendants(pytest_pid):
        reap(leftover, grace=3.0)
@pytest.fixture(scope='session') @pytest.fixture(scope='session')
def debug_mode( def debug_mode(
request: pytest.FixtureRequest, request: pytest.FixtureRequest,