diff --git a/tractor/_root.py b/tractor/_root.py
index 48ea462..8f4eb9a 100644
--- a/tractor/_root.py
+++ b/tractor/_root.py
@@ -169,10 +169,13 @@ async def open_root_actor(
             logger.exception("Actor crashed:")
             await _debug._maybe_enter_pm(err)
 
+            # always re-raise
             raise
+
         finally:
             logger.info("Shutting down root actor")
-            await actor.cancel()
+            with trio.CancelScope(shield=True):
+                await actor.cancel()
     finally:
         _state._current_actor = None
         logger.info("Root actor terminated")
diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index a07f60c..d893479 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -2,14 +2,13 @@
 Machinery for actor process spawning using multiple backends.
 """
 import sys
-import inspect
 import multiprocessing as mp
 import platform
 from typing import Any, Dict, Optional
 
 import trio
 from trio_typing import TaskStatus
-from async_generator import aclosing, asynccontextmanager
+from async_generator import asynccontextmanager
 
 try:
     from multiprocessing import semaphore_tracker  # type: ignore
@@ -128,7 +127,9 @@ async def cancel_on_completion(
     Should only be called for actors spawned with `run_in_actor()`.
     """
     with trio.CancelScope() as cs:
+
         task_status.started(cs)
+
         # if this call errors we store the exception for later
         # in ``errors`` which will be reraised inside
         # a MultiError and we still send out a cancel request
@@ -138,6 +139,7 @@
             log.warning(
                 f"Cancelling {portal.channel.uid} after error {result}"
             )
+
         else:
             log.info(
                 f"Cancelling {portal.channel.uid} gracefully "
@@ -202,7 +204,7 @@ async def spawn_subactor(
 
 async def new_proc(
     name: str,
-    actor_nursery: 'ActorNursery',  # type: ignore
+    actor_nursery: 'ActorNursery',  # type: ignore  # noqa
     subactor: Actor,
     errors: Dict[Tuple[str, str], Exception],
     # passed through to actor main
@@ -221,8 +223,8 @@ async def new_proc(
     # mark the new actor with the global spawn method
     subactor._spawn_method = _spawn_method
 
-    async with trio.open_nursery() as nursery:
-        if use_trio_run_in_process or _spawn_method == 'trio':
+    if use_trio_run_in_process or _spawn_method == 'trio':
+        async with trio.open_nursery() as nursery:
             async with spawn_subactor(
                 subactor,
                 parent_addr,
@@ -261,7 +263,11 @@ async def new_proc(
 
                 if portal in actor_nursery._cancel_after_result_on_exit:
                     cancel_scope = await nursery.start(
-                        cancel_on_completion, portal, subactor, errors)
+                        cancel_on_completion,
+                        portal,
+                        subactor,
+                        errors
+                    )
 
                 # Wait for proc termination but **don't yet** call
                 # ``trio.Process.__aexit__()`` (it tears down stdio
@@ -275,65 +281,108 @@ async def new_proc(
                 # no actor zombies allowed
                 # with trio.CancelScope(shield=True):
                 await proc.wait()
-        else:
-            # `multiprocessing`
-            assert _ctx
-            start_method = _ctx.get_start_method()
-            if start_method == 'forkserver':
-                # XXX do our hackery on the stdlib to avoid multiple
-                # forkservers (one at each subproc layer).
-                fs = forkserver._forkserver
-                curr_actor = current_actor()
-                if is_main_process() and not curr_actor._forkserver_info:
-                    # if we're the "main" process start the forkserver
-                    # only once and pass its ipc info to downstream
-                    # children
-                    # forkserver.set_forkserver_preload(enable_modules)
-                    forkserver.ensure_running()
-                    fs_info = (
-                        fs._forkserver_address,
-                        fs._forkserver_alive_fd,
-                        getattr(fs, '_forkserver_pid', None),
-                        getattr(
-                            resource_tracker._resource_tracker, '_pid', None),
-                        resource_tracker._resource_tracker._fd,
-                    )
-                else:
-                    assert curr_actor._forkserver_info
-                    fs_info = (
-                        fs._forkserver_address,
-                        fs._forkserver_alive_fd,
-                        fs._forkserver_pid,
-                        resource_tracker._resource_tracker._pid,
-                        resource_tracker._resource_tracker._fd,
-                    ) = curr_actor._forkserver_info
+
+            log.debug(f"Joined {proc}")
+            # pop child entry to indicate we are no longer managing this subactor
+            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+
+            # cancel result waiter that may have been spawned in
+            # tandem if not done already
+            if cancel_scope:
+                log.warning(
+                    "Cancelling existing result waiter task for "
+                    f"{subactor.uid}")
+                cancel_scope.cancel()
+    else:
+        # `multiprocessing`
+        # async with trio.open_nursery() as nursery:
+        await mp_new_proc(
+            name=name,
+            actor_nursery=actor_nursery,
+            subactor=subactor,
+            errors=errors,
+            # passed through to actor main
+            bind_addr=bind_addr,
+            parent_addr=parent_addr,
+            _runtime_vars=_runtime_vars,
+            task_status=task_status,
+        )
+
+
+async def mp_new_proc(
+
+    name: str,
+    actor_nursery: 'ActorNursery',  # type: ignore  # noqa
+    subactor: Actor,
+    errors: Dict[Tuple[str, str], Exception],
+    # passed through to actor main
+    bind_addr: Tuple[str, int],
+    parent_addr: Tuple[str, int],
+    _runtime_vars: Dict[str, Any],  # serialized and sent to _child
+    *,
+    use_trio_run_in_process: bool = False,
+    task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
+
+) -> None:
+    async with trio.open_nursery() as nursery:
+        assert _ctx
+        start_method = _ctx.get_start_method()
+        if start_method == 'forkserver':
+            # XXX do our hackery on the stdlib to avoid multiple
+            # forkservers (one at each subproc layer).
+            fs = forkserver._forkserver
+            curr_actor = current_actor()
+            if is_main_process() and not curr_actor._forkserver_info:
+                # if we're the "main" process start the forkserver
+                # only once and pass its ipc info to downstream
+                # children
+                # forkserver.set_forkserver_preload(enable_modules)
+                forkserver.ensure_running()
+                fs_info = (
+                    fs._forkserver_address,
+                    fs._forkserver_alive_fd,
+                    getattr(fs, '_forkserver_pid', None),
+                    getattr(
+                        resource_tracker._resource_tracker, '_pid', None),
+                    resource_tracker._resource_tracker._fd,
+                )
             else:
-                fs_info = (None, None, None, None, None)
+                assert curr_actor._forkserver_info
+                fs_info = (
+                    fs._forkserver_address,
+                    fs._forkserver_alive_fd,
+                    fs._forkserver_pid,
+                    resource_tracker._resource_tracker._pid,
+                    resource_tracker._resource_tracker._fd,
+                ) = curr_actor._forkserver_info
+        else:
+            fs_info = (None, None, None, None, None)
 
-            proc = _ctx.Process(  # type: ignore
-                target=_mp_main,
-                args=(
-                    subactor,
-                    bind_addr,
-                    fs_info,
-                    start_method,
-                    parent_addr,
-                ),
-                # daemon=True,
-                name=name,
-            )
-            # `multiprocessing` only (since no async interface):
-            # register the process before start in case we get a cancel
-            # request before the actor has fully spawned - then we can wait
-            # for it to fully come up before sending a cancel request
-            actor_nursery._children[subactor.uid] = (subactor, proc, None)
+        proc: mp.Process = _ctx.Process(  # type: ignore
+            target=_mp_main,
+            args=(
+                subactor,
+                bind_addr,
+                fs_info,
+                start_method,
+                parent_addr,
+            ),
+            # daemon=True,
+            name=name,
+        )
+        # `multiprocessing` only (since no async interface):
+        # register the process before start in case we get a cancel
+        # request before the actor has fully spawned - then we can wait
+        # for it to fully come up before sending a cancel request
+        actor_nursery._children[subactor.uid] = (subactor, proc, None)
 
-            proc.start()
-            if not proc.is_alive():
-                raise ActorFailure("Couldn't start sub-actor?")
+        proc.start()
+        if not proc.is_alive():
+            raise ActorFailure("Couldn't start sub-actor?")
 
-            log.info(f"Started {proc}")
+        log.info(f"Started {proc}")
 
+        try:
             # wait for actor to spawn and connect back to us
             # channel should have handshake completed by the
             # local actor by the time we get a ref to it
@@ -356,23 +405,60 @@ async def new_proc(
             # awaited and reported upwards to the supervisor.
             await actor_nursery._join_procs.wait()
 
+        finally:
+            # XXX: in the case we were cancelled before the sub-proc
+            # registered itself back we must be sure to try and clean
+            # any process we may have started.
+
+            reaping_cancelled: bool = False
+            cancel_scope: Optional[trio.CancelScope] = None
+            cancel_exc: Optional[trio.Cancelled] = None
+
             if portal in actor_nursery._cancel_after_result_on_exit:
-                cancel_scope = await nursery.start(
-                    cancel_on_completion, portal, subactor, errors)
+                try:
+                    # async with trio.open_nursery() as n:
+                    # n.cancel_scope.shield = True
+                    cancel_scope = await nursery.start(
+                        cancel_on_completion,
+                        portal,
+                        subactor,
+                        errors
+                    )
+                except trio.Cancelled as err:
+                    cancel_exc = err
+
+                    # if the reaping task was cancelled we may have hit
+                    # a race where the subproc disconnected before we
+                    # could send it a message to cancel (classic 2 generals)
+                    # in that case, wait shortly then kill the process.
+                    reaping_cancelled = True
+
+                    if proc.is_alive():
+                        with trio.move_on_after(0.1) as cs:
+                            cs.shield = True
+                            await proc_waiter(proc)
+
+                        if cs.cancelled_caught:
+                            proc.terminate()
+
+            if not reaping_cancelled and proc.is_alive():
+                await proc_waiter(proc)
 
             # TODO: timeout block here?
-            if proc.is_alive():
-                await proc_waiter(proc)
             proc.join()
 
-            # This is again common logic for all backends:
+            log.debug(f"Joined {proc}")
+            # pop child entry to indicate we are no longer managing subactor
+            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
 
-            log.debug(f"Joined {proc}")
-            # pop child entry to indicate we are no longer managing this subactor
-            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
-            # cancel result waiter that may have been spawned in
-            # tandem if not done already
-            if cancel_scope:
-                log.warning(
-                    f"Cancelling existing result waiter task for {subactor.uid}")
-                cancel_scope.cancel()
+            # cancel result waiter that may have been spawned in
+            # tandem if not done already
+            if cancel_scope:
+                log.warning(
+                    "Cancelling existing result waiter task for "
+                    f"{subactor.uid}")
+                cancel_scope.cancel()
+
+            elif reaping_cancelled:  # let the cancellation bubble up
+                assert cancel_exc
+                raise cancel_exc
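Below is a minimal, self-contained sketch (not part of the patch) of the shielded-teardown pattern the _root.py hunk applies: async cleanup inside a finally block is wrapped in trio.CancelScope(shield=True) so a pending cancellation cannot also cancel the cleanup itself. The worker/main names and the sleep durations are illustrative stand-ins, not tractor APIs.

import trio


async def worker() -> None:
    try:
        await trio.sleep_forever()  # stand-in for the actor's main work
    finally:
        # without the shield, the pending Cancelled would re-fire at this
        # await and the teardown (``await actor.cancel()`` in the patch)
        # would be skipped
        with trio.CancelScope(shield=True):
            await trio.sleep(0.05)  # stand-in for async teardown
            print("teardown ran to completion")


async def main() -> None:
    async with trio.open_nursery() as nursery:
        nursery.start_soon(worker)
        await trio.sleep(0.1)
        nursery.cancel_scope.cancel()  # cancel the worker mid-flight


trio.run(main)

The same idea motivates the shielded trio.move_on_after(0.1) block in mp_new_proc: give the child a brief, uncancellable grace period to exit before falling back to proc.terminate().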