diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index 943cccb..55ac9a3 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -8,7 +8,6 @@ from typing import Any, Dict, Optional
 
 import trio
 from trio_typing import TaskStatus
-from async_generator import asynccontextmanager
 
 try:
     from multiprocessing import semaphore_tracker  # type: ignore
@@ -123,34 +122,32 @@ async def cancel_on_completion(
     portal: Portal,
     actor: Actor,
     errors: Dict[Tuple[str, str], Exception],
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
 ) -> None:
-    """Cancel actor gracefully once it's "main" portal's
+    """
+    Cancel actor gracefully once it's "main" portal's
     result arrives.
 
     Should only be called for actors spawned with `run_in_actor()`.
+
     """
-    with trio.CancelScope() as cs:
+    # if this call errors we store the exception for later
+    # in ``errors`` which will be reraised inside
+    # a MultiError and we still send out a cancel request
+    result = await exhaust_portal(portal, actor)
+    if isinstance(result, Exception):
+        errors[actor.uid] = result
+        log.warning(
+            f"Cancelling {portal.channel.uid} after error {result}"
+        )
 
-        task_status.started(cs)
+    else:
+        log.runtime(
+            f"Cancelling {portal.channel.uid} gracefully "
+            f"after result {result}")
 
-        # if this call errors we store the exception for later
-        # in ``errors`` which will be reraised inside
-        # a MultiError and we still send out a cancel request
-        result = await exhaust_portal(portal, actor)
-        if isinstance(result, Exception):
-            errors[actor.uid] = result
-            log.warning(
-                f"Cancelling {portal.channel.uid} after error {result}"
-            )
-
-        else:
-            log.runtime(
-                f"Cancelling {portal.channel.uid} gracefully "
-                f"after result {result}")
-
-        # cancel the process now that we have a final result
-        await portal.cancel_actor()
+    # cancel the process now that we have a final result
+    await portal.cancel_actor()
 
 
 async def do_hard_kill(
@@ -191,8 +188,6 @@ async def new_proc(
     spawn method as configured using ``try_set_start_method()``.
 
     """
-    cancel_scope = None
-
     # mark the new actor with the global spawn method
     subactor._spawn_method = _spawn_method
 
@@ -258,7 +253,8 @@ async def new_proc(
 
             async with trio.open_nursery() as nursery:
                 if portal in actor_nursery._cancel_after_result_on_exit:
-                    cancel_scope = await nursery.start(
+                    # cancel_scope = await nursery.start(
+                    nursery.start_soon(
                         cancel_on_completion,
                         portal,
                         subactor,
@@ -271,22 +267,22 @@ async def new_proc(
                 # This is a "soft" (cancellable) join/reap.
                 await proc.wait()
 
-        finally:
-            # cancel result waiter that may have been spawned in
-            # tandem if not done already
-            if cancel_scope:
+                # cancel result waiter that may have been spawned in
+                # tandem if not done already
                 log.warning(
                     "Cancelling existing result waiter task for "
                     f"{subactor.uid}")
-                cancel_scope.cancel()
+                nursery.cancel_scope.cancel()
 
-            log.runtime(f"Attempting to kill {proc}")
+        finally:
+            if proc.poll() is None:
+                log.cancel(f"Attempting to hard kill {proc}")
 
-            # The "hard" reap since no actor zombies are allowed!
-            # XXX: do this **after** cancellation/tearfown to avoid
-            # killing the process too early.
-            with trio.CancelScope(shield=True):
-                await do_hard_kill(proc)
+                # The "hard" reap since no actor zombies are allowed!
+                # XXX: do this **after** cancellation/tearfown to avoid
+                # killing the process too early.
+                with trio.CancelScope(shield=True):
+                    await do_hard_kill(proc)
 
         log.debug(f"Joined {proc}")
         # pop child entry to indicate we no longer managing this subactor
@@ -322,141 +318,124 @@ async def mp_new_proc(
     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
 ) -> None:
-    async with trio.open_nursery() as nursery:
-        assert _ctx
-        start_method = _ctx.get_start_method()
-        if start_method == 'forkserver':
-            # XXX do our hackery on the stdlib to avoid multiple
-            # forkservers (one at each subproc layer).
-            fs = forkserver._forkserver
-            curr_actor = current_actor()
-            if is_main_process() and not curr_actor._forkserver_info:
-                # if we're the "main" process start the forkserver
-                # only once and pass its ipc info to downstream
-                # children
-                # forkserver.set_forkserver_preload(enable_modules)
-                forkserver.ensure_running()
-                fs_info = (
-                    fs._forkserver_address,
-                    fs._forkserver_alive_fd,
-                    getattr(fs, '_forkserver_pid', None),
-                    getattr(
-                        resource_tracker._resource_tracker, '_pid', None),
-                    resource_tracker._resource_tracker._fd,
-                )
-            else:
-                assert curr_actor._forkserver_info
-                fs_info = (
-                    fs._forkserver_address,
-                    fs._forkserver_alive_fd,
-                    fs._forkserver_pid,
-                    resource_tracker._resource_tracker._pid,
-                    resource_tracker._resource_tracker._fd,
-                ) = curr_actor._forkserver_info
+
+    assert _ctx
+    start_method = _ctx.get_start_method()
+    if start_method == 'forkserver':
+        # XXX do our hackery on the stdlib to avoid multiple
+        # forkservers (one at each subproc layer).
+        fs = forkserver._forkserver
+        curr_actor = current_actor()
+        if is_main_process() and not curr_actor._forkserver_info:
+            # if we're the "main" process start the forkserver
+            # only once and pass its ipc info to downstream
+            # children
+            # forkserver.set_forkserver_preload(enable_modules)
+            forkserver.ensure_running()
+            fs_info = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                getattr(fs, '_forkserver_pid', None),
+                getattr(
+                    resource_tracker._resource_tracker, '_pid', None),
+                resource_tracker._resource_tracker._fd,
+            )
         else:
-            fs_info = (None, None, None, None, None)
+            assert curr_actor._forkserver_info
+            fs_info = (
+                fs._forkserver_address,
+                fs._forkserver_alive_fd,
+                fs._forkserver_pid,
+                resource_tracker._resource_tracker._pid,
+                resource_tracker._resource_tracker._fd,
+            ) = curr_actor._forkserver_info
+    else:
+        fs_info = (None, None, None, None, None)
 
-        proc: mp.Process = _ctx.Process(  # type: ignore
-            target=_mp_main,
-            args=(
-                subactor,
-                bind_addr,
-                fs_info,
-                start_method,
-                parent_addr,
-            ),
-            # daemon=True,
-            name=name,
-        )
-        # `multiprocessing` only (since no async interface):
-        # register the process before start in case we get a cancel
-        # request before the actor has fully spawned - then we can wait
-        # for it to fully come up before sending a cancel request
-        actor_nursery._children[subactor.uid] = (subactor, proc, None)
+    proc: mp.Process = _ctx.Process(  # type: ignore
+        target=_mp_main,
+        args=(
+            subactor,
+            bind_addr,
+            fs_info,
+            start_method,
+            parent_addr,
+        ),
+        # daemon=True,
+        name=name,
+    )
+    # `multiprocessing` only (since no async interface):
+    # register the process before start in case we get a cancel
+    # request before the actor has fully spawned - then we can wait
+    # for it to fully come up before sending a cancel request
+    actor_nursery._children[subactor.uid] = (subactor, proc, None)
 
-        proc.start()
-        if not proc.is_alive():
-            raise ActorFailure("Couldn't start sub-actor?")
+    proc.start()
+    if not proc.is_alive():
+        raise ActorFailure("Couldn't start sub-actor?")
 
-        log.runtime(f"Started {proc}")
+    log.runtime(f"Started {proc}")
 
-        try:
-            # wait for actor to spawn and connect back to us
-            # channel should have handshake completed by the
-            # local actor by the time we get a ref to it
-            event, chan = await actor_nursery._actor.wait_for_peer(
-                subactor.uid)
-            portal = Portal(chan)
-            actor_nursery._children[subactor.uid] = (subactor, proc, portal)
+    try:
+        # wait for actor to spawn and connect back to us
+        # channel should have handshake completed by the
+        # local actor by the time we get a ref to it
+        event, chan = await actor_nursery._actor.wait_for_peer(
+            subactor.uid)
+        # except:
+        # TODO: in the case we were cancelled before the sub-proc
+        # registered itself back we must be sure to try and clean
+        # any process we may have started.
 
-            # unblock parent task
-            task_status.started(portal)
+        portal = Portal(chan)
+        actor_nursery._children[subactor.uid] = (subactor, proc, portal)
 
-            # wait for ``ActorNursery`` block to signal that
-            # subprocesses can be waited upon.
-            # This is required to ensure synchronization
-            # with user code that may want to manually await results
-            # from nursery spawned sub-actors. We don't want the
-            # containing nurseries here to collect results or error
-            # while user code is still doing it's thing. Only after the
-            # nursery block closes do we allow subactor results to be
-            # awaited and reported upwards to the supervisor.
+        # unblock parent task
+        task_status.started(portal)
+
+        # wait for ``ActorNursery`` block to signal that
+        # subprocesses can be waited upon.
+        # This is required to ensure synchronization
+        # with user code that may want to manually await results
+        # from nursery spawned sub-actors. We don't want the
+        # containing nurseries here to collect results or error
+        # while user code is still doing it's thing. Only after the
+        # nursery block closes do we allow subactor results to be
+        # awaited and reported upwards to the supervisor.
+        with trio.CancelScope(shield=True):
             await actor_nursery._join_procs.wait()
 
-        finally:
-            # XXX: in the case we were cancelled before the sub-proc
-            # registered itself back we must be sure to try and clean
-            # any process we may have started.
-
-            reaping_cancelled: bool = False
-            cancel_scope: Optional[trio.CancelScope] = None
-            cancel_exc: Optional[trio.Cancelled] = None
-
+        async with trio.open_nursery() as nursery:
             if portal in actor_nursery._cancel_after_result_on_exit:
-                try:
-                    # async with trio.open_nursery() as n:
-                    # n.cancel_scope.shield = True
-                    cancel_scope = await nursery.start(
-                        cancel_on_completion,
-                        portal,
-                        subactor,
-                        errors
-                    )
-                except trio.Cancelled as err:
-                    cancel_exc = err
+                nursery.start_soon(
+                    cancel_on_completion,
+                    portal,
+                    subactor,
+                    errors
+                )
 
-                    # if the reaping task was cancelled we may have hit
-                    # a race where the subproc disconnected before we
-                    # could send it a message to cancel (classic 2 generals)
-                    # in that case, wait shortly then kill the process.
-                    reaping_cancelled = True
-
-                    if proc.is_alive():
-                        with trio.move_on_after(0.1) as cs:
-                            cs.shield = True
-                            await proc_waiter(proc)
-
-                        if cs.cancelled_caught:
-                            proc.terminate()
-
-            if not reaping_cancelled and proc.is_alive():
-                await proc_waiter(proc)
-
-            # TODO: timeout block here?
-            proc.join()
-
-            log.debug(f"Joined {proc}")
-            # pop child entry to indicate we are no longer managing subactor
-            subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
+            await proc_waiter(proc)
 
             # cancel result waiter that may have been spawned in
             # tandem if not done already
-            if cancel_scope:
-                log.warning(
-                    "Cancelling existing result waiter task for "
-                    f"{subactor.uid}")
-                cancel_scope.cancel()
+            log.warning(
+                "Cancelling existing result waiter task for "
+                f"{subactor.uid}")
+            nursery.cancel_scope.cancel()
 
-            elif reaping_cancelled:  # let the cancellation bubble up
-                assert cancel_exc
-                raise cancel_exc
+    finally:
+        # hard reap sequence
+        if proc.is_alive():
+            log.cancel(f"Attempting to hard kill {proc}")
+            with trio.move_on_after(0.1) as cs:
+                cs.shield = True
+                await proc_waiter(proc)
+
+            if cs.cancelled_caught:
+                proc.terminate()
+
+        proc.join()
+        log.debug(f"Joined {proc}")
+
+        # pop child entry to indicate we are no longer managing subactor
+        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
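
Below is a minimal, standalone sketch (not part of the patch above) of the reaping pattern both spawn backends move to here: instead of handing a per-task ``trio.CancelScope`` back through ``nursery.start()``, the result waiter is launched with ``nursery.start_soon()`` inside a dedicated nursery and that nursery's own cancel scope is cancelled once the (soft) process join returns. ``wait_for_result()`` and ``soft_join_proc()`` are hypothetical stand-ins for ``cancel_on_completion()`` and ``proc.wait()``/``proc_waiter()``; only the ``trio`` calls are real API.

import trio


async def wait_for_result() -> None:
    # hypothetical stand-in for ``cancel_on_completion()``: block until
    # the sub-actor's final result (or error) arrives
    await trio.sleep_forever()


async def soft_join_proc() -> None:
    # hypothetical stand-in for the "soft" (cancellable) process join,
    # i.e. ``await proc.wait()`` / ``await proc_waiter(proc)``
    await trio.sleep(0.1)


async def reap() -> None:
    async with trio.open_nursery() as nursery:
        # result waiter spawned in tandem; no cancel scope is passed back
        nursery.start_soon(wait_for_result)

        # once the soft join returns, tear down the waiter (if it is
        # still running) by cancelling the dedicated nursery's scope
        await soft_join_proc()
        nursery.cancel_scope.cancel()


if __name__ == '__main__':
    trio.run(reap)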