Be explicit about the spawning backend default
Set `trio-run-in-process` as the default on *nix systems and `multiprocessing`'s spawn method on Windows. Enable overriding the default choice using `tractor._spawn.try_set_start_method()`. Allows for easy runs of the test suite using a user chosen backend.try_trip^2
parent
783fe53b06
commit
27c9760f96
|
@ -4,7 +4,6 @@ tractor: An actor model micro-framework built on
|
||||||
"""
|
"""
|
||||||
import importlib
|
import importlib
|
||||||
from functools import partial
|
from functools import partial
|
||||||
import platform
|
|
||||||
from typing import Tuple, Any, Optional
|
from typing import Tuple, Any, Optional
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
|
@ -103,15 +102,14 @@ def run(
|
||||||
# either the `multiprocessing` start method:
|
# either the `multiprocessing` start method:
|
||||||
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
||||||
# OR `trio_run_in_process` (the new default).
|
# OR `trio_run_in_process` (the new default).
|
||||||
start_method: str = 'trio_run_in_process',
|
start_method: Optional[str] = None,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""Run a trio-actor async function in process.
|
"""Run a trio-actor async function in process.
|
||||||
|
|
||||||
This is tractor's main entry and the start point for any async actor.
|
This is tractor's main entry and the start point for any async actor.
|
||||||
"""
|
"""
|
||||||
if platform.system() == 'Windows':
|
if start_method is not None:
|
||||||
start_method = 'spawn' # only one supported for now
|
|
||||||
_spawn.try_set_start_method(start_method)
|
_spawn.try_set_start_method(start_method)
|
||||||
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
|
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
|
||||||
|
|
||||||
|
|
|
@ -537,6 +537,7 @@ class Actor:
|
||||||
f"Setting loglevel for {self.uid} to {self.loglevel}")
|
f"Setting loglevel for {self.uid} to {self.loglevel}")
|
||||||
get_console_log(self.loglevel)
|
get_console_log(self.loglevel)
|
||||||
|
|
||||||
|
assert spawn_ctx
|
||||||
log.info(
|
log.info(
|
||||||
f"Started new {spawn_ctx.current_process()} for {self.uid}")
|
f"Started new {spawn_ctx.current_process()} for {self.uid}")
|
||||||
|
|
||||||
|
@ -555,6 +556,11 @@ class Actor:
|
||||||
accept_addr: Tuple[str, int],
|
accept_addr: Tuple[str, int],
|
||||||
parent_addr: Tuple[str, int] = None
|
parent_addr: Tuple[str, int] = None
|
||||||
) -> None:
|
) -> None:
|
||||||
|
"""Entry point for a `trio_run_in_process` subactor.
|
||||||
|
|
||||||
|
Here we don't need to call `trio.run()` since trip does that as
|
||||||
|
part of its subprocess startup sequence.
|
||||||
|
"""
|
||||||
if self.loglevel is not None:
|
if self.loglevel is not None:
|
||||||
log.info(
|
log.info(
|
||||||
f"Setting loglevel for {self.uid} to {self.loglevel}")
|
f"Setting loglevel for {self.uid} to {self.loglevel}")
|
||||||
|
|
|
@ -6,7 +6,7 @@ Mostly just wrapping around ``multiprocessing``.
|
||||||
import inspect
|
import inspect
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
import platform
|
import platform
|
||||||
from typing import Any, List, Dict
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -32,8 +32,13 @@ from ._actor import Actor, ActorFailure
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore
|
# use trip as our default for now
|
||||||
_spawn_method: str = "spawn"
|
if platform.system() != 'Windows':
|
||||||
|
_spawn_method: str = "trio_run_in_process"
|
||||||
|
else:
|
||||||
|
_spawn_method = "spawn"
|
||||||
|
|
||||||
|
_ctx: Optional[mp.context.BaseContext] = None
|
||||||
|
|
||||||
|
|
||||||
if platform.system() == 'Windows':
|
if platform.system() == 'Windows':
|
||||||
|
@ -46,7 +51,7 @@ else:
|
||||||
await trio.hazmat.wait_readable(proc.sentinel)
|
await trio.hazmat.wait_readable(proc.sentinel)
|
||||||
|
|
||||||
|
|
||||||
def try_set_start_method(name: str) -> mp.context.BaseContext:
|
def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
|
||||||
"""Attempt to set the start method for ``multiprocess.Process`` spawning.
|
"""Attempt to set the start method for ``multiprocess.Process`` spawning.
|
||||||
|
|
||||||
If the desired method is not supported the sub-interpreter (aka "spawn"
|
If the desired method is not supported the sub-interpreter (aka "spawn"
|
||||||
|
@ -55,15 +60,16 @@ def try_set_start_method(name: str) -> mp.context.BaseContext:
|
||||||
global _ctx
|
global _ctx
|
||||||
global _spawn_method
|
global _spawn_method
|
||||||
|
|
||||||
allowed = mp.get_all_start_methods()
|
methods = mp.get_all_start_methods()
|
||||||
|
methods.remove('fork')
|
||||||
|
|
||||||
# no Windows support for trip yet (afaik)
|
# no Windows support for trip yet
|
||||||
if platform.system() != 'Windows':
|
if platform.system() != 'Windows':
|
||||||
allowed += ['trio_run_in_process']
|
methods += ['trio_run_in_process']
|
||||||
|
|
||||||
if name not in allowed:
|
if name not in methods:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f"Spawn method {name} is unsupported please choose one of {allowed}"
|
f"Spawn method `{name}` is invalid please choose one of {methods}"
|
||||||
)
|
)
|
||||||
|
|
||||||
elif name == 'fork':
|
elif name == 'fork':
|
||||||
|
@ -73,6 +79,10 @@ def try_set_start_method(name: str) -> mp.context.BaseContext:
|
||||||
elif name == 'forkserver':
|
elif name == 'forkserver':
|
||||||
_forkserver_override.override_stdlib()
|
_forkserver_override.override_stdlib()
|
||||||
_ctx = mp.get_context(name)
|
_ctx = mp.get_context(name)
|
||||||
|
elif name == 'trio_run_in_process':
|
||||||
|
_ctx = None
|
||||||
|
else:
|
||||||
|
_ctx = mp.get_context(name)
|
||||||
|
|
||||||
_spawn_method = name
|
_spawn_method = name
|
||||||
return _ctx
|
return _ctx
|
||||||
|
@ -191,6 +201,7 @@ async def new_proc(
|
||||||
# TRIP blocks here until process is complete
|
# TRIP blocks here until process is complete
|
||||||
else:
|
else:
|
||||||
# `multiprocessing`
|
# `multiprocessing`
|
||||||
|
assert _ctx
|
||||||
start_method = _ctx.get_start_method()
|
start_method = _ctx.get_start_method()
|
||||||
if start_method == 'forkserver':
|
if start_method == 'forkserver':
|
||||||
# XXX do our hackery on the stdlib to avoid multiple
|
# XXX do our hackery on the stdlib to avoid multiple
|
||||||
|
|
Loading…
Reference in New Issue