Don't kill root's immediate children when in debug

If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
wip_fix_asyncio_gen_streaming
Tyler Goodlet 2021-05-10 07:47:38 -04:00
parent 682ef3425f
commit 19825ba284
1 changed files with 44 additions and 18 deletions

View File

@ -22,7 +22,13 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple from typing import Tuple
from . import _forkserver_override from . import _forkserver_override
from ._state import current_actor, is_main_process from ._state import (
current_actor,
is_main_process,
is_root_process,
_runtime_vars,
)
from .log import get_logger from .log import get_logger
from ._portal import Portal from ._portal import Portal
from ._actor import Actor, ActorFailure from ._actor import Actor, ActorFailure
@ -180,23 +186,45 @@ async def spawn_subactor(
proc = await trio.open_process(spawn_cmd) proc = await trio.open_process(spawn_cmd)
try: try:
yield proc yield proc
finally: finally:
log.debug(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown # XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early # to avoid killing the process too early
# since trio does this internally on ``__aexit__()`` # since trio does this internally on ``__aexit__()``
# NOTE: we always "shield" join sub procs in if (
# the outer scope since no actor zombies are is_root_process()
# ever allowed. This ``__aexit__()`` also shields
# internally.
log.debug(f"Attempting to kill {proc}")
# NOTE: this timeout effectively does nothing right now since # XXX: basically the pre-closing of stdstreams in a
# we are shielding the ``.wait()`` inside ``new_proc()`` which # root-processe's ``trio.Process.aclose()`` can clobber
# will pretty much never release until the process exits. # any existing debugger session so we avoid
and _runtime_vars['_debug_mode']
):
# XXX: this is ``trio.Process.aclose()`` minus
# the std-streams pre-closing steps and ``Process.kill()``
# calls.
try:
await proc.wait()
finally:
if proc.returncode is None:
# XXX: skip this when in debug and a session might
# still be live
# proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
else:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with proc:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}") log.debug(f"Terminating {proc}")
if cs.cancelled_caught: if cs.cancelled_caught:
log.critical(f"HARD KILLING {proc}") log.critical(f"HARD KILLING {proc}")
proc.kill() proc.kill()
@ -212,7 +240,6 @@ async def new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None:
"""Create a new ``multiprocessing.Process`` using the """Create a new ``multiprocessing.Process`` using the
@ -223,7 +250,7 @@ async def new_proc(
# mark the new actor with the global spawn method # mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method subactor._spawn_method = _spawn_method
if use_trio_run_in_process or _spawn_method == 'trio': if _spawn_method == 'trio':
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
async with spawn_subactor( async with spawn_subactor(
subactor, subactor,
@ -320,7 +347,6 @@ async def mp_new_proc(
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child _runtime_vars: Dict[str, Any], # serialized and sent to _child
*, *,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: ) -> None: