diff --git a/tractor/__init__.py b/tractor/__init__.py index 4214c2d..96ac856 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -19,6 +19,7 @@ from ._trionics import open_nursery from ._state import current_actor from ._exceptions import RemoteActorError, ModuleNotExposed from . import msg +from . import _spawn __all__ = [ @@ -92,12 +93,14 @@ def run( name: str = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), + spawn_method: str = 'forkserver', **kwargs: typing.Dict[str, typing.Any], ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ + _spawn.try_set_start_method(spawn_method) return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) diff --git a/tractor/_actor.py b/tractor/_actor.py index 2256ec2..1b38d88 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -391,7 +391,8 @@ class Actor: f" {chan} from {chan.uid}") break - log.trace(f"Received msg {msg} from {chan.uid}") # type: ignore + log.trace( # type: ignore + f"Received msg {msg} from {chan.uid}") if msg.get('cid'): # deliver response to local caller/waiter await self._push_result(chan, msg) @@ -478,18 +479,20 @@ class Actor: self, accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], + start_method: str, parent_addr: Tuple[str, int] = None ) -> None: """The routine called *after fork* which invokes a fresh ``trio.run`` """ self._forkserver_info = forkserver_info - from ._spawn import ctx + from ._spawn import try_set_start_method + spawn_ctx = try_set_start_method(start_method) if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) log.info( - f"Started new {ctx.current_process()} for {self.uid}") + f"Started new {spawn_ctx.current_process()} for {self.uid}") _state._current_actor = self log.debug(f"parent_addr is {parent_addr}") try: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 9dab478..3d35515 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,15 +5,35 @@ Mostly just wrapping around ``multiprocessing``. """ import multiprocessing as mp from multiprocessing import forkserver, semaphore_tracker # type: ignore -from typing import Tuple +from typing import Tuple, Optional from . import _forkserver_hackzorz from ._state import current_actor from ._actor import Actor -_forkserver_hackzorz.override_stdlib() -ctx = mp.get_context("forkserver") +_ctx: mp.context.BaseContext = mp.get_context("spawn") + + +def try_set_start_method(name: str) -> mp.context.BaseContext: + """Attempt to set the start method for ``multiprocess.Process`` spawning. + + If the desired method is not supported the sub-interpreter (aka "spawn" + method) is used. + """ + global _ctx + + allowed = mp.get_all_start_methods() + if name not in allowed: + name == 'spawn' + + assert name in allowed + + if name == 'forkserver': + _forkserver_hackzorz.override_stdlib() + + _ctx = mp.get_context(name) + return _ctx def is_main_process() -> bool: @@ -29,33 +49,47 @@ def new_proc( bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], ) -> mp.Process: - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(semaphore_tracker._semaphore_tracker, '_pid', None), - semaphore_tracker._semaphore_tracker._fd, - ) + """Create a new ``multiprocessing.Process`` using the + spawn method as configured using ``try_set_start_method()``. + """ + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver only once + # and pass its ipc info to downstream children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr(semaphore_tracker._semaphore_tracker, '_pid', None), + semaphore_tracker._semaphore_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + semaphore_tracker._semaphore_tracker._pid, + semaphore_tracker._semaphore_tracker._fd, + ) = curr_actor._forkserver_info else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - semaphore_tracker._semaphore_tracker._pid, - semaphore_tracker._semaphore_tracker._fd, - ) = curr_actor._forkserver_info + fs_info = (None, None, None, None, None) - return ctx.Process( + return _ctx.Process( target=actor._fork_main, - args=(bind_addr, fs_info, parent_addr), + args=( + bind_addr, + fs_info, + start_method, + parent_addr + ), # daemon=True, name=name, )