Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet 00583b7671 Use `nest_from_op()` in some runtime logs for actor-state-repring 2025-06-12 23:26:38 -04:00
Tyler Goodlet 78beeebe8f Augment `nest_from_op()` with a `nest_prefix: str`
Such that the caller can pass chars they'd like to prefix the first line
of the (indented) `tree_str`, commonly we use '|_' for "obj fields".
2025-06-12 23:22:46 -04:00
Tyler Goodlet 333fde39ad Detail the docs on `Context._maybe_raise_remote_err()` 2025-06-12 23:22:16 -04:00
Tyler Goodlet 502c7a1dc6 Move `.is_multi_cancelled()` to `.trioniics._beg`
Since it's for beg filtering, the current impl should be renamed anyway;
it's not just for filtering cancelled excs.

Deats,
- added a real doc string, links to official eg docs and fixed the
  return typing.
- adjust all internal imports to match.
2025-06-12 23:16:29 -04:00
12 changed files with 204 additions and 104 deletions

View File

@ -252,7 +252,7 @@ def test_simple_context(
pass
except BaseExceptionGroup as beg:
# XXX: on windows it seems we may have to expect the group error
from tractor._exceptions import is_multi_cancelled
from tractor.trionics import is_multi_cancelled
assert is_multi_cancelled(beg)
else:
trio.run(main)

View File

@ -1069,9 +1069,25 @@ class Context:
|RemoteActorError # stream overrun caused and ignored by us
):
'''
Maybe raise a remote error depending on the type of error
and *who* (i.e. which task from which actor) requested
a cancellation (if any).
Maybe raise a remote error depending on the type of error and
*who*, i.e. which side of the task pair across actors,
requested a cancellation (if any).
Depending on the input config-params suppress raising
certain remote excs:
- if `remote_error: ContextCancelled` (ctxc) AND this side's
task is the "requester", it at somem point called
`Context.cancel()`, then the peer's ctxc is treated
as a "cancel ack".
|_ this behaves exactly like how `trio.Nursery.cancel_scope`
absorbs any `BaseExceptionGroup[trio.Cancelled]` wherein the
owning parent task never will raise a `trio.Cancelled`
if `CancelScope.cancel_called == True`.
- `remote_error: StreamOverrrun` (overrun) AND
`raise_overrun_from_self` is set.
'''
__tracebackhide__: bool = hide_tb
@ -1113,18 +1129,19 @@ class Context:
# for this ^, NO right?
) or (
# NOTE: whenever this context is the cause of an
# overrun on the remote side (aka we sent msgs too
# fast that the remote task was overrun according
# to `MsgStream` buffer settings) AND the caller
# has requested to not raise overruns this side
# caused, we also silently absorb any remotely
# boxed `StreamOverrun`. This is mostly useful for
# supressing such faults during
# cancellation/error/final-result handling inside
# `msg._ops.drain_to_final_msg()` such that we do not
# raise such errors particularly in the case where
# NOTE: whenever this side is the cause of an
# overrun on the peer side, i.e. we sent msgs too
# fast and the peer task was overrun according
# to `MsgStream` buffer settings, AND this was
# called with `raise_overrun_from_self=True` (the
# default), silently absorb any `StreamOverrun`.
#
# XXX, this is namely useful for supressing such faults
# during cancellation/error/final-result handling inside
# `.msg._ops.drain_to_final_msg()` such that we do not
# raise during a cancellation-request, i.e. when
# `._cancel_called == True`.
#
not raise_overrun_from_self
and isinstance(remote_error, RemoteActorError)
and remote_error.boxed_type is StreamOverrun

View File

@ -1246,55 +1246,6 @@ def unpack_error(
return exc
def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup,
ignore_nested: set[BaseException] = set(),
) -> bool|BaseExceptionGroup:
'''
Predicate to determine if an `BaseExceptionGroup` only contains
some (maybe nested) set of sub-grouped exceptions (like only
`trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
'''
if (
not ignore_nested
or
trio.Cancelled in ignore_nested
# XXX always count-in `trio`'s native signal
):
ignore_nested.update({trio.Cancelled})
if isinstance(exc, BaseExceptionGroup):
matched_exc: BaseExceptionGroup|None = exc.subgroup(
tuple(ignore_nested),
# TODO, complain about why not allowed XD
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL--logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
# we fallthrough and return `False` here.
return False
def _raise_from_unexpected_msg(
ctx: Context,
msg: MsgType,

View File

@ -47,6 +47,7 @@ from ._runtime import (
from .devx import (
debug,
_frame_stack,
pformat as _pformat,
)
from . import _spawn
from . import _state
@ -61,9 +62,11 @@ from ._addr import (
mk_uuid,
wrap_address,
)
from .trionics import (
is_multi_cancelled,
)
from ._exceptions import (
RuntimeFailure,
is_multi_cancelled,
)
@ -511,10 +514,14 @@ async def open_root_actor(
# for an in nurseries:
# tempn.start_soon(an.exited.wait)
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op='>) ',
tree_str=actor.pformat(),
nest_prefix='|_',
)
logger.info(
f'Closing down root actor\n'
f'>)\n'
f'|_{actor}\n'
f'{op_nested_actor_repr}\n'
)
await actor.cancel(None) # self cancel
finally:

View File

@ -55,6 +55,7 @@ from typing import (
TYPE_CHECKING,
)
import uuid
import textwrap
from types import ModuleType
import warnings
@ -97,7 +98,10 @@ from ._exceptions import (
MsgTypeError,
unpack_error,
)
from .devx import debug
from .devx import (
debug,
pformat as _pformat
)
from ._discovery import get_registry
from ._portal import Portal
from . import _state
@ -339,46 +343,76 @@ class Actor:
def pid(self) -> int:
return self._aid.pid
def pformat(self) -> str:
ds: str = '='
def pformat(
self,
ds: str = ':',
indent: int = 0,
) -> str:
fields_sect_prefix: str = ' |_'
parent_uid: tuple|None = None
if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid
peers: list = []
server: _server.IPCServer = self.ipc_server
ipc_server_sect: str = ''
if server:
peers: list[tuple] = list(server._peer_connected)
# create field ln as a key-header indented under
# and up to the section's key prefix.
# field_ln_header: str = textwrap.indent(
# text=f"ipc_server{ds}",
# prefix=' '*len(fields_sect_prefix),
# )
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = textwrap.indent(
text=self._ipc_server.pformat(),
# prefix=' '*len(field_ln_header),
prefix=' '*len(fields_sect_prefix),
)
ipc_server_sect: str = (
# f'{field_ln_header}\n'
f'{server_repr}'
)
fmtstr: str = (
f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n"
f'\n'
# f'\n'
f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n"
f" ipc_server{ds}{self._ipc_server}\n"
f'\n'
f"{ipc_server_sect}"
# f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n"
f'\n'
# f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n'
f'\n'
# f'\n'
f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n'
)
return (
_repr: str = (
'<Actor(\n'
+
fmtstr
+
')>\n'
)
if indent:
_repr: str = textwrap.indent(
text=_repr,
prefix=' '*indent,
)
return _repr
__repr__ = pformat
@ -1654,10 +1688,15 @@ async def async_main(
'-> All peer channels are complete\n'
)
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op=')> ',
tree_str=actor.pformat(),
nest_prefix='|_',
back_from_op=2,
)
teardown_report += (
'Actor runtime exiting\n'
f'>)\n'
f'|_{actor}\n'
'Actor runtime exited\n'
f'{op_nested_actor_repr}\n'
)
log.info(teardown_report)

View File

@ -40,8 +40,10 @@ from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel
from ._runtime import Actor
from ._portal import Portal
from ._exceptions import (
from .trionics import (
is_multi_cancelled,
)
from ._exceptions import (
ContextCancelled,
)
from ._root import (

View File

@ -59,7 +59,7 @@ from tractor._state import (
debug_mode,
)
from tractor.log import get_logger
from tractor._exceptions import (
from tractor.trionics import (
is_multi_cancelled,
)
from ._trace import (

View File

@ -328,6 +328,8 @@ def nest_from_op(
# NOTE: so move back-from-the-left of the `input_op` by
# this amount.
back_from_op: int = 0,
nest_prefix: str = ''
) -> str:
'''
Depth-increment the input (presumably hierarchy/supervision)
@ -336,15 +338,22 @@ def nest_from_op(
`tree_str` to nest content aligned with the ops last char.
'''
return (
f'{input_op}\n'
+
textwrap.indent(
indented_tree_str: str = textwrap.indent(
tree_str,
prefix=(
prefix=' ' *(
len(input_op)
-
(back_from_op + 1)
) * ' ',
),
)
# inject any provided nesting-prefix chars
# into the head of the first line.
if nest_prefix:
indented_tree_str: str = (
f'{nest_prefix}'
f'{indented_tree_str[len(nest_prefix):]}'
)
return (
f'{input_op}\n'
f'{indented_tree_str}'
)

View File

@ -38,7 +38,6 @@ from typing import (
import tractor
from tractor._exceptions import (
InternalError,
is_multi_cancelled,
TrioTaskExited,
TrioCancelled,
AsyncioTaskExited,
@ -59,6 +58,9 @@ from tractor.log import (
# from tractor.msg import (
# pretty_struct,
# )
from tractor.trionics import (
is_multi_cancelled,
)
from tractor.trionics._broadcast import (
broadcast_receiver,
BroadcastReceiver,

View File

@ -32,4 +32,5 @@ from ._broadcast import (
from ._beg import (
collapse_eg as collapse_eg,
maybe_collapse_eg as maybe_collapse_eg,
is_multi_cancelled as is_multi_cancelled,
)

View File

@ -22,6 +22,11 @@ first-class-`trio` from a historical perspective B)
from contextlib import (
asynccontextmanager as acm,
)
from typing import (
Literal,
)
import trio
def maybe_collapse_eg(
@ -56,3 +61,62 @@ async def collapse_eg():
raise exc
raise beg
def is_multi_cancelled(
beg: BaseException|BaseExceptionGroup,
ignore_nested: set[BaseException] = set(),
) -> Literal[False]|BaseExceptionGroup:
'''
Predicate to determine if an `BaseExceptionGroup` only contains
some (maybe nested) set of sub-grouped exceptions (like only
`trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
'''
if (
not ignore_nested
or
trio.Cancelled not in ignore_nested
# XXX always count-in `trio`'s native signal
):
ignore_nested.update({trio.Cancelled})
if isinstance(beg, BaseExceptionGroup):
# https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
# |_ "The condition can be an exception type or tuple of
# exception types, in which case each exception is checked
# for a match using the same check that is used in an
# except clause. The condition can also be a callable
# (other than a type object) that accepts an exception as
# its single argument and returns true for the exceptions
# that should be in the subgroup."
matched_exc: BaseExceptionGroup|None = beg.subgroup(
tuple(ignore_nested),
# ??TODO, complain about why not allowed to use
# named arg style calling???
# XD .. wtf?
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL--logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
# we fallthrough and return `False` here.
return False

View File

@ -40,6 +40,8 @@ from typing import (
import trio
from tractor._state import current_actor
from tractor.log import get_logger
# from ._beg import collapse_eg
if TYPE_CHECKING:
from tractor import ActorNursery
@ -111,17 +113,19 @@ async def gather_contexts(
None,
]:
'''
Concurrently enter a sequence of async context managers (acms),
each from a separate `trio` task and deliver the unwrapped
`yield`-ed values in the same order once all managers have entered.
Concurrently enter a sequence of async context managers (`acm`s),
each scheduled in a separate `trio.Task` and deliver their
unwrapped `yield`-ed values in the same order once all `@acm`s
in every task have entered.
On exit, all acms are subsequently and concurrently exited.
On exit, all `acm`s are subsequently and concurrently exited with
**no order guarantees**.
This function is somewhat similar to a batch of non-blocking
calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R).
concurrently entered and exited and *cancellation-just-works*.
'''
seed: int = id(mngrs)
@ -141,16 +145,20 @@ async def gather_contexts(
if not mngrs:
raise ValueError(
'`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence type intead!'
'Use a non-lazy iterator or sequence-type intead!\n'
)
async with trio.open_nursery(
strict_exception_groups=False,
async with (
# collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
# ^XXX^ TODO? soo roll our own then ??
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn:
) as tn,
):
for mngr in mngrs:
tn.start_soon(
_enter_and_wait,
@ -167,7 +175,7 @@ async def gather_contexts(
try:
yield tuple(unwrapped.values())
finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug:
# <tractorbugurlhere>
parent_exit.set()