Mv `daemon` + `test_multi_program` to `discovery/`

All `daemon` fixture consumers are discovery-
protocol tests now living under `tests/discovery/`.
Move the fixture, its `_wait_for_daemon_ready`
helper, and `test_multi_program.py` into that subdir
so scope matches usage.

Also,
- add `pytestmark` for `track_orphaned_uds_per_test`
  + `detect_runaway_subactors_per_test` to `test_multi_program` as
    regression net.
- drop now-unused `_PROC_SPAWN_WAIT` + `socket` import from root
  conftest.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_forkserver_backend
Gud Boi 2026-05-06 13:23:42 -04:00
parent ec8c4659c4
commit c4082be876
3 changed files with 243 additions and 192 deletions

View File

@ -7,7 +7,6 @@ import sys
import subprocess import subprocess
import os import os
import signal import signal
import socket
import platform import platform
import time import time
from pathlib import Path from pathlib import Path
@ -35,15 +34,10 @@ if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT _KILL_SIGNAL = signal.CTRL_BREAK_EVENT
_INT_SIGNAL = signal.CTRL_C_EVENT _INT_SIGNAL = signal.CTRL_C_EVENT
_INT_RETURN_CODE = 3221225786 _INT_RETURN_CODE = 3221225786
_PROC_SPAWN_WAIT = 2
else: else:
_KILL_SIGNAL = signal.SIGKILL _KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT _INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = (
2 if _ci_env
else 1
)
no_windows = pytest.mark.skipif( no_windows = pytest.mark.skipif(
@ -245,191 +239,14 @@ def sig_prog(
assert ret assert ret
def _wait_for_daemon_ready( # NOTE, the `daemon` fixture (+ its `_wait_for_daemon_ready`
reg_addr: tuple, # helper + the post-yield teardown drain logic) has been
tpt_proto: str, # moved to `tests/discovery/conftest.py` since 100% of its
*, # consumers are discovery-protocol tests now living under
deadline: float = 10.0, # that subdir. See:
poll_interval: float = 0.05, # - `tests/discovery/test_multi_program.py`
proc: subprocess.Popen|None = None, # - `tests/discovery/test_registrar.py`
) -> None: # - `tests/discovery/test_tpt_bind_addrs.py`
'''
Active-poll the daemon's bind address until it
accepts a connection (proving it has called
`bind() + listen()` and is ready to handle IPC).
Replaces the historical blind `time.sleep()` in the
`daemon` fixture which was racy under load see
`ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`.
Uses stdlib `socket` directly (no trio runtime
bootstrap cost) sufficient because
`tractor.run_daemon()` doesn't return from
bootstrap until the runtime is fully ready to
accept IPC.
Raises `TimeoutError` on `deadline` exceeded. If
`proc` is given, ALSO raises early if the daemon
process exits non-zero before the deadline (catches
daemon-startup-crash that the blind sleep used to
silently mask).
'''
end: float = time.monotonic() + deadline
last_exc: Exception|None = None
while time.monotonic() < end:
# Daemon-died-during-startup early-exit. Without
# this, a crashed-on-import daemon would just
# eat the full deadline before raising opaque
# TimeoutError.
if proc is not None and proc.poll() is not None:
raise RuntimeError(
f'Daemon proc exited (rc={proc.returncode}) '
f'before becoming ready to accept on '
f'{reg_addr!r}'
)
try:
if tpt_proto == 'tcp':
# `socket.create_connection` does the
# `socket() + connect()` dance with a
# builtin timeout — perfect primitive
# for a one-shot probe.
with socket.create_connection(
reg_addr,
timeout=poll_interval,
):
return
else:
# UDS — `reg_addr` is a `(filedir, sockname)`
# tuple per `tractor.ipc._uds.UDSAddress.unwrap`.
sockpath: str = os.path.join(*reg_addr)
sock = socket.socket(socket.AF_UNIX)
try:
sock.settimeout(poll_interval)
sock.connect(sockpath)
return
finally:
sock.close()
except (
ConnectionRefusedError,
FileNotFoundError,
OSError,
socket.timeout,
) as exc:
last_exc = exc
time.sleep(poll_interval)
raise TimeoutError(
f'Daemon never accepted on {reg_addr!r} within '
f'{deadline}s (last connect-attempt exc: '
f'{last_exc!r})'
)
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
debug_mode: bool,
loglevel: str,
testdir: pytest.Pytester,
reg_addr: tuple[str, int],
tpt_proto: str,
ci_env: bool,
test_log: tractor.log.StackLevelAdapter,
# set_fork_aware_capture,
) -> subprocess.Popen:
'''
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
'''
# XXX: too much logging will lock up the subproc (smh)
if loglevel in ('trace', 'debug'):
test_log.warning(
f'Test harness log level is too verbose: {loglevel!r}\n'
f'Reducing to INFO level..'
)
loglevel: str = 'info'
code: str = (
"import tractor; "
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"enable_transports={enable_tpts}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
enable_tpts=str([tpt_proto]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
# breakpoint()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc: subprocess.Popen = testdir.popen(
cmd,
**kwargs,
)
# Active-poll the daemon's bind address until it's
# ready to accept connections — replaces the legacy
# blind `time.sleep(_PROC_SPAWN_WAIT + uds_bonus)`
# which was racy under load (see
# `ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`).
#
# Per-test deadline scales with platform: macOS/CI
# gets extra headroom; Linux dev boxes need very
# little.
deadline: float = (
15.0 if (_non_linux and ci_env)
else 10.0
)
_wait_for_daemon_ready(
reg_addr=reg_addr,
tpt_proto=tpt_proto,
deadline=deadline,
proc=proc,
)
assert not proc.returncode
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
stdout: str = proc.stdout.read().decode()
if (
stderr
or
stdout
):
print(
f'Daemon actor tree produced output:\n'
f'{proc.args}\n'
f'\n'
f'stderr: {stderr!r}\n'
f'stdout: {stdout!r}\n'
)
if (rc := proc.returncode) != -2:
msg: str = (
f'Daemon actor tree was not cancelled !?\n'
f'proc.args: {proc.args!r}\n'
f'proc.returncode: {rc!r}\n'
)
if rc < 0:
raise RuntimeError(msg)
test_log.error(msg)
# @pytest.fixture(autouse=True) # @pytest.fixture(autouse=True)

View File

@ -0,0 +1,223 @@
'''
Discovery-suite fixtures, including the `daemon`
remote-registrar subprocess used by the multi-program
discovery tests.
Lives here (vs. the parent `tests/conftest.py`)
because `daemon` is a discovery-protocol primitive
boots a separate `tractor.run_daemon()` process whose
sole purpose is to serve as a registrar peer for
discovery-roundtrip tests. Pytest fixtures inherit
DOWNWARD through conftest hierarchy, so anything
under `tests/discovery/` automatically picks this up.
'''
from __future__ import annotations
import os
import platform
import socket
import subprocess
import sys
import time
import pytest
import tractor
from ..conftest import (
sig_prog,
_INT_SIGNAL,
_non_linux,
)
def _wait_for_daemon_ready(
reg_addr: tuple,
tpt_proto: str,
*,
deadline: float = 10.0,
poll_interval: float = 0.05,
proc: subprocess.Popen|None = None,
) -> None:
'''
Active-poll the daemon's bind address until it
accepts a connection (proving it has called
`bind() + listen()` and is ready to handle IPC).
Replaces the historical blind `time.sleep()` in the
`daemon` fixture which was racy under load see
`ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`.
Uses stdlib `socket` directly (no trio runtime
bootstrap cost) sufficient because
`tractor.run_daemon()` doesn't return from
bootstrap until the runtime is fully ready to
accept IPC.
Raises `TimeoutError` on `deadline` exceeded. If
`proc` is given, ALSO raises early if the daemon
process exits non-zero before the deadline (catches
daemon-startup-crash that the blind sleep used to
silently mask).
'''
end: float = time.monotonic() + deadline
last_exc: Exception|None = None
while time.monotonic() < end:
# Daemon-died-during-startup early-exit. Without
# this, a crashed-on-import daemon would just
# eat the full deadline before raising opaque
# TimeoutError.
if proc is not None and proc.poll() is not None:
raise RuntimeError(
f'Daemon proc exited (rc={proc.returncode}) '
f'before becoming ready to accept on '
f'{reg_addr!r}'
)
try:
if tpt_proto == 'tcp':
# `socket.create_connection` does the
# `socket() + connect()` dance with a
# builtin timeout — perfect primitive
# for a one-shot probe.
with socket.create_connection(
reg_addr,
timeout=poll_interval,
):
return
else:
# UDS — `reg_addr` is a `(filedir, sockname)`
# tuple per `tractor.ipc._uds.UDSAddress.unwrap`.
sockpath: str = os.path.join(*reg_addr)
sock = socket.socket(socket.AF_UNIX)
try:
sock.settimeout(poll_interval)
sock.connect(sockpath)
return
finally:
sock.close()
except (
ConnectionRefusedError,
FileNotFoundError,
OSError,
socket.timeout,
) as exc:
last_exc = exc
time.sleep(poll_interval)
raise TimeoutError(
f'Daemon never accepted on {reg_addr!r} within '
f'{deadline}s (last connect-attempt exc: '
f'{last_exc!r})'
)
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
debug_mode: bool,
loglevel: str,
testdir: pytest.Pytester,
reg_addr: tuple[str, int],
tpt_proto: str,
ci_env: bool,
test_log: tractor.log.StackLevelAdapter,
) -> subprocess.Popen:
'''
Run a daemon root actor as a separate actor-process
tree and "remote registrar" for discovery-protocol
related tests.
'''
# XXX: too much logging will lock up the subproc (smh)
if loglevel in ('trace', 'debug'):
test_log.warning(
f'Test harness log level is too verbose: {loglevel!r}\n'
f'Reducing to INFO level..'
)
loglevel: str = 'info'
code: str = (
"import tractor; "
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"enable_transports={enable_tpts}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
enable_tpts=str([tpt_proto]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc: subprocess.Popen = testdir.popen(
cmd,
**kwargs,
)
# Active-poll the daemon's bind address until it's
# ready to accept connections — replaces the legacy
# blind `time.sleep(2.2)` which was racy under load
# (see
# `ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`).
#
# Per-test deadline scales with platform: macOS/CI
# gets extra headroom; Linux dev boxes need very
# little.
deadline: float = (
15.0 if (_non_linux and ci_env)
else 10.0
)
_wait_for_daemon_ready(
reg_addr=reg_addr,
tpt_proto=tpt_proto,
deadline=deadline,
proc=proc,
)
assert not proc.returncode
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc
# sometimes it can lock up on the `_io.BufferedReader`
# and hang..
#
# NB, drain happens at TEARDOWN (post-yield), so the
# test body has its chance to read `proc.stderr`
# FIRST. Reading here AFTER would silently swallow
# the daemon's stderr output and break tests that
# assert on it (e.g. `test_abort_on_sigint`).
stderr: str = proc.stderr.read().decode()
stdout: str = proc.stdout.read().decode()
if (
stderr
or
stdout
):
print(
f'Daemon actor tree produced output:\n'
f'{proc.args}\n'
f'\n'
f'stderr: {stderr!r}\n'
f'stdout: {stdout!r}\n'
)
if (rc := proc.returncode) != -2:
msg: str = (
f'Daemon actor tree was not cancelled !?\n'
f'proc.args: {proc.args!r}\n'
f'proc.returncode: {rc!r}\n'
)
if rc < 0:
raise RuntimeError(msg)
test_log.error(msg)

View File

@ -22,7 +22,7 @@ from tractor import (
Portal, Portal,
) )
from tractor.runtime import _state from tractor.runtime import _state
from .conftest import ( from ..conftest import (
sig_prog, sig_prog,
_INT_SIGNAL, _INT_SIGNAL,
_INT_RETURN_CODE, _INT_RETURN_CODE,
@ -38,6 +38,17 @@ if TYPE_CHECKING:
_non_linux: bool = platform.system() != 'Linux' _non_linux: bool = platform.system() != 'Linux'
# NOTE, multi-program tests historically triggered both
# UDS sock-file leaks (daemon-subproc SIGKILL paths) AND
# trio `WakeupSocketpair.drain()` busy-loops
# (`test_register_duplicate_name`). Track + detect
# per-test as a regression net.
pytestmark = pytest.mark.usefixtures(
'track_orphaned_uds_per_test',
'detect_runaway_subactors_per_test',
)
def test_abort_on_sigint( def test_abort_on_sigint(
daemon: subprocess.Popen, daemon: subprocess.Popen,
): ):