Harden `trio` spawner process waiting
Always shield waiting for he process and always run ``trio.Process.__aexit__()`` on teardown. This enforces that shutdown happens to due cancellation triggered inside the sub-actor instead of the process being killed externally by the parent.dereg_on_channel_aclose
parent
fe45d99f65
commit
532429aec9
|
@ -148,7 +148,7 @@ async def cancel_on_completion(
|
||||||
else:
|
else:
|
||||||
log.info(
|
log.info(
|
||||||
f"Cancelling {portal.channel.uid} gracefully "
|
f"Cancelling {portal.channel.uid} gracefully "
|
||||||
"after result {result}")
|
f"after result {result}")
|
||||||
|
|
||||||
# cancel the process now that we have a final result
|
# cancel the process now that we have a final result
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
@ -159,7 +159,6 @@ async def spawn_subactor(
|
||||||
subactor: 'Actor',
|
subactor: 'Actor',
|
||||||
parent_addr: Tuple[str, int],
|
parent_addr: Tuple[str, int],
|
||||||
):
|
):
|
||||||
|
|
||||||
spawn_cmd = [
|
spawn_cmd = [
|
||||||
sys.executable,
|
sys.executable,
|
||||||
"-m",
|
"-m",
|
||||||
|
@ -184,13 +183,19 @@ async def spawn_subactor(
|
||||||
]
|
]
|
||||||
|
|
||||||
proc = await trio.open_process(spawn_cmd)
|
proc = await trio.open_process(spawn_cmd)
|
||||||
yield proc
|
try:
|
||||||
|
yield proc
|
||||||
|
finally:
|
||||||
|
# XXX: do this **after** cancellation/tearfown
|
||||||
|
# to avoid killing the process too early
|
||||||
|
# since trio does this internally on ``__aexit__()``
|
||||||
|
|
||||||
# XXX: do this **after** cancellation/tearfown
|
# NOTE: we always "shield" join sub procs in
|
||||||
# to avoid killing the process too early
|
# the outer scope since no actor zombies are
|
||||||
# since trio does this internally on ``__aexit__()``
|
# ever allowed. This ``__aexit__()`` also shields
|
||||||
async with proc:
|
# internally.
|
||||||
log.debug(f"Terminating {proc}")
|
async with proc:
|
||||||
|
log.debug(f"Terminating {proc}")
|
||||||
|
|
||||||
|
|
||||||
async def new_proc(
|
async def new_proc(
|
||||||
|
@ -243,16 +248,21 @@ async def new_proc(
|
||||||
task_status.started(portal)
|
task_status.started(portal)
|
||||||
|
|
||||||
# wait for ActorNursery.wait() to be called
|
# wait for ActorNursery.wait() to be called
|
||||||
await actor_nursery._join_procs.wait()
|
with trio.CancelScope(shield=True):
|
||||||
|
await actor_nursery._join_procs.wait()
|
||||||
|
|
||||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
cancel_scope = await nursery.start(
|
cancel_scope = await nursery.start(
|
||||||
cancel_on_completion, portal, subactor, errors)
|
cancel_on_completion, portal, subactor, errors)
|
||||||
|
|
||||||
# Wait for proc termination but **dont'** yet call
|
# Wait for proc termination but **dont' yet** call
|
||||||
# ``trio.Process.__aexit__()`` (it tears down stdio
|
# ``trio.Process.__aexit__()`` (it tears down stdio
|
||||||
# which will kill any waiting remote pdb trace).
|
# which will kill any waiting remote pdb trace).
|
||||||
await proc.wait()
|
|
||||||
|
# always "hard" join sub procs:
|
||||||
|
# no actor zombies allowed
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await proc.wait()
|
||||||
else:
|
else:
|
||||||
# `multiprocessing`
|
# `multiprocessing`
|
||||||
assert _ctx
|
assert _ctx
|
||||||
|
|
Loading…
Reference in New Issue