From 0bb2163b0c9b0575e9632685ed8a162d77db38dd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 2 Jan 2021 21:35:47 -0500 Subject: [PATCH] Implicitly open root actor on first nursery use. --- tractor/_trionics.py | 204 ++++++++++++++++++++++++------------------- 1 file changed, 115 insertions(+), 89 deletions(-) diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 0d54b24..b6fe0dc 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -5,15 +5,17 @@ from functools import partial import multiprocessing as mp from typing import Tuple, List, Dict, Optional, Any import typing +from contextlib import AsyncExitStack import trio from async_generator import asynccontextmanager -from ._state import current_actor +from ._state import current_actor, is_root_process, is_main_process from .log import get_logger, get_loglevel from ._actor import Actor from ._portal import Portal from ._exceptions import is_multi_cancelled +from ._root import open_root_actor from . import _state from . import _spawn @@ -186,7 +188,9 @@ class ActorNursery: @asynccontextmanager -async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: +async def open_nursery( + **kwargs, +) -> typing.AsyncGenerator[ActorNursery, None]: """Create and yield a new ``ActorNursery`` to be used for spawning structured concurrent subactors. @@ -200,9 +204,23 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: anyway since it is more clear from the following nested nurseries which cancellation scopes correspond to each spawned subactor set. """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") + implicit_runtime = False + + actor = current_actor(err_on_no_runtime=False) + + if actor is None and is_main_process(): + + # if we are the parent process start the actor runtime implicitly + log.info("Starting actor runtime!") + root_runtime_stack = AsyncExitStack() + actor = await root_runtime_stack.enter_async_context( + open_root_actor(**kwargs) + ) + assert actor is current_actor() + + # mark us for teardown on exit + implicit_runtime = True + # the collection of errors retreived from spawned sub-actors errors: Dict[Tuple[str, str], Exception] = {} @@ -213,100 +231,108 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: # a supervisor strategy **before** blocking indefinitely to wait for # actors spawned in "daemon mode" (aka started using # ``ActorNursery.start_actor()``). - async with trio.open_nursery() as da_nursery: - try: - # This is the inner level "run in actor" nursery. It is - # awaited first since actors spawned in this way (using - # ``ActorNusery.run_in_actor()``) are expected to only - # return a single result and then complete (i.e. be canclled - # gracefully). Errors collected from these actors are - # immediately raised for handling by a supervisor strategy. - # As such if the strategy propagates any error(s) upwards - # the above "daemon actor" nursery will be notified. - async with trio.open_nursery() as ria_nursery: - anursery = ActorNursery( - actor, ria_nursery, da_nursery, errors - ) - try: - # spawning of actors happens in the caller's scope - # after we yield upwards - yield anursery - log.debug( - f"Waiting on subactors {anursery._children} " - "to complete" + try: + async with trio.open_nursery() as da_nursery: + try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # ``ActorNusery.run_in_actor()``) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. + async with trio.open_nursery() as ria_nursery: + anursery = ActorNursery( + actor, ria_nursery, da_nursery, errors ) - except BaseException as err: - # if the caller's scope errored then we activate our - # one-cancels-all supervisor strategy (don't - # worry more are coming). - anursery._join_procs.set() try: - # XXX: hypothetically an error could be raised and then - # a cancel signal shows up slightly after in which case - # the `else:` block here might not complete? - # For now, shield both. - with trio.CancelScope(shield=True): - etype = type(err) - if etype in (trio.Cancelled, KeyboardInterrupt) or ( - is_multi_cancelled(err) - ): - log.warning( - f"Nursery for {current_actor().uid} was " - f"cancelled with {etype}") - else: - log.exception( - f"Nursery for {current_actor().uid} " - f"errored with {err}, ") + # spawning of actors happens in the caller's scope + # after we yield upwards + yield anursery + log.debug( + f"Waiting on subactors {anursery._children} " + "to complete" + ) + except BaseException as err: + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). + anursery._join_procs.set() + try: + # XXX: hypothetically an error could be raised and then + # a cancel signal shows up slightly after in which case + # the `else:` block here might not complete? + # For now, shield both. + with trio.CancelScope(shield=True): + etype = type(err) + if etype in (trio.Cancelled, KeyboardInterrupt) or ( + is_multi_cancelled(err) + ): + log.warning( + f"Nursery for {current_actor().uid} was " + f"cancelled with {etype}") + else: + log.exception( + f"Nursery for {current_actor().uid} " + f"errored with {err}, ") - # cancel all subactors - await anursery.cancel() + # cancel all subactors + await anursery.cancel() - except trio.MultiError as merr: - # If we receive additional errors while waiting on - # remaining subactors that were cancelled, - # aggregate those errors with the original error - # that triggered this teardown. - if err not in merr.exceptions: - raise trio.MultiError(merr.exceptions + [err]) - else: - raise + except trio.MultiError as merr: + # If we receive additional errors while waiting on + # remaining subactors that were cancelled, + # aggregate those errors with the original error + # that triggered this teardown. + if err not in merr.exceptions: + raise trio.MultiError(merr.exceptions + [err]) + else: + raise - # Last bit before first nursery block ends in the case - # where we didn't error in the caller's scope - log.debug("Waiting on all subactors to complete") - anursery._join_procs.set() + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope + log.debug("Waiting on all subactors to complete") + anursery._join_procs.set() - # ria_nursery scope end + # ria_nursery scope end - # XXX: do we need a `trio.Cancelled` catch here as well? - except (Exception, trio.MultiError, trio.Cancelled) as err: - # If actor-local error was raised while waiting on - # ".run_in_actor()" actors then we also want to cancel all - # remaining sub-actors (due to our lone strategy: - # one-cancels-all). - log.warning(f"Nursery cancelling due to {err}") - if anursery._children: - with trio.CancelScope(shield=True): - await anursery.cancel() - raise - finally: - # No errors were raised while awaiting ".run_in_actor()" - # actors but those actors may have returned remote errors as - # results (meaning they errored remotely and have relayed - # those errors back to this parent actor). The errors are - # collected in ``errors`` so cancel all actors, summarize - # all errors and re-raise. - if errors: + # XXX: do we need a `trio.Cancelled` catch here as well? + except (Exception, trio.MultiError, trio.Cancelled) as err: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). + log.warning(f"Nursery cancelling due to {err}") if anursery._children: with trio.CancelScope(shield=True): await anursery.cancel() + raise + finally: + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. + if errors: + if anursery._children: + with trio.CancelScope(shield=True): + await anursery.cancel() - # use `MultiError` as needed - if len(errors) > 1: - raise trio.MultiError(tuple(errors.values())) - else: - raise list(errors.values())[0] + # use `MultiError` as needed + if len(errors) > 1: + raise trio.MultiError(tuple(errors.values())) + else: + raise list(errors.values())[0] - # ria_nursery scope end + # ria_nursery scope end - nursery checkpoint - log.debug("Nursery teardown complete") + # after nursery exit + finally: + log.debug("Nursery teardown complete") + + # shutdown runtime if it was started + if implicit_runtime: + log.info("Shutting down actor tree") + await root_runtime_stack.aclose()