diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 8a2e7f4..38298c5 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -31,7 +31,7 @@ from .log import get_logger from ._portal import Portal from ._actor import Actor from ._entry import _mp_main -from ._exceptions import ActorFailure +from ._exceptions import ActorFailure, RemoteActorError from ._debug import maybe_wait_for_debugger @@ -109,6 +109,8 @@ async def result_from_portal( """ __tracebackhide__ = True + uid = portal.channel.uid + # cancel control is explicityl done by the caller with trio.CancelScope() as cs: task_status.started(cs) @@ -129,12 +131,12 @@ async def result_from_portal( # we reraise in the parent task via a ``trio.MultiError`` result = err errors[actor.uid] = err - # raise + raise except trio.Cancelled as err: # lol, of course we need this too ;P # TODO: merge with above? - log.warning(f"Cancelled result waiter for {portal.channel.uid}") + log.warning(f"Cancelled `Portal.result()` waiter for {uid}") result = err # errors[actor.uid] = err # raise @@ -143,14 +145,14 @@ async def result_from_portal( if isinstance(result, Exception): # errors[actor.uid] = result log.warning( - f"Cancelling {portal.channel.uid} after error {result}" + f"Cancelling single-task-run {uid} after error {result}" ) - raise result + # raise result else: log.runtime( - f"Cancelling {portal.channel.uid} gracefully " - f"after result {result}") + f"Cancelling {uid} gracefully " + f"after one-time-task result {result}") # an actor that was `.run_in_actor()` executes a single task # and delivers the result, then we cancel it. @@ -185,9 +187,13 @@ async def do_hard_kill( # would tear down stdstreams via ``trio.Process.aclose()``. async with proc: log.debug(f"Terminating {proc}") - # proc.terminate() if cs.cancelled_caught: + + # this is a "softer" kill that we should probably use + # eventually and let the zombie lord do the `.kill()` + # proc.terminate() + # XXX: should pretty much never get here unless we have # to move the bits from ``proc.__aexit__()`` out and # into here. @@ -284,8 +290,10 @@ async def new_proc( proc = await trio.open_process(spawn_cmd) log.info(f"Started {proc}") - portal: Optional[Portal] = None + + # handle cancellation during child connect-back, kill + # any cancelled spawn sequence immediately. try: # wait for actor to spawn and connect back to us # channel should have handshake completed by the @@ -300,9 +308,11 @@ async def new_proc( with trio.CancelScope(shield=True): await do_hard_kill(proc, 0.1) - # this should break here + # TODO: should we have a custom error for this maybe derived + # from ``subprocess``? raise + # the child successfully connected back to us. actor_nursery_cancel_called = None portal = Portal(chan) actor_nursery._children[subactor.uid] = ( @@ -326,35 +336,36 @@ async def new_proc( # resume caller at next checkpoint now that child is up task_status.started(portal) - # wait on actor nursery to complete - # with trio.CancelScope(shield=True): - - # this either completes or is cancelled - # and should only arrive once the actor nursery - # has errored or exitted. - await actor_nursery._join_procs.wait() + # this either completes or is cancelled and should only + # **and always** be set once the actor nursery has errored + # or exitted. + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() except ( - trio.Cancelled, + BaseException + # trio.Cancelled, # KeyboardInterrupt, - # required to collect errors from multiple subactors - trio.MultiError, + # trio.MultiError, # RuntimeError, ) as cerr: - actor_nursery_cancel_called = cerr - if actor_nursery.cancelled: - log.cancel(f'{uid}: nursery cancelled before exit') - else: - log.error(f'Child {uid} was cancelled before nursery exit?') + log.exception(f'Relaying unexpected {cerr} to nursery') - # we were specifically cancelled by our parent nursery + # sending IPC-msg level cancel requests is expected to be + # managed by the nursery. with trio.CancelScope(shield=True): + await actor_nursery._handle_err(err, portal=portal) - if portal.channel.connected(): - log.cancel(f'Sending cancel IPC-msg to {uid}') - # try to cancel the actor @ IPC level - await portal.cancel_actor() + if portal.channel.connected(): + if ria: + # this may raise which we want right? + await result_from_portal( + portal, + subactor, + errors, + # True, # cancel_on_result + ) finally: # 2 cases: @@ -382,68 +393,98 @@ async def new_proc( try: log.cancel(f'Starting soft actor reap for {uid}') cancel_scope = None - async with trio.open_nursery() as nursery: - if ria: - # collect any expected ``.run_in_actor()`` results - cancel_scope = await nursery.start( - result_from_portal, - portal, - subactor, - errors, - True, # cancel_on_result - ) + # async with trio.open_nursery() as nursery: - # soft & cancellable - await reap_proc(proc) + if portal.channel.connected() and ria: - # if proc terminates before portal result - if cancel_scope: - cancel_scope.cancel() + # we wait for result and cancel on completion + await result_from_portal( + portal, + subactor, + errors, + True, # cancel_on_result + ) + # # collect any expected ``.run_in_actor()`` results + # cancel_scope = await nursery.start( + # result_from_portal, + # portal, + # subactor, + # errors, + # True, # cancel_on_result + # ) + + # soft & cancellable + await reap_proc(proc) + + # # if proc terminates before portal result + # if cancel_scope: + # cancel_scope.cancel() + + except ( + RemoteActorError, + ) as err: + reaping_cancelled = err + log.exception(f'{uid} remote error') + await actor_nursery._handle_err(err, portal=portal) except ( trio.Cancelled, - # is this required to collect errors from multiple subactors? - trio.MultiError, - ) as rerr: - # nursery was closed but was cancelled during normal - # reaping. - reaping_cancelled = rerr - + ) as err: + reaping_cancelled = err if actor_nursery.cancelled: - log.cancel(f'Nursery cancelled during soft reap for {uid}') + log.cancel(f'{uid} wait cancelled by nursery') + else: + log.exception(f'{uid} soft wait error?') - # hard reap sequence - if proc.poll() is None: - log.cancel('Attempting hard reap for {uid}') + except ( + BaseException + ) as err: + reaping_cancelled = err + log.exception(f'{uid} soft reap local error') - # hard reap sequence + finally: + if reaping_cancelled: + if actor_nursery.cancelled: + log.cancel(f'Nursery cancelled during soft wait for {uid}') + + with trio.CancelScope(shield=True): await maybe_wait_for_debugger() + # XXX: can't do this, it'll hang some tests.. no + # idea why yet. + # with trio.CancelScope(shield=True): + # await actor_nursery._handle_err( + # reaping_cancelled, + # portal=portal + # ) + + # hard reap sequence with timeouts + if proc.poll() is None: + log.cancel(f'Attempting hard reap for {uid}') + with trio.CancelScope(shield=True): - if portal.channel.connected(): - # cancel the process @ the IPC level - await portal.cancel_actor() - # TODO: do we need to try the ria portals - # again? - # await result_from_portal( - # portal, - # subactor, - # errors - # ) + # hard reap sequence + # ``Portal.cancel_actor()`` is expected to have + # been called by the supervising nursery so we + # do **not** call it here. - # hard zombie lord reap, with timeout await reap_proc( proc, - terminate_after=2, + # this is the same as previous timeout + # setting before rewriting this spawn + # section + terminate_after=3, ) - finally: - # 2 cases: - # - the actor terminated gracefully - # - we're cancelled and likely need to re-raise + + # if somehow the hard reap didn't collect the child then + # we send in the big gunz. while proc.poll() is None: - log.critical("ZOMBIE LORD HAS ARRIVED for your {proc}") + log.critical( + f'ZOMBIE LORD HAS ARRIVED for your {uid}:\n' + f'{proc}' + ) with trio.CancelScope(shield=True): await reap_proc( proc, @@ -452,16 +493,21 @@ async def new_proc( log.info(f"Joined {proc}") + # 2 cases: + # - the actor terminated gracefully + # - we're cancelled and likely need to re-raise + # pop child entry to indicate we no longer managing this # subactor subactor, proc, portal = actor_nursery._children.pop( subactor.uid) if not actor_nursery._children: + log.cancel(f"{uid} reports all children complete!") actor_nursery._all_children_reaped.set() - if actor_nursery_cancel_called: - raise actor_nursery_cancel_called - + # not entirely sure why we need this.. but without it + # the reaping cancelled error is never reported upwards + # to the spawn nursery? if reaping_cancelled: raise reaping_cancelled