forked from goodboy/tractor
Try out trip as the default spawn_method on unix for now
parent
f1a96c1680
commit
ddbf55768f
|
@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on
|
||||||
"""
|
"""
|
||||||
import importlib
|
import importlib
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
import platform
|
||||||
from typing import Tuple, Any, Optional
|
from typing import Tuple, Any, Optional
|
||||||
import typing
|
import typing
|
||||||
|
|
||||||
|
@ -99,15 +100,18 @@ def run(
|
||||||
name: Optional[str] = None,
|
name: Optional[str] = None,
|
||||||
arbiter_addr: Tuple[str, int] = (
|
arbiter_addr: Tuple[str, int] = (
|
||||||
_default_arbiter_host, _default_arbiter_port),
|
_default_arbiter_host, _default_arbiter_port),
|
||||||
# the `multiprocessing` start method:
|
# either the `multiprocessing` start method:
|
||||||
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
||||||
start_method: str = 'forkserver',
|
# OR `trio-run-in-process` (the new default).
|
||||||
|
start_method: str = 'trip',
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
"""Run a trio-actor async function in process.
|
"""Run a trio-actor async function in process.
|
||||||
|
|
||||||
This is tractor's main entry and the start point for any async actor.
|
This is tractor's main entry and the start point for any async actor.
|
||||||
"""
|
"""
|
||||||
|
if platform.system() == 'Windows':
|
||||||
|
start_method = 'spawn' # only one supported for now
|
||||||
_spawn.try_set_start_method(start_method)
|
_spawn.try_set_start_method(start_method)
|
||||||
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
|
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
|
||||||
|
|
||||||
|
|
|
@ -34,6 +34,7 @@ from ._actor import Actor, ActorFailure
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
||||||
_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore
|
_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore
|
||||||
|
_spawn_method: str = "spawn"
|
||||||
|
|
||||||
|
|
||||||
if platform.system() == 'Windows':
|
if platform.system() == 'Windows':
|
||||||
|
@ -51,21 +52,31 @@ def try_set_start_method(name: str) -> mp.context.BaseContext:
|
||||||
method) is used.
|
method) is used.
|
||||||
"""
|
"""
|
||||||
global _ctx
|
global _ctx
|
||||||
|
global _spawn_method
|
||||||
|
|
||||||
allowed = mp.get_all_start_methods()
|
allowed = mp.get_all_start_methods()
|
||||||
|
|
||||||
|
# no Windows support for trip yet (afaik)
|
||||||
|
if platform.system() != 'Windows':
|
||||||
|
allowed += ['trip']
|
||||||
|
|
||||||
if name not in allowed:
|
if name not in allowed:
|
||||||
name = 'spawn'
|
raise ValueError(
|
||||||
|
f"Spawn method {name} is unsupported please choose one of {allowed}"
|
||||||
|
)
|
||||||
|
|
||||||
|
if name == 'trip':
|
||||||
|
_spawn_method = name
|
||||||
|
return name
|
||||||
|
|
||||||
elif name == 'fork':
|
elif name == 'fork':
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
"`fork` is unsupported due to incompatibility with `trio`"
|
"`fork` is unsupported due to incompatibility with `trio`"
|
||||||
)
|
)
|
||||||
elif name == 'forkserver':
|
elif name == 'forkserver':
|
||||||
_forkserver_override.override_stdlib()
|
_forkserver_override.override_stdlib()
|
||||||
|
_ctx = mp.get_context(name)
|
||||||
|
|
||||||
assert name in allowed
|
|
||||||
|
|
||||||
_ctx = mp.get_context(name)
|
|
||||||
return _ctx
|
return _ctx
|
||||||
|
|
||||||
|
|
||||||
|
@ -144,7 +155,7 @@ async def new_proc(
|
||||||
bind_addr: Tuple[str, int],
|
bind_addr: Tuple[str, int],
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
begin_wait_phase: trio.Event,
|
begin_wait_phase: trio.Event,
|
||||||
use_trip: bool = True,
|
use_trip: bool = False,
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Create a new ``multiprocessing.Process`` using the
|
"""Create a new ``multiprocessing.Process`` using the
|
||||||
|
@ -153,7 +164,7 @@ async def new_proc(
|
||||||
cancel_scope = None
|
cancel_scope = None
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as nursery:
|
||||||
if use_trip:
|
if use_trip or _spawn_method == 'trip':
|
||||||
# trio_run_in_process
|
# trio_run_in_process
|
||||||
async with trio_run_in_process.open_in_process(
|
async with trio_run_in_process.open_in_process(
|
||||||
subactor._trip_main,
|
subactor._trip_main,
|
||||||
|
|
Loading…
Reference in New Issue