diff --git a/tractor/_trionics.py b/tractor/_trionics.py index 4bb7467..17ae548 100644 --- a/tractor/_trionics.py +++ b/tractor/_trionics.py @@ -177,33 +177,58 @@ class ActorNursery: @asynccontextmanager async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: - """Create and yield a new ``ActorNursery``. - """ - # TODO: figure out supervisors from erlang + """Create and yield a new ``ActorNursery`` to be used for spawning + structured concurrent subactors. + When an actor is spawned a new trio task is started which + invokes one of the process spawning backends to create and start + a new subprocess. These tasks are started by one of two nurseries + detailed below. The reason for spawning processes from within + a new task is because ``trio_run_in_process`` itself creates a new + internal nursery and the same task that opens a nursery **must** + close it. It turns out this approach is probably more correct + anyway since it is more clear from the following nested nurseries + which cancellation scopes correspond to each spawned subactor set. + """ actor = current_actor() if not actor: raise RuntimeError("No actor instance has been defined yet?") - # XXX we use these nurseries because TRIP is doing all its stuff with - # an `@asynccontextmanager` which has an internal nursery *and* the - # task that opens a nursery **must also close it**. + # the collection of errors retreived from spawned sub-actors errors: Dict[Tuple[str, str], Exception] = {} + + # This is the outermost level "deamon actor" nursery. It is awaited + # **after** the below inner "run in actor nursery". This allows for + # handling errors that are generated by the inner nursery in + # a supervisor strategy **before** blocking indefinitely to wait for + # actors spawned in "daemon mode" (aka started using + # ``ActorNursery.start_actor()``). async with trio.open_nursery() as da_nursery: try: + # This is the inner level "run in actor" nursery. It is + # awaited first since actors spawned in this way (using + # ``ActorNusery.run_in_actor()``) are expected to only + # return a single result and then complete (i.e. be canclled + # gracefully). Errors collected from these actors are + # immediately raised for handling by a supervisor strategy. + # As such if the strategy propagates any error(s) upwards + # the above "daemon actor" nursery will be notified. async with trio.open_nursery() as ria_nursery: anursery = ActorNursery( actor, ria_nursery, da_nursery, errors ) try: - # spawning of actors happens in this scope after - # we yield to the caller. + # spawning of actors happens in the caller's scope + # after we yield upwards yield anursery log.debug( f"Waiting on subactors {anursery._children}" "to complete" ) except (BaseException, Exception) as err: + # if the caller's scope errored then we activate our + # one-cancels-all supervisor strategy (don't + # worry more are coming). anursery._join_procs.set() try: # XXX: hypothetically an error could be raised and then @@ -219,24 +244,44 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: log.exception( f"Nursery for {current_actor().uid} " f"errored with {err}, ") + + # cancel all subactors await anursery.cancel() + except trio.MultiError as merr: + # If we receive additional errors while waiting on + # remaining subactors that were cancelled, + # aggregate those errors with the original error + # that triggered this teardown. if err not in merr.exceptions: raise trio.MultiError(merr.exceptions + [err]) else: raise - # last bit before first nursery block ends + # Last bit before first nursery block ends in the case + # where we didn't error in the caller's scope log.debug(f"Waiting on all subactors to complete") anursery._join_procs.set() - # ria_nursery scope + + # ria_nursery scope end + except (Exception, trio.MultiError) as err: + # If actor-local error was raised while waiting on + # ".run_in_actor()" actors then we also want to cancel all + # remaining sub-actors (due to our lone strategy: + # one-cancels-all). log.warning(f"Nursery cancelling due to {err}") if anursery._children: with trio.CancelScope(shield=True): await anursery.cancel() raise finally: + # No errors were raised while awaiting ".run_in_actor()" + # actors but those actors may have returned remote errors as + # results (meaning they errored remotely and have relayed + # those errors back to this parent actor). The errors are + # collected in ``errors`` so cancel all actors, summarize + # all errors and re-raise. if errors: if anursery._children: with trio.CancelScope(shield=True): @@ -245,4 +290,7 @@ async def open_nursery() -> typing.AsyncGenerator[ActorNursery, None]: raise trio.MultiError(tuple(errors.values())) else: raise list(errors.values())[0] + + # ria_nursery scope end + log.debug(f"Nursery teardown complete")