forked from goodboy/tractor
1
0
Fork 0

Don't kill root's immediate children when in debug

If the root calls `trio.Process.kill()` on immediate child proc teardown
when the child is using pdb, we can get stdstreams clobbering that
results in a pdb++ repl where the user can't see what's been typed. Not
killing such children on cancellation / error seems to resolve this
issue whilst still giving reliable termination. For now, code that
special path until a time it becomes a problem for ensuring zombie
reaps.
try_msgspec
Tyler Goodlet 2021-05-10 07:47:38 -04:00
parent 8bb2ab117f
commit 40fcc2bd68
1 changed files with 44 additions and 18 deletions

View File

@ -22,7 +22,13 @@ from multiprocessing import forkserver # type: ignore
from typing import Tuple
from . import _forkserver_override
from ._state import current_actor, is_main_process
from ._state import (
current_actor,
is_main_process,
is_root_process,
_runtime_vars,
)
from .log import get_logger
from ._portal import Portal
from ._actor import Actor, ActorFailure
@ -180,23 +186,45 @@ async def spawn_subactor(
proc = await trio.open_process(spawn_cmd)
try:
yield proc
finally:
log.debug(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early
# since trio does this internally on ``__aexit__()``
# NOTE: we always "shield" join sub procs in
# the outer scope since no actor zombies are
# ever allowed. This ``__aexit__()`` also shields
# internally.
log.debug(f"Attempting to kill {proc}")
if (
is_root_process()
# NOTE: this timeout effectively does nothing right now since
# we are shielding the ``.wait()`` inside ``new_proc()`` which
# will pretty much never release until the process exits.
# XXX: basically the pre-closing of stdstreams in a
# root-processe's ``trio.Process.aclose()`` can clobber
# any existing debugger session so we avoid
and _runtime_vars['_debug_mode']
):
# XXX: this is ``trio.Process.aclose()`` minus
# the std-streams pre-closing steps and ``Process.kill()``
# calls.
try:
await proc.wait()
finally:
if proc.returncode is None:
# XXX: skip this when in debug and a session might
# still be live
# proc.kill()
with trio.CancelScope(shield=True):
await proc.wait()
else:
# NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as
# a hard-kill time ultimatum.
with trio.move_on_after(3) as cs:
async with proc:
# NOTE: This ``__aexit__()`` shields internally.
async with proc: # calls ``trio.Process.aclose()``
log.debug(f"Terminating {proc}")
if cs.cancelled_caught:
log.critical(f"HARD KILLING {proc}")
proc.kill()
@ -212,7 +240,6 @@ async def new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
@ -223,7 +250,7 @@ async def new_proc(
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
if use_trio_run_in_process or _spawn_method == 'trio':
if _spawn_method == 'trio':
async with trio.open_nursery() as nursery:
async with spawn_subactor(
subactor,
@ -320,7 +347,6 @@ async def mp_new_proc(
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
use_trio_run_in_process: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None: