From 64ebb2aff4953ecc03f35a41b674c812a7cc0e57 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 8 Oct 2021 18:14:44 -0400
Subject: [PATCH] WIP rework trio spawner to include cancellation logic; not
 correct yet..

---
 tractor/_spawn.py | 503 +++++++++++++++++++++++++++++++---------------
 1 file changed, 341 insertions(+), 162 deletions(-)

diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index c9afbfc..779ff4f 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -1,6 +1,7 @@
 """
 Machinery for actor process spawning using multiple backends.
 """
+from __future__ import annotations
 import sys
 import multiprocessing as mp
 import platform
@@ -8,7 +9,6 @@ from typing import Any, Dict, Optional

 import trio
 from trio_typing import TaskStatus
-from async_generator import asynccontextmanager

 try:
     from multiprocessing import semaphore_tracker  # type: ignore
@@ -32,6 +32,7 @@ from ._portal import Portal
 from ._actor import Actor
 from ._entry import _mp_main
 from ._exceptions import ActorFailure
+from ._debug import maybe_wait_for_debugger


 log = get_logger('tractor')

@@ -90,234 +91,409 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
     return _ctx


-async def exhaust_portal(
-    portal: Portal,
-    actor: Actor
-) -> Any:
-    """Pull final result from portal (assuming it has one).
+# async def exhaust_portal(
+#     portal: Portal,
+#     actor: Actor
+# ) -> Any:
+#     """Pull final result from portal (assuming it has one).

-    If the main task is an async generator do our best to consume
-    what's left of it.
-    """
-    try:
-        log.debug(f"Waiting on final result from {actor.uid}")
+#     If the main task is an async generator do our best to consume
+#     what's left of it.
+#     """
+#     try:
+#         log.debug(f"Waiting on final result from {actor.uid}")

-        # XXX: streams should never be reaped here since they should
-        # always be established and shutdown using a context manager api
-        final = await portal.result()
+#         # XXX: streams should never be reaped here since they should
+#         # always be established and shutdown using a context manager api
+#         final = await portal.result()

-    except (Exception, trio.MultiError) as err:
-        # we reraise in the parent task via a ``trio.MultiError``
-        return err
-    except trio.Cancelled as err:
-        # lol, of course we need this too ;P
-        # TODO: merge with above?
-        log.warning(f"Cancelled result waiter for {portal.actor.uid}")
-        return err
-    else:
-        log.debug(f"Returning final result: {final}")
-        return final
+#     except (Exception, trio.MultiError) as err:
+#         # we reraise in the parent task via a ``trio.MultiError``
+#         return err
+#     except trio.Cancelled as err:
+#         # lol, of course we need this too ;P
+#         # TODO: merge with above?
+#         log.warning(f"Cancelled result waiter for {portal.actor.uid}")
+#         return err
+#     else:
+#         log.debug(f"Returning final result: {final}")
+#         return final


-async def cancel_on_completion(
+async def result_from_portal(
     portal: Portal,
     actor: Actor,
+    errors: Dict[Tuple[str, str], Exception],
+    cancel_on_result: bool = False,
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+
 ) -> None:
-    """Cancel actor gracefully once it's "main" portal's
+    """
+    Cancel actor gracefully once its "main" portal's
     result arrives.

     Should only be called for actors spawned with `run_in_actor()`.
-    """
-    with trio.CancelScope() as cs:
+    """
+    __tracebackhide__ = True
+
+    # cancel control is explicitly done by the caller
+    with trio.CancelScope() as cs:
         task_status.started(cs)

         # if this call errors we store the exception for later
         # in ``errors`` which will be reraised inside
         # a MultiError and we still send out a cancel request
-        result = await exhaust_portal(portal, actor)
-        if isinstance(result, Exception):
-            errors[actor.uid] = result
-            log.warning(
-                f"Cancelling {portal.channel.uid} after error {result}"
-            )
+        # result = await exhaust_portal(portal, actor)
+        try:
+            log.debug(f"Waiting on final result from {actor.uid}")

-        else:
-            log.runtime(
-                f"Cancelling {portal.channel.uid} gracefully "
-                f"after result {result}")
+            # XXX: streams should never be reaped here since they should
+            # always be established and shutdown using a context manager api
+            result = await portal.result()
+            log.debug(f"Returning final result: {result}")

-        # cancel the process now that we have a final result
-        await portal.cancel_actor()
+        except (Exception, trio.MultiError) as err:
+            # we reraise in the parent task via a ``trio.MultiError``
+            result = err
+            errors[actor.uid] = err
+            # raise
+
+        except trio.Cancelled as err:
+            # lol, of course we need this too ;P
+            # TODO: merge with above?
+            log.warning(f"Cancelled result waiter for {portal.channel.uid}")
+            result = err
+            # errors[actor.uid] = err
+            # raise
+
+        if cancel_on_result:
+            if isinstance(result, Exception):
+                # errors[actor.uid] = result
+                log.warning(
+                    f"Cancelling {portal.channel.uid} after error {result}"
+                )
+                raise result
+
+            else:
+                log.runtime(
+                    f"Cancelling {portal.channel.uid} gracefully "
+                    f"after result {result}")
+
+            # an actor that was `.run_in_actor()` executes a single task
+            # and delivers the result, then we cancel it.
+            # TODO: likely in the future we should just implement this using
+            # the new `open_context()` IPC api, since it's the more general
+            # api and can represent this form.
+            # XXX: do we need this?
+            # await maybe_wait_for_debugger()
+            await portal.cancel_actor()
+
+    return result


 async def do_hard_kill(
+
     proc: trio.Process,
+    timeout: float,
+
 ) -> None:
+    '''
+    Hard kill a process with timeout.
+
+    '''
+    log.debug(f"Hard killing {proc}")
     # NOTE: this timeout used to do nothing since we were shielding
     # the ``.wait()`` inside ``new_proc()`` which will pretty much
     # never release until the process exits, now it acts as
     # a hard-kill time ultimatum.
-    with trio.move_on_after(3) as cs:
+    with trio.move_on_after(timeout) as cs:

-        # NOTE: This ``__aexit__()`` shields internally.
-        async with proc:  # calls ``trio.Process.aclose()``
+        # NOTE: This ``__aexit__()`` shields internally and originally
+        # would tear down stdstreams via ``trio.Process.aclose()``.
+        async with proc:
             log.debug(f"Terminating {proc}")
+            # proc.terminate()

     if cs.cancelled_caught:
         # XXX: should pretty much never get here unless we have
         # to move the bits from ``proc.__aexit__()`` out and
         # into here.
-        log.critical(f"HARD KILLING {proc}")
+        log.critical(f"{timeout} timeout, HARD KILLING {proc}")
         proc.kill()


-@asynccontextmanager
-async def spawn_subactor(
-    subactor: 'Actor',
-    parent_addr: Tuple[str, int],
-    infect_asyncio: bool,
-):
-    spawn_cmd = [
-        sys.executable,
-        "-m",
-        # Hardcode this (instead of using ``_child.__name__`` to avoid a
-        # double import warning: https://stackoverflow.com/a/45070583
-        "tractor._child",
-        # We provide the child's unique identifier on this exec/spawn
-        # line for debugging purposes when viewing the process tree from
-        # the OS; it otherwise can be passed via the parent channel if
-        # we prefer in the future (for privacy).
-        "--uid",
-        str(subactor.uid),
-        # Address the child must connect to on startup
-        "--parent_addr",
-        str(parent_addr)
-    ]
+async def reap_proc(

-    if subactor.loglevel:
-        spawn_cmd += [
-            "--loglevel",
-            subactor.loglevel
-        ]
+    proc: trio.Process,
+    terminate_after: float = float('inf'),
+    hard_kill_after: float = 0.1,

-    # Tell child to run in guest mode on top of ``asyncio`` loop
-    if infect_asyncio:
-        spawn_cmd.append("--asyncio")
+) -> None:
+    with trio.move_on_after(terminate_after) as cs:
+        # Wait for proc termination but **don't yet** do
+        # any out-of-ipc-land termination / process
+        # killing. This is a "light" (cancellable) join,
+        # the hard join is below after timeout
+        await proc.wait()

-    proc = await trio.open_process(spawn_cmd)
-    try:
-        yield proc
+    if cs.cancelled_caught and terminate_after != float('inf'):
+        # Always "hard" join lingering sub procs since no
+        # actor zombies are allowed!
+        log.warning(
+            # f'Failed to gracefully terminate {subactor.uid}')
+            f'Failed to gracefully terminate {proc}\n'
+            f"Attempting to hard kill {proc}")

-    finally:
-        log.runtime(f"Attempting to kill {proc}")
-
-        # XXX: do this **after** cancellation/tearfown
-        # to avoid killing the process too early
-        # since trio does this internally on ``__aexit__()``
-
-        await do_hard_kill(proc)
+        with trio.CancelScope(shield=True):
+            # XXX: do this **after**
+            # cancellation/teardown to avoid killing the
+            # process too early since trio does this
+            # internally on ``__aexit__()``
+            await do_hard_kill(proc, hard_kill_after)


 async def new_proc(
+
     name: str,
     actor_nursery: 'ActorNursery',  # type: ignore  # noqa
     subactor: Actor,
     errors: Dict[Tuple[str, str], Exception],
+
     # passed through to actor main
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child
     *,
+
+    graceful_kill_timeout: int = 3,
     infect_asyncio: bool = False,
     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
-) -> None:
-    """Create a new ``multiprocessing.Process`` using the
-    spawn method as configured using ``try_set_start_method()``.
-    """
-    cancel_scope = None
+) -> None:
+    """
+    Create a new ``multiprocessing.Process`` using the
+    spawn method as configured using ``try_set_start_method()``.
+
+    """

     # mark the new actor with the global spawn method
     subactor._spawn_method = _spawn_method
+    uid = subactor.uid

     if _spawn_method == 'trio':
-        async with trio.open_nursery() as nursery:
-            async with spawn_subactor(
-                subactor,
-                parent_addr,
-                infect_asyncio=infect_asyncio
-            ) as proc:
-                log.runtime(f"Started {proc}")
-                # wait for actor to spawn and connect back to us
-                # channel should have handshake completed by the
-                # local actor by the time we get a ref to it
-                event, chan = await actor_nursery._actor.wait_for_peer(
+        spawn_cmd = [
+            sys.executable,
+            "-m",
+            # Hardcode this (instead of using ``_child.__name__``) to avoid a
+            # double import warning: https://stackoverflow.com/a/45070583
+            "tractor._child",
+            # We provide the child's unique identifier on this exec/spawn
+            # line for debugging purposes when viewing the process tree from
+            # the OS; it otherwise can be passed via the parent channel if
+            # we prefer in the future (for privacy).
+            "--uid",
+            str(uid),
+            # Address the child must connect to on startup
+            "--parent_addr",
+            str(parent_addr)
+        ]
+
+        if subactor.loglevel:
+            spawn_cmd += [
+                "--loglevel",
+                subactor.loglevel
+            ]
+
+        # Tell child to run in guest mode on top of ``asyncio`` loop
+        if infect_asyncio:
+            spawn_cmd.append("--asyncio")
+
+        proc = await trio.open_process(spawn_cmd)
+
+        log.info(f"Started {proc}")
+
+        portal: Optional[Portal] = None
+        try:
+            # wait for actor to spawn and connect back to us
+            # channel should have handshake completed by the
+            # local actor by the time we get a ref to it
+            event, chan = await actor_nursery._actor.wait_for_peer(
+                subactor.uid)
+
+        except trio.Cancelled:
+            # reap un-contacted processes which are started
+            # but never setup a connection to parent.
+            log.warning(f'Spawning aborted due to cancel {proc}')
+            with trio.CancelScope(shield=True):
+                await do_hard_kill(proc, 0.1)
+
+            # this should break here
+            raise
+
+        actor_nursery_cancel_called = None
+        portal = Portal(chan)
+        actor_nursery._children[subactor.uid] = (
+            subactor, proc, portal)
+
+        # track child in current nursery
+        curr_actor = current_actor()
+        curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
+
+        try:
+            # send additional init params
+            await chan.send({
+                "_parent_main_data": subactor._parent_main_data,
+                "enable_modules": subactor.enable_modules,
+                "_arb_addr": subactor._arb_addr,
+                "bind_host": bind_addr[0],
+                "bind_port": bind_addr[1],
+                "_runtime_vars": _runtime_vars,
+            })
+
+            # resume caller at next checkpoint now that child is up
+            task_status.started(portal)
+
+            # wait on actor nursery to complete
+            # with trio.CancelScope(shield=True):
+
+            # this either completes or is cancelled
+            # and should only arrive once the actor nursery
+            # has errored or exited.
+            await actor_nursery._join_procs.wait()
+
+        except (
+            trio.Cancelled,
+            # KeyboardInterrupt,
+            # required to collect errors from multiple subactors
+            trio.MultiError,
+            # RuntimeError,
+        ) as cerr:
+            actor_nursery_cancel_called = cerr
+
+            if actor_nursery.cancelled:
+                log.cancel(f'{uid}: nursery cancelled before exit')
+            else:
+                log.error(f'Child {uid} was cancelled before nursery exit?')
+
+            # we were specifically cancelled by our parent nursery
+            with trio.CancelScope(shield=True):
+
+                if portal.channel.connected():
+                    log.cancel(f'Sending cancel IPC-msg to {uid}')
+                    # try to cancel the actor @ IPC level
+                    await portal.cancel_actor()
+
+        finally:
+            # 2 cases:
+            # - actor nursery was cancelled, in which case
+            #   we want to try a soft reap of the actor via
+            #   ipc cancellation and then failing that do a hard
+            #   reap.
+            # - this is normal termination and we must wait indefinitely
+            #   for ria and daemon actors
+            reaping_cancelled: bool = False
+            ria = portal in actor_nursery._cancel_after_result_on_exit
+
+            # this is the soft reap sequence. we can
+            # either collect results:
+            # - ria actors get them via ``Portal.result()``
+            # - we wait forever on daemon actors until they're
+            #   cancelled by user code via ``Portal.cancel_actor()``
+            #   or ``ActorNursery.cancel()``. in the latter case
+            #   we have to expect another cancel here since
+            #   the task spawning nurseries will both be cancelled
+            #   by ``ActorNursery.cancel()``.
+
+            # OR, we're cancelled while collecting results, in which
+            # case we need to try another soft cancel and reap attempt.
+            try:
+                log.cancel(f'Starting soft actor reap for {uid}')
+                cancel_scope = None
+                async with trio.open_nursery() as nursery:
+                    if ria:
+                        # collect any expected ``.run_in_actor()`` results
+                        cancel_scope = await nursery.start(
+                            result_from_portal,
+                            portal,
+                            subactor,
+                            errors,
+                            True,  # cancel_on_result
+                        )
+
+                    # soft & cancellable
+                    await reap_proc(proc)
+
+                    # if proc terminates before portal result
+                    if cancel_scope:
+                        cancel_scope.cancel()
+
+            except (
+                trio.Cancelled,
+                # is this required to collect errors from multiple subactors?
+                trio.MultiError,
+            ) as rerr:
+                # nursery was closed but was cancelled during normal
+                # reaping.
+                reaping_cancelled = rerr
+
+                if actor_nursery.cancelled:
+                    log.cancel(f'Nursery cancelled during soft reap for {uid}')
+
+                # hard reap sequence
+                if proc.poll() is None:
+                    log.cancel(f'Attempting hard reap for {uid}')
+
+                    # hard reap sequence
+                    await maybe_wait_for_debugger()
+
+                    with trio.CancelScope(shield=True):
+                        if portal.channel.connected():
+                            # cancel the process @ the IPC level
+                            await portal.cancel_actor()
+
+                        # TODO: do we need to try the ria portals
+                        # again?
+                        # await result_from_portal(
+                        #     portal,
+                        #     subactor,
+                        #     errors
+                        # )
+
+                        # hard zombie lord reap, with timeout
+                        await reap_proc(
+                            proc,
+                            terminate_after=2,
+                        )
+            finally:
+                # 2 cases:
+                # - the actor terminated gracefully
+                # - we're cancelled and likely need to re-raise
+
+                while proc.poll() is None:
+                    log.critical(f"ZOMBIE LORD HAS ARRIVED for your {proc}")
+                    with trio.CancelScope(shield=True):
+                        await reap_proc(
+                            proc,
+                            terminate_after=0.1,
+                        )
+
+                log.info(f"Joined {proc}")
+
+                # pop child entry to indicate we're no longer managing this
+                # subactor
+                subactor, proc, portal = actor_nursery._children.pop(
                     subactor.uid)
-                portal = Portal(chan)
-                actor_nursery._children[subactor.uid] = (
-                    subactor, proc, portal)
+                if not actor_nursery._children:
+                    actor_nursery._all_children_reaped.set()

-                # send additional init params
-                await chan.send({
-                    "_parent_main_data": subactor._parent_main_data,
-                    "enable_modules": subactor.enable_modules,
-                    "_arb_addr": subactor._arb_addr,
-                    "bind_host": bind_addr[0],
-                    "bind_port": bind_addr[1],
-                    "_runtime_vars": _runtime_vars,
-                })
+                if actor_nursery_cancel_called:
+                    raise actor_nursery_cancel_called

-                # track subactor in current nursery
-                curr_actor = current_actor()
-                curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
+                if reaping_cancelled:
+                    raise reaping_cancelled

-                # resume caller at next checkpoint now that child is up
-                task_status.started(portal)
-
-                # wait for ActorNursery.wait() to be called
-                with trio.CancelScope(shield=True):
-                    await actor_nursery._join_procs.wait()
-
-                if portal in actor_nursery._cancel_after_result_on_exit:
-                    cancel_scope = await nursery.start(
-                        cancel_on_completion,
-                        portal,
-                        subactor,
-                        errors
-                    )
-
-                # Wait for proc termination but **dont' yet** call
-                # ``trio.Process.__aexit__()`` (it tears down stdio
-                # which will kill any waiting remote pdb trace).
-
-                # TODO: No idea how we can enforce zombie
-                # reaping more stringently without the shield
-                # we used to have below...
-
-                # with trio.CancelScope(shield=True):
-                # async with proc:
-
-                # Always "hard" join sub procs since no actor zombies
-                # are allowed!
-
-                # this is a "light" (cancellable) join, the hard join is
-                # in the enclosing scope (see above).
-                await proc.wait()
-
-        log.debug(f"Joined {proc}")
-        # pop child entry to indicate we no longer managing this subactor
-        subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
-
-        # cancel result waiter that may have been spawned in
-        # tandem if not done already
-        if cancel_scope:
-            log.warning(
-                "Cancelling existing result waiter task for "
-                f"{subactor.uid}")
-            cancel_scope.cancel()

     else:
         # `multiprocessing`
         # async with trio.open_nursery() as nursery:
@@ -430,6 +606,9 @@ async def mp_new_proc(
         # while user code is still doing it's thing. Only after the
         # nursery block closes do we allow subactor results to be
         # awaited and reported upwards to the supervisor.
+
+        # no shield is required here (vs. above on the trio backend)
+        # since debug mode is not supported on mp.
         await actor_nursery._join_procs.wait()

     finally:
@@ -446,7 +625,7 @@ async def mp_new_proc(
             # async with trio.open_nursery() as n:
             # n.cancel_scope.shield = True
             cancel_scope = await nursery.start(
-                cancel_on_completion,
+                result_from_portal,
                 portal,
                 subactor,
                 errors
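
For reference, the soft-then-hard reap sequence that ``reap_proc()`` and ``do_hard_kill()`` implement above can be exercised against a plain subprocess outside of tractor. The following is a minimal, standalone sketch only and is not part of this patch: the helper name ``soft_then_hard_reap``, the dummy child command and the timeout values are illustrative assumptions.

    # standalone sketch: soft (cancellable) join, then shielded hard kill
    import sys

    import trio


    async def soft_then_hard_reap(
        proc: trio.Process,
        terminate_after: float = 2,
        hard_kill_after: float = 0.1,
    ) -> None:
        # "soft" join: a cancellable wait bounded by ``terminate_after``
        with trio.move_on_after(terminate_after) as cs:
            await proc.wait()

        if cs.cancelled_caught:
            # "hard" join: shield so an outer cancellation can't abandon
            # the child as a zombie, mirroring the shielded
            # ``do_hard_kill()`` call in the patch above.
            with trio.CancelScope(shield=True):
                with trio.move_on_after(hard_kill_after):
                    # ``Process.__aexit__()`` waits for (and on
                    # cancellation kills) the child while tearing down
                    # its std-streams.
                    async with proc:
                        proc.terminate()

                if proc.poll() is None:
                    proc.kill()


    async def main() -> None:
        # a child that simply outlives the soft timeout
        proc = await trio.open_process(
            [sys.executable, '-c', 'import time; time.sleep(10)'])
        await soft_then_hard_reap(proc, terminate_after=1)
        print('child reaped with returncode', proc.returncode)


    if __name__ == '__main__':
        trio.run(main)

The shielded scope is the design point the patch keeps repeating: once the decision to hard kill is made, external cancellation must not be allowed to skip the final wait on the child process.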