diff --git a/.travis.yml b/.travis.yml index d8f58be..6e57aed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ sudo: required matrix: include: - - name: "Windows, Python Latest" + - name: "Windows, Python Latest: multiprocessing" os: windows language: sh python: 3.x # only works on linux @@ -13,7 +13,7 @@ matrix: - export PATH="/c/Python:/c/Python/Scripts:$PATH" - python -m pip install --upgrade pip wheel - - name: "Windows, Python 3.7" + - name: "Windows, Python 3.7: multiprocessing" os: windows python: 3.7 # only works on linux language: sh @@ -22,8 +22,19 @@ matrix: - export PATH="/c/Python:/c/Python/Scripts:$PATH" - python -m pip install --upgrade pip wheel - - python: 3.7 # this works for Linux but is ignored on macOS or Windows - - python: 3.8 + - name: "Python 3.7: multiprocessing" + python: 3.7 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND="mp" + - name: "Python 3.7: trio-run-in-process" + python: 3.7 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND="trio_run_in_process" + + - name: "Python 3.8: multiprocessing" + python: 3.8 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND="mp" + - name: "Python 3.8: trio-run-in-process" + python: 3.8 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND="trio_run_in_process" install: - cd $TRAVIS_BUILD_DIR @@ -32,4 +43,4 @@ install: script: - mypy tractor/ --ignore-missing-imports - - pytest tests/ --no-print-logs + - pytest tests/ --no-print-logs --spawn-backend=${SPAWN_BACKEND} diff --git a/README.rst b/README.rst index 7b6992c..f97b623 100644 --- a/README.rst +++ b/README.rst @@ -83,8 +83,6 @@ Its tenets non-comprehensively include: are greatly appreciated! .. _concept-in-progress: https://trio.discourse.group/t/structured-concurrency-kickoff/55 -.. _pulsar: http://quantmind.github.io/pulsar/design.html -.. _execnet: https://codespeak.net/execnet/ Install @@ -357,14 +355,15 @@ Depending on the function type ``Portal.run()`` tries to correctly interface exactly like a local version of the remote built-in Python *function type*. Currently async functions, generators, and regular functions are supported. Inspiration for this API comes -from the way execnet_ does `remote function execution`_ but without -the client code (necessarily) having to worry about the underlying -channels_ system or shipping code over the network. +from `remote function execution`_ but without the client code being +concerned about the underlying channels_ system or shipping code +over the network. This *portal* approach turns out to be particularly exciting with the introduction of `asynchronous generators`_ in Python 3.6! It means that actors can compose nicely in a data streaming pipeline. +.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics Streaming ********* @@ -703,20 +702,54 @@ need to hop into a debugger. You just need to pass the existing tractor.run(main, arbiter_addr=('192.168.0.10', 1616)) -Choosing a ``multiprocessing`` *start method* -********************************************* -``tractor`` supports selection of the `multiprocessing start method`_ via -a ``start_method`` kwarg to ``tractor.run()``. Note that on Windows -*spawn* it the only supported method and on nix systems *forkserver* is -selected by default for speed. +Choosing a process spawning backend +*********************************** +``tractor`` is architected to support multiple actor (sub-process) +spawning backends.
Specific defaults are chosen based on your system +but you can also explicitly select a backend of choice at startup +via a ``start_method`` kwarg to ``tractor.run()``. -.. _multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods +Currently the options available are: +- ``trio_run_in_process``: a ``trio``-native spawner from the `Ethereum community`_ +- ``spawn``: one of the stdlib's ``multiprocessing`` `start methods`_ +- ``forkserver``: a faster ``multiprocessing`` variant that is Unix only + +.. _start methods: https://docs.python.org/3.8/library/multiprocessing.html#contexts-and-start-methods +.. _Ethereum community : https://github.com/ethereum/trio-run-in-process + + +``trio-run-in-process`` ++++++++++++++++++++++++ +`trio-run-in-process`_ is a young "pure ``trio``" process spawner +which utilizes the native `trio subprocess APIs`_. It has shown great +reliability under testing for predictable teardown when launching +recursive pools of actors (multiple nurseries deep) and as such has been +chosen as the default backend on \*nix systems. + +.. _trio-run-in-process: https://github.com/ethereum/trio-run-in-process +.. _trio subprocess APIs : https://trio.readthedocs.io/en/stable/reference-io.html#spawning-subprocesses + + +``multiprocessing`` ++++++++++++++++++++ +There is support for the stdlib's ``multiprocessing`` `start methods`_. +Note that on Windows *spawn* is the only supported method and on \*nix +systems *forkserver* is the best method for speed but has the caveat +that it will break easily (hangs due to broken pipes) if spawning actors +using nested nurseries. + +In general, the ``multiprocessing`` backend **has not proven reliable** +for handling errors from actors more than 2 nurseries *deep* (see `#89`_). +If you for some reason need this, consider using one of the alternative +backends. + +.. _#89: https://github.com/goodboy/tractor/issues/89 Windows "gotchas" -***************** -`tractor` internally uses the stdlib's `multiprocessing` package which -*can* have some gotchas on Windows. Namely, the need for calling +^^^^^^^^^^^^^^^^^ +On Windows (which requires the use of the stdlib's `multiprocessing` +package) there are some gotchas. Namely, the need for calling `freeze_support()`_ inside the ``__main__`` context. Additionally you may need to place your `tractor` program entry point in a separate `__main__.py` module in your package in order to avoid an error like the diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..b991c7c --- /dev/null +++ b/mypy.ini @@ -0,0 +1,2 @@ +[mypy] +plugins = trio_typing.plugin diff --git a/setup.py b/setup.py index cb44d1c..3fe45dc 100755 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ setup( ], install_requires=[ 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', + 'trio_typing', 'trio-run-in-process', ], tests_require=['pytest'], python_requires=">=3.7", diff --git a/tests/conftest.py b/tests/conftest.py index fd8427f..128ff3e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,7 @@ ``tractor`` testing!!
""" import random +import platform import pytest import tractor @@ -13,8 +14,28 @@ _arb_addr = '127.0.0.1', random.randint(1000, 9999) def pytest_addoption(parser): - parser.addoption("--ll", action="store", dest='loglevel', - default=None, help="logging level to set when testing") + parser.addoption( + "--ll", action="store", dest='loglevel', + default=None, help="logging level to set when testing" + ) + + parser.addoption( + "--spawn-backend", action="store", dest='spawn_backend', + default='trio_run_in_process', + help="Processing spawning backend to use for test run", + ) + + +def pytest_configure(config): + backend = config.option.spawn_backend + + if platform.system() == "Windows": + backend = 'mp' + + if backend == 'mp': + tractor._spawn.try_set_start_method('spawn') + elif backend == 'trio_run_in_process': + tractor._spawn.try_set_start_method(backend) @pytest.fixture(scope='session', autouse=True) @@ -31,11 +52,26 @@ def arb_addr(): def pytest_generate_tests(metafunc): + spawn_backend = metafunc.config.option.spawn_backend + if not spawn_backend: + # XXX some weird windows bug with `pytest`? + spawn_backend = 'mp' + assert spawn_backend in ('mp', 'trio_run_in_process') + if 'start_method' in metafunc.fixturenames: - from multiprocessing import get_all_start_methods - methods = get_all_start_methods() - if 'fork' in methods: # fork not available on windows, so check before removing - # XXX: the fork method is in general incompatible with - # trio's global scheduler state - methods.remove('fork') + if spawn_backend == 'mp': + from multiprocessing import get_all_start_methods + methods = get_all_start_methods() + if 'fork' in methods: + # fork not available on windows, so check before + # removing XXX: the fork method is in general + # incompatible with trio's global scheduler state + methods.remove('fork') + elif spawn_backend == 'trio_run_in_process': + if platform.system() == "Windows": + pytest.fail( + "Only `--spawn-backend=mp` is supported on Windows") + + methods = ['trio_run_in_process'] + metafunc.parametrize("start_method", methods, scope='module') diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 80b66f1..ce74fdc 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -289,11 +289,18 @@ async def test_nested_multierrors(loglevel, start_method): This test goes only 2 nurseries deep but we should eventually have tests for arbitrary n-depth actor trees. """ - # XXX: forkserver can't seem to handle any more then 2 depth - # process trees for whatever reason. - # Any more process levels then this and we start getting pretty slow anyway - depth = 3 - subactor_breadth = 2 + if start_method == 'trio_run_in_process': + depth = 3 + subactor_breadth = 2 + else: + # XXX: multiprocessing can't seem to handle any more then 2 depth + # process trees for whatever reason. + # Any more process levels then this and we see bugs that cause + # hangs and broken pipes all over the place... 
+ if start_method == 'forkserver': + pytest.skip("Forksever sux hard at nested spawning...") + depth = 2 + subactor_breadth = 2 with trio.fail_after(120): try: diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 5dec131..b3fa1df 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -60,7 +60,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): if exposed_mods == ['tmp_mod']: # create an importable module with a bad import testdir.syspathinsert() - # module should cause raise a ModuleNotFoundError at import + # module should raise a ModuleNotFoundError at import testdir.makefile('.py', tmp_mod=funcname) # no need to exposed module to the subactor @@ -69,7 +69,9 @@ def test_rpc_errors(arb_addr, to_call, testdir): func_defined = False # subactor should not try to invoke anything subactor_requests_to = None - remote_err = trio.MultiError + # the module will be attempted to be imported locally but will + # fail in the initial local instance of the actor + remote_err = inside_err async def main(): actor = tractor.current_actor() @@ -100,7 +102,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): if exposed_mods and func_defined: run() else: - # underlying errors are propagated upwards (yet) + # underlying errors aren't propagated upwards (yet) with pytest.raises(remote_err) as err: run() @@ -114,4 +116,5 @@ def test_rpc_errors(arb_addr, to_call, testdir): value.exceptions )) - assert value.type is inside_err + if getattr(value, 'type', None): + assert value.type is inside_err diff --git a/tractor/__init__.py b/tractor/__init__.py index 10bb895..80eaf05 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -99,16 +99,18 @@ def run( name: Optional[str] = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), - # the `multiprocessing` start method: + # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods - start_method: str = 'forkserver', + # OR `trio_run_in_process` (the new default). + start_method: Optional[str] = None, **kwargs, ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ - _spawn.try_set_start_method(start_method) + if start_method is not None: + _spawn.try_set_start_method(start_method) return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) diff --git a/tractor/_actor.py b/tractor/_actor.py index 22e2253..7118a3e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -5,10 +5,14 @@ from collections import defaultdict from functools import partial from itertools import chain import importlib +import importlib.util import inspect import uuid import typing from typing import Dict, List, Tuple, Any, Optional +from types import ModuleType +import sys +import os import trio # type: ignore from trio_typing import TaskStatus @@ -25,6 +29,7 @@ from ._exceptions import ( from ._discovery import get_arbiter from ._portal import Portal from . import _state +from . import _mp_fixup_main log = get_logger('tractor') @@ -148,6 +153,10 @@ async def _invoke( actor._ongoing_rpc_tasks.set() +def _get_mod_abspath(module): + return os.path.abspath(module.__file__) + + class Actor: """The fundamental concurrency primitive. 
@@ -162,6 +171,14 @@ class Actor: _root_nursery: trio.Nursery _server_nursery: trio.Nursery + # marked by the process spawning backend at startup + # will be None for the parent most process started manually + # by the user (currently called the "arbiter") + _spawn_method: Optional[str] = None + + # Information about `__main__` from parent + _parent_main_data: Dict[str, str] + def __init__( self, name: str, @@ -171,10 +188,23 @@ class Actor: loglevel: str = None, arbiter_addr: Optional[Tuple[str, int]] = None, ) -> None: + """This constructor is called in the parent actor **before** the spawning + phase (aka before a new process is executed). + """ self.name = name self.uid = (name, uid or str(uuid.uuid4())) - self.rpc_module_paths = rpc_module_paths - self._mods: dict = {} + + # retreive and store parent `__main__` data which + # will be passed to children + self._parent_main_data = _mp_fixup_main._mp_figure_out_main() + + mods = {} + for name in rpc_module_paths or (): + mod = importlib.import_module(name) + mods[name] = _get_mod_abspath(mod) + + self.rpc_module_paths = mods + self._mods: Dict[str, ModuleType] = {} # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 @@ -225,11 +255,34 @@ class Actor: the original nursery we need to try and load the local module code (if it exists). """ - for path in self.rpc_module_paths: - log.debug(f"Attempting to import {path}") - self._mods[path] = importlib.import_module(path) + try: + if self._spawn_method == 'trio_run_in_process': + parent_data = self._parent_main_data + if 'init_main_from_name' in parent_data: + _mp_fixup_main._fixup_main_from_name( + parent_data['init_main_from_name']) + elif 'init_main_from_path' in parent_data: + _mp_fixup_main._fixup_main_from_path( + parent_data['init_main_from_path']) + + for modpath, filepath in self.rpc_module_paths.items(): + # XXX append the allowed module to the python path which + # should allow for relative (at least downward) imports. + sys.path.append(os.path.dirname(filepath)) + log.debug(f"Attempting to import {modpath}@{filepath}") + self._mods[modpath] = importlib.import_module(modpath) + except ModuleNotFoundError: + # it is expected the corresponding `ModuleNotExposed` error + # will be raised later + log.error(f"Failed to import {modpath} in {self.name}") + raise def _get_rpc_func(self, ns, funcname): + if ns == "__mp_main__": + # In subprocesses, `__main__` will actually map to + # `__mp_main__` which should be the same entry-point-module + # as the parent. + ns = "__main__" try: return getattr(self._mods[ns], funcname) except KeyError as err: @@ -488,7 +541,7 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") - def _fork_main( + def _mp_main( self, accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], @@ -500,13 +553,18 @@ class Actor: self._forkserver_info = forkserver_info from ._spawn import try_set_start_method spawn_ctx = try_set_start_method(start_method) + if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) + + assert spawn_ctx log.info( f"Started new {spawn_ctx.current_process()} for {self.uid}") + _state._current_actor = self + log.debug(f"parent_addr is {parent_addr}") try: trio.run(partial( @@ -515,6 +573,26 @@ class Actor: pass # handle it the same way trio does? 
log.info(f"Actor {self.uid} terminated") + async def _trip_main( + self, + accept_addr: Tuple[str, int], + parent_addr: Tuple[str, int] = None + ) -> None: + """Entry point for a `trio_run_in_process` subactor. + + Here we don't need to call `trio.run()` since trip does that as + part of its subprocess startup sequence. + """ + if self.loglevel is not None: + log.info( + f"Setting loglevel for {self.uid} to {self.loglevel}") + get_console_log(self.loglevel) + + log.info(f"Started new TRIP process for {self.uid}") + _state._current_actor = self + await self._async_main(accept_addr, parent_addr=parent_addr) + log.info(f"Actor {self.uid} terminated") + async def _async_main( self, accept_addr: Tuple[str, int], @@ -598,9 +676,13 @@ class Actor: f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") + + if isinstance(err, ModuleNotFoundError): + raise else: # XXX wait, why? # causes a hang if I always raise.. + # A parent process does something weird here? raise finally: @@ -649,8 +731,8 @@ class Actor: port=accept_port, host=accept_host, ) ) - log.debug( - f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore + log.debug(f"Started tcp server(s) on" + " {[l.socket for l in listeners]}") # type: ignore self._listeners.extend(listeners) task_status.started() diff --git a/tractor/_mp_fixup_main.py b/tractor/_mp_fixup_main.py new file mode 100644 index 0000000..7869561 --- /dev/null +++ b/tractor/_mp_fixup_main.py @@ -0,0 +1,100 @@ +""" +Helpers pulled mostly verbatim from ``multiprocessing.spawn`` +to aid with "fixing up" the ``__main__`` module in subprocesses. + +These helpers are needed for any spawing backend that doesn't already handle this. +For example when using ``trio_run_in_process`` it is needed but obviously not when +we're already using ``multiprocessing``. +""" +import os +import sys +import platform +import types +import runpy +from typing import Dict + + +ORIGINAL_DIR = os.path.abspath(os.getcwd()) + + +def _mp_figure_out_main() -> Dict[str, str]: + """Taken from ``multiprocessing.spawn.get_preparation_data()``. + + Retrieve parent actor `__main__` module data. 
+ """ + d = {} + # Figure out whether to initialise main in the subprocess as a module + # or through direct execution (or to leave it alone entirely) + main_module = sys.modules['__main__'] + main_mod_name = getattr(main_module.__spec__, "name", None) + if main_mod_name is not None: + d['init_main_from_name'] = main_mod_name + # elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE): + elif platform.system() != 'Windows': + main_path = getattr(main_module, '__file__', None) + if main_path is not None: + if ( + not os.path.isabs(main_path) and ( + ORIGINAL_DIR is not None) + ): + # process.ORIGINAL_DIR is not None): + # main_path = os.path.join(process.ORIGINAL_DIR, main_path) + main_path = os.path.join(ORIGINAL_DIR, main_path) + d['init_main_from_path'] = os.path.normpath(main_path) + + return d + + +# Multiprocessing module helpers to fix up the main module in +# spawned subprocesses +def _fixup_main_from_name(mod_name: str) -> None: + # __main__.py files for packages, directories, zip archives, etc, run + # their "main only" code unconditionally, so we don't even try to + # populate anything in __main__, nor do we make any changes to + # __main__ attributes + current_main = sys.modules['__main__'] + if mod_name == "__main__" or mod_name.endswith(".__main__"): + return + + # If this process was forked, __main__ may already be populated + if getattr(current_main.__spec__, "name", None) == mod_name: + return + + # Otherwise, __main__ may contain some non-main code where we need to + # support unpickling it properly. We rerun it as __mp_main__ and make + # the normal __main__ an alias to that + # old_main_modules.append(current_main) + main_module = types.ModuleType("__mp_main__") + main_content = runpy.run_module(mod_name, + run_name="__mp_main__", + alter_sys=True) + main_module.__dict__.update(main_content) + sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module + + +def _fixup_main_from_path(main_path: str) -> None: + # If this process was forked, __main__ may already be populated + current_main = sys.modules['__main__'] + + # Unfortunately, the main ipython launch script historically had no + # "if __name__ == '__main__'" guard, so we work around that + # by treating it like a __main__.py file + # See https://github.com/ipython/ipython/issues/4698 + main_name = os.path.splitext(os.path.basename(main_path))[0] + if main_name == 'ipython': + return + + # Otherwise, if __file__ already has the setting we expect, + # there's nothing more to do + if getattr(current_main, '__file__', None) == main_path: + return + + # If the parent process has sent a path through rather than a module + # name we assume it is an executable script that may contain + # non-main code that needs to be executed + # old_main_modules.append(current_main) + main_module = types.ModuleType("__mp_main__") + main_content = runpy.run_path(main_path, + run_name="__mp_main__") + main_module.__dict__.update(main_content) + sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f299b9d..9421d21 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -1,9 +1,14 @@ """ -Process spawning. - -Mostly just wrapping around ``multiprocessing``. +Machinery for actor process spawning using multiple backends. 
""" +import inspect import multiprocessing as mp +import platform +from typing import Any, Dict, Optional + +import trio +from trio_typing import TaskStatus +from async_generator import aclosing try: from multiprocessing import semaphore_tracker # type: ignore @@ -18,34 +23,65 @@ from typing import Tuple from . import _forkserver_override from ._state import current_actor -from ._actor import Actor +from .log import get_logger +from ._portal import Portal +from ._actor import Actor, ActorFailure -_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore +log = get_logger('tractor') + +# use trip as our default on *nix systems for now +if platform.system() != 'Windows': + _spawn_method: str = "trio_run_in_process" +else: + _spawn_method = "spawn" + +_ctx: Optional[mp.context.BaseContext] = None -def try_set_start_method(name: str) -> mp.context.BaseContext: - """Attempt to set the start method for ``multiprocess.Process`` spawning. +if platform.system() == 'Windows': + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.WaitForSingleObject(proc.sentinel) +else: + import trio_run_in_process - If the desired method is not supported the sub-interpreter (aka "spawn" - method) is used. + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.wait_readable(proc.sentinel) + + +def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: + """Attempt to set the start method for process starting, aka the "actor + spawning backend". + + If the desired method is not supported this function will error. On Windows + the only supported option is the ``multiprocessing`` "spawn" method. The default + on *nix systems is ``trio_run_in_process``. """ global _ctx + global _spawn_method - allowed = mp.get_all_start_methods() + methods = mp.get_all_start_methods() + if 'fork' in methods: + # forking is incompatible with ``trio``s global task tree + methods.remove('fork') - if name not in allowed: - name = 'spawn' - elif name == 'fork': + # no Windows support for trip yet + if platform.system() != 'Windows': + methods += ['trio_run_in_process'] + + if name not in methods: raise ValueError( - "`fork` is unsupported due to incompatibility with `trio`" + f"Spawn method `{name}` is invalid please choose one of {methods}" ) elif name == 'forkserver': _forkserver_override.override_stdlib() + _ctx = mp.get_context(name) + elif name == 'trio_run_in_process': + _ctx = None + else: + _ctx = mp.get_context(name) - assert name in allowed - - _ctx = mp.get_context(name) + _spawn_method = name return _ctx @@ -55,54 +91,208 @@ def is_main_process() -> bool: return mp.current_process().name == 'MainProcess' -def new_proc( - name: str, +async def exhaust_portal( + portal: Portal, + actor: Actor +) -> Any: + """Pull final result from portal (assuming it has one). + + If the main task is an async generator do our best to consume + what's left of it. 
+ """ + try: + log.debug(f"Waiting on final result from {actor.uid}") + final = res = await portal.result() + # if it's an async-gen then alert that we're cancelling it + if inspect.isasyncgen(res): + final = [] + log.warning( + f"Blindly consuming asyncgen for {actor.uid}") + with trio.fail_after(1): + async with aclosing(res) as agen: + async for item in agen: + log.debug(f"Consuming item {item}") + final.append(item) + except (Exception, trio.MultiError) as err: + # we reraise in the parent task via a ``trio.MultiError`` + return err + else: + return final + + +async def cancel_on_completion( + portal: Portal, actor: Actor, + errors: Dict[Tuple[str, str], Exception], + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, +) -> None: + """Cancel actor gracefully once it's "main" portal's + result arrives. + + Should only be called for actors spawned with `run_in_actor()`. + """ + with trio.CancelScope() as cs: + task_status.started(cs) + # if this call errors we store the exception for later + # in ``errors`` which will be reraised inside + # a MultiError and we still send out a cancel request + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors[actor.uid] = result + log.warning( + f"Cancelling {portal.channel.uid} after error {result}" + ) + else: + log.info( + f"Cancelling {portal.channel.uid} gracefully " + "after result {result}") + + # cancel the process now that we have a final result + await portal.cancel_actor() + + +async def new_proc( + name: str, + actor_nursery: 'ActorNursery', # type: ignore + subactor: Actor, + errors: Dict[Tuple[str, str], Exception], # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], -) -> mp.Process: + use_trio_run_in_process: bool = False, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED +) -> None: """Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. """ - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). 
- fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info - else: - fs_info = (None, None, None, None, None) + cancel_scope = None - return _ctx.Process( # type: ignore - target=actor._fork_main, - args=( - bind_addr, - fs_info, - start_method, - parent_addr - ), - # daemon=True, - name=name, - ) + # mark the new actor with the global spawn method + subactor._spawn_method = _spawn_method + + async with trio.open_nursery() as nursery: + if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': + # trio_run_in_process + async with trio_run_in_process.open_in_process( + subactor._trip_main, + bind_addr, + parent_addr, + ) as proc: + log.info(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = ( + subactor, proc, portal) + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + await actor_nursery._join_procs.wait() + + if portal in actor_nursery._cancel_after_result_on_exit: + cancel_scope = await nursery.start( + cancel_on_completion, portal, subactor, errors) + + # TRIP blocks here until process is complete + else: + # `multiprocessing` + assert _ctx + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). 
+ fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver + # only once and pass its ipc info to downstream + # children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr( + resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) + + proc = _ctx.Process( # type: ignore + target=subactor._mp_main, + args=( + bind_addr, + fs_info, + start_method, + parent_addr + ), + # daemon=True, + name=name, + ) + # `multiprocessing` only (since no async interface): + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + actor_nursery._children[subactor.uid] = (subactor, proc, None) + + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") + + log.info(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = (subactor, proc, portal) + + # unblock parent task + task_status.started(portal) + + # wait for ``ActorNursery`` block to signal that + # subprocesses can be waited upon. + # This is required to ensure synchronization + # with user code that may want to manually await results + # from nursery spawned sub-actors. We don't want the + # containing nurseries here to collect results or error + # while user code is still doing it's thing. Only after the + # nursery block closes do we allow subactor results to be + # awaited and reported upwards to the supervisor. + await actor_nursery._join_procs.wait() + + if portal in actor_nursery._cancel_after_result_on_exit: + cancel_scope = await nursery.start( + cancel_on_completion, portal, subactor, errors) + + # TODO: timeout block here? 
+ if proc.is_alive(): + await proc_waiter(proc) + proc.join() + + log.debug(f"Joined {proc}") + # pop child entry to indicate we are no longer managing this subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + # cancel result waiter that may have been spawned in + # tandem if not done already + if cancel_scope: + log.warning( + f"Cancelling existing result waiter task for {subactor.uid}") + cancel_scope.cancel() diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 5250f81..17ae548 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -1,39 +1,37 @@ """ ``trio`` inspired apis and helpers """ -import inspect -import platform import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any import typing import trio -from async_generator import asynccontextmanager, aclosing +from async_generator import asynccontextmanager from ._state import current_actor from .log import get_logger, get_loglevel -from ._actor import Actor, ActorFailure +from ._actor import Actor # , ActorFailure from ._portal import Portal from . import _spawn -if platform.system() == 'Windows': - async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.WaitForSingleObject(proc.sentinel) -else: - async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.wait_readable(proc.sentinel) - - log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor: Actor) -> None: + def __init__( + self, + actor: Actor, + ria_nursery: trio.Nursery, + da_nursery: trio.Nursery, + errors: Dict[Tuple[str, str], Exception], + ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor + self._ria_nursery = ria_nursery + self._da_nursery = da_nursery self._children: Dict[ Tuple[str, str], Tuple[Actor, mp.Process, Optional[Portal]] @@ -42,9 +40,8 @@ class ActorNursery: # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False - - async def __aenter__(self): - return self + self._join_procs = trio.Event() + self.errors = errors async def start_actor( self, @@ -53,9 +50,11 @@ class ActorNursery: statespace: Optional[Dict[str, Any]] = None, rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor + nursery: trio.Nursery = None, ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() - actor = Actor( + + subactor = Actor( name, # modules allowed to invoked funcs from rpc_module_paths=rpc_module_paths or [], @@ -64,37 +63,29 @@ class ActorNursery: arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr - proc = _spawn.new_proc( + assert parent_addr + + # start a task to spawn a process + # blocks until process has been started and a portal setup + nursery = nursery or self._da_nursery + + # XXX: the type ignore is actually due to a `mypy` bug + return await nursery.start( # type: ignore + _spawn.new_proc, name, - actor, + self, + subactor, + self.errors, bind_addr, parent_addr, ) - # register the process before start in case we get a cancel - # request before the actor has fully spawned - then we can wait - # for it to fully come up before sending a cancel request - self._children[actor.uid] = (actor, proc, None) - - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") - - log.info(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get 
a ref to it - event, chan = await self._actor.wait_for_peer(actor.uid) - portal = Portal(chan) - self._children[actor.uid] = (actor, proc, portal) - - return portal async def run_in_actor( self, name: str, fn: typing.Callable, bind_addr: Tuple[str, int] = ('127.0.0.1', 0), - rpc_module_paths: List[str] = [], + rpc_module_paths: Optional[List[str]] = None, statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` @@ -109,13 +100,15 @@ class ActorNursery: mod_path = fn.__module__ portal = await self.start_actor( name, - rpc_module_paths=[mod_path] + rpc_module_paths, + rpc_module_paths=[mod_path] + (rpc_module_paths or []), bind_addr=bind_addr, statespace=statespace, loglevel=loglevel, + # use the run_in_actor nursery + nursery=self._ria_nursery, ) # this marks the actor to be cancelled after its portal result - # is retreived, see ``wait()`` below. + # is retreived, see logic in `open_nursery()` below. self._cancel_after_result_on_exit.add(portal) await portal._submit_for_result( mod_path, @@ -124,136 +117,6 @@ class ActorNursery: ) return portal - async def wait(self) -> None: - """Wait for all subactors to complete. - - This is probably the most complicated (and confusing, sorry) - function that does all the clever crap to deal with cancellation, - error propagation, and graceful subprocess tear down. - """ - async def exhaust_portal(portal, actor): - """Pull final result from portal (assuming it has one). - - If the main task is an async generator do our best to consume - what's left of it. - """ - try: - log.debug(f"Waiting on final result from {actor.uid}") - final = res = await portal.result() - # if it's an async-gen then alert that we're cancelling it - if inspect.isasyncgen(res): - final = [] - log.warning( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") - final.append(item) - except (Exception, trio.MultiError) as err: - # we reraise in the parent task via a ``trio.MultiError`` - return err - else: - return final - - async def cancel_on_completion( - portal: Portal, - actor: Actor, - task_status=trio.TASK_STATUS_IGNORED, - ) -> None: - """Cancel actor gracefully once it's "main" portal's - result arrives. - - Should only be called for actors spawned with `run_in_actor()`. - """ - with trio.CancelScope() as cs: - task_status.started(cs) - # if this call errors we store the exception for later - # in ``errors`` which will be reraised inside - # a MultiError and we still send out a cancel request - result = await exhaust_portal(portal, actor) - if isinstance(result, Exception): - errors.append(result) - log.warning( - f"Cancelling {portal.channel.uid} after error {result}" - ) - else: - log.info(f"Cancelling {portal.channel.uid} gracefully") - - # cancel the process now that we have a final result - await portal.cancel_actor() - - # XXX: lol, this will never get run without a shield above.. - # if cs.cancelled_caught: - # log.warning( - # "Result waiter was cancelled, process may have died") - - async def wait_for_proc( - proc: mp.Process, - actor: Actor, - cancel_scope: Optional[trio.CancelScope] = None, - ) -> None: - # TODO: timeout block here? 
- if proc.is_alive(): - await proc_waiter(proc) - - # please god don't hang - proc.join() - log.debug(f"Joined {proc}") - # indicate we are no longer managing this subactor - self._children.pop(actor.uid) - - # proc terminated, cancel result waiter that may have - # been spawned in tandem if not done already - if cancel_scope: - log.warning( - f"Cancelling existing result waiter task for {actor.uid}") - cancel_scope.cancel() - - log.debug(f"Waiting on all subactors to complete") - # since we pop each child subactor on termination, - # iterate a copy - children = self._children.copy() - errors: List[Exception] = [] - # wait on run_in_actor() tasks, unblocks when all complete - async with trio.open_nursery() as nursery: - for subactor, proc, portal in children.values(): - cs = None - # portal from ``run_in_actor()`` - if portal in self._cancel_after_result_on_exit: - assert portal - cs = await nursery.start( - cancel_on_completion, portal, subactor) - # TODO: how do we handle remote host spawned actors? - nursery.start_soon( - wait_for_proc, proc, subactor, cs) - - if errors: - if not self.cancelled: - # bubble up error(s) here and expect to be called again - # once the nursery has been cancelled externally (ex. - # from within __aexit__() if an error is caught around - # ``self.wait()`` then, ``self.cancel()`` is called - # immediately, in the default supervisor strat, after - # which in turn ``self.wait()`` is called again.) - raise trio.MultiError(errors) - - # wait on all `start_actor()` subactors to complete - # if errors were captured above and we have not been cancelled - # then these ``start_actor()`` spawned actors will block until - # cancelled externally - children = self._children.copy() - async with trio.open_nursery() as nursery: - for subactor, proc, portal in children.values(): - # TODO: how do we handle remote host spawned actors? - assert portal - nursery.start_soon(wait_for_proc, proc, subactor, cs) - - log.debug(f"All subactors for {self} have terminated") - if errors: - # always raise any error if we're also cancelled - raise trio.MultiError(errors) - async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel itself and wait for all subactors to terminate. @@ -270,7 +133,7 @@ class ActorNursery: log.debug(f"Cancelling nursery") with trio.move_on_after(3) as cs: - async with trio.open_nursery() as n: + async with trio.open_nursery() as nursery: for subactor, proc, portal in self._children.values(): if hard_kill: do_hard_kill(proc) @@ -297,68 +160,137 @@ class ActorNursery: # spawn cancel tasks for each sub-actor assert portal - n.start_soon(portal.cancel_actor) + nursery.start_soon(portal.cancel_actor) # if we cancelled the cancel (we hung cancelling remote actors) # then hard kill all sub-processes if cs.cancelled_caught: log.error(f"Failed to gracefully cancel {self}, hard killing!") - async with trio.open_nursery() as n: + async with trio.open_nursery(): for subactor, proc, portal in self._children.values(): - n.start_soon(do_hard_kill, proc) + nursery.start_soon(do_hard_kill, proc) # mark ourselves as having (tried to have) cancelled all subactors self.cancelled = True - await self.wait() - - async def __aexit__(self, etype, value, tb): - """Wait on all subactor's main routines to complete. 
- """ - # XXX: this is effectively the (for now) lone - # cancellation/supervisor strategy (one-cancels-all) - # which exactly mimicks trio's behaviour - if etype is not None: - try: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case - # the `else:` block here might not complete? - # For now, shield both. - with trio.CancelScope(shield=True): - if etype in (trio.Cancelled, KeyboardInterrupt): - log.warning( - f"Nursery for {current_actor().uid} was " - f"cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {etype}, ") - await self.cancel() - except trio.MultiError as merr: - if value not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [value]) - raise - else: - log.debug(f"Waiting on subactors {self._children} to complete") - try: - await self.wait() - except (Exception, trio.MultiError) as err: - log.warning(f"Nursery cancelling due to {err}") - if self._children: - with trio.CancelScope(shield=True): - await self.cancel() - raise - - log.debug(f"Nursery teardown complete") + self._join_procs.set() @asynccontextmanager async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: - """Create and yield a new ``ActorNursery``. + """Create and yield a new ``ActorNursery`` to be used for spawning + structured concurrent subactors. + + When an actor is spawned a new trio task is started which + invokes one of the process spawning backends to create and start + a new subprocess. These tasks are started by one of two nurseries + detailed below. The reason for spawning processes from within + a new task is because ``trio_run_in_process`` itself creates a new + internal nursery and the same task that opens a nursery **must** + close it. It turns out this approach is probably more correct + anyway since it is more clear from the following nested nurseries + which cancellation scopes correspond to each spawned subactor set. """ actor = current_actor() if not actor: raise RuntimeError("No actor instance has been defined yet?") - # TODO: figure out supervisors from erlang - async with ActorNursery(actor) as nursery: - yield nursery + # the collection of errors retreived from spawned sub-actors + errors: Dict[Tuple[str, str], Exception] = {} + + # This is the outermost level "deamon actor" nursery. It is awaited + # **after** the below inner "run in actor nursery". This allows for + # handling errors that are generated by the inner nursery in + # a supervisor strategy **before** blocking indefinitely to wait for + # actors spawned in "daemon mode" (aka started using + # ``ActorNursery.start_actor()``). + async with trio.open_nursery() as da_nursery: + try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # ``ActorNusery.run_in_actor()``) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. 
+ async with trio.open_nursery() as ria_nursery: + anursery = ActorNursery( + actor, ria_nursery, da_nursery, errors + ) + try: + # spawning of actors happens in the caller's scope + # after we yield upwards + yield anursery + log.debug( + f"Waiting on subactors {anursery._children}" + "to complete" + ) + except (BaseException, Exception) as err: + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). + anursery._join_procs.set() + try: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case + # the `else:` block here might not complete? + # For now, shield both. + with trio.CancelScope(shield=True): + if err in (trio.Cancelled, KeyboardInterrupt): + log.warning( + f"Nursery for {current_actor().uid} was " + f"cancelled with {err}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") + + # cancel all subactors + await anursery.cancel() + + except trio.MultiError as merr: + # If we receive additional errors while waiting on + # remaining subactors that were cancelled, + # aggregate those errors with the original error + # that triggered this teardown. + if err not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [err]) + else: + raise + + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope + log.debug(f"Waiting on all subactors to complete") + anursery._join_procs.set() + + # ria_nursery scope end + + except (Exception, trio.MultiError) as err: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). + log.warning(f"Nursery cancelling due to {err}") + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + raise + finally: + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. + if errors: + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + if len(errors) > 1: + raise trio.MultiError(tuple(errors.values())) + else: + raise list(errors.values())[0] + + # ria_nursery scope end + + log.debug(f"Nursery teardown complete") diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 7d8289e..5ca82a6 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -1,4 +1,5 @@ import inspect +import platform from functools import partial, wraps from .. import run @@ -19,6 +20,7 @@ def tractor_test(fn): - ``arb_addr`` (a socket addr tuple where arbiter is listening) - ``loglevel`` (logging level passed to tractor internals) + - ``start_method`` (subprocess spawning backend) are defined in the `pytest` fixture space they will be automatically injected to tests declaring these funcargs. 
@@ -28,7 +30,7 @@ def tractor_test(fn): *args, loglevel=None, arb_addr=None, - start_method='forkserver', + start_method=None, **kwargs ): # __tracebackhide__ = True @@ -40,9 +42,15 @@ def tractor_test(fn): # allows test suites to define a 'loglevel' fixture # that activates the internal logging kwargs['loglevel'] = loglevel + + if start_method is None: + if platform.system() == "Windows": + start_method = 'spawn' + else: + start_method = 'trio_run_in_process' + if 'start_method' in inspect.signature(fn).parameters: - # allows test suites to define a 'loglevel' fixture - # that activates the internal logging + # set of subprocess spawning backends kwargs['start_method'] = start_method return run( partial(fn, *args, **kwargs),