forked from goodboy/tractor
1
0
Fork 0

Add more detailed docs around nursery logic

The logic in the `ActorNursery` block is critical to cancellation
semantics and in particular, understanding how supervisor strategies are
invoked. Stick in a bunch of explanatory comments to clear up these
details and also prepare to introduce more supervisor strats besides
the current one-cancels-all approach.
try_trip^2
Tyler Goodlet 2020-01-31 09:50:25 -05:00
parent 6348121d23
commit d64508e1a6
1 changed files with 58 additions and 10 deletions

View File

@ -177,33 +177,58 @@ class ActorNursery:
@asynccontextmanager @asynccontextmanager
async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery``. """Create and yield a new ``ActorNursery`` to be used for spawning
""" structured concurrent subactors.
# TODO: figure out supervisors from erlang
When an actor is spawned a new trio task is started which
invokes one of the process spawning backends to create and start
a new subprocess. These tasks are started by one of two nurseries
detailed below. The reason for spawning processes from within
a new task is because ``trio_run_in_process`` itself creates a new
internal nursery and the same task that opens a nursery **must**
close it. It turns out this approach is probably more correct
anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set.
"""
actor = current_actor() actor = current_actor()
if not actor: if not actor:
raise RuntimeError("No actor instance has been defined yet?") raise RuntimeError("No actor instance has been defined yet?")
# XXX we use these nurseries because TRIP is doing all its stuff with # the collection of errors retreived from spawned sub-actors
# an `@asynccontextmanager` which has an internal nursery *and* the
# task that opens a nursery **must also close it**.
errors: Dict[Tuple[str, str], Exception] = {} errors: Dict[Tuple[str, str], Exception] = {}
# This is the outermost level "deamon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
# handling errors that are generated by the inner nursery in
# a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``).
async with trio.open_nursery() as da_nursery: async with trio.open_nursery() as da_nursery:
try: try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# ``ActorNusery.run_in_actor()``) are expected to only
# return a single result and then complete (i.e. be canclled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery: async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery( anursery = ActorNursery(
actor, ria_nursery, da_nursery, errors actor, ria_nursery, da_nursery, errors
) )
try: try:
# spawning of actors happens in this scope after # spawning of actors happens in the caller's scope
# we yield to the caller. # after we yield upwards
yield anursery yield anursery
log.debug( log.debug(
f"Waiting on subactors {anursery._children}" f"Waiting on subactors {anursery._children}"
"to complete" "to complete"
) )
except (BaseException, Exception) as err: except (BaseException, Exception) as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set() anursery._join_procs.set()
try: try:
# XXX: hypothetically an error could be raised and then # XXX: hypothetically an error could be raised and then
@ -219,24 +244,44 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
log.exception( log.exception(
f"Nursery for {current_actor().uid} " f"Nursery for {current_actor().uid} "
f"errored with {err}, ") f"errored with {err}, ")
# cancel all subactors
await anursery.cancel() await anursery.cancel()
except trio.MultiError as merr: except trio.MultiError as merr:
# If we receive additional errors while waiting on
# remaining subactors that were cancelled,
# aggregate those errors with the original error
# that triggered this teardown.
if err not in merr.exceptions: if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err]) raise trio.MultiError(merr.exceptions + [err])
else: else:
raise raise
# last bit before first nursery block ends # Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope
log.debug(f"Waiting on all subactors to complete") log.debug(f"Waiting on all subactors to complete")
anursery._join_procs.set() anursery._join_procs.set()
# ria_nursery scope
# ria_nursery scope end
except (Exception, trio.MultiError) as err: except (Exception, trio.MultiError) as err:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.warning(f"Nursery cancelling due to {err}") log.warning(f"Nursery cancelling due to {err}")
if anursery._children: if anursery._children:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await anursery.cancel() await anursery.cancel()
raise raise
finally: finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors: if errors:
if anursery._children: if anursery._children:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
@ -245,4 +290,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
raise trio.MultiError(tuple(errors.values())) raise trio.MultiError(tuple(errors.values()))
else: else:
raise list(errors.values())[0] raise list(errors.values())[0]
# ria_nursery scope end
log.debug(f"Nursery teardown complete") log.debug(f"Nursery teardown complete")