pytest_pluginize: expose some pytest reusables from our test harness #27

Open
goodboy wants to merge 7 commits from pytest_pluginize into structural_dynamics_of_flow
9 changed files with 334 additions and 192 deletions

View File

@ -6,21 +6,22 @@ from __future__ import annotations
import sys import sys
import subprocess import subprocess
import os import os
import random
import signal import signal
import platform import platform
import time import time
import pytest import pytest
import tractor
from tractor._testing import ( from tractor._testing import (
examples_dir as examples_dir, examples_dir as examples_dir,
tractor_test as tractor_test, tractor_test as tractor_test,
expect_ctxc as expect_ctxc, expect_ctxc as expect_ctxc,
) )
# TODO: include wtv plugin(s) we build in `._testing.pytest`? pytest_plugins: list[str] = [
pytest_plugins = ['pytester'] 'pytester',
'tractor._testing.pytest',
]
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows': if platform.system() == 'Windows':
@ -48,6 +49,9 @@ no_windows = pytest.mark.skipif(
def pytest_addoption( def pytest_addoption(
parser: pytest.Parser, parser: pytest.Parser,
): ):
# ?TODO? should this be exposed from our `._testing.pytest`
# plugin or should we make it more explicit with `--tl` for
# tractor logging like we do in other client projects?
parser.addoption( parser.addoption(
"--ll", "--ll",
action="store", action="store",
@ -55,54 +59,10 @@ def pytest_addoption(
default='ERROR', help="logging level to set when testing" default='ERROR', help="logging level to set when testing"
) )
parser.addoption(
"--spawn-backend",
action="store",
dest='spawn_backend',
default='trio',
help="Processing spawning backend to use for test run",
)
parser.addoption(
"--tpdb",
"--debug-mode",
action="store_true",
dest='tractor_debug_mode',
# default=False,
help=(
'Enable a flag that can be used by tests to to set the '
'`debug_mode: bool` for engaging the internal '
'multi-proc debugger sys.'
),
)
# provide which IPC transport protocols opting-in test suites
# should accumulatively run against.
parser.addoption(
"--tpt-proto",
nargs='+', # accumulate-multiple-args
action="store",
dest='tpt_protos',
default=['tcp'],
help="Transport protocol to use under the `tractor.ipc.Channel`",
)
def pytest_configure(config):
backend = config.option.spawn_backend
tractor._spawn.try_set_start_method(backend)
@pytest.fixture(scope='session')
def debug_mode(request) -> bool:
debug_mode: bool = request.config.option.tractor_debug_mode
# if debug_mode:
# breakpoint()
return debug_mode
@pytest.fixture(scope='session', autouse=True) @pytest.fixture(scope='session', autouse=True)
def loglevel(request): def loglevel(request):
import tractor
orig = tractor.log._default_loglevel orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel level = tractor.log._default_loglevel = request.config.option.loglevel
tractor.log.get_console_log(level) tractor.log.get_console_log(level)
@ -110,49 +70,6 @@ def loglevel(request):
tractor.log._default_loglevel = orig tractor.log._default_loglevel = orig
@pytest.fixture(scope='session')
def spawn_backend(request) -> str:
return request.config.option.spawn_backend
@pytest.fixture(scope='session')
def tpt_protos(request) -> list[str]:
# allow quoting on CLI
proto_keys: list[str] = [
proto_key.replace('"', '').replace("'", "")
for proto_key in request.config.option.tpt_protos
]
# ?TODO, eventually support multiple protos per test-sesh?
if len(proto_keys) > 1:
pytest.fail(
'We only support one `--tpt-proto <key>` atm!\n'
)
# XXX ensure we support the protocol by name via lookup!
for proto_key in proto_keys:
addr_type = tractor._addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_keys
@pytest.fixture(
scope='session',
autouse=True,
)
def tpt_proto(
tpt_protos: list[str],
) -> str:
proto_key: str = tpt_protos[0]
from tractor import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
# breakpoint()
yield proto_key
_ci_env: bool = os.environ.get('CI', False) _ci_env: bool = os.environ.get('CI', False)
@ -165,80 +82,6 @@ def ci_env() -> bool:
return _ci_env return _ci_env
# TODO: also move this to `._testing` for now?
# -[ ] possibly generalize and re-use for multi-tree spawning
# along with the new stuff for multi-addrs?
#
# choose random port at import time
_rando_port: str = random.randint(1000, 9999)
@pytest.fixture(scope='session')
def reg_addr(
tpt_proto: str,
) -> tuple[str, int|str]:
# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import (
_addr,
)
addr_type = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
testrun_reg_addr: tuple[str, int]
match tpt_proto:
case 'tcp':
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
)
# NOTE, file-name uniqueness (no-collisions) will be based on
# the runtime-directory and root (pytest-proc's) pid.
case 'uds':
testrun_reg_addr = addr_type.get_random().unwrap()
assert def_reg_addr != testrun_reg_addr
return testrun_reg_addr
def pytest_generate_tests(metafunc):
spawn_backend: str = metafunc.config.option.spawn_backend
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
spawn_backend = 'trio'
# TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
assert spawn_backend in (
'mp_spawn',
'mp_forkserver',
'trio',
)
# NOTE: used-to-be-used-to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames:
metafunc.parametrize(
"start_method",
[spawn_backend],
scope='module',
)
# TODO, parametrize any `tpt_proto: str` declaring tests!
# proto_tpts: list[str] = metafunc.config.option.proto_tpts
# if 'tpt_proto' in metafunc.fixturenames:
# metafunc.parametrize(
# 'tpt_proto',
# proto_tpts, # TODO, double check this list usage!
# scope='module',
# )
def sig_prog( def sig_prog(
proc: subprocess.Popen, proc: subprocess.Popen,
sig: int, sig: int,

View File

@ -83,3 +83,26 @@ def test_implicit_root_via_first_nursery(
assert tractor.current_actor().aid.name == 'root' assert tractor.current_actor().aid.name == 'root'
trio.run(main) trio.run(main)
def test_runtime_vars_unset(
reg_addr: tuple,
debug_mode: bool
):
'''
Ensure any `._state._runtime_vars` are restored to default values
after the root actor-runtime exits!
'''
assert not tractor._state._runtime_vars['_debug_mode']
async def main():
assert not tractor._state._runtime_vars['_debug_mode']
async with tractor.open_nursery(
debug_mode=True,
):
assert tractor._state._runtime_vars['_debug_mode']
# after runtime closure, should be reverted!
assert not tractor._state._runtime_vars['_debug_mode']
trio.run(main)

View File

@ -276,7 +276,8 @@ async def open_root_actor(
if ( if (
debug_mode debug_mode
and _spawn._spawn_method == 'trio' and
_spawn._spawn_method == 'trio'
): ):
_state._runtime_vars['_debug_mode'] = True _state._runtime_vars['_debug_mode'] = True
@ -512,8 +513,17 @@ async def open_root_actor(
) )
await actor.cancel(None) # self cancel await actor.cancel(None) # self cancel
finally: finally:
# revert all process-global runtime state
if (
debug_mode
and
_spawn._spawn_method == 'trio'
):
_state._runtime_vars['_debug_mode'] = False
_state._current_actor = None _state._current_actor = None
_state._last_actor_terminated = actor _state._last_actor_terminated = actor
logger.runtime( logger.runtime(
f'Root actor terminated\n' f'Root actor terminated\n'
f')>\n' f')>\n'

View File

@ -37,6 +37,9 @@ from .fault_simulation import (
) )
# TODO, use dulwhich for this instead?
# -> we're going to likely need it (or something similar)
# for supporting hot-coad reload feats eventually anyway!
def repodir() -> pathlib.Path: def repodir() -> pathlib.Path:
''' '''
Return the abspath to the repo directory. Return the abspath to the repo directory.

View File

@ -0,0 +1,70 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Random IPC addr generation for isolating
the discovery space between test sessions.
Might be eventually useful to expose as a util set from
our `tractor.discovery` subsys?
'''
import random
from typing import (
Type,
)
from tractor import (
_addr,
)
def get_rando_addr(
tpt_proto: str,
*,
# choose random port at import time
_rando_port: str = random.randint(1000, 9999)
) -> tuple[str, str|int]:
'''
Used to globally override the runtime to the
per-test-session-dynamic addr so that all tests never conflict
with any other actor tree using the default.
'''
addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
# this is the "unwrapped" form expected to be passed to
# `.open_root_actor()` by test body.
testrun_reg_addr: tuple[str, int|str]
match tpt_proto:
case 'tcp':
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
)
# NOTE, file-name uniqueness (no-collisions) will be based on
# the runtime-directory and root (pytest-proc's) pid.
case 'uds':
testrun_reg_addr = addr_type.get_random().unwrap()
# XXX, as sanity it should never the same as the default for the
# host-singleton registry actor.
assert def_reg_addr != testrun_reg_addr
return testrun_reg_addr

View File

@ -26,29 +26,46 @@ from functools import (
import inspect import inspect
import platform import platform
import pytest
import tractor import tractor
import trio import trio
def tractor_test(fn): def tractor_test(fn):
''' '''
Decorator for async test funcs to present them as "native" Decorator for async test fns to decorator-wrap them as "native"
looking sync funcs runnable by `pytest` using `trio.run()`. looking sync funcs runnable by `pytest` and auto invoked with
`trio.run()` (much like the `pytest-trio` plugin's approach).
Use: Further the test fn body will be invoked AFTER booting the actor
runtime, i.e. from inside a `tractor.open_root_actor()` block AND
with various runtime and tooling parameters implicitly passed as
requested by by the test session's config; see immediately below.
@tractor_test Basic deco use:
async def test_whatever(): ---------------
await ...
If fixtures: @tractor_test
async def test_whatever():
await ...
- ``reg_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically Runtime config via special fixtures:
injected to tests declaring these funcargs. ------------------------------------
If any of the following fixture are requested by the wrapped test
fn (via normal func-args declaration),
- `reg_addr` (a socket addr tuple where arbiter is listening)
- `loglevel` (logging level passed to tractor internals)
- `start_method` (subprocess spawning backend)
(TODO support)
- `tpt_proto` (IPC transport protocol key)
they will be automatically injected to each test as normally
expected as well as passed to the initial
`tractor.open_root_actor()` funcargs.
''' '''
@wraps(fn) @wraps(fn)
def wrapper( def wrapper(
@ -111,3 +128,164 @@ def tractor_test(fn):
return trio.run(main) return trio.run(main)
return wrapper return wrapper
def pytest_addoption(
parser: pytest.Parser,
):
# parser.addoption(
# "--ll",
# action="store",
# dest='loglevel',
# default='ERROR', help="logging level to set when testing"
# )
parser.addoption(
"--spawn-backend",
action="store",
dest='spawn_backend',
default='trio',
help="Processing spawning backend to use for test run",
)
parser.addoption(
"--tpdb",
"--debug-mode",
action="store_true",
dest='tractor_debug_mode',
# default=False,
help=(
'Enable a flag that can be used by tests to to set the '
'`debug_mode: bool` for engaging the internal '
'multi-proc debugger sys.'
),
)
# provide which IPC transport protocols opting-in test suites
# should accumulatively run against.
parser.addoption(
"--tpt-proto",
nargs='+', # accumulate-multiple-args
action="store",
dest='tpt_protos',
default=['tcp'],
help="Transport protocol to use under the `tractor.ipc.Channel`",
)
def pytest_configure(config):
backend = config.option.spawn_backend
tractor._spawn.try_set_start_method(backend)
@pytest.fixture(scope='session')
def debug_mode(request) -> bool:
'''
Flag state for whether `--tpdb` (for `tractor`-py-debugger)
was passed to the test run.
Normally tests should pass this directly to `.open_root_actor()`
to allow the user to opt into suite-wide crash handling.
'''
debug_mode: bool = request.config.option.tractor_debug_mode
return debug_mode
@pytest.fixture(scope='session')
def spawn_backend(request) -> str:
return request.config.option.spawn_backend
@pytest.fixture(scope='session')
def tpt_protos(request) -> list[str]:
# allow quoting on CLI
proto_keys: list[str] = [
proto_key.replace('"', '').replace("'", "")
for proto_key in request.config.option.tpt_protos
]
# ?TODO, eventually support multiple protos per test-sesh?
if len(proto_keys) > 1:
pytest.fail(
'We only support one `--tpt-proto <key>` atm!\n'
)
# XXX ensure we support the protocol by name via lookup!
for proto_key in proto_keys:
addr_type = tractor._addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_keys
@pytest.fixture(
scope='session',
autouse=True,
)
def tpt_proto(
tpt_protos: list[str],
) -> str:
proto_key: str = tpt_protos[0]
from tractor import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
yield proto_key
@pytest.fixture(scope='session')
def reg_addr(
tpt_proto: str,
) -> tuple[str, int|str]:
'''
Deliver a test-sesh unique registry address such
that each run's (tests which use this fixture) will
have no conflicts/cross-talk when running simultaneously
nor will interfere with other live `tractor` apps active
on the same network-host (namespace).
'''
from tractor._testing.addr import get_rando_addr
return get_rando_addr(
tpt_proto=tpt_proto,
)
def pytest_generate_tests(
metafunc: pytest.Metafunc,
):
spawn_backend: str = metafunc.config.option.spawn_backend
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
spawn_backend = 'trio'
# TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
assert spawn_backend in (
'mp_spawn',
'mp_forkserver',
'trio',
)
# NOTE: used-to-be-used-to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames:
metafunc.parametrize(
"start_method",
[spawn_backend],
scope='module',
)
# TODO, parametrize any `tpt_proto: str` declaring tests!
# proto_tpts: list[str] = metafunc.config.option.proto_tpts
# if 'tpt_proto' in metafunc.fixturenames:
# metafunc.parametrize(
# 'tpt_proto',
# proto_tpts, # TODO, double check this list usage!
# scope='module',
# )

View File

@ -3280,7 +3280,7 @@ def open_crash_handler(
@cm @cm
def maybe_open_crash_handler( def maybe_open_crash_handler(
pdb: bool = False, pdb: bool|None = None,
tb_hide: bool = True, tb_hide: bool = True,
**kwargs, **kwargs,
@ -3291,7 +3291,11 @@ def maybe_open_crash_handler(
Normally this is used with CLI endpoints such that if the --pdb Normally this is used with CLI endpoints such that if the --pdb
flag is passed the pdb REPL is engaed on any crashes B) flag is passed the pdb REPL is engaed on any crashes B)
''' '''
if pdb is None:
pdb: bool = _state.is_debug_mode()
__tracebackhide__: bool = tb_hide __tracebackhide__: bool = tb_hide
rtctx = nullcontext( rtctx = nullcontext(

View File

@ -270,7 +270,9 @@ def get_logger(
subsys_spec: str|None = None, subsys_spec: str|None = None,
) -> StackLevelAdapter: ) -> StackLevelAdapter:
'''Return the package log or a sub-logger for ``name`` if provided. '''
Return the `tractor`-library root logger or a sub-logger for
`name` if provided.
''' '''
log: Logger log: Logger
@ -282,7 +284,7 @@ def get_logger(
name != _proj_name name != _proj_name
): ):
# NOTE: for handling for modules that use ``get_logger(__name__)`` # NOTE: for handling for modules that use `get_logger(__name__)`
# we make the following stylistic choice: # we make the following stylistic choice:
# - always avoid duplicate project-package token # - always avoid duplicate project-package token
# in msg output: i.e. tractor.tractor.ipc._chan.py in header # in msg output: i.e. tractor.tractor.ipc._chan.py in header
@ -331,7 +333,7 @@ def get_logger(
def get_console_log( def get_console_log(
level: str|None = None, level: str|None = None,
logger: Logger|None = None, logger: Logger|StackLevelAdapter|None = None,
**kwargs, **kwargs,
) -> LoggerAdapter: ) -> LoggerAdapter:
@ -344,12 +346,23 @@ def get_console_log(
Yeah yeah, i know we can use `logging.config.dictConfig()`. You do it. Yeah yeah, i know we can use `logging.config.dictConfig()`. You do it.
''' '''
log = get_logger( # get/create a stack-aware-adapter
logger=logger, if (
**kwargs logger
) # set a root logger and
logger: Logger = log.logger isinstance(logger, StackLevelAdapter)
):
# XXX, for ex. when passed in by a caller wrapping some
# other lib's logger instance with our level-adapter.
log = logger
else:
log: StackLevelAdapter = get_logger(
logger=logger,
**kwargs
)
logger: Logger|StackLevelAdapter = log.logger
if not level: if not level:
return log return log
@ -367,10 +380,7 @@ def get_console_log(
None, None,
) )
): ):
fmt = LOG_FORMAT fmt: str = LOG_FORMAT # always apply our format?
# if logger:
# fmt = None
handler = StreamHandler() handler = StreamHandler()
formatter = colorlog.ColoredFormatter( formatter = colorlog.ColoredFormatter(
fmt=fmt, fmt=fmt,

View File

@ -31,4 +31,5 @@ from ._broadcast import (
) )
from ._beg import ( from ._beg import (
collapse_eg as collapse_eg, collapse_eg as collapse_eg,
maybe_collapse_eg as maybe_collapse_eg,
) )