forked from goodboy/tractor
1
0
Fork 0

Add `pformat()` of `ActorNursery._children` to logging

Such that you see the children entries prior to exit instead of the
prior somewhat detail/use-less logging. Also, rename all `anursery` vars
to just `an` as is the convention in most examples.
modden_spawn_from_client_req
Tyler Goodlet 2024-02-21 13:21:28 -05:00
parent 10adf34be5
commit 28ba5e5435
1 changed files with 36 additions and 27 deletions

View File

@ -21,6 +21,7 @@
from contextlib import asynccontextmanager as acm
from functools import partial
import inspect
from pprint import pformat
from typing import TYPE_CHECKING
import typing
import warnings
@ -189,14 +190,16 @@ class ActorNursery:
**kwargs, # explicit args to ``fn``
) -> Portal:
"""Spawn a new actor, run a lone task, then terminate the actor and
'''
Spawn a new actor, run a lone task, then terminate the actor and
return its result.
Actors spawned using this method are kept alive at nursery teardown
until the task spawned by executing ``fn`` completes at which point
the actor is terminated.
"""
mod_path = fn.__module__
'''
mod_path: str = fn.__module__
if name is None:
# use the explicit function name if not provided
@ -231,7 +234,11 @@ class ActorNursery:
)
return portal
async def cancel(self, hard_kill: bool = False) -> None:
async def cancel(
self,
hard_kill: bool = False,
) -> None:
'''
Cancel this nursery by instructing each subactor to cancel
itself and wait for all subactors to terminate.
@ -242,10 +249,12 @@ class ActorNursery:
'''
self.cancelled = True
log.cancel(f"Cancelling nursery in {self._actor.uid}")
log.cancel(
'Cancelling actor nursery\n'
f'|_{self._children}\n'
)
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as nursery:
async with trio.open_nursery() as tn:
subactor: Actor
proc: trio.Process
@ -288,7 +297,7 @@ class ActorNursery:
# spawn cancel tasks for each sub-actor
assert portal
if portal.channel.connected():
nursery.start_soon(portal.cancel_actor)
tn.start_soon(portal.cancel_actor)
# if we cancelled the cancel (we hung cancelling remote actors)
# then hard kill all sub-processes
@ -343,7 +352,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# the above "daemon actor" nursery will be notified.
async with trio.open_nursery() as ria_nursery:
anursery = ActorNursery(
an = ActorNursery(
actor,
ria_nursery,
da_nursery,
@ -352,16 +361,16 @@ async def _open_and_supervise_one_cancels_all_nursery(
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield anursery
yield an
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
log.runtime(
f"Waiting on subactors {anursery._children} "
"to complete"
'Waiting on subactors to complete:\n'
f'{pformat(an._children)}\n'
)
anursery._join_procs.set()
an._join_procs.set()
except BaseException as inner_err:
errors[actor.uid] = inner_err
@ -373,13 +382,13 @@ async def _open_and_supervise_one_cancels_all_nursery(
# Instead try to wait for pdb to be released before
# tearing down.
await maybe_wait_for_debugger(
child_in_debug=anursery._at_least_one_child_in_debug
child_in_debug=an._at_least_one_child_in_debug
)
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
anursery._join_procs.set()
an._join_procs.set()
# XXX: hypothetically an error could be
# raised and then a cancel signal shows up
@ -413,7 +422,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
)
# cancel all subactors
await anursery.cancel()
await an.cancel()
# ria_nursery scope end
@ -434,7 +443,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await maybe_wait_for_debugger(
child_in_debug=anursery._at_least_one_child_in_debug
child_in_debug=an._at_least_one_child_in_debug
)
# If actor-local error was raised while waiting on
@ -442,9 +451,9 @@ async def _open_and_supervise_one_cancels_all_nursery(
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
log.cancel(f"Nursery cancelling due to {err}")
if anursery._children:
if an._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
await an.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
@ -454,9 +463,9 @@ async def _open_and_supervise_one_cancels_all_nursery(
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
if anursery._children:
if an._children:
with trio.CancelScope(shield=True):
await anursery.cancel()
await an.cancel()
# use `BaseExceptionGroup` as needed
if len(errors) > 1:
@ -511,20 +520,20 @@ async def open_nursery(
try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
) as an:
yield an
finally:
anursery.exited.set()
an.exited.set()
else: # sub-nursery case
try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as anursery:
yield anursery
) as an:
yield an
finally:
anursery.exited.set()
an.exited.set()
finally:
log.debug("Nursery teardown complete")