forked from goodboy/tractor
1
0
Fork 0

Drop error-repacking for `.run_in_actor()`s block

If we pack the nursery parent task's error into the `errors` table
directly in the handler, we don't need to specially handle packing that
same error into any exception group raised while handling sub-actor
cancellation; drops some ugly indentation ;)
eg_backup
Tyler Goodlet 2022-10-14 16:17:22 -04:00
parent 0a1bf8e57d
commit f39414ce12
1 changed files with 43 additions and 51 deletions

View File

@ -298,8 +298,12 @@ class ActorNursery:
@acm @acm
async def _open_and_supervise_one_cancels_all_nursery( async def _open_and_supervise_one_cancels_all_nursery(
actor: Actor, actor: Actor,
) -> typing.AsyncGenerator[ActorNursery, None]: ) -> typing.AsyncGenerator[ActorNursery, None]:
# TODO: yay or nay?
# __tracebackhide__ = True
# the collection of errors retreived from spawned sub-actors # the collection of errors retreived from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {} errors: dict[tuple[str, str], BaseException] = {}
@ -334,19 +338,18 @@ async def _open_and_supervise_one_cancels_all_nursery(
# after we yield upwards # after we yield upwards
yield anursery yield anursery
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
log.runtime( log.runtime(
f"Waiting on subactors {anursery._children} " f"Waiting on subactors {anursery._children} "
"to complete" "to complete"
) )
# Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
# signal all process monitor tasks to conduct
# hard join phase.
anursery._join_procs.set() anursery._join_procs.set()
except BaseException as err: except BaseException as inner_err:
errors[actor.uid] = inner_err
# If we error in the root but the debugger is # If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and # engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it # thus clobber access to) the local tty since it
@ -362,55 +365,45 @@ async def _open_and_supervise_one_cancels_all_nursery(
# worry more are coming). # worry more are coming).
anursery._join_procs.set() anursery._join_procs.set()
try: # XXX: hypothetically an error could be
# XXX: hypothetically an error could be # raised and then a cancel signal shows up
# raised and then a cancel signal shows up # slightly after in which case the `else:`
# slightly after in which case the `else:` # block here might not complete? For now,
# block here might not complete? For now, # shield both.
# shield both. with trio.CancelScope(shield=True):
with trio.CancelScope(shield=True): etype = type(inner_err)
etype = type(err) if etype in (
if etype in ( trio.Cancelled,
trio.Cancelled, KeyboardInterrupt
KeyboardInterrupt ) or (
) or ( is_multi_cancelled(inner_err)
is_multi_cancelled(err) ):
): log.cancel(
log.cancel( f"Nursery for {current_actor().uid} "
f"Nursery for {current_actor().uid} " f"was cancelled with {etype}")
f"was cancelled with {etype}") else:
else: log.exception(
log.exception( f"Nursery for {current_actor().uid} "
f"Nursery for {current_actor().uid} " f"errored with")
f"errored with")
# cancel all subactors # cancel all subactors
await anursery.cancel() await anursery.cancel()
except BaseExceptionGroup as merr: # ria_nursery scope end
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions:
raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
list(merr.exceptions) + [err],
)
else:
raise
# ria_nursery scope end # TODO: this is the handler around the ``.run_in_actor()``
# nursery. Ideally we can drop this entirely in the future as
# XXX: do we need a `trio.Cancelled` catch here as well? # the whole ``.run_in_actor()`` API should be built "on top of"
# this is the catch around the ``.run_in_actor()`` nursery # this lower level spawn-request-cancel "daemon actor" API where
# a local in-actor task nursery is used with one-to-one task
# + `await Portal.run()` calls and the results/errors are
# handled directly (inline) and errors by the local nursery.
except ( except (
Exception, Exception,
BaseExceptionGroup, BaseExceptionGroup,
trio.Cancelled trio.Cancelled
) as err: # noqa ) as err:
errors[actor.uid] = err
# XXX: yet another guard before allowing the cancel # XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug. # sequence in case a (single) child is in debug.
@ -448,9 +441,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
else: else:
raise list(errors.values())[0] raise list(errors.values())[0]
# ria_nursery scope end - nursery checkpoint # da_nursery scope end - nursery checkpoint
# final exit
# after nursery exit
@acm @acm