Handle depth > 1 nursery owners which use debug mode

immediate_remote_cancels
Tyler Goodlet 2021-10-13 23:33:31 -04:00
parent 4b2710b8a5
commit daa28ea0e9
1 changed files with 45 additions and 9 deletions

View File

@ -1,5 +1,6 @@
""" """
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
import sys import sys
import multiprocessing as mp import multiprocessing as mp
@ -21,10 +22,14 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override
from ._debug import maybe_wait_for_debugger from ._debug import (
maybe_wait_for_debugger,
acquire_debug_lock,
)
from ._state import ( from ._state import (
current_actor, current_actor,
is_main_process, is_main_process,
is_root_process,
) )
from .log import get_logger from .log import get_logger
@ -153,12 +158,13 @@ async def cancel_on_completion(
async def do_hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
terminate_after: int = 3,
) -> None: ) -> None:
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
# a hard-kill time ultimatum. # a hard-kill time ultimatum.
with trio.move_on_after(3) as cs: with trio.move_on_after(terminate_after) as cs:
# NOTE: This ``__aexit__()`` shields internally. # NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()`` async with proc: # calls ``trio.Process.aclose()``
@ -173,16 +179,20 @@ async def do_hard_kill(
async def new_proc( async def new_proc(
name: str, name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor, subactor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: Dict[Tuple[str, str], Exception],
# passed through to actor main # passed through to actor main
bind_addr: Tuple[str, int], bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
""" """
Create a new ``multiprocessing.Process`` using the Create a new ``multiprocessing.Process`` using the
@ -191,6 +201,7 @@ async def new_proc(
""" """
# mark the new actor with the global spawn method # mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method subactor._spawn_method = _spawn_method
uid = subactor.uid
if _spawn_method == 'trio': if _spawn_method == 'trio':
@ -217,6 +228,7 @@ async def new_proc(
subactor.loglevel subactor.loglevel
] ]
cancel_during_spawn: bool = False
try: try:
proc = await trio.open_process(spawn_cmd) proc = await trio.open_process(spawn_cmd)
@ -225,8 +237,24 @@ async def new_proc(
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
try:
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid) subactor.uid)
except trio.Cancelled:
cancel_during_spawn = True
# we may cancel before the child connects back in which
# case avoid clobbering the pdb tty.
with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb
if is_root_process():
await maybe_wait_for_debugger()
else:
async with acquire_debug_lock():
# soft wait on the proc to terminate
with trio.move_on_after(0.5):
await proc.wait()
raise
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = ( actor_nursery._children[subactor.uid] = (
subactor, proc, portal) subactor, proc, portal)
@ -254,7 +282,6 @@ async def new_proc(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
if portal in actor_nursery._cancel_after_result_on_exit: if portal in actor_nursery._cancel_after_result_on_exit:
# cancel_scope = await nursery.start(
nursery.start_soon( nursery.start_soon(
cancel_on_completion, cancel_on_completion,
portal, portal,
@ -279,11 +306,20 @@ async def new_proc(
# The "hard" reap since no actor zombies are allowed! # The "hard" reap since no actor zombies are allowed!
# XXX: do this **after** cancellation/tearfown to avoid # XXX: do this **after** cancellation/tearfown to avoid
# killing the process too early. # killing the process too early.
if proc.poll() is None: log.cancel(f'Hard reap sequence starting for {uid}')
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
# don't clobber an ongoing pdb # don't clobber an ongoing pdb
await maybe_wait_for_debugger() await maybe_wait_for_debugger()
if cancel_during_spawn:
# Try again to avoid TTY clobbering.
async with acquire_debug_lock():
with trio.move_on_after(0.5):
await proc.wait()
if proc.poll() is None:
log.cancel(f"Attempting to hard kill {proc}") log.cancel(f"Attempting to hard kill {proc}")
await do_hard_kill(proc) await do_hard_kill(proc)