diff --git a/tractor/_spawn.py b/tractor/_spawn.py
index a6edfec..b55cad7 100644
--- a/tractor/_spawn.py
+++ b/tractor/_spawn.py
@@ -1,5 +1,6 @@
 """
 Machinery for actor process spawning using multiple backends.
+
 """
 import sys
 import multiprocessing as mp
@@ -21,10 +22,14 @@ from multiprocessing import forkserver  # type: ignore
 from typing import Tuple
 
 from . import _forkserver_override
-from ._debug import maybe_wait_for_debugger
+from ._debug import (
+    maybe_wait_for_debugger,
+    acquire_debug_lock,
+)
 from ._state import (
     current_actor,
     is_main_process,
+    is_root_process,
 )
 
 from .log import get_logger
@@ -153,12 +158,13 @@ async def cancel_on_completion(
 
 async def do_hard_kill(
     proc: trio.Process,
+    terminate_after: int = 3,
 ) -> None:
     # NOTE: this timeout used to do nothing since we were shielding
     # the ``.wait()`` inside ``new_proc()`` which will pretty much
     # never release until the process exits, now it acts as
     # a hard-kill time ultimatum.
-    with trio.move_on_after(3) as cs:
+    with trio.move_on_after(terminate_after) as cs:
 
         # NOTE: This ``__aexit__()`` shields internally.
         async with proc:  # calls ``trio.Process.aclose()``
@@ -173,16 +179,20 @@ async def do_hard_kill(
 
 
 async def new_proc(
+
     name: str,
     actor_nursery: 'ActorNursery',  # type: ignore  # noqa
     subactor: Actor,
     errors: Dict[Tuple[str, str], Exception],
+
     # passed through to actor main
     bind_addr: Tuple[str, int],
     parent_addr: Tuple[str, int],
     _runtime_vars: Dict[str, Any],  # serialized and sent to _child
+
     *,
     task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
+
 ) -> None:
     """
     Create a new ``multiprocessing.Process`` using the
@@ -191,6 +201,7 @@ async def new_proc(
     """
 
     # mark the new actor with the global spawn method
     subactor._spawn_method = _spawn_method
+    uid = subactor.uid
 
     if _spawn_method == 'trio':
@@ -217,6 +228,7 @@ async def new_proc(
                 subactor.loglevel
             ]
 
+        cancel_during_spawn: bool = False
        try:
             proc = await trio.open_process(spawn_cmd)
 
@@ -225,8 +237,24 @@ async def new_proc(
             # wait for actor to spawn and connect back to us
             # channel should have handshake completed by the
             # local actor by the time we get a ref to it
-            event, chan = await actor_nursery._actor.wait_for_peer(
-                subactor.uid)
+            try:
+                event, chan = await actor_nursery._actor.wait_for_peer(
+                    subactor.uid)
+            except trio.Cancelled:
+                cancel_during_spawn = True
+                # we may cancel before the child connects back in which
+                # case avoid clobbering the pdb tty.
+                with trio.CancelScope(shield=True):
+                    # don't clobber an ongoing pdb
+                    if is_root_process():
+                        await maybe_wait_for_debugger()
+                    else:
+                        async with acquire_debug_lock():
+                            # soft wait on the proc to terminate
+                            with trio.move_on_after(0.5):
+                                await proc.wait()
+                raise
+
             portal = Portal(chan)
             actor_nursery._children[subactor.uid] = (
                 subactor, proc, portal)
@@ -254,7 +282,6 @@ async def new_proc(
 
             async with trio.open_nursery() as nursery:
                 if portal in actor_nursery._cancel_after_result_on_exit:
-                    # cancel_scope = await nursery.start(
                     nursery.start_soon(
                         cancel_on_completion,
                         portal,
@@ -279,11 +306,20 @@ async def new_proc(
 
             # The "hard" reap since no actor zombies are allowed!
            # XXX: do this **after** cancellation/teardown to avoid
            # killing the process too early.
-            if proc.poll() is None:
-                with trio.CancelScope(shield=True):
-                    # don't clobber an ongoing pdb
-                    await maybe_wait_for_debugger()
+            log.cancel(f'Hard reap sequence starting for {uid}')
+            with trio.CancelScope(shield=True):
+                # don't clobber an ongoing pdb
+                await maybe_wait_for_debugger()
+
+            if cancel_during_spawn:
+
+                # Try again to avoid TTY clobbering.
+                async with acquire_debug_lock():
+                    with trio.move_on_after(0.5):
+                        await proc.wait()
+
+            if proc.poll() is None:
                 log.cancel(f"Attempting to hard kill {proc}")
                 await do_hard_kill(proc)
 
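
Reviewer note: below is a minimal, self-contained sketch of the reap pattern the last two hunks implement — a shielded "soft wait" for the child to exit on its own, followed by a hard kill under a `trio.move_on_after()` ultimatum. It is written against plain `trio` of the era this patch targets (where `trio.open_process()`, `Process.poll()`, and the `trio.Process` async context manager all still exist, as the patched code itself relies on). The `sleep` child, the 0.5s/3s windows, and the helper names `do_hard_kill`/`soft_then_hard_reap`/`main` are illustrative stand-ins, and the pdb TTY-lock coordination (`maybe_wait_for_debugger()` / `acquire_debug_lock()`) is elided since it needs the tractor runtime.

import trio


async def do_hard_kill(
    proc: trio.Process,
    terminate_after: int = 3,
) -> None:
    # Bounded window for graceful teardown: ``Process.aclose()`` (run by
    # ``__aexit__()``) waits for exit and, if cancelled by this timeout,
    # kills the process itself before re-raising.
    with trio.move_on_after(terminate_after) as cs:
        async with proc:  # calls ``trio.Process.aclose()``
            pass

    if cs.cancelled_caught:
        # belt-and-braces; a no-op if ``aclose()`` already reaped it
        proc.kill()


async def soft_then_hard_reap(proc: trio.Process) -> None:
    # Shield so cancellation of the parent task can't interrupt the
    # reap itself and leak a zombie.
    with trio.CancelScope(shield=True):
        # "soft wait": give the child a moment to exit on its own
        with trio.move_on_after(0.5):
            await proc.wait()

        # still alive after the grace period? escalate.
        if proc.poll() is None:
            await do_hard_kill(proc)


async def main() -> None:
    proc = await trio.open_process(['sleep', '10'])
    await soft_then_hard_reap(proc)
    print(f'exit code: {proc.returncode}')


if __name__ == '__main__':
    trio.run(main)

Run as-is on POSIX this should print `exit code: -9` after roughly 3.5 seconds: the soft wait expires, `poll()` reports the child still running, and the hard-kill ultimatum escalates to SIGKILL. The shield is the key design point, mirroring the hunks above: the reap must complete even while the surrounding nursery is being cancelled.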