From 1b7cdfe512358d52efea9324e45f791fd0e64c37 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 26 Nov 2019 09:23:37 -0500 Subject: [PATCH 01/33] WIP trying out trio_run_in_process --- tractor/_actor.py | 45 +++++++++++++++++++-- tractor/_spawn.py | 94 +++++++++++++++++++++++++------------------- tractor/_trionics.py | 27 ++++++++----- 3 files changed, 113 insertions(+), 53 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 22e2253..e962115 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -225,9 +225,19 @@ class Actor: the original nursery we need to try and load the local module code (if it exists). """ - for path in self.rpc_module_paths: - log.debug(f"Attempting to import {path}") - self._mods[path] = importlib.import_module(path) + try: + for path in self.rpc_module_paths: + log.debug(f"Attempting to import {path}") + self._mods[path] = importlib.import_module(path) + + # if self.name != 'arbiter': + # importlib.import_module('doggy') + # from celery.contrib import rdb; rdb.set_trace() + except ModuleNotFoundError: + # it is expected the corresponding `ModuleNotExposed` error + # will be raised later + log.error(f"Failed to import {path} in {self.name}") + raise def _get_rpc_func(self, ns, funcname): try: @@ -488,7 +498,7 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") - def _fork_main( + def _mp_main( self, accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], @@ -500,13 +510,17 @@ class Actor: self._forkserver_info = forkserver_info from ._spawn import try_set_start_method spawn_ctx = try_set_start_method(start_method) + if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) + log.info( f"Started new {spawn_ctx.current_process()} for {self.uid}") + _state._current_actor = self + log.debug(f"parent_addr is {parent_addr}") try: trio.run(partial( @@ -515,6 +529,21 @@ class Actor: pass # handle it the same way trio does? log.info(f"Actor {self.uid} terminated") + async def _trip_main( + self, + accept_addr: Tuple[str, int], + parent_addr: Tuple[str, int] = None + ) -> None: + if self.loglevel is not None: + log.info( + f"Setting loglevel for {self.uid} to {self.loglevel}") + get_console_log(self.loglevel) + + log.info(f"Started new TRIP process for {self.uid}") + _state._current_actor = self + await self._async_main(accept_addr, parent_addr=parent_addr) + log.info(f"Actor {self.uid} terminated") + async def _async_main( self, accept_addr: Tuple[str, int], @@ -584,6 +613,8 @@ class Actor: # blocks here as expected until the channel server is # killed (i.e. this actor is cancelled or signalled by the parent) except Exception as err: + # if self.name == 'arbiter': + # import pdb; pdb.set_trace() if not registered_with_arbiter: log.exception( f"Actor errored and failed to register with arbiter " @@ -598,12 +629,18 @@ class Actor: f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") + + if isinstance(err, ModuleNotFoundError): + raise else: # XXX wait, why? # causes a hang if I always raise.. + # A parent process does something weird here? 
raise finally: + # if self.name == 'arbiter': + # import pdb; pdb.set_trace() if registered_with_arbiter: await self._do_unreg(arbiter_addr) # terminate actor once all it's peers (actors that connected diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f299b9d..311ab0b 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,6 +5,8 @@ Mostly just wrapping around ``multiprocessing``. """ import multiprocessing as mp +import trio_run_in_process + try: from multiprocessing import semaphore_tracker # type: ignore resource_tracker = semaphore_tracker @@ -55,54 +57,66 @@ def is_main_process() -> bool: return mp.current_process().name == 'MainProcess' -def new_proc( +async def new_proc( name: str, actor: Actor, # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], + use_trip: bool = True, ) -> mp.Process: """Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. """ - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info - else: - fs_info = (None, None, None, None, None) - - return _ctx.Process( # type: ignore - target=actor._fork_main, - args=( + if use_trip: # trio_run_in_process + mng = trio_run_in_process.open_in_process( + actor._trip_main, bind_addr, - fs_info, - start_method, parent_addr - ), - # daemon=True, - name=name, - ) + ) + proc = await mng.__aenter__() + proc.mng = mng + return proc + else: + # use multiprocessing + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). 
+ fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver only once + # and pass its ipc info to downstream children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr(resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) + + return _ctx.Process( + target=actor._mp_main, + args=( + bind_addr, + fs_info, + start_method, + parent_addr + ), + # daemon=True, + name=name, + ) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 5250f81..cfed406 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -9,6 +9,7 @@ import typing import trio from async_generator import asynccontextmanager, aclosing +import trio_run_in_process from ._state import current_actor from .log import get_logger, get_loglevel @@ -64,20 +65,23 @@ class ActorNursery: arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr - proc = _spawn.new_proc( + assert parent_addr + proc = await _spawn.new_proc( name, actor, bind_addr, parent_addr, ) + # `multiprocessing` only (since no async interface): # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait # for it to fully come up before sending a cancel request self._children[actor.uid] = (actor, proc, None) - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") + if not isinstance(proc, trio_run_in_process.process.Process): + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") log.info(f"Started {proc}") # wait for actor to spawn and connect back to us @@ -193,12 +197,17 @@ class ActorNursery: actor: Actor, cancel_scope: Optional[trio.CancelScope] = None, ) -> None: - # TODO: timeout block here? - if proc.is_alive(): - await proc_waiter(proc) - # please god don't hang - proc.join() + if not isinstance(proc, trio_run_in_process.process.Process): + # TODO: timeout block here? + if proc.is_alive(): + await proc_waiter(proc) + proc.join() + else: + # trio_run_in_process blocking wait + await proc.mng.__aexit__(None, None, None) + # proc.nursery.cancel_scope.cancel() + log.debug(f"Joined {proc}") # indicate we are no longer managing this subactor self._children.pop(actor.uid) From afa640dcab4e3f93ac6bb55b5e8f76adb07ae820 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Jan 2020 21:37:30 -0500 Subject: [PATCH 02/33] More trip WIP stuff working.. kinda Get a few more things working: - fail reliably when remote module loading goes awry - do a real hacky job of module loading using `sys.path` stuffsies - we're still totally borked when trying to spin up and quickly cancel a bunch of subactors... It's a small move forward I guess. 
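The "real hacky job of module loading" boils down to appending each
exposed module's parent directory to ``sys.path`` in the spawned actor
and then importing it by dotted name as usual. A rough sketch of the
idea only (``load_exposed_module`` is an illustrative name; the actual
change lives in ``Actor.load_modules()`` in the diff below):

    import importlib
    import os
    import sys

    def load_exposed_module(path: str, absfilepath: str):
        # make the module's parent dir importable in this (child) process,
        # then do a plain dotted-name import
        sys.path.append(os.path.dirname(absfilepath))
        return importlib.import_module(path)
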
--- tractor/_actor.py | 22 +++++++++++++--- tractor/_spawn.py | 12 ++++++++- tractor/_trionics.py | 61 ++++++++++++++++++++++++++++++++++---------- 3 files changed, 77 insertions(+), 18 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index e962115..b652e9f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -5,10 +5,14 @@ from collections import defaultdict from functools import partial from itertools import chain import importlib +import importlib.util import inspect import uuid import typing from typing import Dict, List, Tuple, Any, Optional +from types import ModuleType +import sys +import os import trio # type: ignore from trio_typing import TaskStatus @@ -165,7 +169,7 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: List[str] = [], + rpc_module_paths: Dict[str, ModuleType] = {}, statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, @@ -226,9 +230,21 @@ class Actor: code (if it exists). """ try: - for path in self.rpc_module_paths: + for path, absfilepath in self.rpc_module_paths.items(): log.debug(f"Attempting to import {path}") - self._mods[path] = importlib.import_module(path) + # spec = importlib.util.spec_from_file_location( + # path, absfilepath) + # mod = importlib.util.module_from_spec(spec) + + # XXX append the allowed module to the python path + # which should allow for relative (at least downward) + # imports. Seems to be the only that will work currently + # to get `trio-run-in-process` to import modules we "send + # it". + sys.path.append(os.path.dirname(absfilepath)) + # spec.loader.exec_module(mod) + mod = importlib.import_module(path) + self._mods[path] = mod # if self.name != 'arbiter': # importlib.import_module('doggy') diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 311ab0b..026e257 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,6 +5,9 @@ Mostly just wrapping around ``multiprocessing``. """ import multiprocessing as mp +# from . import log + +import trio import trio_run_in_process try: @@ -63,6 +66,7 @@ async def new_proc( # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], + nursery: trio.Nursery = None, use_trip: bool = True, ) -> mp.Process: """Create a new ``multiprocessing.Process`` using the @@ -72,8 +76,14 @@ async def new_proc( mng = trio_run_in_process.open_in_process( actor._trip_main, bind_addr, - parent_addr + parent_addr, + nursery=nursery, ) + # XXX playing with trip logging + # l = log.get_console_log(level='debug', name=None, _root_name='trio-run-in-process') + # import logging + # logger = logging.getLogger("trio-run-in-process") + # logger.setLevel('DEBUG') proc = await mng.__aenter__() proc.mng = mng return proc diff --git a/tractor/_trionics.py b/tractor/_trionics.py index cfed406..9808935 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -2,6 +2,7 @@ ``trio`` inspired apis and helpers """ import inspect +import importlib import platform import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any @@ -32,9 +33,10 @@ log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. 
""" - def __init__(self, actor: Actor) -> None: + def __init__(self, actor: Actor, nursery: trio.Nursery) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor + self._nursery = nursery self._children: Dict[ Tuple[str, str], Tuple[Actor, mp.Process, Optional[Portal]] @@ -43,6 +45,7 @@ class ActorNursery: # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False + # self._aexitstack = contextlib.AsyncExitStack() async def __aenter__(self): return self @@ -56,10 +59,16 @@ class ActorNursery: loglevel: str = None, # set log level per subactor ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() + + mods = {} + for path in rpc_module_paths or (): + mod = importlib.import_module(path) + mods[path] = mod.__file__ + actor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths or [], + rpc_module_paths=mods, statespace=statespace, # global proc state vars loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, @@ -71,6 +80,7 @@ class ActorNursery: actor, bind_addr, parent_addr, + self._nursery, ) # `multiprocessing` only (since no async interface): # register the process before start in case we get a cancel @@ -98,7 +108,7 @@ class ActorNursery: name: str, fn: typing.Callable, bind_addr: Tuple[str, int] = ('127.0.0.1', 0), - rpc_module_paths: List[str] = [], + rpc_module_paths: Optional[List[str]] = None, statespace: Dict[str, Any] = None, loglevel: str = None, # set log level per subactor **kwargs, # explicit args to ``fn`` @@ -113,7 +123,7 @@ class ActorNursery: mod_path = fn.__module__ portal = await self.start_actor( name, - rpc_module_paths=[mod_path] + rpc_module_paths, + rpc_module_paths=[mod_path] + (rpc_module_paths or []), bind_addr=bind_addr, statespace=statespace, loglevel=loglevel, @@ -195,7 +205,8 @@ class ActorNursery: async def wait_for_proc( proc: mp.Process, actor: Actor, - cancel_scope: Optional[trio.CancelScope] = None, + portal: Portal, + cancel_scope: Optional[trio._core._run.CancelScope] = None, ) -> None: # please god don't hang if not isinstance(proc, trio_run_in_process.process.Process): @@ -205,7 +216,21 @@ class ActorNursery: proc.join() else: # trio_run_in_process blocking wait - await proc.mng.__aexit__(None, None, None) + if errors: + multierror = trio.MultiError(errors) + # import pdb; pdb.set_trace() + # try: + # with trio.CancelScope(shield=True): + # await proc.mng.__aexit__( + # type(multierror), + # multierror, + # multierror.__traceback__, + # ) + # except BaseException as err: + # import pdb; pdb.set_trace() + # pass + # else: + await proc.mng.__aexit__(None, None, None) # proc.nursery.cancel_scope.cancel() log.debug(f"Joined {proc}") @@ -214,7 +239,7 @@ class ActorNursery: # proc terminated, cancel result waiter that may have # been spawned in tandem if not done already - if cancel_scope: + if cancel_scope: # and not portal._cancelled: log.warning( f"Cancelling existing result waiter task for {actor.uid}") cancel_scope.cancel() @@ -226,18 +251,19 @@ class ActorNursery: errors: List[Exception] = [] # wait on run_in_actor() tasks, unblocks when all complete async with trio.open_nursery() as nursery: + # async with self._nursery as nursery: for subactor, proc, portal in children.values(): cs = None # portal from ``run_in_actor()`` if portal in self._cancel_after_result_on_exit: - assert portal cs = await nursery.start( cancel_on_completion, portal, subactor) # TODO: how do we handle remote host spawned actors? 
nursery.start_soon( - wait_for_proc, proc, subactor, cs) + wait_for_proc, proc, subactor, portal, cs) if errors: + multierror = trio.MultiError(errors) if not self.cancelled: # bubble up error(s) here and expect to be called again # once the nursery has been cancelled externally (ex. @@ -255,8 +281,7 @@ class ActorNursery: async with trio.open_nursery() as nursery: for subactor, proc, portal in children.values(): # TODO: how do we handle remote host spawned actors? - assert portal - nursery.start_soon(wait_for_proc, proc, subactor, cs) + nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) log.debug(f"All subactors for {self} have terminated") if errors: @@ -364,10 +389,18 @@ class ActorNursery: async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: """Create and yield a new ``ActorNursery``. """ + # TODO: figure out supervisors from erlang + actor = current_actor() if not actor: raise RuntimeError("No actor instance has been defined yet?") - # TODO: figure out supervisors from erlang - async with ActorNursery(actor) as nursery: - yield nursery + # XXX we need this nursery because TRIP is doing all its stuff with + # an `@asynccontextmanager` which has an internal nursery *and* the + # task that opens a nursery must also close it - so we need a path + # in TRIP to make this all kinda work as well. Note I'm basically + # giving up for now - it's probably equivalent amounts of work to + # make TRIP vs. `multiprocessing` work here. + async with trio.open_nursery() as nursery: + async with ActorNursery(actor, nursery) as anursery: + yield anursery From 91c371696844e5583f64bd1de026ee90a0bdb8b8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jan 2020 11:04:36 -0500 Subject: [PATCH 03/33] Do module abspath loading in actor init --- tractor/_actor.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index b652e9f..fe1f67f 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -10,7 +10,6 @@ import inspect import uuid import typing from typing import Dict, List, Tuple, Any, Optional -from types import ModuleType import sys import os @@ -169,7 +168,7 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: Dict[str, ModuleType] = {}, + rpc_module_paths: List[str] = {}, statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, @@ -177,7 +176,13 @@ class Actor: ) -> None: self.name = name self.uid = (name, uid or str(uuid.uuid4())) - self.rpc_module_paths = rpc_module_paths + + mods = {} + for path in rpc_module_paths or (): + mod = importlib.import_module(path) + mods[path] = mod.__file__ + + self.rpc_module_paths = mods self._mods: dict = {} # TODO: consider making this a dynamically defined From c074aea0309189d23d846365c281be816ee09b35 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jan 2020 11:10:51 -0500 Subject: [PATCH 04/33] Support TRIP for process launching This took a ton of tinkering and a rework of the actor nursery tear down logic. The main changes include: - each subprocess is now spawned from inside a trio task from one of two containing nurseries created in the body of `tractor.open_nursery()`: one for `run_in_actor()` processes and one for `start_actor()` "daemons". This is to address the need for `trio-run-in_process.open_in_process()` opening a nursery which must be closed from the same task that opened it. Using this same approach for `multiprocessing` seems to work well. 
The nurseries are waited in order (rip actors then daemon actors) during tear down which allows for avoiding the recursive re-entry of `ActorNursery.wait()` handled prior. - pull out all the nested functions / closures that were in `ActorNursery.wait()` and move into the `_spawn` module such that that process shutdown logic takes place in each containing task's code path. This allows for vastly simplifying `.wait()` to just contain an event trigger which initiates process waiting / result collection. Likely `.wait()` should just be removed since it can no longer be used to synchronously wait on the actor nursery. - drop `ActorNursery.__aenter__()` / `.__atexit__()` and move this "supervisor" tear down logic into the closing block of `open_nursery()`. This not only cleans makes the code more comprehensible it also makes our nursery implementation look more like the one in `trio`. Resolves #93 --- tractor/_spawn.py | 267 +++++++++++++++++++++++++-------- tractor/_trionics.py | 340 ++++++++++++------------------------------- 2 files changed, 301 insertions(+), 306 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 026e257..5e99723 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -3,12 +3,15 @@ Process spawning. Mostly just wrapping around ``multiprocessing``. """ +import inspect import multiprocessing as mp - -# from . import log +import platform +from typing import Any, List, Dict import trio import trio_run_in_process +from trio_typing import TaskStatus +from async_generator import aclosing try: from multiprocessing import semaphore_tracker # type: ignore @@ -23,12 +26,24 @@ from typing import Tuple from . import _forkserver_override from ._state import current_actor -from ._actor import Actor +from .log import get_logger +from ._portal import Portal +from ._actor import Actor, ActorFailure +log = get_logger('tractor') + _ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore +if platform.system() == 'Windows': + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.WaitForSingleObject(proc.sentinel) +else: + async def proc_waiter(proc: mp.Process) -> None: + await trio.hazmat.wait_readable(proc.sentinel) + + def try_set_start_method(name: str) -> mp.context.BaseContext: """Attempt to set the start method for ``multiprocess.Process`` spawning. @@ -60,73 +75,203 @@ def is_main_process() -> bool: return mp.current_process().name == 'MainProcess' +async def exhaust_portal( + portal: Portal, + actor: Actor +) -> Any: + """Pull final result from portal (assuming it has one). + + If the main task is an async generator do our best to consume + what's left of it. + """ + try: + log.debug(f"Waiting on final result from {actor.uid}") + final = res = await portal.result() + # if it's an async-gen then alert that we're cancelling it + if inspect.isasyncgen(res): + final = [] + log.warning( + f"Blindly consuming asyncgen for {actor.uid}") + with trio.fail_after(1): + async with aclosing(res) as agen: + async for item in agen: + log.debug(f"Consuming item {item}") + final.append(item) + except (Exception, trio.MultiError) as err: + # we reraise in the parent task via a ``trio.MultiError`` + return err + else: + return final + + +async def cancel_on_completion( + portal: Portal, + actor: Actor, + errors: List[Exception], + task_status=trio.TASK_STATUS_IGNORED, +) -> None: + """Cancel actor gracefully once it's "main" portal's + result arrives. + + Should only be called for actors spawned with `run_in_actor()`. 
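+
+    If the final result is an error it is not raised here; instead it is
+    stashed in ``errors`` so the caller can re-raise everything as part
+    of a ``trio.MultiError`` during nursery teardown.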
+ """ + with trio.CancelScope() as cs: + task_status.started(cs) + # if this call errors we store the exception for later + # in ``errors`` which will be reraised inside + # a MultiError and we still send out a cancel request + result = await exhaust_portal(portal, actor) + if isinstance(result, Exception): + errors[actor.uid] = result + log.warning( + f"Cancelling {portal.channel.uid} after error {result}" + ) + else: + log.info(f"Cancelling {portal.channel.uid} gracefully") + + # cancel the process now that we have a final result + await portal.cancel_actor() + + # XXX: lol, this will never get run without a shield above.. + # if cs.cancelled_caught: + # log.warning( + # "Result waiter was cancelled, process may have died") + + async def new_proc( name: str, - actor: Actor, + actor_nursery: 'ActorNursery', + subactor: Actor, + errors: Dict[str, Exception], # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], - nursery: trio.Nursery = None, + begin_wait_phase: trio.Event, use_trip: bool = True, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> mp.Process: """Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. """ - if use_trip: # trio_run_in_process - mng = trio_run_in_process.open_in_process( - actor._trip_main, - bind_addr, - parent_addr, - nursery=nursery, - ) - # XXX playing with trip logging - # l = log.get_console_log(level='debug', name=None, _root_name='trio-run-in-process') - # import logging - # logger = logging.getLogger("trio-run-in-process") - # logger.setLevel('DEBUG') - proc = await mng.__aenter__() - proc.mng = mng - return proc - else: - # use multiprocessing - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). 
- fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info - else: - fs_info = (None, None, None, None, None) + cancel_scope = None - return _ctx.Process( - target=actor._mp_main, - args=( + async with trio.open_nursery() as nursery: + if use_trip: + # trio_run_in_process + async with trio_run_in_process.open_in_process( + subactor._trip_main, bind_addr, - fs_info, - start_method, - parent_addr - ), - # daemon=True, - name=name, - ) + parent_addr, + ) as proc: + log.info(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = ( + subactor, proc, portal) + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + await actor_nursery._join_procs.wait() + + if portal in actor_nursery._cancel_after_result_on_exit: + cancel_scope = await nursery.start( + cancel_on_completion, portal, subactor, errors) + + # TRIP blocks here until process is complete + else: + # `multiprocessing` + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). 
+ fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver + # only once and pass its ipc info to downstream + # children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr( + resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) + + proc = _ctx.Process( + target=subactor._mp_main, + args=( + bind_addr, + fs_info, + start_method, + parent_addr + ), + # daemon=True, + name=name, + ) + # `multiprocessing` only (since no async interface): + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + actor_nursery._children[subactor.uid] = (subactor, proc, None) + + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") + + log.info(f"Started {proc}") + + # wait for actor to spawn and connect back to us + # channel should have handshake completed by the + # local actor by the time we get a ref to it + event, chan = await actor_nursery._actor.wait_for_peer( + subactor.uid) + portal = Portal(chan) + actor_nursery._children[subactor.uid] = (subactor, proc, portal) + + # unblock parent task + task_status.started(portal) + + # wait for ActorNursery.wait() to be called + # this is required to ensure synchronization + # with startup and registration of this actor in + # ActorNursery.run_in_actor() + await actor_nursery._join_procs.wait() + + if portal in actor_nursery._cancel_after_result_on_exit: + cancel_scope = await nursery.start( + cancel_on_completion, portal, subactor, errors) + + # TODO: timeout block here? + if proc.is_alive(): + await proc_waiter(proc) + proc.join() + + log.debug(f"Joined {proc}") + # pop child entry to indicate we are no longer managing this subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + # cancel result waiter that may have been spawned in + # tandem if not done already + if cancel_scope: + log.warning( + f"Cancelling existing result waiter task for {subactor.uid}") + cancel_scope.cancel() diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 9808935..49f7838 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -1,42 +1,37 @@ """ ``trio`` inspired apis and helpers """ -import inspect -import importlib -import platform import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any import typing import trio -from async_generator import asynccontextmanager, aclosing -import trio_run_in_process +from async_generator import asynccontextmanager from ._state import current_actor from .log import get_logger, get_loglevel -from ._actor import Actor, ActorFailure +from ._actor import Actor # , ActorFailure from ._portal import Portal from . 
import _spawn -if platform.system() == 'Windows': - async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.WaitForSingleObject(proc.sentinel) -else: - async def proc_waiter(proc: mp.Process) -> None: - await trio.hazmat.wait_readable(proc.sentinel) - - log = get_logger('tractor') class ActorNursery: """Spawn scoped subprocess actors. """ - def __init__(self, actor: Actor, nursery: trio.Nursery) -> None: + def __init__( + self, + actor: Actor, + ria_nursery: trio.Nursery, + da_nursery: trio.Nursery, + errors: Dict[str, Exception], + ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor - self._nursery = nursery + self._ria_nursery = ria_nursery + self._da_nursery = da_nursery self._children: Dict[ Tuple[str, str], Tuple[Actor, mp.Process, Optional[Portal]] @@ -45,10 +40,8 @@ class ActorNursery: # cancelled when their "main" result arrives self._cancel_after_result_on_exit: set = set() self.cancelled: bool = False - # self._aexitstack = contextlib.AsyncExitStack() - - async def __aenter__(self): - return self + self._join_procs = trio.Event() + self.errors = errors async def start_actor( self, @@ -57,51 +50,34 @@ class ActorNursery: statespace: Optional[Dict[str, Any]] = None, rpc_module_paths: List[str] = None, loglevel: str = None, # set log level per subactor + nursery: trio.Nursery = None, ) -> Portal: loglevel = loglevel or self._actor.loglevel or get_loglevel() - mods = {} - for path in rpc_module_paths or (): - mod = importlib.import_module(path) - mods[path] = mod.__file__ - - actor = Actor( + subactor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=mods, + rpc_module_paths=rpc_module_paths, statespace=statespace, # global proc state vars loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr assert parent_addr - proc = await _spawn.new_proc( + + # start a task to spawn a process + # blocks until process has been started and a portal setup + nursery = nursery or self._da_nursery + return await nursery.start( + _spawn.new_proc, name, - actor, + self, + subactor, + self.errors, bind_addr, parent_addr, - self._nursery, + nursery, ) - # `multiprocessing` only (since no async interface): - # register the process before start in case we get a cancel - # request before the actor has fully spawned - then we can wait - # for it to fully come up before sending a cancel request - self._children[actor.uid] = (actor, proc, None) - - if not isinstance(proc, trio_run_in_process.process.Process): - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") - - log.info(f"Started {proc}") - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await self._actor.wait_for_peer(actor.uid) - portal = Portal(chan) - self._children[actor.uid] = (actor, proc, portal) - - return portal async def run_in_actor( self, @@ -127,6 +103,8 @@ class ActorNursery: bind_addr=bind_addr, statespace=statespace, loglevel=loglevel, + # use the run_in_actor nursery + nursery=self._ria_nursery, ) # this marks the actor to be cancelled after its portal result # is retreived, see ``wait()`` below. @@ -140,153 +118,9 @@ class ActorNursery: async def wait(self) -> None: """Wait for all subactors to complete. 
- - This is probably the most complicated (and confusing, sorry) - function that does all the clever crap to deal with cancellation, - error propagation, and graceful subprocess tear down. """ - async def exhaust_portal(portal, actor): - """Pull final result from portal (assuming it has one). - - If the main task is an async generator do our best to consume - what's left of it. - """ - try: - log.debug(f"Waiting on final result from {actor.uid}") - final = res = await portal.result() - # if it's an async-gen then alert that we're cancelling it - if inspect.isasyncgen(res): - final = [] - log.warning( - f"Blindly consuming asyncgen for {actor.uid}") - with trio.fail_after(1): - async with aclosing(res) as agen: - async for item in agen: - log.debug(f"Consuming item {item}") - final.append(item) - except (Exception, trio.MultiError) as err: - # we reraise in the parent task via a ``trio.MultiError`` - return err - else: - return final - - async def cancel_on_completion( - portal: Portal, - actor: Actor, - task_status=trio.TASK_STATUS_IGNORED, - ) -> None: - """Cancel actor gracefully once it's "main" portal's - result arrives. - - Should only be called for actors spawned with `run_in_actor()`. - """ - with trio.CancelScope() as cs: - task_status.started(cs) - # if this call errors we store the exception for later - # in ``errors`` which will be reraised inside - # a MultiError and we still send out a cancel request - result = await exhaust_portal(portal, actor) - if isinstance(result, Exception): - errors.append(result) - log.warning( - f"Cancelling {portal.channel.uid} after error {result}" - ) - else: - log.info(f"Cancelling {portal.channel.uid} gracefully") - - # cancel the process now that we have a final result - await portal.cancel_actor() - - # XXX: lol, this will never get run without a shield above.. - # if cs.cancelled_caught: - # log.warning( - # "Result waiter was cancelled, process may have died") - - async def wait_for_proc( - proc: mp.Process, - actor: Actor, - portal: Portal, - cancel_scope: Optional[trio._core._run.CancelScope] = None, - ) -> None: - # please god don't hang - if not isinstance(proc, trio_run_in_process.process.Process): - # TODO: timeout block here? 
- if proc.is_alive(): - await proc_waiter(proc) - proc.join() - else: - # trio_run_in_process blocking wait - if errors: - multierror = trio.MultiError(errors) - # import pdb; pdb.set_trace() - # try: - # with trio.CancelScope(shield=True): - # await proc.mng.__aexit__( - # type(multierror), - # multierror, - # multierror.__traceback__, - # ) - # except BaseException as err: - # import pdb; pdb.set_trace() - # pass - # else: - await proc.mng.__aexit__(None, None, None) - # proc.nursery.cancel_scope.cancel() - - log.debug(f"Joined {proc}") - # indicate we are no longer managing this subactor - self._children.pop(actor.uid) - - # proc terminated, cancel result waiter that may have - # been spawned in tandem if not done already - if cancel_scope: # and not portal._cancelled: - log.warning( - f"Cancelling existing result waiter task for {actor.uid}") - cancel_scope.cancel() - log.debug(f"Waiting on all subactors to complete") - # since we pop each child subactor on termination, - # iterate a copy - children = self._children.copy() - errors: List[Exception] = [] - # wait on run_in_actor() tasks, unblocks when all complete - async with trio.open_nursery() as nursery: - # async with self._nursery as nursery: - for subactor, proc, portal in children.values(): - cs = None - # portal from ``run_in_actor()`` - if portal in self._cancel_after_result_on_exit: - cs = await nursery.start( - cancel_on_completion, portal, subactor) - # TODO: how do we handle remote host spawned actors? - nursery.start_soon( - wait_for_proc, proc, subactor, portal, cs) - - if errors: - multierror = trio.MultiError(errors) - if not self.cancelled: - # bubble up error(s) here and expect to be called again - # once the nursery has been cancelled externally (ex. - # from within __aexit__() if an error is caught around - # ``self.wait()`` then, ``self.cancel()`` is called - # immediately, in the default supervisor strat, after - # which in turn ``self.wait()`` is called again.) - raise trio.MultiError(errors) - - # wait on all `start_actor()` subactors to complete - # if errors were captured above and we have not been cancelled - # then these ``start_actor()`` spawned actors will block until - # cancelled externally - children = self._children.copy() - async with trio.open_nursery() as nursery: - for subactor, proc, portal in children.values(): - # TODO: how do we handle remote host spawned actors? 
- nursery.start_soon(wait_for_proc, proc, subactor, portal, cs) - - log.debug(f"All subactors for {self} have terminated") - if errors: - # always raise any error if we're also cancelled - raise trio.MultiError(errors) + self._join_procs.set() async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel @@ -304,7 +138,7 @@ class ActorNursery: log.debug(f"Cancelling nursery") with trio.move_on_after(3) as cs: - async with trio.open_nursery() as n: + async with trio.open_nursery() as nursery: for subactor, proc, portal in self._children.values(): if hard_kill: do_hard_kill(proc) @@ -331,59 +165,20 @@ class ActorNursery: # spawn cancel tasks for each sub-actor assert portal - n.start_soon(portal.cancel_actor) + nursery.start_soon(portal.cancel_actor) # if we cancelled the cancel (we hung cancelling remote actors) # then hard kill all sub-processes if cs.cancelled_caught: log.error(f"Failed to gracefully cancel {self}, hard killing!") - async with trio.open_nursery() as n: + async with trio.open_nursery(): for subactor, proc, portal in self._children.values(): - n.start_soon(do_hard_kill, proc) + nursery.start_soon(do_hard_kill, proc) # mark ourselves as having (tried to have) cancelled all subactors self.cancelled = True await self.wait() - async def __aexit__(self, etype, value, tb): - """Wait on all subactor's main routines to complete. - """ - # XXX: this is effectively the (for now) lone - # cancellation/supervisor strategy (one-cancels-all) - # which exactly mimicks trio's behaviour - if etype is not None: - try: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case - # the `else:` block here might not complete? - # For now, shield both. - with trio.CancelScope(shield=True): - if etype in (trio.Cancelled, KeyboardInterrupt): - log.warning( - f"Nursery for {current_actor().uid} was " - f"cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {etype}, ") - await self.cancel() - except trio.MultiError as merr: - if value not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [value]) - raise - else: - log.debug(f"Waiting on subactors {self._children} to complete") - try: - await self.wait() - except (Exception, trio.MultiError) as err: - log.warning(f"Nursery cancelling due to {err}") - if self._children: - with trio.CancelScope(shield=True): - await self.cancel() - raise - - log.debug(f"Nursery teardown complete") - @asynccontextmanager async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: @@ -395,12 +190,67 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: if not actor: raise RuntimeError("No actor instance has been defined yet?") - # XXX we need this nursery because TRIP is doing all its stuff with + # XXX we use these nurseries because TRIP is doing all its stuff with # an `@asynccontextmanager` which has an internal nursery *and* the # task that opens a nursery must also close it - so we need a path - # in TRIP to make this all kinda work as well. Note I'm basically - # giving up for now - it's probably equivalent amounts of work to - # make TRIP vs. `multiprocessing` work here. - async with trio.open_nursery() as nursery: - async with ActorNursery(actor, nursery) as anursery: - yield anursery + # in TRIP to make this all kinda work as well. 
+ errors: Dict[str, Exception] = {} + async with trio.open_nursery() as da_nursery: + try: + async with trio.open_nursery() as ria_nursery: + anursery = ActorNursery( + actor, ria_nursery, da_nursery, errors + ) + try: + # spawning of actors happens in this scope after + # we yield to the caller. + yield anursery + log.debug( + f"Waiting on subactors {anursery._children}" + "to complete" + ) + # anursery.wait() + # except (trio.Cancelled, KeyboardInterrupt) as err: + except (BaseException, Exception) as err: + anursery._join_procs.set() + try: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case + # the `else:` block here might not complete? + # For now, shield both. + with trio.CancelScope(shield=True): + if err in (trio.Cancelled, KeyboardInterrupt): + log.warning( + f"Nursery for {current_actor().uid} was " + f"cancelled with {err}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") + await anursery.cancel() + except trio.MultiError as merr: + if err not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [err]) + else: + raise + + # last bit before first nursery block end + log.debug(f"Waiting on all subactors to complete") + anursery._join_procs.set() + # ria_nursery scope + except (Exception, trio.MultiError) as err: + log.warning(f"Nursery cancelling due to {err}") + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + raise + finally: + if errors: + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() + if len(errors) > 1: + raise trio.MultiError(errors.values()) + else: + raise list(errors.values())[0] + log.debug(f"Nursery teardown complete") From 6c454160164477a45798061c5a1be04c147238d7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jan 2020 13:39:11 -0500 Subject: [PATCH 05/33] Drop ActorNusery.wait(); it's no longer necessary really --- tractor/_trionics.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 49f7838..4f97b5e 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -107,7 +107,7 @@ class ActorNursery: nursery=self._ria_nursery, ) # this marks the actor to be cancelled after its portal result - # is retreived, see ``wait()`` below. + # is retreived, see logic in `open_nursery()` below. self._cancel_after_result_on_exit.add(portal) await portal._submit_for_result( mod_path, @@ -116,12 +116,6 @@ class ActorNursery: ) return portal - async def wait(self) -> None: - """Wait for all subactors to complete. - """ - log.debug(f"Waiting on all subactors to complete") - self._join_procs.set() - async def cancel(self, hard_kill: bool = False) -> None: """Cancel this nursery by instructing each subactor to cancel itself and wait for all subactors to terminate. @@ -177,7 +171,7 @@ class ActorNursery: # mark ourselves as having (tried to have) cancelled all subactors self.cancelled = True - await self.wait() + self._join_procs.set() @asynccontextmanager @@ -192,8 +186,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # XXX we use these nurseries because TRIP is doing all its stuff with # an `@asynccontextmanager` which has an internal nursery *and* the - # task that opens a nursery must also close it - so we need a path - # in TRIP to make this all kinda work as well. + # task that opens a nursery must also close it. 
errors: Dict[str, Exception] = {} async with trio.open_nursery() as da_nursery: try: @@ -209,8 +202,6 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: f"Waiting on subactors {anursery._children}" "to complete" ) - # anursery.wait() - # except (trio.Cancelled, KeyboardInterrupt) as err: except (BaseException, Exception) as err: anursery._join_procs.set() try: @@ -234,7 +225,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: else: raise - # last bit before first nursery block end + # last bit before first nursery block ends log.debug(f"Waiting on all subactors to complete") anursery._join_procs.set() # ria_nursery scope From 3c86aa2ab53ff216249ce207f1293428c8018bd9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jan 2020 13:40:15 -0500 Subject: [PATCH 06/33] Add trio-run-in-process` as dep --- README.rst | 2 -- setup.py | 1 + 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 7b6992c..dc90e4a 100644 --- a/README.rst +++ b/README.rst @@ -83,8 +83,6 @@ Its tenets non-comprehensively include: are greatly appreciated! .. _concept-in-progress: https://trio.discourse.group/t/structured-concurrency-kickoff/55 -.. _pulsar: http://quantmind.github.io/pulsar/design.html -.. _execnet: https://codespeak.net/execnet/ Install diff --git a/setup.py b/setup.py index cb44d1c..3fe45dc 100755 --- a/setup.py +++ b/setup.py @@ -39,6 +39,7 @@ setup( ], install_requires=[ 'msgpack', 'trio>0.8', 'async_generator', 'colorlog', 'wrapt', + 'trio_typing', 'trio-run-in-process', ], tests_require=['pytest'], python_requires=">=3.7", From e1a55a6f4f5ca60968ea27c2f0f1fb04946fc819 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jan 2020 13:41:08 -0500 Subject: [PATCH 07/33] Importing happens once locally now so expect a local error --- tests/test_rpc.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 5dec131..0258f35 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -60,7 +60,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): if exposed_mods == ['tmp_mod']: # create an importable module with a bad import testdir.syspathinsert() - # module should cause raise a ModuleNotFoundError at import + # module should cause a raise of a ModuleNotFoundError at import testdir.makefile('.py', tmp_mod=funcname) # no need to exposed module to the subactor @@ -69,7 +69,9 @@ def test_rpc_errors(arb_addr, to_call, testdir): func_defined = False # subactor should not try to invoke anything subactor_requests_to = None - remote_err = trio.MultiError + # the module will be attempted to be imported locally but will + # fail in the initial local instance of the actor + remote_err = inside_err async def main(): actor = tractor.current_actor() @@ -100,7 +102,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): if exposed_mods and func_defined: run() else: - # underlying errors are propagated upwards (yet) + # underlying errors aren't propagated upwards (yet) with pytest.raises(remote_err) as err: run() @@ -114,4 +116,5 @@ def test_rpc_errors(arb_addr, to_call, testdir): value.exceptions )) - assert value.type is inside_err + if getattr(value, 'type', None): + assert value.type is inside_err From 4b0554b61f41e5b3fe47d0898ee24510c2bb2e94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Jan 2020 21:06:49 -0500 Subject: [PATCH 08/33] Type checker fixes --- tractor/_actor.py | 2 +- tractor/_spawn.py | 21 +++++++++------------ tractor/_trionics.py | 14 
++++++++------ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index fe1f67f..d46471d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -168,7 +168,7 @@ class Actor: def __init__( self, name: str, - rpc_module_paths: List[str] = {}, + rpc_module_paths: List[str] = [], statespace: Optional[Dict[str, Any]] = None, uid: str = None, loglevel: str = None, diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 5e99723..a26cb8e 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -107,8 +107,8 @@ async def exhaust_portal( async def cancel_on_completion( portal: Portal, actor: Actor, - errors: List[Exception], - task_status=trio.TASK_STATUS_IGNORED, + errors: Dict[Tuple[str, str], Exception], + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: """Cancel actor gracefully once it's "main" portal's result arrives. @@ -127,29 +127,26 @@ async def cancel_on_completion( f"Cancelling {portal.channel.uid} after error {result}" ) else: - log.info(f"Cancelling {portal.channel.uid} gracefully") + log.info( + f"Cancelling {portal.channel.uid} gracefully " + "after result {result}") # cancel the process now that we have a final result await portal.cancel_actor() - # XXX: lol, this will never get run without a shield above.. - # if cs.cancelled_caught: - # log.warning( - # "Result waiter was cancelled, process may have died") - async def new_proc( name: str, - actor_nursery: 'ActorNursery', + actor_nursery: 'ActorNursery', # type: ignore subactor: Actor, - errors: Dict[str, Exception], + errors: Dict[Tuple[str, str], Exception], # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], begin_wait_phase: trio.Event, use_trip: bool = True, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED -) -> mp.Process: +) -> None: """Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. """ @@ -217,7 +214,7 @@ async def new_proc( else: fs_info = (None, None, None, None, None) - proc = _ctx.Process( + proc = _ctx.Process( # type: ignore target=subactor._mp_main, args=( bind_addr, diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 4f97b5e..a20f8bb 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -26,7 +26,7 @@ class ActorNursery: actor: Actor, ria_nursery: trio.Nursery, da_nursery: trio.Nursery, - errors: Dict[str, Exception], + errors: Dict[Tuple[str, str], Exception], ) -> None: # self.supervisor = supervisor # TODO self._actor: Actor = actor @@ -57,7 +57,7 @@ class ActorNursery: subactor = Actor( name, # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths, + rpc_module_paths=rpc_module_paths or [], statespace=statespace, # global proc state vars loglevel=loglevel, arbiter_addr=current_actor()._arb_addr, @@ -68,7 +68,9 @@ class ActorNursery: # start a task to spawn a process # blocks until process has been started and a portal setup nursery = nursery or self._da_nursery - return await nursery.start( + + # XXX: the type ignore is actually due to a `mypy` bug + return await nursery.start( # type: ignore _spawn.new_proc, name, self, @@ -186,8 +188,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # XXX we use these nurseries because TRIP is doing all its stuff with # an `@asynccontextmanager` which has an internal nursery *and* the - # task that opens a nursery must also close it. 
- errors: Dict[str, Exception] = {} + # task that opens a nursery **must also close it**. + errors: Dict[Tuple[str, str], Exception] = {} async with trio.open_nursery() as da_nursery: try: async with trio.open_nursery() as ria_nursery: @@ -241,7 +243,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: with trio.CancelScope(shield=True): await anursery.cancel() if len(errors) > 1: - raise trio.MultiError(errors.values()) + raise trio.MultiError(tuple(errors.values())) else: raise list(errors.values())[0] log.debug(f"Nursery teardown complete") From f1a96c168074842cd14d92e687a155ceb6d81f50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 21 Jan 2020 15:28:12 -0500 Subject: [PATCH 09/33] Add mypy.ini lel --- mypy.ini | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 mypy.ini diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..b991c7c --- /dev/null +++ b/mypy.ini @@ -0,0 +1,2 @@ +[mypy] +plugins = trio_typing.plugin From ddbf55768f8ef2732444aba3df907e108d20e4e1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jan 2020 01:15:46 -0500 Subject: [PATCH 10/33] Try out trip as the default spawn_method on unix for now --- tractor/__init__.py | 8 ++++++-- tractor/_spawn.py | 23 +++++++++++++++++------ 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index 10bb895..aef3821 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,6 +4,7 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial +import platform from typing import Tuple, Any, Optional import typing @@ -99,15 +100,18 @@ def run( name: Optional[str] = None, arbiter_addr: Tuple[str, int] = ( _default_arbiter_host, _default_arbiter_port), - # the `multiprocessing` start method: + # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods - start_method: str = 'forkserver', + # OR `trio-run-in-process` (the new default). + start_method: str = 'trip', **kwargs, ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. """ + if platform.system() == 'Windows': + start_method = 'spawn' # only one supported for now _spawn.try_set_start_method(start_method) return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a26cb8e..39aa22e 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -34,6 +34,7 @@ from ._actor import Actor, ActorFailure log = get_logger('tractor') _ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore +_spawn_method: str = "spawn" if platform.system() == 'Windows': @@ -51,21 +52,31 @@ def try_set_start_method(name: str) -> mp.context.BaseContext: method) is used. 
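 
     Passing ``'trip'`` (no Windows support yet) selects the
     ``trio-run-in-process`` backend rather than a ``multiprocessing``
     context.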
""" global _ctx + global _spawn_method allowed = mp.get_all_start_methods() + # no Windows support for trip yet (afaik) + if platform.system() != 'Windows': + allowed += ['trip'] + if name not in allowed: - name = 'spawn' + raise ValueError( + f"Spawn method {name} is unsupported please choose one of {allowed}" + ) + + if name == 'trip': + _spawn_method = name + return name + elif name == 'fork': raise ValueError( "`fork` is unsupported due to incompatibility with `trio`" ) elif name == 'forkserver': _forkserver_override.override_stdlib() + _ctx = mp.get_context(name) - assert name in allowed - - _ctx = mp.get_context(name) return _ctx @@ -144,7 +155,7 @@ async def new_proc( bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], begin_wait_phase: trio.Event, - use_trip: bool = True, + use_trip: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: """Create a new ``multiprocessing.Process`` using the @@ -153,7 +164,7 @@ async def new_proc( cancel_scope = None async with trio.open_nursery() as nursery: - if use_trip: + if use_trip or _spawn_method == 'trip': # trio_run_in_process async with trio_run_in_process.open_in_process( subactor._trip_main, From 44996fe328a4f9802e31466280ecd21bcca56259 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jan 2020 01:16:10 -0500 Subject: [PATCH 11/33] Add trip to start_method parametrizations --- tests/conftest.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index fd8427f..d9be19a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,6 +2,7 @@ ``tractor`` testing!! """ import random +import platform import pytest import tractor @@ -32,8 +33,13 @@ def arb_addr(): def pytest_generate_tests(metafunc): if 'start_method' in metafunc.fixturenames: + from multiprocessing import get_all_start_methods methods = get_all_start_methods() + + if platform.system() != "Windows": + methods += ['trip'] + if 'fork' in methods: # fork not available on windows, so check before removing # XXX: the fork method is in general incompatible with # trio's global scheduler state From 4c5a60d06a6c93b98cbec4c7672f4bd990ba8a51 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jan 2020 01:23:26 -0500 Subject: [PATCH 12/33] Don't import trip on Windows --- tractor/_spawn.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 39aa22e..13bcb9a 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -9,7 +9,6 @@ import platform from typing import Any, List, Dict import trio -import trio_run_in_process from trio_typing import TaskStatus from async_generator import aclosing @@ -41,6 +40,8 @@ if platform.system() == 'Windows': async def proc_waiter(proc: mp.Process) -> None: await trio.hazmat.WaitForSingleObject(proc.sentinel) else: + import trio_run_in_process + async def proc_waiter(proc: mp.Process) -> None: await trio.hazmat.wait_readable(proc.sentinel) From 4837595e36338028a955edd8573da7acc54ed44c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 23 Jan 2020 01:32:02 -0500 Subject: [PATCH 13/33] Fake out mypy again --- tractor/_spawn.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 13bcb9a..b51f1f8 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -66,10 +66,6 @@ def try_set_start_method(name: str) -> mp.context.BaseContext: f"Spawn method {name} is unsupported please choose one of {allowed}" ) - if name == 'trip': - _spawn_method = name - 
return name - elif name == 'fork': raise ValueError( "`fork` is unsupported due to incompatibility with `trio`" @@ -78,6 +74,7 @@ def try_set_start_method(name: str) -> mp.context.BaseContext: _forkserver_override.override_stdlib() _ctx = mp.get_context(name) + _spawn_method = name return _ctx From d9803ca906baab7ef6d9394e7b791c8756d38b62 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 Jan 2020 00:47:01 -0500 Subject: [PATCH 14/33] Be explicit with the real name for trip --- tractor/__init__.py | 4 ++-- tractor/_spawn.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index aef3821..a677c59 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -102,8 +102,8 @@ def run( _default_arbiter_host, _default_arbiter_port), # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods - # OR `trio-run-in-process` (the new default). - start_method: str = 'trip', + # OR `trio_run_in_process` (the new default). + start_method: str = 'trio_run_in_process', **kwargs, ) -> Any: """Run a trio-actor async function in process. diff --git a/tractor/_spawn.py b/tractor/_spawn.py index b51f1f8..f35677c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -59,7 +59,7 @@ def try_set_start_method(name: str) -> mp.context.BaseContext: # no Windows support for trip yet (afaik) if platform.system() != 'Windows': - allowed += ['trip'] + allowed += ['trio_run_in_process'] if name not in allowed: raise ValueError( @@ -153,7 +153,7 @@ async def new_proc( bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], begin_wait_phase: trio.Event, - use_trip: bool = False, + use_trio_run_in_process: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: """Create a new ``multiprocessing.Process`` using the @@ -162,7 +162,7 @@ async def new_proc( cancel_scope = None async with trio.open_nursery() as nursery: - if use_trip or _spawn_method == 'trip': + if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': # trio_run_in_process async with trio_run_in_process.open_in_process( subactor._trip_main, From bc259b7eab8a8d609c54ccfd097e0642f24cacf1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 Jan 2020 00:54:19 -0500 Subject: [PATCH 15/33] Use trip as default in all tests for now --- tests/conftest.py | 2 +- tractor/testing/_tractor_test.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index d9be19a..d8607ec 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,7 +38,7 @@ def pytest_generate_tests(metafunc): methods = get_all_start_methods() if platform.system() != "Windows": - methods += ['trip'] + methods += ['trio_run_in_process'] if 'fork' in methods: # fork not available on windows, so check before removing # XXX: the fork method is in general incompatible with diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 7d8289e..2399318 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -28,7 +28,7 @@ def tractor_test(fn): *args, loglevel=None, arb_addr=None, - start_method='forkserver', + start_method='trio_run_in_process', **kwargs ): # __tracebackhide__ = True From 783fe53b06c4e53c1cab8095cd3feea7f0a0f360 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 24 Jan 2020 00:55:40 -0500 Subject: [PATCH 16/33] Don't mix trip with multiprocessing for now It seems that mixing the two 
backends in the test suite results in hangs due to lingering forkservers and resource managers from `multiprocessing`? Likely we'll need either 2 separate CI runs to work or someway to be sure that these lingering servers are killed in between tests. --- tests/conftest.py | 2 +- tests/test_cancellation.py | 17 +++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index d8607ec..bdef563 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -38,7 +38,7 @@ def pytest_generate_tests(metafunc): methods = get_all_start_methods() if platform.system() != "Windows": - methods += ['trio_run_in_process'] + methods = ['trio_run_in_process'] if 'fork' in methods: # fork not available on windows, so check before removing # XXX: the fork method is in general incompatible with diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index 80b66f1..a8e0bec 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -284,16 +284,21 @@ async def spawn_and_error(breadth, depth) -> None: @tractor_test -async def test_nested_multierrors(loglevel, start_method): +async def test_nested_multierrors(loglevel): """Test that failed actor sets are wrapped in `trio.MultiError`s. This test goes only 2 nurseries deep but we should eventually have tests for arbitrary n-depth actor trees. """ - # XXX: forkserver can't seem to handle any more then 2 depth - # process trees for whatever reason. - # Any more process levels then this and we start getting pretty slow anyway - depth = 3 - subactor_breadth = 2 + # if start_method == 'trio_run_in_process': + depth = 2 + subactor_breadth = 3 + # else: + # # XXX: multiprocessing can't seem to handle any more then 2 depth + # # process trees for whatever reason. + # # Any more process levels then this and we see bugs that cause + # # hangs and broken pipes all over the place... + # depth = 1 + # subactor_breadth = 2 with trio.fail_after(120): try: From 27c9760f9647b14a5a72e25b35bfda151f649417 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 21:13:29 -0500 Subject: [PATCH 17/33] Be explicit about the spawning backend default Set `trio-run-in-process` as the default on *nix systems and `multiprocessing`'s spawn method on Windows. Enable overriding the default choice using `tractor._spawn.try_set_start_method()`. Allows for easy runs of the test suite using a user chosen backend. --- tractor/__init__.py | 8 +++----- tractor/_actor.py | 6 ++++++ tractor/_spawn.py | 29 ++++++++++++++++++++--------- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/tractor/__init__.py b/tractor/__init__.py index a677c59..80eaf05 100644 --- a/tractor/__init__.py +++ b/tractor/__init__.py @@ -4,7 +4,6 @@ tractor: An actor model micro-framework built on """ import importlib from functools import partial -import platform from typing import Tuple, Any, Optional import typing @@ -103,16 +102,15 @@ def run( # either the `multiprocessing` start method: # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods # OR `trio_run_in_process` (the new default). - start_method: str = 'trio_run_in_process', + start_method: Optional[str] = None, **kwargs, ) -> Any: """Run a trio-actor async function in process. This is tractor's main entry and the start point for any async actor. 
""" - if platform.system() == 'Windows': - start_method = 'spawn' # only one supported for now - _spawn.try_set_start_method(start_method) + if start_method is not None: + _spawn.try_set_start_method(start_method) return trio.run(_main, async_fn, args, kwargs, arbiter_addr, name) diff --git a/tractor/_actor.py b/tractor/_actor.py index d46471d..eb0570d 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -537,6 +537,7 @@ class Actor: f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) + assert spawn_ctx log.info( f"Started new {spawn_ctx.current_process()} for {self.uid}") @@ -555,6 +556,11 @@ class Actor: accept_addr: Tuple[str, int], parent_addr: Tuple[str, int] = None ) -> None: + """Entry point for a `trio_run_in_process` subactor. + + Here we don't need to call `trio.run()` since trip does that as + part of its subprocess startup sequence. + """ if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f35677c..88cc450 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -6,7 +6,7 @@ Mostly just wrapping around ``multiprocessing``. import inspect import multiprocessing as mp import platform -from typing import Any, List, Dict +from typing import Any, Dict, Optional import trio from trio_typing import TaskStatus @@ -32,8 +32,13 @@ from ._actor import Actor, ActorFailure log = get_logger('tractor') -_ctx: mp.context.BaseContext = mp.get_context("spawn") # type: ignore -_spawn_method: str = "spawn" +# use trip as our default for now +if platform.system() != 'Windows': + _spawn_method: str = "trio_run_in_process" +else: + _spawn_method = "spawn" + +_ctx: Optional[mp.context.BaseContext] = None if platform.system() == 'Windows': @@ -46,7 +51,7 @@ else: await trio.hazmat.wait_readable(proc.sentinel) -def try_set_start_method(name: str) -> mp.context.BaseContext: +def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: """Attempt to set the start method for ``multiprocess.Process`` spawning. 
If the desired method is not supported the sub-interpreter (aka "spawn" @@ -55,15 +60,16 @@ def try_set_start_method(name: str) -> mp.context.BaseContext: global _ctx global _spawn_method - allowed = mp.get_all_start_methods() + methods = mp.get_all_start_methods() + methods.remove('fork') - # no Windows support for trip yet (afaik) + # no Windows support for trip yet if platform.system() != 'Windows': - allowed += ['trio_run_in_process'] + methods += ['trio_run_in_process'] - if name not in allowed: + if name not in methods: raise ValueError( - f"Spawn method {name} is unsupported please choose one of {allowed}" + f"Spawn method `{name}` is invalid please choose one of {methods}" ) elif name == 'fork': @@ -73,6 +79,10 @@ def try_set_start_method(name: str) -> mp.context.BaseContext: elif name == 'forkserver': _forkserver_override.override_stdlib() _ctx = mp.get_context(name) + elif name == 'trio_run_in_process': + _ctx = None + else: + _ctx = mp.get_context(name) _spawn_method = name return _ctx @@ -191,6 +201,7 @@ async def new_proc( # TRIP blocks here until process is complete else: # `multiprocessing` + assert _ctx start_method = _ctx.get_start_method() if start_method == 'forkserver': # XXX do our hackery on the stdlib to avoid multiple From ecced3d09af5178c523afe4b3fd7329549f0631d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 21:36:08 -0500 Subject: [PATCH 18/33] Allow choosing the spawn backend per test session Add a `--spawn-backend` option which can be set to one of {'mp', 'trio_run_in_process'} which will either run the test suite using the `multiprocessing` or `trio-run-in-process` backend respectively. Currently trying to run both in the same session can result in hangs seemingly due to a lack of cleanup of forkservers / resource trackers from `multiprocessing` which cause broken pipe errors on occasion (no idea on the details). For `test_cancellation.py::test_nested_multierrors`, use less nesting when mp is used since it breaks if we push it too hard with the whole recursive subprocess spawning thing... 
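
As a rough sketch of the parametrization logic this adds (the helper name here is
illustrative only; the real hooks live in ``tests/conftest.py`` below), the chosen
backend maps to the set of start methods each test gets run under:

    import platform
    from multiprocessing import get_all_start_methods

    def methods_for(spawn_backend: str) -> list:
        # 'mp' exercises every stdlib start method except 'fork'
        if spawn_backend == 'mp':
            methods = get_all_start_methods()
            if 'fork' in methods:
                # fork is in general incompatible with trio's scheduler state
                methods.remove('fork')
            return methods
        # the trip backend maps to exactly one "method" and is *nix only
        elif spawn_backend == 'trio_run_in_process':
            assert platform.system() != 'Windows'
            return ['trio_run_in_process']
        raise ValueError(f"unknown backend {spawn_backend}")
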
--- tests/conftest.py | 41 ++++++++++++++++++++++++-------- tests/test_cancellation.py | 22 ++++++++--------- tests/test_rpc.py | 2 +- tractor/testing/_tractor_test.py | 4 ++-- 4 files changed, 45 insertions(+), 24 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index bdef563..924036b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,8 +14,24 @@ _arb_addr = '127.0.0.1', random.randint(1000, 9999) def pytest_addoption(parser): - parser.addoption("--ll", action="store", dest='loglevel', - default=None, help="logging level to set when testing") + parser.addoption( + "--ll", action="store", dest='loglevel', + default=None, help="logging level to set when testing" + ) + + parser.addoption( + "--spawn-backend", action="store", dest='spawn_backend', + default='trio_run_in_process', + help="Processing spawning backend to use for test run", + ) + + +def pytest_configure(config): + backend = config.option.spawn_backend + if backend == 'mp': + tractor._spawn.try_set_start_method('spawn') + elif backend == 'trio_run_in_process': + tractor._spawn.try_set_start_method(backend) @pytest.fixture(scope='session', autouse=True) @@ -32,16 +48,21 @@ def arb_addr(): def pytest_generate_tests(metafunc): + spawn_backend = metafunc.config.getoption("spawn_backend") + assert spawn_backend in ('mp', 'trio_run_in_process') + if 'start_method' in metafunc.fixturenames: + if spawn_backend == 'mp': + from multiprocessing import get_all_start_methods + methods = get_all_start_methods() + if 'fork' in methods: # fork not available on windows, so check before removing + # XXX: the fork method is in general incompatible with + # trio's global scheduler state + methods.remove('fork') + elif spawn_backend == 'trio_run_in_process': + if platform.system() == "Windows": + pytest.fail("Only `--spawn-backend=mp` is supported on Windows") - from multiprocessing import get_all_start_methods - methods = get_all_start_methods() - - if platform.system() != "Windows": methods = ['trio_run_in_process'] - if 'fork' in methods: # fork not available on windows, so check before removing - # XXX: the fork method is in general incompatible with - # trio's global scheduler state - methods.remove('fork') metafunc.parametrize("start_method", methods, scope='module') diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index a8e0bec..bbf8598 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -284,21 +284,21 @@ async def spawn_and_error(breadth, depth) -> None: @tractor_test -async def test_nested_multierrors(loglevel): +async def test_nested_multierrors(loglevel, start_method): """Test that failed actor sets are wrapped in `trio.MultiError`s. This test goes only 2 nurseries deep but we should eventually have tests for arbitrary n-depth actor trees. """ - # if start_method == 'trio_run_in_process': - depth = 2 - subactor_breadth = 3 - # else: - # # XXX: multiprocessing can't seem to handle any more then 2 depth - # # process trees for whatever reason. - # # Any more process levels then this and we see bugs that cause - # # hangs and broken pipes all over the place... - # depth = 1 - # subactor_breadth = 2 + if start_method == 'trio_run_in_process': + depth = 3 + subactor_breadth = 2 + else: + # XXX: multiprocessing can't seem to handle any more then 2 depth + # process trees for whatever reason. + # Any more process levels then this and we see bugs that cause + # hangs and broken pipes all over the place... 
+ depth = 2 + subactor_breadth = 2 with trio.fail_after(120): try: diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 0258f35..b3fa1df 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -60,7 +60,7 @@ def test_rpc_errors(arb_addr, to_call, testdir): if exposed_mods == ['tmp_mod']: # create an importable module with a bad import testdir.syspathinsert() - # module should cause a raise of a ModuleNotFoundError at import + # module should raise a ModuleNotFoundError at import testdir.makefile('.py', tmp_mod=funcname) # no need to exposed module to the subactor diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 2399318..82f38f2 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -19,6 +19,7 @@ def tractor_test(fn): - ``arb_addr`` (a socket addr tuple where arbiter is listening) - ``loglevel`` (logging level passed to tractor internals) + - ``start_method`` (subprocess spawning backend) are defined in the `pytest` fixture space they will be automatically injected to tests declaring these funcargs. @@ -41,8 +42,7 @@ def tractor_test(fn): # that activates the internal logging kwargs['loglevel'] = loglevel if 'start_method' in inspect.signature(fn).parameters: - # allows test suites to define a 'loglevel' fixture - # that activates the internal logging + # set of subprocess spawning backends kwargs['start_method'] = start_method return run( partial(fn, *args, **kwargs), From 87948bde3dbeadb067324bdc9775ebb541295157 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 21:50:03 -0500 Subject: [PATCH 19/33] Add per backend test runs for each Python version --- .travis.yml | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index d8f58be..e21b629 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,8 +22,19 @@ matrix: - export PATH="/c/Python:/c/Python/Scripts:$PATH" - python -m pip install --upgrade pip wheel - - python: 3.7 # this works for Linux but is ignored on macOS or Windows - - python: 3.8 + - name: "Python 3.7: multiprocessing" + python: 3.7 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND=mp + - name: "Python 3.7: trio-run-in-process" + python: 3.7 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND=trio_run_in_process + + - name: "Pytron 3.8: multiprocessing" + python: 3.8 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND=mp + - name: "Python 3.8: trio-run-in-process" + python: 3.8 # this works for Linux but is ignored on macOS or Windows + env: SPAWN_BACKEND=trio_run_in_process install: - cd $TRAVIS_BUILD_DIR @@ -32,4 +43,4 @@ install: script: - mypy tractor/ --ignore-missing-imports - - pytest tests/ --no-print-logs + - pytest tests/ --no-print-logs --spawn-backend=$SPAWN_BACKEND From e18fec9b17c094546e26f5db4ad9d9300d7132a9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 22:09:06 -0500 Subject: [PATCH 20/33] Always force mp backend on Windows --- tests/conftest.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 924036b..b7feae4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -28,6 +28,10 @@ def pytest_addoption(parser): def pytest_configure(config): backend = config.option.spawn_backend + + if plaform.system() == "Windows": + backend = 'mp' + if backend == 'mp': tractor._spawn.try_set_start_method('spawn') elif backend == 'trio_run_in_process': From 
7c1bc1fce4def34b1fa48e4a83e2db829e8c7e24 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 22:09:32 -0500 Subject: [PATCH 21/33] Make windows job names explicit --- .travis.yml | 14 +++++++------- tests/conftest.py | 14 ++++++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/.travis.yml b/.travis.yml index e21b629..e71a16d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ sudo: required matrix: include: - - name: "Windows, Python Latest" + - name: "Windows, Python Latest: multiprocessing" os: windows language: sh python: 3.x # only works on linux @@ -13,7 +13,7 @@ matrix: - export PATH="/c/Python:/c/Python/Scripts:$PATH" - python -m pip install --upgrade pip wheel - - name: "Windows, Python 3.7" + - name: "Windows, Python 3.7: multiprocessing" os: windows python: 3.7 # only works on linux language: sh @@ -24,17 +24,17 @@ matrix: - name: "Python 3.7: multiprocessing" python: 3.7 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND=mp + env: SPAWN_BACKEND="mp" - name: "Python 3.7: trio-run-in-process" python: 3.7 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND=trio_run_in_process + env: SPAWN_BACKEND="trio_run_in_process" - name: "Pytron 3.8: multiprocessing" python: 3.8 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND=mp + env: SPAWN_BACKEND="mp" - name: "Python 3.8: trio-run-in-process" python: 3.8 # this works for Linux but is ignored on macOS or Windows - env: SPAWN_BACKEND=trio_run_in_process + env: SPAWN_BACKEND="trio_run_in_process" install: - cd $TRAVIS_BUILD_DIR @@ -43,4 +43,4 @@ install: script: - mypy tractor/ --ignore-missing-imports - - pytest tests/ --no-print-logs --spawn-backend=$SPAWN_BACKEND + - pytest tests/ --no-print-logs --spawn-backend=${SPAWN_BACKEND} diff --git a/tests/conftest.py b/tests/conftest.py index b7feae4..9675299 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -16,7 +16,7 @@ _arb_addr = '127.0.0.1', random.randint(1000, 9999) def pytest_addoption(parser): parser.addoption( "--ll", action="store", dest='loglevel', - default=None, help="logging level to set when testing" + default=None, help="logging level to set when testing" ) parser.addoption( @@ -29,7 +29,7 @@ def pytest_addoption(parser): def pytest_configure(config): backend = config.option.spawn_backend - if plaform.system() == "Windows": + if platform.system() == "Windows": backend = 'mp' if backend == 'mp': @@ -59,13 +59,15 @@ def pytest_generate_tests(metafunc): if spawn_backend == 'mp': from multiprocessing import get_all_start_methods methods = get_all_start_methods() - if 'fork' in methods: # fork not available on windows, so check before removing - # XXX: the fork method is in general incompatible with - # trio's global scheduler state + if 'fork' in methods: + # fork not available on windows, so check before + # removing XXX: the fork method is in general + # incompatible with trio's global scheduler state methods.remove('fork') elif spawn_backend == 'trio_run_in_process': if platform.system() == "Windows": - pytest.fail("Only `--spawn-backend=mp` is supported on Windows") + pytest.fail( + "Only `--spawn-backend=mp` is supported on Windows") methods = ['trio_run_in_process'] From e57811a602573b0bbd664c797a10cc20a8a124c4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 22:35:42 -0500 Subject: [PATCH 22/33] Fork isn't present on windows... 
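
The guard is needed because ``multiprocessing.get_all_start_methods()`` reports only
``['spawn']`` on Windows, so an unconditional ``methods.remove('fork')`` raises
``ValueError`` there. A minimal sketch of the behaviour this patch accounts for:

    import multiprocessing as mp

    methods = mp.get_all_start_methods()
    # ['spawn'] on Windows; includes 'fork' and 'forkserver' on most *nix systems
    if 'fork' in methods:
        # fork is incompatible with trio's global scheduler state
        methods.remove('fork')
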
--- tractor/_spawn.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 88cc450..25d6368 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -61,7 +61,8 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: global _spawn_method methods = mp.get_all_start_methods() - methods.remove('fork') + if 'fork' in methods: + methods.remove('fork') # no Windows support for trip yet if platform.system() != 'Windows': From b4cb7439a157039c77ff2de357dac3dee5790481 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 22:46:48 -0500 Subject: [PATCH 23/33] Drop useless fork error branch --- tests/conftest.py | 2 +- tractor/_spawn.py | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9675299..2ca97af 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -52,7 +52,7 @@ def arb_addr(): def pytest_generate_tests(metafunc): - spawn_backend = metafunc.config.getoption("spawn_backend") + spawn_backend = metafunc.config.option.spawn_backend assert spawn_backend in ('mp', 'trio_run_in_process') if 'start_method' in metafunc.fixturenames: diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 25d6368..f94fc62 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -72,11 +72,6 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: raise ValueError( f"Spawn method `{name}` is invalid please choose one of {methods}" ) - - elif name == 'fork': - raise ValueError( - "`fork` is unsupported due to incompatibility with `trio`" - ) elif name == 'forkserver': _forkserver_override.override_stdlib() _ctx = mp.get_context(name) From 5fd38d4618a04c8ba21de2e30f9ff616ef0e3541 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 23:16:43 -0500 Subject: [PATCH 24/33] Force `mp` backend if option is blank? --- tests/conftest.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/conftest.py b/tests/conftest.py index 2ca97af..128ff3e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -53,6 +53,9 @@ def arb_addr(): def pytest_generate_tests(metafunc): spawn_backend = metafunc.config.option.spawn_backend + if not spawn_backend: + # XXX some weird windows bug with `pytest`? + spawn_backend = 'mp' assert spawn_backend in ('mp', 'trio_run_in_process') if 'start_method' in metafunc.fixturenames: From a6b249cd527134b882e91d5d5278ed5dfc1720cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 23:17:06 -0500 Subject: [PATCH 25/33] Forkserver just can't seem to cut it... --- tests/test_cancellation.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_cancellation.py b/tests/test_cancellation.py index bbf8598..ce74fdc 100644 --- a/tests/test_cancellation.py +++ b/tests/test_cancellation.py @@ -297,6 +297,8 @@ async def test_nested_multierrors(loglevel, start_method): # process trees for whatever reason. # Any more process levels then this and we see bugs that cause # hangs and broken pipes all over the place... 
+ if start_method == 'forkserver': + pytest.skip("Forksever sux hard at nested spawning...") depth = 2 subactor_breadth = 2 From 43cca122f5eab44bb551849860f2b9072ad51955 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 26 Jan 2020 23:44:47 -0500 Subject: [PATCH 26/33] Handle windows in `@tractor_test` as well --- tractor/testing/_tractor_test.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tractor/testing/_tractor_test.py b/tractor/testing/_tractor_test.py index 82f38f2..5ca82a6 100644 --- a/tractor/testing/_tractor_test.py +++ b/tractor/testing/_tractor_test.py @@ -1,4 +1,5 @@ import inspect +import platform from functools import partial, wraps from .. import run @@ -29,7 +30,7 @@ def tractor_test(fn): *args, loglevel=None, arb_addr=None, - start_method='trio_run_in_process', + start_method=None, **kwargs ): # __tracebackhide__ = True @@ -41,6 +42,13 @@ def tractor_test(fn): # allows test suites to define a 'loglevel' fixture # that activates the internal logging kwargs['loglevel'] = loglevel + + if start_method is None: + if platform.system() == "Windows": + start_method = 'spawn' + else: + start_method = 'trio_run_in_process' + if 'start_method' in inspect.signature(fn).parameters: # set of subprocess spawning backends kwargs['start_method'] = start_method From 7feef44798814603f4594500054d07972167ab12 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 27 Jan 2020 15:42:40 -0500 Subject: [PATCH 27/33] Document available process spawning backends --- README.rst | 61 ++++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/README.rst b/README.rst index dc90e4a..f97b623 100644 --- a/README.rst +++ b/README.rst @@ -355,14 +355,15 @@ Depending on the function type ``Portal.run()`` tries to correctly interface exactly like a local version of the remote built-in Python *function type*. Currently async functions, generators, and regular functions are supported. Inspiration for this API comes -from the way execnet_ does `remote function execution`_ but without -the client code (necessarily) having to worry about the underlying -channels_ system or shipping code over the network. +`remote function execution`_ but without the client code being +concerned about the underlying channels_ system or shipping code +over the network. This *portal* approach turns out to be paricularly exciting with the introduction of `asynchronous generators`_ in Python 3.6! It means that actors can compose nicely in a data streaming pipeline. +.. _exactly like trio: https://trio.readthedocs.io/en/latest/reference-core.html#cancellation-semantics Streaming ********* @@ -701,20 +702,54 @@ need to hop into a debugger. You just need to pass the existing tractor.run(main, arbiter_addr=('192.168.0.10', 1616)) -Choosing a ``multiprocessing`` *start method* -********************************************* -``tractor`` supports selection of the `multiprocessing start method`_ via -a ``start_method`` kwarg to ``tractor.run()``. Note that on Windows -*spawn* it the only supported method and on nix systems *forkserver* is -selected by default for speed. +Choosing a process spawning backend +*********************************** +``tractor`` is architected to support multiple actor (sub-process) +spawning backends. Specific defaults are chosen based on your system +but you can also explicitly select a backend of choice at startup +via a ``start_method`` kwarg to ``tractor.run()``. -.. 
_multiprocessing start method: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods +Currently the options available are: +- ``trio_run_in_process``: a ``trio``-native spawner from the `Ethereum community`_ +- ``spawn``: one of the stdlib's ``multiprocessing`` `start methods`_ +- ``forkserver``: a faster ``multiprocessing`` variant that is Unix only + +.. _start methods: https://docs.python.org/3.8/library/multiprocessing.html#contexts-and-start-methods +.. _Ethereum community : https://github.com/ethereum/trio-run-in-process + + +``trio-run-in-process`` ++++++++++++++++++++++++ +`trio-run-in-process`_ is a young "pure ``trio``" process spawner +which utilizes the native `trio subprocess APIs`_. It has shown great +reliability under testing for predictable teardown when launching +recursive pools of actors (multiple nurseries deep) and as such has been +chosen as the default backend on \*nix systems. + +.. _trio-run-in-process: https://github.com/ethereum/trio-run-in-process +.. _trio subprocess APIs : https://trio.readthedocs.io/en/stable/reference-io.html#spawning-subprocesses + + +``multiprocessing`` ++++++++++++++++++++ +There is support for the stdlib's ``multiprocessing`` `start methods`_. +Note that on Windows *spawn* it the only supported method and on \*nix +systems *forkserver* is the best method for speed but has the caveat +that it will break easily (hangs due to broken pipes) if spawning actors +using nested nurseries. + +In general, the ``multiprocessing`` backend **has not proven reliable** +for handling errors from actors more then 2 nurseries *deep* (see `#89`_). +If you for some reason need this consider sticking with alternative +backends. + +.. _#89: https://github.com/goodboy/tractor/issues/89 Windows "gotchas" -***************** -`tractor` internally uses the stdlib's `multiprocessing` package which -*can* have some gotchas on Windows. Namely, the need for calling +^^^^^^^^^^^^^^^^^ +On Windows (which requires the use of the stdlib's `multiprocessing` +package) there are some gotchas. Namely, the need for calling `freeze_support()`_ inside the ``__main__`` context. Additionally you may need place you `tractor` program entry point in a seperate `__main__.py` module in your package in order to avoid an error like the From 2a4307975d7c577bcaff4d3c628197406f0bb163 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jan 2020 00:51:25 -0500 Subject: [PATCH 28/33] Fix that thing where the first example in your docs is supposed to work Thanks to @salotz for pointing out that the first example in the docs was broken. Though it's somewhat embarrassing this might also explain the problem in #79 and certain issues in #59... The solution here is to import the target RPC module using the its unique basename and absolute filepath in the sub-actor that requires it. Special handling for `__main__` and `__mp_main__` is needed since the spawned subprocess will have no knowledge about these parent- -state-specific module variables. Solution: map the modules name to the respective module file basename in the child process since the module variables will of course have different values in children. 
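
A minimal sketch of the load-by-path mechanism this relies on (the helper name is
illustrative; the actual logic lives in the ``_actor.py`` hunk below):

    import importlib.util

    def load_module_from_path(modname: str, absfilepath: str):
        # exec the file at ``absfilepath`` into a fresh module object
        # named ``modname``, independent of its original package location
        spec = importlib.util.spec_from_file_location(modname, absfilepath)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        return mod
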
--- tractor/_actor.py | 45 +++++++++++++++++++++++++------------------- tractor/_spawn.py | 1 - tractor/_trionics.py | 1 - 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index eb0570d..42656d2 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -151,6 +151,10 @@ async def _invoke( actor._ongoing_rpc_tasks.set() +def _get_mod_abspath(module): + return os.path.abspath(module.__file__) + + class Actor: """The fundamental concurrency primitive. @@ -178,9 +182,13 @@ class Actor: self.uid = (name, uid or str(uuid.uuid4())) mods = {} - for path in rpc_module_paths or (): - mod = importlib.import_module(path) - mods[path] = mod.__file__ + for name in rpc_module_paths or (): + mod = importlib.import_module(name) + suffix_index = mod.__file__.find('.py') + unique_modname = os.path.basename(mod.__file__[:suffix_index]) + mods[unique_modname] = _get_mod_abspath(mod) + if mod.__name__ == '__main__' or mod.__name__ == '__mp_main__': + self._main_mod = unique_modname self.rpc_module_paths = mods self._mods: dict = {} @@ -235,21 +243,20 @@ class Actor: code (if it exists). """ try: - for path, absfilepath in self.rpc_module_paths.items(): - log.debug(f"Attempting to import {path}") - # spec = importlib.util.spec_from_file_location( - # path, absfilepath) - # mod = importlib.util.module_from_spec(spec) + for modname, absfilepath in self.rpc_module_paths.items(): + sys.path.append(os.path.dirname(absfilepath)) + log.debug(f"Attempting to import {modname}@{absfilepath}") + spec = importlib.util.spec_from_file_location( + modname, absfilepath) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) # type: ignore + self._mods[modname] = mod # XXX append the allowed module to the python path # which should allow for relative (at least downward) # imports. Seems to be the only that will work currently # to get `trio-run-in-process` to import modules we "send # it". - sys.path.append(os.path.dirname(absfilepath)) - # spec.loader.exec_module(mod) - mod = importlib.import_module(path) - self._mods[path] = mod # if self.name != 'arbiter': # importlib.import_module('doggy') @@ -257,10 +264,14 @@ class Actor: except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later - log.error(f"Failed to import {path} in {self.name}") + log.error(f"Failed to import {modname} in {self.name}") raise def _get_rpc_func(self, ns, funcname): + if ns == '__main__' or ns == '__mp_main__': + # lookup the specific module in the child denoted + # as `__main__`/`__mp_main__` in the parent + ns = self._main_mod try: return getattr(self._mods[ns], funcname) except KeyError as err: @@ -640,8 +651,6 @@ class Actor: # blocks here as expected until the channel server is # killed (i.e. 
this actor is cancelled or signalled by the parent) except Exception as err: - # if self.name == 'arbiter': - # import pdb; pdb.set_trace() if not registered_with_arbiter: log.exception( f"Actor errored and failed to register with arbiter " @@ -666,8 +675,6 @@ class Actor: raise finally: - # if self.name == 'arbiter': - # import pdb; pdb.set_trace() if registered_with_arbiter: await self._do_unreg(arbiter_addr) # terminate actor once all it's peers (actors that connected @@ -713,8 +720,8 @@ class Actor: port=accept_port, host=accept_host, ) ) - log.debug( - f"Started tcp server(s) on {[l.socket for l in listeners]}") # type: ignore + log.debug(f"Started tcp server(s) on" + " {[l.socket for l in listeners]}") # type: ignore self._listeners.extend(listeners) task_status.started() diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f94fc62..3582f8b 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -158,7 +158,6 @@ async def new_proc( # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], - begin_wait_phase: trio.Event, use_trio_run_in_process: bool = False, task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: diff --git a/tractor/_trionics.py b/tractor/_trionics.py index a20f8bb..4bb7467 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -78,7 +78,6 @@ class ActorNursery: self.errors, bind_addr, parent_addr, - nursery, ) async def run_in_actor( From 6348121d2328d176cea8f616534352a5af878c71 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 29 Jan 2020 21:06:40 -0500 Subject: [PATCH 29/33] Do __main__ fixups like ``mulitprocessing does`` Instead of hackery trying to map modules manually from the filesystem let Python do all the work by simply copying what ``multiprocessing`` does to "fixup the __main__ module" in spawned subprocesses. The new private module ``_mp_fixup_main.py`` is simply cherry picked code from ``multiprocessing.spawn`` which does just that. We only need these "fixups" when using a backend other then ``multiprocessing``; for now just when using ``trio_run_in_process``. --- tractor/_actor.py | 71 +++++++++++++++++---------- tractor/_mp_fixup_main.py | 100 ++++++++++++++++++++++++++++++++++++++ tractor/_spawn.py | 3 ++ 3 files changed, 147 insertions(+), 27 deletions(-) create mode 100644 tractor/_mp_fixup_main.py diff --git a/tractor/_actor.py b/tractor/_actor.py index 42656d2..0531a05 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -28,6 +28,7 @@ from ._exceptions import ( from ._discovery import get_arbiter from ._portal import Portal from . import _state +from . import _mp_fixup_main log = get_logger('tractor') @@ -169,6 +170,14 @@ class Actor: _root_nursery: trio.Nursery _server_nursery: trio.Nursery + # marked by the process spawning backend at startup + # will be None for the parent most process started manually + # by the user (currently called the "arbiter") + _spawn_method: Optional[str] = None + + # Information about `__main__` from parent + _parent_main_data: Dict[str, str] + def __init__( self, name: str, @@ -178,17 +187,20 @@ class Actor: loglevel: str = None, arbiter_addr: Optional[Tuple[str, int]] = None, ) -> None: + """This constructor is called in the parent actor **before** the spawning + phase (aka before a new process is executed). 
+ """ self.name = name self.uid = (name, uid or str(uuid.uuid4())) + # retreive and store parent `__main__` data which + # will be passed to children + self._parent_main_data = _mp_fixup_main._mp_figure_out_main() + mods = {} for name in rpc_module_paths or (): mod = importlib.import_module(name) - suffix_index = mod.__file__.find('.py') - unique_modname = os.path.basename(mod.__file__[:suffix_index]) - mods[unique_modname] = _get_mod_abspath(mod) - if mod.__name__ == '__main__' or mod.__name__ == '__mp_main__': - self._main_mod = unique_modname + mods[name] = _get_mod_abspath(mod) self.rpc_module_paths = mods self._mods: dict = {} @@ -243,35 +255,40 @@ class Actor: code (if it exists). """ try: - for modname, absfilepath in self.rpc_module_paths.items(): - sys.path.append(os.path.dirname(absfilepath)) - log.debug(f"Attempting to import {modname}@{absfilepath}") - spec = importlib.util.spec_from_file_location( - modname, absfilepath) - mod = importlib.util.module_from_spec(spec) - spec.loader.exec_module(mod) # type: ignore - self._mods[modname] = mod + if self._spawn_method == 'trio_run_in_process': + parent_data = self._parent_main_data + if 'init_main_from_name' in parent_data: + _mp_fixup_main._fixup_main_from_name( + parent_data['init_main_from_name']) + elif 'init_main_from_path' in parent_data: + _mp_fixup_main._fixup_main_from_path( + parent_data['init_main_from_path']) - # XXX append the allowed module to the python path - # which should allow for relative (at least downward) - # imports. Seems to be the only that will work currently - # to get `trio-run-in-process` to import modules we "send - # it". - - # if self.name != 'arbiter': - # importlib.import_module('doggy') - # from celery.contrib import rdb; rdb.set_trace() + for modpath, filepath in self.rpc_module_paths.items(): + # XXX append the allowed module to the python path which + # should allow for relative (at least downward) imports. + sys.path.append(os.path.dirname(filepath)) + # XXX leaving this in for now incase we decide to swap + # it with the above path mutating solution: + # spec = importlib.util.spec_from_file_location( + # modname, absfilepath) + # mod = importlib.util.module_from_spec(spec) + # spec.loader.exec_module(mod) # type: ignore + log.debug(f"Attempting to import {modpath}@{filepath}") + mod = importlib.import_module(modpath) + self._mods[modpath] = mod except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later - log.error(f"Failed to import {modname} in {self.name}") + log.error(f"Failed to import {modpath} in {self.name}") raise def _get_rpc_func(self, ns, funcname): - if ns == '__main__' or ns == '__mp_main__': - # lookup the specific module in the child denoted - # as `__main__`/`__mp_main__` in the parent - ns = self._main_mod + if ns == "__mp_main__": + # In subprocesses, `__main__` will actually map to + # `__mp_main__` which should be the same entry-point-module + # as the parent. + ns = "__main__" try: return getattr(self._mods[ns], funcname) except KeyError as err: diff --git a/tractor/_mp_fixup_main.py b/tractor/_mp_fixup_main.py new file mode 100644 index 0000000..7869561 --- /dev/null +++ b/tractor/_mp_fixup_main.py @@ -0,0 +1,100 @@ +""" +Helpers pulled mostly verbatim from ``multiprocessing.spawn`` +to aid with "fixing up" the ``__main__`` module in subprocesses. + +These helpers are needed for any spawing backend that doesn't already handle this. 
+For example when using ``trio_run_in_process`` it is needed but obviously not when +we're already using ``multiprocessing``. +""" +import os +import sys +import platform +import types +import runpy +from typing import Dict + + +ORIGINAL_DIR = os.path.abspath(os.getcwd()) + + +def _mp_figure_out_main() -> Dict[str, str]: + """Taken from ``multiprocessing.spawn.get_preparation_data()``. + + Retrieve parent actor `__main__` module data. + """ + d = {} + # Figure out whether to initialise main in the subprocess as a module + # or through direct execution (or to leave it alone entirely) + main_module = sys.modules['__main__'] + main_mod_name = getattr(main_module.__spec__, "name", None) + if main_mod_name is not None: + d['init_main_from_name'] = main_mod_name + # elif sys.platform != 'win32' or (not WINEXE and not WINSERVICE): + elif platform.system() != 'Windows': + main_path = getattr(main_module, '__file__', None) + if main_path is not None: + if ( + not os.path.isabs(main_path) and ( + ORIGINAL_DIR is not None) + ): + # process.ORIGINAL_DIR is not None): + # main_path = os.path.join(process.ORIGINAL_DIR, main_path) + main_path = os.path.join(ORIGINAL_DIR, main_path) + d['init_main_from_path'] = os.path.normpath(main_path) + + return d + + +# Multiprocessing module helpers to fix up the main module in +# spawned subprocesses +def _fixup_main_from_name(mod_name: str) -> None: + # __main__.py files for packages, directories, zip archives, etc, run + # their "main only" code unconditionally, so we don't even try to + # populate anything in __main__, nor do we make any changes to + # __main__ attributes + current_main = sys.modules['__main__'] + if mod_name == "__main__" or mod_name.endswith(".__main__"): + return + + # If this process was forked, __main__ may already be populated + if getattr(current_main.__spec__, "name", None) == mod_name: + return + + # Otherwise, __main__ may contain some non-main code where we need to + # support unpickling it properly. 
We rerun it as __mp_main__ and make + # the normal __main__ an alias to that + # old_main_modules.append(current_main) + main_module = types.ModuleType("__mp_main__") + main_content = runpy.run_module(mod_name, + run_name="__mp_main__", + alter_sys=True) + main_module.__dict__.update(main_content) + sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module + + +def _fixup_main_from_path(main_path: str) -> None: + # If this process was forked, __main__ may already be populated + current_main = sys.modules['__main__'] + + # Unfortunately, the main ipython launch script historically had no + # "if __name__ == '__main__'" guard, so we work around that + # by treating it like a __main__.py file + # See https://github.com/ipython/ipython/issues/4698 + main_name = os.path.splitext(os.path.basename(main_path))[0] + if main_name == 'ipython': + return + + # Otherwise, if __file__ already has the setting we expect, + # there's nothing more to do + if getattr(current_main, '__file__', None) == main_path: + return + + # If the parent process has sent a path through rather than a module + # name we assume it is an executable script that may contain + # non-main code that needs to be executed + # old_main_modules.append(current_main) + main_module = types.ModuleType("__mp_main__") + main_content = runpy.run_path(main_path, + run_name="__mp_main__") + main_module.__dict__.update(main_content) + sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3582f8b..311ad7c 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -166,6 +166,9 @@ async def new_proc( """ cancel_scope = None + # mark the new actor with the global spawn method + subactor._spawn_method = _spawn_method + async with trio.open_nursery() as nursery: if use_trio_run_in_process or _spawn_method == 'trio_run_in_process': # trio_run_in_process From d64508e1a678b5d2a2e3964eba5fe1e93647df96 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jan 2020 09:50:25 -0500 Subject: [PATCH 30/33] Add more detailed docs around nursery logic The logic in the `ActorNursery` block is critical to cancellation semantics and in particular, understanding how supervisor strategies are invoked. Stick in a bunch of explanatory comments to clear up these details and also prepare to introduce more supervisor strats besides the current one-cancels-all approach. --- tractor/_trionics.py | 68 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 4bb7467..17ae548 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -177,33 +177,58 @@ class ActorNursery: @asynccontextmanager async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: - """Create and yield a new ``ActorNursery``. - """ - # TODO: figure out supervisors from erlang + """Create and yield a new ``ActorNursery`` to be used for spawning + structured concurrent subactors. + When an actor is spawned a new trio task is started which + invokes one of the process spawning backends to create and start + a new subprocess. These tasks are started by one of two nurseries + detailed below. The reason for spawning processes from within + a new task is because ``trio_run_in_process`` itself creates a new + internal nursery and the same task that opens a nursery **must** + close it. 
It turns out this approach is probably more correct + anyway since it is more clear from the following nested nurseries + which cancellation scopes correspond to each spawned subactor set. + """ actor = current_actor() if not actor: raise RuntimeError("No actor instance has been defined yet?") - # XXX we use these nurseries because TRIP is doing all its stuff with - # an `@asynccontextmanager` which has an internal nursery *and* the - # task that opens a nursery **must also close it**. + # the collection of errors retreived from spawned sub-actors errors: Dict[Tuple[str, str], Exception] = {} + + # This is the outermost level "deamon actor" nursery. It is awaited + # **after** the below inner "run in actor nursery". This allows for + # handling errors that are generated by the inner nursery in + # a supervisor strategy **before** blocking indefinitely to wait for + # actors spawned in "daemon mode" (aka started using + # ``ActorNursery.start_actor()``). async with trio.open_nursery() as da_nursery: try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # ``ActorNusery.run_in_actor()``) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. async with trio.open_nursery() as ria_nursery: anursery = ActorNursery( actor, ria_nursery, da_nursery, errors ) try: - # spawning of actors happens in this scope after - # we yield to the caller. + # spawning of actors happens in the caller's scope + # after we yield upwards yield anursery log.debug( f"Waiting on subactors {anursery._children}" "to complete" ) except (BaseException, Exception) as err: + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). anursery._join_procs.set() try: # XXX: hypothetically an error could be raised and then @@ -219,24 +244,44 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: log.exception( f"Nursery for {current_actor().uid} " f"errored with {err}, ") + + # cancel all subactors await anursery.cancel() + except trio.MultiError as merr: + # If we receive additional errors while waiting on + # remaining subactors that were cancelled, + # aggregate those errors with the original error + # that triggered this teardown. if err not in merr.exceptions: raise trio.MultiError(merr.exceptions + [err]) else: raise - # last bit before first nursery block ends + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope log.debug(f"Waiting on all subactors to complete") anursery._join_procs.set() - # ria_nursery scope + + # ria_nursery scope end + except (Exception, trio.MultiError) as err: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). log.warning(f"Nursery cancelling due to {err}") if anursery._children: with trio.CancelScope(shield=True): await anursery.cancel() raise finally: + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). 
The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. if errors: if anursery._children: with trio.CancelScope(shield=True): @@ -245,4 +290,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: raise trio.MultiError(tuple(errors.values())) else: raise list(errors.values())[0] + + # ria_nursery scope end + log.debug(f"Nursery teardown complete") From ee4b014092aa0a1b6c8fdbb525210f2b6dd60844 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jan 2020 12:04:13 -0500 Subject: [PATCH 31/33] Fix typo --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index e71a16d..6e57aed 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,7 +29,7 @@ matrix: python: 3.7 # this works for Linux but is ignored on macOS or Windows env: SPAWN_BACKEND="trio_run_in_process" - - name: "Pytron 3.8: multiprocessing" + - name: "Python 3.8: multiprocessing" python: 3.8 # this works for Linux but is ignored on macOS or Windows env: SPAWN_BACKEND="mp" - name: "Python 3.8: trio-run-in-process" From 8264b7d1361359e888f981b628543fb2b8f2d70c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jan 2020 12:04:46 -0500 Subject: [PATCH 32/33] Drop old module loading from abspath cruft --- tractor/_actor.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/tractor/_actor.py b/tractor/_actor.py index 0531a05..7118a3e 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -10,6 +10,7 @@ import inspect import uuid import typing from typing import Dict, List, Tuple, Any, Optional +from types import ModuleType import sys import os @@ -203,7 +204,7 @@ class Actor: mods[name] = _get_mod_abspath(mod) self.rpc_module_paths = mods - self._mods: dict = {} + self._mods: Dict[str, ModuleType] = {} # TODO: consider making this a dynamically defined # @dataclass once we get py3.7 @@ -268,15 +269,8 @@ class Actor: # XXX append the allowed module to the python path which # should allow for relative (at least downward) imports. sys.path.append(os.path.dirname(filepath)) - # XXX leaving this in for now incase we decide to swap - # it with the above path mutating solution: - # spec = importlib.util.spec_from_file_location( - # modname, absfilepath) - # mod = importlib.util.module_from_spec(spec) - # spec.loader.exec_module(mod) # type: ignore log.debug(f"Attempting to import {modpath}@{filepath}") - mod = importlib.import_module(modpath) - self._mods[modpath] = mod + self._mods[modpath] = importlib.import_module(modpath) except ModuleNotFoundError: # it is expected the corresponding `ModuleNotExposed` error # will be raised later From e671cb4f3bf9eac09d9f1dd183b8e3f9cf25941c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jan 2020 12:05:15 -0500 Subject: [PATCH 33/33] Fixup _spawn.py comments to incorporate trip --- tractor/_spawn.py | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 311ad7c..9421d21 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -1,7 +1,5 @@ """ -Process spawning. - -Mostly just wrapping around ``multiprocessing``. +Machinery for actor process spawning using multiple backends. 
""" import inspect import multiprocessing as mp @@ -32,7 +30,7 @@ from ._actor import Actor, ActorFailure log = get_logger('tractor') -# use trip as our default for now +# use trip as our default on *nix systems for now if platform.system() != 'Windows': _spawn_method: str = "trio_run_in_process" else: @@ -52,16 +50,19 @@ else: def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]: - """Attempt to set the start method for ``multiprocess.Process`` spawning. + """Attempt to set the start method for process starting, aka the "actor + spawning backend". - If the desired method is not supported the sub-interpreter (aka "spawn" - method) is used. + If the desired method is not supported this function will error. On Windows + the only supported option is the ``multiprocessing`` "spawn" method. The default + on *nix systems is ``trio_run_in_process``. """ global _ctx global _spawn_method methods = mp.get_all_start_methods() if 'fork' in methods: + # forking is incompatible with ``trio``s global task tree methods.remove('fork') # no Windows support for trip yet @@ -266,10 +267,15 @@ async def new_proc( # unblock parent task task_status.started(portal) - # wait for ActorNursery.wait() to be called - # this is required to ensure synchronization - # with startup and registration of this actor in - # ActorNursery.run_in_actor() + # wait for ``ActorNursery`` block to signal that + # subprocesses can be waited upon. + # This is required to ensure synchronization + # with user code that may want to manually await results + # from nursery spawned sub-actors. We don't want the + # containing nurseries here to collect results or error + # while user code is still doing it's thing. Only after the + # nursery block closes do we allow subactor results to be + # awaited and reported upwards to the supervisor. await actor_nursery._join_procs.wait() if portal in actor_nursery._cancel_after_result_on_exit: