diff --git a/docs/README.rst b/docs/README.rst index 630f5398..cea223ee 100644 --- a/docs/README.rst +++ b/docs/README.rst @@ -1,8 +1,5 @@ |logo| ``tractor``: distributed structurred concurrency -|gh_actions| -|docs| - ``tractor`` is a `structured concurrency`_ (SC), multi-processing_ runtime built on trio_. Fundamentally, ``tractor`` provides parallelism via @@ -66,6 +63,13 @@ Features - (WIP) a ``TaskMngr``: one-cancels-one style nursery supervisor. +Status of `main` / infra +------------------------ + +- |gh_actions| +- |docs| + + Install ------- ``tractor`` is still in a *alpha-near-beta-stage* for many diff --git a/examples/advanced_faults/ipc_failure_during_stream.py b/examples/advanced_faults/ipc_failure_during_stream.py index 950d5a6f..f3a709e0 100644 --- a/examples/advanced_faults/ipc_failure_during_stream.py +++ b/examples/advanced_faults/ipc_failure_during_stream.py @@ -120,6 +120,7 @@ async def main( break_parent_ipc_after: int|bool = False, break_child_ipc_after: int|bool = False, pre_close: bool = False, + tpt_proto: str = 'tcp', ) -> None: @@ -131,6 +132,7 @@ async def main( # a hang since it never engages due to broken IPC debug_mode=debug_mode, loglevel=loglevel, + enable_transports=[tpt_proto], ) as an, ): @@ -145,7 +147,8 @@ async def main( _testing.expect_ctxc( yay=( break_parent_ipc_after - or break_child_ipc_after + or + break_child_ipc_after ), # TODO: we CAN'T remove this right? # since we need the ctxc to bubble up from either diff --git a/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py index 16f92b81..72c6de4c 100644 --- a/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py +++ b/examples/debugging/root_cancelled_but_child_is_in_tty_lock.py @@ -24,10 +24,9 @@ async def spawn_until(depth=0): async def main(): - """The main ``tractor`` routine. - - The process tree should look as approximately as follows when the debugger - first engages: + ''' + The process tree should look as approximately as follows when the + debugger first engages: python examples/debugging/multi_nested_subactors_bp_forever.py ├─ python -m tractor._child --uid ('spawner1', '7eab8462 ...) @@ -37,10 +36,11 @@ async def main(): └─ python -m tractor._child --uid ('spawner0', '1d42012b ...) └─ python -m tractor._child --uid ('name_error', '6c2733b8 ...) - """ + ''' async with tractor.open_nursery( debug_mode=True, - loglevel='warning' + loglevel='devx', + enable_transports=['uds'], ) as n: # spawn both actors diff --git a/examples/debugging/shield_hang_in_sub.py b/examples/debugging/shield_hang_in_sub.py index 5387353f..bf045fe8 100644 --- a/examples/debugging/shield_hang_in_sub.py +++ b/examples/debugging/shield_hang_in_sub.py @@ -37,6 +37,7 @@ async def main( enable_stack_on_sig=True, # maybe_enable_greenback=False, loglevel='devx', + enable_transports=['uds'], ) as an, ): ptl: tractor.Portal = await an.start_actor( diff --git a/tests/conftest.py b/tests/conftest.py index 674767ff..ca175a05 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,24 +1,27 @@ """ -``tractor`` testing!! +Top level of the testing suites! + """ +from __future__ import annotations import sys import subprocess import os -import random import signal import platform import time import pytest -import tractor from tractor._testing import ( examples_dir as examples_dir, tractor_test as tractor_test, expect_ctxc as expect_ctxc, ) -# TODO: include wtv plugin(s) we build in `._testing.pytest`? -pytest_plugins = ['pytester'] +pytest_plugins: list[str] = [ + 'pytester', + 'tractor._testing.pytest', +] + # Sending signal.SIGINT on subprocess fails on windows. Use CTRL_* alternatives if platform.system() == 'Windows': @@ -30,7 +33,11 @@ else: _KILL_SIGNAL = signal.SIGKILL _INT_SIGNAL = signal.SIGINT _INT_RETURN_CODE = 1 if sys.version_info < (3, 8) else -signal.SIGINT.value - _PROC_SPAWN_WAIT = 0.6 if sys.version_info < (3, 7) else 0.4 + _PROC_SPAWN_WAIT = ( + 0.6 + if sys.version_info < (3, 7) + else 0.4 + ) no_windows = pytest.mark.skipif( @@ -39,7 +46,12 @@ no_windows = pytest.mark.skipif( ) -def pytest_addoption(parser): +def pytest_addoption( + parser: pytest.Parser, +): + # ?TODO? should this be exposed from our `._testing.pytest` + # plugin or should we make it more explicit with `--tl` for + # tractor logging like we do in other client projects? parser.addoption( "--ll", action="store", @@ -47,42 +59,10 @@ def pytest_addoption(parser): default='ERROR', help="logging level to set when testing" ) - parser.addoption( - "--spawn-backend", - action="store", - dest='spawn_backend', - default='trio', - help="Processing spawning backend to use for test run", - ) - - parser.addoption( - "--tpdb", "--debug-mode", - action="store_true", - dest='tractor_debug_mode', - # default=False, - help=( - 'Enable a flag that can be used by tests to to set the ' - '`debug_mode: bool` for engaging the internal ' - 'multi-proc debugger sys.' - ), - ) - - -def pytest_configure(config): - backend = config.option.spawn_backend - tractor._spawn.try_set_start_method(backend) - - -@pytest.fixture(scope='session') -def debug_mode(request): - debug_mode: bool = request.config.option.tractor_debug_mode - # if debug_mode: - # breakpoint() - return debug_mode - @pytest.fixture(scope='session', autouse=True) def loglevel(request): + import tractor orig = tractor.log._default_loglevel level = tractor.log._default_loglevel = request.config.option.loglevel tractor.log.get_console_log(level) @@ -90,106 +70,44 @@ def loglevel(request): tractor.log._default_loglevel = orig -@pytest.fixture(scope='session') -def spawn_backend(request) -> str: - return request.config.option.spawn_backend - - -# @pytest.fixture(scope='function', autouse=True) -# def debug_enabled(request) -> str: -# from tractor import _state -# if _state._runtime_vars['_debug_mode']: -# breakpoint() - _ci_env: bool = os.environ.get('CI', False) @pytest.fixture(scope='session') def ci_env() -> bool: ''' - Detect CI envoirment. + Detect CI environment. ''' return _ci_env -# TODO: also move this to `._testing` for now? -# -[ ] possibly generalize and re-use for multi-tree spawning -# along with the new stuff for multi-addrs in distribute_dis -# branch? -# -# choose randomly at import time -_reg_addr: tuple[str, int] = ( - '127.0.0.1', - random.randint(1000, 9999), -) - - -@pytest.fixture(scope='session') -def reg_addr() -> tuple[str, int]: - - # globally override the runtime to the per-test-session-dynamic - # addr so that all tests never conflict with any other actor - # tree using the default. - from tractor import _root - _root._default_lo_addrs = [_reg_addr] - - return _reg_addr - - -def pytest_generate_tests(metafunc): - spawn_backend = metafunc.config.option.spawn_backend - - if not spawn_backend: - # XXX some weird windows bug with `pytest`? - spawn_backend = 'trio' - - # TODO: maybe just use the literal `._spawn.SpawnMethodKey`? - assert spawn_backend in ( - 'mp_spawn', - 'mp_forkserver', - 'trio', - ) - - # NOTE: used to be used to dyanmically parametrize tests for when - # you just passed --spawn-backend=`mp` on the cli, but now we expect - # that cli input to be manually specified, BUT, maybe we'll do - # something like this again in the future? - if 'start_method' in metafunc.fixturenames: - metafunc.parametrize("start_method", [spawn_backend], scope='module') - - -# TODO: a way to let test scripts (like from `examples/`) -# guarantee they won't registry addr collide! -# @pytest.fixture -# def open_test_runtime( -# reg_addr: tuple, -# ) -> AsyncContextManager: -# return partial( -# tractor.open_nursery, -# registry_addrs=[reg_addr], -# ) - - -def sig_prog(proc, sig): +def sig_prog( + proc: subprocess.Popen, + sig: int, + canc_timeout: float = 0.1, +) -> int: "Kill the actor-process with ``sig``." proc.send_signal(sig) - time.sleep(0.1) + time.sleep(canc_timeout) if not proc.poll(): # TODO: why sometimes does SIGINT not work on teardown? # seems to happen only when trace logging enabled? proc.send_signal(_KILL_SIGNAL) - ret = proc.wait() + ret: int = proc.wait() assert ret # TODO: factor into @cm and move to `._testing`? @pytest.fixture def daemon( + debug_mode: bool, loglevel: str, testdir, reg_addr: tuple[str, int], -): + tpt_proto: str, + +) -> subprocess.Popen: ''' Run a daemon root actor as a separate actor-process tree and "remote registrar" for discovery-protocol related tests. @@ -200,28 +118,100 @@ def daemon( loglevel: str = 'info' code: str = ( - "import tractor; " - "tractor.run_daemon([], registry_addrs={reg_addrs}, loglevel={ll})" + "import tractor; " + "tractor.run_daemon([], " + "registry_addrs={reg_addrs}, " + "debug_mode={debug_mode}, " + "loglevel={ll})" ).format( reg_addrs=str([reg_addr]), ll="'{}'".format(loglevel) if loglevel else None, + debug_mode=debug_mode, ) cmd: list[str] = [ sys.executable, '-c', code, ] + # breakpoint() kwargs = {} if platform.system() == 'Windows': # without this, tests hang on windows forever kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP - proc = testdir.popen( + proc: subprocess.Popen = testdir.popen( cmd, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, **kwargs, ) - assert not proc.returncode + + # UDS sockets are **really** fast to bind()/listen()/connect() + # so it's often required that we delay a bit more starting + # the first actor-tree.. + if tpt_proto == 'uds': + global _PROC_SPAWN_WAIT + _PROC_SPAWN_WAIT = 0.6 + time.sleep(_PROC_SPAWN_WAIT) + + assert not proc.returncode yield proc sig_prog(proc, _INT_SIGNAL) + + # XXX! yeah.. just be reaaal careful with this bc sometimes it + # can lock up on the `_io.BufferedReader` and hang.. + stderr: str = proc.stderr.read().decode() + if stderr: + print( + f'Daemon actor tree produced STDERR:\n' + f'{proc.args}\n' + f'\n' + f'{stderr}\n' + ) + if proc.returncode != -2: + raise RuntimeError( + 'Daemon actor tree failed !?\n' + f'{proc.args}\n' + ) + + +# @pytest.fixture(autouse=True) +# def shared_last_failed(pytestconfig): +# val = pytestconfig.cache.get("example/value", None) +# breakpoint() +# if val is None: +# pytestconfig.cache.set("example/value", val) +# return val + + +# TODO: a way to let test scripts (like from `examples/`) +# guarantee they won't `registry_addrs` collide! +# -[ ] maybe use some kinda standard `def main()` arg-spec that +# we can introspect from a fixture that is called from the test +# body? +# -[ ] test and figure out typing for below prototype! Bp +# +# @pytest.fixture +# def set_script_runtime_args( +# reg_addr: tuple, +# ) -> Callable[[...], None]: + +# def import_n_partial_in_args_n_triorun( +# script: Path, # under examples? +# **runtime_args, +# ) -> Callable[[], Any]: # a `partial`-ed equiv of `trio.run()` + +# # NOTE, below is taken from +# # `.test_advanced_faults.test_ipc_channel_break_during_stream` +# mod: ModuleType = import_path( +# examples_dir() / 'advanced_faults' +# / 'ipc_failure_during_stream.py', +# root=examples_dir(), +# consider_namespace_packages=False, +# ) +# return partial( +# trio.run, +# partial( +# mod.main, +# **runtime_args, +# ) +# ) +# return import_n_partial_in_args_n_triorun diff --git a/tests/ipc/__init__.py b/tests/ipc/__init__.py new file mode 100644 index 00000000..b8b1f156 --- /dev/null +++ b/tests/ipc/__init__.py @@ -0,0 +1,4 @@ +''' +`tractor.ipc` subsystem(s)/unit testing suites. + +''' diff --git a/tests/ipc/test_multi_tpt.py b/tests/ipc/test_multi_tpt.py new file mode 100644 index 00000000..353385e1 --- /dev/null +++ b/tests/ipc/test_multi_tpt.py @@ -0,0 +1,95 @@ +''' +Verify the `enable_transports` param drives various +per-root/sub-actor IPC endpoint/server settings. + +''' +from __future__ import annotations + +import pytest +import trio +import tractor +from tractor import ( + Actor, + Portal, + ipc, + msg, + _state, + _addr, +) + +@tractor.context +async def chk_tpts( + ctx: tractor.Context, + tpt_proto_key: str, +): + rtvars = _state._runtime_vars + assert ( + tpt_proto_key + in + rtvars['_enable_tpts'] + ) + actor: Actor = tractor.current_actor() + spec: msg.types.SpawnSpec = actor._spawn_spec + assert spec._runtime_vars == rtvars + + # ensure individual IPC ep-addr types + serv: ipc._server.Server = actor.ipc_server + addr: ipc._types.Address + for addr in serv.addrs: + assert addr.proto_key == tpt_proto_key + + # Actor delegate-props enforcement + assert ( + actor.accept_addrs + == + serv.accept_addrs + ) + + await ctx.started(serv.accept_addrs) + + +# TODO, parametrize over mis-matched-proto-typed `registry_addrs` +# since i seems to work in `piker` but not exactly sure if both tcp +# & uds are being deployed then? +# +@pytest.mark.parametrize( + 'tpt_proto_key', + ['tcp', 'uds'], + ids=lambda item: f'ipc_tpt={item!r}' +) +def test_root_passes_tpt_to_sub( + tpt_proto_key: str, + reg_addr: tuple, + debug_mode: bool, +): + async def main(): + async with tractor.open_nursery( + enable_transports=[tpt_proto_key], + registry_addrs=[reg_addr], + debug_mode=debug_mode, + ) as an: + + assert ( + tpt_proto_key + in + _state._runtime_vars['_enable_tpts'] + ) + + ptl: Portal = await an.start_actor( + name='sub', + enable_modules=[__name__], + ) + async with ptl.open_context( + chk_tpts, + tpt_proto_key=tpt_proto_key, + ) as (ctx, accept_addrs): + + uw_addr: tuple + for uw_addr in accept_addrs: + addr = _addr.wrap_address(uw_addr) + assert addr.is_valid + + # shudown sub-actor(s) + await an.cancel() + + trio.run(main) diff --git a/tests/ipc/test_server.py b/tests/ipc/test_server.py new file mode 100644 index 00000000..1d63bd1b --- /dev/null +++ b/tests/ipc/test_server.py @@ -0,0 +1,72 @@ +''' +High-level `.ipc._server` unit tests. + +''' +from __future__ import annotations + +import pytest +import trio +from tractor import ( + devx, + ipc, + log, +) +from tractor._testing.addr import ( + get_rando_addr, +) +# TODO, use/check-roundtripping with some of these wrapper types? +# +# from .._addr import Address +# from ._chan import Channel +# from ._transport import MsgTransport +# from ._uds import UDSAddress +# from ._tcp import TCPAddress + + +@pytest.mark.parametrize( + '_tpt_proto', + ['uds', 'tcp'] +) +def test_basic_ipc_server( + _tpt_proto: str, + debug_mode: bool, + loglevel: str, +): + + # so we see the socket-listener reporting on console + log.get_console_log("INFO") + + rando_addr: tuple = get_rando_addr( + tpt_proto=_tpt_proto, + ) + async def main(): + async with ipc._server.open_ipc_server() as server: + + assert ( + server._parent_tn + and + server._parent_tn is server._stream_handler_tn + ) + assert server._no_more_peers.is_set() + + eps: list[ipc._server.Endpoint] = await server.listen_on( + accept_addrs=[rando_addr], + stream_handler_nursery=None, + ) + assert ( + len(eps) == 1 + and + (ep := eps[0])._listener + and + not ep.peer_tpts + ) + + server._parent_tn.cancel_scope.cancel() + + # !TODO! actually make a bg-task connection from a client + # using `ipc._chan._connect_chan()` + + with devx.maybe_open_crash_handler( + pdb=debug_mode, + ): + trio.run(main) diff --git a/tests/test_advanced_faults.py b/tests/test_advanced_faults.py index de8a0e1c..061ae5aa 100644 --- a/tests/test_advanced_faults.py +++ b/tests/test_advanced_faults.py @@ -10,6 +10,9 @@ import pytest from _pytest.pathlib import import_path import trio import tractor +from tractor import ( + TransportClosed, +) from tractor._testing import ( examples_dir, break_ipc, @@ -74,6 +77,7 @@ def test_ipc_channel_break_during_stream( spawn_backend: str, ipc_break: dict|None, pre_aclose_msgstream: bool, + tpt_proto: str, ): ''' Ensure we can have an IPC channel break its connection during @@ -91,7 +95,7 @@ def test_ipc_channel_break_during_stream( # non-`trio` spawners should never hit the hang condition that # requires the user to do ctl-c to cancel the actor tree. # expect_final_exc = trio.ClosedResourceError - expect_final_exc = tractor.TransportClosed + expect_final_exc = TransportClosed mod: ModuleType = import_path( examples_dir() / 'advanced_faults' @@ -104,6 +108,8 @@ def test_ipc_channel_break_during_stream( # period" wherein the user eventually hits ctl-c to kill the # root-actor tree. expect_final_exc: BaseException = KeyboardInterrupt + expect_final_cause: BaseException|None = None + if ( # only expect EoC if trans is broken on the child side, ipc_break['break_child_ipc_after'] is not False @@ -138,6 +144,9 @@ def test_ipc_channel_break_during_stream( # a user sending ctl-c by raising a KBI. if pre_aclose_msgstream: expect_final_exc = KeyboardInterrupt + if tpt_proto == 'uds': + expect_final_exc = TransportClosed + expect_final_cause = trio.BrokenResourceError # XXX OLD XXX # if child calls `MsgStream.aclose()` then expect EoC. @@ -157,6 +166,10 @@ def test_ipc_channel_break_during_stream( if pre_aclose_msgstream: expect_final_exc = KeyboardInterrupt + if tpt_proto == 'uds': + expect_final_exc = TransportClosed + expect_final_cause = trio.BrokenResourceError + # NOTE when the parent IPC side dies (even if the child does as well # but the child fails BEFORE the parent) we always expect the # IPC layer to raise a closed-resource, NEVER do we expect @@ -169,8 +182,8 @@ def test_ipc_channel_break_during_stream( and ipc_break['break_child_ipc_after'] is False ): - # expect_final_exc = trio.ClosedResourceError expect_final_exc = tractor.TransportClosed + expect_final_cause = trio.ClosedResourceError # BOTH but, PARENT breaks FIRST elif ( @@ -181,8 +194,8 @@ def test_ipc_channel_break_during_stream( ipc_break['break_parent_ipc_after'] ) ): - # expect_final_exc = trio.ClosedResourceError expect_final_exc = tractor.TransportClosed + expect_final_cause = trio.ClosedResourceError with pytest.raises( expected_exception=( @@ -198,6 +211,7 @@ def test_ipc_channel_break_during_stream( start_method=spawn_backend, loglevel=loglevel, pre_close=pre_aclose_msgstream, + tpt_proto=tpt_proto, **ipc_break, ) ) @@ -220,10 +234,15 @@ def test_ipc_channel_break_during_stream( ) cause: Exception = tc.__cause__ assert ( - type(cause) is trio.ClosedResourceError - and - cause.args[0] == 'another task closed this fd' + # type(cause) is trio.ClosedResourceError + type(cause) is expect_final_cause + + # TODO, should we expect a certain exc-message (per + # tpt) as well?? + # and + # cause.args[0] == 'another task closed this fd' ) + raise # get raw instance from pytest wrapper diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 18b2aa1b..65a76d08 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -7,7 +7,9 @@ import platform from functools import partial import itertools +import psutil import pytest +import subprocess import tractor from tractor._testing import tractor_test import trio @@ -152,13 +154,23 @@ async def unpack_reg(actor_or_portal): async def spawn_and_check_registry( reg_addr: tuple, use_signal: bool, + debug_mode: bool = False, remote_arbiter: bool = False, with_streaming: bool = False, + maybe_daemon: tuple[ + subprocess.Popen, + psutil.Process, + ]|None = None, ) -> None: + if maybe_daemon: + popen, proc = maybe_daemon + # breakpoint() + async with tractor.open_root_actor( registry_addrs=[reg_addr], + debug_mode=debug_mode, ): async with tractor.get_registry(reg_addr) as portal: # runtime needs to be up to call this @@ -176,11 +188,11 @@ async def spawn_and_check_registry( extra = 2 # local root actor + remote arbiter # ensure current actor is registered - registry = await get_reg() + registry: dict = await get_reg() assert actor.uid in registry try: - async with tractor.open_nursery() as n: + async with tractor.open_nursery() as an: async with trio.open_nursery( strict_exception_groups=False, ) as trion: @@ -189,17 +201,17 @@ async def spawn_and_check_registry( for i in range(3): name = f'a{i}' if with_streaming: - portals[name] = await n.start_actor( + portals[name] = await an.start_actor( name=name, enable_modules=[__name__]) else: # no streaming - portals[name] = await n.run_in_actor( + portals[name] = await an.run_in_actor( trio.sleep_forever, name=name) # wait on last actor to come up async with tractor.wait_for_actor(name): registry = await get_reg() - for uid in n._children: + for uid in an._children: assert uid in registry assert len(portals) + extra == len(registry) @@ -232,6 +244,7 @@ async def spawn_and_check_registry( @pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('with_streaming', [False, True]) def test_subactors_unregister_on_cancel( + debug_mode: bool, start_method, use_signal, reg_addr, @@ -248,6 +261,7 @@ def test_subactors_unregister_on_cancel( spawn_and_check_registry, reg_addr, use_signal, + debug_mode=debug_mode, remote_arbiter=False, with_streaming=with_streaming, ), @@ -257,7 +271,8 @@ def test_subactors_unregister_on_cancel( @pytest.mark.parametrize('use_signal', [False, True]) @pytest.mark.parametrize('with_streaming', [False, True]) def test_subactors_unregister_on_cancel_remote_daemon( - daemon, + daemon: subprocess.Popen, + debug_mode: bool, start_method, use_signal, reg_addr, @@ -273,8 +288,13 @@ def test_subactors_unregister_on_cancel_remote_daemon( spawn_and_check_registry, reg_addr, use_signal, + debug_mode=debug_mode, remote_arbiter=True, with_streaming=with_streaming, + maybe_daemon=( + daemon, + psutil.Process(daemon.pid) + ), ), ) @@ -373,7 +393,7 @@ def test_close_channel_explicit( @pytest.mark.parametrize('use_signal', [False, True]) def test_close_channel_explicit_remote_arbiter( - daemon, + daemon: subprocess.Popen, start_method, use_signal, reg_addr, diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index d3859814..bdcdd6c9 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -100,16 +100,29 @@ async def streamer( @acm async def open_stream() -> Awaitable[tractor.MsgStream]: - async with tractor.open_nursery() as tn: - portal = await tn.start_actor('streamer', enable_modules=[__name__]) - async with ( - portal.open_context(streamer) as (ctx, first), - ctx.open_stream() as stream, - ): - yield stream + try: + async with tractor.open_nursery() as an: + portal = await an.start_actor( + 'streamer', + enable_modules=[__name__], + ) + async with ( + portal.open_context(streamer) as (ctx, first), + ctx.open_stream() as stream, + ): + yield stream - await portal.cancel_actor() - print('CANCELLED STREAMER') + print('Cancelling streamer') + await portal.cancel_actor() + print('Cancelled streamer') + + except Exception as err: + print( + f'`open_stream()` errored?\n' + f'{err!r}\n' + ) + await tractor.pause(shield=True) + raise err @acm @@ -132,19 +145,28 @@ async def maybe_open_stream(taskname: str): yield stream -def test_open_local_sub_to_stream(): +def test_open_local_sub_to_stream( + debug_mode: bool, +): ''' Verify a single inter-actor stream can can be fanned-out shared to - N local tasks using ``trionics.maybe_open_context():``. + N local tasks using `trionics.maybe_open_context()`. ''' - timeout: float = 3.6 if platform.system() != "Windows" else 10 + timeout: float = 3.6 + if platform.system() == "Windows": + timeout: float = 10 + + if debug_mode: + timeout = 999 async def main(): full = list(range(1000)) async def get_sub_and_pull(taskname: str): + + stream: tractor.MsgStream async with ( maybe_open_stream(taskname) as stream, ): @@ -165,17 +187,27 @@ def test_open_local_sub_to_stream(): assert set(seq).issubset(set(full)) print(f'{taskname} finished') - with trio.fail_after(timeout): + with trio.fail_after(timeout) as cs: # TODO: turns out this isn't multi-task entrant XD # We probably need an indepotent entry semantic? - async with tractor.open_root_actor(): + async with tractor.open_root_actor( + debug_mode=debug_mode, + ): async with ( - trio.open_nursery() as nurse, + trio.open_nursery() as tn, ): for i in range(10): - nurse.start_soon(get_sub_and_pull, f'task_{i}') + tn.start_soon( + get_sub_and_pull, + f'task_{i}', + ) await trio.sleep(0.001) print('all consumer tasks finished') + if cs.cancelled_caught: + pytest.fail( + 'Should NOT time out in `open_root_actor()` ?' + ) + trio.run(main) diff --git a/tests/test_trioisms.py b/tests/test_trioisms.py index 9f1ccec9..3343d788 100644 --- a/tests/test_trioisms.py +++ b/tests/test_trioisms.py @@ -180,7 +180,8 @@ def test_acm_embedded_nursery_propagates_enter_err( with tractor.devx.maybe_open_crash_handler( pdb=debug_mode, ) as bxerr: - assert not bxerr.value + if bxerr: + assert not bxerr.value async with ( wraps_tn_that_always_cancels() as tn, diff --git a/tractor/_testing/addr.py b/tractor/_testing/addr.py new file mode 100644 index 00000000..1b066336 --- /dev/null +++ b/tractor/_testing/addr.py @@ -0,0 +1,70 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Random IPC addr generation for isolating +the discovery space between test sessions. + +Might be eventually useful to expose as a util set from +our `tractor.discovery` subsys? + +''' +import random +from typing import ( + Type, +) +from tractor import ( + _addr, +) + + +def get_rando_addr( + tpt_proto: str, + *, + + # choose random port at import time + _rando_port: str = random.randint(1000, 9999) + +) -> tuple[str, str|int]: + ''' + Used to globally override the runtime to the + per-test-session-dynamic addr so that all tests never conflict + with any other actor tree using the default. + + ''' + addr_type: Type[_addr.Addres] = _addr._address_types[tpt_proto] + def_reg_addr: tuple[str, int] = _addr._default_lo_addrs[tpt_proto] + + # this is the "unwrapped" form expected to be passed to + # `.open_root_actor()` by test body. + testrun_reg_addr: tuple[str, int|str] + match tpt_proto: + case 'tcp': + testrun_reg_addr = ( + addr_type.def_bindspace, + _rando_port, + ) + + # NOTE, file-name uniqueness (no-collisions) will be based on + # the runtime-directory and root (pytest-proc's) pid. + case 'uds': + testrun_reg_addr = addr_type.get_random().unwrap() + + # XXX, as sanity it should never the same as the default for the + # host-singleton registry actor. + assert def_reg_addr != testrun_reg_addr + + return testrun_reg_addr diff --git a/tractor/_testing/pytest.py b/tractor/_testing/pytest.py index 93eeaf72..1a2f63ab 100644 --- a/tractor/_testing/pytest.py +++ b/tractor/_testing/pytest.py @@ -26,29 +26,46 @@ from functools import ( import inspect import platform +import pytest import tractor import trio def tractor_test(fn): ''' - Decorator for async test funcs to present them as "native" - looking sync funcs runnable by `pytest` using `trio.run()`. + Decorator for async test fns to decorator-wrap them as "native" + looking sync funcs runnable by `pytest` and auto invoked with + `trio.run()` (much like the `pytest-trio` plugin's approach). - Use: + Further the test fn body will be invoked AFTER booting the actor + runtime, i.e. from inside a `tractor.open_root_actor()` block AND + with various runtime and tooling parameters implicitly passed as + requested by by the test session's config; see immediately below. - @tractor_test - async def test_whatever(): - await ... + Basic deco use: + --------------- - If fixtures: + @tractor_test + async def test_whatever(): + await ... - - ``reg_addr`` (a socket addr tuple where arbiter is listening) - - ``loglevel`` (logging level passed to tractor internals) - - ``start_method`` (subprocess spawning backend) - are defined in the `pytest` fixture space they will be automatically - injected to tests declaring these funcargs. + Runtime config via special fixtures: + ------------------------------------ + If any of the following fixture are requested by the wrapped test + fn (via normal func-args declaration), + + - `reg_addr` (a socket addr tuple where arbiter is listening) + - `loglevel` (logging level passed to tractor internals) + - `start_method` (subprocess spawning backend) + + (TODO support) + - `tpt_proto` (IPC transport protocol key) + + they will be automatically injected to each test as normally + expected as well as passed to the initial + `tractor.open_root_actor()` funcargs. + ''' @wraps(fn) def wrapper( @@ -111,3 +128,164 @@ def tractor_test(fn): return trio.run(main) return wrapper + + +def pytest_addoption( + parser: pytest.Parser, +): + # parser.addoption( + # "--ll", + # action="store", + # dest='loglevel', + # default='ERROR', help="logging level to set when testing" + # ) + + parser.addoption( + "--spawn-backend", + action="store", + dest='spawn_backend', + default='trio', + help="Processing spawning backend to use for test run", + ) + + parser.addoption( + "--tpdb", + "--debug-mode", + action="store_true", + dest='tractor_debug_mode', + # default=False, + help=( + 'Enable a flag that can be used by tests to to set the ' + '`debug_mode: bool` for engaging the internal ' + 'multi-proc debugger sys.' + ), + ) + + # provide which IPC transport protocols opting-in test suites + # should accumulatively run against. + parser.addoption( + "--tpt-proto", + nargs='+', # accumulate-multiple-args + action="store", + dest='tpt_protos', + default=['tcp'], + help="Transport protocol to use under the `tractor.ipc.Channel`", + ) + + +def pytest_configure(config): + backend = config.option.spawn_backend + tractor._spawn.try_set_start_method(backend) + + +@pytest.fixture(scope='session') +def debug_mode(request) -> bool: + ''' + Flag state for whether `--tpdb` (for `tractor`-py-debugger) + was passed to the test run. + + Normally tests should pass this directly to `.open_root_actor()` + to allow the user to opt into suite-wide crash handling. + + ''' + debug_mode: bool = request.config.option.tractor_debug_mode + return debug_mode + + +@pytest.fixture(scope='session') +def spawn_backend(request) -> str: + return request.config.option.spawn_backend + + +@pytest.fixture(scope='session') +def tpt_protos(request) -> list[str]: + + # allow quoting on CLI + proto_keys: list[str] = [ + proto_key.replace('"', '').replace("'", "") + for proto_key in request.config.option.tpt_protos + ] + + # ?TODO, eventually support multiple protos per test-sesh? + if len(proto_keys) > 1: + pytest.fail( + 'We only support one `--tpt-proto ` atm!\n' + ) + + # XXX ensure we support the protocol by name via lookup! + for proto_key in proto_keys: + addr_type = tractor._addr._address_types[proto_key] + assert addr_type.proto_key == proto_key + + yield proto_keys + + +@pytest.fixture( + scope='session', + autouse=True, +) +def tpt_proto( + tpt_protos: list[str], +) -> str: + proto_key: str = tpt_protos[0] + + from tractor import _state + if _state._def_tpt_proto != proto_key: + _state._def_tpt_proto = proto_key + + yield proto_key + + +@pytest.fixture(scope='session') +def reg_addr( + tpt_proto: str, +) -> tuple[str, int|str]: + ''' + Deliver a test-sesh unique registry address such + that each run's (tests which use this fixture) will + have no conflicts/cross-talk when running simultaneously + nor will interfere with other live `tractor` apps active + on the same network-host (namespace). + + ''' + from tractor._testing.addr import get_rando_addr + return get_rando_addr( + tpt_proto=tpt_proto, + ) + + +def pytest_generate_tests( + metafunc: pytest.Metafunc, +): + spawn_backend: str = metafunc.config.option.spawn_backend + + if not spawn_backend: + # XXX some weird windows bug with `pytest`? + spawn_backend = 'trio' + + # TODO: maybe just use the literal `._spawn.SpawnMethodKey`? + assert spawn_backend in ( + 'mp_spawn', + 'mp_forkserver', + 'trio', + ) + + # NOTE: used-to-be-used-to dyanmically parametrize tests for when + # you just passed --spawn-backend=`mp` on the cli, but now we expect + # that cli input to be manually specified, BUT, maybe we'll do + # something like this again in the future? + if 'start_method' in metafunc.fixturenames: + metafunc.parametrize( + "start_method", + [spawn_backend], + scope='module', + ) + + # TODO, parametrize any `tpt_proto: str` declaring tests! + # proto_tpts: list[str] = metafunc.config.option.proto_tpts + # if 'tpt_proto' in metafunc.fixturenames: + # metafunc.parametrize( + # 'tpt_proto', + # proto_tpts, # TODO, double check this list usage! + # scope='module', + # ) diff --git a/tractor/ipc/_server.py b/tractor/ipc/_server.py index e06e747c..cec0e56f 100644 --- a/tractor/ipc/_server.py +++ b/tractor/ipc/_server.py @@ -289,7 +289,7 @@ async def maybe_wait_on_canced_subs( # # -[x] maybe change to mod-func and rename for implied # multi-transport semantics? -# -[ ] register each stream/tpt/chan with the owning `IPCEndpoint` +# -[ ] register each stream/tpt/chan with the owning `Endpoint` # so that we can query per tpt all peer contact infos? # |_[ ] possibly provide a global viewing via a # `collections.ChainMap`? @@ -309,7 +309,7 @@ async def handle_stream_from_peer( any `IPCServer.listen_on()` passed `stream_handler_tn: Nursery` such that it is invoked as, - IPCEndpoint.stream_handler_tn.start_soon( + Endpoint.stream_handler_tn.start_soon( handle_stream, stream, ) @@ -577,7 +577,7 @@ async def handle_stream_from_peer( # finally block closure -class IPCEndpoint(Struct): +class Endpoint(Struct): ''' An instance of an IPC "bound" address where the lifetime of the "ability to accept connections" (from clients) and then handle @@ -636,7 +636,7 @@ class IPCEndpoint(Struct): ) -class IPCServer(Struct): +class Server(Struct): _parent_tn: Nursery _stream_handler_tn: Nursery # level-triggered sig for whether "no peers are currently @@ -644,7 +644,7 @@ class IPCServer(Struct): # initialized with `.is_set() == True`. _no_more_peers: trio.Event - _endpoints: list[IPCEndpoint] = [] + _endpoints: list[Endpoint] = [] # connection tracking & mgmt _peers: defaultdict[ @@ -659,10 +659,10 @@ class IPCServer(Struct): # syncs for setup/teardown sequences _shutdown: trio.Event|None = None - # TODO, maybe just make `._endpoints: list[IPCEndpoint]` and + # TODO, maybe just make `._endpoints: list[Endpoint]` and # provide dict-views onto it? # @property - # def addrs2eps(self) -> dict[Address, IPCEndpoint]: + # def addrs2eps(self) -> dict[Address, Endpoint]: # ... @property @@ -708,7 +708,7 @@ class IPCServer(Struct): await self._shutdown.wait() else: tpt_protos: list[str] = [] - ep: IPCEndpoint + ep: Endpoint for ep in self._endpoints: tpt_protos.append(ep.addr.proto_key) @@ -790,7 +790,7 @@ class IPCServer(Struct): def epsdict(self) -> dict[ Address, - IPCEndpoint, + Endpoint, ]: return { ep.addr: ep @@ -804,7 +804,7 @@ class IPCServer(Struct): return ev.is_set() def pformat(self) -> str: - eps: list[IPCEndpoint] = self._endpoints + eps: list[Endpoint] = self._endpoints state_repr: str = ( f'{len(eps)!r} IPC-endpoints active' @@ -835,13 +835,13 @@ class IPCServer(Struct): # TODO? maybe allow shutting down a `.listen_on()`s worth of # listeners by cancelling the corresponding - # `IPCEndpoint._listen_tn` only ? + # `Endpoint._listen_tn` only ? # -[ ] in theory you could use this to # "boot-and-wait-for-reconnect" of all current and connecting # peers? # |_ would require that the stream-handler is intercepted so we # can intercept every `MsgTransport` (stream) and track per - # `IPCEndpoint` likely? + # `Endpoint` likely? # # async def unlisten( # self, @@ -854,7 +854,7 @@ class IPCServer(Struct): *, accept_addrs: list[tuple[str, int|str]]|None = None, stream_handler_nursery: Nursery|None = None, - ) -> list[IPCEndpoint]: + ) -> list[Endpoint]: ''' Start `SocketListeners` (i.e. bind and call `socket.listen()`) for all IPC-transport-protocol specific `Address`-types @@ -888,7 +888,7 @@ class IPCServer(Struct): f'Binding to endpoints for,\n' f'{accept_addrs}\n' ) - eps: list[IPCEndpoint] = await self._parent_tn.start( + eps: list[Endpoint] = await self._parent_tn.start( partial( _serve_ipc_eps, server=self, @@ -904,7 +904,7 @@ class IPCServer(Struct): self._endpoints.extend(eps) # XXX, just a little bit of sanity group_tn: Nursery|None = None - ep: IPCEndpoint + ep: Endpoint for ep in eps: if ep.addr not in self.addrs: breakpoint() @@ -917,6 +917,10 @@ class IPCServer(Struct): return eps +# alias until we decide on final naming +IPCServer = Server + + async def _serve_ipc_eps( *, server: IPCServer, @@ -941,12 +945,12 @@ async def _serve_ipc_eps( listen_tn: Nursery async with trio.open_nursery() as listen_tn: - eps: list[IPCEndpoint] = [] + eps: list[Endpoint] = [] # XXX NOTE, required to call `serve_listeners()` below. # ?TODO, maybe just pass `list(eps.values()` tho? listeners: list[trio.abc.Listener] = [] for addr in listen_addrs: - ep = IPCEndpoint( + ep = Endpoint( addr=addr, listen_tn=listen_tn, stream_handler_tn=stream_handler_tn, @@ -1010,7 +1014,7 @@ async def _serve_ipc_eps( finally: if eps: addr: Address - ep: IPCEndpoint + ep: Endpoint for addr, ep in server.epsdict().items(): ep.close_listener() server._endpoints.remove(ep)