Escalate cancel-ack timeouts to `proc.terminate()`
Wires SC-discipline cancel-then-escalate into `ActorNursery.cancel()`:

  graceful cancel-req -> bounded wait -> hard-kill

Deats,
- add `raise_on_timeout: bool = False` kwarg to `Portal.cancel_actor()`. When `True`, bounded-wait expiry raises `ActorTooSlowError` instead of the legacy DEBUG-log + return-`False` path. Default stays `False` for callers that handle their own escalation (e.g. `_spawn.soft_kill()` polling `proc.poll()`).
- add `_try_cancel_then_kill()` helper in `_supervise` used by per-child cancel tasks. On `ActorTooSlowError`, escalates via `proc.terminate()` (SIGTERM) so a non-acking sub doesn't park `soft_kill()` forever waiting on `proc.poll()`.
- replace `tn.start_soon(portal.cancel_actor)` in `ActorNursery.cancel()` with the helper.

Debug-mode bypass:
-----------------
skip escalation (fall back to legacy fire-and-forget cancel) when ANY of:
- `Lock.ctx_in_debug is not None` (some actor is currently REPL-locked)
- `_runtime_vars['_debug_mode']` (root opened with `debug_mode=True`).
- `ActorNursery._at_least_one_child_in_debug` (per-child `debug_mode=` opt-in).

ORing covers root-debug, child-debug, and active-REPL-lock cases without false-positively SIGTERM-ing a sub-tree proxying stdio for a REPL session.

Motivated by the `subint_forkserver` dup-name hang where a same-named sibling subactor's cancel-RPC failed to ack within `Portal.cancel_timeout` (TCP+forkserver register-RPC contention) and the nursery `__aexit__` deadlocked.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

subint_forkserver_backend
parent
38ffb875bd
commit
34f333a026
|
|
@ -55,6 +55,7 @@ from ..msg import (
|
||||||
Return,
|
Return,
|
||||||
)
|
)
|
||||||
from .._exceptions import (
|
from .._exceptions import (
|
||||||
|
ActorTooSlowError,
|
||||||
NoResult,
|
NoResult,
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
)
|
)
|
||||||
|
|
@ -268,6 +269,7 @@ class Portal:
|
||||||
async def cancel_actor(
|
async def cancel_actor(
|
||||||
self,
|
self,
|
||||||
timeout: float | None = None,
|
timeout: float | None = None,
|
||||||
|
raise_on_timeout: bool = False,
|
||||||
|
|
||||||
) -> bool:
|
) -> bool:
|
||||||
'''
|
'''
|
||||||
|
|
@ -281,6 +283,17 @@ class Portal:
|
||||||
`._context.Context.cancel()` which CAN be used for this
|
`._context.Context.cancel()` which CAN be used for this
|
||||||
purpose.
|
purpose.
|
||||||
|
|
||||||
|
`raise_on_timeout` (default `False`):
|
||||||
|
|
||||||
|
- `False` (legacy): on bounded-wait expiry, log at DEBUG
|
||||||
|
and return `False`. Used by callers that issue cancel
|
||||||
|
fire-and-forget and have their own escalation
|
||||||
|
(e.g. `_spawn.soft_kill()` checks `proc.poll()` after).
|
||||||
|
- `True`: on bounded-wait expiry, raise `ActorTooSlowError`
|
||||||
|
so the caller MUST handle the failure explicitly.
|
||||||
|
`ActorNursery.cancel()` opts in so it can escalate via
|
||||||
|
`proc.terminate()` per SC-discipline.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__runtimeframe__: int = 1 # noqa
|
__runtimeframe__: int = 1 # noqa
|
||||||
|
|
||||||
|
|
@ -301,15 +314,16 @@ class Portal:
|
||||||
|
|
||||||
# XXX the one spot we set it?
|
# XXX the one spot we set it?
|
||||||
chan._cancel_called: bool = True
|
chan._cancel_called: bool = True
|
||||||
|
cancel_timeout: float = (
|
||||||
|
timeout
|
||||||
|
or
|
||||||
|
self.cancel_timeout
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
# send cancel cmd - might not get response
|
# send cancel cmd - might not get response
|
||||||
# XXX: sure would be nice to make this work with
|
# XXX: sure would be nice to make this work with
|
||||||
# a proper shield
|
# a proper shield
|
||||||
with trio.move_on_after(
|
with trio.move_on_after(cancel_timeout) as cs:
|
||||||
timeout
|
|
||||||
or
|
|
||||||
self.cancel_timeout
|
|
||||||
) as cs:
|
|
||||||
cs.shield: bool = True
|
cs.shield: bool = True
|
||||||
await self.run_from_ns(
|
await self.run_from_ns(
|
||||||
'self',
|
'self',
|
||||||
|
|
@ -317,16 +331,24 @@ class Portal:
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if cs.cancelled_caught:
|
# `move_on_after` fired — peer didn't ack within
|
||||||
# may timeout and we never get an ack (obvi racy)
|
# bounded window. Behaviour depends on
|
||||||
# but that doesn't mean it wasn't cancelled.
|
# `raise_on_timeout`:
|
||||||
log.debug(
|
assert cs.cancelled_caught
|
||||||
f'May have failed to cancel peer?\n'
|
if raise_on_timeout:
|
||||||
f'\n'
|
raise ActorTooSlowError(
|
||||||
f'c)=?> {peer_id}\n'
|
f'Peer {peer_id} did not ack `Actor.cancel()`'
|
||||||
|
f'-RPC within bounded wait of '
|
||||||
|
f'{cancel_timeout!r}s'
|
||||||
)
|
)
|
||||||
|
|
||||||
# if we get here some weird cancellation case happened
|
# legacy fire-and-forget path: log + return False so
|
||||||
|
# the caller can decide whether to escalate.
|
||||||
|
log.debug(
|
||||||
|
f'May have failed to cancel peer?\n'
|
||||||
|
f'\n'
|
||||||
|
f'c)=?> {peer_id}\n'
|
||||||
|
)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
except TransportClosed as tpt_err:
|
except TransportClosed as tpt_err:
|
||||||
|
|
|
||||||
|
|
@ -38,8 +38,14 @@ from ..discovery._addr import (
|
||||||
UnwrappedAddress,
|
UnwrappedAddress,
|
||||||
mk_uuid,
|
mk_uuid,
|
||||||
)
|
)
|
||||||
from ._state import current_actor, is_main_process
|
from ._state import (
|
||||||
from ..log import get_logger, get_loglevel
|
current_actor,
|
||||||
|
is_main_process,
|
||||||
|
)
|
||||||
|
from ..log import (
|
||||||
|
get_logger,
|
||||||
|
get_loglevel,
|
||||||
|
)
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ..trionics import (
|
from ..trionics import (
|
||||||
|
|
@ -47,6 +53,7 @@ from ..trionics import (
|
||||||
collapse_eg,
|
collapse_eg,
|
||||||
)
|
)
|
||||||
from .._exceptions import (
|
from .._exceptions import (
|
||||||
|
ActorTooSlowError,
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
from .._root import (
|
from .._root import (
|
||||||
|
|
@ -60,11 +67,93 @@ if TYPE_CHECKING:
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
# from ..ipc._server import IPCServer
|
# from ..ipc._server import IPCServer
|
||||||
from ..ipc import IPCServer
|
from ..ipc import IPCServer
|
||||||
|
from ..spawn._spawn import ProcessType
|
||||||
|
|
||||||
|
|
||||||
log = get_logger()
|
log = get_logger()
|
||||||
|
|
||||||
|
|
||||||
|
async def _try_cancel_then_kill(
|
||||||
|
portal: Portal,
|
||||||
|
# `ProcessType` is `TYPE_CHECKING`-only (defined under that
|
||||||
|
# guard in `..spawn._spawn`) so we stringify here to avoid
|
||||||
|
# eager runtime eval of the annotation at function-def time
|
||||||
|
# (this module has no `from __future__ import annotations`).
|
||||||
|
proc: 'ProcessType',
|
||||||
|
subactor: Actor,
|
||||||
|
debug_mode_active: bool = False,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Per-child cancel-then-escalate helper used by
|
||||||
|
`ActorNursery.cancel()`.
|
||||||
|
|
||||||
|
Sends a graceful actor-runtime cancel-RPC via
|
||||||
|
`Portal.cancel_actor(raise_on_timeout=True)`. If the bounded-wait
|
||||||
|
expires before the peer ack's, `ActorTooSlowError` is raised and
|
||||||
|
we escalate via `proc.terminate()` (SIGTERM) per SC-discipline:
|
||||||
|
|
||||||
|
graceful cancel-req -> bounded wait -> hard-kill
|
||||||
|
|
||||||
|
Without this escalation, a same-name sibling subactor whose
|
||||||
|
cancel-RPC failed to ack within `Portal.cancel_timeout` (e.g.
|
||||||
|
under TCP+forkserver register-RPC contention) would park the
|
||||||
|
parent's `soft_kill()` watcher forever waiting on `proc.poll()`,
|
||||||
|
deadlocking nursery `__aexit__`. See `ActorTooSlowError` for
|
||||||
|
the wider write-up.
|
||||||
|
|
||||||
|
'''
|
||||||
|
# XXX, do NOT escalate to `proc.terminate()` while ANY of
|
||||||
|
# the following are true — SIGTERM-ing a sub would tear
|
||||||
|
# down its sub-tree including any descendant proxying
|
||||||
|
# stdio to/from a REPL-locked actor, clobbering the user's
|
||||||
|
# debug session:
|
||||||
|
#
|
||||||
|
# - `Lock.ctx_in_debug is not None`: most precise — some
|
||||||
|
# actor in the tree is currently REPL-locked. Set in the
|
||||||
|
# root actor for the lifetime of the lock. Raceable
|
||||||
|
# (false negative if SIGINT arrives before lock-acquire
|
||||||
|
# RPC completes).
|
||||||
|
#
|
||||||
|
# - `_runtime_vars['_debug_mode']`: root-actor was opened
|
||||||
|
# with `debug_mode=True` (via `open_root_actor` /
|
||||||
|
# `open_nursery`). Set once at root boot, never cleared.
|
||||||
|
# Catches deep-descendant REPL sessions even when the
|
||||||
|
# intermediate nurseries didn't pass `debug_mode=` per-
|
||||||
|
# child.
|
||||||
|
#
|
||||||
|
# - `debug_mode_active`: this nursery has at least one
|
||||||
|
# child started with an explicit `debug_mode=` arg
|
||||||
|
# (`ActorNursery._at_least_one_child_in_debug`). Catches
|
||||||
|
# the case where root is NOT in debug-mode but a
|
||||||
|
# nursery-direct child opted in.
|
||||||
|
#
|
||||||
|
# Independent because root may NOT be in debug-mode even
|
||||||
|
# when a child is (only the child's `_runtime_vars` is
|
||||||
|
# mutated by per-child `debug_mode=True`). ORing covers
|
||||||
|
# every flavor without false-positively skipping
|
||||||
|
# legitimate hard-kill paths in non-debug trees.
|
||||||
|
if (
|
||||||
|
debug.Lock.ctx_in_debug is not None
|
||||||
|
or
|
||||||
|
_state._runtime_vars.get('_debug_mode', False)
|
||||||
|
or
|
||||||
|
debug_mode_active
|
||||||
|
):
|
||||||
|
await portal.cancel_actor()
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
|
await portal.cancel_actor(raise_on_timeout=True)
|
||||||
|
except ActorTooSlowError as too_slow:
|
||||||
|
log.error(
|
||||||
|
f'Cancel-ack TIMED OUT for sub-actor\n'
|
||||||
|
f' uid: {subactor.aid.reprol()!r}\n'
|
||||||
|
f' reason: {too_slow}\n'
|
||||||
|
f'-> escalating to `proc.terminate()` (hard-kill)\n'
|
||||||
|
)
|
||||||
|
proc.terminate()
|
||||||
|
|
||||||
|
|
||||||
class ActorNursery:
|
class ActorNursery:
|
||||||
'''
|
'''
|
||||||
The fundamental actor supervision construct: spawn and manage
|
The fundamental actor supervision construct: spawn and manage
|
||||||
|
|
@ -428,10 +517,23 @@ class ActorNursery:
|
||||||
else: # there's no other choice left
|
else: # there's no other choice left
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
|
||||||
# spawn cancel tasks for each sub-actor
|
# spawn per-child cancel tasks; the helper
|
||||||
|
# escalates to hard-kill on
|
||||||
|
# `ActorTooSlowError` rather than silently
|
||||||
|
# swallowing the cancel-ack timeout, EXCEPT
|
||||||
|
# when this nursery has any debug-eligible
|
||||||
|
# child (in which case we keep legacy
|
||||||
|
# fire-and-forget semantics to avoid
|
||||||
|
# clobbering an active REPL).
|
||||||
assert portal
|
assert portal
|
||||||
if portal.channel.connected():
|
if portal.channel.connected():
|
||||||
tn.start_soon(portal.cancel_actor)
|
tn.start_soon(
|
||||||
|
_try_cancel_then_kill,
|
||||||
|
portal,
|
||||||
|
proc,
|
||||||
|
subactor,
|
||||||
|
self._at_least_one_child_in_debug,
|
||||||
|
)
|
||||||
|
|
||||||
log.cancel(msg)
|
log.cancel(msg)
|
||||||
# if we cancelled the cancel (we hung cancelling remote actors)
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue