From d24fae8381e9d0a9387675ed71745656951b3912 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Oct 2022 17:54:55 -0400 Subject: [PATCH] 'Rename mp spawn methods to have a `'mp_'` prefix' --- tests/conftest.py | 31 ++++++++++--------- tests/test_cancellation.py | 4 +-- tractor/_entry.py | 2 +- tractor/_spawn.py | 62 ++++++++++++++++++-------------------- 4 files changed, 49 insertions(+), 50 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index a459d8a..152c6c2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,8 +57,6 @@ def tractor_test(fn): if start_method is None: if platform.system() == "Windows": - start_method = 'spawn' - else: start_method = 'trio' if 'start_method' in inspect.signature(fn).parameters: @@ -79,7 +77,7 @@ def tractor_test(fn): # TODO: only enable when pytest is passed --pdb # debug_mode=True, - ) as actor: + ): await fn(*args, **kwargs) main = _main @@ -89,13 +87,10 @@ def tractor_test(fn): main = partial(fn, *args, **kwargs) return trio.run(main) - # arbiter_addr=arb_addr, - # loglevel=loglevel, - # start_method=start_method, - # ) return wrapper + _arb_addr = '127.0.0.1', random.randint(1000, 9999) @@ -143,11 +138,7 @@ def pytest_addoption(parser): def pytest_configure(config): backend = config.option.spawn_backend - - if backend == 'mp': - tractor._spawn.try_set_start_method('spawn') - elif backend == 'trio': - tractor._spawn.try_set_start_method(backend) + tractor._spawn.try_set_start_method(backend) @pytest.fixture(scope='session', autouse=True) @@ -181,13 +172,20 @@ def arb_addr(): def pytest_generate_tests(metafunc): spawn_backend = metafunc.config.option.spawn_backend + if not spawn_backend: # XXX some weird windows bug with `pytest`? - spawn_backend = 'mp' - assert spawn_backend in ('mp', 'trio') + spawn_backend = 'trio' + + assert spawn_backend in ( + 'mp_spawn', + 'mp_forkserver', + 'trio', + ) if 'start_method' in metafunc.fixturenames: - if spawn_backend == 'mp': + if 'mp' in spawn_backend: + from multiprocessing import get_all_start_methods methods = get_all_start_methods() if 'fork' in methods: @@ -195,6 +193,9 @@ def pytest_generate_tests(metafunc): # removing XXX: the fork method is in general # incompatible with trio's global scheduler state methods.remove('fork') + + methods = [f'mp_{meth}' for meth in methods] + elif spawn_backend == 'trio': methods = ['trio'] diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index e709531..21681f0 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -435,7 +435,7 @@ def test_cancel_via_SIGINT( with trio.fail_after(2): async with tractor.open_nursery() as tn: await tn.start_actor('sucka') - if spawn_backend == 'mp': + if 'mp' in spawn_backend: time.sleep(0.1) os.kill(pid, signal.SIGINT) await trio.sleep_forever() @@ -474,7 +474,7 @@ def test_cancel_via_SIGINT_other_task( with trio.fail_after(timeout): async with trio.open_nursery() as n: await n.start(spawn_and_sleep_forever) - if spawn_backend == 'mp': + if 'mp' in spawn_backend: time.sleep(0.1) os.kill(pid, signal.SIGINT) diff --git a/tractor/_entry.py b/tractor/_entry.py index 35a9abf..9e95fee 100644 --- a/tractor/_entry.py +++ b/tractor/_entry.py @@ -47,7 +47,7 @@ log = get_logger(__name__) def _mp_main( - actor: 'Actor', # type: ignore + actor: Actor, # type: ignore accept_addr: tuple[str, int], forkserver_info: tuple[Any, Any, Any, Any, Any], start_method: SpawnMethodKey, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 4defb58..4a9f118 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -44,7 +44,6 @@ from ._state import ( is_root_process, debug_mode, ) - from .log import get_logger from ._portal import Portal from ._runtime import Actor @@ -62,9 +61,9 @@ log = get_logger('tractor') # placeholder for an mp start context if so using that backend _ctx: Optional[mp.context.BaseContext] = None SpawnMethodKey = Literal[ - 'trio', - 'spawn', - 'forkserver', + 'trio', # supported on all platforms + 'mp_spawn', + 'mp_forkserver', # posix only ] _spawn_method: SpawnMethodKey = 'trio' @@ -84,7 +83,7 @@ else: def try_set_start_method( - name: SpawnMethodKey + key: SpawnMethodKey ) -> Optional[mp.context.BaseContext]: ''' @@ -101,29 +100,30 @@ def try_set_start_method( global _ctx global _spawn_method - methods = mp.get_all_start_methods() - if 'fork' in methods: + mp_methods = mp.get_all_start_methods() + if 'fork' in mp_methods: # forking is incompatible with ``trio``s global task tree - methods.remove('fork') + mp_methods.remove('fork') - # supported on all platforms - methods += ['trio'] + match key: + case 'mp_forkserver': + from . import _forkserver_override + _forkserver_override.override_stdlib() + _ctx = mp.get_context('forkserver') - if name not in methods: - raise ValueError( - f"Spawn method `{name}` is invalid please choose one of {methods}" - ) - elif name == 'forkserver': - from . import _forkserver_override - _forkserver_override.override_stdlib() - _ctx = mp.get_context(name) + case 'mp_spawn': + _ctx = mp.get_context('spawn') - elif name == 'trio': - _ctx = None - else: - _ctx = mp.get_context(name) + case 'trio': + _ctx = None - _spawn_method = name + case _: + raise ValueError( + f'Spawn method `{key}` is invalid!\n' + f'Please choose one of {SpawnMethodKey}' + ) + + _spawn_method = key return _ctx @@ -299,7 +299,6 @@ async def new_proc( async def trio_proc( - name: str, actor_nursery: ActorNursery, subactor: Actor, @@ -309,9 +308,7 @@ async def trio_proc( bind_addr: tuple[str, int], parent_addr: tuple[str, int], _runtime_vars: dict[str, Any], # serialized and sent to _child - *, - infect_asyncio: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED @@ -475,7 +472,6 @@ async def trio_proc( async def mp_proc( - name: str, actor_nursery: ActorNursery, # type: ignore # noqa subactor: Actor, @@ -502,6 +498,7 @@ async def mp_proc( assert _ctx start_method = _ctx.get_start_method() if start_method == 'forkserver': + from multiprocessing import forkserver # type: ignore # XXX do our hackery on the stdlib to avoid multiple # forkservers (one at each subproc layer). @@ -521,7 +518,7 @@ async def mp_proc( resource_tracker._resource_tracker, '_pid', None), resource_tracker._resource_tracker._fd, ) - else: + else: # request to forkerserver to fork a new child assert curr_actor._forkserver_info fs_info = ( fs._forkserver_address, # type: ignore # noqa @@ -531,6 +528,7 @@ async def mp_proc( resource_tracker._resource_tracker._fd, ) = curr_actor._forkserver_info else: + # spawn method fs_info = (None, None, None, None, None) proc: mp.Process = _ctx.Process( # type: ignore @@ -539,7 +537,7 @@ async def mp_proc( subactor, bind_addr, fs_info, - start_method, + _spawn_method, parent_addr, infect_asyncio, ), @@ -641,8 +639,8 @@ async def mp_proc( # proc spawning backend target map -_methods: dict[str, Callable] = { +_methods: dict[SpawnMethodKey, Callable] = { 'trio': trio_proc, - 'spawn': mp_proc, - 'forkserver': mp_proc, + 'mp_spawn': mp_proc, + 'mp_forkserver': mp_proc, }