From 46070f99ded3d756c2e63b3fedbe04e755949d1d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 1 Dec 2021 22:41:10 -0500 Subject: [PATCH] Factor soft-wait logic into a helper, use with mp --- tractor/_spawn.py | 78 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 55 insertions(+), 23 deletions(-) diff --git a/tractor/_spawn.py b/tractor/_spawn.py index 3857f36..d42a63a 100644 --- a/tractor/_spawn.py +++ b/tractor/_spawn.py @@ -5,7 +5,7 @@ Machinery for actor process spawning using multiple backends. import sys import multiprocessing as mp import platform -from typing import Any, Dict, Optional +from typing import Any, Dict, Optional, Union, Callable import trio from trio_typing import TaskStatus @@ -179,10 +179,38 @@ async def do_hard_kill( # XXX: should pretty much never get here unless we have # to move the bits from ``proc.__aexit__()`` out and # into here. - log.critical(f"HARD KILLING {proc}") + log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}") proc.kill() +async def soft_wait( + + proc: Union[mp.Process, trio.Process], + wait_func: Callable[ + Union[mp.Process, trio.Process], + None, + ], + portal: Portal, + +) -> None: + # Wait for proc termination but **dont' yet** call + # ``trio.Process.__aexit__()`` (it tears down stdio + # which will kill any waiting remote pdb trace). + # This is a "soft" (cancellable) join/reap. + try: + # await proc.wait() + await wait_func(proc) + except trio.Cancelled: + # if cancelled during a soft wait, cancel the child + # actor before entering the hard reap sequence + # below. This means we try to do a graceful teardown + # via sending a cancel message before getting out + # zombie killing tools. + with trio.CancelScope(shield=True): + await portal.cancel_actor() + raise + + async def new_proc( name: str, @@ -199,11 +227,14 @@ async def new_proc( task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED ) -> None: - """ - Create a new ``multiprocessing.Process`` using the - spawn method as configured using ``try_set_start_method()``. + ''' + Create a new ``Process`` using a "spawn method" as (configured using + ``try_set_start_method()``). - """ + This routine should be started in a actor runtime task and the logic + here is to be considered the core supervision strategy. + + ''' # mark the new actor with the global spawn method subactor._spawn_method = _spawn_method uid = subactor.uid @@ -298,21 +329,14 @@ async def new_proc( errors ) - # Wait for proc termination but **dont' yet** call - # ``trio.Process.__aexit__()`` (it tears down stdio - # which will kill any waiting remote pdb trace). - # This is a "soft" (cancellable) join/reap. - try: - await proc.wait() - except trio.Cancelled: - # if cancelled during a soft wait, cancel the child - # actor before entering the hard reap sequence - # below. This means we try to do a graceful teardown - # via sending a cancel message before getting out - # zombie killing tools. - with trio.CancelScope(shield=True): - await portal.cancel_actor() - raise + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_wait( + proc, + trio.Process.wait, + portal + ) # cancel result waiter that may have been spawned in # tandem if not done already @@ -346,7 +370,7 @@ async def new_proc( log.debug(f"Joined {proc}") else: - log.warning(f'Nursery cancelled before sub-proc started') + log.warning('Nursery cancelled before sub-proc started') if not cancelled_during_spawn: # pop child entry to indicate we no longer managing this @@ -361,6 +385,7 @@ async def new_proc( actor_nursery=actor_nursery, subactor=subactor, errors=errors, + # passed through to actor main bind_addr=bind_addr, parent_addr=parent_addr, @@ -479,7 +504,14 @@ async def mp_new_proc( errors ) - await proc_waiter(proc) + # This is a "soft" (cancellable) join/reap which + # will remote cancel the actor on a ``trio.Cancelled`` + # condition. + await soft_wait( + proc, + proc_waiter, + portal + ) # cancel result waiter that may have been spawned in # tandem if not done already