forked from goodboy/tractor
1
0
Fork 0

Implicitly open root actor on first nursery use.

implicit_runtime
Tyler Goodlet 2021-01-02 21:35:47 -05:00
parent bd3059f01b
commit 0bb2163b0c
1 changed files with 115 additions and 89 deletions

View File

@ -5,15 +5,17 @@ from functools import partial
import multiprocessing as mp import multiprocessing as mp
from typing import Tuple, List, Dict, Optional, Any from typing import Tuple, List, Dict, Optional, Any
import typing import typing
from contextlib import AsyncExitStack
import trio import trio
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
from ._state import current_actor from ._state import current_actor, is_root_process, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._actor import Actor from ._actor import Actor
from ._portal import Portal from ._portal import Portal
from ._exceptions import is_multi_cancelled from ._exceptions import is_multi_cancelled
from ._root import open_root_actor
from . import _state from . import _state
from . import _spawn from . import _spawn
@ -186,7 +188,9 @@ class ActorNursery:
@asynccontextmanager @asynccontextmanager
async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: async def open_nursery(
**kwargs,
) -> typing.AsyncGenerator[ActorNursery, None]:
"""Create and yield a new ``ActorNursery`` to be used for spawning """Create and yield a new ``ActorNursery`` to be used for spawning
structured concurrent subactors. structured concurrent subactors.
@ -200,9 +204,23 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
anyway since it is more clear from the following nested nurseries anyway since it is more clear from the following nested nurseries
which cancellation scopes correspond to each spawned subactor set. which cancellation scopes correspond to each spawned subactor set.
""" """
actor = current_actor() implicit_runtime = False
if not actor:
raise RuntimeError("No actor instance has been defined yet?") actor = current_actor(err_on_no_runtime=False)
if actor is None and is_main_process():
# if we are the parent process start the actor runtime implicitly
log.info("Starting actor runtime!")
root_runtime_stack = AsyncExitStack()
actor = await root_runtime_stack.enter_async_context(
open_root_actor(**kwargs)
)
assert actor is current_actor()
# mark us for teardown on exit
implicit_runtime = True
# the collection of errors retreived from spawned sub-actors # the collection of errors retreived from spawned sub-actors
errors: Dict[Tuple[str, str], Exception] = {} errors: Dict[Tuple[str, str], Exception] = {}
@ -213,100 +231,108 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]:
# a supervisor strategy **before** blocking indefinitely to wait for # a supervisor strategy **before** blocking indefinitely to wait for
# actors spawned in "daemon mode" (aka started using # actors spawned in "daemon mode" (aka started using
# ``ActorNursery.start_actor()``). # ``ActorNursery.start_actor()``).
async with trio.open_nursery() as da_nursery: try:
try: async with trio.open_nursery() as da_nursery:
# This is the inner level "run in actor" nursery. It is try:
# awaited first since actors spawned in this way (using # This is the inner level "run in actor" nursery. It is
# ``ActorNusery.run_in_actor()``) are expected to only # awaited first since actors spawned in this way (using
# return a single result and then complete (i.e. be canclled # ``ActorNusery.run_in_actor()``) are expected to only
# gracefully). Errors collected from these actors are # return a single result and then complete (i.e. be canclled
# immediately raised for handling by a supervisor strategy. # gracefully). Errors collected from these actors are
# As such if the strategy propagates any error(s) upwards # immediately raised for handling by a supervisor strategy.
# the above "daemon actor" nursery will be notified. # As such if the strategy propagates any error(s) upwards
async with trio.open_nursery() as ria_nursery: # the above "daemon actor" nursery will be notified.
anursery = ActorNursery( async with trio.open_nursery() as ria_nursery:
actor, ria_nursery, da_nursery, errors anursery = ActorNursery(
) actor, ria_nursery, da_nursery, errors
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
log.debug(
f"Waiting on subactors {anursery._children} "
"to complete"
) )
except BaseException as err:
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
try: try:
# XXX: hypothetically an error could be raised and then # spawning of actors happens in the caller's scope
# a cancel signal shows up slightly after in which case # after we yield upwards
# the `else:` block here might not complete? yield anursery
# For now, shield both. log.debug(
with trio.CancelScope(shield=True): f"Waiting on subactors {anursery._children} "
etype = type(err) "to complete"
if etype in (trio.Cancelled, KeyboardInterrupt) or ( )
is_multi_cancelled(err) except BaseException as err:
): # if the caller's scope errored then we activate our
log.warning( # one-cancels-all supervisor strategy (don't
f"Nursery for {current_actor().uid} was " # worry more are coming).
f"cancelled with {etype}") anursery._join_procs.set()
else: try:
log.exception( # XXX: hypothetically an error could be raised and then
f"Nursery for {current_actor().uid} " # a cancel signal shows up slightly after in which case
f"errored with {err}, ") # the `else:` block here might not complete?
# For now, shield both.
with trio.CancelScope(shield=True):
etype = type(err)
if etype in (trio.Cancelled, KeyboardInterrupt) or (
is_multi_cancelled(err)
):
log.warning(
f"Nursery for {current_actor().uid} was "
f"cancelled with {etype}")
else:
log.exception(
f"Nursery for {current_actor().uid} "
f"errored with {err}, ")
# cancel all subactors # cancel all subactors
await anursery.cancel() await anursery.cancel()
except trio.MultiError as merr: except trio.MultiError as merr:
# If we receive additional errors while waiting on # If we receive additional errors while waiting on
# remaining subactors that were cancelled, # remaining subactors that were cancelled,
# aggregate those errors with the original error # aggregate those errors with the original error
# that triggered this teardown. # that triggered this teardown.
if err not in merr.exceptions: if err not in merr.exceptions:
raise trio.MultiError(merr.exceptions + [err]) raise trio.MultiError(merr.exceptions + [err])
else: else:
raise raise
# Last bit before first nursery block ends in the case # Last bit before first nursery block ends in the case
# where we didn't error in the caller's scope # where we didn't error in the caller's scope
log.debug("Waiting on all subactors to complete") log.debug("Waiting on all subactors to complete")
anursery._join_procs.set() anursery._join_procs.set()
# ria_nursery scope end # ria_nursery scope end
# XXX: do we need a `trio.Cancelled` catch here as well? # XXX: do we need a `trio.Cancelled` catch here as well?
except (Exception, trio.MultiError, trio.Cancelled) as err: except (Exception, trio.MultiError, trio.Cancelled) as err:
# If actor-local error was raised while waiting on # If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all # ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy: # remaining sub-actors (due to our lone strategy:
# one-cancels-all). # one-cancels-all).
log.warning(f"Nursery cancelling due to {err}") log.warning(f"Nursery cancelling due to {err}")
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children: if anursery._children:
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await anursery.cancel() await anursery.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
# use `MultiError` as needed # use `MultiError` as needed
if len(errors) > 1: if len(errors) > 1:
raise trio.MultiError(tuple(errors.values())) raise trio.MultiError(tuple(errors.values()))
else: else:
raise list(errors.values())[0] raise list(errors.values())[0]
# ria_nursery scope end # ria_nursery scope end - nursery checkpoint
log.debug("Nursery teardown complete") # after nursery exit
finally:
log.debug("Nursery teardown complete")
# shutdown runtime if it was started
if implicit_runtime:
log.info("Shutting down actor tree")
await root_runtime_stack.aclose()