Merge pull request #380 from goodboy/multi_ipc_testing

Multi ipc testing
main
2025-07-13 19:19:50 -04:00 committed by GitHub
commit 2248ffb74f
15 changed files with 683 additions and 190 deletions

View File

@@ -1,8 +1,5 @@
|logo| ``tractor``: distributed structured concurrency
|gh_actions|
|docs|
``tractor`` is a `structured concurrency`_ (SC), multi-processing_ runtime built on trio_.
Fundamentally, ``tractor`` provides parallelism via
@@ -66,6 +63,13 @@ Features
- (WIP) a ``TaskMngr``: one-cancels-one style nursery supervisor.
Status of `main` / infra
------------------------
- |gh_actions|
- |docs|
Install
-------
``tractor`` is still in an *alpha-near-beta-stage* for many

View File

@@ -120,6 +120,7 @@ async def main(
break_parent_ipc_after: int|bool = False,
break_child_ipc_after: int|bool = False,
pre_close: bool = False,
tpt_proto: str = 'tcp',
) -> None:
@@ -131,6 +132,7 @@ async def main(
# a hang since it never engages due to broken IPC
debug_mode=debug_mode,
loglevel=loglevel,
enable_transports=[tpt_proto],
) as an,
):
@@ -145,7 +147,8 @@ async def main(
_testing.expect_ctxc(
yay=(
break_parent_ipc_after
or break_child_ipc_after
or
break_child_ipc_after
),
# TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either
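
For reference, a minimal sketch of driving this example's `main()` over the new UDS transport (hypothetical import path shown; the test suite instead loads the module via `_pytest.pathlib.import_path()` as seen further below):

import trio
from functools import partial

# hypothetical: adjust to however the example module is importable
# in your environment.
from ipc_failure_during_stream import main

trio.run(
    partial(
        main,
        # break the child-side IPC after 5 msgs (see params above)
        break_child_ipc_after=5,
        # opt into UDS instead of the default 'tcp'
        tpt_proto='uds',
    )
)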

View File

@@ -24,10 +24,9 @@ async def spawn_until(depth=0):
async def main():
"""The main ``tractor`` routine.
The process tree should look as approximately as follows when the debugger
first engages:
'''
The process tree should look approximately as follows when the
debugger first engages:
python examples/debugging/multi_nested_subactors_bp_forever.py
python -m tractor._child --uid ('spawner1', '7eab8462 ...)
@@ -37,10 +36,11 @@ async def main():
python -m tractor._child --uid ('spawner0', '1d42012b ...)
python -m tractor._child --uid ('name_error', '6c2733b8 ...)
"""
'''
async with tractor.open_nursery(
debug_mode=True,
loglevel='warning'
loglevel='devx',
enable_transports=['uds'],
) as n:
# spawn both actors

View File

@@ -37,6 +37,7 @@ async def main(
enable_stack_on_sig=True,
# maybe_enable_greenback=False,
loglevel='devx',
enable_transports=['uds'],
) as an,
):
ptl: tractor.Portal = await an.start_actor(

View File

@@ -1,24 +1,27 @@
"""
``tractor`` testing!!
Top level of the testing suites!
"""
from __future__ import annotations
import sys
import subprocess
import os
import random
import signal
import platform
import time
import pytest
import tractor
from tractor._testing import (
examples_dir as examples_dir,
tractor_test as tractor_test,
expect_ctxc as expect_ctxc,
)
# TODO: include wtv plugin(s) we build in `._testing.pytest`?
pytest_plugins = ['pytester']
pytest_plugins: list[str] = [
'pytester',
'tractor._testing.pytest',
]
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows':
@@ -30,7 +33,11 @@ else:
_KILL_SIGNAL = signal.SIGKILL
_INT_SIGNAL = signal.SIGINT
_INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value
_PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4
_PROC_SPAWN_WAIT = (
0.6
if sys.version_info < (3, 7)
else 0.4
)
no_windows = pytest.mark.skipif(
@@ -39,7 +46,12 @@ no_windows = pytest.mark.skipif(
)
def pytest_addoption(parser):
def pytest_addoption(
parser: pytest.Parser,
):
# ?TODO? should this be exposed from our `._testing.pytest`
# plugin or should we make it more explicit with `--tl` for
# tractor logging like we do in other client projects?
parser.addoption(
"--ll",
action="store",
@@ -47,42 +59,10 @@ def pytest_addoption(parser):
default='ERROR', help="logging level to set when testing"
)
parser.addoption(
"--spawn-backend",
action="store",
dest='spawn_backend',
default='trio',
help="Processing spawning backend to use for test run",
)
parser.addoption(
"--tpdb", "--debug-mode",
action="store_true",
dest='tractor_debug_mode',
# default=False,
help=(
'Enable a flag that can be used by tests to to set the '
'`debug_mode: bool` for engaging the internal '
'multi-proc debugger sys.'
),
)
def pytest_configure(config):
backend = config.option.spawn_backend
tractor._spawn.try_set_start_method(backend)
@pytest.fixture(scope='session')
def debug_mode(request):
debug_mode: bool = request.config.option.tractor_debug_mode
# if debug_mode:
# breakpoint()
return debug_mode
@pytest.fixture(scope='session', autouse=True)
def loglevel(request):
import tractor
orig = tractor.log._default_loglevel
level = tractor.log._default_loglevel = request.config.option.loglevel
tractor.log.get_console_log(level)
@@ -90,106 +70,44 @@ def loglevel(request):
tractor.log._default_loglevel = orig
@pytest.fixture(scope='session')
def spawn_backend(request) -> str:
return request.config.option.spawn_backend
# @pytest.fixture(scope='function', autouse=True)
# def debug_enabled(request) -> str:
# from tractor import _state
# if _state._runtime_vars['_debug_mode']:
# breakpoint()
_ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session')
def ci_env() -> bool:
'''
Detect CI envoirment.
Detect CI environment.
'''
return _ci_env
# TODO: also move this to `._testing` for now?
# -[ ] possibly generalize and re-use for multi-tree spawning
# along with the new stuff for multi-addrs in distribute_dis
# branch?
#
# choose randomly at import time
_reg_addr: tuple[str, int] = (
'127.0.0.1',
random.randint(1000, 9999),
)
@pytest.fixture(scope='session')
def reg_addr() -> tuple[str, int]:
# globally override the runtime to the per-test-session-dynamic
# addr so that all tests never conflict with any other actor
# tree using the default.
from tractor import _root
_root._default_lo_addrs = [_reg_addr]
return _reg_addr
def pytest_generate_tests(metafunc):
spawn_backend = metafunc.config.option.spawn_backend
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
spawn_backend = 'trio'
# TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
assert spawn_backend in (
'mp_spawn',
'mp_forkserver',
'trio',
)
# NOTE: used to be used to dyanmically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames:
metafunc.parametrize("start_method", [spawn_backend], scope='module')
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't registry addr collide!
# @pytest.fixture
# def open_test_runtime(
# reg_addr: tuple,
# ) -> AsyncContextManager:
# return partial(
# tractor.open_nursery,
# registry_addrs=[reg_addr],
# )
def sig_prog(proc, sig):
def sig_prog(
proc: subprocess.Popen,
sig: int,
canc_timeout: float = 0.1,
) -> int:
"Kill the actor-process with ``sig``."
proc.send_signal(sig)
time.sleep(0.1)
time.sleep(canc_timeout)
if not proc.poll():
# TODO: why sometimes does SIGINT not work on teardown?
# seems to happen only when trace logging enabled?
proc.send_signal(_KILL_SIGNAL)
ret = proc.wait()
ret: int = proc.wait()
assert ret
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
debug_mode: bool,
loglevel: str,
testdir,
reg_addr: tuple[str, int],
):
tpt_proto: str,
) -> subprocess.Popen:
'''
Run a daemon root actor as a separate actor-process tree and
"remote registrar" for discovery-protocol related tests.
@@ -200,28 +118,100 @@ def daemon(
loglevel: str = 'info'
code: str = (
"import tractor; "
"tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})"
"import tractor; "
"tractor.run_daemon([], "
"registry_addrs={reg_addrs}, "
"debug_mode={debug_mode}, "
"loglevel={ll})"
).format(
reg_addrs=str([reg_addr]),
ll="'{}'".format(loglevel) if loglevel else None,
debug_mode=debug_mode,
)
cmd: list[str] = [
sys.executable,
'-c', code,
]
# breakpoint()
kwargs = {}
if platform.system() == 'Windows':
# without this, tests hang on windows forever
kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP
proc = testdir.popen(
proc: subprocess.Popen = testdir.popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs,
)
assert not proc.returncode
# UDS sockets are **really** fast to bind()/listen()/connect()
# so it's often required that we delay a bit longer before starting
# the first actor-tree..
if tpt_proto == 'uds':
global _PROC_SPAWN_WAIT
_PROC_SPAWN_WAIT = 0.6
time.sleep(_PROC_SPAWN_WAIT)
assert not proc.returncode
yield proc
sig_prog(proc, _INT_SIGNAL)
# XXX! yeah.. just be reaaal careful with this bc sometimes it
# can lock up on the `_io.BufferedReader` and hang..
stderr: str = proc.stderr.read().decode()
if stderr:
print(
f'Daemon actor tree produced STDERR:\n'
f'{proc.args}\n'
f'\n'
f'{stderr}\n'
)
if proc.returncode != -2:
raise RuntimeError(
'Daemon actor tree failed !?\n'
f'{proc.args}\n'
)
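
A hypothetical consumer sketch (test name invented here) showing how this fixture gets paired with `psutil`, as the discovery test changes further below do:

import subprocess
import psutil

def test_daemon_sketch(
    daemon: subprocess.Popen,  # from the fixture above
    reg_addr: tuple,           # addr the daemon's registrar binds
):
    # the remote registrar proc should stay alive for the
    # test's full duration..
    proc = psutil.Process(daemon.pid)
    assert proc.is_running()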
# @pytest.fixture(autouse=True)
# def shared_last_failed(pytestconfig):
# val = pytestconfig.cache.get("example/value", None)
# breakpoint()
# if val is None:
# pytestconfig.cache.set("example/value", val)
# return val
# TODO: a way to let test scripts (like from `examples/`)
# guarantee they won't `registry_addrs` collide!
# -[ ] maybe use some kinda standard `def main()` arg-spec that
# we can introspect from a fixture that is called from the test
# body?
# -[ ] test and figure out typing for below prototype! Bp
#
# @pytest.fixture
# def set_script_runtime_args(
# reg_addr: tuple,
# ) -> Callable[[...], None]:
# def import_n_partial_in_args_n_triorun(
# script: Path, # under examples?
# **runtime_args,
# ) -> Callable[[], Any]: # a `partial`-ed equiv of `trio.run()`
# # NOTE, below is taken from
# # `.test_advanced_faults.test_ipc_channel_break_during_stream`
# mod: ModuleType = import_path(
# examples_dir() / 'advanced_faults'
# / 'ipc_failure_during_stream.py',
# root=examples_dir(),
# consider_namespace_packages=False,
# )
# return partial(
# trio.run,
# partial(
# mod.main,
# **runtime_args,
# )
# )
# return import_n_partial_in_args_n_triorun

View File

@@ -0,0 +1,4 @@
'''
`tractor.ipc` subsystem(s)/unit testing suites.
'''

View File

@@ -0,0 +1,95 @@
'''
Verify the `enable_transports` param drives various
per-root/sub-actor IPC endpoint/server settings.
'''
from __future__ import annotations
import pytest
import trio
import tractor
from tractor import (
Actor,
Portal,
ipc,
msg,
_state,
_addr,
)
@tractor.context
async def chk_tpts(
ctx: tractor.Context,
tpt_proto_key: str,
):
rtvars = _state._runtime_vars
assert (
tpt_proto_key
in
rtvars['_enable_tpts']
)
actor: Actor = tractor.current_actor()
spec: msg.types.SpawnSpec = actor._spawn_spec
assert spec._runtime_vars == rtvars
# ensure individual IPC ep-addr types
serv: ipc._server.Server = actor.ipc_server
addr: ipc._types.Address
for addr in serv.addrs:
assert addr.proto_key == tpt_proto_key
# Actor delegate-props enforcement
assert (
actor.accept_addrs
==
serv.accept_addrs
)
await ctx.started(serv.accept_addrs)
# TODO, parametrize over mis-matched-proto-typed `registry_addrs`
# since it seems to work in `piker` but not exactly sure if both tcp
# & uds are being deployed then?
#
@pytest.mark.parametrize(
'tpt_proto_key',
['tcp', 'uds'],
ids=lambda item: f'ipc_tpt={item!r}'
)
def test_root_passes_tpt_to_sub(
tpt_proto_key: str,
reg_addr: tuple,
debug_mode: bool,
):
async def main():
async with tractor.open_nursery(
enable_transports=[tpt_proto_key],
registry_addrs=[reg_addr],
debug_mode=debug_mode,
) as an:
assert (
tpt_proto_key
in
_state._runtime_vars['_enable_tpts']
)
ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with ptl.open_context(
chk_tpts,
tpt_proto_key=tpt_proto_key,
) as (ctx, accept_addrs):
uw_addr: tuple
for uw_addr in accept_addrs:
addr = _addr.wrap_address(uw_addr)
assert addr.is_valid
# shutdown sub-actor(s)
await an.cancel()
trio.run(main)

View File

@@ -0,0 +1,72 @@
'''
High-level `.ipc._server` unit tests.
'''
from __future__ import annotations
import pytest
import trio
from tractor import (
devx,
ipc,
log,
)
from tractor._testing.addr import (
get_rando_addr,
)
# TODO, use/check-roundtripping with some of these wrapper types?
#
# from .._addr import Address
# from ._chan import Channel
# from ._transport import MsgTransport
# from ._uds import UDSAddress
# from ._tcp import TCPAddress
@pytest.mark.parametrize(
'_tpt_proto',
['uds', 'tcp']
)
def test_basic_ipc_server(
_tpt_proto: str,
debug_mode: bool,
loglevel: str,
):
# so we see the socket-listener reporting on console
log.get_console_log("INFO")
rando_addr: tuple = get_rando_addr(
tpt_proto=_tpt_proto,
)
async def main():
async with ipc._server.open_ipc_server() as server:
assert (
server._parent_tn
and
server._parent_tn is server._stream_handler_tn
)
assert server._no_more_peers.is_set()
eps: list[ipc._server.Endpoint] = await server.listen_on(
accept_addrs=[rando_addr],
stream_handler_nursery=None,
)
assert (
len(eps) == 1
and
(ep := eps[0])._listener
and
not ep.peer_tpts
)
server._parent_tn.cancel_scope.cancel()
# !TODO! actually make a bg-task connection from a client
# using `ipc._chan._connect_chan()`
with devx.maybe_open_crash_handler(
pdb=debug_mode,
):
trio.run(main)

View File

@@ -10,6 +10,9 @@ import pytest
from _pytest.pathlib import import_path
import trio
import tractor
from tractor import (
TransportClosed,
)
from tractor._testing import (
examples_dir,
break_ipc,
@@ -74,6 +77,7 @@ def test_ipc_channel_break_during_stream(
spawn_backend: str,
ipc_break: dict|None,
pre_aclose_msgstream: bool,
tpt_proto: str,
):
'''
Ensure we can have an IPC channel break its connection during
@@ -91,7 +95,7 @@ def test_ipc_channel_break_during_stream(
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_exc = TransportClosed
mod: ModuleType = import_path(
examples_dir() / 'advanced_faults'
@@ -104,6 +108,8 @@ def test_ipc_channel_break_during_stream(
# period" wherein the user eventually hits ctl-c to kill the
# root-actor tree.
expect_final_exc: BaseException = KeyboardInterrupt
expect_final_cause: BaseException|None = None
if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
@@ -138,6 +144,9 @@ def test_ipc_channel_break_during_stream(
# a user sending ctl-c by raising a KBI.
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# XXX OLD XXX
# if child calls `MsgStream.aclose()` then expect EoC.
@@ -157,6 +166,10 @@ def test_ipc_channel_break_during_stream(
if pre_aclose_msgstream:
expect_final_exc = KeyboardInterrupt
if tpt_proto == 'uds':
expect_final_exc = TransportClosed
expect_final_cause = trio.BrokenResourceError
# NOTE when the parent IPC side dies (even if the child does as well
# but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect
@@ -169,8 +182,8 @@ def test_ipc_channel_break_during_stream(
and
ipc_break['break_child_ipc_after'] is False
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
# BOTH but, PARENT breaks FIRST
elif (
@@ -181,8 +194,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_parent_ipc_after']
)
):
# expect_final_exc = trio.ClosedResourceError
expect_final_exc = tractor.TransportClosed
expect_final_cause = trio.ClosedResourceError
with pytest.raises(
expected_exception=(
@@ -198,6 +211,7 @@ def test_ipc_channel_break_during_stream(
start_method=spawn_backend,
loglevel=loglevel,
pre_close=pre_aclose_msgstream,
tpt_proto=tpt_proto,
**ipc_break,
)
)
@@ -220,10 +234,15 @@ def test_ipc_channel_break_during_stream(
)
cause: Exception = tc.__cause__
assert (
type(cause) is trio.ClosedResourceError
and
cause.args[0] == 'another task closed this fd'
# type(cause) is trio.ClosedResourceError
type(cause) is expect_final_cause
# TODO, should we expect a certain exc-message (per
# tpt) as well??
# and
# cause.args[0] == 'another task closed this fd'
)
raise
# get raw instance from pytest wrapper

View File

@@ -7,7 +7,9 @@ import platform
from functools import partial
import itertools
import psutil
import pytest
import subprocess
import tractor
from tractor._testing import tractor_test
import trio
@@ -152,13 +154,23 @@ async def unpack_reg(actor_or_portal):
async def spawn_and_check_registry(
reg_addr: tuple,
use_signal: bool,
debug_mode: bool = False,
remote_arbiter: bool = False,
with_streaming: bool = False,
maybe_daemon: tuple[
subprocess.Popen,
psutil.Process,
]|None = None,
) -> None:
if maybe_daemon:
popen, proc = maybe_daemon
# breakpoint()
async with tractor.open_root_actor(
registry_addrs=[reg_addr],
debug_mode=debug_mode,
):
async with tractor.get_registry(reg_addr) as portal:
# runtime needs to be up to call this
@@ -176,11 +188,11 @@ async def spawn_and_check_registry(
extra = 2 # local root actor + remote arbiter
# ensure current actor is registered
registry = await get_reg()
registry: dict = await get_reg()
assert actor.uid in registry
try:
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
async with trio.open_nursery(
strict_exception_groups=False,
) as trion:
@@ -189,17 +201,17 @@ async def spawn_and_check_registry(
for i in range(3):
name = f'a{i}'
if with_streaming:
portals[name] = await n.start_actor(
portals[name] = await an.start_actor(
name=name, enable_modules=[__name__])
else: # no streaming
portals[name] = await n.run_in_actor(
portals[name] = await an.run_in_actor(
trio.sleep_forever, name=name)
# wait on last actor to come up
async with tractor.wait_for_actor(name):
registry = await get_reg()
for uid in n._children:
for uid in an._children:
assert uid in registry
assert len(portals) + extra == len(registry)
@@ -232,6 +244,7 @@ async def spawn_and_check_registry(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel(
debug_mode: bool,
start_method,
use_signal,
reg_addr,
@@ -248,6 +261,7 @@ def test_subactors_unregister_on_cancel(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=False,
with_streaming=with_streaming,
),
@@ -257,7 +271,8 @@ def test_subactors_unregister_on_cancel(
@pytest.mark.parametrize('use_signal', [False, True])
@pytest.mark.parametrize('with_streaming', [False, True])
def test_subactors_unregister_on_cancel_remote_daemon(
daemon,
daemon: subprocess.Popen,
debug_mode: bool,
start_method,
use_signal,
reg_addr,
@@ -273,8 +288,13 @@ def test_subactors_unregister_on_cancel_remote_daemon(
spawn_and_check_registry,
reg_addr,
use_signal,
debug_mode=debug_mode,
remote_arbiter=True,
with_streaming=with_streaming,
maybe_daemon=(
daemon,
psutil.Process(daemon.pid)
),
),
)
@@ -373,7 +393,7 @@ def test_close_channel_explicit(
@pytest.mark.parametrize('use_signal', [False, True])
def test_close_channel_explicit_remote_arbiter(
daemon,
daemon: subprocess.Popen,
start_method,
use_signal,
reg_addr,

View File

@@ -100,16 +100,29 @@ async def streamer(
@acm
async def open_stream() -> Awaitable[tractor.MsgStream]:
async with tractor.open_nursery() as tn:
portal = await tn.start_actor('streamer', enable_modules=[__name__])
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
yield stream
try:
async with tractor.open_nursery() as an:
portal = await an.start_actor(
'streamer',
enable_modules=[__name__],
)
async with (
portal.open_context(streamer) as (ctx, first),
ctx.open_stream() as stream,
):
yield stream
await portal.cancel_actor()
print('CANCELLED STREAMER')
print('Cancelling streamer')
await portal.cancel_actor()
print('Cancelled streamer')
except Exception as err:
print(
f'`open_stream()` errored?\n'
f'{err!r}\n'
)
await tractor.pause(shield=True)
raise err
@acm
@@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str):
yield stream
def test_open_local_sub_to_stream():
def test_open_local_sub_to_stream(
debug_mode: bool,
):
'''
Verify a single inter-actor stream can be fanned-out shared to
N local tasks using ``trionics.maybe_open_context():``.
N local tasks using `trionics.maybe_open_context()`.
'''
timeout: float = 3.6 if platform.system() != "Windows" else 10
timeout: float = 3.6
if platform.system() == "Windows":
timeout: float = 10
if debug_mode:
timeout = 999
async def main():
full = list(range(1000))
async def get_sub_and_pull(taskname: str):
stream: tractor.MsgStream
async with (
maybe_open_stream(taskname) as stream,
):
@@ -165,17 +187,27 @@ def test_open_local_sub_to_stream():
assert set(seq).issubset(set(full))
print(f'{taskname} finished')
with trio.fail_after(timeout):
with trio.fail_after(timeout) as cs:
# TODO: turns out this isn't multi-task entrant XD
# We probably need an idempotent entry semantic?
async with tractor.open_root_actor():
async with tractor.open_root_actor(
debug_mode=debug_mode,
):
async with (
trio.open_nursery() as nurse,
trio.open_nursery() as tn,
):
for i in range(10):
nurse.start_soon(get_sub_and_pull, f'task_{i}')
tn.start_soon(
get_sub_and_pull,
f'task_{i}',
)
await trio.sleep(0.001)
print('all consumer tasks finished')
if cs.cancelled_caught:
pytest.fail(
'Should NOT time out in `open_root_actor()` ?'
)
trio.run(main)

View File

@@ -180,7 +180,8 @@ def test_acm_embedded_nursery_propagates_enter_err(
with tractor.devx.maybe_open_crash_handler(
pdb=debug_mode,
) as bxerr:
assert not bxerr.value
if bxerr:
assert not bxerr.value
async with (
wraps_tn_that_always_cancels() as tn,

View File

@@ -0,0 +1,70 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Random IPC addr generation for isolating
the discovery space between test sessions.
Might eventually be useful to expose as a util set from
our `tractor.discovery` subsys?
'''
import random
from typing import (
Type,
)
from tractor import (
_addr,
)
def get_rando_addr(
tpt_proto: str,
*,
# choose random port at import time
_rando_port: int = random.randint(1000, 9999)
) -> tuple[str, str|int]:
'''
Used to globally override the runtime to the
per-test-session-dynamic addr so that all tests never conflict
with any other actor tree using the default.
'''
addr_type: Type[_addr.Address] = _addr._address_types[tpt_proto]
def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto]
# this is the "unwrapped" form expected to be passed to
# `.open_root_actor()` by test body.
testrun_reg_addr: tuple[str, int|str]
match tpt_proto:
case 'tcp':
testrun_reg_addr = (
addr_type.def_bindspace,
_rando_port,
)
# NOTE, file-name uniqueness (no-collisions) will be based on
# the runtime-directory and root (pytest-proc's) pid.
case 'uds':
testrun_reg_addr = addr_type.get_random().unwrap()
# XXX, as sanity it should never be the same as the default for the
# host-singleton registry actor.
assert def_reg_addr != testrun_reg_addr
return testrun_reg_addr
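
A rough usage sketch (return shapes per the `match` block above; the exact UDS form depends on `UDSAddress.get_random().unwrap()`):

from tractor._testing.addr import get_rando_addr

# tcp: the default loopback bindspace + a random 4-digit port
host, port = get_rando_addr(tpt_proto='tcp')

# uds: a randomized filesystem addr, unique per the
# runtime-dir and the (pytest-proc's) pid.
uds_addr = get_rando_addr(tpt_proto='uds')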

View File

@@ -26,29 +26,46 @@ from functools import (
import inspect
import platform
import pytest
import tractor
import trio
def tractor_test(fn):
'''
Decorator for async test funcs to present them as "native"
looking sync funcs runnable by `pytest` using `trio.run()`.
Decorator for async test fns to wrap them as "native"
looking sync funcs runnable by `pytest` and auto invoked with
`trio.run()` (much like the `pytest-trio` plugin's approach).
Use:
Further, the test fn body will be invoked AFTER booting the actor
runtime, i.e. from inside a `tractor.open_root_actor()` block AND
with various runtime and tooling parameters implicitly passed as
requested by the test session's config; see immediately below.
@tractor_test
async def test_whatever():
await ...
Basic deco use:
---------------
If fixtures:
@tractor_test
async def test_whatever():
await ...
- ``reg_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
Runtime config via special fixtures:
------------------------------------
If any of the following fixtures are requested by the wrapped test
fn (via normal func-args declaration),
- `reg_addr` (a socket addr tuple where arbiter is listening)
- `loglevel` (logging level passed to tractor internals)
- `start_method` (subprocess spawning backend)
(TODO support)
- `tpt_proto` (IPC transport protocol key)
they will be automatically injected to each test as normally
expected as well as passed to the initial
`tractor.open_root_actor()` funcargs.
'''
@wraps(fn)
def wrapper(
@@ -111,3 +128,164 @@ def tractor_test(fn):
return trio.run(main)
return wrapper
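
A sketch of the fixture-injection described in the docstring above (hypothetical test name; `reg_addr`/`loglevel` come from the session fixtures defined below):

import tractor
from tractor._testing import tractor_test

@tractor_test
async def test_reg_reachable(
    reg_addr: tuple,  # also passed to `open_root_actor()`
    loglevel: str,    # forwarded to tractor's internal logging
):
    # body runs *after* the root actor runtime has booted
    async with tractor.get_registry(reg_addr) as portal:
        assert portal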
def pytest_addoption(
parser: pytest.Parser,
):
# parser.addoption(
# "--ll",
# action="store",
# dest='loglevel',
# default='ERROR', help="logging level to set when testing"
# )
parser.addoption(
"--spawn-backend",
action="store",
dest='spawn_backend',
default='trio',
help="Processing spawning backend to use for test run",
)
parser.addoption(
"--tpdb",
"--debug-mode",
action="store_true",
dest='tractor_debug_mode',
# default=False,
help=(
'Enable a flag that can be used by tests to set the '
'`debug_mode: bool` for engaging the internal '
'multi-proc debugger sys.'
),
)
# provide the IPC transport protocols that opting-in test suites
# should accumulatively run against.
parser.addoption(
"--tpt-proto",
nargs='+', # accumulate-multiple-args
action="store",
dest='tpt_protos',
default=['tcp'],
help="Transport protocol to use under the `tractor.ipc.Channel`",
)
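
With the above registered, a test session can opt into an alternate transport from the CLI, e.g. `pytest tests/ --tpt-proto uds`; quotes around the key are also tolerated since the `tpt_protos` fixture below strips them.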
def pytest_configure(config):
backend = config.option.spawn_backend
tractor._spawn.try_set_start_method(backend)
@pytest.fixture(scope='session')
def debug_mode(request) -> bool:
'''
Flag state for whether `--tpdb` (for `tractor`-py-debugger)
was passed to the test run.
Normally tests should pass this directly to `.open_root_actor()`
to allow the user to opt into suite-wide crash handling.
'''
debug_mode: bool = request.config.option.tractor_debug_mode
return debug_mode
@pytest.fixture(scope='session')
def spawn_backend(request) -> str:
return request.config.option.spawn_backend
@pytest.fixture(scope='session')
def tpt_protos(request) -> list[str]:
# allow quoting on CLI
proto_keys: list[str] = [
proto_key.replace('"', '').replace("'", "")
for proto_key in request.config.option.tpt_protos
]
# ?TODO, eventually support multiple protos per test-sesh?
if len(proto_keys) > 1:
pytest.fail(
'We only support one `--tpt-proto <key>` atm!\n'
)
# XXX ensure we support the protocol by name via lookup!
for proto_key in proto_keys:
addr_type = tractor._addr._address_types[proto_key]
assert addr_type.proto_key == proto_key
yield proto_keys
@pytest.fixture(
scope='session',
autouse=True,
)
def tpt_proto(
tpt_protos: list[str],
) -> str:
proto_key: str = tpt_protos[0]
from tractor import _state
if _state._def_tpt_proto != proto_key:
_state._def_tpt_proto = proto_key
yield proto_key
@pytest.fixture(scope='session')
def reg_addr(
tpt_proto: str,
) -> tuple[str, int|str]:
'''
Deliver a test-sesh unique registry address such
that each run (of tests which use this fixture) will
have no conflicts/cross-talk when running simultaneously
nor will interfere with other live `tractor` apps active
on the same network-host (namespace).
'''
from tractor._testing.addr import get_rando_addr
return get_rando_addr(
tpt_proto=tpt_proto,
)
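
For example (a sketch mirroring `test_root_passes_tpt_to_sub` from the new `tests/ipc/` suite above), a test body would consume it as:

import trio
import tractor

def test_uses_sesh_unique_addr(
    reg_addr: tuple,
    tpt_proto: str,  # autouse; also sets `_state._def_tpt_proto`
):
    async def main():
        async with tractor.open_nursery(
            registry_addrs=[reg_addr],
            enable_transports=[tpt_proto],
        ):
            ...

    trio.run(main)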
def pytest_generate_tests(
metafunc: pytest.Metafunc,
):
spawn_backend: str = metafunc.config.option.spawn_backend
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
spawn_backend = 'trio'
# TODO: maybe just use the literal `._spawn.SpawnMethodKey`?
assert spawn_backend in (
'mp_spawn',
'mp_forkserver',
'trio',
)
# NOTE: used-to-be-used-to dynamically parametrize tests for when
# you just passed --spawn-backend=`mp` on the cli, but now we expect
# that cli input to be manually specified, BUT, maybe we'll do
# something like this again in the future?
if 'start_method' in metafunc.fixturenames:
metafunc.parametrize(
"start_method",
[spawn_backend],
scope='module',
)
# TODO, parametrize any `tpt_proto: str` declaring tests!
# proto_tpts: list[str] = metafunc.config.option.proto_tpts
# if 'tpt_proto' in metafunc.fixturenames:
# metafunc.parametrize(
# 'tpt_proto',
# proto_tpts, # TODO, double check this list usage!
# scope='module',
# )

View File

@@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs(
#
# -[x] maybe change to mod-func and rename for implied
# multi-transport semantics?
# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint`
# -[ ] register each stream/tpt/chan with the owning `Endpoint`
# so that we can query per tpt all peer contact infos?
# |_[ ] possibly provide a global viewing via a
# `collections.ChainMap`?
@@ -309,7 +309,7 @@ async def handle_stream_from_peer(
any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery`
such that it is invoked as,
IPCEndpoint.stream_handler_tn.start_soon(
Endpoint.stream_handler_tn.start_soon(
handle_stream,
stream,
)
@@ -577,7 +577,7 @@ async def handle_stream_from_peer(
# finally block closure
class IPCEndpoint(Struct):
class Endpoint(Struct):
'''
An instance of an IPC "bound" address where the lifetime of the
"ability to accept connections" (from clients) and then handle
@@ -636,7 +636,7 @@ class IPCEndpoint(Struct):
)
class IPCServer(Struct):
class Server(Struct):
_parent_tn: Nursery
_stream_handler_tn: Nursery
# level-triggered sig for whether "no peers are currently
@@ -644,7 +644,7 @@ class IPCServer(Struct):
# initialized with `.is_set() == True`.
_no_more_peers: trio.Event
_endpoints: list[IPCEndpoint] = []
_endpoints: list[Endpoint] = []
# connection tracking & mgmt
_peers: defaultdict[
@@ -659,10 +659,10 @@ class IPCServer(Struct):
# syncs for setup/teardown sequences
_shutdown: trio.Event|None = None
# TODO, maybe just make `._endpoints: list[IPCEndpoint]` and
# TODO, maybe just make `._endpoints: list[Endpoint]` and
# provide dict-views onto it?
# @property
# def addrs2eps(self) -> dict[Address, IPCEndpoint]:
# def addrs2eps(self) -> dict[Address, Endpoint]:
# ...
@property
@@ -708,7 +708,7 @@ class IPCServer(Struct):
await self._shutdown.wait()
else:
tpt_protos: list[str] = []
ep: IPCEndpoint
ep: Endpoint
for ep in self._endpoints:
tpt_protos.append(ep.addr.proto_key)
@@ -790,7 +790,7 @@ class IPCServer(Struct):
def epsdict(self) -> dict[
Address,
IPCEndpoint,
Endpoint,
]:
return {
ep.addr: ep
@@ -804,7 +804,7 @@ class IPCServer(Struct):
return ev.is_set()
def pformat(self) -> str:
eps: list[IPCEndpoint] = self._endpoints
eps: list[Endpoint] = self._endpoints
state_repr: str = (
f'{len(eps)!r} IPC-endpoints active'
@@ -835,13 +835,13 @@ class IPCServer(Struct):
# TODO? maybe allow shutting down a `.listen_on()`s worth of
# listeners by cancelling the corresponding
# `IPCEndpoint._listen_tn` only ?
# `Endpoint._listen_tn` only ?
# -[ ] in theory you could use this to
# "boot-and-wait-for-reconnect" of all current and connecting
# peers?
# |_ would require that the stream-handler is intercepted so we
# can intercept every `MsgTransport` (stream) and track per
# `IPCEndpoint` likely?
# `Endpoint` likely?
#
# async def unlisten(
# self,
@@ -854,7 +854,7 @@ class IPCServer(Struct):
*,
accept_addrs: list[tuple[str, int|str]]|None = None,
stream_handler_nursery: Nursery|None = None,
) -> list[IPCEndpoint]:
) -> list[Endpoint]:
'''
Start `SocketListeners` (i.e. bind and call `socket.listen()`)
for all IPC-transport-protocol specific `Address`-types
@@ -888,7 +888,7 @@ class IPCServer(Struct):
f'Binding to endpoints for,\n'
f'{accept_addrs}\n'
)
eps: list[IPCEndpoint] = await self._parent_tn.start(
eps: list[Endpoint] = await self._parent_tn.start(
partial(
_serve_ipc_eps,
server=self,
@@ -904,7 +904,7 @@ class IPCServer(Struct):
self._endpoints.extend(eps)
# XXX, just a little bit of sanity
group_tn: Nursery|None = None
ep: IPCEndpoint
ep: Endpoint
for ep in eps:
if ep.addr not in self.addrs:
breakpoint()
@@ -917,6 +917,10 @@ class IPCServer(Struct):
return eps
# alias until we decide on final naming
IPCServer = Server
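
Given the alias, pre-rename call-sites keep working; a quick sanity sketch:

from tractor import ipc

# both names refer to the same class until the final naming lands
assert ipc._server.IPCServer is ipc._server.Server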
async def _serve_ipc_eps(
*,
server: IPCServer,
@@ -941,12 +945,12 @@ async def _serve_ipc_eps(
listen_tn: Nursery
async with trio.open_nursery() as listen_tn:
eps: list[IPCEndpoint] = []
eps: list[Endpoint] = []
# XXX NOTE, required to call `serve_listeners()` below.
# ?TODO, maybe just pass `list(eps.values())` tho?
listeners: list[trio.abc.Listener] = []
for addr in listen_addrs:
ep = IPCEndpoint(
ep = Endpoint(
addr=addr,
listen_tn=listen_tn,
stream_handler_tn=stream_handler_tn,
@@ -1010,7 +1014,7 @@ async def _serve_ipc_eps(
finally:
if eps:
addr: Address
ep: IPCEndpoint
ep: Endpoint
for addr, ep in server.epsdict().items():
ep.close_listener()
server._endpoints.remove(ep)