WIP, actor-nursery non-graceful-cancel raises EG
Attempting a rework of the post-cancellation "raising semantics" such that subactors which are `ActorCancelled` as a result of a non-graceful in-scope error, are acked via a re-raised `ExceptionGroup[ActorCancelled*N, Exception]` *outside the an-block*. Eventually, the idea is to have `ActorCancelled` be relayed from each subactor in response to any `Actor.cancel()/Portal.cancel_actor()` request much like `Context.cancel()/ContextCancelled`. This is a WIP bc it does break a few tests and requires related `_spawn`-mod-machinery changes to match some of which I'm not yet sure are required; need to dig into to the details of the currently failing suites first. `._supervise` patch deats, - add `ActorNursery.maybe_error` which delivers the maybe-EG or `._scope_error` depending on `.errors` (now `._errors`, a mapping from `Aid`-keys) has entries seet for subs. - raise ^ if non-null in a new outer-`finally` in `_open_and_supervise_one_cancels_all_nursery()`; an "outer" block is added to ensure all sub-actor-excs are emited/captured as part of `ActorNursery.cancel()` being called (as prior) as well as the `da_nursery` being explicitly cancelled alongside it (to unblock the tn-block, but still not sure why this is necessary yet?..). - (now masked) tried injecting actorcs from `.cancel()` loop, but (again per more explanation in section below) seems to be suffering a race issue with RAE relay? - left in buncha notes obvi for all this.. `._spawn` patch deats, - as above, expect `errors: dict` to map from `Aid`-keys. - pass `errors: dict` into `soft_kill()` since it seemed like we'd want to (for now) inject `ActoreCancelled` in some cases (but now i'm not sure XD). - tried out a couple spots (which are now masked) to inject `ActorCancelled` after calling `Portal.cancel()` in various subactor-supervision routines whenev an RAE is not set.. - oddly seems to be overwriting actual errors (likely due to racing with RAE receive and/or actorc-request timeout?) despite the guard logic..which clearly doesn't resolve the issue.. - buncha `tn`-style renaming.actor_cancelled_exc_type
parent
d432e2e245
commit
b3d348ee6a
|
@ -50,7 +50,11 @@ from tractor._addr import UnwrappedAddress
|
||||||
from tractor._portal import Portal
|
from tractor._portal import Portal
|
||||||
from tractor._runtime import Actor
|
from tractor._runtime import Actor
|
||||||
from tractor._entry import _mp_main
|
from tractor._entry import _mp_main
|
||||||
from tractor._exceptions import ActorFailure
|
from tractor._exceptions import (
|
||||||
|
ActorCancelled,
|
||||||
|
ActorFailure,
|
||||||
|
# NoResult,
|
||||||
|
)
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
|
@ -137,7 +141,6 @@ def try_set_start_method(
|
||||||
|
|
||||||
|
|
||||||
async def exhaust_portal(
|
async def exhaust_portal(
|
||||||
|
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor
|
actor: Actor
|
||||||
|
|
||||||
|
@ -185,10 +188,12 @@ async def exhaust_portal(
|
||||||
|
|
||||||
|
|
||||||
async def cancel_on_completion(
|
async def cancel_on_completion(
|
||||||
|
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -209,24 +214,57 @@ async def cancel_on_completion(
|
||||||
portal,
|
portal,
|
||||||
actor,
|
actor,
|
||||||
)
|
)
|
||||||
|
aid: msgtypes.Aid = actor.aid
|
||||||
|
repr_aid: str = aid.reprol(sin_uuid=False)
|
||||||
|
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors[actor.uid]: Exception = result
|
errors[aid]: Exception = result
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Cancelling subactor runtime due to error:\n\n'
|
'Cancelling subactor {repr_aid!r} runtime due to error\n'
|
||||||
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
f'\n'
|
||||||
f'error: {result}\n'
|
f'Portal.cancel_actor() => {portal.channel.uid}\n'
|
||||||
|
f'\n'
|
||||||
|
f'{result!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.runtime(
|
report: str = (
|
||||||
'Cancelling subactor gracefully:\n\n'
|
f'Cancelling subactor {repr_aid!r} gracefully..\n'
|
||||||
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
f'\n'
|
||||||
f'result: {result}\n'
|
)
|
||||||
|
canc_info: str = (
|
||||||
|
f'Portal.cancel_actor() => {portal.chan.uid}\n'
|
||||||
|
f'\n'
|
||||||
|
f'final-result => {result!r}\n'
|
||||||
|
)
|
||||||
|
log.cancel(
|
||||||
|
report
|
||||||
|
+
|
||||||
|
canc_info
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel the process now that we have a final result
|
# cancel the process now that we have a final result
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
if (
|
||||||
|
not errors.get(aid)
|
||||||
|
# and
|
||||||
|
# result is NoResult
|
||||||
|
):
|
||||||
|
pass
|
||||||
|
# await debug.pause(shield=True)
|
||||||
|
|
||||||
|
# errors[aid] = ActorCancelled(
|
||||||
|
# message=(
|
||||||
|
# f'Cancelled subactor {repr_aid!r}\n'
|
||||||
|
# f'{canc_info}\n'
|
||||||
|
# ),
|
||||||
|
# canceller=current_actor().aid,
|
||||||
|
# # TODO? should we have a ack-msg?
|
||||||
|
# # ipc_msg=??
|
||||||
|
# # boxed_type=trio.Cancelled,
|
||||||
|
# )
|
||||||
|
|
||||||
|
|
||||||
async def hard_kill(
|
async def hard_kill(
|
||||||
proc: trio.Process,
|
proc: trio.Process,
|
||||||
|
@ -331,6 +369,10 @@ async def soft_kill(
|
||||||
Awaitable,
|
Awaitable,
|
||||||
],
|
],
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -374,8 +416,8 @@ async def soft_kill(
|
||||||
# below. This means we try to do a graceful teardown
|
# below. This means we try to do a graceful teardown
|
||||||
# via sending a cancel message before getting out
|
# via sending a cancel message before getting out
|
||||||
# zombie killing tools.
|
# zombie killing tools.
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as tn:
|
||||||
n.cancel_scope.shield = True
|
tn.cancel_scope.shield = True
|
||||||
|
|
||||||
async def cancel_on_proc_deth():
|
async def cancel_on_proc_deth():
|
||||||
'''
|
'''
|
||||||
|
@ -385,24 +427,35 @@ async def soft_kill(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await wait_func(proc)
|
await wait_func(proc)
|
||||||
n.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
# start a task to wait on the termination of the
|
# start a task to wait on the termination of the
|
||||||
# process by itself waiting on a (caller provided) wait
|
# process by itself waiting on a (caller provided) wait
|
||||||
# function which should unblock when the target process
|
# function which should unblock when the target process
|
||||||
# has terminated.
|
# has terminated.
|
||||||
n.start_soon(cancel_on_proc_deth)
|
tn.start_soon(cancel_on_proc_deth)
|
||||||
|
|
||||||
# send the actor-runtime a cancel request.
|
# send the actor-runtime a cancel request.
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
|
# if not errors.get(peer_aid):
|
||||||
|
# errors[peer_aid] = ActorCancelled(
|
||||||
|
# message=(
|
||||||
|
# 'Sub-actor cancelled gracefully by parent\n'
|
||||||
|
# ),
|
||||||
|
# canceller=current_actor().aid,
|
||||||
|
# # TODO? should we have a ack-msg?
|
||||||
|
# # ipc_msg=??
|
||||||
|
# # boxed_type=trio.Cancelled,
|
||||||
|
# )
|
||||||
|
|
||||||
if proc.poll() is None: # type: ignore
|
if proc.poll() is None: # type: ignore
|
||||||
log.warning(
|
log.warning(
|
||||||
'Subactor still alive after cancel request?\n\n'
|
'Subactor still alive after cancel request?\n\n'
|
||||||
f'uid: {peer_aid}\n'
|
f'uid: {peer_aid}\n'
|
||||||
f'|_{proc}\n'
|
f'|_{proc}\n'
|
||||||
)
|
)
|
||||||
n.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@ -410,7 +463,10 @@ async def new_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery,
|
actor_nursery: ActorNursery,
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
|
@ -449,7 +505,10 @@ async def trio_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery,
|
actor_nursery: ActorNursery,
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
|
@ -572,9 +631,9 @@ async def trio_proc(
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await actor_nursery._join_procs.wait()
|
await actor_nursery._join_procs.wait()
|
||||||
|
|
||||||
async with trio.open_nursery() as nursery:
|
async with trio.open_nursery() as ptl_reaper_tn:
|
||||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
nursery.start_soon(
|
ptl_reaper_tn.start_soon(
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
portal,
|
portal,
|
||||||
subactor,
|
subactor,
|
||||||
|
@ -587,7 +646,8 @@ async def trio_proc(
|
||||||
await soft_kill(
|
await soft_kill(
|
||||||
proc,
|
proc,
|
||||||
trio.Process.wait, # XXX, uses `pidfd_open()` below.
|
trio.Process.wait, # XXX, uses `pidfd_open()` below.
|
||||||
portal
|
portal,
|
||||||
|
errors,
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
|
@ -596,7 +656,7 @@ async def trio_proc(
|
||||||
'Cancelling portal result reaper task\n'
|
'Cancelling portal result reaper task\n'
|
||||||
f'c)> {subactor.aid.reprol()!r}\n'
|
f'c)> {subactor.aid.reprol()!r}\n'
|
||||||
)
|
)
|
||||||
nursery.cancel_scope.cancel()
|
ptl_reaper_tn.cancel_scope.cancel()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
||||||
|
@ -669,7 +729,10 @@ async def mp_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery, # type: ignore # noqa
|
actor_nursery: ActorNursery, # type: ignore # noqa
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[tuple[str, str], Exception],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
Exception,
|
||||||
|
],
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
parent_addr: UnwrappedAddress,
|
parent_addr: UnwrappedAddress,
|
||||||
|
@ -794,7 +857,7 @@ async def mp_proc(
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
portal,
|
portal,
|
||||||
subactor,
|
subactor,
|
||||||
errors
|
errors,
|
||||||
)
|
)
|
||||||
|
|
||||||
# This is a "soft" (cancellable) join/reap which
|
# This is a "soft" (cancellable) join/reap which
|
||||||
|
@ -803,7 +866,8 @@ async def mp_proc(
|
||||||
await soft_kill(
|
await soft_kill(
|
||||||
proc,
|
proc,
|
||||||
proc_waiter,
|
proc_waiter,
|
||||||
portal
|
portal,
|
||||||
|
errors,
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
|
|
|
@ -30,6 +30,9 @@ import warnings
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
from .msg import (
|
||||||
|
types as msgtypes,
|
||||||
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
debug,
|
debug,
|
||||||
pformat as _pformat,
|
pformat as _pformat,
|
||||||
|
@ -48,6 +51,7 @@ from .trionics import (
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
ActorCancelled,
|
||||||
)
|
)
|
||||||
from ._root import (
|
from ._root import (
|
||||||
open_root_actor,
|
open_root_actor,
|
||||||
|
@ -99,7 +103,10 @@ class ActorNursery:
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
ria_nursery: trio.Nursery,
|
ria_nursery: trio.Nursery,
|
||||||
da_nursery: trio.Nursery,
|
da_nursery: trio.Nursery,
|
||||||
errors: dict[tuple[str, str], BaseException],
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
BaseException,
|
||||||
|
],
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# self.supervisor = supervisor # TODO
|
# self.supervisor = supervisor # TODO
|
||||||
|
@ -117,9 +124,11 @@ class ActorNursery:
|
||||||
]
|
]
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
|
# signals when it is ok to start waiting o subactor procs
|
||||||
|
# for termination.
|
||||||
self._join_procs = trio.Event()
|
self._join_procs = trio.Event()
|
||||||
self._at_least_one_child_in_debug: bool = False
|
self._at_least_one_child_in_debug: bool = False
|
||||||
self.errors = errors
|
self._errors = errors
|
||||||
self._scope_error: BaseException|None = None
|
self._scope_error: BaseException|None = None
|
||||||
self.exited = trio.Event()
|
self.exited = trio.Event()
|
||||||
|
|
||||||
|
@ -260,7 +269,7 @@ class ActorNursery:
|
||||||
name,
|
name,
|
||||||
self,
|
self,
|
||||||
subactor,
|
subactor,
|
||||||
self.errors,
|
self._errors,
|
||||||
bind_addrs,
|
bind_addrs,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
_rtv, # run time vars
|
_rtv, # run time vars
|
||||||
|
@ -364,7 +373,9 @@ class ActorNursery:
|
||||||
# then `._children`..
|
# then `._children`..
|
||||||
children: dict = self._children
|
children: dict = self._children
|
||||||
child_count: int = len(children)
|
child_count: int = len(children)
|
||||||
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
msg: str = (
|
||||||
|
f'Cancelling actor-nursery with {child_count} children\n'
|
||||||
|
)
|
||||||
|
|
||||||
server: IPCServer = self._actor.ipc_server
|
server: IPCServer = self._actor.ipc_server
|
||||||
|
|
||||||
|
@ -391,7 +402,9 @@ class ActorNursery:
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event: trio.Event = server._peer_connected[subactor.uid]
|
event: trio.Event = server._peer_connected[
|
||||||
|
subactor.uid
|
||||||
|
]
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} never 't finished spawning?"
|
f"{subactor.uid} never 't finished spawning?"
|
||||||
)
|
)
|
||||||
|
@ -416,7 +429,20 @@ class ActorNursery:
|
||||||
# spawn cancel tasks for each sub-actor
|
# spawn cancel tasks for each sub-actor
|
||||||
assert portal
|
assert portal
|
||||||
if portal.channel.connected():
|
if portal.channel.connected():
|
||||||
tn.start_soon(portal.cancel_actor)
|
|
||||||
|
async def canc_subactor():
|
||||||
|
await portal.cancel_actor()
|
||||||
|
# aid: msgtypes.Aid = subactor.aid
|
||||||
|
# reprol: str = aid.reprol(sin_uuid=False)
|
||||||
|
# if not self._errors.get(aid):
|
||||||
|
# self._errors[aid] = ActorCancelled(
|
||||||
|
# message=(
|
||||||
|
# f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n'
|
||||||
|
# ),
|
||||||
|
# canceller=self._actor.aid,
|
||||||
|
# )
|
||||||
|
|
||||||
|
tn.start_soon(canc_subactor)
|
||||||
|
|
||||||
log.cancel(msg)
|
log.cancel(msg)
|
||||||
# if we cancelled the cancel (we hung cancelling remote actors)
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
|
@ -442,6 +468,47 @@ class ActorNursery:
|
||||||
# mark ourselves as having (tried to have) cancelled all subactors
|
# mark ourselves as having (tried to have) cancelled all subactors
|
||||||
self._join_procs.set()
|
self._join_procs.set()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def maybe_error(self) -> (
|
||||||
|
BaseException|
|
||||||
|
BaseExceptionGroup|
|
||||||
|
None
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Deliver any captured scope errors including those relayed
|
||||||
|
from subactors such as `ActorCancelled` during a non-graceful
|
||||||
|
cancellation scenario.
|
||||||
|
|
||||||
|
When more then a "graceful cancel" occurrs wrap all collected
|
||||||
|
sub-exceptions in a raised `ExceptionGroup`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
scope_exc: BaseException|None = self._scope_error
|
||||||
|
|
||||||
|
# XXX NOTE, only pack an eg if there i at least one
|
||||||
|
# non-actorc exception received from a subactor, OR
|
||||||
|
# return `._scope_error` verbatim.
|
||||||
|
if (errors := self._errors):
|
||||||
|
# use `BaseExceptionGroup` as needed
|
||||||
|
excs: list[BaseException] = list(errors.values())
|
||||||
|
if (
|
||||||
|
len(excs) > 1
|
||||||
|
and
|
||||||
|
any(
|
||||||
|
type(exc) not in {ActorCancelled,}
|
||||||
|
for exc in excs
|
||||||
|
)
|
||||||
|
):
|
||||||
|
return ExceptionGroup(
|
||||||
|
'ActorNursery multi-errored with',
|
||||||
|
tuple(excs),
|
||||||
|
)
|
||||||
|
|
||||||
|
# raise the lone subactor exc
|
||||||
|
return list(excs)[0]
|
||||||
|
|
||||||
|
return scope_exc
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def _open_and_supervise_one_cancels_all_nursery(
|
async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
@ -457,7 +524,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
inner_err: BaseException|None = None
|
inner_err: BaseException|None = None
|
||||||
|
|
||||||
# the collection of errors retreived from spawned sub-actors
|
# the collection of errors retreived from spawned sub-actors
|
||||||
errors: dict[tuple[str, str], BaseException] = {}
|
errors: dict[
|
||||||
|
msgtypes.Aid,
|
||||||
|
BaseException,
|
||||||
|
] = {}
|
||||||
|
|
||||||
# This is the outermost level "deamon actor" nursery. It is awaited
|
# This is the outermost level "deamon actor" nursery. It is awaited
|
||||||
# **after** the below inner "run in actor nursery". This allows for
|
# **after** the below inner "run in actor nursery". This allows for
|
||||||
|
@ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# `ActorNursery.start_actor()`).
|
# `ActorNursery.start_actor()`).
|
||||||
|
|
||||||
# errors from this daemon actor nursery bubble up to caller
|
# errors from this daemon actor nursery bubble up to caller
|
||||||
async with (
|
try:
|
||||||
collapse_eg(),
|
async with (
|
||||||
trio.open_nursery() as da_nursery,
|
collapse_eg(),
|
||||||
):
|
trio.open_nursery() as da_nursery,
|
||||||
try:
|
):
|
||||||
# This is the inner level "run in actor" nursery. It is
|
try:
|
||||||
# awaited first since actors spawned in this way (using
|
# This is the inner level "run in actor" nursery. It is
|
||||||
# `ActorNusery.run_in_actor()`) are expected to only
|
# awaited first since actors spawned in this way (using
|
||||||
# return a single result and then complete (i.e. be canclled
|
# `ActorNusery.run_in_actor()`) are expected to only
|
||||||
# gracefully). Errors collected from these actors are
|
# return a single result and then complete (i.e. be canclled
|
||||||
# immediately raised for handling by a supervisor strategy.
|
# gracefully). Errors collected from these actors are
|
||||||
# As such if the strategy propagates any error(s) upwards
|
# immediately raised for handling by a supervisor strategy.
|
||||||
# the above "daemon actor" nursery will be notified.
|
# As such if the strategy propagates any error(s) upwards
|
||||||
async with (
|
# the above "daemon actor" nursery will be notified.
|
||||||
collapse_eg(),
|
async with (
|
||||||
trio.open_nursery() as ria_nursery,
|
collapse_eg(),
|
||||||
):
|
trio.open_nursery() as ria_nursery,
|
||||||
an = ActorNursery(
|
):
|
||||||
actor,
|
an = ActorNursery(
|
||||||
ria_nursery,
|
actor,
|
||||||
da_nursery,
|
ria_nursery,
|
||||||
errors
|
da_nursery,
|
||||||
)
|
errors
|
||||||
try:
|
|
||||||
# spawning of actors happens in the caller's scope
|
|
||||||
# after we yield upwards
|
|
||||||
yield an
|
|
||||||
|
|
||||||
# When we didn't error in the caller's scope,
|
|
||||||
# signal all process-monitor-tasks to conduct
|
|
||||||
# the "hard join phase".
|
|
||||||
log.runtime(
|
|
||||||
'Waiting on subactors to complete:\n'
|
|
||||||
f'>}} {len(an._children)}\n'
|
|
||||||
)
|
)
|
||||||
an._join_procs.set()
|
try:
|
||||||
|
# spawning of actors happens in the caller's scope
|
||||||
|
# after we yield upwards
|
||||||
|
yield an
|
||||||
|
|
||||||
except BaseException as _inner_err:
|
# When we didn't error in the caller's scope,
|
||||||
inner_err = _inner_err
|
# signal all process-monitor-tasks to conduct
|
||||||
errors[actor.uid] = inner_err
|
# the "hard join phase".
|
||||||
|
log.runtime(
|
||||||
|
'Waiting on subactors to complete:\n'
|
||||||
|
f'>}} {len(an._children)}\n'
|
||||||
|
)
|
||||||
|
an._join_procs.set()
|
||||||
|
|
||||||
# If we error in the root but the debugger is
|
except BaseException as _inner_err:
|
||||||
# engaged we don't want to prematurely kill (and
|
inner_err = _inner_err
|
||||||
# thus clobber access to) the local tty since it
|
# errors[actor.aid] = inner_err
|
||||||
# will make the pdb repl unusable.
|
|
||||||
# Instead try to wait for pdb to be released before
|
|
||||||
# tearing down.
|
|
||||||
await debug.maybe_wait_for_debugger(
|
|
||||||
child_in_debug=an._at_least_one_child_in_debug
|
|
||||||
)
|
|
||||||
|
|
||||||
# if the caller's scope errored then we activate our
|
# If we error in the root but the debugger is
|
||||||
# one-cancels-all supervisor strategy (don't
|
# engaged we don't want to prematurely kill (and
|
||||||
# worry more are coming).
|
# thus clobber access to) the local tty since it
|
||||||
an._join_procs.set()
|
# will make the pdb repl unusable.
|
||||||
|
# Instead try to wait for pdb to be released before
|
||||||
|
# tearing down.
|
||||||
|
await debug.maybe_wait_for_debugger(
|
||||||
|
child_in_debug=an._at_least_one_child_in_debug
|
||||||
|
)
|
||||||
|
|
||||||
# XXX NOTE XXX: hypothetically an error could
|
# if the caller's scope errored then we activate our
|
||||||
# be raised and then a cancel signal shows up
|
# one-cancels-all supervisor strategy (don't
|
||||||
# slightly after in which case the `else:`
|
# worry more are coming).
|
||||||
# block here might not complete? For now,
|
an._join_procs.set()
|
||||||
# shield both.
|
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
etype: type = type(inner_err)
|
|
||||||
if etype in (
|
|
||||||
trio.Cancelled,
|
|
||||||
KeyboardInterrupt,
|
|
||||||
) or (
|
|
||||||
is_multi_cancelled(inner_err)
|
|
||||||
):
|
|
||||||
log.cancel(
|
|
||||||
f'Actor-nursery cancelled by {etype}\n\n'
|
|
||||||
|
|
||||||
f'{current_actor().uid}\n'
|
# XXX NOTE XXX: hypothetically an error could
|
||||||
f' |_{an}\n\n'
|
# be raised and then a cancel signal shows up
|
||||||
|
# slightly after in which case the `else:`
|
||||||
|
# block here might not complete? For now,
|
||||||
|
# shield both.
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
etype: type = type(inner_err)
|
||||||
|
if etype in (
|
||||||
|
trio.Cancelled,
|
||||||
|
KeyboardInterrupt,
|
||||||
|
) or (
|
||||||
|
is_multi_cancelled(inner_err)
|
||||||
|
):
|
||||||
|
log.cancel(
|
||||||
|
f'Actor-nursery cancelled by {etype}\n\n'
|
||||||
|
|
||||||
# TODO: show tb str?
|
f'{current_actor().uid}\n'
|
||||||
# f'{tb_str}'
|
f' |_{an}\n\n'
|
||||||
)
|
|
||||||
elif etype in {
|
|
||||||
ContextCancelled,
|
|
||||||
}:
|
|
||||||
log.cancel(
|
|
||||||
'Actor-nursery caught remote cancellation\n'
|
|
||||||
'\n'
|
|
||||||
f'{inner_err.tb_str}'
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
log.exception(
|
|
||||||
'Nursery errored with:\n'
|
|
||||||
|
|
||||||
# TODO: same thing as in
|
# TODO: show tb str?
|
||||||
# `._invoke()` to compute how to
|
# f'{tb_str}'
|
||||||
# place this div-line in the
|
)
|
||||||
# middle of the above msg
|
elif etype in {
|
||||||
# content..
|
ContextCancelled,
|
||||||
# -[ ] prolly helper-func it too
|
}:
|
||||||
# in our `.log` module..
|
log.cancel(
|
||||||
# '------ - ------'
|
'Actor-nursery caught remote cancellation\n'
|
||||||
)
|
'\n'
|
||||||
|
f'{inner_err.tb_str}'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
log.exception(
|
||||||
|
'Nursery errored with:\n'
|
||||||
|
|
||||||
# cancel all subactors
|
# TODO: same thing as in
|
||||||
await an.cancel()
|
# `._invoke()` to compute how to
|
||||||
|
# place this div-line in the
|
||||||
|
# middle of the above msg
|
||||||
|
# content..
|
||||||
|
# -[ ] prolly helper-func it too
|
||||||
|
# in our `.log` module..
|
||||||
|
# '------ - ------'
|
||||||
|
)
|
||||||
|
|
||||||
# ria_nursery scope end
|
# cancel all subactors
|
||||||
|
await an.cancel()
|
||||||
|
|
||||||
# TODO: this is the handler around the ``.run_in_actor()``
|
# ria_nursery scope end
|
||||||
# nursery. Ideally we can drop this entirely in the future as
|
|
||||||
# the whole ``.run_in_actor()`` API should be built "on top of"
|
|
||||||
# this lower level spawn-request-cancel "daemon actor" API where
|
|
||||||
# a local in-actor task nursery is used with one-to-one task
|
|
||||||
# + `await Portal.run()` calls and the results/errors are
|
|
||||||
# handled directly (inline) and errors by the local nursery.
|
|
||||||
except (
|
|
||||||
Exception,
|
|
||||||
BaseExceptionGroup,
|
|
||||||
trio.Cancelled
|
|
||||||
) as _outer_err:
|
|
||||||
outer_err = _outer_err
|
|
||||||
|
|
||||||
an._scope_error = outer_err or inner_err
|
# TODO: this is the handler around the ``.run_in_actor()``
|
||||||
|
# nursery. Ideally we can drop this entirely in the future as
|
||||||
|
# the whole ``.run_in_actor()`` API should be built "on top of"
|
||||||
|
# this lower level spawn-request-cancel "daemon actor" API where
|
||||||
|
# a local in-actor task nursery is used with one-to-one task
|
||||||
|
# + `await Portal.run()` calls and the results/errors are
|
||||||
|
# handled directly (inline) and errors by the local nursery.
|
||||||
|
except (
|
||||||
|
Exception,
|
||||||
|
BaseExceptionGroup,
|
||||||
|
trio.Cancelled
|
||||||
|
) as _outer_err:
|
||||||
|
outer_err = _outer_err
|
||||||
|
|
||||||
# XXX: yet another guard before allowing the cancel
|
# XXX: yet another guard before allowing the cancel
|
||||||
# sequence in case a (single) child is in debug.
|
# sequence in case a (single) child is in debug.
|
||||||
await debug.maybe_wait_for_debugger(
|
await debug.maybe_wait_for_debugger(
|
||||||
child_in_debug=an._at_least_one_child_in_debug
|
child_in_debug=an._at_least_one_child_in_debug
|
||||||
)
|
|
||||||
|
|
||||||
# If actor-local error was raised while waiting on
|
|
||||||
# ".run_in_actor()" actors then we also want to cancel all
|
|
||||||
# remaining sub-actors (due to our lone strategy:
|
|
||||||
# one-cancels-all).
|
|
||||||
if an._children:
|
|
||||||
log.cancel(
|
|
||||||
'Actor-nursery cancelling due error type:\n'
|
|
||||||
f'{outer_err}\n'
|
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=True):
|
|
||||||
await an.cancel()
|
|
||||||
raise
|
|
||||||
|
|
||||||
finally:
|
# If actor-local error was raised while waiting on
|
||||||
# No errors were raised while awaiting ".run_in_actor()"
|
# ".run_in_actor()" actors then we also want to cancel all
|
||||||
# actors but those actors may have returned remote errors as
|
# remaining sub-actors (due to our lone strategy:
|
||||||
# results (meaning they errored remotely and have relayed
|
# one-cancels-all).
|
||||||
# those errors back to this parent actor). The errors are
|
|
||||||
# collected in ``errors`` so cancel all actors, summarize
|
|
||||||
# all errors and re-raise.
|
|
||||||
if errors:
|
|
||||||
if an._children:
|
if an._children:
|
||||||
|
log.cancel(
|
||||||
|
'Actor-nursery cancelling due error type:\n'
|
||||||
|
f'{outer_err}\n'
|
||||||
|
)
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await an.cancel()
|
await an.cancel()
|
||||||
|
|
||||||
# use `BaseExceptionGroup` as needed
|
raise
|
||||||
if len(errors) > 1:
|
|
||||||
raise BaseExceptionGroup(
|
|
||||||
'tractor.ActorNursery errored with',
|
|
||||||
tuple(errors.values()),
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise list(errors.values())[0]
|
|
||||||
|
|
||||||
# show frame on any (likely) internal error
|
finally:
|
||||||
if (
|
scope_exc = an._scope_error = outer_err or inner_err
|
||||||
not an.cancelled
|
# await debug.pause(shield=True)
|
||||||
and an._scope_error
|
# if scope_exc:
|
||||||
):
|
# errors[actor.aid] = scope_exc
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
# da_nursery scope end - nursery checkpoint
|
# show this frame on any internal error
|
||||||
# final exit
|
if (
|
||||||
|
not an.cancelled
|
||||||
|
and
|
||||||
|
scope_exc
|
||||||
|
):
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
|
# NOTE, it's possible no errors were raised while
|
||||||
|
# awaiting ".run_in_actor()" actors but those
|
||||||
|
# sub-actors may have delivered remote errors as
|
||||||
|
# results, normally captured via machinery in
|
||||||
|
# `._spawn.cancel_on_completion()`.
|
||||||
|
#
|
||||||
|
# Any such remote errors are collected in `an._errors`
|
||||||
|
# which is summarized via `ActorNursery.maybe_error`
|
||||||
|
# which is maybe re-raised in an outer block (below).
|
||||||
|
#
|
||||||
|
# So here we first cancel all subactors the summarize
|
||||||
|
# all errors and then later (in that outer block)
|
||||||
|
# maybe-raise on a "non-graceful" cancellation
|
||||||
|
# outcome, normally as a summary EG.
|
||||||
|
if (
|
||||||
|
scope_exc
|
||||||
|
or
|
||||||
|
errors
|
||||||
|
):
|
||||||
|
|
||||||
|
if an._children:
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await an.cancel()
|
||||||
|
|
||||||
|
# cancel outer tn so we unblock outside this
|
||||||
|
# finally!
|
||||||
|
da_nursery.cance_scope.cancel()
|
||||||
|
#
|
||||||
|
# ^TODO? still don't get why needed?
|
||||||
|
# - an.cancel() should cause all spawn-subtasks
|
||||||
|
# to eventually exit?
|
||||||
|
# - also, could (instead) we sync to an event here before
|
||||||
|
# (ever) calling `an.cancel()`??
|
||||||
|
|
||||||
|
# `da_nursery` scope end, thus a checkpoint.
|
||||||
|
finally:
|
||||||
|
|
||||||
|
# raise any eg compiled from all subs
|
||||||
|
# ??TODO should we also adopt strict-egs here like
|
||||||
|
# `trio.Nursery`??
|
||||||
|
#
|
||||||
|
# XXX justification notes,
|
||||||
|
# docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups
|
||||||
|
# anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888
|
||||||
|
# gh: https://github.com/python-trio/trio/issues/611
|
||||||
|
if an_exc := an.maybe_error:
|
||||||
|
raise an_exc
|
||||||
|
|
||||||
|
if scope_exc := an._scope_error:
|
||||||
|
raise scope_exc
|
||||||
|
|
||||||
|
# @acm-fn scope exit
|
||||||
|
|
||||||
|
|
||||||
_shutdown_msg: str = (
|
_shutdown_msg: str = (
|
||||||
|
@ -648,7 +754,7 @@ _shutdown_msg: str = (
|
||||||
# @api_frame
|
# @api_frame
|
||||||
async def open_nursery(
|
async def open_nursery(
|
||||||
*, # named params only!
|
*, # named params only!
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = False,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
# ^TODO, paramspec for `open_root_actor()`
|
# ^TODO, paramspec for `open_root_actor()`
|
||||||
|
|
||||||
|
@ -684,16 +790,21 @@ async def open_nursery(
|
||||||
# mark us for teardown on exit
|
# mark us for teardown on exit
|
||||||
implicit_runtime: bool = True
|
implicit_runtime: bool = True
|
||||||
|
|
||||||
async with open_root_actor(
|
async with (
|
||||||
hide_tb=hide_tb,
|
# collapse_eg(hide_tb=hide_tb),
|
||||||
**kwargs,
|
open_root_actor(
|
||||||
) as actor:
|
hide_tb=hide_tb,
|
||||||
|
**kwargs,
|
||||||
|
) as actor,
|
||||||
|
):
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with _open_and_supervise_one_cancels_all_nursery(
|
async with (
|
||||||
actor
|
_open_and_supervise_one_cancels_all_nursery(
|
||||||
) as an:
|
actor
|
||||||
|
) as an
|
||||||
|
):
|
||||||
|
|
||||||
# NOTE: mark this nursery as having
|
# NOTE: mark this nursery as having
|
||||||
# implicitly started the root actor so
|
# implicitly started the root actor so
|
||||||
|
|
Loading…
Reference in New Issue