forked from goodboy/tractor
Add `._implicit_runtime_started` mark, better logs
After some deep logging improvements to many parts of `._runtime`, I realized a silly detail where we are always waiting on any opened `local_nursery: ActorNursery` to signal exit from `Actor._stream_handler()` even in the case of being an implicitly opened root actor (`open_root_actor()` wasn't called by user/app code) via `._supervise.open_nursery()`.. So, to address this add a `ActorNursery._implicit_runtime_started: bool` that can be set and then checked to avoid doing the unnecessary `.exited.wait()` (and any subsequent warn logging on an exit timeout) in that special but most common case XD Matching with other subsys log format refinements, improve readability and simplicity of the actor-nursery supervisory log msgs, including: - simplify and/or remove any content that more or less duplicates msg content found in emissions from lower-level primitives and sub-systems (like `._runtime`, `_context`, `_portal` etc.). - add a specific `._open_and_supervise_one_cancels_all_nursery()` handler block for `ContextCancelled` to log with `.cancel()` level noting that the case is a "remote cancellation". - put the nursery-exit and actor-tree shutdown status into a single msg in the `implicit_runtime` case.modden_spawn_from_client_req
parent
50465d4b34
commit
08a6a51cb8
|
@ -34,7 +34,10 @@ from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._exceptions import is_multi_cancelled
|
from ._exceptions import (
|
||||||
|
is_multi_cancelled,
|
||||||
|
ContextCancelled,
|
||||||
|
)
|
||||||
from ._root import open_root_actor
|
from ._root import open_root_actor
|
||||||
from . import _state
|
from . import _state
|
||||||
from . import _spawn
|
from . import _spawn
|
||||||
|
@ -104,6 +107,14 @@ class ActorNursery:
|
||||||
self.errors = errors
|
self.errors = errors
|
||||||
self.exited = trio.Event()
|
self.exited = trio.Event()
|
||||||
|
|
||||||
|
# NOTE: when no explicit call is made to
|
||||||
|
# `.open_root_actor()` by application code,
|
||||||
|
# `.open_nursery()` will implicitly call it to start the
|
||||||
|
# actor-tree runtime. In this case we mark ourselves as
|
||||||
|
# such so that runtime components can be aware for logging
|
||||||
|
# and syncing purposes to any actor opened nurseries.
|
||||||
|
self._implicit_runtime_started: bool = False
|
||||||
|
|
||||||
async def start_actor(
|
async def start_actor(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
@ -249,10 +260,11 @@ class ActorNursery:
|
||||||
'''
|
'''
|
||||||
self.cancelled = True
|
self.cancelled = True
|
||||||
|
|
||||||
log.cancel(
|
# TODO: impl a repr for spawn more compact
|
||||||
'Cancelling actor nursery\n'
|
# then `._children`..
|
||||||
f'|_{self._children}\n'
|
children: dict = self._children
|
||||||
)
|
child_count: int = len(children)
|
||||||
|
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery() as tn:
|
||||||
|
|
||||||
|
@ -263,7 +275,7 @@ class ActorNursery:
|
||||||
subactor,
|
subactor,
|
||||||
proc,
|
proc,
|
||||||
portal,
|
portal,
|
||||||
) in self._children.values():
|
) in children.values():
|
||||||
|
|
||||||
# TODO: are we ever even going to use this or
|
# TODO: are we ever even going to use this or
|
||||||
# is the spawning backend responsible for such
|
# is the spawning backend responsible for such
|
||||||
|
@ -275,12 +287,13 @@ class ActorNursery:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event = self._actor._peer_connected[subactor.uid]
|
event = self._actor._peer_connected[subactor.uid]
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} wasn't finished spawning?")
|
f"{subactor.uid} never 't finished spawning?"
|
||||||
|
)
|
||||||
|
|
||||||
await event.wait()
|
await event.wait()
|
||||||
|
|
||||||
# channel/portal should now be up
|
# channel/portal should now be up
|
||||||
_, _, portal = self._children[subactor.uid]
|
_, _, portal = children[subactor.uid]
|
||||||
|
|
||||||
# XXX should be impossible to get here
|
# XXX should be impossible to get here
|
||||||
# unless method was called from within
|
# unless method was called from within
|
||||||
|
@ -299,11 +312,13 @@ class ActorNursery:
|
||||||
if portal.channel.connected():
|
if portal.channel.connected():
|
||||||
tn.start_soon(portal.cancel_actor)
|
tn.start_soon(portal.cancel_actor)
|
||||||
|
|
||||||
|
log.cancel(msg)
|
||||||
# if we cancelled the cancel (we hung cancelling remote actors)
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
# then hard kill all sub-processes
|
# then hard kill all sub-processes
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
log.error(
|
log.error(
|
||||||
f'Failed to cancel {self}\nHard killing process tree!'
|
f'Failed to cancel {self}?\n'
|
||||||
|
'Hard killing underlying subprocess tree!\n'
|
||||||
)
|
)
|
||||||
subactor: Actor
|
subactor: Actor
|
||||||
proc: trio.Process
|
proc: trio.Process
|
||||||
|
@ -312,7 +327,7 @@ class ActorNursery:
|
||||||
subactor,
|
subactor,
|
||||||
proc,
|
proc,
|
||||||
portal,
|
portal,
|
||||||
) in self._children.values():
|
) in children.values():
|
||||||
log.warning(f"Hard killing process {proc}")
|
log.warning(f"Hard killing process {proc}")
|
||||||
proc.terminate()
|
proc.terminate()
|
||||||
|
|
||||||
|
@ -390,26 +405,39 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# worry more are coming).
|
# worry more are coming).
|
||||||
an._join_procs.set()
|
an._join_procs.set()
|
||||||
|
|
||||||
# XXX: hypothetically an error could be
|
# XXX NOTE XXX: hypothetically an error could
|
||||||
# raised and then a cancel signal shows up
|
# be raised and then a cancel signal shows up
|
||||||
# slightly after in which case the `else:`
|
# slightly after in which case the `else:`
|
||||||
# block here might not complete? For now,
|
# block here might not complete? For now,
|
||||||
# shield both.
|
# shield both.
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
etype = type(inner_err)
|
etype: type = type(inner_err)
|
||||||
if etype in (
|
if etype in (
|
||||||
trio.Cancelled,
|
trio.Cancelled,
|
||||||
KeyboardInterrupt
|
KeyboardInterrupt,
|
||||||
) or (
|
) or (
|
||||||
is_multi_cancelled(inner_err)
|
is_multi_cancelled(inner_err)
|
||||||
):
|
):
|
||||||
log.cancel(
|
log.cancel(
|
||||||
f"Nursery for {current_actor().uid} "
|
f'Actor-nursery cancelled by {etype}\n\n'
|
||||||
f"was cancelled with {etype}")
|
|
||||||
|
f'{current_actor().uid}\n'
|
||||||
|
f' |_{an}\n\n'
|
||||||
|
|
||||||
|
# TODO: show tb str?
|
||||||
|
# f'{tb_str}'
|
||||||
|
)
|
||||||
|
elif etype in {
|
||||||
|
ContextCancelled,
|
||||||
|
}:
|
||||||
|
log.cancel(
|
||||||
|
'Actor-nursery caught remote cancellation\n\n'
|
||||||
|
|
||||||
|
f'{inner_err.tb_str}'
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
log.exception(
|
log.exception(
|
||||||
f"Nursery for {current_actor().uid} "
|
'Nursery errored with:\n'
|
||||||
"errored with:"
|
|
||||||
|
|
||||||
# TODO: same thing as in
|
# TODO: same thing as in
|
||||||
# `._invoke()` to compute how to
|
# `._invoke()` to compute how to
|
||||||
|
@ -450,11 +478,15 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# remaining sub-actors (due to our lone strategy:
|
# remaining sub-actors (due to our lone strategy:
|
||||||
# one-cancels-all).
|
# one-cancels-all).
|
||||||
log.cancel(f"Nursery cancelling due to {err}")
|
|
||||||
if an._children:
|
if an._children:
|
||||||
|
log.cancel(
|
||||||
|
'Actor-nursery cancelling due error type:\n'
|
||||||
|
f'{err}\n'
|
||||||
|
)
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await an.cancel()
|
await an.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# No errors were raised while awaiting ".run_in_actor()"
|
# No errors were raised while awaiting ".run_in_actor()"
|
||||||
# actors but those actors may have returned remote errors as
|
# actors but those actors may have returned remote errors as
|
||||||
|
@ -500,7 +532,7 @@ async def open_nursery(
|
||||||
which cancellation scopes correspond to each spawned subactor set.
|
which cancellation scopes correspond to each spawned subactor set.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
implicit_runtime = False
|
implicit_runtime: bool = False
|
||||||
|
|
||||||
actor = current_actor(err_on_no_runtime=False)
|
actor = current_actor(err_on_no_runtime=False)
|
||||||
|
|
||||||
|
@ -512,7 +544,7 @@ async def open_nursery(
|
||||||
log.info("Starting actor runtime!")
|
log.info("Starting actor runtime!")
|
||||||
|
|
||||||
# mark us for teardown on exit
|
# mark us for teardown on exit
|
||||||
implicit_runtime = True
|
implicit_runtime: bool = True
|
||||||
|
|
||||||
async with open_root_actor(**kwargs) as actor:
|
async with open_root_actor(**kwargs) as actor:
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
@ -521,8 +553,21 @@ async def open_nursery(
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
actor
|
actor
|
||||||
) as an:
|
) as an:
|
||||||
|
|
||||||
|
# NOTE: mark this nursery as having
|
||||||
|
# implicitly started the root actor so
|
||||||
|
# that `._runtime` machinery can avoid
|
||||||
|
# certain teardown synchronization
|
||||||
|
# blocking/waits and any associated (warn)
|
||||||
|
# logging when it's known that this
|
||||||
|
# nursery shouldn't be exited before the
|
||||||
|
# root actor is.
|
||||||
|
an._implicit_runtime_started = True
|
||||||
yield an
|
yield an
|
||||||
finally:
|
finally:
|
||||||
|
# XXX: this event will be set after the root actor
|
||||||
|
# runtime is already torn down, so we want to
|
||||||
|
# avoid any blocking on it.
|
||||||
an.exited.set()
|
an.exited.set()
|
||||||
|
|
||||||
else: # sub-nursery case
|
else: # sub-nursery case
|
||||||
|
@ -536,8 +581,13 @@ async def open_nursery(
|
||||||
an.exited.set()
|
an.exited.set()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
log.debug("Nursery teardown complete")
|
msg: str = (
|
||||||
|
'Actor-nursery exited\n'
|
||||||
|
f'|_{an}\n\n'
|
||||||
|
)
|
||||||
|
|
||||||
# shutdown runtime if it was started
|
# shutdown runtime if it was started
|
||||||
if implicit_runtime:
|
if implicit_runtime:
|
||||||
log.info("Shutting down actor tree")
|
msg += '=> Shutting down actor runtime <=\n'
|
||||||
|
|
||||||
|
log.info(msg)
|
||||||
|
|
Loading…
Reference in New Issue