Manage a `multiprocessing.forkserver` manually
Start a forkserver once in the main (parent-most) process and pass its IPC info (fds) to subprocesses manually, such that embedded calls to `multiprocessing.Process.start()` just work. Note that this relies on our overridden version of the stdlib's `multiprocessing.forkserver` module. Resolves #6.
commit 50517c9488 (parent f46d5b2b62), branch forkserver_singleton
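For orientation before the diff: the heart of the change is capturing the running forkserver's connection details in the parent-most process so they can be handed down to children. The sketch below is not tractor code; it only pokes at the same private CPython internals the diff relies on (`forkserver._forkserver` and the pre-3.8 `semaphore_tracker`), so treat the attribute names as Python 3.7-era assumptions.

```python
# Sketch only (not tractor's API): start the stdlib forkserver once in the
# root process and read back the details a child would need to reuse it.
# Relies on private CPython internals from the Python 3.7 era -- the
# semaphore tracker was renamed ``resource_tracker`` in 3.8.
import multiprocessing as mp
from multiprocessing import forkserver, semaphore_tracker


def capture_fs_deats():
    """Ensure the forkserver + semaphore tracker are up; return their IPC info."""
    forkserver.ensure_running()
    semaphore_tracker.ensure_running()
    fs = forkserver._forkserver
    return (
        fs._forkserver_address,      # unix socket the server listens on
        fs._forkserver_alive_fd,     # write end of the server's keep-alive pipe
        fs._forkserver_pid,
        semaphore_tracker._semaphore_tracker._pid,
        semaphore_tracker._semaphore_tracker._fd,
    )


if __name__ == '__main__':
    mp.set_start_method('forkserver')  # unix only
    print(capture_fs_deats())
```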
@@ -165,6 +165,7 @@ class Actor:
         self._listeners = []
         self._parent_chan = None
         self._accept_host = None
+        self._fs_deats = None

     async def wait_for_peer(self, uid):
         """Wait for a connection back from a spawned actor with a given
@@ -361,9 +362,10 @@ class Actor:
         finally:
             log.debug(f"Exiting msg loop for {chan} from {chan.uid}")

-    def _fork_main(self, accept_addr, parent_addr=None):
+    def _fork_main(self, accept_addr, fs_deats, parent_addr=None):
         # after fork routine which invokes a fresh ``trio.run``
         # log.warn("Log level after fork is {self.loglevel}")
+        self._fs_deats = fs_deats
         from ._trionics import ctx
         if self.loglevel is not None:
             get_console_log(self.loglevel)
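`_fork_main()` now takes `fs_deats` so a freshly forked child can stash the parent's forkserver details before doing anything else. The actual re-installation into the stdlib happens via the overridden `_forkserver_hackzorz` module (not shown in this diff); the hypothetical helper below just mirrors the `else:` branch of the nursery hunk further down, to make the direction of the data flow concrete.

```python
# Hypothetical child-side helper (name and placement are illustrative, not
# part of this commit): unpack the inherited details and push them into the
# stdlib singletons so ``multiprocessing.Process.start()`` reuses the
# parent's forkserver instead of launching a new one.
from multiprocessing import forkserver, semaphore_tracker


def install_fs_deats(fs_deats):
    addr, alive_fd, pid, st_pid, st_fd = fs_deats
    fs = forkserver._forkserver
    fs._forkserver_address = addr
    fs._forkserver_alive_fd = alive_fd
    fs._forkserver_pid = pid
    semaphore_tracker._semaphore_tracker._pid = st_pid
    semaphore_tracker._semaphore_tracker._fd = st_fd
```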
@@ -173,7 +173,7 @@ class Portal:
             # send cancel cmd - might not get response
             await self.run('self', 'cancel')
             return True
-        except trio.ClosedStreamError:
+        except trio.ClosedResourceError:
             log.warn(
                 f"{self.channel} for {self.channel.uid} was already closed?")
             return False
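The `except` clause swap simply tracks trio's exception rename: later trio releases dropped `ClosedStreamError` in favour of the more general `ClosedResourceError`. A tiny self-contained example of where the new exception shows up (a memory channel is used purely for illustration; it needs a reasonably recent trio):

```python
import trio


async def main():
    send, _recv = trio.open_memory_channel(0)
    await send.aclose()
    try:
        await send.send('ping')
    except trio.ClosedResourceError:
        # same situation Portal.cancel_actor() tolerates above: the handle
        # is already closed, so the send is simply moot
        print('already closed')


trio.run(main)
```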
@@ -3,10 +3,12 @@
 """
 import multiprocessing as mp
 import inspect
+from multiprocessing import forkserver, semaphore_tracker

 import trio
 from async_generator import asynccontextmanager, aclosing

+from . import _forkserver_hackzorz  # overrides stdlib
 from ._state import current_actor
 from .log import get_logger, get_loglevel
 from ._actor import Actor, ActorFailure
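The bare `from . import _forkserver_hackzorz` import is there purely for its side effect: it must patch the stdlib `multiprocessing.forkserver` module before anything else touches it. That module's contents are not part of this diff; the snippet below is only a generic sketch of the import-for-side-effects override pattern, with a made-up patch body.

```python
# Generic monkey-patch-on-import sketch -- NOT the contents of
# ``_forkserver_hackzorz``.  Importing a module like this swaps in a patched
# ``ForkServer.ensure_running`` before any caller uses the forkserver.
import multiprocessing.forkserver as _fs

_original_ensure_running = _fs.ForkServer.ensure_running


def _patched_ensure_running(self):
    # custom setup (e.g. honoring externally provided IPC info) would go here
    return _original_ensure_running(self)


_fs.ForkServer.ensure_running = _patched_ensure_running
# the module-level alias is a bound method, so it must be rebound too
_fs.ensure_running = _fs._forkserver.ensure_running
```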
@@ -27,6 +29,7 @@ class ActorNursery:
         # portals spawned with ``run_in_actor()``
         self._cancel_after_result_on_exit = set()
         self.cancelled = False
+        self._fs = None

     async def __aenter__(self):
         return self
@@ -50,9 +53,34 @@ class ActorNursery:
         )
         parent_addr = self._actor.accept_addr
         assert parent_addr
+        self._fs = fs = forkserver._forkserver
+        if mp.current_process().name == 'MainProcess' and (
+            not self._actor._fs_deats
+        ):
+            # if we're the "main" process start the forkserver only once
+            # and pass it's ipc info to downstream children
+            # forkserver.set_forkserver_preload(rpc_module_paths)
+            forkserver.ensure_running()
+            fs_deats = addr, alive_fd, pid, st_pid, st_fd = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                fs._forkserver_pid,
+                semaphore_tracker._semaphore_tracker._pid,
+                semaphore_tracker._semaphore_tracker._fd,
+            )
+        else:
+            fs_deats = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                fs._forkserver_pid,
+                semaphore_tracker._semaphore_tracker._pid,
+                semaphore_tracker._semaphore_tracker._fd,
+            ) = self._actor._fs_deats
+
         proc = ctx.Process(
             target=actor._fork_main,
-            args=(bind_addr, parent_addr),
+            args=(bind_addr, fs_deats, parent_addr),
             # daemon=True,
             name=name,
         )
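For a sense of the intended payoff ("embedded calls to `multiprocessing.Process.start()` just work"): the toy script below uses only the stock `multiprocessing` API (Unix only) to spawn a child that itself spawns a grandchild. With tractor's handoff in place the intent is that both levels go through the one forkserver started by the root process; with the plain stdlib, each nesting level may end up starting its own server process.

```python
# Plain-stdlib shape of a nested spawn; no tractor imports here.
import multiprocessing as mp


def _grandchild():
    print('grandchild running as', mp.current_process().name)


def _child():
    # an "embedded" Process.start() -- the call this commit wants to make
    # transparently reuse the root process' forkserver
    proc = mp.get_context('forkserver').Process(target=_grandchild)
    proc.start()
    proc.join()


if __name__ == '__main__':
    ctx = mp.get_context('forkserver')  # unix only
    p = ctx.Process(target=_child)
    p.start()
    p.join()
```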