Vastly improve error triggered cancellation
At the expense of a bit more complexity in `ActorNursery.wait()` (which I commented the heck out of, fwiw) this adds far superior and correct cancellation semantics for when a nursery is cancelled due to (remote) errors in subactors. This includes:

- `wait()` will now raise a `trio.MultiError` if multiple subactors error, with the same semantics as in `trio` (see the plain-`trio` sketch below).
- in `wait()`, portals which are paired with `run_in_actor()` spawned subactors (versus `start_actor()`) are waited on separately, and if the nursery **hasn't** been cancelled but there are errors, those are raised immediately before waiting on the `start_actor()` subactors, which would otherwise block indefinitely until explicitly cancelled.
- if `wait()` does raise when the nursery hasn't yet been cancelled, it's expected that it will be called again depending on the actor supervision strategy (i.e. right now we operate with a one-cancels-all strategy, the same as `trio`, so `ActorNursery.__aexit__()` calls `cancel()` if any error is raised by `wait()`).

Oh, and I added an `is_main_process()` helper; can't remember why..

branch: improved_errors
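Note: a minimal, illustrative sketch of the `trio.MultiError` semantics that `wait()` now mirrors, using plain `trio` only (not part of this change; the task names and errors are invented). When more than one child fails, the failures are re-raised together:

import trio


async def fail(msg: str) -> None:
    # stand-in for a subactor erroring remotely
    raise RuntimeError(msg)


async def main() -> None:
    try:
        async with trio.open_nursery() as n:
            n.start_soon(fail, "first")
            n.start_soon(fail, "second")
    except trio.MultiError as merr:
        # both child errors arrive bundled, analogous to the list of
        # subactor errors ``wait()`` now collects and re-raises
        print([str(e) for e in merr.exceptions])


trio.run(main)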
parent e75b25dc21
commit 835d1fa07a
@@ -32,7 +32,8 @@ class ActorNursery:
             Tuple[str, str],
             Tuple[Actor, mp.Process, Optional[Portal]]
         ] = {}
-        # portals spawned with ``run_in_actor()``
+        # portals spawned with ``run_in_actor()`` are
+        # cancelled when their "main" result arrives
         self._cancel_after_result_on_exit: set = set()
         self.cancelled: bool = False
         self._forkserver: forkserver.ForkServer = None
@@ -132,6 +133,8 @@ class ActorNursery:
             bind_addr=bind_addr,
             statespace=statespace,
         )
+        # this marks the actor to be cancelled after its portal result
+        # is retreived, see ``wait()`` below.
         self._cancel_after_result_on_exit.add(portal)
         await portal._submit_for_result(
             mod_path,
@@ -142,29 +145,65 @@ class ActorNursery:
     async def wait(self) -> None:
         """Wait for all subactors to complete.
+
+        This is probably the most complicated (and confusing, sorry)
+        function that does all the clever crap to deal with cancellation,
+        error propagation, and graceful subprocess tear down.
         """
 
-        async def maybe_consume_result(portal, actor):
-            if (
-                portal in self._cancel_after_result_on_exit and
-                (portal._result is None and portal._exc is None)
-            ):
-                log.debug(f"Waiting on final result from {subactor.uid}")
-                res = await portal.result()
-                # if it's an async-gen then we should alert the user
-                # that we're cancelling it
+        async def exhaust_portal(portal, actor):
+            """Pull final result from portal (assuming it was one).
+
+            If the main task is an async generator do our best to consume
+            what's left of it.
+            """
+            try:
+                log.debug(f"Waiting on final result from {actor.uid}")
+                final = res = await portal.result()
+                # if it's an async-gen then alert that we're cancelling it
                 if inspect.isasyncgen(res):
+                    final = []
                     log.warning(
                         f"Blindly consuming asyncgen for {actor.uid}")
                     with trio.fail_after(1):
                         async with aclosing(res) as agen:
                             async for item in agen:
                                 log.debug(f"Consuming item {item}")
+                                final.append(item)
+            except Exception as err:
+                # we reraise in the parent task via a ``trio.MultiError``
+                return err
+            else:
+                return final
+
+        async def cancel_on_completion(
+            portal: Portal,
+            actor: Actor,
+            task_status=trio.TASK_STATUS_IGNORED,
+        ) -> None:
+            """Cancel actor gracefully once it's "main" portal's
+            result arrives.
+
+            Should only be called for actors spawned with `run_in_actor()`.
+            """
+            with trio.open_cancel_scope() as cs:
+                task_status.started(cs)
+                # this may error in which case we expect the far end
+                # actor to have already terminated itself
+                result = await exhaust_portal(portal, actor)
+                if isinstance(result, Exception):
+                    errors.append(result)
+                log.info(f"Cancelling {portal.channel.uid} gracefully")
+                await portal.cancel_actor()
+
+            if cs.cancelled_caught:
+                log.warning(
+                    "Result waiter was cancelled, process may have died")
 
         async def wait_for_proc(
             proc: mp.Process,
             actor: Actor,
             portal: Portal,
-            cancel_scope: trio._core._run.CancelScope,
+            cancel_scope: Optional[trio._core._run.CancelScope] = None,
         ) -> None:
             # TODO: timeout block here?
             if proc.is_alive():
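Note: the `nursery.start()` / `task_status.started()` handoff used by `cancel_on_completion()` above is a stock `trio` pattern. A generic sketch (hypothetical task names, using the same older `trio.open_cancel_scope()` spelling as the diff) of how the waiter's cancel scope gets handed to a sibling so the waiter can be torn down once the subprocess exits:

import trio


async def result_waiter(task_status=trio.TASK_STATUS_IGNORED):
    with trio.open_cancel_scope() as cs:
        # hand our cancel scope back to whoever called ``nursery.start()``
        task_status.started(cs)
        await trio.sleep_forever()  # stand-in for ``portal.result()``
    if cs.cancelled_caught:
        print("result waiter was cancelled by its sibling")


async def proc_reaper(cs) -> None:
    await trio.sleep(0.1)  # stand-in for joining the subprocess
    cs.cancel()            # proc is gone, so tear down the waiter


async def main() -> None:
    async with trio.open_nursery() as n:
        cs = await n.start(result_waiter)
        n.start_soon(proc_reaper, cs)


trio.run(main)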
|
@@ -172,42 +211,57 @@ class ActorNursery:
             # please god don't hang
             proc.join()
             log.debug(f"Joined {proc}")
-            await maybe_consume_result(portal, actor)
-
             self._children.pop(actor.uid)
-            # proc terminated, cancel result waiter
+
+            # proc terminated, cancel result waiter that may have
+            # been spawned in tandem
             if cancel_scope:
                 log.warning(
                     f"Cancelling existing result waiter task for {actor.uid}")
                 cancel_scope.cancel()
 
-        async def wait_for_actor(
-            portal: Portal,
-            actor: Actor,
-            task_status=trio.TASK_STATUS_IGNORED,
-        ) -> None:
-            # cancel the actor gracefully
-            with trio.open_cancel_scope() as cs:
-                task_status.started(cs)
-                await maybe_consume_result(portal, actor)
-                log.info(f"Cancelling {portal.channel.uid} gracefully")
-                await portal.cancel_actor()
-
-            if cs.cancelled_caught:
-                log.warning("Result waiter was cancelled")
-
-        # unblocks when all waiter tasks have completed
+        log.debug(f"Waiting on all subactors to complete")
         children = self._children.copy()
+        errors = []
+        # wait on run_in_actor() tasks, unblocks when all complete
         async with trio.open_nursery() as nursery:
             for subactor, proc, portal in children.values():
                 cs = None
+                # portal from ``run_in_actor()``
                 if portal in self._cancel_after_result_on_exit:
-                    cs = await nursery.start(wait_for_actor, portal, subactor)
+                    cs = await nursery.start(
+                        cancel_on_completion, portal, subactor)
+                # TODO: how do we handle remote host spawned actors?
+                nursery.start_soon(
+                    wait_for_proc, proc, subactor, portal, cs)
+
+        if errors:
+            if not self.cancelled:
+                # halt here and expect to be called again once the nursery
+                # has been cancelled externally (ex. from within __aexit__()
+                # if an error is captured from ``wait()`` then ``cancel()``
+                # is called immediately after which in turn calls ``wait()``
+                # again.)
+                raise trio.MultiError(errors)
+
+        # wait on all `start_actor()` subactors to complete
+        # if errors were captured above and we have not been cancelled
+        # then these ``start_actor()`` spawned actors will block until
+        # cancelled externally
+        children = self._children.copy()
+        async with trio.open_nursery() as nursery:
+            for subactor, proc, portal in children.values():
+                # TODO: how do we handle remote host spawned actors?
                 nursery.start_soon(wait_for_proc, proc, subactor, portal, cs)
+
+        log.debug(f"All subactors for {self} have terminated")
+        if errors:
+            # always raise any error if we're also cancelled
+            raise trio.MultiError(errors)
 
     async def cancel(self, hard_kill: bool = False) -> None:
         """Cancel this nursery by instructing each subactor to cancel
-        iteslf and wait for all subprocesses to terminate.
+        itself and wait for all subactors to terminate.
 
         If ``hard_killl`` is set to ``True`` then kill the processes
         directly without any far end graceful ``trio`` cancellation.
@@ -234,56 +288,57 @@ class ActorNursery:
                         # channel/portal should now be up
                         _, _, portal = self._children[subactor.uid]
                         if portal is None:
-                            # cancelled while waiting on the event?
+                            # cancelled while waiting on the event
+                            # to arrive
                             chan = self._actor._peers[subactor.uid][-1]
                             if chan:
                                 portal = Portal(chan)
                             else: # there's no other choice left
                                 do_hard_kill(proc)
 
-                    # spawn cancel tasks async
+                    # spawn cancel tasks
                     assert portal
                     n.start_soon(portal.cancel_actor)
 
-        log.debug(f"Waiting on all subactors to complete")
-        await self.wait()
+        # mark ourselves as having (tried to have) cancelled all subactors
         self.cancelled = True
-        log.debug(f"All subactors for {self} have terminated")
+        await self.wait()
 
     async def __aexit__(self, etype, value, tb):
         """Wait on all subactor's main routines to complete.
         """
-        try:
-            if etype is not None:
+        if etype is not None:
+            try:
                 # XXX: hypothetically an error could be raised and then
-                # a cancel signal shows up slightly after in which case the
-                # else block here might not complete? Should both be shielded?
+                # a cancel signal shows up slightly after in which case
+                # the `else:` block here might not complete?
+                # For now, shield both.
                 with trio.open_cancel_scope(shield=True):
                     if etype is trio.Cancelled:
                         log.warning(
-                            f"{current_actor().uid} was cancelled with {etype}"
-                            ", cancelling actor nursery")
-                        await self.cancel()
+                            f"Nursery for {current_actor().uid} was "
+                            f"cancelled with {etype}")
                     else:
                         log.exception(
-                            f"{current_actor().uid} errored with {etype}, "
-                            "cancelling actor nursery")
-                        await self.cancel()
-            else:
-                # XXX: this is effectively the lone cancellation/supervisor
-                # strategy which exactly mimicks trio's behaviour
-                log.debug(f"Waiting on subactors {self._children} to complete")
-                try:
-                    await self.wait()
-                except Exception as err:
-                    log.warning(f"Nursery caught {err}, cancelling")
+                            f"Nursery for {current_actor().uid} "
+                            f"errored with {etype}, ")
                     await self.cancel()
-                    raise
-                log.debug(f"Nursery teardown complete")
-        except Exception:
-            log.exception("Error on nursery exit:")
-            await self.wait()
-            raise
+            except trio.MultiError as merr:
+                if value not in merr.exceptions:
+                    raise trio.MultiError(merr.exceptions + [value])
+                raise
+        else:
+            # XXX: this is effectively the (for now) lone
+            # cancellation/supervisor strategy which exactly
+            # mimicks trio's behaviour
+            log.debug(f"Waiting on subactors {self._children} to complete")
+            try:
+                await self.wait()
+            except Exception as err:
+                log.warning(f"Nursery caught {err}, cancelling")
+                await self.cancel()
+                raise
+            log.debug(f"Nursery teardown complete")
 
 
 @asynccontextmanager
@@ -297,3 +352,8 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
     # TODO: figure out supervisors from erlang
     async with ActorNursery(current_actor()) as nursery:
         yield nursery
+
+
+def is_main_process():
+    "Bool determining if this actor is running in the top-most process."
+    return mp.current_process().name == 'MainProcess'
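Note: the new helper only inspects the `multiprocessing` process name. A standalone illustration (the helper body is copied from the hunk above; the surrounding demo is illustrative, not part of the change):

import multiprocessing as mp


def is_main_process():
    "Bool determining if this actor is running in the top-most process."
    return mp.current_process().name == 'MainProcess'


def report():
    # child processes get auto-generated names like 'Process-1'
    print(mp.current_process().name, is_main_process())


if __name__ == '__main__':
    report()                           # -> MainProcess True
    child = mp.Process(target=report)
    child.start()                      # -> e.g. Process-1 False
    child.join()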
|
|