diff --git a/tests/conftest.py b/tests/conftest.py index aadb1fb6..b198aee0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -7,7 +7,6 @@ import sys import subprocess import os import signal -import socket import platform import time from pathlib import Path @@ -35,15 +34,10 @@ if platform.system() == 'Windows': _KILL_SIGNAL = signal.CTRL_BREAK_EVENT _INT_SIGNAL = signal.CTRL_C_EVENT _INT_RETURN_CODE = 3221225786 - _PROC_SPAWN_WAIT = 2 else: _KILL_SIGNAL = signal.SIGKILL _INT_SIGNAL = signal.SIGINT _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value - _PROC_SPAWN_WAIT = ( - 2 if _ci_env - else 1 - ) no_windows = pytest.mark.skipif( @@ -245,191 +239,14 @@ def sig_prog( assert ret -def _wait_for_daemon_ready( - reg_addr: tuple, - tpt_proto: str, - *, - deadline: float = 10.0, - poll_interval: float = 0.05, - proc: subprocess.Popen|None = None, -) -> None: - ''' - Active-poll the daemon's bind address until it - accepts a connection (proving it has called - `bind() + listen()` and is ready to handle IPC). - - Replaces the historical blind `time.sleep()` in the - `daemon` fixture which was racy under load — see - `ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`. - - Uses stdlib `socket` directly (no trio runtime - bootstrap cost) — sufficient because - `tractor.run_daemon()` doesn't return from - bootstrap until the runtime is fully ready to - accept IPC. - - Raises `TimeoutError` on `deadline` exceeded. If - `proc` is given, ALSO raises early if the daemon - process exits non-zero before the deadline (catches - daemon-startup-crash that the blind sleep used to - silently mask). - - ''' - end: float = time.monotonic() + deadline - last_exc: Exception|None = None - while time.monotonic() < end: - # Daemon-died-during-startup early-exit. Without - # this, a crashed-on-import daemon would just - # eat the full deadline before raising opaque - # TimeoutError. - if proc is not None and proc.poll() is not None: - raise RuntimeError( - f'Daemon proc exited (rc={proc.returncode}) ' - f'before becoming ready to accept on ' - f'{reg_addr!r}' - ) - try: - if tpt_proto == 'tcp': - # `socket.create_connection` does the - # `socket() + connect()` dance with a - # builtin timeout — perfect primitive - # for a one-shot probe. - with socket.create_connection( - reg_addr, - timeout=poll_interval, - ): - return - else: - # UDS — `reg_addr` is a `(filedir, sockname)` - # tuple per `tractor.ipc._uds.UDSAddress.unwrap`. - sockpath: str = os.path.join(*reg_addr) - sock = socket.socket(socket.AF_UNIX) - try: - sock.settimeout(poll_interval) - sock.connect(sockpath) - return - finally: - sock.close() - except ( - ConnectionRefusedError, - FileNotFoundError, - OSError, - socket.timeout, - ) as exc: - last_exc = exc - time.sleep(poll_interval) - raise TimeoutError( - f'Daemon never accepted on {reg_addr!r} within ' - f'{deadline}s (last connect-attempt exc: ' - f'{last_exc!r})' - ) - - -# TODO: factor into @cm and move to `._testing`? -@pytest.fixture -def daemon( - debug_mode: bool, - loglevel: str, - testdir: pytest.Pytester, - reg_addr: tuple[str, int], - tpt_proto: str, - ci_env: bool, - test_log: tractor.log.StackLevelAdapter, - # set_fork_aware_capture, - -) -> subprocess.Popen: - ''' - Run a daemon root actor as a separate actor-process tree and - "remote registrar" for discovery-protocol related tests. - - ''' - # XXX: too much logging will lock up the subproc (smh) - if loglevel in ('trace', 'debug'): - test_log.warning( - f'Test harness log level is too verbose: {loglevel!r}\n' - f'Reducing to INFO level..' - ) - loglevel: str = 'info' - - code: str = ( - "import tractor; " - "tractor.run_daemon([], " - "registry_addrs={reg_addrs}, " - "enable_transports={enable_tpts}, " - "debug_mode={debug_mode}, " - "loglevel={ll})" - ).format( - reg_addrs=str([reg_addr]), - enable_tpts=str([tpt_proto]), - ll="'{}'".format(loglevel) if loglevel else None, - debug_mode=debug_mode, - ) - cmd: list[str] = [ - sys.executable, - '-c', code, - ] - # breakpoint() - kwargs = {} - if platform.system() == 'Windows': - # without this, tests hang on windows forever - kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP - - proc: subprocess.Popen = testdir.popen( - cmd, - **kwargs, - ) - - # Active-poll the daemon's bind address until it's - # ready to accept connections — replaces the legacy - # blind `time.sleep(_PROC_SPAWN_WAIT + uds_bonus)` - # which was racy under load (see - # `ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`). - # - # Per-test deadline scales with platform: macOS/CI - # gets extra headroom; Linux dev boxes need very - # little. - deadline: float = ( - 15.0 if (_non_linux and ci_env) - else 10.0 - ) - _wait_for_daemon_ready( - reg_addr=reg_addr, - tpt_proto=tpt_proto, - deadline=deadline, - proc=proc, - ) - - assert not proc.returncode - yield proc - sig_prog(proc, _INT_SIGNAL) - - # XXX! yeah.. just be reaaal careful with this bc sometimes it - # can lock up on the `_io.BufferedReader` and hang.. - stderr: str = proc.stderr.read().decode() - stdout: str = proc.stdout.read().decode() - if ( - stderr - or - stdout - ): - print( - f'Daemon actor tree produced output:\n' - f'{proc.args}\n' - f'\n' - f'stderr: {stderr!r}\n' - f'stdout: {stdout!r}\n' - ) - - if (rc := proc.returncode) != -2: - msg: str = ( - f'Daemon actor tree was not cancelled !?\n' - f'proc.args: {proc.args!r}\n' - f'proc.returncode: {rc!r}\n' - ) - if rc < 0: - raise RuntimeError(msg) - - test_log.error(msg) +# NOTE, the `daemon` fixture (+ its `_wait_for_daemon_ready` +# helper + the post-yield teardown drain logic) has been +# moved to `tests/discovery/conftest.py` since 100% of its +# consumers are discovery-protocol tests now living under +# that subdir. See: +# - `tests/discovery/test_multi_program.py` +# - `tests/discovery/test_registrar.py` +# - `tests/discovery/test_tpt_bind_addrs.py` # @pytest.fixture(autouse=True) diff --git a/tests/discovery/conftest.py b/tests/discovery/conftest.py new file mode 100644 index 00000000..73749a6d --- /dev/null +++ b/tests/discovery/conftest.py @@ -0,0 +1,223 @@ +''' +Discovery-suite fixtures, including the `daemon` +remote-registrar subprocess used by the multi-program +discovery tests. + +Lives here (vs. the parent `tests/conftest.py`) +because `daemon` is a discovery-protocol primitive — +boots a separate `tractor.run_daemon()` process whose +sole purpose is to serve as a registrar peer for +discovery-roundtrip tests. Pytest fixtures inherit +DOWNWARD through conftest hierarchy, so anything +under `tests/discovery/` automatically picks this up. + +''' +from __future__ import annotations +import os +import platform +import socket +import subprocess +import sys +import time + +import pytest +import tractor + +from ..conftest import ( + sig_prog, + _INT_SIGNAL, + _non_linux, +) + + +def _wait_for_daemon_ready( + reg_addr: tuple, + tpt_proto: str, + *, + deadline: float = 10.0, + poll_interval: float = 0.05, + proc: subprocess.Popen|None = None, +) -> None: + ''' + Active-poll the daemon's bind address until it + accepts a connection (proving it has called + `bind() + listen()` and is ready to handle IPC). + + Replaces the historical blind `time.sleep()` in the + `daemon` fixture which was racy under load — see + `ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`. + + Uses stdlib `socket` directly (no trio runtime + bootstrap cost) — sufficient because + `tractor.run_daemon()` doesn't return from + bootstrap until the runtime is fully ready to + accept IPC. + + Raises `TimeoutError` on `deadline` exceeded. If + `proc` is given, ALSO raises early if the daemon + process exits non-zero before the deadline (catches + daemon-startup-crash that the blind sleep used to + silently mask). + + ''' + end: float = time.monotonic() + deadline + last_exc: Exception|None = None + while time.monotonic() < end: + # Daemon-died-during-startup early-exit. Without + # this, a crashed-on-import daemon would just + # eat the full deadline before raising opaque + # TimeoutError. + if proc is not None and proc.poll() is not None: + raise RuntimeError( + f'Daemon proc exited (rc={proc.returncode}) ' + f'before becoming ready to accept on ' + f'{reg_addr!r}' + ) + try: + if tpt_proto == 'tcp': + # `socket.create_connection` does the + # `socket() + connect()` dance with a + # builtin timeout — perfect primitive + # for a one-shot probe. + with socket.create_connection( + reg_addr, + timeout=poll_interval, + ): + return + else: + # UDS — `reg_addr` is a `(filedir, sockname)` + # tuple per `tractor.ipc._uds.UDSAddress.unwrap`. + sockpath: str = os.path.join(*reg_addr) + sock = socket.socket(socket.AF_UNIX) + try: + sock.settimeout(poll_interval) + sock.connect(sockpath) + return + finally: + sock.close() + except ( + ConnectionRefusedError, + FileNotFoundError, + OSError, + socket.timeout, + ) as exc: + last_exc = exc + time.sleep(poll_interval) + raise TimeoutError( + f'Daemon never accepted on {reg_addr!r} within ' + f'{deadline}s (last connect-attempt exc: ' + f'{last_exc!r})' + ) + + +# TODO: factor into @cm and move to `._testing`? +@pytest.fixture +def daemon( + debug_mode: bool, + loglevel: str, + testdir: pytest.Pytester, + reg_addr: tuple[str, int], + tpt_proto: str, + ci_env: bool, + test_log: tractor.log.StackLevelAdapter, + +) -> subprocess.Popen: + ''' + Run a daemon root actor as a separate actor-process + tree and "remote registrar" for discovery-protocol + related tests. + + ''' + # XXX: too much logging will lock up the subproc (smh) + if loglevel in ('trace', 'debug'): + test_log.warning( + f'Test harness log level is too verbose: {loglevel!r}\n' + f'Reducing to INFO level..' + ) + loglevel: str = 'info' + + code: str = ( + "import tractor; " + "tractor.run_daemon([], " + "registry_addrs={reg_addrs}, " + "enable_transports={enable_tpts}, " + "debug_mode={debug_mode}, " + "loglevel={ll})" + ).format( + reg_addrs=str([reg_addr]), + enable_tpts=str([tpt_proto]), + ll="'{}'".format(loglevel) if loglevel else None, + debug_mode=debug_mode, + ) + cmd: list[str] = [ + sys.executable, + '-c', code, + ] + kwargs = {} + if platform.system() == 'Windows': + # without this, tests hang on windows forever + kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP + + proc: subprocess.Popen = testdir.popen( + cmd, + **kwargs, + ) + + # Active-poll the daemon's bind address until it's + # ready to accept connections — replaces the legacy + # blind `time.sleep(2.2)` which was racy under load + # (see + # `ai/conc-anal/test_register_duplicate_name_daemon_connect_race_issue.md`). + # + # Per-test deadline scales with platform: macOS/CI + # gets extra headroom; Linux dev boxes need very + # little. + deadline: float = ( + 15.0 if (_non_linux and ci_env) + else 10.0 + ) + _wait_for_daemon_ready( + reg_addr=reg_addr, + tpt_proto=tpt_proto, + deadline=deadline, + proc=proc, + ) + + assert not proc.returncode + yield proc + sig_prog(proc, _INT_SIGNAL) + + # XXX! yeah.. just be reaaal careful with this bc + # sometimes it can lock up on the `_io.BufferedReader` + # and hang.. + # + # NB, drain happens at TEARDOWN (post-yield), so the + # test body has its chance to read `proc.stderr` + # FIRST. Reading here AFTER would silently swallow + # the daemon's stderr output and break tests that + # assert on it (e.g. `test_abort_on_sigint`). + stderr: str = proc.stderr.read().decode() + stdout: str = proc.stdout.read().decode() + if ( + stderr + or + stdout + ): + print( + f'Daemon actor tree produced output:\n' + f'{proc.args}\n' + f'\n' + f'stderr: {stderr!r}\n' + f'stdout: {stdout!r}\n' + ) + + if (rc := proc.returncode) != -2: + msg: str = ( + f'Daemon actor tree was not cancelled !?\n' + f'proc.args: {proc.args!r}\n' + f'proc.returncode: {rc!r}\n' + ) + if rc < 0: + raise RuntimeError(msg) + + test_log.error(msg) diff --git a/tests/test_multi_program.py b/tests/discovery/test_multi_program.py similarity index 92% rename from tests/test_multi_program.py rename to tests/discovery/test_multi_program.py index 5894ee70..7d8b24f1 100644 --- a/tests/test_multi_program.py +++ b/tests/discovery/test_multi_program.py @@ -22,7 +22,7 @@ from tractor import ( Portal, ) from tractor.runtime import _state -from .conftest import ( +from ..conftest import ( sig_prog, _INT_SIGNAL, _INT_RETURN_CODE, @@ -38,6 +38,17 @@ if TYPE_CHECKING: _non_linux: bool = platform.system() != 'Linux' +# NOTE, multi-program tests historically triggered both +# UDS sock-file leaks (daemon-subproc SIGKILL paths) AND +# trio `WakeupSocketpair.drain()` busy-loops +# (`test_register_duplicate_name`). Track + detect +# per-test as a regression net. +pytestmark = pytest.mark.usefixtures( + 'track_orphaned_uds_per_test', + 'detect_runaway_subactors_per_test', +) + + def test_abort_on_sigint( daemon: subprocess.Popen, ):