Start protoyping multi-transport testing

Such that we can run (opting-in) tests on both TCP and UDS backends and
ensure the `reg_addr` fixture and various timeouts are adjusted
accordingly.

Impl deats,
- add a new `tpc_proto` CLI option and fixture to allow choosing which
  "transport protocol" will be used in the test suites (either globally
  or contextually).
- rm `_reg_addr` instead opting for a `_rando_port` which will only be
  used for `reg_addr`s which are net-tpt-protos.
- rejig `reg_addr` fixture to set a ideally session-unique `testrun_reg_addr`
  based on the `tpt_proto` setting making appropriate calls to `._addr`
  APIs as needed.
- refine `daemon` fixture a bit with typing, `tpt_proto` timings, and
  stderr capture.
- in `test_discovery` do a ton of type-annots, add `debug_mode` fixture
  opt ins, augment `spawn_and_check_registry()` with `psutil.Process`
  passing for introspection (when things go wrong..).
leslies_extra_appendix
Tyler Goodlet 2025-04-02 22:40:28 -04:00
parent 10f9b505ee
commit dc68ea4118
2 changed files with 143 additions and 30 deletions

View File

@ -1,6 +1,8 @@
"""
``tractor`` testing!!
Top level of the testing suites!
"""
from __future__ import annotations
import sys
import subprocess
import os
@ -30,7 +32,11 @@ else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
_PROC_SPAWN_WAIT = (
0.6
if sys.version_info < (3, 7)
else 0.4
)
no_windows = pytest.mark.skipif(
@ -67,6 +73,15 @@ def pytest_addoption(parser):
),
)
parser.addoption(
"--tpt-proto",
action="store",
dest='tpt_proto',
# default='tcp', # TODO, mk this default!
default='uds',
help="Transport protocol to use under the `tractor.ipc.Channel`",
)
def pytest_configure(config):
backend = config.option.spawn_backend
@ -74,7 +89,7 @@ def pytest_configure(config):
@pytest.fixture(scope='session')
def debug_mode(request):
def debug_mode(request) -> bool:
debug_mode: bool = request.config.option.tractor_debug_mode
# if debug_mode:
# breakpoint()
@ -95,6 +110,15 @@ def spawn_backend(request) -> str:
return request.config.option.spawn_backend
@pytest.fixture(scope='session')
def tpt_proto(request) -> str:
proto_key: str = request.config.option.tpt_proto
# XXX ensure we support the protocol by name
addr_type = tractor._addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_key
# @pytest.fixture(scope='function', autouse=True)
# def debug_enabled(request) -> str:
# from tractor import _state
@ -119,22 +143,38 @@ def ci_env() -> bool:
# branch?
#
# choose randomly at import time
_reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)
_rando_port: str = random.randint(1000, 9999)
@pytest.fixture(scope='session')
def reg_addr() -> tuple[str, int]:
def reg_addr(
tpt_proto: str,
) -> tuple[str, int]:
# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import _root
_root._default_lo_addrs = [_reg_addr]
from tractor import (
_addr,
)
tpt_proto: str = _addr.preferred_transport
addr_type = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
return _reg_addr
testrun_reg_addr: tuple[str, int]
match tpt_proto:
case 'tcp':
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
)
case 'uds':
# NOTE, uniqueness will be based on the pid
testrun_reg_addr = addr_type.get_random().unwrap()
# testrun_reg_addr = def_reg_addr
assert def_reg_addr != testrun_reg_addr
return testrun_reg_addr
def pytest_generate_tests(metafunc):
@ -151,13 +191,25 @@ def pytest_generate_tests(metafunc):
'trio',
)
# NOTE: used to be used to dyanmically parametrize tests for when
# NOTE: used-to-be-used-to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames:
metafunc.parametrize("start_method", [spawn_backend], scope='module')
metafunc.parametrize(
"start_method",
[spawn_backend],
scope='module',
)
# TODO, is this better then parametrizing the fixture above?
# spawn_backend = metafunc.config.option.tpt_backend
# if 'tpt_proto' in metafunc.fixturenames:
# metafunc.parametrize(
# 'tpt_proto',
# [spawn_backend],
# scope='module',
# )
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
@ -171,25 +223,32 @@ def pytest_generate_tests(metafunc):
# )
def sig_prog(proc, sig):
def sig_prog(
proc: subprocess.Popen,
sig: int,
canc_timeout: float = 0.1,
) -> int:
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(0.1)
time.sleep(canc_timeout)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret = proc.wait()
ret: int = proc.wait()
assert ret
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
debug_mode: bool,
loglevel: str,
testdir,
reg_addr: tuple[str, int],
):
tpt_proto: str,
) -> subprocess.Popen:
'''
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
@ -201,27 +260,61 @@ def daemon(
code: str = (
"import tractor; "
"tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
# breakpoint()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
proc: subprocess.Popen = testdir.popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
# UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit more starting
# the first actor-tree..
if tpt_proto == 'uds':
_PROC_SPAWN_WAIT: float = 0.6
time.sleep(_PROC_SPAWN_WAIT)
assert not proc.returncode
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
if stderr:
print(
f'Daemon actor tree produced STDERR:\n'
f'{proc.args}\n'
f'\n'
f'{stderr}\n'
)
if proc.returncode != -2:
raise RuntimeError(
'Daemon actor tree failed !?\n'
f'{proc.args}\n'
)
# @pytest.fixture(autouse=True)
# def shared_last_failed(pytestconfig):
# val = pytestconfig.cache.get("example/value", None)
# breakpoint()
# if val is None:
# pytestconfig.cache.set("example/value", val)
# return val

View File

@ -7,7 +7,9 @@ import platform
from functools import partial
import itertools
import psutil
import pytest
import subprocess
import tractor
from tractor._testing import tractor_test
import trio
@ -152,13 +154,23 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
reg_addr: tuple,
use_signal: bool,
debug_mode: bool = False,
remote_arbiter: bool = False,
with_streaming: bool = False,
maybe_daemon: tuple[
subprocess.Popen,
psutil.Process,
]|None = None,
) -> None:
if maybe_daemon:
popen, proc = maybe_daemon
# breakpoint()
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
):
async with tractor.get_registry(reg_addr) as portal:
# runtime needs to be up to call this
@ -176,11 +188,11 @@ async def spawn_and_check_registry(
extra = 2 # local root actor + remote arbiter
# ensure current actor is registered
registry = await get_reg()
registry: dict = await get_reg()
assert actor.uid in registry
try:
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
async with trio.open_nursery(
strict_exception_groups=False,
) as trion:
@ -189,17 +201,17 @@ async def spawn_and_check_registry(
for i in range(3):
name = f'a{i}'
if with_streaming:
portals[name] = await n.start_actor(
portals[name] = await an.start_actor(
name=name, enable_modules=[__name__])
else: # no streaming
portals[name] = await n.run_in_actor(
portals[name] = await an.run_in_actor(
trio.sleep_forever, name=name)
# wait on last actor to come up
async with tractor.wait_for_actor(name):
registry = await get_reg()
for uid in n._children:
for uid in an._children:
assert uid in registry
assert len(portals) + extra == len(registry)
@ -232,6 +244,7 @@ async def spawn_and_check_registry(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel(
debug_mode: bool,
start_method,
use_signal,
reg_addr,
@ -248,6 +261,7 @@ def test_subactors_unregister_on_cancel(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=False,
with_streaming=with_streaming,
),
@ -257,7 +271,8 @@ def test_subactors_unregister_on_cancel(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
daemon: subprocess.Popen,
debug_mode: bool,
start_method,
use_signal,
reg_addr,
@ -273,8 +288,13 @@ def test_subactors_unregister_on_cancel_remote_daemon(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=True,
with_streaming=with_streaming,
maybe_daemon=(
daemon,
psutil.Process(daemon.pid)
),
),
)
@ -373,7 +393,7 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
daemon,
daemon: subprocess.Popen,
start_method,
use_signal,
reg_addr,