forked from goodboy/tractor
'Rename mp spawn methods to have a `'mp_'` prefix'
parent
5ab98513b7
commit
d24fae8381
|
@ -57,8 +57,6 @@ def tractor_test(fn):
|
||||||
|
|
||||||
if start_method is None:
|
if start_method is None:
|
||||||
if platform.system() == "Windows":
|
if platform.system() == "Windows":
|
||||||
start_method = 'spawn'
|
|
||||||
else:
|
|
||||||
start_method = 'trio'
|
start_method = 'trio'
|
||||||
|
|
||||||
if 'start_method' in inspect.signature(fn).parameters:
|
if 'start_method' in inspect.signature(fn).parameters:
|
||||||
|
@ -79,7 +77,7 @@ def tractor_test(fn):
|
||||||
# TODO: only enable when pytest is passed --pdb
|
# TODO: only enable when pytest is passed --pdb
|
||||||
# debug_mode=True,
|
# debug_mode=True,
|
||||||
|
|
||||||
) as actor:
|
):
|
||||||
await fn(*args, **kwargs)
|
await fn(*args, **kwargs)
|
||||||
|
|
||||||
main = _main
|
main = _main
|
||||||
|
@ -89,13 +87,10 @@ def tractor_test(fn):
|
||||||
main = partial(fn, *args, **kwargs)
|
main = partial(fn, *args, **kwargs)
|
||||||
|
|
||||||
return trio.run(main)
|
return trio.run(main)
|
||||||
# arbiter_addr=arb_addr,
|
|
||||||
# loglevel=loglevel,
|
|
||||||
# start_method=start_method,
|
|
||||||
# )
|
|
||||||
|
|
||||||
return wrapper
|
return wrapper
|
||||||
|
|
||||||
|
|
||||||
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
|
_arb_addr = '127.0.0.1', random.randint(1000, 9999)
|
||||||
|
|
||||||
|
|
||||||
|
@ -143,11 +138,7 @@ def pytest_addoption(parser):
|
||||||
|
|
||||||
def pytest_configure(config):
|
def pytest_configure(config):
|
||||||
backend = config.option.spawn_backend
|
backend = config.option.spawn_backend
|
||||||
|
tractor._spawn.try_set_start_method(backend)
|
||||||
if backend == 'mp':
|
|
||||||
tractor._spawn.try_set_start_method('spawn')
|
|
||||||
elif backend == 'trio':
|
|
||||||
tractor._spawn.try_set_start_method(backend)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='session', autouse=True)
|
@pytest.fixture(scope='session', autouse=True)
|
||||||
|
@ -181,13 +172,20 @@ def arb_addr():
|
||||||
|
|
||||||
def pytest_generate_tests(metafunc):
|
def pytest_generate_tests(metafunc):
|
||||||
spawn_backend = metafunc.config.option.spawn_backend
|
spawn_backend = metafunc.config.option.spawn_backend
|
||||||
|
|
||||||
if not spawn_backend:
|
if not spawn_backend:
|
||||||
# XXX some weird windows bug with `pytest`?
|
# XXX some weird windows bug with `pytest`?
|
||||||
spawn_backend = 'mp'
|
spawn_backend = 'trio'
|
||||||
assert spawn_backend in ('mp', 'trio')
|
|
||||||
|
assert spawn_backend in (
|
||||||
|
'mp_spawn',
|
||||||
|
'mp_forkserver',
|
||||||
|
'trio',
|
||||||
|
)
|
||||||
|
|
||||||
if 'start_method' in metafunc.fixturenames:
|
if 'start_method' in metafunc.fixturenames:
|
||||||
if spawn_backend == 'mp':
|
if 'mp' in spawn_backend:
|
||||||
|
|
||||||
from multiprocessing import get_all_start_methods
|
from multiprocessing import get_all_start_methods
|
||||||
methods = get_all_start_methods()
|
methods = get_all_start_methods()
|
||||||
if 'fork' in methods:
|
if 'fork' in methods:
|
||||||
|
@ -195,6 +193,9 @@ def pytest_generate_tests(metafunc):
|
||||||
# removing XXX: the fork method is in general
|
# removing XXX: the fork method is in general
|
||||||
# incompatible with trio's global scheduler state
|
# incompatible with trio's global scheduler state
|
||||||
methods.remove('fork')
|
methods.remove('fork')
|
||||||
|
|
||||||
|
methods = [f'mp_{meth}' for meth in methods]
|
||||||
|
|
||||||
elif spawn_backend == 'trio':
|
elif spawn_backend == 'trio':
|
||||||
methods = ['trio']
|
methods = ['trio']
|
||||||
|
|
||||||
|
|
|
@ -435,7 +435,7 @@ def test_cancel_via_SIGINT(
|
||||||
with trio.fail_after(2):
|
with trio.fail_after(2):
|
||||||
async with tractor.open_nursery() as tn:
|
async with tractor.open_nursery() as tn:
|
||||||
await tn.start_actor('sucka')
|
await tn.start_actor('sucka')
|
||||||
if spawn_backend == 'mp':
|
if 'mp' in spawn_backend:
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
os.kill(pid, signal.SIGINT)
|
os.kill(pid, signal.SIGINT)
|
||||||
await trio.sleep_forever()
|
await trio.sleep_forever()
|
||||||
|
@ -474,7 +474,7 @@ def test_cancel_via_SIGINT_other_task(
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(timeout):
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
await n.start(spawn_and_sleep_forever)
|
await n.start(spawn_and_sleep_forever)
|
||||||
if spawn_backend == 'mp':
|
if 'mp' in spawn_backend:
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
os.kill(pid, signal.SIGINT)
|
os.kill(pid, signal.SIGINT)
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@ log = get_logger(__name__)
|
||||||
|
|
||||||
def _mp_main(
|
def _mp_main(
|
||||||
|
|
||||||
actor: 'Actor', # type: ignore
|
actor: Actor, # type: ignore
|
||||||
accept_addr: tuple[str, int],
|
accept_addr: tuple[str, int],
|
||||||
forkserver_info: tuple[Any, Any, Any, Any, Any],
|
forkserver_info: tuple[Any, Any, Any, Any, Any],
|
||||||
start_method: SpawnMethodKey,
|
start_method: SpawnMethodKey,
|
||||||
|
|
|
@ -44,7 +44,6 @@ from ._state import (
|
||||||
is_root_process,
|
is_root_process,
|
||||||
debug_mode,
|
debug_mode,
|
||||||
)
|
)
|
||||||
|
|
||||||
from .log import get_logger
|
from .log import get_logger
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
|
@ -62,9 +61,9 @@ log = get_logger('tractor')
|
||||||
# placeholder for an mp start context if so using that backend
|
# placeholder for an mp start context if so using that backend
|
||||||
_ctx: Optional[mp.context.BaseContext] = None
|
_ctx: Optional[mp.context.BaseContext] = None
|
||||||
SpawnMethodKey = Literal[
|
SpawnMethodKey = Literal[
|
||||||
'trio',
|
'trio', # supported on all platforms
|
||||||
'spawn',
|
'mp_spawn',
|
||||||
'forkserver',
|
'mp_forkserver', # posix only
|
||||||
]
|
]
|
||||||
_spawn_method: SpawnMethodKey = 'trio'
|
_spawn_method: SpawnMethodKey = 'trio'
|
||||||
|
|
||||||
|
@ -84,7 +83,7 @@ else:
|
||||||
|
|
||||||
|
|
||||||
def try_set_start_method(
|
def try_set_start_method(
|
||||||
name: SpawnMethodKey
|
key: SpawnMethodKey
|
||||||
|
|
||||||
) -> Optional[mp.context.BaseContext]:
|
) -> Optional[mp.context.BaseContext]:
|
||||||
'''
|
'''
|
||||||
|
@ -101,29 +100,30 @@ def try_set_start_method(
|
||||||
global _ctx
|
global _ctx
|
||||||
global _spawn_method
|
global _spawn_method
|
||||||
|
|
||||||
methods = mp.get_all_start_methods()
|
mp_methods = mp.get_all_start_methods()
|
||||||
if 'fork' in methods:
|
if 'fork' in mp_methods:
|
||||||
# forking is incompatible with ``trio``s global task tree
|
# forking is incompatible with ``trio``s global task tree
|
||||||
methods.remove('fork')
|
mp_methods.remove('fork')
|
||||||
|
|
||||||
# supported on all platforms
|
match key:
|
||||||
methods += ['trio']
|
case 'mp_forkserver':
|
||||||
|
from . import _forkserver_override
|
||||||
|
_forkserver_override.override_stdlib()
|
||||||
|
_ctx = mp.get_context('forkserver')
|
||||||
|
|
||||||
if name not in methods:
|
case 'mp_spawn':
|
||||||
raise ValueError(
|
_ctx = mp.get_context('spawn')
|
||||||
f"Spawn method `{name}` is invalid please choose one of {methods}"
|
|
||||||
)
|
|
||||||
elif name == 'forkserver':
|
|
||||||
from . import _forkserver_override
|
|
||||||
_forkserver_override.override_stdlib()
|
|
||||||
_ctx = mp.get_context(name)
|
|
||||||
|
|
||||||
elif name == 'trio':
|
case 'trio':
|
||||||
_ctx = None
|
_ctx = None
|
||||||
else:
|
|
||||||
_ctx = mp.get_context(name)
|
|
||||||
|
|
||||||
_spawn_method = name
|
case _:
|
||||||
|
raise ValueError(
|
||||||
|
f'Spawn method `{key}` is invalid!\n'
|
||||||
|
f'Please choose one of {SpawnMethodKey}'
|
||||||
|
)
|
||||||
|
|
||||||
|
_spawn_method = key
|
||||||
return _ctx
|
return _ctx
|
||||||
|
|
||||||
|
|
||||||
|
@ -299,7 +299,6 @@ async def new_proc(
|
||||||
|
|
||||||
|
|
||||||
async def trio_proc(
|
async def trio_proc(
|
||||||
|
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery,
|
actor_nursery: ActorNursery,
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
|
@ -309,9 +308,7 @@ async def trio_proc(
|
||||||
bind_addr: tuple[str, int],
|
bind_addr: tuple[str, int],
|
||||||
parent_addr: tuple[str, int],
|
parent_addr: tuple[str, int],
|
||||||
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
_runtime_vars: dict[str, Any], # serialized and sent to _child
|
||||||
|
|
||||||
*,
|
*,
|
||||||
|
|
||||||
infect_asyncio: bool = False,
|
infect_asyncio: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
|
@ -475,7 +472,6 @@ async def trio_proc(
|
||||||
|
|
||||||
|
|
||||||
async def mp_proc(
|
async def mp_proc(
|
||||||
|
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery, # type: ignore # noqa
|
actor_nursery: ActorNursery, # type: ignore # noqa
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
|
@ -502,6 +498,7 @@ async def mp_proc(
|
||||||
assert _ctx
|
assert _ctx
|
||||||
start_method = _ctx.get_start_method()
|
start_method = _ctx.get_start_method()
|
||||||
if start_method == 'forkserver':
|
if start_method == 'forkserver':
|
||||||
|
|
||||||
from multiprocessing import forkserver # type: ignore
|
from multiprocessing import forkserver # type: ignore
|
||||||
# XXX do our hackery on the stdlib to avoid multiple
|
# XXX do our hackery on the stdlib to avoid multiple
|
||||||
# forkservers (one at each subproc layer).
|
# forkservers (one at each subproc layer).
|
||||||
|
@ -521,7 +518,7 @@ async def mp_proc(
|
||||||
resource_tracker._resource_tracker, '_pid', None),
|
resource_tracker._resource_tracker, '_pid', None),
|
||||||
resource_tracker._resource_tracker._fd,
|
resource_tracker._resource_tracker._fd,
|
||||||
)
|
)
|
||||||
else:
|
else: # request to forkerserver to fork a new child
|
||||||
assert curr_actor._forkserver_info
|
assert curr_actor._forkserver_info
|
||||||
fs_info = (
|
fs_info = (
|
||||||
fs._forkserver_address, # type: ignore # noqa
|
fs._forkserver_address, # type: ignore # noqa
|
||||||
|
@ -531,6 +528,7 @@ async def mp_proc(
|
||||||
resource_tracker._resource_tracker._fd,
|
resource_tracker._resource_tracker._fd,
|
||||||
) = curr_actor._forkserver_info
|
) = curr_actor._forkserver_info
|
||||||
else:
|
else:
|
||||||
|
# spawn method
|
||||||
fs_info = (None, None, None, None, None)
|
fs_info = (None, None, None, None, None)
|
||||||
|
|
||||||
proc: mp.Process = _ctx.Process( # type: ignore
|
proc: mp.Process = _ctx.Process( # type: ignore
|
||||||
|
@ -539,7 +537,7 @@ async def mp_proc(
|
||||||
subactor,
|
subactor,
|
||||||
bind_addr,
|
bind_addr,
|
||||||
fs_info,
|
fs_info,
|
||||||
start_method,
|
_spawn_method,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
infect_asyncio,
|
infect_asyncio,
|
||||||
),
|
),
|
||||||
|
@ -641,8 +639,8 @@ async def mp_proc(
|
||||||
|
|
||||||
|
|
||||||
# proc spawning backend target map
|
# proc spawning backend target map
|
||||||
_methods: dict[str, Callable] = {
|
_methods: dict[SpawnMethodKey, Callable] = {
|
||||||
'trio': trio_proc,
|
'trio': trio_proc,
|
||||||
'spawn': mp_proc,
|
'mp_spawn': mp_proc,
|
||||||
'forkserver': mp_proc,
|
'mp_forkserver': mp_proc,
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue