From d75739e9c7a30420aa2787a1381ac7ede2af4479 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Mar 2019 18:52:19 -0500 Subject: [PATCH] Factor process creation into a separate factory Make a `_spawn` module for encapsulating all the `multiprocessing` "spawn method" stuff and factor current forkserver steps into it. --- tractor/_actor.py | 2 +- tractor/_spawn.py | 61 ++++++++++++++++++++++++++++++++++++++++++++ tractor/_state.py | 7 ----- tractor/_trionics.py | 42 +++++------------------------- 4 files changed, 69 insertions(+), 43 deletions(-) create mode 100644 tractor/_spawn.py diff --git a/tractor/_actor.py b/tractor/_actor.py index 7a5e72e..2256ec2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -483,7 +483,7 @@ class Actor: """The routine called *after fork* which invokes a fresh ``trio.run`` """ self._forkserver_info = forkserver_info - from ._trionics import ctx + from ._spawn import ctx if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") diff --git a/tractor/_spawn.py b/tractor/_spawn.py new file mode 100644 index 0000000..9dab478 --- /dev/null +++ b/tractor/_spawn.py @@ -0,0 +1,61 @@ +""" +Process spawning. + +Mostly just wrapping around ``multiprocessing``. +""" +import multiprocessing as mp +from multiprocessing import forkserver, semaphore_tracker # type: ignore +from typing import Tuple + +from . import _forkserver_hackzorz +from ._state import current_actor +from ._actor import Actor + + +_forkserver_hackzorz.override_stdlib() +ctx = mp.get_context("forkserver") + + +def is_main_process() -> bool: + """Bool determining if this actor is running in the top-most process. + """ + return mp.current_process().name == 'MainProcess' + + +def new_proc( + name: str, + actor: Actor, + # passed through to actor main + bind_addr: Tuple[str, int], + parent_addr: Tuple[str, int], +) -> mp.Process: + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver only once + # and pass its ipc info to downstream children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr(semaphore_tracker._semaphore_tracker, '_pid', None), + semaphore_tracker._semaphore_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + semaphore_tracker._semaphore_tracker._pid, + semaphore_tracker._semaphore_tracker._fd, + ) = curr_actor._forkserver_info + + return ctx.Process( + target=actor._fork_main, + args=(bind_addr, fs_info, parent_addr), + # daemon=True, + name=name, + ) diff --git a/tractor/_state.py b/tractor/_state.py index 2606ff3..704fae7 100644 --- a/tractor/_state.py +++ b/tractor/_state.py @@ -1,7 +1,6 @@ """ Per process state """ -import multiprocessing as mp from typing import Optional @@ -14,9 +13,3 @@ def current_actor() -> 'Actor': # type: ignore if not _current_actor: raise RuntimeError("No actor instance has been defined yet?") return _current_actor - - -def is_main_process(): - """Bool determining if this actor is running in the top-most process. - """ - return mp.current_process().name == 'MainProcess' diff --git a/tractor/_trionics.py b/tractor/_trionics.py index dd18158..71f8d48 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,22 +3,20 @@ """ import multiprocessing as mp import inspect -from multiprocessing import forkserver, semaphore_tracker # type: ignore from typing import Tuple, List, Dict, Optional, Any import typing import trio from async_generator import asynccontextmanager, aclosing -from . import _forkserver_hackzorz +# from . import _forkserver_hackzorz from ._state import current_actor from .log import get_logger, get_loglevel from ._actor import Actor, ActorFailure from ._portal import Portal +from . import _spawn -_forkserver_hackzorz.override_stdlib() -ctx = mp.get_context("forkserver") log = get_logger('tractor') @@ -36,7 +34,6 @@ class ActorNursery: # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False - self._forkserver: forkserver.ForkServer = None async def __aenter__(self): return self @@ -60,36 +57,11 @@ class ActorNursery: ) parent_addr = self._actor.accept_addr assert parent_addr - self._forkserver = fs = forkserver._forkserver - if mp.current_process().name == 'MainProcess' and ( - not self._actor._forkserver_info - ): - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(semaphore_tracker._semaphore_tracker, '_pid', None), - semaphore_tracker._semaphore_tracker._fd, - ) - else: - assert self._actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - semaphore_tracker._semaphore_tracker._pid, - semaphore_tracker._semaphore_tracker._fd, - ) = self._actor._forkserver_info - - proc = ctx.Process( - target=actor._fork_main, - args=(bind_addr, fs_info, parent_addr), - # daemon=True, - name=name, + proc = _spawn.new_proc( + name, + actor, + bind_addr, + parent_addr, ) # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait