diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index 6728b8d..c7322a7 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -6,47 +6,120 @@ been an outage) and we want to ensure that despite being in debug mode actor tree will eventually be cancelled without leaving any zombies. ''' -import trio +from functools import partial + from tractor import ( open_nursery, context, Context, + ContextCancelled, MsgStream, + _testing, ) +import trio -async def break_channel_silently_then_error( +async def break_ipc( stream: MsgStream, + method: str|None = None, + pre_close: bool = False, + + def_method: str = 'eof', + +) -> None: + ''' + XXX: close the channel right after an error is raised + purposely breaking the IPC transport to make sure the parent + doesn't get stuck in debug or hang on the connection join. + this more or less simulates an infinite msg-receive hang on + the other end. + + ''' + # close channel via IPC prot msging before + # any transport breakage + if pre_close: + await stream.aclose() + + method: str = method or def_method + + match method: + case 'trans_aclose': + await stream._ctx.chan.transport.stream.aclose() + + case 'eof': + await stream._ctx.chan.transport.stream.send_eof() + + case 'msg': + await stream._ctx.chan.send(None) + + # TODO: the actual real-world simulated cases like + # transport layer hangs and/or lower layer 2-gens type + # scenarios.. + # + # -[ ] already have some issues for this general testing + # area: + # - https://github.com/goodboy/tractor/issues/97 + # - https://github.com/goodboy/tractor/issues/124 + # - PR from @guille: + # https://github.com/goodboy/tractor/pull/149 + # case 'hang': + # TODO: framework research: + # + # - https://github.com/GuoTengda1993/pynetem + # - https://github.com/shopify/toxiproxy + # - https://manpages.ubuntu.com/manpages/trusty/man1/wirefilter.1.html + + case _: + raise RuntimeError( + f'IPC break method unsupported: {method}' + ) + + +async def break_ipc_then_error( + stream: MsgStream, + break_ipc_with: str|None = None, + pre_close: bool = False, ): async for msg in stream: await stream.send(msg) - - # XXX: close the channel right after an error is raised - # purposely breaking the IPC transport to make sure the parent - # doesn't get stuck in debug or hang on the connection join. - # this more or less simulates an infinite msg-receive hang on - # the other end. - await stream._ctx.chan.send(None) + await break_ipc( + stream=stream, + method=break_ipc_with, + pre_close=pre_close, + ) assert 0 -async def close_stream_and_error( +# async def close_stream_and_error( +async def iter_ipc_stream( stream: MsgStream, + break_ipc_with: str|None = None, + pre_close: bool = False, ): async for msg in stream: await stream.send(msg) # wipe out channel right before raising - await stream._ctx.chan.send(None) - await stream.aclose() - assert 0 + # await break_ipc( + # stream=stream, + # method=break_ipc_with, + # pre_close=pre_close, + # ) + + # send channel close msg at SC-prot level + # + # TODO: what should get raised here if anything? + # await stream.aclose() + + # assert 0 @context async def recv_and_spawn_net_killers( ctx: Context, - break_ipc_after: bool | int = False, + break_ipc_after: bool|int = False, + pre_close: bool = False, ) -> None: ''' @@ -63,27 +136,42 @@ async def recv_and_spawn_net_killers( await stream.send(i) if ( break_ipc_after - and i > break_ipc_after + and + i > break_ipc_after ): '#################################\n' - 'Simulating child-side IPC BREAK!\n' - '#################################' - n.start_soon(break_channel_silently_then_error, stream) - n.start_soon(close_stream_and_error, stream) + 'Simulating CHILD-side IPC BREAK!\n' + '#################################\n' + n.start_soon( + partial( + break_ipc_then_error, + stream=stream, + pre_close=pre_close, + ) + ) + n.start_soon( + iter_ipc_stream, + stream, + ) async def main( debug_mode: bool = False, start_method: str = 'trio', + loglevel: str = 'cancel', # by default we break the parent IPC first (if configured to break # at all), but this can be changed so the child does first (even if # both are set to break). - break_parent_ipc_after: int | bool = False, - break_child_ipc_after: int | bool = False, + break_parent_ipc_after: int|bool = False, + break_child_ipc_after: int|bool = False, + pre_close: bool = False, ) -> None: + # from tractor._state import _runtime_vars as rtv + # rtv['_debug_mode'] = debug_mode + async with ( open_nursery( start_method=start_method, @@ -91,57 +179,107 @@ async def main( # NOTE: even debugger is used we shouldn't get # a hang since it never engages due to broken IPC debug_mode=debug_mode, - loglevel='warning', + loglevel=loglevel, ) as an, ): + sub_name: str = 'chitty_hijo' portal = await an.start_actor( - 'chitty_hijo', + sub_name, enable_modules=[__name__], ) - async with portal.open_context( - recv_and_spawn_net_killers, - break_ipc_after=break_child_ipc_after, - - ) as (ctx, sent): + async with ( + _testing.expect_ctxc( + yay=( + break_parent_ipc_after + or break_child_ipc_after, + ), + # TODO: we CAN'T remove this right? + # since we need the ctxc to bubble up from either + # the stream API after the `None` msg is sent + # (which actually implicitly cancels all remote + # tasks in the hijo) or from simluated + # KBI-mash-from-user + # or should we expect that a KBI triggers the ctxc + # and KBI in an eg? + reraise=True, + ), + portal.open_context( + recv_and_spawn_net_killers, + break_ipc_after=break_child_ipc_after, + pre_close=pre_close, + ) as (ctx, sent), + ): + ipc_break_sent: bool = False async with ctx.open_stream() as stream: for i in range(1000): if ( break_parent_ipc_after - and i > break_parent_ipc_after + and + i > break_parent_ipc_after + and + not ipc_break_sent ): print( '#################################\n' - 'Simulating parent-side IPC BREAK!\n' - '#################################' + 'Simulating PARENT-side IPC BREAK!\n' + '#################################\n' ) - await stream._ctx.chan.send(None) + + # await stream._ctx.chan.send(None) + # await stream._ctx.chan.transport.stream.send_eof() + await stream._ctx.chan.transport.stream.aclose() + + ipc_break_sent = True # it actually breaks right here in the # mp_spawn/forkserver backends and thus the zombie # reaper never even kicks in? print(f'parent sending {i}') - await stream.send(i) + try: + await stream.send(i) + except ContextCancelled as ctxc: + print( + 'parent received ctxc on `stream.send()`\n' + f'{ctxc}\n' + ) + assert 'root' in ctxc.canceller + assert sub_name in ctx.canceller - with trio.move_on_after(2) as cs: + # TODO: is this needed or no? + raise + + timeout: int = 1 + print(f'Entering `stream.receive()` with timeout={timeout}\n') + with trio.move_on_after(timeout) as cs: # NOTE: in the parent side IPC failure case this # will raise an ``EndOfChannel`` after the child # is killed and sends a stop msg back to it's # caller/this-parent. - rx = await stream.receive() - - print(f"I'm a happy user and echoed to me is {rx}") + try: + rx = await stream.receive() + print( + "I'm a happy PARENT user and echoed to me is\n" + f'{rx}\n' + ) + except trio.EndOfChannel: + print('MsgStream got EoC for PARENT') + raise if cs.cancelled_caught: # pretend to be a user seeing no streaming action # thinking it's a hang, and then hitting ctl-c.. - print("YOO i'm a user anddd thingz hangin..") + print( + f"YOO i'm a PARENT user anddd thingz hangin..\n" + f'after timeout={timeout}\n' + ) print( - "YOO i'm mad send side dun but thingz hangin..\n" + "YOO i'm mad!\n" + 'The send side is dun but thingz hangin..\n' 'MASHING CTlR-C Ctl-c..' ) raise KeyboardInterrupt diff --git a/pyproject.toml b/pyproject.toml index e52aa47..8463380 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,3 +26,23 @@ all_bullets = true directory = "trivial" name = "Trivial/Internal Changes" showcontent = true + + +[tool.pytest.ini_options] +minversion = '6.0' +testpaths = [ + 'tests' +] +addopts = [ + # TODO: figure out why this isn't working.. + '--rootdir=./tests', + + '--import-mode=importlib', + # don't show frickin captured logs AGAIN in the report.. + '--show-capture=no', +] +log_cli = false + +# TODO: maybe some of these layout choices? +# https://docs.pytest.org/en/8.0.x/explanation/goodpractices.html#choosing-a-test-layout-import-rules +# pythonpath = "src" diff --git a/pytest.ini b/pytest.ini deleted file mode 100644 index b252722..0000000 --- a/pytest.ini +++ /dev/null @@ -1,8 +0,0 @@ -# vim: ft=conf -# pytest.ini for tractor - -[pytest] -# don't show frickin captured logs AGAIN in the report.. -addopts = --show-capture='no' -log_cli = false -; minversion = 6.0 diff --git a/tests/conftest.py b/tests/conftest.py index fb82a55..5ce8442 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,105 +1,25 @@ """ ``tractor`` testing!! """ -from contextlib import asynccontextmanager as acm import sys import subprocess import os import random import signal import platform -import pathlib import time -import inspect -from functools import partial, wraps import pytest -import trio import tractor +from tractor._testing import ( + examples_dir as examples_dir, + tractor_test as tractor_test, + expect_ctxc as expect_ctxc, +) +# TODO: include wtv plugin(s) we build in `._testing.pytest`? pytest_plugins = ['pytester'] - -def tractor_test(fn): - """ - Use: - - @tractor_test - async def test_whatever(): - await ... - - If fixtures: - - - ``reg_addr`` (a socket addr tuple where arbiter is listening) - - ``loglevel`` (logging level passed to tractor internals) - - ``start_method`` (subprocess spawning backend) - - are defined in the `pytest` fixture space they will be automatically - injected to tests declaring these funcargs. - """ - @wraps(fn) - def wrapper( - *args, - loglevel=None, - reg_addr=None, - start_method: str|None = None, - debug_mode: bool = False, - **kwargs - ): - # __tracebackhide__ = True - - # NOTE: inject ant test func declared fixture - # names by manually checking! - if 'reg_addr' in inspect.signature(fn).parameters: - # injects test suite fixture value to test as well - # as `run()` - kwargs['reg_addr'] = reg_addr - - if 'loglevel' in inspect.signature(fn).parameters: - # allows test suites to define a 'loglevel' fixture - # that activates the internal logging - kwargs['loglevel'] = loglevel - - if start_method is None: - if platform.system() == "Windows": - start_method = 'trio' - - if 'start_method' in inspect.signature(fn).parameters: - # set of subprocess spawning backends - kwargs['start_method'] = start_method - - if 'debug_mode' in inspect.signature(fn).parameters: - # set of subprocess spawning backends - kwargs['debug_mode'] = debug_mode - - - if kwargs: - - # use explicit root actor start - async def _main(): - async with tractor.open_root_actor( - # **kwargs, - registry_addrs=[reg_addr] if reg_addr else None, - loglevel=loglevel, - start_method=start_method, - - # TODO: only enable when pytest is passed --pdb - debug_mode=debug_mode, - - ): - await fn(*args, **kwargs) - - main = _main - - else: - # use implicit root actor start - main = partial(fn, *args, **kwargs) - - return trio.run(main) - - return wrapper - - # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives if platform.system() == 'Windows': _KILL_SIGNAL = signal.CTRL_BREAK_EVENT @@ -119,23 +39,6 @@ no_windows = pytest.mark.skipif( ) -def repodir() -> pathlib.Path: - ''' - Return the abspath to the repo directory. - - ''' - # 2 parents up to step up through tests/ - return pathlib.Path(__file__).parent.parent.absolute() - - -def examples_dir() -> pathlib.Path: - ''' - Return the abspath to the examples directory as `pathlib.Path`. - - ''' - return repodir() / 'examples' - - def pytest_addoption(parser): parser.addoption( "--ll", @@ -194,11 +97,18 @@ _ci_env: bool = os.environ.get('CI', False) @pytest.fixture(scope='session') def ci_env() -> bool: - """Detect CI envoirment. - """ + ''' + Detect CI envoirment. + + ''' return _ci_env +# TODO: also move this to `._testing` for now? +# -[ ] possibly generalize and re-use for multi-tree spawning +# along with the new stuff for multi-addrs in distribute_dis +# branch? +# # choose randomly at import time _reg_addr: tuple[str, int] = ( '127.0.0.1', @@ -252,6 +162,7 @@ def sig_prog(proc, sig): assert ret +# TODO: factor into @cm and move to `._testing`? @pytest.fixture def daemon( loglevel: str, @@ -293,26 +204,3 @@ def daemon( time.sleep(_PROC_SPAWN_WAIT) yield proc sig_prog(proc, _INT_SIGNAL) - - -@acm -async def expect_ctxc( - yay: bool, - reraise: bool = False, -) -> None: - ''' - Small acm to catch `ContextCancelled` errors when expected - below it in a `async with ()` block. - - ''' - if yay: - try: - yield - raise RuntimeError('Never raised ctxc?') - except tractor.ContextCancelled: - if reraise: - raise - else: - return - else: - yield diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index f34738b..8b73b4c 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -3,24 +3,28 @@ Sketchy network blackoutz, ugly byzantine gens, puedes eschuchar la cancelacion?.. ''' +import itertools from functools import partial +from types import ModuleType import pytest from _pytest.pathlib import import_path import trio import tractor - -from conftest import ( +from tractor._testing import ( examples_dir, ) @pytest.mark.parametrize( - 'debug_mode', - [False, True], + 'pre_aclose_msgstream', + [ + False, + True, + ], ids=[ - 'no_debug_mode', - 'debug_mode', + 'no_msgstream_aclose', + 'pre_aclose_msgstream', ], ) @pytest.mark.parametrize( @@ -66,8 +70,10 @@ from conftest import ( ) def test_ipc_channel_break_during_stream( debug_mode: bool, + loglevel: str, spawn_backend: str, - ipc_break: dict | None, + ipc_break: dict|None, + pre_aclose_msgstream: bool, ): ''' Ensure we can have an IPC channel break its connection during @@ -79,77 +85,123 @@ def test_ipc_channel_break_during_stream( ''' if spawn_backend != 'trio': - if debug_mode: - pytest.skip('`debug_mode` only supported on `trio` spawner') + # if debug_mode: + # pytest.skip('`debug_mode` only supported on `trio` spawner') # non-`trio` spawners should never hit the hang condition that # requires the user to do ctl-c to cancel the actor tree. expect_final_exc = trio.ClosedResourceError - mod = import_path( + mod: ModuleType = import_path( examples_dir() / 'advanced_faults' / 'ipc_failure_during_stream.py', root=examples_dir(), ) - expect_final_exc = KeyboardInterrupt - - # when ONLY the child breaks we expect the parent to get a closed - # resource error on the next `MsgStream.receive()` and then fail out - # and cancel the child from there. + # by def we expect KBI from user after a simulated "hang + # period" wherein the user eventually hits ctl-c to kill the + # root-actor tree. + expect_final_exc: BaseException = KeyboardInterrupt if ( - - # only child breaks - ( - ipc_break['break_child_ipc_after'] - and ipc_break['break_parent_ipc_after'] is False - ) - - # both break but, parent breaks first - or ( - ipc_break['break_child_ipc_after'] is not False - and ( - ipc_break['break_parent_ipc_after'] - > ipc_break['break_child_ipc_after'] - ) - ) - - ): - expect_final_exc = trio.ClosedResourceError - - # when the parent IPC side dies (even if the child's does as well - # but the child fails BEFORE the parent) we expect the channel to be - # sent a stop msg from the child at some point which will signal the - # parent that the stream has been terminated. - # NOTE: when the parent breaks "after" the child you get this same - # case as well, the child breaks the IPC channel with a stop msg - # before any closure takes place. - elif ( - # only parent breaks - ( - ipc_break['break_parent_ipc_after'] - and ipc_break['break_child_ipc_after'] is False - ) - - # both break but, child breaks first - or ( - ipc_break['break_parent_ipc_after'] is not False - and ( - ipc_break['break_child_ipc_after'] - > ipc_break['break_parent_ipc_after'] - ) - ) + # only expect EoC if trans is broken on the child side, + ipc_break['break_child_ipc_after'] is not False + # AND we tell the child to call `MsgStream.aclose()`. + and pre_aclose_msgstream ): expect_final_exc = trio.EndOfChannel - with pytest.raises(expect_final_exc): - trio.run( - partial( - mod.main, - debug_mode=debug_mode, - start_method=spawn_backend, - **ipc_break, + # NOTE when ONLY the child breaks or it breaks BEFORE the + # parent we expect the parent to get a closed resource error + # on the next `MsgStream.receive()` and then fail out and + # cancel the child from there. + # + # ONLY CHILD breaks + if ( + ipc_break['break_child_ipc_after'] + and + ipc_break['break_parent_ipc_after'] is False + ): + expect_final_exc = trio.ClosedResourceError + + # if child calls `MsgStream.aclose()` then expect EoC. + if pre_aclose_msgstream: + expect_final_exc = trio.EndOfChannel + + # BOTH but, CHILD breaks FIRST + elif ( + ipc_break['break_child_ipc_after'] is not False + and ( + ipc_break['break_parent_ipc_after'] + > ipc_break['break_child_ipc_after'] + ) + ): + expect_final_exc = trio.ClosedResourceError + + # child will send a 'stop' msg before it breaks + # the transport channel. + if pre_aclose_msgstream: + expect_final_exc = trio.EndOfChannel + + # NOTE when the parent IPC side dies (even if the child's does as well + # but the child fails BEFORE the parent) we always expect the + # IPC layer to raise a closed-resource, NEVER do we expect + # a stop msg since the parent-side ctx apis will error out + # IMMEDIATELY before the child ever sends any 'stop' msg. + # + # ONLY PARENT breaks + elif ( + ipc_break['break_parent_ipc_after'] + and + ipc_break['break_child_ipc_after'] is False + ): + expect_final_exc = trio.ClosedResourceError + + # BOTH but, PARENT breaks FIRST + elif ( + ipc_break['break_parent_ipc_after'] is not False + and ( + ipc_break['break_child_ipc_after'] + > ipc_break['break_parent_ipc_after'] + ) + ): + expect_final_exc = trio.ClosedResourceError + + with pytest.raises( + expected_exception=( + expect_final_exc, + ExceptionGroup, + ), + ) as excinfo: + try: + trio.run( + partial( + mod.main, + debug_mode=debug_mode, + start_method=spawn_backend, + loglevel=loglevel, + pre_close=pre_aclose_msgstream, + **ipc_break, + ) + ) + except KeyboardInterrupt as kbi: + _err = kbi + if expect_final_exc is not KeyboardInterrupt: + pytest.fail( + 'Rxed unexpected KBI !?\n' + f'{repr(kbi)}' + ) + + raise + + # get raw instance from pytest wrapper + value = excinfo.value + if isinstance(value, ExceptionGroup): + value = next( + itertools.dropwhile( + lambda exc: not isinstance(exc, expect_final_exc), + value.exceptions, ) ) + assert value @tractor.context diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 9a729f3..5b589f6 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -15,8 +15,10 @@ from exceptiongroup import ( import pytest import trio import tractor - -from conftest import tractor_test, no_windows +from tractor._testing import ( + tractor_test, +) +from conftest import no_windows def is_win(): diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 02b1f8f..92362b5 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -5,9 +5,7 @@ import trio import tractor from tractor import open_actor_cluster from tractor.trionics import gather_contexts - -from conftest import tractor_test - +from tractor._testing import tractor_test MESSAGE = 'tractoring at full speed' diff --git a/tests/test_context_stream_semantics.py b/tests/test_context_stream_semantics.py index d5767ee..42b1f7d 100644 --- a/tests/test_context_stream_semantics.py +++ b/tests/test_context_stream_semantics.py @@ -25,7 +25,7 @@ from tractor._exceptions import ( ContextCancelled, ) -from conftest import ( +from tractor._testing import ( tractor_test, expect_ctxc, ) diff --git a/tests/test_debugger.py b/tests/test_debugger.py index c314ba6..20e67ab 100644 --- a/tests/test_debugger.py +++ b/tests/test_debugger.py @@ -30,8 +30,10 @@ from tractor.devx._debug import ( _pause_msg, _crash_msg, ) -from conftest import ( +from tractor._testing import ( examples_dir, +) +from conftest import ( _ci_env, ) diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 8b47700..cd9dc02 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -9,10 +9,9 @@ import itertools import pytest import tractor +from tractor._testing import tractor_test import trio -from conftest import tractor_test - @tractor_test async def test_reg_then_unreg(reg_addr): diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py index 1eefdb4..79a2200 100644 --- a/tests/test_docs_examples.py +++ b/tests/test_docs_examples.py @@ -11,8 +11,7 @@ import platform import shutil import pytest - -from conftest import ( +from tractor._testing import ( examples_dir, ) diff --git a/tests/test_infected_asyncio.py b/tests/test_infected_asyncio.py index 1ac1fba..a3f96ee 100644 --- a/tests/test_infected_asyncio.py +++ b/tests/test_infected_asyncio.py @@ -18,8 +18,7 @@ from tractor import ( ContextCancelled, ) from tractor.trionics import BroadcastReceiver - -from conftest import expect_ctxc +from tractor._testing import expect_ctxc async def sleep_and_err( diff --git a/tests/test_legacy_one_way_streaming.py b/tests/test_legacy_one_way_streaming.py index 0cbda4d..1e7ec98 100644 --- a/tests/test_legacy_one_way_streaming.py +++ b/tests/test_legacy_one_way_streaming.py @@ -9,7 +9,7 @@ import trio import tractor import pytest -from conftest import tractor_test +from tractor._testing import tractor_test def test_must_define_ctx(): diff --git a/tests/test_local.py b/tests/test_local.py index 009d0d7..a019d77 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -7,7 +7,7 @@ import pytest import trio import tractor -from conftest import tractor_test +from tractor._testing import tractor_test @pytest.mark.trio diff --git a/tests/test_multi_program.py b/tests/test_multi_program.py index d1ee0f5..0b6b5ba 100644 --- a/tests/test_multi_program.py +++ b/tests/test_multi_program.py @@ -7,8 +7,10 @@ import time import pytest import trio import tractor -from conftest import ( +from tractor._testing import ( tractor_test, +) +from conftest import ( sig_prog, _INT_SIGNAL, _INT_RETURN_CODE, diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 20554fa..6d416f8 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -5,8 +5,7 @@ import pytest import trio import tractor from tractor.experimental import msgpub - -from conftest import tractor_test +from tractor._testing import tractor_test def test_type_checks(): diff --git a/tests/test_runtime.py b/tests/test_runtime.py index 127138c..3755af1 100644 --- a/tests/test_runtime.py +++ b/tests/test_runtime.py @@ -8,7 +8,7 @@ import pytest import trio import tractor -from conftest import tractor_test +from tractor._testing import tractor_test _file_path: str = '' diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 1a07610..5995ed2 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -8,7 +8,7 @@ import pytest import trio import tractor -from conftest import tractor_test +from tractor._testing import tractor_test data_to_pass_down = {'doggy': 10, 'kitty': 4} diff --git a/tractor/_testing/__init__.py b/tractor/_testing/__init__.py new file mode 100644 index 0000000..876c87e --- /dev/null +++ b/tractor/_testing/__init__.py @@ -0,0 +1,74 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Various helpers/utils for auditing your `tractor` app and/or the +core runtime. + +''' +from contextlib import asynccontextmanager as acm +import pathlib + +import tractor +from .pytest import ( + tractor_test as tractor_test +) + + +def repodir() -> pathlib.Path: + ''' + Return the abspath to the repo directory. + + ''' + # 2 parents up to step up through tests/ + return pathlib.Path( + __file__ + + # 3 .parents bc: + # <._testing-pkg>.. + # /$HOME/..//tractor/_testing/__init__.py + ).parent.parent.parent.absolute() + + +def examples_dir() -> pathlib.Path: + ''' + Return the abspath to the examples directory as `pathlib.Path`. + + ''' + return repodir() / 'examples' + + +@acm +async def expect_ctxc( + yay: bool, + reraise: bool = False, +) -> None: + ''' + Small acm to catch `ContextCancelled` errors when expected + below it in a `async with ()` block. + + ''' + if yay: + try: + yield + raise RuntimeError('Never raised ctxc?') + except tractor.ContextCancelled: + if reraise: + raise + else: + return + else: + yield diff --git a/tractor/_testing/pytest.py b/tractor/_testing/pytest.py new file mode 100644 index 0000000..93eeaf7 --- /dev/null +++ b/tractor/_testing/pytest.py @@ -0,0 +1,113 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +`pytest` utils helpers and plugins for testing `tractor`'s runtime +and applications. + +''' +from functools import ( + partial, + wraps, +) +import inspect +import platform + +import tractor +import trio + + +def tractor_test(fn): + ''' + Decorator for async test funcs to present them as "native" + looking sync funcs runnable by `pytest` using `trio.run()`. + + Use: + + @tractor_test + async def test_whatever(): + await ... + + If fixtures: + + - ``reg_addr`` (a socket addr tuple where arbiter is listening) + - ``loglevel`` (logging level passed to tractor internals) + - ``start_method`` (subprocess spawning backend) + + are defined in the `pytest` fixture space they will be automatically + injected to tests declaring these funcargs. + ''' + @wraps(fn) + def wrapper( + *args, + loglevel=None, + reg_addr=None, + start_method: str|None = None, + debug_mode: bool = False, + **kwargs + ): + # __tracebackhide__ = True + + # NOTE: inject ant test func declared fixture + # names by manually checking! + if 'reg_addr' in inspect.signature(fn).parameters: + # injects test suite fixture value to test as well + # as `run()` + kwargs['reg_addr'] = reg_addr + + if 'loglevel' in inspect.signature(fn).parameters: + # allows test suites to define a 'loglevel' fixture + # that activates the internal logging + kwargs['loglevel'] = loglevel + + if start_method is None: + if platform.system() == "Windows": + start_method = 'trio' + + if 'start_method' in inspect.signature(fn).parameters: + # set of subprocess spawning backends + kwargs['start_method'] = start_method + + if 'debug_mode' in inspect.signature(fn).parameters: + # set of subprocess spawning backends + kwargs['debug_mode'] = debug_mode + + + if kwargs: + + # use explicit root actor start + async def _main(): + async with tractor.open_root_actor( + # **kwargs, + registry_addrs=[reg_addr] if reg_addr else None, + loglevel=loglevel, + start_method=start_method, + + # TODO: only enable when pytest is passed --pdb + debug_mode=debug_mode, + + ): + await fn(*args, **kwargs) + + main = _main + + else: + # use implicit root actor start + main = partial(fn, *args, **kwargs) + + return trio.run(main) + + return wrapper