diff --git a/tractor/_actor.py b/tractor/_actor.py index 22e2253..e962115 100644 --- a/tractor/_actor.py +++ b/tractor/_actor.py @@ -225,9 +225,19 @@ class Actor: the original nursery we need to try and load the local module code (if it exists). """ - for path in self.rpc_module_paths: - log.debug(f"Attempting to import {path}") - self._mods[path] = importlib.import_module(path) + try: + for path in self.rpc_module_paths: + log.debug(f"Attempting to import {path}") + self._mods[path] = importlib.import_module(path) + + # if self.name != 'arbiter': + # importlib.import_module('doggy') + # from celery.contrib import rdb; rdb.set_trace() + except ModuleNotFoundError: + # it is expected the corresponding `ModuleNotExposed` error + # will be raised later + log.error(f"Failed to import {path} in {self.name}") + raise def _get_rpc_func(self, ns, funcname): try: @@ -488,7 +498,7 @@ class Actor: f"Exiting msg loop for {chan} from {chan.uid} " f"with last msg:\n{msg}") - def _fork_main( + def _mp_main( self, accept_addr: Tuple[str, int], forkserver_info: Tuple[Any, Any, Any, Any, Any], @@ -500,13 +510,17 @@ class Actor: self._forkserver_info = forkserver_info from ._spawn import try_set_start_method spawn_ctx = try_set_start_method(start_method) + if self.loglevel is not None: log.info( f"Setting loglevel for {self.uid} to {self.loglevel}") get_console_log(self.loglevel) + log.info( f"Started new {spawn_ctx.current_process()} for {self.uid}") + _state._current_actor = self + log.debug(f"parent_addr is {parent_addr}") try: trio.run(partial( @@ -515,6 +529,21 @@ class Actor: pass # handle it the same way trio does? log.info(f"Actor {self.uid} terminated") + async def _trip_main( + self, + accept_addr: Tuple[str, int], + parent_addr: Tuple[str, int] = None + ) -> None: + if self.loglevel is not None: + log.info( + f"Setting loglevel for {self.uid} to {self.loglevel}") + get_console_log(self.loglevel) + + log.info(f"Started new TRIP process for {self.uid}") + _state._current_actor = self + await self._async_main(accept_addr, parent_addr=parent_addr) + log.info(f"Actor {self.uid} terminated") + async def _async_main( self, accept_addr: Tuple[str, int], @@ -584,6 +613,8 @@ class Actor: # blocks here as expected until the channel server is # killed (i.e. this actor is cancelled or signalled by the parent) except Exception as err: + # if self.name == 'arbiter': + # import pdb; pdb.set_trace() if not registered_with_arbiter: log.exception( f"Actor errored and failed to register with arbiter " @@ -598,12 +629,18 @@ class Actor: f"Failed to ship error to parent " f"{self._parent_chan.uid}, channel was closed") log.exception("Actor errored:") + + if isinstance(err, ModuleNotFoundError): + raise else: # XXX wait, why? # causes a hang if I always raise.. + # A parent process does something weird here? raise finally: + # if self.name == 'arbiter': + # import pdb; pdb.set_trace() if registered_with_arbiter: await self._do_unreg(arbiter_addr) # terminate actor once all it's peers (actors that connected diff --git a/tractor/_spawn.py b/tractor/_spawn.py index f299b9d..311ab0b 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,6 +5,8 @@ Mostly just wrapping around ``multiprocessing``. """ import multiprocessing as mp +import trio_run_in_process + try: from multiprocessing import semaphore_tracker # type: ignore resource_tracker = semaphore_tracker @@ -55,54 +57,66 @@ def is_main_process() -> bool: return mp.current_process().name == 'MainProcess' -def new_proc( +async def new_proc( name: str, actor: Actor, # passed through to actor main bind_addr: Tuple[str, int], parent_addr: Tuple[str, int], + use_trip: bool = True, ) -> mp.Process: """Create a new ``multiprocessing.Process`` using the spawn method as configured using ``try_set_start_method()``. """ - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver only once - # and pass its ipc info to downstream children - # forkserver.set_forkserver_preload(rpc_module_paths) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr(resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info - else: - fs_info = (None, None, None, None, None) - - return _ctx.Process( # type: ignore - target=actor._fork_main, - args=( + if use_trip: # trio_run_in_process + mng = trio_run_in_process.open_in_process( + actor._trip_main, bind_addr, - fs_info, - start_method, parent_addr - ), - # daemon=True, - name=name, - ) + ) + proc = await mng.__aenter__() + proc.mng = mng + return proc + else: + # use multiprocessing + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver only once + # and pass its ipc info to downstream children + # forkserver.set_forkserver_preload(rpc_module_paths) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr(resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) + else: + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) + + return _ctx.Process( + target=actor._mp_main, + args=( + bind_addr, + fs_info, + start_method, + parent_addr + ), + # daemon=True, + name=name, + ) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 5250f81..cfed406 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -9,6 +9,7 @@ import typing import trio from async_generator import asynccontextmanager, aclosing +import trio_run_in_process from ._state import current_actor from .log import get_logger, get_loglevel @@ -64,20 +65,23 @@ class ActorNursery: arbiter_addr=current_actor()._arb_addr, ) parent_addr = self._actor.accept_addr - proc = _spawn.new_proc( + assert parent_addr + proc = await _spawn.new_proc( name, actor, bind_addr, parent_addr, ) + # `multiprocessing` only (since no async interface): # register the process before start in case we get a cancel # request before the actor has fully spawned - then we can wait # for it to fully come up before sending a cancel request self._children[actor.uid] = (actor, proc, None) - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") + if not isinstance(proc, trio_run_in_process.process.Process): + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") log.info(f"Started {proc}") # wait for actor to spawn and connect back to us @@ -193,12 +197,17 @@ class ActorNursery: actor: Actor, cancel_scope: Optional[trio.CancelScope] = None, ) -> None: - # TODO: timeout block here? - if proc.is_alive(): - await proc_waiter(proc) - # please god don't hang - proc.join() + if not isinstance(proc, trio_run_in_process.process.Process): + # TODO: timeout block here? + if proc.is_alive(): + await proc_waiter(proc) + proc.join() + else: + # trio_run_in_process blocking wait + await proc.mng.__aexit__(None, None, None) + # proc.nursery.cancel_scope.cancel() + log.debug(f"Joined {proc}") # indicate we are no longer managing this subactor self._children.pop(actor.uid)