WIP rework trio spanwer to include cancellation logic; not correct yet..

zombie_lord_infinite
Tyler Goodlet 2021-10-08 18:14:44 -04:00
parent c02a493d8c
commit 64ebb2aff4
1 changed files with 341 additions and 162 deletions

View File

@ -1,6 +1,7 @@
""" """
Machinery for actor process spawning using multiple backends. Machinery for actor process spawning using multiple backends.
""" """
from __future__ import annotations
import sys import sys
import multiprocessing as mp import multiprocessing as mp
import platform import platform
@ -8,7 +9,6 @@ from typing import Any, Dict, Optional
import trio import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
from async_generator import asynccontextmanager
try: try:
from multiprocessing import semaphore_tracker # type: ignore from multiprocessing import semaphore_tracker # type: ignore
@ -32,6 +32,7 @@ from ._portal import Portal
from ._actor import Actor from ._actor import Actor
from ._entry import _mp_main from ._entry import _mp_main
from ._exceptions import ActorFailure from ._exceptions import ActorFailure
from ._debug import maybe_wait_for_debugger
log = get_logger('tractor') log = get_logger('tractor')
@ -90,96 +91,198 @@ def try_set_start_method(name: str) -> Optional[mp.context.BaseContext]:
return _ctx return _ctx
async def exhaust_portal( # async def exhaust_portal(
portal: Portal, # portal: Portal,
actor: Actor # actor: Actor
) -> Any: # ) -> Any:
"""Pull final result from portal (assuming it has one). # """Pull final result from portal (assuming it has one).
If the main task is an async generator do our best to consume # If the main task is an async generator do our best to consume
what's left of it. # what's left of it.
""" # """
try: # try:
log.debug(f"Waiting on final result from {actor.uid}") # log.debug(f"Waiting on final result from {actor.uid}")
# XXX: streams should never be reaped here since they should # # XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api # # always be established and shutdown using a context manager api
final = await portal.result() # final = await portal.result()
except (Exception, trio.MultiError) as err: # except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError`` # # we reraise in the parent task via a ``trio.MultiError``
return err # return err
except trio.Cancelled as err: # except trio.Cancelled as err:
# lol, of course we need this too ;P # # lol, of course we need this too ;P
# TODO: merge with above? # # TODO: merge with above?
log.warning(f"Cancelled result waiter for {portal.actor.uid}") # log.warning(f"Cancelled result waiter for {portal.actor.uid}")
return err # return err
else: # else:
log.debug(f"Returning final result: {final}") # log.debug(f"Returning final result: {final}")
return final # return final
async def cancel_on_completion( async def result_from_portal(
portal: Portal, portal: Portal,
actor: Actor, actor: Actor,
errors: Dict[Tuple[str, str], Exception], errors: Dict[Tuple[str, str], Exception],
cancel_on_result: bool = False,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
"""Cancel actor gracefully once it's "main" portal's """
Cancel actor gracefully once it's "main" portal's
result arrives. result arrives.
Should only be called for actors spawned with `run_in_actor()`. Should only be called for actors spawned with `run_in_actor()`.
"""
with trio.CancelScope() as cs:
"""
__tracebackhide__ = True
# cancel control is explicityl done by the caller
with trio.CancelScope() as cs:
task_status.started(cs) task_status.started(cs)
# if this call errors we store the exception for later # if this call errors we store the exception for later
# in ``errors`` which will be reraised inside # in ``errors`` which will be reraised inside
# a MultiError and we still send out a cancel request # a MultiError and we still send out a cancel request
result = await exhaust_portal(portal, actor) # result = await exhaust_portal(portal, actor)
try:
log.debug(f"Waiting on final result from {actor.uid}")
# XXX: streams should never be reaped here since they should
# always be established and shutdown using a context manager api
result = await portal.result()
log.debug(f"Returning final result: {result}")
except (Exception, trio.MultiError) as err:
# we reraise in the parent task via a ``trio.MultiError``
result = err
errors[actor.uid] = err
# raise
except trio.Cancelled as err:
# lol, of course we need this too ;P
# TODO: merge with above?
log.warning(f"Cancelled result waiter for {portal.channel.uid}")
result = err
# errors[actor.uid] = err
# raise
if cancel_on_result:
if isinstance(result, Exception): if isinstance(result, Exception):
errors[actor.uid] = result # errors[actor.uid] = result
log.warning( log.warning(
f"Cancelling {portal.channel.uid} after error {result}" f"Cancelling {portal.channel.uid} after error {result}"
) )
raise result
else: else:
log.runtime( log.runtime(
f"Cancelling {portal.channel.uid} gracefully " f"Cancelling {portal.channel.uid} gracefully "
f"after result {result}") f"after result {result}")
# cancel the process now that we have a final result # an actor that was `.run_in_actor()` executes a single task
# and delivers the result, then we cancel it.
# TODO: likely in the future we should just implement this using
# the new `open_context()` IPC api, since it's the more general
# api and can represent this form.
# XXX: do we need this?
# await maybe_wait_for_debugger()
await portal.cancel_actor() await portal.cancel_actor()
return result
async def do_hard_kill( async def do_hard_kill(
proc: trio.Process, proc: trio.Process,
timeout: float,
) -> None: ) -> None:
'''
Hard kill a process with timeout.
'''
log.debug(f"Hard killing {proc}")
# NOTE: this timeout used to do nothing since we were shielding # NOTE: this timeout used to do nothing since we were shielding
# the ``.wait()`` inside ``new_proc()`` which will pretty much # the ``.wait()`` inside ``new_proc()`` which will pretty much
# never release until the process exits, now it acts as # never release until the process exits, now it acts as
# a hard-kill time ultimatum. # a hard-kill time ultimatum.
with trio.move_on_after(3) as cs: with trio.move_on_after(timeout) as cs:
# NOTE: This ``__aexit__()`` shields internally. # NOTE: This ``__aexit__()`` shields internally and originally
async with proc: # calls ``trio.Process.aclose()`` # would tear down stdstreams via ``trio.Process.aclose()``.
async with proc:
log.debug(f"Terminating {proc}") log.debug(f"Terminating {proc}")
# proc.terminate()
if cs.cancelled_caught: if cs.cancelled_caught:
# XXX: should pretty much never get here unless we have # XXX: should pretty much never get here unless we have
# to move the bits from ``proc.__aexit__()`` out and # to move the bits from ``proc.__aexit__()`` out and
# into here. # into here.
log.critical(f"HARD KILLING {proc}") log.critical(f"{timeout} timeout, HARD KILLING {proc}")
proc.kill() proc.kill()
@asynccontextmanager async def reap_proc(
async def spawn_subactor(
subactor: 'Actor', proc: trio.Process,
terminate_after: float = float('inf'),
hard_kill_after: int = 0.1,
) -> None:
with trio.move_on_after(terminate_after) as cs:
# Wait for proc termination but **dont' yet** do
# any out-of-ipc-land termination / process
# killing. This is a "light" (cancellable) join,
# the hard join is below after timeout
await proc.wait()
if cs.cancelled_caught and terminate_after is not float('inf'):
# Always "hard" join lingering sub procs since no
# actor zombies are allowed!
log.warning(
# f'Failed to gracefully terminate {subactor.uid}')
f'Failed to gracefully terminate {proc}\n'
f"Attempting to hard kill {proc}")
with trio.CancelScope(shield=True):
# XXX: do this **after**
# cancellation/tearfown to avoid killing the
# process too early since trio does this
# internally on ``__aexit__()``
await do_hard_kill(proc, hard_kill_after)
async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int], parent_addr: Tuple[str, int],
infect_asyncio: bool, _runtime_vars: Dict[str, Any], # serialized and sent to _child
): *,
graceful_kill_timeout: int = 3,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""
Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
uid = subactor.uid
if _spawn_method == 'trio':
spawn_cmd = [ spawn_cmd = [
sys.executable, sys.executable,
"-m", "-m",
@ -191,7 +294,7 @@ async def spawn_subactor(
# the OS; it otherwise can be passed via the parent channel if # the OS; it otherwise can be passed via the parent channel if
# we prefer in the future (for privacy). # we prefer in the future (for privacy).
"--uid", "--uid",
str(subactor.uid), str(uid),
# Address the child must connect to on startup # Address the child must connect to on startup
"--parent_addr", "--parent_addr",
str(parent_addr) str(parent_addr)
@ -208,58 +311,37 @@ async def spawn_subactor(
spawn_cmd.append("--asyncio") spawn_cmd.append("--asyncio")
proc = await trio.open_process(spawn_cmd) proc = await trio.open_process(spawn_cmd)
log.info(f"Started {proc}")
portal: Optional[Portal] = None
try: try:
yield proc
finally:
log.runtime(f"Attempting to kill {proc}")
# XXX: do this **after** cancellation/tearfown
# to avoid killing the process too early
# since trio does this internally on ``__aexit__()``
await do_hard_kill(proc)
async def new_proc(
name: str,
actor_nursery: 'ActorNursery', # type: ignore # noqa
subactor: Actor,
errors: Dict[Tuple[str, str], Exception],
# passed through to actor main
bind_addr: Tuple[str, int],
parent_addr: Tuple[str, int],
_runtime_vars: Dict[str, Any], # serialized and sent to _child
*,
infect_asyncio: bool = False,
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED
) -> None:
"""Create a new ``multiprocessing.Process`` using the
spawn method as configured using ``try_set_start_method()``.
"""
cancel_scope = None
# mark the new actor with the global spawn method
subactor._spawn_method = _spawn_method
if _spawn_method == 'trio':
async with trio.open_nursery() as nursery:
async with spawn_subactor(
subactor,
parent_addr,
infect_asyncio=infect_asyncio
) as proc:
log.runtime(f"Started {proc}")
# wait for actor to spawn and connect back to us # wait for actor to spawn and connect back to us
# channel should have handshake completed by the # channel should have handshake completed by the
# local actor by the time we get a ref to it # local actor by the time we get a ref to it
event, chan = await actor_nursery._actor.wait_for_peer( event, chan = await actor_nursery._actor.wait_for_peer(
subactor.uid) subactor.uid)
except trio.Cancelled:
# reap un-contacted process which are started
# but never setup a connection to parent.
log.warning(f'Spawning aborted due to cancel {proc}')
with trio.CancelScope(shield=True):
await do_hard_kill(proc, 0.1)
# this should break here
raise
actor_nursery_cancel_called = None
portal = Portal(chan) portal = Portal(chan)
actor_nursery._children[subactor.uid] = ( actor_nursery._children[subactor.uid] = (
subactor, proc, portal) subactor, proc, portal)
# track child in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
try:
# send additional init params # send additional init params
await chan.send({ await chan.send({
"_parent_main_data": subactor._parent_main_data, "_parent_main_data": subactor._parent_main_data,
@ -270,54 +352,148 @@ async def new_proc(
"_runtime_vars": _runtime_vars, "_runtime_vars": _runtime_vars,
}) })
# track subactor in current nursery
curr_actor = current_actor()
curr_actor._actoruid2nursery[subactor.uid] = actor_nursery
# resume caller at next checkpoint now that child is up # resume caller at next checkpoint now that child is up
task_status.started(portal) task_status.started(portal)
# wait for ActorNursery.wait() to be called # wait on actor nursery to complete
with trio.CancelScope(shield=True): # with trio.CancelScope(shield=True):
# this either completes or is cancelled
# and should only arrive once the actor nursery
# has errored or exitted.
await actor_nursery._join_procs.wait() await actor_nursery._join_procs.wait()
if portal in actor_nursery._cancel_after_result_on_exit: except (
trio.Cancelled,
# KeyboardInterrupt,
# required to collect errors from multiple subactors
trio.MultiError,
# RuntimeError,
) as cerr:
actor_nursery_cancel_called = cerr
if actor_nursery.cancelled:
log.cancel(f'{uid}: nursery cancelled before exit')
else:
log.error(f'Child {uid} was cancelled before nursery exit?')
# we were specifically cancelled by our parent nursery
with trio.CancelScope(shield=True):
if portal.channel.connected():
log.cancel(f'Sending cancel IPC-msg to {uid}')
# try to cancel the actor @ IPC level
await portal.cancel_actor()
finally:
# 2 cases:
# - actor nursery was cancelled in which case
# we want to try a soft reap of the actor via
# ipc cancellation and then failing that do a hard
# reap.
# - this is normal termination and we must wait indefinitely
# for ria and daemon actors
reaping_cancelled: bool = False
ria = portal in actor_nursery._cancel_after_result_on_exit
# this is the soft reap sequence. we can
# either collect results:
# - ria actors get them them via ``Portal.result()``
# - we wait forever on daemon actors until they're
# cancelled by user code via ``Portal.cancel_actor()``
# or ``ActorNursery.cancel(). in the latter case
# we have to expect another cancel here since
# the task spawning nurseries will both be cacelled
# by ``ActorNursery.cancel()``.
# OR, we're cancelled while collecting results, which
# case we need to try another soft cancel and reap attempt.
try:
log.cancel(f'Starting soft actor reap for {uid}')
cancel_scope = None
async with trio.open_nursery() as nursery:
if ria:
# collect any expected ``.run_in_actor()`` results
cancel_scope = await nursery.start( cancel_scope = await nursery.start(
cancel_on_completion, result_from_portal,
portal, portal,
subactor, subactor,
errors errors,
True, # cancel_on_result
) )
# Wait for proc termination but **dont' yet** call # soft & cancellable
# ``trio.Process.__aexit__()`` (it tears down stdio await reap_proc(proc)
# which will kill any waiting remote pdb trace).
# TODO: No idea how we can enforce zombie # if proc terminates before portal result
# reaping more stringently without the shield
# we used to have below...
# with trio.CancelScope(shield=True):
# async with proc:
# Always "hard" join sub procs since no actor zombies
# are allowed!
# this is a "light" (cancellable) join, the hard join is
# in the enclosing scope (see above).
await proc.wait()
log.debug(f"Joined {proc}")
# pop child entry to indicate we no longer managing this subactor
subactor, proc, portal = actor_nursery._children.pop(subactor.uid)
# cancel result waiter that may have been spawned in
# tandem if not done already
if cancel_scope: if cancel_scope:
log.warning(
"Cancelling existing result waiter task for "
f"{subactor.uid}")
cancel_scope.cancel() cancel_scope.cancel()
except (
trio.Cancelled,
# is this required to collect errors from multiple subactors?
trio.MultiError,
) as rerr:
# nursery was closed but was cancelled during normal
# reaping.
reaping_cancelled = rerr
if actor_nursery.cancelled:
log.cancel(f'Nursery cancelled during soft reap for {uid}')
# hard reap sequence
if proc.poll() is None:
log.cancel('Attempting hard reap for {uid}')
# hard reap sequence
await maybe_wait_for_debugger()
with trio.CancelScope(shield=True):
if portal.channel.connected():
# cancel the process @ the IPC level
await portal.cancel_actor()
# TODO: do we need to try the ria portals
# again?
# await result_from_portal(
# portal,
# subactor,
# errors
# )
# hard zombie lord reap, with timeout
await reap_proc(
proc,
terminate_after=2,
)
finally:
# 2 cases:
# - the actor terminated gracefully
# - we're cancelled and likely need to re-raise
while proc.poll() is None:
log.critical("ZOMBIE LORD HAS ARRIVED for your {proc}")
with trio.CancelScope(shield=True):
await reap_proc(
proc,
terminate_after=0.1,
)
log.info(f"Joined {proc}")
# pop child entry to indicate we no longer managing this
# subactor
subactor, proc, portal = actor_nursery._children.pop(
subactor.uid)
if not actor_nursery._children:
actor_nursery._all_children_reaped.set()
if actor_nursery_cancel_called:
raise actor_nursery_cancel_called
if reaping_cancelled:
raise reaping_cancelled
else: else:
# `multiprocessing` # `multiprocessing`
# async with trio.open_nursery() as nursery: # async with trio.open_nursery() as nursery:
@ -430,6 +606,9 @@ async def mp_new_proc(
# while user code is still doing it's thing. Only after the # while user code is still doing it's thing. Only after the
# nursery block closes do we allow subactor results to be # nursery block closes do we allow subactor results to be
# awaited and reported upwards to the supervisor. # awaited and reported upwards to the supervisor.
# no shield is required here (vs. above on the trio backend)
# since debug mode is not supported on mp.
await actor_nursery._join_procs.wait() await actor_nursery._join_procs.wait()
finally: finally:
@ -446,7 +625,7 @@ async def mp_new_proc(
# async with trio.open_nursery() as n: # async with trio.open_nursery() as n:
# n.cancel_scope.shield = True # n.cancel_scope.shield = True
cancel_scope = await nursery.start( cancel_scope = await nursery.start(
cancel_on_completion, result_from_portal,
portal, portal,
subactor, subactor,
errors errors