Add (back) a `tractor._testing` sub-pkg

Since importing from our top level `conftest.py` is not scaleable
or as "future forward thinking" in terms of:
- LoC-wise (it's only one file),
- prevents "external" (aka non-test) example scripts from importing
  content easily,
- seemingly(?) can't be used via abs-import if using
  a `[tool.pytest.ini_options]` in a `pyproject.toml` vs.
  a `pytest.ini`, see:
  https://docs.pytest.org/en/8.0.x/reference/customize.html#pyproject-toml)

=> Go back to having an internal "testing" pkg like `trio` (kinda) does.

Deats:
- move generic top level helpers into pkg-mod including the new
  `expect_ctxc()` (which i needed in the advanced faults testing script.
- move `@tractor_test` into `._testing.pytest` sub-mod.
- adjust all the helper imports to be a `from tractor._testing import <..>`

Rework `test_ipc_channel_break_during_stream()` and backing script:
- make test(s) pull `debug_mode` from new fixture (which is now
  controlled manually from `--tpdb` flag) and drop the previous
  parametrized input.
- update logic in ^ test for "which-side-fails" cases to better match
  recently updated/stricter cancel/failure semantics in terms of
  `ClosedResouruceError` vs. `EndOfChannel` expectations.
- handle `ExceptionGroup`s with expected embedded errors in test.
- better pendantics around whether to expect a user simulated KBI.
- for `examples/advanced_faults/ipc_failure_during_stream.py` script:
  - generalize ipc breakage in new `break_ipc()` with support for diff
    internal `trio` methods and a #TODO for future disti frameworks
  - only make one sub-actor task break and the other just stream.
  - use new `._testing.expect_ctxc()` around ctx block.
  - add a bit of exception handling with `print()`s around ctxc (unused
    except if 'msg' break method is set) and eoc cases.
  - don't break parent side ipc in loop any more then once
    after first break, checked via flag var.
  - add a `pre_close: bool` flag to control whether
    `MsgStreama.aclose()` is called *before* any ipc breakage method.

Still TODO:
- drop `pytest.ini` and add the alt section to `pyproject.py`.
 -> currently can't get `--rootdir=` opt to work.. not showing in
   console header.
 -> ^ also breaks on 'tests' `enable_modules` imports in subactors
   during discovery tests?
modden_spawn_from_client_req
Tyler Goodlet 2024-03-12 15:48:20 -04:00
parent 6533285d7d
commit 96992bcbb9
20 changed files with 535 additions and 258 deletions

View File

@ -6,47 +6,120 @@ been an outage) and we want to ensure that despite being in debug mode
actor tree will eventually be cancelled without leaving any zombies.
'''
import trio
from functools import partial
from tractor import (
open_nursery,
context,
Context,
ContextCancelled,
MsgStream,
_testing,
)
import trio
async def break_channel_silently_then_error(
async def break_ipc(
stream: MsgStream,
method: str|None = None,
pre_close: bool = False,
def_method: str = 'eof',
) -> None:
'''
XXX: close the channel right after an error is raised
purposely breaking the IPC transport to make sure the parent
doesn't get stuck in debug or hang on the connection join.
this more or less simulates an infinite msg-receive hang on
the other end.
'''
# close channel via IPC prot msging before
# any transport breakage
if pre_close:
await stream.aclose()
method: str = method or def_method
match method:
case 'trans_aclose':
await stream._ctx.chan.transport.stream.aclose()
case 'eof':
await stream._ctx.chan.transport.stream.send_eof()
case 'msg':
await stream._ctx.chan.send(None)
# TODO: the actual real-world simulated cases like
# transport layer hangs and/or lower layer 2-gens type
# scenarios..
#
# -[ ] already have some issues for this general testing
# area:
# - https://github.com/goodboy/tractor/issues/97
# - https://github.com/goodboy/tractor/issues/124
# - PR from @guille:
# https://github.com/goodboy/tractor/pull/149
# case 'hang':
# TODO: framework research:
#
# - https://github.com/GuoTengda1993/pynetem
# - https://github.com/shopify/toxiproxy
# - https://manpages.ubuntu.com/manpages/trusty/man1/wirefilter.1.html
case _:
raise RuntimeError(
f'IPC break method unsupported: {method}'
)
async def break_ipc_then_error(
stream: MsgStream,
break_ipc_with: str|None = None,
pre_close: bool = False,
):
async for msg in stream:
await stream.send(msg)
# XXX: close the channel right after an error is raised
# purposely breaking the IPC transport to make sure the parent
# doesn't get stuck in debug or hang on the connection join.
# this more or less simulates an infinite msg-receive hang on
# the other end.
await stream._ctx.chan.send(None)
await break_ipc(
stream=stream,
method=break_ipc_with,
pre_close=pre_close,
)
assert 0
async def close_stream_and_error(
# async def close_stream_and_error(
async def iter_ipc_stream(
stream: MsgStream,
break_ipc_with: str|None = None,
pre_close: bool = False,
):
async for msg in stream:
await stream.send(msg)
# wipe out channel right before raising
await stream._ctx.chan.send(None)
await stream.aclose()
assert 0
# await break_ipc(
# stream=stream,
# method=break_ipc_with,
# pre_close=pre_close,
# )
# send channel close msg at SC-prot level
#
# TODO: what should get raised here if anything?
# await stream.aclose()
# assert 0
@context
async def recv_and_spawn_net_killers(
ctx: Context,
break_ipc_after: bool | int = False,
break_ipc_after: bool|int = False,
pre_close: bool = False,
) -> None:
'''
@ -63,27 +136,42 @@ async def recv_and_spawn_net_killers(
await stream.send(i)
if (
break_ipc_after
and i > break_ipc_after
and
i > break_ipc_after
):
'#################################\n'
'Simulating child-side IPC BREAK!\n'
'#################################'
n.start_soon(break_channel_silently_then_error, stream)
n.start_soon(close_stream_and_error, stream)
'Simulating CHILD-side IPC BREAK!\n'
'#################################\n'
n.start_soon(
partial(
break_ipc_then_error,
stream=stream,
pre_close=pre_close,
)
)
n.start_soon(
iter_ipc_stream,
stream,
)
async def main(
debug_mode: bool = False,
start_method: str = 'trio',
loglevel: str = 'cancel',
# by default we break the parent IPC first (if configured to break
# at all), but this can be changed so the child does first (even if
# both are set to break).
break_parent_ipc_after: int | bool = False,
break_child_ipc_after: int | bool = False,
break_parent_ipc_after: int|bool = False,
break_child_ipc_after: int|bool = False,
pre_close: bool = False,
) -> None:
# from tractor._state import _runtime_vars as rtv
# rtv['_debug_mode'] = debug_mode
async with (
open_nursery(
start_method=start_method,
@ -91,57 +179,107 @@ async def main(
# NOTE: even debugger is used we shouldn't get
# a hang since it never engages due to broken IPC
debug_mode=debug_mode,
loglevel='warning',
loglevel=loglevel,
) as an,
):
sub_name: str = 'chitty_hijo'
portal = await an.start_actor(
'chitty_hijo',
sub_name,
enable_modules=[__name__],
)
async with portal.open_context(
recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after,
) as (ctx, sent):
async with (
_testing.expect_ctxc(
yay=(
break_parent_ipc_after
or break_child_ipc_after,
),
# TODO: we CAN'T remove this right?
# since we need the ctxc to bubble up from either
# the stream API after the `None` msg is sent
# (which actually implicitly cancels all remote
# tasks in the hijo) or from simluated
# KBI-mash-from-user
# or should we expect that a KBI triggers the ctxc
# and KBI in an eg?
reraise=True,
),
portal.open_context(
recv_and_spawn_net_killers,
break_ipc_after=break_child_ipc_after,
pre_close=pre_close,
) as (ctx, sent),
):
ipc_break_sent: bool = False
async with ctx.open_stream() as stream:
for i in range(1000):
if (
break_parent_ipc_after
and i > break_parent_ipc_after
and
i > break_parent_ipc_after
and
not ipc_break_sent
):
print(
'#################################\n'
'Simulating parent-side IPC BREAK!\n'
'#################################'
'Simulating PARENT-side IPC BREAK!\n'
'#################################\n'
)
await stream._ctx.chan.send(None)
# await stream._ctx.chan.send(None)
# await stream._ctx.chan.transport.stream.send_eof()
await stream._ctx.chan.transport.stream.aclose()
ipc_break_sent = True
# it actually breaks right here in the
# mp_spawn/forkserver backends and thus the zombie
# reaper never even kicks in?
print(f'parent sending {i}')
await stream.send(i)
try:
await stream.send(i)
except ContextCancelled as ctxc:
print(
'parent received ctxc on `stream.send()`\n'
f'{ctxc}\n'
)
assert 'root' in ctxc.canceller
assert sub_name in ctx.canceller
with trio.move_on_after(2) as cs:
# TODO: is this needed or no?
raise
timeout: int = 1
print(f'Entering `stream.receive()` with timeout={timeout}\n')
with trio.move_on_after(timeout) as cs:
# NOTE: in the parent side IPC failure case this
# will raise an ``EndOfChannel`` after the child
# is killed and sends a stop msg back to it's
# caller/this-parent.
rx = await stream.receive()
print(f"I'm a happy user and echoed to me is {rx}")
try:
rx = await stream.receive()
print(
"I'm a happy PARENT user and echoed to me is\n"
f'{rx}\n'
)
except trio.EndOfChannel:
print('MsgStream got EoC for PARENT')
raise
if cs.cancelled_caught:
# pretend to be a user seeing no streaming action
# thinking it's a hang, and then hitting ctl-c..
print("YOO i'm a user anddd thingz hangin..")
print(
f"YOO i'm a PARENT user anddd thingz hangin..\n"
f'after timeout={timeout}\n'
)
print(
"YOO i'm mad send side dun but thingz hangin..\n"
"YOO i'm mad!\n"
'The send side is dun but thingz hangin..\n'
'MASHING CTlR-C Ctl-c..'
)
raise KeyboardInterrupt

View File

@ -26,3 +26,23 @@ all_bullets = true
directory = "trivial"
name = "Trivial/Internal Changes"
showcontent = true
[tool.pytest.ini_options]
minversion = '6.0'
testpaths = [
'tests'
]
addopts = [
# TODO: figure out why this isn't working..
'--rootdir=./tests',
'--import-mode=importlib',
# don't show frickin captured logs AGAIN in the report..
'--show-capture=no',
]
log_cli = false
# TODO: maybe some of these layout choices?
# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules
# pythonpath = "src"

View File

@ -1,8 +0,0 @@
# vim: ft=conf
# pytest.ini for tractor
[pytest]
# don't show frickin captured logs AGAIN in the report..
addopts = --show-capture='no'
log_cli = false
; minversion = 6.0

View File

@ -1,105 +1,25 @@
"""
``tractor`` testing!!
"""
from contextlib import asynccontextmanager as acm
import sys
import subprocess
import os
import random
import signal
import platform
import pathlib
import time
import inspect
from functools import partial, wraps
import pytest
import trio
import tractor
from tractor._testing import (
examples_dir as examples_dir,
tractor_test as tractor_test,
expect_ctxc as expect_ctxc,
)
# TODO: include wtv plugin(s) we build in `._testing.pytest`?
pytest_plugins = ['pytester']
def tractor_test(fn):
"""
Use:
@tractor_test
async def test_whatever():
await ...
If fixtures:
- ``reg_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
"""
@wraps(fn)
def wrapper(
*args,
loglevel=None,
reg_addr=None,
start_method: str|None = None,
debug_mode: bool = False,
**kwargs
):
# __tracebackhide__ = True
# NOTE: inject ant test func declared fixture
# names by manually checking!
if 'reg_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['reg_addr'] = reg_addr
if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['start_method'] = start_method
if 'debug_mode' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['debug_mode'] = debug_mode
if kwargs:
# use explicit root actor start
async def _main():
async with tractor.open_root_actor(
# **kwargs,
registry_addrs=[reg_addr] if reg_addr else None,
loglevel=loglevel,
start_method=start_method,
# TODO: only enable when pytest is passed --pdb
debug_mode=debug_mode,
):
await fn(*args, **kwargs)
main = _main
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs)
return trio.run(main)
return wrapper
# Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives
if platform.system() == 'Windows':
_KILL_SIGNAL = signal.CTRL_BREAK_EVENT
@ -119,23 +39,6 @@ no_windows = pytest.mark.skipif(
)
def repodir() -> pathlib.Path:
'''
Return the abspath to the repo directory.
'''
# 2 parents up to step up through tests/<repo_dir>
return pathlib.Path(__file__).parent.parent.absolute()
def examples_dir() -> pathlib.Path:
'''
Return the abspath to the examples directory as `pathlib.Path`.
'''
return repodir() / 'examples'
def pytest_addoption(parser):
parser.addoption(
"--ll",
@ -194,11 +97,18 @@ _ci_env: bool = os.environ.get('CI', False)
@pytest.fixture(scope='session')
def ci_env() -> bool:
"""Detect CI envoirment.
"""
'''
Detect CI envoirment.
'''
return _ci_env
# TODO: also move this to `._testing` for now?
# -[ ] possibly generalize and re-use for multi-tree spawning
# along with the new stuff for multi-addrs in distribute_dis
# branch?
#
# choose randomly at import time
_reg_addr: tuple[str, int] = (
'127.0.0.1',
@ -252,6 +162,7 @@ def sig_prog(proc, sig):
assert ret
# TODO: factor into @cm and move to `._testing`?
@pytest.fixture
def daemon(
loglevel: str,
@ -293,26 +204,3 @@ def daemon(
time.sleep(_PROC_SPAWN_WAIT)
yield proc
sig_prog(proc, _INT_SIGNAL)
@acm
async def expect_ctxc(
yay: bool,
reraise: bool = False,
) -> None:
'''
Small acm to catch `ContextCancelled` errors when expected
below it in a `async with ()` block.
'''
if yay:
try:
yield
raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled:
if reraise:
raise
else:
return
else:
yield

View File

@ -3,24 +3,28 @@ Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la
cancelacion?..
'''
import itertools
from functools import partial
from types import ModuleType
import pytest
from _pytest.pathlib import import_path
import trio
import tractor
from conftest import (
from tractor._testing import (
examples_dir,
)
@pytest.mark.parametrize(
'debug_mode',
[False, True],
'pre_aclose_msgstream',
[
False,
True,
],
ids=[
'no_debug_mode',
'debug_mode',
'no_msgstream_aclose',
'pre_aclose_msgstream',
],
)
@pytest.mark.parametrize(
@ -66,8 +70,10 @@ from conftest import (
)
def test_ipc_channel_break_during_stream(
debug_mode: bool,
loglevel: str,
spawn_backend: str,
ipc_break: dict | None,
ipc_break: dict|None,
pre_aclose_msgstream: bool,
):
'''
Ensure we can have an IPC channel break its connection during
@ -79,77 +85,123 @@ def test_ipc_channel_break_during_stream(
'''
if spawn_backend != 'trio':
if debug_mode:
pytest.skip('`debug_mode` only supported on `trio` spawner')
# if debug_mode:
# pytest.skip('`debug_mode` only supported on `trio` spawner')
# non-`trio` spawners should never hit the hang condition that
# requires the user to do ctl-c to cancel the actor tree.
expect_final_exc = trio.ClosedResourceError
mod = import_path(
mod: ModuleType = import_path(
examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py',
root=examples_dir(),
)
expect_final_exc = KeyboardInterrupt
# when ONLY the child breaks we expect the parent to get a closed
# resource error on the next `MsgStream.receive()` and then fail out
# and cancel the child from there.
# by def we expect KBI from user after a simulated "hang
# period" wherein the user eventually hits ctl-c to kill the
# root-actor tree.
expect_final_exc: BaseException = KeyboardInterrupt
if (
# only child breaks
(
ipc_break['break_child_ipc_after']
and ipc_break['break_parent_ipc_after'] is False
)
# both break but, parent breaks first
or (
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after']
)
)
):
expect_final_exc = trio.ClosedResourceError
# when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we expect the channel to be
# sent a stop msg from the child at some point which will signal the
# parent that the stream has been terminated.
# NOTE: when the parent breaks "after" the child you get this same
# case as well, the child breaks the IPC channel with a stop msg
# before any closure takes place.
elif (
# only parent breaks
(
ipc_break['break_parent_ipc_after']
and ipc_break['break_child_ipc_after'] is False
)
# both break but, child breaks first
or (
ipc_break['break_parent_ipc_after'] is not False
and (
ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after']
)
)
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
# AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream
):
expect_final_exc = trio.EndOfChannel
with pytest.raises(expect_final_exc):
trio.run(
partial(
mod.main,
debug_mode=debug_mode,
start_method=spawn_backend,
**ipc_break,
# NOTE when ONLY the child breaks or it breaks BEFORE the
# parent we expect the parent to get a closed resource error
# on the next `MsgStream.receive()` and then fail out and
# cancel the child from there.
#
# ONLY CHILD breaks
if (
ipc_break['break_child_ipc_after']
and
ipc_break['break_parent_ipc_after'] is False
):
expect_final_exc = trio.ClosedResourceError
# if child calls `MsgStream.aclose()` then expect EoC.
if pre_aclose_msgstream:
expect_final_exc = trio.EndOfChannel
# BOTH but, CHILD breaks FIRST
elif (
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after']
)
):
expect_final_exc = trio.ClosedResourceError
# child will send a 'stop' msg before it breaks
# the transport channel.
if pre_aclose_msgstream:
expect_final_exc = trio.EndOfChannel
# NOTE when the parent IPC side dies (even if the child's does as well
# but the child fails BEFORE the parent) we always expect the
# IPC layer to raise a closed-resource, NEVER do we expect
# a stop msg since the parent-side ctx apis will error out
# IMMEDIATELY before the child ever sends any 'stop' msg.
#
# ONLY PARENT breaks
elif (
ipc_break['break_parent_ipc_after']
and
ipc_break['break_child_ipc_after'] is False
):
expect_final_exc = trio.ClosedResourceError
# BOTH but, PARENT breaks FIRST
elif (
ipc_break['break_parent_ipc_after'] is not False
and (
ipc_break['break_child_ipc_after']
> ipc_break['break_parent_ipc_after']
)
):
expect_final_exc = trio.ClosedResourceError
with pytest.raises(
expected_exception=(
expect_final_exc,
ExceptionGroup,
),
) as excinfo:
try:
trio.run(
partial(
mod.main,
debug_mode=debug_mode,
start_method=spawn_backend,
loglevel=loglevel,
pre_close=pre_aclose_msgstream,
**ipc_break,
)
)
except KeyboardInterrupt as kbi:
_err = kbi
if expect_final_exc is not KeyboardInterrupt:
pytest.fail(
'Rxed unexpected KBI !?\n'
f'{repr(kbi)}'
)
raise
# get raw instance from pytest wrapper
value = excinfo.value
if isinstance(value, ExceptionGroup):
value = next(
itertools.dropwhile(
lambda exc: not isinstance(exc, expect_final_exc),
value.exceptions,
)
)
assert value
@tractor.context

View File

@ -15,8 +15,10 @@ from exceptiongroup import (
import pytest
import trio
import tractor
from conftest import tractor_test, no_windows
from tractor._testing import (
tractor_test,
)
from conftest import no_windows
def is_win():

View File

@ -5,9 +5,7 @@ import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import gather_contexts
from conftest import tractor_test
from tractor._testing import tractor_test
MESSAGE = 'tractoring at full speed'

View File

@ -25,7 +25,7 @@ from tractor._exceptions import (
ContextCancelled,
)
from conftest import (
from tractor._testing import (
tractor_test,
expect_ctxc,
)

View File

@ -30,8 +30,10 @@ from tractor.devx._debug import (
_pause_msg,
_crash_msg,
)
from conftest import (
from tractor._testing import (
examples_dir,
)
from conftest import (
_ci_env,
)

View File

@ -9,10 +9,9 @@ import itertools
import pytest
import tractor
from tractor._testing import tractor_test
import trio
from conftest import tractor_test
@tractor_test
async def test_reg_then_unreg(reg_addr):

View File

@ -11,8 +11,7 @@ import platform
import shutil
import pytest
from conftest import (
from tractor._testing import (
examples_dir,
)

View File

@ -18,8 +18,7 @@ from tractor import (
ContextCancelled,
)
from tractor.trionics import BroadcastReceiver
from conftest import expect_ctxc
from tractor._testing import expect_ctxc
async def sleep_and_err(

View File

@ -9,7 +9,7 @@ import trio
import tractor
import pytest
from conftest import tractor_test
from tractor._testing import tractor_test
def test_must_define_ctx():

View File

@ -7,7 +7,7 @@ import pytest
import trio
import tractor
from conftest import tractor_test
from tractor._testing import tractor_test
@pytest.mark.trio

View File

@ -7,8 +7,10 @@ import time
import pytest
import trio
import tractor
from conftest import (
from tractor._testing import (
tractor_test,
)
from conftest import (
sig_prog,
_INT_SIGNAL,
_INT_RETURN_CODE,

View File

@ -5,8 +5,7 @@ import pytest
import trio
import tractor
from tractor.experimental import msgpub
from conftest import tractor_test
from tractor._testing import tractor_test
def test_type_checks():

View File

@ -8,7 +8,7 @@ import pytest
import trio
import tractor
from conftest import tractor_test
from tractor._testing import tractor_test
_file_path: str = ''

View File

@ -8,7 +8,7 @@ import pytest
import trio
import tractor
from conftest import tractor_test
from tractor._testing import tractor_test
data_to_pass_down = {'doggy': 10, 'kitty': 4}

View File

@ -0,0 +1,74 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Various helpers/utils for auditing your `tractor` app and/or the
core runtime.
'''
from contextlib import asynccontextmanager as acm
import pathlib
import tractor
from .pytest import (
tractor_test as tractor_test
)
def repodir() -> pathlib.Path:
'''
Return the abspath to the repo directory.
'''
# 2 parents up to step up through tests/<repo_dir>
return pathlib.Path(
__file__
# 3 .parents bc:
# <._testing-pkg>.<tractor-pkg>.<git-repo-dir>
# /$HOME/../<tractor-repo-dir>/tractor/_testing/__init__.py
).parent.parent.parent.absolute()
def examples_dir() -> pathlib.Path:
'''
Return the abspath to the examples directory as `pathlib.Path`.
'''
return repodir() / 'examples'
@acm
async def expect_ctxc(
yay: bool,
reraise: bool = False,
) -> None:
'''
Small acm to catch `ContextCancelled` errors when expected
below it in a `async with ()` block.
'''
if yay:
try:
yield
raise RuntimeError('Never raised ctxc?')
except tractor.ContextCancelled:
if reraise:
raise
else:
return
else:
yield

View File

@ -0,0 +1,113 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`pytest` utils helpers and plugins for testing `tractor`'s runtime
and applications.
'''
from functools import (
partial,
wraps,
)
import inspect
import platform
import tractor
import trio
def tractor_test(fn):
'''
Decorator for async test funcs to present them as "native"
looking sync funcs runnable by `pytest` using `trio.run()`.
Use:
@tractor_test
async def test_whatever():
await ...
If fixtures:
- ``reg_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
'''
@wraps(fn)
def wrapper(
*args,
loglevel=None,
reg_addr=None,
start_method: str|None = None,
debug_mode: bool = False,
**kwargs
):
# __tracebackhide__ = True
# NOTE: inject ant test func declared fixture
# names by manually checking!
if 'reg_addr' in inspect.signature(fn).parameters:
# injects test suite fixture value to test as well
# as `run()`
kwargs['reg_addr'] = reg_addr
if 'loglevel' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'trio'
if 'start_method' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['start_method'] = start_method
if 'debug_mode' in inspect.signature(fn).parameters:
# set of subprocess spawning backends
kwargs['debug_mode'] = debug_mode
if kwargs:
# use explicit root actor start
async def _main():
async with tractor.open_root_actor(
# **kwargs,
registry_addrs=[reg_addr] if reg_addr else None,
loglevel=loglevel,
start_method=start_method,
# TODO: only enable when pytest is passed --pdb
debug_mode=debug_mode,
):
await fn(*args, **kwargs)
main = _main
else:
# use implicit root actor start
main = partial(fn, *args, **kwargs)
return trio.run(main)
return wrapper