forked from goodboy/tractor
1
0
Fork 0

Do __main__ fixups like ``mulitprocessing does``

Instead of hackery trying to map modules manually from the filesystem
let Python do all the work by simply copying what ``multiprocessing``
does to "fixup the __main__ module" in spawned subprocesses. The new
private module ``_mp_fixup_main.py`` is simply cherry picked code from
``multiprocessing.spawn`` which does just that. We only need these
"fixups" when using a backend other then ``multiprocessing``; for
now just when using ``trio_run_in_process``.
try_trip^2
Tyler Goodlet 2020-01-29 21:06:40 -05:00
parent 2a4307975d
commit 6348121d23
3 changed files with 147 additions and 27 deletions

View File

@ -28,6 +28,7 @@ from ._exceptions import (
from ._discovery import get_arbiter from ._discovery import get_arbiter
from ._portal import Portal from ._portal import Portal
from . import _state from . import _state
from . import _mp_fixup_main
log = get_logger('tractor') log = get_logger('tractor')
@ -169,6 +170,14 @@ class Actor:
_root_nursery: trio.Nursery _root_nursery: trio.Nursery
_server_nursery: trio.Nursery _server_nursery: trio.Nursery
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
# by the user (currently called the "arbiter")
_spawn_method: Optional[str] = None
# Information about `__main__` from parent
_parent_main_data: Dict[str, str]
def __init__( def __init__(
self, self,
name: str, name: str,
@ -178,17 +187,20 @@ class Actor:
loglevel: str = None, loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None, arbiter_addr: Optional[Tuple[str, int]] = None,
) -> None: ) -> None:
"""This constructor is called in the parent actor **before** the spawning
phase (aka before a new process is executed).
"""
self.name = name self.name = name
self.uid = (name, uid or str(uuid.uuid4())) self.uid = (name, uid or str(uuid.uuid4()))
# retreive and store parent `__main__` data which
# will be passed to children
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
mods = {} mods = {}
for name in rpc_module_paths or (): for name in rpc_module_paths or ():
mod = importlib.import_module(name) mod = importlib.import_module(name)
suffix_index = mod.__file__.find('.py') mods[name] = _get_mod_abspath(mod)
unique_modname = os.path.basename(mod.__file__[:suffix_index])
mods[unique_modname] = _get_mod_abspath(mod)
if mod.__name__ == '__main__' or mod.__name__ == '__mp_main__':
self._main_mod = unique_modname
self.rpc_module_paths = mods self.rpc_module_paths = mods
self._mods: dict = {} self._mods: dict = {}
@ -243,35 +255,40 @@ class Actor:
code (if it exists). code (if it exists).
""" """
try: try:
for modname, absfilepath in self.rpc_module_paths.items(): if self._spawn_method == 'trio_run_in_process':
sys.path.append(os.path.dirname(absfilepath)) parent_data = self._parent_main_data
log.debug(f"Attempting to import {modname}@{absfilepath}") if 'init_main_from_name' in parent_data:
spec = importlib.util.spec_from_file_location( _mp_fixup_main._fixup_main_from_name(
modname, absfilepath) parent_data['init_main_from_name'])
mod = importlib.util.module_from_spec(spec) elif 'init_main_from_path' in parent_data:
spec.loader.exec_module(mod) # type: ignore _mp_fixup_main._fixup_main_from_path(
self._mods[modname] = mod parent_data['init_main_from_path'])
# XXX append the allowed module to the python path for modpath, filepath in self.rpc_module_paths.items():
# which should allow for relative (at least downward) # XXX append the allowed module to the python path which
# imports. Seems to be the only that will work currently # should allow for relative (at least downward) imports.
# to get `trio-run-in-process` to import modules we "send sys.path.append(os.path.dirname(filepath))
# it". # XXX leaving this in for now incase we decide to swap
# it with the above path mutating solution:
# if self.name != 'arbiter': # spec = importlib.util.spec_from_file_location(
# importlib.import_module('doggy') # modname, absfilepath)
# from celery.contrib import rdb; rdb.set_trace() # mod = importlib.util.module_from_spec(spec)
# spec.loader.exec_module(mod) # type: ignore
log.debug(f"Attempting to import {modpath}@{filepath}")
mod = importlib.import_module(modpath)
self._mods[modpath] = mod
except ModuleNotFoundError: except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error # it is expected the corresponding `ModuleNotExposed` error
# will be raised later # will be raised later
log.error(f"Failed to import {modname} in {self.name}") log.error(f"Failed to import {modpath} in {self.name}")
raise raise
def _get_rpc_func(self, ns, funcname): def _get_rpc_func(self, ns, funcname):
if ns == '__main__' or ns == '__mp_main__': if ns == "__mp_main__":
# lookup the specific module in the child denoted # In subprocesses, `__main__` will actually map to
# as `__main__`/`__mp_main__` in the parent # `__mp_main__` which should be the same entry-point-module
ns = self._main_mod # as the parent.
ns = "__main__"
try: try:
return getattr(self._mods[ns], funcname) return getattr(self._mods[ns], funcname)
except KeyError as err: except KeyError as err:

View File

@ -0,0 +1,100 @@
"""
Helpers pulled mostly verbatim from ``multiprocessing.spawn``
to aid with "fixing up" the ``__main__`` module in subprocesses.
These helpers are needed for any spawing backend that doesn't already handle this.
For example when using ``trio_run_in_process`` it is needed but obviously not when
we're already using ``multiprocessing``.
"""
import os
import sys
import platform
import types
import runpy
from typing import Dict
ORIGINAL_DIR = os.path.abspath(os.getcwd())
def _mp_figure_out_main() -> Dict[str, str]:
"""Taken from ``multiprocessing.spawn.get_preparation_data()``.
Retrieve parent actor `__main__` module data.
"""
d = {}
# Figure out whether to initialise main in the subprocess as a module
# or through direct execution (or to leave it alone entirely)
main_module = sys.modules['__main__']
main_mod_name = getattr(main_module.__spec__, "name", None)
if main_mod_name is not None:
d['init_main_from_name'] = main_mod_name
# elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
elif platform.system() != 'Windows':
main_path = getattr(main_module, '__file__', None)
if main_path is not None:
if (
not os.path.isabs(main_path) and (
ORIGINAL_DIR is not None)
):
# process.ORIGINAL_DIR is not None):
# main_path = os.path.join(process.ORIGINAL_DIR, main_path)
main_path = os.path.join(ORIGINAL_DIR, main_path)
d['init_main_from_path'] = os.path.normpath(main_path)
return d
# Multiprocessing module helpers to fix up the main module in
# spawned subprocesses
def _fixup_main_from_name(mod_name: str) -> None:
# __main__.py files for packages, directories, zip archives, etc, run
# their "main only" code unconditionally, so we don't even try to
# populate anything in __main__, nor do we make any changes to
# __main__ attributes
current_main = sys.modules['__main__']
if mod_name == "__main__" or mod_name.endswith(".__main__"):
return
# If this process was forked, __main__ may already be populated
if getattr(current_main.__spec__, "name", None) == mod_name:
return
# Otherwise, __main__ may contain some non-main code where we need to
# support unpickling it properly. We rerun it as __mp_main__ and make
# the normal __main__ an alias to that
# old_main_modules.append(current_main)
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_module(mod_name,
run_name="__mp_main__",
alter_sys=True)
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
def _fixup_main_from_path(main_path: str) -> None:
# If this process was forked, __main__ may already be populated
current_main = sys.modules['__main__']
# Unfortunately, the main ipython launch script historically had no
# "if __name__ == '__main__'" guard, so we work around that
# by treating it like a __main__.py file
# See https://github.com/ipython/ipython/issues/4698
main_name = os.path.splitext(os.path.basename(main_path))[0]
if main_name == 'ipython':
return
# Otherwise, if __file__ already has the setting we expect,
# there's nothing more to do
if getattr(current_main, '__file__', None) == main_path:
return
# If the parent process has sent a path through rather than a module
# name we assume it is an executable script that may contain
# non-main code that needs to be executed
# old_main_modules.append(current_main)
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_path(main_path,
run_name="__mp_main__")
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module

View File

@ -166,6 +166,9 @@ async def new_proc(
""" """
cancel_scope = None cancel_scope = None
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': if use_trio_run_in_process or _spawn_method == 'trio_run_in_process':
# trio_run_in_process # trio_run_in_process