From 50517c94882ec34415178e6b58373dae535e87c6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 25 Jul 2018 00:43:31 -0400 Subject: [PATCH] Manage a `multiprocessing.forkserver` manually Start a forkserver once in the main (parent-most) process and pass ipc info (fds) to subprocesses manually such that embedded calls to `multiprocessing.Process.start()` just work. Note that this relies on our overridden version of the stdlib's `multiprocessing.forkserver` module. Resolves #6 --- tractor/_actor.py | 4 +++- tractor/_portal.py | 2 +- tractor/_trionics.py | 30 +++++++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 7693035..0a20ead 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -165,6 +165,7 @@ class Actor: self._listeners = [] self._parent_chan = None self._accept_host = None + self._fs_deats = None async def wait_for_peer(self, uid): """Wait for a connection back from a spawned actor with a given @@ -361,9 +362,10 @@ class Actor: finally: log.debug(f"Exiting msg loop for {chan} from {chan.uid}") - def _fork_main(self, accept_addr, parent_addr=None): + def _fork_main(self, accept_addr, fs_deats, parent_addr=None): # after fork routine which invokes a fresh ``trio.run`` # log.warn("Log level after fork is {self.loglevel}") + self._fs_deats = fs_deats from ._trionics import ctx if self.loglevel is not None: get_console_log(self.loglevel) diff --git a/tractor/_portal.py b/tractor/_portal.py index ebae431..d9ee957 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -173,7 +173,7 @@ class Portal: # send cancel cmd - might not get response await self.run('self', 'cancel') return True - except trio.ClosedStreamError: + except trio.ClosedResourceError: log.warn( f"{self.channel} for {self.channel.uid} was already closed?") return False diff --git a/tractor/_trionics.py b/tractor/_trionics.py index aa9becb..19e7f97 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -3,10 +3,12 @@ """ import multiprocessing as mp import inspect +from multiprocessing import forkserver, semaphore_tracker import trio from async_generator import asynccontextmanager, aclosing +from . import _forkserver_hackzorz # overrides stdlib from ._state import current_actor from .log import get_logger, get_loglevel from ._actor import Actor, ActorFailure @@ -27,6 +29,7 @@ class ActorNursery: # portals spawned with ``run_in_actor()`` self._cancel_after_result_on_exit = set() self.cancelled = False + self._fs = None async def __aenter__(self): return self @@ -50,9 +53,34 @@ class ActorNursery: ) parent_addr = self._actor.accept_addr assert parent_addr + self._fs = fs = forkserver._forkserver + if mp.current_process().name == 'MainProcess' and ( + not self._actor._fs_deats + ): + # if we're the "main" process start the forkserver only once + # and pass it's ipc info to downstream children + + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_deats = addr, alive_fd, pid, st_pid, st_fd = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + semaphore_tracker._semaphore_tracker._pid, + semaphore_tracker._semaphore_tracker._fd, + ) + else: + fs_deats = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + semaphore_tracker._semaphore_tracker._pid, + semaphore_tracker._semaphore_tracker._fd, + ) = self._actor._fs_deats + proc = ctx.Process( target=actor._fork_main, - args=(bind_addr, parent_addr), + args=(bind_addr, fs_deats, parent_addr), # daemon=True, name=name, )