559 lines
18 KiB
Python
559 lines
18 KiB
Python
# tractor: structured concurrent "actors".
|
|
# Copyright 2018-eternity Tyler Goodlet.
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
'''
|
|
`pytest` utils helpers and plugins for testing `tractor`'s runtime
|
|
and applications.
|
|
|
|
'''
|
|
from functools import (
|
|
partial,
|
|
wraps,
|
|
)
|
|
import inspect
|
|
import platform
|
|
from typing import (
|
|
Callable,
|
|
get_args,
|
|
)
|
|
|
|
import pytest
|
|
import tractor
|
|
from tractor.spawn._spawn import SpawnMethodKey
|
|
import trio
|
|
|
|
|
|
def tractor_test(
|
|
wrapped: Callable|None = None,
|
|
*,
|
|
# @tractor_test(<deco-params>)
|
|
timeout: float = 30,
|
|
hide_tb: bool = True,
|
|
):
|
|
'''
|
|
Decorator for async test fns to decorator-wrap them as "native"
|
|
looking sync funcs runnable by `pytest` and auto invoked with
|
|
`trio.run()` (much like the `pytest-trio` plugin's approach).
|
|
|
|
Further the test fn body will be invoked AFTER booting the actor
|
|
runtime, i.e. from inside a `tractor.open_root_actor()` block AND
|
|
with various runtime and tooling parameters implicitly passed as
|
|
requested by by the test session's config; see immediately below.
|
|
|
|
Basic deco use:
|
|
---------------
|
|
|
|
@tractor_test(
|
|
timeout=10,
|
|
)
|
|
async def test_whatever(
|
|
# fixture param declarations
|
|
loglevel: str,
|
|
start_method: str,
|
|
reg_addr: tuple,
|
|
tpt_proto: str,
|
|
debug_mode: bool,
|
|
):
|
|
# already inside a root-actor runtime `trio.Task`
|
|
await ...
|
|
|
|
|
|
Runtime config via special fixtures:
|
|
------------------------------------
|
|
If any of the following fixture are requested by the wrapped test
|
|
fn (via normal func-args declaration),
|
|
|
|
- `reg_addr` (a socket addr tuple where registrar is listening)
|
|
- `loglevel` (logging level passed to tractor internals)
|
|
- `start_method` (subprocess spawning backend)
|
|
|
|
(TODO support)
|
|
- `tpt_proto` (IPC transport protocol key)
|
|
|
|
they will be automatically injected to each test as normally
|
|
expected as well as passed to the initial
|
|
`tractor.open_root_actor()` funcargs.
|
|
|
|
'''
|
|
__tracebackhide__: bool = hide_tb
|
|
|
|
# handle @tractor_test (no parens) vs @tractor_test(timeout=10)
|
|
if wrapped is None:
|
|
return partial(
|
|
tractor_test,
|
|
timeout=timeout,
|
|
hide_tb=hide_tb,
|
|
)
|
|
|
|
funcname: str = wrapped.__name__
|
|
if not inspect.iscoroutinefunction(wrapped):
|
|
raise TypeError(
|
|
f'Test-fn {funcname!r} must be an async-function !!'
|
|
)
|
|
|
|
# NOTE: we intentionally use `functools.wraps` instead of
|
|
# `@wrapt.decorator` here bc wrapt's transparent proxy makes
|
|
# `inspect.iscoroutinefunction(wrapper)` return `True` (it
|
|
# proxies `__code__` from the wrapped async fn), which causes
|
|
# pytest to skip the test as an "unhandled coroutine".
|
|
# `functools.wraps` preserves the signature for fixture
|
|
# injection (via `__wrapped__`) without leaking the async
|
|
# nature.
|
|
@wraps(wrapped)
|
|
def wrapper(**kwargs):
|
|
__tracebackhide__: bool = hide_tb
|
|
|
|
# NOTE, ensure we inject any test-fn declared fixture
|
|
# names.
|
|
for kw in [
|
|
'reg_addr',
|
|
'loglevel',
|
|
'start_method',
|
|
'debug_mode',
|
|
'tpt_proto',
|
|
'timeout',
|
|
]:
|
|
if kw in inspect.signature(wrapped).parameters:
|
|
assert kw in kwargs
|
|
|
|
# Extract runtime settings as locals for
|
|
# `open_root_actor()`; these must NOT leak into
|
|
# `kwargs` when the test fn doesn't declare them
|
|
# (the original pre-wrapt code had the same guard).
|
|
reg_addr = kwargs.get('reg_addr')
|
|
loglevel = kwargs.get('loglevel')
|
|
debug_mode = kwargs.get('debug_mode', False)
|
|
start_method = kwargs.get('start_method')
|
|
if platform.system() == 'Windows':
|
|
if start_method is None:
|
|
start_method = 'trio'
|
|
elif start_method != 'trio':
|
|
raise ValueError(
|
|
'ONLY the `start_method="trio"` is supported on Windows.'
|
|
)
|
|
|
|
# Open a root-actor, passing runtime-settings
|
|
# extracted above as closure locals, then invoke
|
|
# the test-fn body as the root-most task.
|
|
#
|
|
# NOTE: `kwargs` is forwarded as-is to
|
|
# `wrapped()` — it only contains what pytest
|
|
# injected based on the test fn's signature.
|
|
async def _main(**kwargs):
|
|
__tracebackhide__: bool = hide_tb
|
|
|
|
with trio.fail_after(timeout):
|
|
async with tractor.open_root_actor(
|
|
registry_addrs=(
|
|
[reg_addr] if reg_addr else None
|
|
),
|
|
loglevel=loglevel,
|
|
start_method=start_method,
|
|
|
|
# TODO: only enable when pytest is passed
|
|
# --pdb
|
|
debug_mode=debug_mode,
|
|
|
|
):
|
|
# invoke test-fn body IN THIS task
|
|
await wrapped(**kwargs)
|
|
|
|
# invoke runtime via a root task.
|
|
return trio.run(
|
|
partial(
|
|
_main,
|
|
**kwargs,
|
|
)
|
|
)
|
|
|
|
return wrapper
|
|
|
|
|
|
def pytest_addoption(
|
|
parser: pytest.Parser,
|
|
):
|
|
# parser.addoption(
|
|
# "--ll",
|
|
# action="store",
|
|
# dest='loglevel',
|
|
# default='ERROR', help="logging level to set when testing"
|
|
# )
|
|
|
|
parser.addoption(
|
|
"--spawn-backend",
|
|
action="store",
|
|
dest='spawn_backend',
|
|
default='trio',
|
|
help="Processing spawning backend to use for test run",
|
|
)
|
|
|
|
parser.addoption(
|
|
"--tpdb",
|
|
"--debug-mode",
|
|
action="store_true",
|
|
dest='tractor_debug_mode',
|
|
# default=False,
|
|
help=(
|
|
'Enable a flag that can be used by tests to to set the '
|
|
'`debug_mode: bool` for engaging the internal '
|
|
'multi-proc debugger sys.'
|
|
),
|
|
)
|
|
|
|
parser.addoption(
|
|
"--enable-stackscope",
|
|
action="store_true",
|
|
dest='tractor_enable_stackscope',
|
|
default=False,
|
|
help=(
|
|
'Install `stackscope` SIGUSR1 handler in pytest + '
|
|
'every spawned subactor for live trio task-tree '
|
|
'dumps during hang investigations. Lighter than '
|
|
'`--tpdb` (no pdb machinery / tty-lock contention) '
|
|
'— use when you only need stack visibility. To '
|
|
'capture: `kill -USR1 <pytest-or-subactor-pid>`.'
|
|
),
|
|
)
|
|
|
|
# provide which IPC transport protocols opting-in test suites
|
|
# should accumulatively run against.
|
|
parser.addoption(
|
|
"--tpt-proto",
|
|
nargs='+', # accumulate-multiple-args
|
|
action="store",
|
|
dest='tpt_protos',
|
|
default=['tcp'],
|
|
help="Transport protocol to use under the `tractor.ipc.Channel`",
|
|
)
|
|
|
|
|
|
def pytest_configure(
|
|
config: pytest.Config,
|
|
):
|
|
backend: str = config.option.spawn_backend
|
|
from tractor.spawn._spawn import try_set_start_method
|
|
try:
|
|
try_set_start_method(backend)
|
|
except RuntimeError as err:
|
|
# e.g. `--spawn-backend=subint` on Python < 3.14 — turn the
|
|
# runtime gate error into a clean pytest usage error so the
|
|
# suite exits with a helpful banner instead of a traceback.
|
|
raise pytest.UsageError(str(err)) from err
|
|
|
|
# register custom marks to avoid warnings see,
|
|
# https://docs.pytest.org/en/stable/how-to/writing_plugins.html#registering-custom-markers
|
|
config.addinivalue_line(
|
|
'markers',
|
|
'no_tpt(proto_key): test will (likely) not behave with tpt backend'
|
|
)
|
|
config.addinivalue_line(
|
|
'markers',
|
|
'skipon_spawn_backend(*start_methods, reason=None): '
|
|
'skip this test under any of the given `--spawn-backend` '
|
|
'values; useful for backend-specific known-hang / -borked '
|
|
'cases (e.g. the `subint` GIL-starvation class documented '
|
|
'in `ai/conc-anal/subint_sigint_starvation_issue.md`).'
|
|
)
|
|
|
|
# `--enable-stackscope`: install SIGUSR1 → trio task-tree
|
|
# dump in pytest itself + propagate to every subactor via
|
|
# an env var that fork-children inherit and the runtime
|
|
# gate honors. Lighter than `--tpdb` (no pdb machinery) —
|
|
# purely for hang-investigation stack visibility.
|
|
if getattr(
|
|
config.option, 'tractor_enable_stackscope', False
|
|
):
|
|
import os
|
|
# Env var inherited via fork → subactor's runtime
|
|
# picks it up at `Actor.async_main` startup. See the
|
|
# gate in `tractor.runtime._runtime` matching this
|
|
# var name.
|
|
os.environ['TRACTOR_ENABLE_STACKSCOPE'] = '1'
|
|
|
|
# Install in pytest itself so `kill -USR1 <pytest>`
|
|
# dumps the parent trio task-tree (which is where
|
|
# most Mode-A-class hangs park).
|
|
try:
|
|
from tractor.devx._stackscope import (
|
|
enable_stack_on_sig,
|
|
)
|
|
enable_stack_on_sig()
|
|
except ImportError:
|
|
import warnings
|
|
warnings.warn(
|
|
'`stackscope` not installed — '
|
|
'--enable-stackscope is a no-op. '
|
|
'Install via the `devx` dep group.'
|
|
)
|
|
|
|
|
|
def pytest_collection_modifyitems(
|
|
config: pytest.Config,
|
|
items: list[pytest.Function],
|
|
):
|
|
'''
|
|
Expand any `@pytest.mark.skipon_spawn_backend('<backend>'[,
|
|
...], reason='...')` markers into concrete
|
|
`pytest.mark.skip(reason=...)` calls for tests whose
|
|
backend-arg set contains the active `--spawn-backend`.
|
|
|
|
Uses `item.iter_markers(name=...)` which walks function +
|
|
class + module-level marks in the correct scope order (and
|
|
handles both the single-`MarkDecorator` and `list[Mark]`
|
|
forms of a module-level `pytestmark`) — so the same marker
|
|
works at any level a user puts it.
|
|
|
|
'''
|
|
backend: str = config.option.spawn_backend
|
|
default_reason: str = f'Borked on --spawn-backend={backend!r}'
|
|
for item in items:
|
|
for mark in item.iter_markers(name='skipon_spawn_backend'):
|
|
skip_backends: tuple[str] = mark.args
|
|
for skip_backend in skip_backends:
|
|
assert skip_backend in get_args(SpawnMethodKey)
|
|
# ?TODO, run these through the try-set-backend checker to
|
|
# avoid typos?
|
|
if backend in skip_backends:
|
|
reason: str = mark.kwargs.get(
|
|
'reason',
|
|
default_reason,
|
|
)
|
|
item.add_marker(pytest.mark.skip(reason=reason))
|
|
# first matching mark wins; no value in stacking
|
|
# multiple `skip`s on the same item.
|
|
break
|
|
|
|
|
|
@pytest.fixture(
|
|
scope='session',
|
|
autouse=True,
|
|
)
|
|
def _reap_orphaned_subactors():
|
|
'''
|
|
Session-scoped autouse fixture: after the whole test
|
|
session finishes, SIGINT any subactor processes still
|
|
parented to this `pytest` process, wait a bounded
|
|
grace window, then SIGKILL survivors.
|
|
|
|
Rationale: under fork-based spawn backends (notably
|
|
`main_thread_forkserver`), a test that times out or bails
|
|
mid-teardown can leave subactor forks alive. Without
|
|
this reap, they linger across sessions and compete
|
|
for ports / inherit pytest's capture-pipe fds — which
|
|
flakifies later tests. SC-polite discipline: SIGINT
|
|
first to let the subactor's trio cancel shield + IPC
|
|
teardown paths run before we escalate.
|
|
|
|
Matching companion CLI: `scripts/tractor-reap` for
|
|
the pytest-died-mid-session case.
|
|
|
|
'''
|
|
import os
|
|
parent_pid: int = os.getpid()
|
|
yield
|
|
from tractor._testing._reap import (
|
|
find_descendants,
|
|
reap,
|
|
)
|
|
pids: list[int] = find_descendants(parent_pid)
|
|
if pids:
|
|
reap(pids, grace=3.0)
|
|
|
|
|
|
@pytest.fixture
|
|
def reap_subactors_per_test() -> int:
|
|
'''
|
|
Per-test (function-scoped) zombie-subactor reaper —
|
|
**opt-in**, NOT autouse.
|
|
|
|
When a test's teardown fails to fully cancel its actor
|
|
tree (e.g. an asyncio cancel-cascade times out under
|
|
`main_thread_forkserver`, pytest hits its 200s wall-
|
|
clock and abandons), the leftover subactor lingers as a
|
|
direct child of `pytest` and squats on whatever
|
|
registrar port / UDS path / shm segment it had bound.
|
|
Subsequent tests trying to allocate the same resource
|
|
fail — and with backends that bind a session-shared
|
|
`reg_addr`, that means EVERY following test in the
|
|
suite cascades. The session-scoped sibling
|
|
(`_reap_orphaned_subactors`) only kicks in at session
|
|
end which is too late to save the cascade.
|
|
|
|
Apply at module-level on the topically-problematic
|
|
test files via:
|
|
|
|
```python
|
|
pytestmark = pytest.mark.usefixtures(
|
|
'reap_subactors_per_test',
|
|
)
|
|
```
|
|
|
|
Or per-test via the same `usefixtures` mark on a
|
|
specific function. Intentionally NOT autouse so the
|
|
fixture's presence on a module signals "this module's
|
|
teardown is known-leaky enough to contaminate
|
|
siblings"; the visibility helps future-us track down
|
|
root causes rather than burying them under blanket
|
|
cleanup.
|
|
|
|
'''
|
|
import os
|
|
parent_pid: int = os.getpid()
|
|
yield parent_pid
|
|
from tractor._testing._reap import (
|
|
find_descendants,
|
|
reap,
|
|
)
|
|
pids: list[int] = find_descendants(parent_pid)
|
|
if pids:
|
|
reap(pids, grace=3.0)
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def debug_mode(
|
|
request: pytest.FixtureRequest,
|
|
) -> bool:
|
|
'''
|
|
Flag state for whether `--tpdb` (for `tractor`-py-debugger)
|
|
was passed to the test run.
|
|
|
|
Normally tests should pass this directly to `.open_root_actor()`
|
|
to allow the user to opt into suite-wide crash handling.
|
|
|
|
'''
|
|
debug_mode: bool = request.config.option.tractor_debug_mode
|
|
return debug_mode
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def spawn_backend(
|
|
request: pytest.FixtureRequest,
|
|
) -> str:
|
|
return request.config.option.spawn_backend
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def tpt_protos(
|
|
request: pytest.FixtureRequest,
|
|
) -> list[str]:
|
|
|
|
# allow quoting on CLI
|
|
proto_keys: list[str] = [
|
|
proto_key.replace('"', '').replace("'", "")
|
|
for proto_key in request.config.option.tpt_protos
|
|
]
|
|
|
|
# ?TODO, eventually support multiple protos per test-sesh?
|
|
if len(proto_keys) > 1:
|
|
pytest.fail(
|
|
'We only support one `--tpt-proto <key>` atm!\n'
|
|
)
|
|
|
|
# XXX ensure we support the protocol by name via lookup!
|
|
for proto_key in proto_keys:
|
|
from tractor.discovery import _addr
|
|
addr_type = _addr._address_types[proto_key]
|
|
assert addr_type.proto_key == proto_key
|
|
|
|
yield proto_keys
|
|
|
|
|
|
@pytest.fixture(
|
|
scope='session',
|
|
autouse=True,
|
|
)
|
|
def tpt_proto(
|
|
request: pytest.FixtureRequest,
|
|
tpt_protos: list[str],
|
|
) -> str:
|
|
proto_key: str = tpt_protos[0]
|
|
|
|
# ?TODO, but needs a fn-scoped tpt_proto fixture..
|
|
# @pytest.mark.no_tpt('uds')
|
|
# node = request.node
|
|
# markers = node.own_markers
|
|
# for mark in markers:
|
|
# if (
|
|
# mark.name == 'no_tpt'
|
|
# and
|
|
# proto_key in mark.args
|
|
# ):
|
|
# pytest.skip(
|
|
# f'Test {node} normally fails with '
|
|
# f'tpt-proto={proto_key!r}\n'
|
|
# )
|
|
|
|
from tractor.runtime import _state
|
|
if _state._def_tpt_proto != proto_key:
|
|
_state._def_tpt_proto = proto_key
|
|
_state._runtime_vars['_enable_tpts'] = [
|
|
proto_key,
|
|
]
|
|
|
|
yield proto_key
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def reg_addr(
|
|
tpt_proto: str,
|
|
) -> tuple[str, int|str]:
|
|
'''
|
|
Deliver a test-sesh unique registry address such
|
|
that each run's (tests which use this fixture) will
|
|
have no conflicts/cross-talk when running simultaneously
|
|
nor will interfere with other live `tractor` apps active
|
|
on the same network-host (namespace).
|
|
|
|
'''
|
|
from tractor._testing.addr import get_rando_addr
|
|
return get_rando_addr(
|
|
tpt_proto=tpt_proto,
|
|
)
|
|
|
|
|
|
def pytest_generate_tests(
|
|
metafunc: pytest.Metafunc,
|
|
):
|
|
spawn_backend: str = metafunc.config.option.spawn_backend
|
|
if not spawn_backend:
|
|
# XXX some weird windows bug with `pytest`?
|
|
spawn_backend = 'trio'
|
|
|
|
# drive the valid-backend set from the canonical `Literal` so
|
|
# adding a new spawn backend (e.g. `'subint'`) doesn't require
|
|
# touching the harness.
|
|
assert spawn_backend in get_args(SpawnMethodKey)
|
|
|
|
# NOTE: used-to-be-used-to dyanmically parametrize tests for when
|
|
# you just passed --spawn-backend=`mp` on the cli, but now we expect
|
|
# that cli input to be manually specified, BUT, maybe we'll do
|
|
# something like this again in the future?
|
|
if 'start_method' in metafunc.fixturenames:
|
|
metafunc.parametrize(
|
|
"start_method",
|
|
[spawn_backend],
|
|
scope='module',
|
|
)
|
|
|
|
# TODO, parametrize any `tpt_proto: str` declaring tests!
|
|
# proto_tpts: list[str] = metafunc.config.option.proto_tpts
|
|
# if 'tpt_proto' in metafunc.fixturenames:
|
|
# metafunc.parametrize(
|
|
# 'tpt_proto',
|
|
# proto_tpts, # TODO, double check this list usage!
|
|
# scope='module',
|
|
# )
|