From 15047341bd5007519fc592ba79915e119f8caad4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 15:09:14 -0400 Subject: [PATCH 01/11] Ignore forserver override attrs with `mypy` --- tractor/_spawn.py | 29 +++++++++++++++++++---------- tractor/_supervise.py | 6 +++++- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 78b3ba8..d4fb60b 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -22,7 +22,11 @@ from __future__ import annotations import sys import platform from typing import ( - Any, Optional, Callable, TypeVar, TYPE_CHECKING + Any, + Optional, + Callable, + TypeVar, + TYPE_CHECKING, ) from collections.abc import Awaitable @@ -48,6 +52,7 @@ from ._exceptions import ActorFailure if TYPE_CHECKING: + from ._supervise import ActorNursery import multiprocessing as mp ProcessType = TypeVar('ProcessType', mp.Process, trio.Process) @@ -249,7 +254,7 @@ async def soft_wait( async def new_proc( name: str, - actor_nursery: 'ActorNursery', # type: ignore # noqa + actor_nursery: ActorNursery, subactor: Actor, errors: dict[tuple[str, str], Exception], @@ -308,7 +313,8 @@ async def new_proc( try: try: # TODO: needs ``trio_typing`` patch? - proc = await trio.lowlevel.open_process(spawn_cmd) # type: ignore + proc = await trio.lowlevel.open_process( # type: ignore + spawn_cmd) log.runtime(f"Started {proc}") @@ -340,7 +346,10 @@ async def new_proc( portal = Portal(chan) actor_nursery._children[subactor.uid] = ( - subactor, proc, portal) + subactor, + proc, + portal, + ) # send additional init params await chan.send({ @@ -443,7 +452,7 @@ async def new_proc( async def mp_new_proc( name: str, - actor_nursery: 'ActorNursery', # type: ignore # noqa + actor_nursery: ActorNursery, # type: ignore # noqa subactor: Actor, errors: dict[tuple[str, str], Exception], # passed through to actor main @@ -480,8 +489,8 @@ async def mp_new_proc( # forkserver.set_forkserver_preload(enable_modules) forkserver.ensure_running() fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, + fs._forkserver_address, # type: ignore # noqa + fs._forkserver_alive_fd, # type: ignore # noqa getattr(fs, '_forkserver_pid', None), getattr( resource_tracker._resource_tracker, '_pid', None), @@ -490,9 +499,9 @@ async def mp_new_proc( else: assert curr_actor._forkserver_info fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, + fs._forkserver_address, # type: ignore # noqa + fs._forkserver_alive_fd, # type: ignore # noqa + fs._forkserver_pid, # type: ignore # noqa resource_tracker._resource_tracker._pid, resource_tracker._resource_tracker._fd, ) = curr_actor._forkserver_info diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 06ee38d..19c7100 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -90,7 +90,11 @@ class ActorNursery: self._da_nursery = da_nursery self._children: dict[ tuple[str, str], - tuple[Actor, mp.Process, Optional[Portal]] + tuple[ + Actor, + mp.context.Process | trio.Process, + Optional[Portal], + ] ] = {} # portals spawned with ``run_in_actor()`` are # cancelled when their "main" result arrives From 90f4912580b186b056597a288c699fb99dba59b9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 16:05:40 -0400 Subject: [PATCH 02/11] Organize process spawning into lookup table Instead of the logic branching create a table `._spawn._methods` which is used to lookup the desired backend framework (in this case still only one of `multiprocessing` or `trio`) and make the top level `.new_proc()` do the lookup and any common logic. Use a `typing.Literal` to define the lookup table's key set. Repair and ignore a bunch of type-annot related stuff todo with `mypy` updates and backend-specific process typing. --- tractor/_entry.py | 21 ++- tractor/_root.py | 2 +- tractor/_spawn.py | 357 +++++++++++++++++++++++------------------- tractor/_supervise.py | 2 +- 4 files changed, 216 insertions(+), 166 deletions(-) diff --git a/tractor/_entry.py b/tractor/_entry.py index 931b2e2..35a9abf 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -18,15 +18,28 @@ Sub-process entry points. """ +from __future__ import annotations from functools import partial -from typing import Any +from typing import ( + Any, + TYPE_CHECKING, +) import trio # type: ignore -from .log import get_console_log, get_logger +from .log import ( + get_console_log, + get_logger, +) from . import _state from .to_asyncio import run_as_asyncio_guest -from ._runtime import async_main, Actor +from ._runtime import ( + async_main, + Actor, +) + +if TYPE_CHECKING: + from ._spawn import SpawnMethodKey log = get_logger(__name__) @@ -37,7 +50,7 @@ def _mp_main( actor: 'Actor', # type: ignore accept_addr: tuple[str, int], forkserver_info: tuple[Any, Any, Any, Any, Any], - start_method: str, + start_method: SpawnMethodKey, parent_addr: tuple[str, int] = None, infect_asyncio: bool = False, diff --git a/tractor/_root.py b/tractor/_root.py index 1def614..0e5b2aa 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -62,7 +62,7 @@ async def open_root_actor( # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # OR `trio` (the new default). - start_method: Optional[str] = None, + start_method: Optional[_spawn.SpawnMethodKey] = None, # enables the multi-process debugger support debug_mode: bool = False, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index d4fb60b..4defb58 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -23,6 +23,7 @@ import sys import platform from typing import ( Any, + Literal, Optional, Callable, TypeVar, @@ -60,7 +61,12 @@ log = get_logger('tractor') # placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None -_spawn_method: str = "trio" +SpawnMethodKey = Literal[ + 'trio', + 'spawn', + 'forkserver', +] +_spawn_method: SpawnMethodKey = 'trio' if platform.system() == 'Windows': @@ -77,7 +83,10 @@ else: await trio.lowlevel.wait_readable(proc.sentinel) -def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: +def try_set_start_method( + name: SpawnMethodKey + +) -> Optional[mp.context.BaseContext]: ''' Attempt to set the method for process starting, aka the "actor spawning backend". @@ -108,6 +117,7 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: from . import _forkserver_override _forkserver_override.override_stdlib() _ctx = mp.get_context(name) + elif name == 'trio': _ctx = None else: @@ -252,6 +262,43 @@ async def soft_wait( async def new_proc( + name: str, + actor_nursery: ActorNursery, + subactor: Actor, + errors: dict[tuple[str, str], Exception], + + # passed through to actor main + bind_addr: tuple[str, int], + parent_addr: tuple[str, int], + _runtime_vars: dict[str, Any], # serialized and sent to _child + + *, + + infect_asyncio: bool = False, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED + +) -> None: + + # lookup backend spawning target + target = _methods[_spawn_method] + + # mark the new actor with the global spawn method + subactor._spawn_method = _spawn_method + + await target( + name, + actor_nursery, + subactor, + errors, + bind_addr, + parent_addr, + _runtime_vars, # run time vars + infect_asyncio=infect_asyncio, + task_status=task_status, + ) + + +async def trio_proc( name: str, actor_nursery: ActorNursery, @@ -277,179 +324,157 @@ async def new_proc( here is to be considered the core supervision strategy. ''' - # mark the new actor with the global spawn method - subactor._spawn_method = _spawn_method - uid = subactor.uid + spawn_cmd = [ + sys.executable, + "-m", + # Hardcode this (instead of using ``_child.__name__`` to avoid a + # double import warning: https://stackoverflow.com/a/45070583 + "tractor._child", + # We provide the child's unique identifier on this exec/spawn + # line for debugging purposes when viewing the process tree from + # the OS; it otherwise can be passed via the parent channel if + # we prefer in the future (for privacy). + "--uid", + str(subactor.uid), + # Address the child must connect to on startup + "--parent_addr", + str(parent_addr) + ] - if _spawn_method == 'trio': - spawn_cmd = [ - sys.executable, - "-m", - # Hardcode this (instead of using ``_child.__name__`` to avoid a - # double import warning: https://stackoverflow.com/a/45070583 - "tractor._child", - # We provide the child's unique identifier on this exec/spawn - # line for debugging purposes when viewing the process tree from - # the OS; it otherwise can be passed via the parent channel if - # we prefer in the future (for privacy). - "--uid", - str(subactor.uid), - # Address the child must connect to on startup - "--parent_addr", - str(parent_addr) + if subactor.loglevel: + spawn_cmd += [ + "--loglevel", + subactor.loglevel ] + # Tell child to run in guest mode on top of ``asyncio`` loop + if infect_asyncio: + spawn_cmd.append("--asyncio") - if subactor.loglevel: - spawn_cmd += [ - "--loglevel", - subactor.loglevel - ] - # Tell child to run in guest mode on top of ``asyncio`` loop - if infect_asyncio: - spawn_cmd.append("--asyncio") - - cancelled_during_spawn: bool = False - proc: Optional[trio.Process] = None + cancelled_during_spawn: bool = False + proc: Optional[trio.Process] = None + try: try: - try: - # TODO: needs ``trio_typing`` patch? - proc = await trio.lowlevel.open_process( # type: ignore - spawn_cmd) + # TODO: needs ``trio_typing`` patch? + proc = await trio.lowlevel.open_process( # type: ignore + spawn_cmd) - log.runtime(f"Started {proc}") + log.runtime(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await actor_nursery._actor.wait_for_peer( - subactor.uid) + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) - except trio.Cancelled: - cancelled_during_spawn = True - # we may cancel before the child connects back in which - # case avoid clobbering the pdb tty. - if debug_mode(): - with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - if is_root_process(): - await maybe_wait_for_debugger() - - elif proc is not None: - async with acquire_debug_lock(uid): - # soft wait on the proc to terminate - with trio.move_on_after(0.5): - await proc.wait() - raise - - # a sub-proc ref **must** exist now - assert proc - - portal = Portal(chan) - actor_nursery._children[subactor.uid] = ( - subactor, - proc, - portal, - ) - - # send additional init params - await chan.send({ - "_parent_main_data": subactor._parent_main_data, - "enable_modules": subactor.enable_modules, - "_arb_addr": subactor._arb_addr, - "bind_host": bind_addr[0], - "bind_port": bind_addr[1], - "_runtime_vars": _runtime_vars, - }) - - # track subactor in current nursery - curr_actor = current_actor() - curr_actor._actoruid2nursery[subactor.uid] = actor_nursery - - # resume caller at next checkpoint now that child is up - task_status.started(portal) - - # wait for ActorNursery.wait() to be called - with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() - - async with trio.open_nursery() as nursery: - if portal in actor_nursery._cancel_after_result_on_exit: - nursery.start_soon( - cancel_on_completion, - portal, - subactor, - errors - ) - - # This is a "soft" (cancellable) join/reap which - # will remote cancel the actor on a ``trio.Cancelled`` - # condition. - await soft_wait( - proc, - trio.Process.wait, - portal - ) - - # cancel result waiter that may have been spawned in - # tandem if not done already - log.warning( - "Cancelling existing result waiter task for " - f"{subactor.uid}") - nursery.cancel_scope.cancel() - - finally: - # The "hard" reap since no actor zombies are allowed! - # XXX: do this **after** cancellation/tearfown to avoid - # killing the process too early. - if proc: - log.cancel(f'Hard reap sequence starting for {uid}') + except trio.Cancelled: + cancelled_during_spawn = True + # we may cancel before the child connects back in which + # case avoid clobbering the pdb tty. + if debug_mode(): with trio.CancelScope(shield=True): - # don't clobber an ongoing pdb - if cancelled_during_spawn: - # Try again to avoid TTY clobbering. - async with acquire_debug_lock(uid): + if is_root_process(): + await maybe_wait_for_debugger() + + elif proc is not None: + async with acquire_debug_lock(subactor.uid): + # soft wait on the proc to terminate with trio.move_on_after(0.5): await proc.wait() + raise - if is_root_process(): - await maybe_wait_for_debugger( - child_in_debug=_runtime_vars.get( - '_debug_mode', False), - ) + # a sub-proc ref **must** exist now + assert proc - if proc.poll() is None: - log.cancel(f"Attempting to hard kill {proc}") - await do_hard_kill(proc) - - log.debug(f"Joined {proc}") - else: - log.warning('Nursery cancelled before sub-proc started') - - if not cancelled_during_spawn: - # pop child entry to indicate we no longer managing this - # subactor - actor_nursery._children.pop(subactor.uid) - - else: - # `multiprocessing` - # async with trio.open_nursery() as nursery: - await mp_new_proc( - name=name, - actor_nursery=actor_nursery, - subactor=subactor, - errors=errors, - - # passed through to actor main - bind_addr=bind_addr, - parent_addr=parent_addr, - _runtime_vars=_runtime_vars, - infect_asyncio=infect_asyncio, - task_status=task_status, + portal = Portal(chan) + actor_nursery._children[subactor.uid] = ( + subactor, + proc, + portal, ) + # send additional init params + await chan.send({ + "_parent_main_data": subactor._parent_main_data, + "enable_modules": subactor.enable_modules, + "_arb_addr": subactor._arb_addr, + "bind_host": bind_addr[0], + "bind_port": bind_addr[1], + "_runtime_vars": _runtime_vars, + }) -async def mp_new_proc( + # track subactor in current nursery + curr_actor = current_actor() + curr_actor._actoruid2nursery[subactor.uid] = actor_nursery + + # resume caller at next checkpoint now that child is up + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as nursery: + if portal in actor_nursery._cancel_after_result_on_exit: + nursery.start_soon( + cancel_on_completion, + portal, + subactor, + errors + ) + + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_wait( + proc, + trio.Process.wait, + portal + ) + + # cancel result waiter that may have been spawned in + # tandem if not done already + log.warning( + "Cancelling existing result waiter task for " + f"{subactor.uid}") + nursery.cancel_scope.cancel() + + finally: + # The "hard" reap since no actor zombies are allowed! + # XXX: do this **after** cancellation/tearfown to avoid + # killing the process too early. + if proc: + log.cancel(f'Hard reap sequence starting for {subactor.uid}') + with trio.CancelScope(shield=True): + + # don't clobber an ongoing pdb + if cancelled_during_spawn: + # Try again to avoid TTY clobbering. + async with acquire_debug_lock(subactor.uid): + with trio.move_on_after(0.5): + await proc.wait() + + if is_root_process(): + await maybe_wait_for_debugger( + child_in_debug=_runtime_vars.get( + '_debug_mode', False), + ) + + if proc.poll() is None: + log.cancel(f"Attempting to hard kill {proc}") + await do_hard_kill(proc) + + log.debug(f"Joined {proc}") + else: + log.warning('Nursery cancelled before sub-proc started') + + if not cancelled_during_spawn: + # pop child entry to indicate we no longer managing this + # subactor + actor_nursery._children.pop(subactor.uid) + + +async def mp_proc( name: str, actor_nursery: ActorNursery, # type: ignore # noqa @@ -608,4 +633,16 @@ async def mp_new_proc( log.debug(f"Joined {proc}") # pop child entry to indicate we are no longer managing subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + actor_nursery._children.pop(subactor.uid) + + # TODO: prolly report to ``mypy`` how this causes all sorts of + # false errors.. + # subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + + +# proc spawning backend target map +_methods: dict[str, Callable] = { + 'trio': trio_proc, + 'spawn': mp_proc, + 'forkserver': mp_proc, +} diff --git a/tractor/_supervise.py b/tractor/_supervise.py index 19c7100..4708e1e 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -92,7 +92,7 @@ class ActorNursery: tuple[str, str], tuple[ Actor, - mp.context.Process | trio.Process, + trio.Process | mp.Process, Optional[Portal], ] ] = {} From 5ab98513b7e8b27b5fb291a61a2ad1519715e329 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 13:40:42 -0400 Subject: [PATCH 03/11] Move `@tractor_test` into `conftest.py` --- tests/conftest.py | 87 ++++++++++++++++++++++++-- tractor/testing/__init__.py | 17 ----- tractor/testing/_tractor_test.py | 104 ------------------------------- 3 files changed, 83 insertions(+), 125 deletions(-) delete mode 100644 tractor/testing/__init__.py delete mode 100644 tractor/testing/_tractor_test.py diff --git a/tests/conftest.py b/tests/conftest.py index 3739eae..a459d8a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -8,15 +8,94 @@ import random import signal import platform import time +import inspect +from functools import partial, wraps import pytest +import trio import tractor -# export for tests -from tractor.testing import tractor_test # noqa - - pytest_plugins = ['pytester'] + + +def tractor_test(fn): + """ + Use: + + @tractor_test + async def test_whatever(): + await ... + + If fixtures: + + - ``arb_addr`` (a socket addr tuple where arbiter is listening) + - ``loglevel`` (logging level passed to tractor internals) + - ``start_method`` (subprocess spawning backend) + + are defined in the `pytest` fixture space they will be automatically + injected to tests declaring these funcargs. + """ + @wraps(fn) + def wrapper( + *args, + loglevel=None, + arb_addr=None, + start_method=None, + **kwargs + ): + # __tracebackhide__ = True + + if 'arb_addr' in inspect.signature(fn).parameters: + # injects test suite fixture value to test as well + # as `run()` + kwargs['arb_addr'] = arb_addr + + if 'loglevel' in inspect.signature(fn).parameters: + # allows test suites to define a 'loglevel' fixture + # that activates the internal logging + kwargs['loglevel'] = loglevel + + if start_method is None: + if platform.system() == "Windows": + start_method = 'spawn' + else: + start_method = 'trio' + + if 'start_method' in inspect.signature(fn).parameters: + # set of subprocess spawning backends + kwargs['start_method'] = start_method + + if kwargs: + + # use explicit root actor start + + async def _main(): + async with tractor.open_root_actor( + # **kwargs, + arbiter_addr=arb_addr, + loglevel=loglevel, + start_method=start_method, + + # TODO: only enable when pytest is passed --pdb + # debug_mode=True, + + ) as actor: + await fn(*args, **kwargs) + + main = _main + + else: + # use implicit root actor start + main = partial(fn, *args, **kwargs) + + return trio.run(main) + # arbiter_addr=arb_addr, + # loglevel=loglevel, + # start_method=start_method, + # ) + + return wrapper + _arb_addr = '127.0.0.1', random.randint(1000, 9999) diff --git a/tractor/testing/__init__.py b/tractor/testing/__init__.py deleted file mode 100644 index 2361f99..0000000 --- a/tractor/testing/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# tractor: structured concurrent "actors". -# Copyright 2018-eternity Tyler Goodlet. - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -from ._tractor_test import tractor_test diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py deleted file mode 100644 index ccfe349..0000000 --- a/tractor/testing/_tractor_test.py +++ /dev/null @@ -1,104 +0,0 @@ -# tractor: structured concurrent "actors". -# Copyright 2018-eternity Tyler Goodlet. - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -import inspect -import platform -from functools import partial, wraps - -import trio -import tractor - - -__all__ = ['tractor_test'] - - -def tractor_test(fn): - """ - Use: - - @tractor_test - async def test_whatever(): - await ... - - If fixtures: - - - ``arb_addr`` (a socket addr tuple where arbiter is listening) - - ``loglevel`` (logging level passed to tractor internals) - - ``start_method`` (subprocess spawning backend) - - are defined in the `pytest` fixture space they will be automatically - injected to tests declaring these funcargs. - """ - @wraps(fn) - def wrapper( - *args, - loglevel=None, - arb_addr=None, - start_method=None, - **kwargs - ): - # __tracebackhide__ = True - - if 'arb_addr' in inspect.signature(fn).parameters: - # injects test suite fixture value to test as well - # as `run()` - kwargs['arb_addr'] = arb_addr - - if 'loglevel' in inspect.signature(fn).parameters: - # allows test suites to define a 'loglevel' fixture - # that activates the internal logging - kwargs['loglevel'] = loglevel - - if start_method is None: - if platform.system() == "Windows": - start_method = 'spawn' - else: - start_method = 'trio' - - if 'start_method' in inspect.signature(fn).parameters: - # set of subprocess spawning backends - kwargs['start_method'] = start_method - - if kwargs: - - # use explicit root actor start - - async def _main(): - async with tractor.open_root_actor( - # **kwargs, - arbiter_addr=arb_addr, - loglevel=loglevel, - start_method=start_method, - - # TODO: only enable when pytest is passed --pdb - # debug_mode=True, - - ) as actor: - await fn(*args, **kwargs) - - main = _main - - else: - # use implicit root actor start - main = partial(fn, *args, **kwargs) - - return trio.run(main) - # arbiter_addr=arb_addr, - # loglevel=loglevel, - # start_method=start_method, - # ) - - return wrapper From d24fae8381e9d0a9387675ed71745656951b3912 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 17:54:55 -0400 Subject: [PATCH 04/11] 'Rename mp spawn methods to have a `'mp_'` prefix' --- tests/conftest.py | 31 ++++++++++--------- tests/test_cancellation.py | 4 +-- tractor/_entry.py | 2 +- tractor/_spawn.py | 62 ++++++++++++++++++-------------------- 4 files changed, 49 insertions(+), 50 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index a459d8a..152c6c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,8 +57,6 @@ def tractor_test(fn): if start_method is None: if platform.system() == "Windows": - start_method = 'spawn' - else: start_method = 'trio' if 'start_method' in inspect.signature(fn).parameters: @@ -79,7 +77,7 @@ def tractor_test(fn): # TODO: only enable when pytest is passed --pdb # debug_mode=True, - ) as actor: + ): await fn(*args, **kwargs) main = _main @@ -89,13 +87,10 @@ def tractor_test(fn): main = partial(fn, *args, **kwargs) return trio.run(main) - # arbiter_addr=arb_addr, - # loglevel=loglevel, - # start_method=start_method, - # ) return wrapper + _arb_addr = '127.0.0.1', random.randint(1000, 9999) @@ -143,11 +138,7 @@ def pytest_addoption(parser): def pytest_configure(config): backend = config.option.spawn_backend - - if backend == 'mp': - tractor._spawn.try_set_start_method('spawn') - elif backend == 'trio': - tractor._spawn.try_set_start_method(backend) + tractor._spawn.try_set_start_method(backend) @pytest.fixture(scope='session', autouse=True) @@ -181,13 +172,20 @@ def arb_addr(): def pytest_generate_tests(metafunc): spawn_backend = metafunc.config.option.spawn_backend + if not spawn_backend: # XXX some weird windows bug with `pytest`? - spawn_backend = 'mp' - assert spawn_backend in ('mp', 'trio') + spawn_backend = 'trio' + + assert spawn_backend in ( + 'mp_spawn', + 'mp_forkserver', + 'trio', + ) if 'start_method' in metafunc.fixturenames: - if spawn_backend == 'mp': + if 'mp' in spawn_backend: + from multiprocessing import get_all_start_methods methods = get_all_start_methods() if 'fork' in methods: @@ -195,6 +193,9 @@ def pytest_generate_tests(metafunc): # removing XXX: the fork method is in general # incompatible with trio's global scheduler state methods.remove('fork') + + methods = [f'mp_{meth}' for meth in methods] + elif spawn_backend == 'trio': methods = ['trio'] diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index e709531..21681f0 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -435,7 +435,7 @@ def test_cancel_via_SIGINT( with trio.fail_after(2): async with tractor.open_nursery() as tn: await tn.start_actor('sucka') - if spawn_backend == 'mp': + if 'mp' in spawn_backend: time.sleep(0.1) os.kill(pid, signal.SIGINT) await trio.sleep_forever() @@ -474,7 +474,7 @@ def test_cancel_via_SIGINT_other_task( with trio.fail_after(timeout): async with trio.open_nursery() as n: await n.start(spawn_and_sleep_forever) - if spawn_backend == 'mp': + if 'mp' in spawn_backend: time.sleep(0.1) os.kill(pid, signal.SIGINT) diff --git a/tractor/_entry.py b/tractor/_entry.py index 35a9abf..9e95fee 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -47,7 +47,7 @@ log = get_logger(__name__) def _mp_main( - actor: 'Actor', # type: ignore + actor: Actor, # type: ignore accept_addr: tuple[str, int], forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: SpawnMethodKey, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 4defb58..4a9f118 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -44,7 +44,6 @@ from ._state import ( is_root_process, debug_mode, ) - from .log import get_logger from ._portal import Portal from ._runtime import Actor @@ -62,9 +61,9 @@ log = get_logger('tractor') # placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None SpawnMethodKey = Literal[ - 'trio', - 'spawn', - 'forkserver', + 'trio', # supported on all platforms + 'mp_spawn', + 'mp_forkserver', # posix only ] _spawn_method: SpawnMethodKey = 'trio' @@ -84,7 +83,7 @@ else: def try_set_start_method( - name: SpawnMethodKey + key: SpawnMethodKey ) -> Optional[mp.context.BaseContext]: ''' @@ -101,29 +100,30 @@ def try_set_start_method( global _ctx global _spawn_method - methods = mp.get_all_start_methods() - if 'fork' in methods: + mp_methods = mp.get_all_start_methods() + if 'fork' in mp_methods: # forking is incompatible with ``trio``s global task tree - methods.remove('fork') + mp_methods.remove('fork') - # supported on all platforms - methods += ['trio'] + match key: + case 'mp_forkserver': + from . import _forkserver_override + _forkserver_override.override_stdlib() + _ctx = mp.get_context('forkserver') - if name not in methods: - raise ValueError( - f"Spawn method `{name}` is invalid please choose one of {methods}" - ) - elif name == 'forkserver': - from . import _forkserver_override - _forkserver_override.override_stdlib() - _ctx = mp.get_context(name) + case 'mp_spawn': + _ctx = mp.get_context('spawn') - elif name == 'trio': - _ctx = None - else: - _ctx = mp.get_context(name) + case 'trio': + _ctx = None - _spawn_method = name + case _: + raise ValueError( + f'Spawn method `{key}` is invalid!\n' + f'Please choose one of {SpawnMethodKey}' + ) + + _spawn_method = key return _ctx @@ -299,7 +299,6 @@ async def new_proc( async def trio_proc( - name: str, actor_nursery: ActorNursery, subactor: Actor, @@ -309,9 +308,7 @@ async def trio_proc( bind_addr: tuple[str, int], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child - *, - infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED @@ -475,7 +472,6 @@ async def trio_proc( async def mp_proc( - name: str, actor_nursery: ActorNursery, # type: ignore # noqa subactor: Actor, @@ -502,6 +498,7 @@ async def mp_proc( assert _ctx start_method = _ctx.get_start_method() if start_method == 'forkserver': + from multiprocessing import forkserver # type: ignore # XXX do our hackery on the stdlib to avoid multiple # forkservers (one at each subproc layer). @@ -521,7 +518,7 @@ async def mp_proc( resource_tracker._resource_tracker, '_pid', None), resource_tracker._resource_tracker._fd, ) - else: + else: # request to forkerserver to fork a new child assert curr_actor._forkserver_info fs_info = ( fs._forkserver_address, # type: ignore # noqa @@ -531,6 +528,7 @@ async def mp_proc( resource_tracker._resource_tracker._fd, ) = curr_actor._forkserver_info else: + # spawn method fs_info = (None, None, None, None, None) proc: mp.Process = _ctx.Process( # type: ignore @@ -539,7 +537,7 @@ async def mp_proc( subactor, bind_addr, fs_info, - start_method, + _spawn_method, parent_addr, infect_asyncio, ), @@ -641,8 +639,8 @@ async def mp_proc( # proc spawning backend target map -_methods: dict[str, Callable] = { +_methods: dict[SpawnMethodKey, Callable] = { 'trio': trio_proc, - 'spawn': mp_proc, - 'forkserver': mp_proc, + 'mp_spawn': mp_proc, + 'mp_forkserver': mp_proc, } From 023b6fc84502c3db2e8ae852b0f73daa394dd2ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 17:57:02 -0400 Subject: [PATCH 05/11] Drop `tractor.testing` sub-package --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index 7a46d98..7e966ec 100755 --- a/setup.py +++ b/setup.py @@ -38,7 +38,6 @@ setup( 'tractor', 'tractor.experimental', 'tractor.trionics', - 'tractor.testing', ], install_requires=[ From 2c20b2d64f8a5868f70cb59992d52dfd8a4f27ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 18:03:17 -0400 Subject: [PATCH 06/11] Fix import to load from `conftest.py` --- tests/test_pubsub.py | 3 ++- tests/test_streaming.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index 390ca29..ababcb5 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -4,9 +4,10 @@ from itertools import cycle import pytest import trio import tractor -from tractor.testing import tractor_test from tractor.experimental import msgpub +from conftest import tractor_test + def test_type_checks(): diff --git a/tests/test_streaming.py b/tests/test_streaming.py index 0d3ee1f..4e54e02 100644 --- a/tests/test_streaming.py +++ b/tests/test_streaming.py @@ -7,9 +7,10 @@ import platform import trio import tractor -from tractor.testing import tractor_test import pytest +from conftest import tractor_test + def test_must_define_ctx(): From b19f08d9f013a839fdb12045ff87af371a2a1e27 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 18:08:07 -0400 Subject: [PATCH 07/11] Fill out new backend names in ci script --- .github/workflows/ci.yml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 15a2763..c6d69ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -60,7 +60,11 @@ jobs: matrix: os: [ubuntu-latest] python: ['3.10'] - spawn_backend: ['trio', 'mp'] + spawn_backend: [ + 'trio', + 'mp_spawn', + 'mp_forkserver', + ] steps: From 7e5bb0437ef18c5ede67552afee424dbcb77d968 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 18:13:45 -0400 Subject: [PATCH 08/11] Go to latest `mypy` version in CI --- .github/workflows/ci.yml | 2 +- requirements-test.txt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6d69ed..9d91f17 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,7 +26,7 @@ jobs: run: pip install -U . --upgrade-strategy eager -r requirements-test.txt - name: Run MyPy check - run: mypy tractor/ --ignore-missing-imports + run: mypy tractor/ --ignore-missing-imports --show-traceback # test that we can generate a software distribution and install it # thus avoid missing file issues after packaging. diff --git a/requirements-test.txt b/requirements-test.txt index c2b43c1..579b6f0 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,7 +2,7 @@ pytest pytest-trio pytest-timeout pdbpp -mypy<0.920 -trio_typing<0.7.0 +mypy +trio_typing pexpect towncrier From 4d808757a633866ad611bb2c1df67d5635d5c831 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 18:22:55 -0400 Subject: [PATCH 09/11] Fix start method name in logging propagation test --- tests/test_spawning.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_spawning.py b/tests/test_spawning.py index 3e5ab1b..17798c0 100644 --- a/tests/test_spawning.py +++ b/tests/test_spawning.py @@ -142,7 +142,7 @@ def test_loglevel_propagated_to_subactor( capfd, arb_addr, ): - if start_method == 'forkserver': + if start_method == 'mp_forkserver': pytest.skip( "a bug with `capfd` seems to make forkserver capture not work?") From 93b9d2dc2d19889d69e556fa99e53bb8a726d53b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 18:29:50 -0400 Subject: [PATCH 10/11] Drop dynamic backend-spawn-method test generation --- tests/conftest.py | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 152c6c2..9b26de6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -177,29 +177,19 @@ def pytest_generate_tests(metafunc): # XXX some weird windows bug with `pytest`? spawn_backend = 'trio' + # TODO: maybe just use the literal `._spawn.SpawnMethodKey`? assert spawn_backend in ( 'mp_spawn', 'mp_forkserver', 'trio', ) + # NOTE: used to be used to dyanmically parametrize tests for when + # you just passed --spawn-backend=`mp` on the cli, but now we expect + # that cli input to be manually specified, BUT, maybe we'll do + # something like this again in the future? if 'start_method' in metafunc.fixturenames: - if 'mp' in spawn_backend: - - from multiprocessing import get_all_start_methods - methods = get_all_start_methods() - if 'fork' in methods: - # fork not available on windows, so check before - # removing XXX: the fork method is in general - # incompatible with trio's global scheduler state - methods.remove('fork') - - methods = [f'mp_{meth}' for meth in methods] - - elif spawn_backend == 'trio': - methods = ['trio'] - - metafunc.parametrize("start_method", methods, scope='module') + metafunc.parametrize("start_method", [spawn_backend], scope='module') def sig_prog(proc, sig): From b1abec543fcdcaa45c2193ba5ff49bb2b0b4de92 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 18:51:31 -0400 Subject: [PATCH 11/11] Add trivial news snippet --- nooz/335.trivial.rst | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 nooz/335.trivial.rst diff --git a/nooz/335.trivial.rst b/nooz/335.trivial.rst new file mode 100644 index 0000000..1030f4d --- /dev/null +++ b/nooz/335.trivial.rst @@ -0,0 +1,5 @@ +Establish an explicit "backend spawning" method table; use it from CI + +More clearly lays out the current set of (3) backends: ``['trio', +'mp_spawn', 'mp_forkserver']`` and adjusts the ``._spawn.py`` internals +as well as the test suite to accommodate.