From 607c48f1ace802b3f793d2f726e5574ee04212c1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 22:11:59 -0400 Subject: [PATCH 1/4] Distinctly separate and harden mp spawning It's clear now that special attention is needed to handle the case where a spawned `multiprocessing` proc is started but then the parent is cancelled before the child can connect back; in this case we need to be sure to kill the near-zombie child asap. This may end up being the solution to other resiliency issues seen around mp with nested process trees too. More testing is needed to be sure. Relates to #84 #89 #134 #146 --- tractor/_spawn.py | 226 +++++++++++++++++++++++++++++++--------------- 1 file changed, 152 insertions(+), 74 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index a07f60c..3f07ba0 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -2,14 +2,13 @@ Machinery for actor process spawning using multiple backends. """ import sys -import inspect import multiprocessing as mp import platform from typing import Any, Dict, Optional import trio from trio_typing import TaskStatus -from async_generator import aclosing, asynccontextmanager +from async_generator import asynccontextmanager try: from multiprocessing import semaphore_tracker # type: ignore @@ -128,7 +127,9 @@ async def cancel_on_completion( Should only be called for actors spawned with `run_in_actor()`. """ with trio.CancelScope() as cs: + task_status.started(cs) + # if this call errors we store the exception for later # in ``errors`` which will be reraised inside # a MultiError and we still send out a cancel request @@ -138,6 +139,7 @@ async def cancel_on_completion( log.warning( f"Cancelling {portal.channel.uid} after error {result}" ) + else: log.info( f"Cancelling {portal.channel.uid} gracefully " @@ -202,7 +204,7 @@ async def spawn_subactor( async def new_proc( name: str, - actor_nursery: 'ActorNursery', # type: ignore + actor_nursery: 'ActorNursery', # type: ignore # noqa subactor: Actor, errors: Dict[Tuple[str, str], Exception], # passed through to actor main @@ -221,8 +223,8 @@ async def new_proc( # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method - async with trio.open_nursery() as nursery: - if use_trio_run_in_process or _spawn_method == 'trio': + if use_trio_run_in_process or _spawn_method == 'trio': + async with trio.open_nursery() as nursery: async with spawn_subactor( subactor, parent_addr, @@ -261,7 +263,11 @@ async def new_proc( if portal in actor_nursery._cancel_after_result_on_exit: cancel_scope = await nursery.start( - cancel_on_completion, portal, subactor, errors) + cancel_on_completion, + portal, + subactor, + errors + ) # Wait for proc termination but **dont' yet** call # ``trio.Process.__aexit__()`` (it tears down stdio @@ -275,65 +281,107 @@ async def new_proc( # no actor zombies allowed # with trio.CancelScope(shield=True): await proc.wait() - else: - # `multiprocessing` - assert _ctx - start_method = _ctx.get_start_method() - if start_method == 'forkserver': - # XXX do our hackery on the stdlib to avoid multiple - # forkservers (one at each subproc layer). - fs = forkserver._forkserver - curr_actor = current_actor() - if is_main_process() and not curr_actor._forkserver_info: - # if we're the "main" process start the forkserver - # only once and pass its ipc info to downstream - # children - # forkserver.set_forkserver_preload(enable_modules) - forkserver.ensure_running() - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - getattr(fs, '_forkserver_pid', None), - getattr( - resource_tracker._resource_tracker, '_pid', None), - resource_tracker._resource_tracker._fd, - ) - else: - assert curr_actor._forkserver_info - fs_info = ( - fs._forkserver_address, - fs._forkserver_alive_fd, - fs._forkserver_pid, - resource_tracker._resource_tracker._pid, - resource_tracker._resource_tracker._fd, - ) = curr_actor._forkserver_info + + log.debug(f"Joined {proc}") + # pop child entry to indicate we are no longer managing this subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) + + # cancel result waiter that may have been spawned in + # tandem if not done already + if cancel_scope: + log.warning( + f"Cancelling existing result waiter task for {subactor.uid}") + cancel_scope.cancel() + else: + # `multiprocessing` + # async with trio.open_nursery() as nursery: + await mp_new_proc( + name=name, + actor_nursery=actor_nursery, + subactor=subactor, + errors=errors, + # passed through to actor main + bind_addr=bind_addr, + parent_addr=parent_addr, + _runtime_vars=_runtime_vars, + task_status=task_status, + ) + + +async def mp_new_proc( + + name: str, + actor_nursery: 'ActorNursery', # type: ignore # noqa + subactor: Actor, + errors: Dict[Tuple[str, str], Exception], + # passed through to actor main + bind_addr: Tuple[str, int], + parent_addr: Tuple[str, int], + _runtime_vars: Dict[str, Any], # serialized and sent to _child + *, + use_trio_run_in_process: bool = False, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED + +) -> None: + async with trio.open_nursery() as nursery: + assert _ctx + start_method = _ctx.get_start_method() + if start_method == 'forkserver': + # XXX do our hackery on the stdlib to avoid multiple + # forkservers (one at each subproc layer). + fs = forkserver._forkserver + curr_actor = current_actor() + if is_main_process() and not curr_actor._forkserver_info: + # if we're the "main" process start the forkserver + # only once and pass its ipc info to downstream + # children + # forkserver.set_forkserver_preload(enable_modules) + forkserver.ensure_running() + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + getattr(fs, '_forkserver_pid', None), + getattr( + resource_tracker._resource_tracker, '_pid', None), + resource_tracker._resource_tracker._fd, + ) else: - fs_info = (None, None, None, None, None) + assert curr_actor._forkserver_info + fs_info = ( + fs._forkserver_address, + fs._forkserver_alive_fd, + fs._forkserver_pid, + resource_tracker._resource_tracker._pid, + resource_tracker._resource_tracker._fd, + ) = curr_actor._forkserver_info + else: + fs_info = (None, None, None, None, None) - proc = _ctx.Process( # type: ignore - target=_mp_main, - args=( - subactor, - bind_addr, - fs_info, - start_method, - parent_addr, - ), - # daemon=True, - name=name, - ) - # `multiprocessing` only (since no async interface): - # register the process before start in case we get a cancel - # request before the actor has fully spawned - then we can wait - # for it to fully come up before sending a cancel request - actor_nursery._children[subactor.uid] = (subactor, proc, None) + proc = _ctx.Process( # type: ignore + target=_mp_main, + args=( + subactor, + bind_addr, + fs_info, + start_method, + parent_addr, + ), + # daemon=True, + name=name, + ) + # `multiprocessing` only (since no async interface): + # register the process before start in case we get a cancel + # request before the actor has fully spawned - then we can wait + # for it to fully come up before sending a cancel request + actor_nursery._children[subactor.uid] = (subactor, proc, None) - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") + proc.start() + if not proc.is_alive(): + raise ActorFailure("Couldn't start sub-actor?") - log.info(f"Started {proc}") + log.info(f"Started {proc}") + try: # wait for actor to spawn and connect back to us # channel should have handshake completed by the # local actor by the time we get a ref to it @@ -356,23 +404,53 @@ async def new_proc( # awaited and reported upwards to the supervisor. await actor_nursery._join_procs.wait() + finally: + # XXX: in the case we were cancelled before the sub-proc + # registered itself back we must be sure to try and clean + # any process we may have started. + + reaping_cancelled = False + cancel_scope = None + if portal in actor_nursery._cancel_after_result_on_exit: - cancel_scope = await nursery.start( - cancel_on_completion, portal, subactor, errors) + try: + # async with trio.open_nursery() as n: + # n.cancel_scope.shield = True + cancel_scope = await nursery.start( + cancel_on_completion, + portal, + subactor, + errors + ) + except trio.Cancelled: + # if the reaping task was cancelled we may have hit + # a race where the subproc disconnected before we + # could send it a message to cancel (classic 2 generals) + # in that case, wait shortly then kill the process. + reaping_cancelled = True + + if proc.is_alive(): + with trio.move_on_after(0.1) as cs: + cs.shield = True + await proc_waiter(proc) + + if cs.cancelled_caught: + proc.terminate() + + if not reaping_cancelled: + if proc.is_alive(): + await proc_waiter(proc) # TODO: timeout block here? - if proc.is_alive(): - await proc_waiter(proc) proc.join() - # This is again common logic for all backends: + log.debug(f"Joined {proc}") + # pop child entry to indicate we are no longer managing this subactor + subactor, proc, portal = actor_nursery._children.pop(subactor.uid) - log.debug(f"Joined {proc}") - # pop child entry to indicate we are no longer managing this subactor - subactor, proc, portal = actor_nursery._children.pop(subactor.uid) - # cancel result waiter that may have been spawned in - # tandem if not done already - if cancel_scope: - log.warning( - f"Cancelling existing result waiter task for {subactor.uid}") - cancel_scope.cancel() + # cancel result waiter that may have been spawned in + # tandem if not done already + if cancel_scope: + log.warning( + f"Cancelling existing result waiter task for {subactor.uid}") + cancel_scope.cancel() From c4b42000ebd4288ff8f8b99792d76dddfa0e388f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 22:19:36 -0400 Subject: [PATCH 2/4] Shield around root actor cancel --- tractor/_root.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tractor/_root.py b/tractor/_root.py index 48ea462..8f4eb9a 100644 --- a/tractor/_root.py +++ b/tractor/_root.py @@ -169,10 +169,13 @@ async def open_root_actor( logger.exception("Actor crashed:") await _debug._maybe_enter_pm(err) + # always re-raise raise + finally: logger.info("Shutting down root actor") - await actor.cancel() + with trio.CancelScope(shield=True): + await actor.cancel() finally: _state._current_actor = None logger.info("Root actor terminated") From 9f38406e8571d62de690a95c064bf54b3db274cd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Apr 2021 22:26:06 -0400 Subject: [PATCH 3/4] Appease mypy --- tractor/_spawn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3f07ba0..cb1f3b2 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -357,7 +357,7 @@ async def mp_new_proc( else: fs_info = (None, None, None, None, None) - proc = _ctx.Process( # type: ignore + proc: mp.Process = _ctx.Process( # type: ignore target=_mp_main, args=( subactor, From 87971de1d91dc70ce0e85b77d01ed97b0c3854db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 May 2021 12:04:50 -0400 Subject: [PATCH 4/4] Re-raise any sidestepped `trio.Cancelled` --- tractor/_spawn.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index cb1f3b2..d893479 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -283,14 +283,15 @@ async def new_proc( await proc.wait() log.debug(f"Joined {proc}") - # pop child entry to indicate we are no longer managing this subactor + # pop child entry to indicate we no longer managing this subactor subactor, proc, portal = actor_nursery._children.pop(subactor.uid) # cancel result waiter that may have been spawned in # tandem if not done already if cancel_scope: log.warning( - f"Cancelling existing result waiter task for {subactor.uid}") + "Cancelling existing result waiter task for " + f"{subactor.uid}") cancel_scope.cancel() else: # `multiprocessing` @@ -409,8 +410,9 @@ async def mp_new_proc( # registered itself back we must be sure to try and clean # any process we may have started. - reaping_cancelled = False - cancel_scope = None + reaping_cancelled: bool = False + cancel_scope: Optional[trio.CancelScope] = None + cancel_exc: Optional[trio.Cancelled] = None if portal in actor_nursery._cancel_after_result_on_exit: try: @@ -422,7 +424,9 @@ async def mp_new_proc( subactor, errors ) - except trio.Cancelled: + except trio.Cancelled as err: + cancel_exc = err + # if the reaping task was cancelled we may have hit # a race where the subproc disconnected before we # could send it a message to cancel (classic 2 generals) @@ -437,20 +441,24 @@ async def mp_new_proc( if cs.cancelled_caught: proc.terminate() - if not reaping_cancelled: - if proc.is_alive(): - await proc_waiter(proc) + if not reaping_cancelled and proc.is_alive(): + await proc_waiter(proc) # TODO: timeout block here? proc.join() log.debug(f"Joined {proc}") - # pop child entry to indicate we are no longer managing this subactor + # pop child entry to indicate we are no longer managing subactor subactor, proc, portal = actor_nursery._children.pop(subactor.uid) # cancel result waiter that may have been spawned in # tandem if not done already if cancel_scope: log.warning( - f"Cancelling existing result waiter task for {subactor.uid}") + "Cancelling existing result waiter task for " + f"{subactor.uid}") cancel_scope.cancel() + + elif reaping_cancelled: # let the cancellation bubble up + assert cancel_exc + raise cancel_exc