From 0dcffeee0fefd8900bd4b81f645b8ca975940741 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 10 Oct 2021 12:28:42 -0400 Subject: [PATCH] Tweaks to get us down to 3 failed cancel tests The remaining errors all have to do with not getting the exact same format as previous of collected `.run_in_actor()` errors as `MultiError`s. Not even sure at this point if the whole collect single task results and bubble should be a thing but trying to keep the support for now I guess. There's still issues with a hang in the pub sub tests and the one debugger test has a different outcome due to the root getting the lock from the breakpoint forever child too quickly. - go back to raising portal result-that-are-errors in the spawn task - go back to shielding the nursery close / proc join event - report any error on this shielded join and relay to nursery handler method (which should be customizable in the future for alternate strats then OCA) as well try to collect ria (run in actor) result - drop async (via nursery) ria result collection, just do it sync with the soft `proc.wait()` reap immediately after, which should work presuming that the ipc connection will break on process termination anyway and it'll mean no multierror to deal with and no cancel scope to manage on the ria reaper task. --- tractor/_spawn.py | 200 ++++++++++++++++++++++++++++------------------ 1 file changed, 123 insertions(+), 77 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 8a2e7f4..38298c5 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -31,7 +31,7 @@ from .log import get_logger from ._portal import Portal from ._actor import Actor from ._entry import _mp_main -from ._exceptions import ActorFailure +from ._exceptions import ActorFailure, RemoteActorError from ._debug import maybe_wait_for_debugger @@ -109,6 +109,8 @@ async def result_from_portal( """ __tracebackhide__ = True + uid = portal.channel.uid + # cancel control is explicityl done by the caller with trio.CancelScope() as cs: task_status.started(cs) @@ -129,12 +131,12 @@ async def result_from_portal( # we reraise in the parent task via a ``trio.MultiError`` result = err errors[actor.uid] = err - # raise + raise except trio.Cancelled as err: # lol, of course we need this too ;P # TODO: merge with above? - log.warning(f"Cancelled result waiter for {portal.channel.uid}") + log.warning(f"Cancelled `Portal.result()` waiter for {uid}") result = err # errors[actor.uid] = err # raise @@ -143,14 +145,14 @@ async def result_from_portal( if isinstance(result, Exception): # errors[actor.uid] = result log.warning( - f"Cancelling {portal.channel.uid} after error {result}" + f"Cancelling single-task-run {uid} after error {result}" ) - raise result + # raise result else: log.runtime( - f"Cancelling {portal.channel.uid} gracefully " - f"after result {result}") + f"Cancelling {uid} gracefully " + f"after one-time-task result {result}") # an actor that was `.run_in_actor()` executes a single task # and delivers the result, then we cancel it. @@ -185,9 +187,13 @@ async def do_hard_kill( # would tear down stdstreams via ``trio.Process.aclose()``. async with proc: log.debug(f"Terminating {proc}") - # proc.terminate() if cs.cancelled_caught: + + # this is a "softer" kill that we should probably use + # eventually and let the zombie lord do the `.kill()` + # proc.terminate() + # XXX: should pretty much never get here unless we have # to move the bits from ``proc.__aexit__()`` out and # into here. @@ -284,8 +290,10 @@ async def new_proc( proc = await trio.open_process(spawn_cmd) log.info(f"Started {proc}") - portal: Optional[Portal] = None + + # handle cancellation during child connect-back, kill + # any cancelled spawn sequence immediately. try: # wait for actor to spawn and connect back to us # channel should have handshake completed by the @@ -300,9 +308,11 @@ async def new_proc( with trio.CancelScope(shield=True): await do_hard_kill(proc, 0.1) - # this should break here + # TODO: should we have a custom error for this maybe derived + # from ``subprocess``? raise + # the child successfully connected back to us. actor_nursery_cancel_called = None portal = Portal(chan) actor_nursery._children[subactor.uid] = ( @@ -326,35 +336,36 @@ async def new_proc( # resume caller at next checkpoint now that child is up task_status.started(portal) - # wait on actor nursery to complete - # with trio.CancelScope(shield=True): - - # this either completes or is cancelled - # and should only arrive once the actor nursery - # has errored or exitted. - await actor_nursery._join_procs.wait() + # this either completes or is cancelled and should only + # **and always** be set once the actor nursery has errored + # or exitted. + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() except ( - trio.Cancelled, + BaseException + # trio.Cancelled, # KeyboardInterrupt, - # required to collect errors from multiple subactors - trio.MultiError, + # trio.MultiError, # RuntimeError, ) as cerr: - actor_nursery_cancel_called = cerr - if actor_nursery.cancelled: - log.cancel(f'{uid}: nursery cancelled before exit') - else: - log.error(f'Child {uid} was cancelled before nursery exit?') + log.exception(f'Relaying unexpected {cerr} to nursery') - # we were specifically cancelled by our parent nursery + # sending IPC-msg level cancel requests is expected to be + # managed by the nursery. with trio.CancelScope(shield=True): + await actor_nursery._handle_err(err, portal=portal) - if portal.channel.connected(): - log.cancel(f'Sending cancel IPC-msg to {uid}') - # try to cancel the actor @ IPC level - await portal.cancel_actor() + if portal.channel.connected(): + if ria: + # this may raise which we want right? + await result_from_portal( + portal, + subactor, + errors, + # True, # cancel_on_result + ) finally: # 2 cases: @@ -382,68 +393,98 @@ async def new_proc( try: log.cancel(f'Starting soft actor reap for {uid}') cancel_scope = None - async with trio.open_nursery() as nursery: - if ria: - # collect any expected ``.run_in_actor()`` results - cancel_scope = await nursery.start( - result_from_portal, - portal, - subactor, - errors, - True, # cancel_on_result - ) + # async with trio.open_nursery() as nursery: - # soft & cancellable - await reap_proc(proc) + if portal.channel.connected() and ria: - # if proc terminates before portal result - if cancel_scope: - cancel_scope.cancel() + # we wait for result and cancel on completion + await result_from_portal( + portal, + subactor, + errors, + True, # cancel_on_result + ) + # # collect any expected ``.run_in_actor()`` results + # cancel_scope = await nursery.start( + # result_from_portal, + # portal, + # subactor, + # errors, + # True, # cancel_on_result + # ) + + # soft & cancellable + await reap_proc(proc) + + # # if proc terminates before portal result + # if cancel_scope: + # cancel_scope.cancel() + + except ( + RemoteActorError, + ) as err: + reaping_cancelled = err + log.exception(f'{uid} remote error') + await actor_nursery._handle_err(err, portal=portal) except ( trio.Cancelled, - # is this required to collect errors from multiple subactors? - trio.MultiError, - ) as rerr: - # nursery was closed but was cancelled during normal - # reaping. - reaping_cancelled = rerr - + ) as err: + reaping_cancelled = err if actor_nursery.cancelled: - log.cancel(f'Nursery cancelled during soft reap for {uid}') + log.cancel(f'{uid} wait cancelled by nursery') + else: + log.exception(f'{uid} soft wait error?') - # hard reap sequence - if proc.poll() is None: - log.cancel('Attempting hard reap for {uid}') + except ( + BaseException + ) as err: + reaping_cancelled = err + log.exception(f'{uid} soft reap local error') - # hard reap sequence + finally: + if reaping_cancelled: + if actor_nursery.cancelled: + log.cancel(f'Nursery cancelled during soft wait for {uid}') + + with trio.CancelScope(shield=True): await maybe_wait_for_debugger() + # XXX: can't do this, it'll hang some tests.. no + # idea why yet. + # with trio.CancelScope(shield=True): + # await actor_nursery._handle_err( + # reaping_cancelled, + # portal=portal + # ) + + # hard reap sequence with timeouts + if proc.poll() is None: + log.cancel(f'Attempting hard reap for {uid}') + with trio.CancelScope(shield=True): - if portal.channel.connected(): - # cancel the process @ the IPC level - await portal.cancel_actor() - # TODO: do we need to try the ria portals - # again? - # await result_from_portal( - # portal, - # subactor, - # errors - # ) + # hard reap sequence + # ``Portal.cancel_actor()`` is expected to have + # been called by the supervising nursery so we + # do **not** call it here. - # hard zombie lord reap, with timeout await reap_proc( proc, - terminate_after=2, + # this is the same as previous timeout + # setting before rewriting this spawn + # section + terminate_after=3, ) - finally: - # 2 cases: - # - the actor terminated gracefully - # - we're cancelled and likely need to re-raise + + # if somehow the hard reap didn't collect the child then + # we send in the big gunz. while proc.poll() is None: - log.critical("ZOMBIE LORD HAS ARRIVED for your {proc}") + log.critical( + f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n' + f'{proc}' + ) with trio.CancelScope(shield=True): await reap_proc( proc, @@ -452,16 +493,21 @@ async def new_proc( log.info(f"Joined {proc}") + # 2 cases: + # - the actor terminated gracefully + # - we're cancelled and likely need to re-raise + # pop child entry to indicate we no longer managing this # subactor subactor, proc, portal = actor_nursery._children.pop( subactor.uid) if not actor_nursery._children: + log.cancel(f"{uid} reports all children complete!") actor_nursery._all_children_reaped.set() - if actor_nursery_cancel_called: - raise actor_nursery_cancel_called - + # not entirely sure why we need this.. but without it + # the reaping cancelled error is never reported upwards + # to the spawn nursery? if reaping_cancelled: raise reaping_cancelled