Re-raise any sidestepped `trio.Cancelled`

mp_teardown_hardening
Tyler Goodlet 2021-05-06 12:04:50 -04:00
parent 9f38406e85
commit 87971de1d9
1 changed files with 18 additions and 10 deletions

View File

@ -283,14 +283,15 @@ async def new_proc(
await proc.wait() await proc.wait()
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing this subactor # pop child entry to indicate we no longer managing this subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid) subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# cancel result waiter that may have been spawned in # cancel result waiter that may have been spawned in
# tandem if not done already # tandem if not done already
if cancel_scope: if cancel_scope:
log.warning( log.warning(
f"Cancelling existing result waiter task for {subactor.uid}") "Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel() cancel_scope.cancel()
else: else:
# `multiprocessing` # `multiprocessing`
@ -409,8 +410,9 @@ async def mp_new_proc(
# registered itself back we must be sure to try and clean # registered itself back we must be sure to try and clean
# any process we may have started. # any process we may have started.
reaping_cancelled = False reaping_cancelled: bool = False
cancel_scope = None cancel_scope: Optional[trio.CancelScope] = None
cancel_exc: Optional[trio.Cancelled] = None
if portal in actor_nursery._cancel_after_result_on_exit: if portal in actor_nursery._cancel_after_result_on_exit:
try: try:
@ -422,7 +424,9 @@ async def mp_new_proc(
subactor, subactor,
errors errors
) )
except trio.Cancelled: except trio.Cancelled as err:
cancel_exc = err
# if the reaping task was cancelled we may have hit # if the reaping task was cancelled we may have hit
# a race where the subproc disconnected before we # a race where the subproc disconnected before we
# could send it a message to cancel (classic 2 generals) # could send it a message to cancel (classic 2 generals)
@ -437,20 +441,24 @@ async def mp_new_proc(
if cs.cancelled_caught: if cs.cancelled_caught:
proc.terminate() proc.terminate()
if not reaping_cancelled: if not reaping_cancelled and proc.is_alive():
if proc.is_alive():
await proc_waiter(proc) await proc_waiter(proc)
# TODO: timeout block here? # TODO: timeout block here?
proc.join() proc.join()
log.debug(f"Joined {proc}") log.debug(f"Joined {proc}")
# pop child entry to indicate we are no longer managing this subactor # pop child entry to indicate we are no longer managing subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid) subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# cancel result waiter that may have been spawned in # cancel result waiter that may have been spawned in
# tandem if not done already # tandem if not done already
if cancel_scope: if cancel_scope:
log.warning( log.warning(
f"Cancelling existing result waiter task for {subactor.uid}") "Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel() cancel_scope.cancel()
elif reaping_cancelled: # let the cancellation bubble up
assert cancel_exc
raise cancel_exc