Factor process creation into a separate factory

Make a `_spawn` module for encapsulating all the `multiprocessing`
"spawn method" stuff and factor current forkserver steps into it.
spawn_method_support
Tyler Goodlet 2019-03-05 18:52:19 -05:00
parent a927966170
commit d75739e9c7
4 changed files with 69 additions and 43 deletions

View File

@ -483,7 +483,7 @@ class Actor:
"""The routine called *after fork* which invokes a fresh ``trio.run`` """The routine called *after fork* which invokes a fresh ``trio.run``
""" """
self._forkserver_info = forkserver_info self._forkserver_info = forkserver_info
from ._trionics import ctx from ._spawn import ctx
if self.loglevel is not None: if self.loglevel is not None:
log.info( log.info(
f"Setting loglevel for {self.uid} to {self.loglevel}") f"Setting loglevel for {self.uid} to {self.loglevel}")

61
tractor/_spawn.py 100644
View File

@ -0,0 +1,61 @@
"""
Process spawning.
Mostly just wrapping around ``multiprocessing``.
"""
import multiprocessing as mp
from multiprocessing import forkserver, semaphore_tracker # type: ignore
from typing import Tuple
from . import _forkserver_hackzorz
from ._state import current_actor
from ._actor import Actor
_forkserver_hackzorz.override_stdlib()
ctx = mp.get_context("forkserver")
def is_main_process() -> bool:
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'
def new_proc(
name: str,
actor: Actor,
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
) -> mp.Process:
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver only once
# and pass its ipc info to downstream children
# forkserver.set_forkserver_preload(rpc_module_paths)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None),
getattr(semaphore_tracker._semaphore_tracker, '_pid', None),
semaphore_tracker._semaphore_tracker._fd,
)
else:
assert curr_actor._forkserver_info
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
fs._forkserver_pid,
semaphore_tracker._semaphore_tracker._pid,
semaphore_tracker._semaphore_tracker._fd,
) = curr_actor._forkserver_info
return ctx.Process(
target=actor._fork_main,
args=(bind_addr, fs_info, parent_addr),
# daemon=True,
name=name,
)

View File

@ -1,7 +1,6 @@
""" """
Per process state Per process state
""" """
import multiprocessing as mp
from typing import Optional from typing import Optional
@ -14,9 +13,3 @@ def current_actor() -> 'Actor': # type: ignore
if not _current_actor: if not _current_actor:
raise RuntimeError("No actor instance has been defined yet?") raise RuntimeError("No actor instance has been defined yet?")
return _current_actor return _current_actor
def is_main_process():
"""Bool determining if this actor is running in the top-most process.
"""
return mp.current_process().name == 'MainProcess'

View File

@ -3,22 +3,20 @@
""" """
import multiprocessing as mp import multiprocessing as mp
import inspect import inspect
from multiprocessing import forkserver, semaphore_tracker # type: ignore
from typing import Tuple, List, Dict, Optional, Any from typing import Tuple, List, Dict, Optional, Any
import typing import typing
import trio import trio
from async_generator import asynccontextmanager, aclosing from async_generator import asynccontextmanager, aclosing
from . import _forkserver_hackzorz # from . import _forkserver_hackzorz
from ._state import current_actor from ._state import current_actor
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
from ._portal import Portal from ._portal import Portal
from . import _spawn
_forkserver_hackzorz.override_stdlib()
ctx = mp.get_context("forkserver")
log = get_logger('tractor') log = get_logger('tractor')
@ -36,7 +34,6 @@ class ActorNursery:
# cancelled when their "main" result arrives # cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set() self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False self.cancelled: bool = False
self._forkserver: forkserver.ForkServer = None
async def __aenter__(self): async def __aenter__(self):
return self return self
@ -60,36 +57,11 @@ class ActorNursery:
) )
parent_addr = self._actor.accept_addr parent_addr = self._actor.accept_addr
assert parent_addr assert parent_addr
self._forkserver = fs = forkserver._forkserver proc = _spawn.new_proc(
if mp.current_process().name == 'MainProcess' and ( name,
not self._actor._forkserver_info actor,
): bind_addr,
# if we're the "main" process start the forkserver only once parent_addr,
# and pass its ipc info to downstream children
# forkserver.set_forkserver_preload(rpc_module_paths)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None),
getattr(semaphore_tracker._semaphore_tracker, '_pid', None),
semaphore_tracker._semaphore_tracker._fd,
)
else:
assert self._actor._forkserver_info
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
fs._forkserver_pid,
semaphore_tracker._semaphore_tracker._pid,
semaphore_tracker._semaphore_tracker._fd,
) = self._actor._forkserver_info
proc = ctx.Process(
target=actor._fork_main,
args=(bind_addr, fs_info, parent_addr),
# daemon=True,
name=name,
) )
# register the process before start in case we get a cancel # register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait # request before the actor has fully spawned - then we can wait