From 87971de1d91dc70ce0e85b77d01ed97b0c3854db Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 6 May 2021 12:04:50 -0400 Subject: [PATCH] Re-raise any sidestepped `trio.Cancelled` --- tractor/_spawn.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index cb1f3b2..d893479 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -283,14 +283,15 @@ async def new_proc( await proc.wait() log.debug(f"Joined {proc}") - # pop child entry to indicate we are no longer managing this subactor + # pop child entry to indicate we no longer managing this subactor subactor, proc, portal = actor_nursery._children.pop(subactor.uid) # cancel result waiter that may have been spawned in # tandem if not done already if cancel_scope: log.warning( - f"Cancelling existing result waiter task for {subactor.uid}") + "Cancelling existing result waiter task for " + f"{subactor.uid}") cancel_scope.cancel() else: # `multiprocessing` @@ -409,8 +410,9 @@ async def mp_new_proc( # registered itself back we must be sure to try and clean # any process we may have started. - reaping_cancelled = False - cancel_scope = None + reaping_cancelled: bool = False + cancel_scope: Optional[trio.CancelScope] = None + cancel_exc: Optional[trio.Cancelled] = None if portal in actor_nursery._cancel_after_result_on_exit: try: @@ -422,7 +424,9 @@ async def mp_new_proc( subactor, errors ) - except trio.Cancelled: + except trio.Cancelled as err: + cancel_exc = err + # if the reaping task was cancelled we may have hit # a race where the subproc disconnected before we # could send it a message to cancel (classic 2 generals) @@ -437,20 +441,24 @@ async def mp_new_proc( if cs.cancelled_caught: proc.terminate() - if not reaping_cancelled: - if proc.is_alive(): - await proc_waiter(proc) + if not reaping_cancelled and proc.is_alive(): + await proc_waiter(proc) # TODO: timeout block here? proc.join() log.debug(f"Joined {proc}") - # pop child entry to indicate we are no longer managing this subactor + # pop child entry to indicate we are no longer managing subactor subactor, proc, portal = actor_nursery._children.pop(subactor.uid) # cancel result waiter that may have been spawned in # tandem if not done already if cancel_scope: log.warning( - f"Cancelling existing result waiter task for {subactor.uid}") + "Cancelling existing result waiter task for " + f"{subactor.uid}") cancel_scope.cancel() + + elif reaping_cancelled: # let the cancellation bubble up + assert cancel_exc + raise cancel_exc