forked from goodboy/tractor
Distinctly separate and harden mp spawning
It's clear now that special attention is needed to handle the case where a spawned `multiprocessing` proc is started but then the parent is cancelled before the child can connect back; in this case we need to be sure to kill the near-zombie child asap. This may end up being the solution to other resiliency issues seen around mp with nested process trees too. More testing is needed to be sure. Relates to #84 #89 #134 #146mp_teardown_hardening
parent
af93b8532a
commit
607c48f1ac
|
@ -2,14 +2,13 @@
|
||||||
Machinery for actor process spawning using multiple backends.
|
Machinery for actor process spawning using multiple backends.
|
||||||
"""
|
"""
|
||||||
import sys
|
import sys
|
||||||
import inspect
|
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
import platform
|
import platform
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
from async_generator import aclosing, asynccontextmanager
|
from async_generator import asynccontextmanager
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from multiprocessing import semaphore_tracker # type: ignore
|
from multiprocessing import semaphore_tracker # type: ignore
|
||||||
|
@ -128,7 +127,9 @@ async def cancel_on_completion(
|
||||||
Should only be called for actors spawned with `run_in_actor()`.
|
Should only be called for actors spawned with `run_in_actor()`.
|
||||||
"""
|
"""
|
||||||
with trio.CancelScope() as cs:
|
with trio.CancelScope() as cs:
|
||||||
|
|
||||||
task_status.started(cs)
|
task_status.started(cs)
|
||||||
|
|
||||||
# if this call errors we store the exception for later
|
# if this call errors we store the exception for later
|
||||||
# in ``errors`` which will be reraised inside
|
# in ``errors`` which will be reraised inside
|
||||||
# a MultiError and we still send out a cancel request
|
# a MultiError and we still send out a cancel request
|
||||||
|
@ -138,6 +139,7 @@ async def cancel_on_completion(
|
||||||
log.warning(
|
log.warning(
|
||||||
f"Cancelling {portal.channel.uid} after error {result}"
|
f"Cancelling {portal.channel.uid} after error {result}"
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.info(
|
log.info(
|
||||||
f"Cancelling {portal.channel.uid} gracefully "
|
f"Cancelling {portal.channel.uid} gracefully "
|
||||||
|
@ -202,7 +204,7 @@ async def spawn_subactor(
|
||||||
|
|
||||||
async def new_proc(
|
async def new_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: 'ActorNursery', # type: ignore
|
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: Dict[Tuple[str, str], Exception],
|
errors: Dict[Tuple[str, str], Exception],
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
|
@ -221,8 +223,8 @@ async def new_proc(
|
||||||
# mark the new actor with the global spawn method
|
# mark the new actor with the global spawn method
|
||||||
subactor._spawn_method = _spawn_method
|
subactor._spawn_method = _spawn_method
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
|
||||||
if use_trio_run_in_process or _spawn_method == 'trio':
|
if use_trio_run_in_process or _spawn_method == 'trio':
|
||||||
|
async with trio.open_nursery() as nursery:
|
||||||
async with spawn_subactor(
|
async with spawn_subactor(
|
||||||
subactor,
|
subactor,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
|
@ -261,7 +263,11 @@ async def new_proc(
|
||||||
|
|
||||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
cancel_scope = await nursery.start(
|
cancel_scope = await nursery.start(
|
||||||
cancel_on_completion, portal, subactor, errors)
|
cancel_on_completion,
|
||||||
|
portal,
|
||||||
|
subactor,
|
||||||
|
errors
|
||||||
|
)
|
||||||
|
|
||||||
# Wait for proc termination but **dont' yet** call
|
# Wait for proc termination but **dont' yet** call
|
||||||
# ``trio.Process.__aexit__()`` (it tears down stdio
|
# ``trio.Process.__aexit__()`` (it tears down stdio
|
||||||
|
@ -275,8 +281,49 @@ async def new_proc(
|
||||||
# no actor zombies allowed
|
# no actor zombies allowed
|
||||||
# with trio.CancelScope(shield=True):
|
# with trio.CancelScope(shield=True):
|
||||||
await proc.wait()
|
await proc.wait()
|
||||||
|
|
||||||
|
log.debug(f"Joined {proc}")
|
||||||
|
# pop child entry to indicate we are no longer managing this subactor
|
||||||
|
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
|
||||||
|
|
||||||
|
# cancel result waiter that may have been spawned in
|
||||||
|
# tandem if not done already
|
||||||
|
if cancel_scope:
|
||||||
|
log.warning(
|
||||||
|
f"Cancelling existing result waiter task for {subactor.uid}")
|
||||||
|
cancel_scope.cancel()
|
||||||
else:
|
else:
|
||||||
# `multiprocessing`
|
# `multiprocessing`
|
||||||
|
# async with trio.open_nursery() as nursery:
|
||||||
|
await mp_new_proc(
|
||||||
|
name=name,
|
||||||
|
actor_nursery=actor_nursery,
|
||||||
|
subactor=subactor,
|
||||||
|
errors=errors,
|
||||||
|
# passed through to actor main
|
||||||
|
bind_addr=bind_addr,
|
||||||
|
parent_addr=parent_addr,
|
||||||
|
_runtime_vars=_runtime_vars,
|
||||||
|
task_status=task_status,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def mp_new_proc(
|
||||||
|
|
||||||
|
name: str,
|
||||||
|
actor_nursery: 'ActorNursery', # type: ignore # noqa
|
||||||
|
subactor: Actor,
|
||||||
|
errors: Dict[Tuple[str, str], Exception],
|
||||||
|
# passed through to actor main
|
||||||
|
bind_addr: Tuple[str, int],
|
||||||
|
parent_addr: Tuple[str, int],
|
||||||
|
_runtime_vars: Dict[str, Any], # serialized and sent to _child
|
||||||
|
*,
|
||||||
|
use_trio_run_in_process: bool = False,
|
||||||
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
async with trio.open_nursery() as nursery:
|
||||||
assert _ctx
|
assert _ctx
|
||||||
start_method = _ctx.get_start_method()
|
start_method = _ctx.get_start_method()
|
||||||
if start_method == 'forkserver':
|
if start_method == 'forkserver':
|
||||||
|
@ -334,6 +381,7 @@ async def new_proc(
|
||||||
|
|
||||||
log.info(f"Started {proc}")
|
log.info(f"Started {proc}")
|
||||||
|
|
||||||
|
try:
|
||||||
# wait for actor to spawn and connect back to us
|
# wait for actor to spawn and connect back to us
|
||||||
# channel should have handshake completed by the
|
# channel should have handshake completed by the
|
||||||
# local actor by the time we get a ref to it
|
# local actor by the time we get a ref to it
|
||||||
|
@ -356,20 +404,50 @@ async def new_proc(
|
||||||
# awaited and reported upwards to the supervisor.
|
# awaited and reported upwards to the supervisor.
|
||||||
await actor_nursery._join_procs.wait()
|
await actor_nursery._join_procs.wait()
|
||||||
|
|
||||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
finally:
|
||||||
cancel_scope = await nursery.start(
|
# XXX: in the case we were cancelled before the sub-proc
|
||||||
cancel_on_completion, portal, subactor, errors)
|
# registered itself back we must be sure to try and clean
|
||||||
|
# any process we may have started.
|
||||||
|
|
||||||
# TODO: timeout block here?
|
reaping_cancelled = False
|
||||||
|
cancel_scope = None
|
||||||
|
|
||||||
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
|
try:
|
||||||
|
# async with trio.open_nursery() as n:
|
||||||
|
# n.cancel_scope.shield = True
|
||||||
|
cancel_scope = await nursery.start(
|
||||||
|
cancel_on_completion,
|
||||||
|
portal,
|
||||||
|
subactor,
|
||||||
|
errors
|
||||||
|
)
|
||||||
|
except trio.Cancelled:
|
||||||
|
# if the reaping task was cancelled we may have hit
|
||||||
|
# a race where the subproc disconnected before we
|
||||||
|
# could send it a message to cancel (classic 2 generals)
|
||||||
|
# in that case, wait shortly then kill the process.
|
||||||
|
reaping_cancelled = True
|
||||||
|
|
||||||
|
if proc.is_alive():
|
||||||
|
with trio.move_on_after(0.1) as cs:
|
||||||
|
cs.shield = True
|
||||||
|
await proc_waiter(proc)
|
||||||
|
|
||||||
|
if cs.cancelled_caught:
|
||||||
|
proc.terminate()
|
||||||
|
|
||||||
|
if not reaping_cancelled:
|
||||||
if proc.is_alive():
|
if proc.is_alive():
|
||||||
await proc_waiter(proc)
|
await proc_waiter(proc)
|
||||||
proc.join()
|
|
||||||
|
|
||||||
# This is again common logic for all backends:
|
# TODO: timeout block here?
|
||||||
|
proc.join()
|
||||||
|
|
||||||
log.debug(f"Joined {proc}")
|
log.debug(f"Joined {proc}")
|
||||||
# pop child entry to indicate we are no longer managing this subactor
|
# pop child entry to indicate we are no longer managing this subactor
|
||||||
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
|
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
# tandem if not done already
|
# tandem if not done already
|
||||||
if cancel_scope:
|
if cancel_scope:
|
||||||
|
|
Loading…
Reference in New Issue