
Merge pull request from goodboy/try_trip

Support trio-run-in-process as process spawning backend
try_trip
goodboy 2020-01-31 14:30:54 -06:00 committed by GitHub
commit 5741bd5209
13 changed files with 736 additions and 329 deletions

View File

@ -4,7 +4,7 @@ sudo: required
matrix:
include:
- name: "Windows, Python Latest"
- name: "Windows, Python Latest: multiprocessing"
os: windows
language: sh
python: 3.x # only works on linux
@ -13,7 +13,7 @@ matrix:
- export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel
- name: "Windows, Python 3.7"
- name: "Windows, Python 3.7: multiprocessing"
os: windows
python: 3.7 # only works on linux
language: sh
@ -22,8 +22,19 @@ matrix:
- export PATH="/c/Python:/c/Python/Scripts:$PATH"
- python -m pip install --upgrade pip wheel
- python: 3.7 # this works for Linux but is ignored on macOS or Windows
- python: 3.8
- name: "Python 3.7: multiprocessing"
python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp"
- name: "Python 3.7: trio-run-in-process"
python: 3.7 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process"
- name: "Python 3.8: multiprocessing"
python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="mp"
- name: "Python 3.8: trio-run-in-process"
python: 3.8 # this works for Linux but is ignored on macOS or Windows
env: SPAWN_BACKEND="trio_run_in_process"
install:
- cd $TRAVIS_BUILD_DIR
@ -32,4 +43,4 @@ install:
script:
- mypy tractor/ --ignore-missing-imports
- pytest tests/ --no-print-logs
- pytest tests/ --no-print-logs --spawn-backend=${SPAWN_BACKEND}

View File

@ -83,8 +83,6 @@ Its tenets non-comprehensively include:
are greatly appreciated!
.. _concept-in-progress: https://trio.discourse.group/t/structured-concurrency-kickoff/55
.. _pulsar: http://quantmind.github.io/pulsar/design.html
.. _execnet: https://codespeak.net/execnet/
Install
@ -357,14 +355,15 @@ Depending on the function type ``Portal.run()`` tries to
correctly interface exactly like a local version of the remote
built-in Python *function type*. Currently async functions, generators,
and regular functions are supported. Inspiration for this API comes
from the way execnet_ does `remote function execution`_ but without
the client code (necessarily) having to worry about the underlying
channels_ system or shipping code over the network.
`remote function execution`_ but without the client code being
concerned about the underlying channels_ system or shipping code
over the network.
This *portal* approach turns out to be particularly exciting with the
introduction of `asynchronous generators`_ in Python 3.6! It means that
actors can compose nicely in a data streaming pipeline.
.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics
Streaming
*********
@ -703,20 +702,54 @@ need to hop into a debugger. You just need to pass the existing
tractor.run(main, arbiter_addr=('192.168.0.10', 1616))
Choosing a ``multiprocessing`` *start method*
*********************************************
``tractor`` supports selection of the `multiprocessing start method`_ via
a ``start_method`` kwarg to ``tractor.run()``. Note that on Windows
*spawn* is the only supported method and on \*nix systems *forkserver*
is selected by default for speed.
Choosing a process spawning backend
***********************************
``tractor`` is architected to support multiple actor (sub-process)
spawning backends. Specific defaults are chosen based on your system
but you can also explicitly select a backend of choice at startup
via a ``start_method`` kwarg to ``tractor.run()``.
.. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
Currently the options available are:
- ``trio_run_in_process``: a ``trio``-native spawner from the `Ethereum community`_
- ``spawn``: one of the stdlib's ``multiprocessing`` `start methods`_
- ``forkserver``: a faster ``multiprocessing`` variant that is Unix only
.. _start methods: https://docs.python.org/3.8/library/multiprocessing.html#contexts-and-start-methods
.. _Ethereum community : https://github.com/ethereum/trio-run-in-process
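For example, to pick a backend explicitly at startup (a minimal sketch
where ``main`` stands in for your root async function)::

    import tractor

    async def main():
        ...  # spawn your actor tree here

    # the default (``start_method=None``) selects per-platform:
    # ``trio_run_in_process`` on *nix, ``spawn`` on Windows
    tractor.run(main, start_method='forkserver')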
``trio-run-in-process``
+++++++++++++++++++++++
`trio-run-in-process`_ is a young "pure ``trio``" process spawner
which utilizes the native `trio subprocess APIs`_. It has shown great
reliability under testing for predictable teardown when launching
recursive pools of actors (multiple nurseries deep) and as such has been
chosen as the default backend on \*nix systems.
.. _trio-run-in-process: https://github.com/ethereum/trio-run-in-process
.. _trio subprocess APIs : https://trio.readthedocs.io/en/stable/reference-io.html#spawning-subprocesses
``multiprocessing``
+++++++++++++++++++
There is support for the stdlib's ``multiprocessing`` `start methods`_.
Note that on Windows *spawn* is the only supported method and on \*nix
systems *forkserver* is the best method for speed but has the caveat
that it will break easily (hangs due to broken pipes) if spawning actors
using nested nurseries.
In general, the ``multiprocessing`` backend **has not proven reliable**
for handling errors from actors more than 2 nurseries *deep* (see `#89`_).
If you need deeper trees for some reason, consider using an alternative
backend.
.. _#89: https://github.com/goodboy/tractor/issues/89
Windows "gotchas"
*****************
`tractor` internally uses the stdlib's `multiprocessing` package which
*can* have some gotchas on Windows. Namely, the need for calling
^^^^^^^^^^^^^^^^^
On Windows (which requires the use of the stdlib's `multiprocessing`
package) there are some gotchas. Namely, the need for calling
`freeze_support()`_ inside the ``__main__`` context. Additionally you
may need to place your `tractor` program entry point in a separate
`__main__.py` module in your package in order to avoid an error like the

mypy.ini 100644
View File

@ -0,0 +1,2 @@
[mypy]
plugins = trio_typing.plugin

View File

@ -39,6 +39,7 @@ setup(
],
install_requires=[
'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt',
'trio_typing', 'trio-run-in-process',
],
tests_require=['pytest'],
python_requires=">=3.7",

View File

@ -2,6 +2,7 @@
``tractor`` testing!!
"""
import random
import platform
import pytest
import tractor
@ -13,8 +14,28 @@ _arb_addr = '127.0.0.1', random.randint(1000, 9999)
def pytest_addoption(parser):
parser.addoption("--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing")
parser.addoption(
"--ll", action="store", dest='loglevel',
default=None, help="logging level to set when testing"
)
parser.addoption(
"--spawn-backend", action="store", dest='spawn_backend',
default='trio_run_in_process',
help="Processing spawning backend to use for test run",
)
def pytest_configure(config):
backend = config.option.spawn_backend
if platform.system() == "Windows":
backend = 'mp'
if backend == 'mp':
tractor._spawn.try_set_start_method('spawn')
elif backend == 'trio_run_in_process':
tractor._spawn.try_set_start_method(backend)
@pytest.fixture(scope='session', autouse=True)
@ -31,11 +52,26 @@ def arb_addr():
def pytest_generate_tests(metafunc):
spawn_backend = metafunc.config.option.spawn_backend
if not spawn_backend:
# XXX some weird windows bug with `pytest`?
spawn_backend = 'mp'
assert spawn_backend in ('mp', 'trio_run_in_process')
if 'start_method' in metafunc.fixturenames:
if spawn_backend == 'mp':
from multiprocessing import get_all_start_methods
methods = get_all_start_methods()
if 'fork' in methods: # fork not available on windows, so check before removing
# XXX: the fork method is in general incompatible with
# trio's global scheduler state
if 'fork' in methods:
# fork not available on windows, so check before removing.
# XXX: the fork method is in general incompatible with
# trio's global scheduler state
methods.remove('fork')
elif spawn_backend == 'trio_run_in_process':
if platform.system() == "Windows":
pytest.fail(
"Only `--spawn-backend=mp` is supported on Windows")
methods = ['trio_run_in_process']
metafunc.parametrize("start_method", methods, scope='module')
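For reference, a minimal sketch of a test consuming the parametrized
funcarg (this test is hypothetical, not part of the change):

def test_start_method_is_valid(start_method):
    # runs once per spawn backend collected by
    # ``pytest_generate_tests()`` above
    assert start_method in (
        'spawn', 'forkserver', 'trio_run_in_process')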

View File

@ -289,11 +289,18 @@ async def test_nested_multierrors(loglevel, start_method):
This test goes only 2 nurseries deep but we should eventually have tests
for arbitrary n-depth actor trees.
"""
# XXX: forkserver can't seem to handle any more than 2 depth
# process trees for whatever reason.
# Any more process levels than this and we start getting pretty slow anyway
if start_method == 'trio_run_in_process':
depth = 3
subactor_breadth = 2
else:
# XXX: multiprocessing can't seem to handle any more than 2 depth
# process trees for whatever reason.
# Any more process levels than this and we see bugs that cause
# hangs and broken pipes all over the place...
if start_method == 'forkserver':
pytest.skip("Forkserver sux hard at nested spawning...")
depth = 2
subactor_breadth = 2
with trio.fail_after(120):
try:

View File

@ -60,7 +60,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
if exposed_mods == ['tmp_mod']:
# create an importable module with a bad import
testdir.syspathinsert()
# module should cause raise a ModuleNotFoundError at import
# module should raise a ModuleNotFoundError at import
testdir.makefile('.py', tmp_mod=funcname)
# no need to expose the module to the subactor
@ -69,7 +69,9 @@ def test_rpc_errors(arb_addr, to_call, testdir):
func_defined = False
# subactor should not try to invoke anything
subactor_requests_to = None
remote_err = trio.MultiError
# the module will be attempted to be imported locally but will
# fail in the initial local instance of the actor
remote_err = inside_err
async def main():
actor = tractor.current_actor()
@ -100,7 +102,7 @@ def test_rpc_errors(arb_addr, to_call, testdir):
if exposed_mods and func_defined:
run()
else:
# underlying errors are propagated upwards (yet)
# underlying errors aren't propagated upwards (yet)
with pytest.raises(remote_err) as err:
run()
@ -114,4 +116,5 @@ def test_rpc_errors(arb_addr, to_call, testdir):
value.exceptions
))
if getattr(value, 'type', None):
assert value.type is inside_err

View File

@ -99,15 +99,17 @@ def run(
name: Optional[str] = None,
arbiter_addr: Tuple[str, int] = (
_default_arbiter_host, _default_arbiter_port),
# the `multiprocessing` start method:
# either the `multiprocessing` start method:
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
start_method: str = 'forkserver',
# OR `trio_run_in_process` (the new default).
start_method: Optional[str] = None,
**kwargs,
) -> Any:
"""Run a trio-actor async function in process.
This is tractor's main entry point and the start of any async actor tree.
"""
if start_method is not None:
_spawn.try_set_start_method(start_method)
return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name)
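For illustration, a minimal sketch of calling this entry point under the
new defaults (``main`` is a stand-in for a user's root async function):

import tractor

async def main():
    ...  # tractor API calls here

# no ``start_method`` passed: the spawn module's default is used,
# ``trio_run_in_process`` on *nix and ``spawn`` on Windows
tractor.run(main)

# or force a specific backend:
tractor.run(main, start_method='trio_run_in_process')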

View File

@ -5,10 +5,14 @@ from collections import defaultdict
from functools import partial
from itertools import chain
import importlib
import importlib.util
import inspect
import uuid
import typing
from typing import Dict, List, Tuple, Any, Optional
from types import ModuleType
import sys
import os
import trio # type: ignore
from trio_typing import TaskStatus
@ -25,6 +29,7 @@ from ._exceptions import (
from ._discovery import get_arbiter
from ._portal import Portal
from . import _state
from . import _mp_fixup_main
log = get_logger('tractor')
@ -148,6 +153,10 @@ async def _invoke(
actor._ongoing_rpc_tasks.set()
def _get_mod_abspath(module):
return os.path.abspath(module.__file__)
class Actor:
"""The fundamental concurrency primitive.
@ -162,6 +171,14 @@ class Actor:
_root_nursery: trio.Nursery
_server_nursery: trio.Nursery
# marked by the process spawning backend at startup
# will be None for the parent-most process started manually
# by the user (currently called the "arbiter")
_spawn_method: Optional[str] = None
# Information about `__main__` from parent
_parent_main_data: Dict[str, str]
def __init__(
self,
name: str,
@ -171,10 +188,23 @@ class Actor:
loglevel: str = None,
arbiter_addr: Optional[Tuple[str, int]] = None,
) -> None:
"""This constructor is called in the parent actor **before** the spawning
phase (aka before a new process is executed).
"""
self.name = name
self.uid = (name, uid or str(uuid.uuid4()))
self.rpc_module_paths = rpc_module_paths
self._mods: dict = {}
# retrieve and store parent `__main__` data which
# will be passed to children
self._parent_main_data = _mp_fixup_main._mp_figure_out_main()
mods = {}
for name in rpc_module_paths or ():
mod = importlib.import_module(name)
mods[name] = _get_mod_abspath(mod)
self.rpc_module_paths = mods
self._mods: Dict[str, ModuleType] = {}
# TODO: consider making this a dynamically defined
# @dataclass once we get py3.7
@ -225,11 +255,34 @@ class Actor:
the original nursery we need to try and load the local module
code (if it exists).
"""
for path in self.rpc_module_paths:
log.debug(f"Attempting to import {path}")
self._mods[path] = importlib.import_module(path)
try:
if self._spawn_method == 'trio_run_in_process':
parent_data = self._parent_main_data
if 'init_main_from_name' in parent_data:
_mp_fixup_main._fixup_main_from_name(
parent_data['init_main_from_name'])
elif 'init_main_from_path' in parent_data:
_mp_fixup_main._fixup_main_from_path(
parent_data['init_main_from_path'])
for modpath, filepath in self.rpc_module_paths.items():
# XXX append the allowed module to the python path which
# should allow for relative (at least downward) imports.
sys.path.append(os.path.dirname(filepath))
log.debug(f"Attempting to import {modpath}@{filepath}")
self._mods[modpath] = importlib.import_module(modpath)
except ModuleNotFoundError:
# it is expected the corresponding `ModuleNotExposed` error
# will be raised later
log.error(f"Failed to import {modpath} in {self.name}")
raise
def _get_rpc_func(self, ns, funcname):
if ns == "__mp_main__":
# In subprocesses, `__main__` will actually map to
# `__mp_main__` which should be the same entry-point-module
# as the parent.
ns = "__main__"
try:
return getattr(self._mods[ns], funcname)
except KeyError as err:
@ -488,7 +541,7 @@ class Actor:
f"Exiting msg loop for {chan} from {chan.uid} "
f"with last msg:\n{msg}")
def _fork_main(
def _mp_main(
self,
accept_addr: Tuple[str, int],
forkserver_info: Tuple[Any, Any, Any, Any, Any],
@ -500,13 +553,18 @@ class Actor:
self._forkserver_info = forkserver_info
from ._spawn import try_set_start_method
spawn_ctx = try_set_start_method(start_method)
if self.loglevel is not None:
log.info(
f"Setting loglevel for {self.uid} to {self.loglevel}")
get_console_log(self.loglevel)
assert spawn_ctx
log.info(
f"Started new {spawn_ctx.current_process()} for {self.uid}")
_state._current_actor = self
log.debug(f"parent_addr is {parent_addr}")
try:
trio.run(partial(
@ -515,6 +573,26 @@ class Actor:
pass # handle it the same way trio does?
log.info(f"Actor {self.uid} terminated")
async def _trip_main(
self,
accept_addr: Tuple[str, int],
parent_addr: Tuple[str, int] = None
) -> None:
"""Entry point for a `trio_run_in_process` subactor.
Here we don't need to call `trio.run()` since trip does that as
part of its subprocess startup sequence.
"""
if self.loglevel is not None:
log.info(
f"Setting loglevel for {self.uid} to {self.loglevel}")
get_console_log(self.loglevel)
log.info(f"Started new TRIP process for {self.uid}")
_state._current_actor = self
await self._async_main(accept_addr, parent_addr=parent_addr)
log.info(f"Actor {self.uid} terminated")
async def _async_main(
self,
accept_addr: Tuple[str, int],
@ -598,9 +676,13 @@ class Actor:
f"Failed to ship error to parent "
f"{self._parent_chan.uid}, channel was closed")
log.exception("Actor errored:")
if isinstance(err, ModuleNotFoundError):
raise
else:
# XXX wait, why?
# causes a hang if I always raise..
# A parent process does something weird here?
raise
finally:
@ -649,8 +731,8 @@ class Actor:
port=accept_port, host=accept_host,
)
)
log.debug(
f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore
log.debug(f"Started tcp server(s) on"
" {[l.socket for l in listeners]}") # type: ignore
self._listeners.extend(listeners)
task_status.started()

View File

@ -0,0 +1,100 @@
"""
Helpers pulled mostly verbatim from ``multiprocessing.spawn``
to aid with "fixing up" the ``__main__`` module in subprocesses.
These helpers are needed by any spawning backend that doesn't already
handle this; for example ``trio_run_in_process`` needs them but
``multiprocessing`` obviously does not.
"""
import os
import sys
import platform
import types
import runpy
from typing import Dict
ORIGINAL_DIR = os.path.abspath(os.getcwd())
def _mp_figure_out_main() -> Dict[str, str]:
"""Taken from ``multiprocessing.spawn.get_preparation_data()``.
Retrieve parent actor `__main__` module data.
"""
d = {}
# Figure out whether to initialise main in the subprocess as a module
# or through direct execution (or to leave it alone entirely)
main_module = sys.modules['__main__']
main_mod_name = getattr(main_module.__spec__, "name", None)
if main_mod_name is not None:
d['init_main_from_name'] = main_mod_name
# elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
elif platform.system() != 'Windows':
main_path = getattr(main_module, '__file__', None)
if main_path is not None:
if (
not os.path.isabs(main_path) and (
ORIGINAL_DIR is not None)
):
# process.ORIGINAL_DIR is not None):
# main_path = os.path.join(process.ORIGINAL_DIR, main_path)
main_path = os.path.join(ORIGINAL_DIR, main_path)
d['init_main_from_path'] = os.path.normpath(main_path)
return d
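An illustrative (hypothetical) sketch of the returned mapping:

# e.g. parent started as ``python /home/user/app.py`` (no module spec):
#   {'init_main_from_path': '/home/user/app.py'}
# e.g. parent started as ``python -m app`` (a module spec exists):
#   {'init_main_from_name': 'app'}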
# Multiprocessing module helpers to fix up the main module in
# spawned subprocesses
def _fixup_main_from_name(mod_name: str) -> None:
# __main__.py files for packages, directories, zip archives, etc, run
# their "main only" code unconditionally, so we don't even try to
# populate anything in __main__, nor do we make any changes to
# __main__ attributes
current_main = sys.modules['__main__']
if mod_name == "__main__" or mod_name.endswith(".__main__"):
return
# If this process was forked, __main__ may already be populated
if getattr(current_main.__spec__, "name", None) == mod_name:
return
# Otherwise, __main__ may contain some non-main code where we need to
# support unpickling it properly. We rerun it as __mp_main__ and make
# the normal __main__ an alias to that
# old_main_modules.append(current_main)
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_module(mod_name,
run_name="__mp_main__",
alter_sys=True)
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
def _fixup_main_from_path(main_path: str) -> None:
# If this process was forked, __main__ may already be populated
current_main = sys.modules['__main__']
# Unfortunately, the main ipython launch script historically had no
# "if __name__ == '__main__'" guard, so we work around that
# by treating it like a __main__.py file
# See https://github.com/ipython/ipython/issues/4698
main_name = os.path.splitext(os.path.basename(main_path))[0]
if main_name == 'ipython':
return
# Otherwise, if __file__ already has the setting we expect,
# there's nothing more to do
if getattr(current_main, '__file__', None) == main_path:
return
# If the parent process has sent a path through rather than a module
# name we assume it is an executable script that may contain
# non-main code that needs to be executed
# old_main_modules.append(current_main)
main_module = types.ModuleType("__mp_main__")
main_content = runpy.run_path(main_path,
run_name="__mp_main__")
main_module.__dict__.update(main_content)
sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module

View File

@ -1,9 +1,14 @@
"""
Process spawning.
Mostly just wrapping around ``multiprocessing``.
Machinery for actor process spawning using multiple backends.
"""
import inspect
import multiprocessing as mp
import platform
from typing import Any, Dict, Optional
import trio
from trio_typing import TaskStatus
from async_generator import aclosing
try:
from multiprocessing import semaphore_tracker # type: ignore
@ -18,34 +23,65 @@ from typing import Tuple
from . import _forkserver_override
from ._state import current_actor
from ._actor import Actor
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore
log = get_logger('tractor')
# use trip as our default on *nix systems for now
if platform.system() != 'Windows':
_spawn_method: str = "trio_run_in_process"
else:
_spawn_method = "spawn"
_ctx: Optional[mp.context.BaseContext] = None
def try_set_start_method(name: str) -> mp.context.BaseContext:
"""Attempt to set the start method for ``multiprocess.Process`` spawning.
if platform.system() == 'Windows':
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel)
else:
import trio_run_in_process
If the desired method is not supported the sub-interpreter (aka "spawn"
method) is used.
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel)
def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
"""Attempt to set the start method for process starting, aka the "actor
spawning backend".
If the desired method is not supported this function will error. On Windows
the only supported option is the ``multiprocessing`` "spawn" method. The default
on *nix systems is ``trio_run_in_process``.
"""
global _ctx
global _spawn_method
allowed = mp.get_all_start_methods()
methods = mp.get_all_start_methods()
if 'fork' in methods:
# forking is incompatible with ``trio``'s global task tree
methods.remove('fork')
if name not in allowed:
name = 'spawn'
elif name == 'fork':
# no Windows support for trip yet
if platform.system() != 'Windows':
methods += ['trio_run_in_process']
if name not in methods:
raise ValueError(
"`fork` is unsupported due to incompatibility with `trio`"
f"Spawn method `{name}` is invalid please choose one of {methods}"
)
elif name == 'forkserver':
_forkserver_override.override_stdlib()
assert name in allowed
_ctx = mp.get_context(name)
elif name == 'trio_run_in_process':
_ctx = None
else:
_ctx = mp.get_context(name)
_spawn_method = name
return _ctx
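As a usage sketch (illustrative only, assuming a *nix host):

# mp backends hand back a ``multiprocessing`` context...
ctx = try_set_start_method('forkserver')
assert ctx is not None

# ...while the pure-``trio`` backend returns ``None``
assert try_set_start_method('trio_run_in_process') is None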
@ -55,16 +91,116 @@ def is_main_process() -> bool:
return mp.current_process().name == 'MainProcess'
def new_proc(
name: str,
async def exhaust_portal(
portal: Portal,
actor: Actor
) -> Any:
"""Pull final result from portal (assuming it has one).
If the main task is an async generator do our best to consume
what's left of it.
"""
try:
log.debug(f"Waiting on final result from {actor.uid}")
final = res = await portal.result()
# if it's an async-gen then alert that we're cancelling it
if inspect.isasyncgen(res):
final = []
log.warning(
f"Blindly consuming asyncgen for {actor.uid}")
with trio.fail_after(1):
async with aclosing(res) as agen:
async for item in agen:
log.debug(f"Consuming item {item}")
final.append(item)
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err
else:
return final
async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: Dict[Tuple[str, str], Exception],
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's
result arrives.
Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
task_status.started(cs)
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors[actor.uid] = result
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)
else:
log.info(
f"Cancelling {portal.channel.uid} gracefully "
"after result {result}")
# cancel the process now that we have a final result
await portal.cancel_actor()
async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
) -> mp.Process:
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
cancel_scope = None
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
async with trio.open_nursery() as nursery:
if use_trio_run_in_process or _spawn_method == 'trio_run_in_process':
# trio_run_in_process
async with trio_run_in_process.open_in_process(
subactor._trip_main,
bind_addr,
parent_addr,
) as proc:
log.info(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (
subactor, proc, portal)
task_status.started(portal)
# wait for ActorNursery.wait() to be called
await actor_nursery._join_procs.wait()
if portal in actor_nursery._cancel_after_result_on_exit:
cancel_scope = await nursery.start(
cancel_on_completion, portal, subactor, errors)
# TRIP blocks here until process is complete
else:
# `multiprocessing`
assert _ctx
start_method = _ctx.get_start_method()
if start_method == 'forkserver':
# XXX do our hackery on the stdlib to avoid multiple
@ -72,15 +208,17 @@ def new_proc(
fs = forkserver._forkserver
curr_actor = current_actor()
if is_main_process() and not curr_actor._forkserver_info:
# if we're the "main" process start the forkserver only once
# and pass its ipc info to downstream children
# if we're the "main" process start the forkserver
# only once and pass its ipc info to downstream
# children
# forkserver.set_forkserver_preload(rpc_module_paths)
forkserver.ensure_running()
fs_info = (
fs._forkserver_address,
fs._forkserver_alive_fd,
getattr(fs, '_forkserver_pid', None),
getattr(resource_tracker._resource_tracker, '_pid', None),
getattr(
resource_tracker._resource_tracker, '_pid', None),
resource_tracker._resource_tracker._fd,
)
else:
@ -95,8 +233,8 @@ def new_proc(
else:
fs_info = (None, None, None, None, None)
return _ctx.Process( # type: ignore
target=actor._fork_main,
proc = _ctx.Process( # type: ignore
target=subactor._mp_main,
args=(
bind_addr,
fs_info,
@ -106,3 +244,55 @@ def new_proc(
# daemon=True,
name=name,
)
# `multiprocessing` only (since no async interface):
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
actor_nursery._children[subactor.uid] = (subactor, proc, None)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.info(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid)
portal = Portal(chan)
actor_nursery._children[subactor.uid] = (subactor, proc, portal)
# unblock parent task
task_status.started(portal)
# wait for ``ActorNursery`` block to signal that
# subprocesses can be waited upon.
# This is required to ensure synchronization
# with user code that may want to manually await results
# from nursery spawned sub-actors. We don't want the
# containing nurseries here to collect results or error
# while user code is still doing its thing. Only after the
# nursery block closes do we allow subactor results to be
# awaited and reported upwards to the supervisor.
await actor_nursery._join_procs.wait()
if portal in actor_nursery._cancel_after_result_on_exit:
cancel_scope = await nursery.start(
cancel_on_completion, portal, subactor, errors)
# TODO: timeout block here?
if proc.is_alive():
await proc_waiter(proc)
proc.join()
log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing this subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope:
log.warning(
f"Cancelling existing result waiter task for {subactor.uid}")
cancel_scope.cancel()

View File

@ -1,39 +1,37 @@
"""
``trio`` inspired apis and helpers
"""
import inspect
import platform
import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any
import typing
import trio
from async_generator import asynccontextmanager, aclosing
from async_generator import asynccontextmanager
from ._state import current_actor
from .log import get_logger, get_loglevel
from ._actor import Actor, ActorFailure
from ._actor import Actor # , ActorFailure
from ._portal import Portal
from . import _spawn
if platform.system() == 'Windows':
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.WaitForSingleObject(proc.sentinel)
else:
async def proc_waiter(proc: mp.Process) -> None:
await trio.hazmat.wait_readable(proc.sentinel)
log = get_logger('tractor')
class ActorNursery:
"""Spawn scoped subprocess actors.
"""
def __init__(self, actor: Actor) -> None:
def __init__(
self,
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: Dict[Tuple[str, str], Exception],
) -> None:
# self.supervisor = supervisor # TODO
self._actor: Actor = actor
self._ria_nursery = ria_nursery
self._da_nursery = da_nursery
self._children: Dict[
Tuple[str, str],
Tuple[Actor, mp.Process, Optional[Portal]]
@ -42,9 +40,8 @@ class ActorNursery:
# cancelled when their "main" result arrives
self._cancel_after_result_on_exit: set = set()
self.cancelled: bool = False
async def __aenter__(self):
return self
self._join_procs = trio.Event()
self.errors = errors
async def start_actor(
self,
@ -53,9 +50,11 @@ class ActorNursery:
statespace: Optional[Dict[str, Any]] = None,
rpc_module_paths: List[str] = None,
loglevel: str = None, # set log level per subactor
nursery: trio.Nursery = None,
) -> Portal:
loglevel = loglevel or self._actor.loglevel or get_loglevel()
actor = Actor(
subactor = Actor(
name,
# modules allowed to invoked funcs from
rpc_module_paths=rpc_module_paths or [],
@ -64,37 +63,29 @@ class ActorNursery:
arbiter_addr=current_actor()._arb_addr,
)
parent_addr = self._actor.accept_addr
proc = _spawn.new_proc(
assert parent_addr
# start a task to spawn a process
# blocks until process has been started and a portal setup
nursery = nursery or self._da_nursery
# XXX: the type ignore is actually due to a `mypy` bug
return await nursery.start( # type: ignore
_spawn.new_proc,
name,
actor,
self,
subactor,
self.errors,
bind_addr,
parent_addr,
)
# register the process before start in case we get a cancel
# request before the actor has fully spawned - then we can wait
# for it to fully come up before sending a cancel request
self._children[actor.uid] = (actor, proc, None)
proc.start()
if not proc.is_alive():
raise ActorFailure("Couldn't start sub-actor?")
log.info(f"Started {proc}")
# wait for actor to spawn and connect back to us
# channel should have handshake completed by the
# local actor by the time we get a ref to it
event, chan = await self._actor.wait_for_peer(actor.uid)
portal = Portal(chan)
self._children[actor.uid] = (actor, proc, portal)
return portal
async def run_in_actor(
self,
name: str,
fn: typing.Callable,
bind_addr: Tuple[str, int] = ('127.0.0.1', 0),
rpc_module_paths: List[str] = [],
rpc_module_paths: Optional[List[str]] = None,
statespace: Dict[str, Any] = None,
loglevel: str = None, # set log level per subactor
**kwargs, # explicit args to ``fn``
@ -109,13 +100,15 @@ class ActorNursery:
mod_path = fn.__module__
portal = await self.start_actor(
name,
rpc_module_paths=[mod_path] + rpc_module_paths,
rpc_module_paths=[mod_path] + (rpc_module_paths or []),
bind_addr=bind_addr,
statespace=statespace,
loglevel=loglevel,
# use the run_in_actor nursery
nursery=self._ria_nursery,
)
# this marks the actor to be cancelled after its portal result
# is retrieved, see ``wait()`` below.
# is retrieved, see logic in `open_nursery()` below.
self._cancel_after_result_on_exit.add(portal)
await portal._submit_for_result(
mod_path,
@ -124,136 +117,6 @@ class ActorNursery:
)
return portal
async def wait(self) -> None:
"""Wait for all subactors to complete.
This is probably the most complicated (and confusing, sorry)
function that does all the clever crap to deal with cancellation,
error propagation, and graceful subprocess tear down.
"""
async def exhaust_portal(portal, actor):
"""Pull final result from portal (assuming it has one).
If the main task is an async generator do our best to consume
what's left of it.
"""
try:
log.debug(f"Waiting on final result from {actor.uid}")
final = res = await portal.result()
# if it's an async-gen then alert that we're cancelling it
if inspect.isasyncgen(res):
final = []
log.warning(
f"Blindly consuming asyncgen for {actor.uid}")
with trio.fail_after(1):
async with aclosing(res) as agen:
async for item in agen:
log.debug(f"Consuming item {item}")
final.append(item)
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
return err
else:
return final
async def cancel_on_completion(
portal: Portal,
actor: Actor,
task_status=trio.TASK_STATUS_IGNORED,
) -> None:
"""Cancel actor gracefully once it's "main" portal's
result arrives.
Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
task_status.started(cs)
# if this call errors we store the exception for later
# in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor)
if isinstance(result, Exception):
errors.append(result)
log.warning(
f"Cancelling {portal.channel.uid} after error {result}"
)
else:
log.info(f"Cancelling {portal.channel.uid} gracefully")
# cancel the process now that we have a final result
await portal.cancel_actor()
# XXX: lol, this will never get run without a shield above..
# if cs.cancelled_caught:
# log.warning(
# "Result waiter was cancelled, process may have died")
async def wait_for_proc(
proc: mp.Process,
actor: Actor,
cancel_scope: Optional[trio.CancelScope] = None,
) -> None:
# TODO: timeout block here?
if proc.is_alive():
await proc_waiter(proc)
# please god don't hang
proc.join()
log.debug(f"Joined {proc}")
# indicate we are no longer managing this subactor
self._children.pop(actor.uid)
# proc terminated, cancel result waiter that may have
# been spawned in tandem if not done already
if cancel_scope:
log.warning(
f"Cancelling existing result waiter task for {actor.uid}")
cancel_scope.cancel()
log.debug(f"Waiting on all subactors to complete")
# since we pop each child subactor on termination,
# iterate a copy
children = self._children.copy()
errors: List[Exception] = []
# wait on run_in_actor() tasks, unblocks when all complete
async with trio.open_nursery() as nursery:
for subactor, proc, portal in children.values():
cs = None
# portal from ``run_in_actor()``
if portal in self._cancel_after_result_on_exit:
assert portal
cs = await nursery.start(
cancel_on_completion, portal, subactor)
# TODO: how do we handle remote host spawned actors?
nursery.start_soon(
wait_for_proc, proc, subactor, cs)
if errors:
if not self.cancelled:
# bubble up error(s) here and expect to be called again
# once the nursery has been cancelled externally (ex.
# from within __aexit__() if an error is caught around
# ``self.wait()`` then, ``self.cancel()`` is called
# immediately, in the default supervisor strat, after
# which in turn ``self.wait()`` is called again.)
raise trio.MultiError(errors)
# wait on all `start_actor()` subactors to complete
# if errors were captured above and we have not been cancelled
# then these ``start_actor()`` spawned actors will block until
# cancelled externally
children = self._children.copy()
async with trio.open_nursery() as nursery:
for subactor, proc, portal in children.values():
# TODO: how do we handle remote host spawned actors?
assert portal
nursery.start_soon(wait_for_proc, proc, subactor, cs)
log.debug(f"All subactors for {self} have terminated")
if errors:
# always raise any error if we're also cancelled
raise trio.MultiError(errors)
async def cancel(self, hard_kill: bool = False) -> None:
"""Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
@ -270,7 +133,7 @@ class ActorNursery:
log.debug(f"Cancelling nursery")
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as n:
async with trio.open_nursery() as nursery:
for subactor, proc, portal in self._children.values():
if hard_kill:
do_hard_kill(proc)
@ -297,68 +160,137 @@ class ActorNursery:
# spawn cancel tasks for each sub-actor
assert portal
n.start_soon(portal.cancel_actor)
nursery.start_soon(portal.cancel_actor)
# if we cancelled the cancel (we hung cancelling remote actors)
# then hard kill all sub-processes
if cs.cancelled_caught:
log.error(f"Failed to gracefully cancel {self}, hard killing!")
async with trio.open_nursery() as n:
async with trio.open_nursery():
for subactor, proc, portal in self._children.values():
n.start_soon(do_hard_kill, proc)
nursery.start_soon(do_hard_kill, proc)
# mark ourselves as having (tried to have) cancelled all subactors
self.cancelled = True
await self.wait()
self._join_procs.set()
async def __aexit__(self, etype, value, tb):
"""Wait on all subactor's main routines to complete.
@asynccontextmanager
async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery`` to be used for spawning
structured concurrent subactors.
When an actor is spawned a new trio task is started which
invokes one of the process spawning backends to create and start
a new subprocess. These tasks are started by one of two nurseries
detailed below. The reason for spawning processes from within
a new task is because ``trio_run_in_process`` itself creates a new
internal nursery and the same task that opens a nursery **must**
close it. It turns out this approach is probably more correct
anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set.
"""
# XXX: this is effectively the (for now) lone
# cancellation/supervisor strategy (one-cancels-all)
# which exactly mimics trio's behaviour
if etype is not None:
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
# the collection of errors retrieved from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "daemon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
async with trio.open_nursery() as da_nursery:
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNursery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be cancelled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
actor, ria_nursery, da_nursery, errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children}"
"to complete"
)
except (BaseException, Exception) as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry, more are coming).
anursery._join_procs.set()
try:
# XXX: hypothetically an error could be raised and then
# a cancel signal shows up slightly after in which case
# the `else:` block here might not complete?
# For now, shield both.
with trio.CancelScope(shield=True):
if etype in (trio.Cancelled, KeyboardInterrupt):
if isinstance(err, (trio.Cancelled, KeyboardInterrupt)):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}")
f"cancelled with {err}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {etype}, ")
await self.cancel()
f"errored with {err}, ")
# cancel all subactors
await anursery.cancel()
except trio.MultiError as merr:
if value not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [value])
raise
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err])
else:
log.debug(f"Waiting on subactors {self._children} to complete")
try:
await self.wait()
except (Exception, trio.MultiError) as err:
log.warning(f"Nursery cancelling due to {err}")
if self._children:
with trio.CancelScope(shield=True):
await self.cancel()
raise
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug(f"Waiting on all subactors to complete")
anursery._join_procs.set()
# ria_nursery scope end
except (Exception, trio.MultiError) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
if len(errors) > 1:
raise trio.MultiError(tuple(errors.values()))
else:
raise list(errors.values())[0]
# da_nursery scope end
log.debug(f"Nursery teardown complete")
@asynccontextmanager
async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery``.
"""
actor = current_actor()
if not actor:
raise RuntimeError("No actor instance has been defined yet?")
# TODO: figure out supervisors from erlang
async with ActorNursery(actor) as nursery:
yield nursery
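For orientation, a minimal sketch of the caller-facing nursery API,
which this change preserves (``cellar_door`` is a hypothetical function):

import tractor

async def cellar_door():
    # hypothetical function run in a subactor
    return 'open'

async def main():
    async with tractor.open_nursery() as n:
        # ``run_in_actor()`` marks the subactor for cancellation
        # once its single result has been retrieved
        portal = await n.run_in_actor('opener', cellar_door)
        assert await portal.result() == 'open'

tractor.run(main)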

View File

@ -1,4 +1,5 @@
import inspect
import platform
from functools import partial, wraps
from .. import run
@ -19,6 +20,7 @@ def tractor_test(fn):
- ``arb_addr`` (a socket addr tuple where arbiter is listening)
- ``loglevel`` (logging level passed to tractor internals)
- ``start_method`` (subprocess spawning backend)
are defined in the `pytest` fixture space they will be automatically
injected to tests declaring these funcargs.
@ -28,7 +30,7 @@ def tractor_test(fn):
*args,
loglevel=None,
arb_addr=None,
start_method='forkserver',
start_method=None,
**kwargs
):
# __tracebackhide__ = True
@ -40,9 +42,15 @@ def tractor_test(fn):
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
kwargs['loglevel'] = loglevel
if start_method is None:
if platform.system() == "Windows":
start_method = 'spawn'
else:
start_method = 'trio_run_in_process'
if 'start_method' in inspect.signature(fn).parameters:
# allows test suites to define a 'loglevel' fixture
# that activates the internal logging
# set of subprocess spawning backends
kwargs['start_method'] = start_method
return run(
partial(fn, *args, **kwargs),