Add `wait_for_peer_or_proc_death()` to `_spawn`
Race `IPCServer.wait_for_peer(uid)` against the sub-proc's `.wait()` inside a `trio` nursery; whichever completes first cancels the other. Prevents the spawning task from parking forever on an unsignalled `_peer_connected[uid]` event when a sub-actor dies during boot (e.g. crashed on import before reaching `_actor_child_main`). Instead of hanging, raises `ActorFailure` w/ the proc's exit code for clean supervisor error reporting. Also, - use the new racer in `main_thread_forkserver_proc()` spawn path. - keep `proc_wait` generic so each backend passes its own callable (`trio.Process.wait`, `_ForkedProc.wait`, etc.). (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codesubint_forkserver_backend
parent
cec6cc2a56
commit
3b0724eba8
|
|
@ -368,6 +368,7 @@ from tractor.runtime._portal import Portal
|
||||||
from ._spawn import (
|
from ._spawn import (
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
soft_kill,
|
soft_kill,
|
||||||
|
wait_for_peer_or_proc_death,
|
||||||
)
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
@ -968,7 +969,18 @@ async def main_thread_forkserver_proc(
|
||||||
f' |_{proc}\n'
|
f' |_{proc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
event, chan = await ipc_server.wait_for_peer(uid)
|
# race the handshake-wait against proc-death so a
|
||||||
|
# sub that dies during boot (e.g. crashed on import
|
||||||
|
# before reaching `_actor_child_main`, leaving a
|
||||||
|
# zombie + no cmdline) surfaces as `ActorFailure`
|
||||||
|
# instead of parking the spawning task forever on
|
||||||
|
# an unsignalled `_peer_connected[uid]` event.
|
||||||
|
event, chan = await wait_for_peer_or_proc_death(
|
||||||
|
ipc_server,
|
||||||
|
uid,
|
||||||
|
proc_wait=proc.wait,
|
||||||
|
proc_repr=repr(proc),
|
||||||
|
)
|
||||||
|
|
||||||
except trio.Cancelled:
|
except trio.Cancelled:
|
||||||
cancelled_during_spawn = True
|
cancelled_during_spawn = True
|
||||||
|
|
|
||||||
|
|
@ -43,6 +43,7 @@ from tractor.log import get_logger
|
||||||
from tractor.discovery._addr import (
|
from tractor.discovery._addr import (
|
||||||
UnwrappedAddress,
|
UnwrappedAddress,
|
||||||
)
|
)
|
||||||
|
from .._exceptions import ActorFailure
|
||||||
from ._reap import unlink_uds_bind_addrs
|
from ._reap import unlink_uds_bind_addrs
|
||||||
from tractor.runtime._portal import Portal
|
from tractor.runtime._portal import Portal
|
||||||
from tractor.runtime._runtime import Actor
|
from tractor.runtime._runtime import Actor
|
||||||
|
|
@ -106,6 +107,71 @@ else:
|
||||||
await trio.lowlevel.wait_readable(proc.sentinel)
|
await trio.lowlevel.wait_readable(proc.sentinel)
|
||||||
|
|
||||||
|
|
||||||
|
async def wait_for_peer_or_proc_death(
|
||||||
|
ipc_server,
|
||||||
|
uid: tuple[str, str],
|
||||||
|
# TODO? not not types?
|
||||||
|
proc_wait: 'Callable[[], Awaitable]',
|
||||||
|
proc_repr: str = '',
|
||||||
|
|
||||||
|
) -> 'tuple[trio.Event, Channel]':
|
||||||
|
'''
|
||||||
|
Race `IPCServer.wait_for_peer(uid)` against the sub-proc's
|
||||||
|
own `.wait()` coroutine. Whichever completes first cancels
|
||||||
|
the other.
|
||||||
|
|
||||||
|
Used by every spawn-backend to detect a sub-actor that
|
||||||
|
*dies during boot* before completing the parent-handshake-
|
||||||
|
callback (e.g. crashed on import, exec'd-out, kernel-killed
|
||||||
|
pre-`_actor_child_main`). Without this race, the
|
||||||
|
handshake-wait — backed by an unsignalled `trio.Event` —
|
||||||
|
parks the spawning task forever and leaves the dead child
|
||||||
|
as a zombie since nobody calls `proc.wait()` to reap.
|
||||||
|
|
||||||
|
On normal handshake-complete: returns `(event, chan)`
|
||||||
|
identical to a bare `wait_for_peer`.
|
||||||
|
|
||||||
|
On proc-death-first: raises `ActorFailure` carrying the
|
||||||
|
proc's exit code, allowing the supervisor to surface a
|
||||||
|
clean error rather than hanging indefinitely.
|
||||||
|
|
||||||
|
`proc_wait` is a 0-arg async callable returning the proc's
|
||||||
|
exit-status — kept generic so each backend can pass its
|
||||||
|
own (`trio.Process.wait`, `_ForkedProc.wait`,
|
||||||
|
`proc_waiter(mp.Process)`, etc.).
|
||||||
|
|
||||||
|
`proc_repr` is an optional string used in the
|
||||||
|
`ActorFailure` message for diag.
|
||||||
|
|
||||||
|
'''
|
||||||
|
result: dict = {}
|
||||||
|
|
||||||
|
async def _await_handshake():
|
||||||
|
event, chan = await ipc_server.wait_for_peer(uid)
|
||||||
|
result['handshake'] = (event, chan)
|
||||||
|
boot_n.cancel_scope.cancel()
|
||||||
|
|
||||||
|
async def _await_death():
|
||||||
|
rc = await proc_wait()
|
||||||
|
result['died'] = rc
|
||||||
|
boot_n.cancel_scope.cancel()
|
||||||
|
|
||||||
|
async with trio.open_nursery() as boot_n:
|
||||||
|
boot_n.start_soon(_await_handshake)
|
||||||
|
boot_n.start_soon(_await_death)
|
||||||
|
|
||||||
|
if 'handshake' in result:
|
||||||
|
return result['handshake']
|
||||||
|
|
||||||
|
# only reached if proc-death won the race
|
||||||
|
raise ActorFailure(
|
||||||
|
f'Sub-actor {uid!r} died during boot '
|
||||||
|
f'(rc={result.get("died")!r}) before completing '
|
||||||
|
f'parent-handshake.\n'
|
||||||
|
f' proc: {proc_repr}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def try_set_start_method(
|
def try_set_start_method(
|
||||||
key: SpawnMethodKey
|
key: SpawnMethodKey
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue