Factor soft-wait logic into a helper, use with mp
parent
d81eb1a51e
commit
46070f99de
|
@ -5,7 +5,7 @@ Machinery for actor process spawning using multiple backends.
|
||||||
import sys
|
import sys
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
import platform
|
import platform
|
||||||
from typing import Any, Dict, Optional
|
from typing import Any, Dict, Optional, Union, Callable
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from trio_typing import TaskStatus
|
from trio_typing import TaskStatus
|
||||||
|
@ -179,10 +179,38 @@ async def do_hard_kill(
|
||||||
# XXX: should pretty much never get here unless we have
|
# XXX: should pretty much never get here unless we have
|
||||||
# to move the bits from ``proc.__aexit__()`` out and
|
# to move the bits from ``proc.__aexit__()`` out and
|
||||||
# into here.
|
# into here.
|
||||||
log.critical(f"HARD KILLING {proc}")
|
log.critical(f"#ZOMBIE_LORD_IS_HERE: {proc}")
|
||||||
proc.kill()
|
proc.kill()
|
||||||
|
|
||||||
|
|
||||||
|
async def soft_wait(
|
||||||
|
|
||||||
|
proc: Union[mp.Process, trio.Process],
|
||||||
|
wait_func: Callable[
|
||||||
|
Union[mp.Process, trio.Process],
|
||||||
|
None,
|
||||||
|
],
|
||||||
|
portal: Portal,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
# Wait for proc termination but **dont' yet** call
|
||||||
|
# ``trio.Process.__aexit__()`` (it tears down stdio
|
||||||
|
# which will kill any waiting remote pdb trace).
|
||||||
|
# This is a "soft" (cancellable) join/reap.
|
||||||
|
try:
|
||||||
|
# await proc.wait()
|
||||||
|
await wait_func(proc)
|
||||||
|
except trio.Cancelled:
|
||||||
|
# if cancelled during a soft wait, cancel the child
|
||||||
|
# actor before entering the hard reap sequence
|
||||||
|
# below. This means we try to do a graceful teardown
|
||||||
|
# via sending a cancel message before getting out
|
||||||
|
# zombie killing tools.
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await portal.cancel_actor()
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
async def new_proc(
|
async def new_proc(
|
||||||
|
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -199,11 +227,14 @@ async def new_proc(
|
||||||
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
"""
|
'''
|
||||||
Create a new ``multiprocessing.Process`` using the
|
Create a new ``Process`` using a "spawn method" as (configured using
|
||||||
spawn method as configured using ``try_set_start_method()``.
|
``try_set_start_method()``).
|
||||||
|
|
||||||
"""
|
This routine should be started in a actor runtime task and the logic
|
||||||
|
here is to be considered the core supervision strategy.
|
||||||
|
|
||||||
|
'''
|
||||||
# mark the new actor with the global spawn method
|
# mark the new actor with the global spawn method
|
||||||
subactor._spawn_method = _spawn_method
|
subactor._spawn_method = _spawn_method
|
||||||
uid = subactor.uid
|
uid = subactor.uid
|
||||||
|
@ -298,21 +329,14 @@ async def new_proc(
|
||||||
errors
|
errors
|
||||||
)
|
)
|
||||||
|
|
||||||
# Wait for proc termination but **dont' yet** call
|
# This is a "soft" (cancellable) join/reap which
|
||||||
# ``trio.Process.__aexit__()`` (it tears down stdio
|
# will remote cancel the actor on a ``trio.Cancelled``
|
||||||
# which will kill any waiting remote pdb trace).
|
# condition.
|
||||||
# This is a "soft" (cancellable) join/reap.
|
await soft_wait(
|
||||||
try:
|
proc,
|
||||||
await proc.wait()
|
trio.Process.wait,
|
||||||
except trio.Cancelled:
|
portal
|
||||||
# if cancelled during a soft wait, cancel the child
|
)
|
||||||
# actor before entering the hard reap sequence
|
|
||||||
# below. This means we try to do a graceful teardown
|
|
||||||
# via sending a cancel message before getting out
|
|
||||||
# zombie killing tools.
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await portal.cancel_actor()
|
|
||||||
raise
|
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
# tandem if not done already
|
# tandem if not done already
|
||||||
|
@ -346,7 +370,7 @@ async def new_proc(
|
||||||
|
|
||||||
log.debug(f"Joined {proc}")
|
log.debug(f"Joined {proc}")
|
||||||
else:
|
else:
|
||||||
log.warning(f'Nursery cancelled before sub-proc started')
|
log.warning('Nursery cancelled before sub-proc started')
|
||||||
|
|
||||||
if not cancelled_during_spawn:
|
if not cancelled_during_spawn:
|
||||||
# pop child entry to indicate we no longer managing this
|
# pop child entry to indicate we no longer managing this
|
||||||
|
@ -361,6 +385,7 @@ async def new_proc(
|
||||||
actor_nursery=actor_nursery,
|
actor_nursery=actor_nursery,
|
||||||
subactor=subactor,
|
subactor=subactor,
|
||||||
errors=errors,
|
errors=errors,
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addr=bind_addr,
|
bind_addr=bind_addr,
|
||||||
parent_addr=parent_addr,
|
parent_addr=parent_addr,
|
||||||
|
@ -479,7 +504,14 @@ async def mp_new_proc(
|
||||||
errors
|
errors
|
||||||
)
|
)
|
||||||
|
|
||||||
await proc_waiter(proc)
|
# This is a "soft" (cancellable) join/reap which
|
||||||
|
# will remote cancel the actor on a ``trio.Cancelled``
|
||||||
|
# condition.
|
||||||
|
await soft_wait(
|
||||||
|
proc,
|
||||||
|
proc_waiter,
|
||||||
|
portal
|
||||||
|
)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
# tandem if not done already
|
# tandem if not done already
|
||||||
|
|
Loading…
Reference in New Issue