Compare commits

...

4 Commits

Author SHA1 Message Date
Tyler Goodlet 00583b7671 Use `nest_from_op()` in some runtime logs for actor-state-repring 2025-06-12 23:26:38 -04:00
Tyler Goodlet 78beeebe8f Augment `nest_from_op()` with a `nest_prefix: str`
Such that the caller can pass chars they'd like to prefix the first line
of the (indented) `tree_str`, commonly we use '|_' for "obj fields".
2025-06-12 23:22:46 -04:00
Tyler Goodlet 333fde39ad Detail the docs on `Context._maybe_raise_remote_err()` 2025-06-12 23:22:16 -04:00
Tyler Goodlet 502c7a1dc6 Move `.is_multi_cancelled()` to `.trioniics._beg`
Since it's for beg filtering, the current impl should be renamed anyway;
it's not just for filtering cancelled excs.

Deats,
- added a real doc string, links to official eg docs and fixed the
  return typing.
- adjust all internal imports to match.
2025-06-12 23:16:29 -04:00
12 changed files with 204 additions and 104 deletions

View File

@ -252,7 +252,7 @@ def test_simple_context(
pass pass
except BaseExceptionGroup as beg: except BaseExceptionGroup as beg:
# XXX: on windows it seems we may have to expect the group error # XXX: on windows it seems we may have to expect the group error
from tractor._exceptions import is_multi_cancelled from tractor.trionics import is_multi_cancelled
assert is_multi_cancelled(beg) assert is_multi_cancelled(beg)
else: else:
trio.run(main) trio.run(main)

View File

@ -1069,9 +1069,25 @@ class Context:
|RemoteActorError # stream overrun caused and ignored by us |RemoteActorError # stream overrun caused and ignored by us
): ):
''' '''
Maybe raise a remote error depending on the type of error Maybe raise a remote error depending on the type of error and
and *who* (i.e. which task from which actor) requested *who*, i.e. which side of the task pair across actors,
a cancellation (if any). requested a cancellation (if any).
Depending on the input config-params suppress raising
certain remote excs:
- if `remote_error: ContextCancelled` (ctxc) AND this side's
task is the "requester", it at somem point called
`Context.cancel()`, then the peer's ctxc is treated
as a "cancel ack".
|_ this behaves exactly like how `trio.Nursery.cancel_scope`
absorbs any `BaseExceptionGroup[trio.Cancelled]` wherein the
owning parent task never will raise a `trio.Cancelled`
if `CancelScope.cancel_called == True`.
- `remote_error: StreamOverrrun` (overrun) AND
`raise_overrun_from_self` is set.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -1113,18 +1129,19 @@ class Context:
# for this ^, NO right? # for this ^, NO right?
) or ( ) or (
# NOTE: whenever this context is the cause of an # NOTE: whenever this side is the cause of an
# overrun on the remote side (aka we sent msgs too # overrun on the peer side, i.e. we sent msgs too
# fast that the remote task was overrun according # fast and the peer task was overrun according
# to `MsgStream` buffer settings) AND the caller # to `MsgStream` buffer settings, AND this was
# has requested to not raise overruns this side # called with `raise_overrun_from_self=True` (the
# caused, we also silently absorb any remotely # default), silently absorb any `StreamOverrun`.
# boxed `StreamOverrun`. This is mostly useful for #
# supressing such faults during # XXX, this is namely useful for supressing such faults
# cancellation/error/final-result handling inside # during cancellation/error/final-result handling inside
# `msg._ops.drain_to_final_msg()` such that we do not # `.msg._ops.drain_to_final_msg()` such that we do not
# raise such errors particularly in the case where # raise during a cancellation-request, i.e. when
# `._cancel_called == True`. # `._cancel_called == True`.
#
not raise_overrun_from_self not raise_overrun_from_self
and isinstance(remote_error, RemoteActorError) and isinstance(remote_error, RemoteActorError)
and remote_error.boxed_type is StreamOverrun and remote_error.boxed_type is StreamOverrun

View File

@ -1246,55 +1246,6 @@ def unpack_error(
return exc return exc
def is_multi_cancelled(
exc: BaseException|BaseExceptionGroup,
ignore_nested: set[BaseException] = set(),
) -> bool|BaseExceptionGroup:
'''
Predicate to determine if an `BaseExceptionGroup` only contains
some (maybe nested) set of sub-grouped exceptions (like only
`trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
'''
if (
not ignore_nested
or
trio.Cancelled in ignore_nested
# XXX always count-in `trio`'s native signal
):
ignore_nested.update({trio.Cancelled})
if isinstance(exc, BaseExceptionGroup):
matched_exc: BaseExceptionGroup|None = exc.subgroup(
tuple(ignore_nested),
# TODO, complain about why not allowed XD
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL--logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
# we fallthrough and return `False` here.
return False
def _raise_from_unexpected_msg( def _raise_from_unexpected_msg(
ctx: Context, ctx: Context,
msg: MsgType, msg: MsgType,

View File

@ -47,6 +47,7 @@ from ._runtime import (
from .devx import ( from .devx import (
debug, debug,
_frame_stack, _frame_stack,
pformat as _pformat,
) )
from . import _spawn from . import _spawn
from . import _state from . import _state
@ -61,9 +62,11 @@ from ._addr import (
mk_uuid, mk_uuid,
wrap_address, wrap_address,
) )
from .trionics import (
is_multi_cancelled,
)
from ._exceptions import ( from ._exceptions import (
RuntimeFailure, RuntimeFailure,
is_multi_cancelled,
) )
@ -511,10 +514,14 @@ async def open_root_actor(
# for an in nurseries: # for an in nurseries:
# tempn.start_soon(an.exited.wait) # tempn.start_soon(an.exited.wait)
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op='>) ',
tree_str=actor.pformat(),
nest_prefix='|_',
)
logger.info( logger.info(
f'Closing down root actor\n' f'Closing down root actor\n'
f'>)\n' f'{op_nested_actor_repr}\n'
f'|_{actor}\n'
) )
await actor.cancel(None) # self cancel await actor.cancel(None) # self cancel
finally: finally:

View File

@ -55,6 +55,7 @@ from typing import (
TYPE_CHECKING, TYPE_CHECKING,
) )
import uuid import uuid
import textwrap
from types import ModuleType from types import ModuleType
import warnings import warnings
@ -97,7 +98,10 @@ from ._exceptions import (
MsgTypeError, MsgTypeError,
unpack_error, unpack_error,
) )
from .devx import debug from .devx import (
debug,
pformat as _pformat
)
from ._discovery import get_registry from ._discovery import get_registry
from ._portal import Portal from ._portal import Portal
from . import _state from . import _state
@ -339,46 +343,76 @@ class Actor:
def pid(self) -> int: def pid(self) -> int:
return self._aid.pid return self._aid.pid
def pformat(self) -> str: def pformat(
ds: str = '=' self,
ds: str = ':',
indent: int = 0,
) -> str:
fields_sect_prefix: str = ' |_'
parent_uid: tuple|None = None parent_uid: tuple|None = None
if rent_chan := self._parent_chan: if rent_chan := self._parent_chan:
parent_uid = rent_chan.uid parent_uid = rent_chan.uid
peers: list = [] peers: list = []
server: _server.IPCServer = self.ipc_server server: _server.IPCServer = self.ipc_server
ipc_server_sect: str = ''
if server: if server:
peers: list[tuple] = list(server._peer_connected) peers: list[tuple] = list(server._peer_connected)
# create field ln as a key-header indented under
# and up to the section's key prefix.
# field_ln_header: str = textwrap.indent(
# text=f"ipc_server{ds}",
# prefix=' '*len(fields_sect_prefix),
# )
# ^XXX if we were to indent `repr(Server)` to
# '<key>: '
# _here_^
server_repr: str = textwrap.indent(
text=self._ipc_server.pformat(),
# prefix=' '*len(field_ln_header),
prefix=' '*len(fields_sect_prefix),
)
ipc_server_sect: str = (
# f'{field_ln_header}\n'
f'{server_repr}'
)
fmtstr: str = ( fmtstr: str = (
f' |_id: {self.aid!r}\n' f' |_id: {self.aid!r}\n'
# f" aid{ds}{self.aid!r}\n" # f" aid{ds}{self.aid!r}\n"
f" parent{ds}{parent_uid}\n" f" parent{ds}{parent_uid}\n"
f'\n' # f'\n'
f' |_ipc: {len(peers)!r} connected peers\n' f' |_ipc: {len(peers)!r} connected peers\n'
f" peers{ds}{peers!r}\n" f" peers{ds}{peers!r}\n"
f" ipc_server{ds}{self._ipc_server}\n" f"{ipc_server_sect}"
f'\n' # f'\n'
f' |_rpc: {len(self._rpc_tasks)} tasks\n' f' |_rpc: {len(self._rpc_tasks)} tasks\n'
f" ctxs{ds}{len(self._contexts)}\n" f" ctxs{ds}{len(self._contexts)}\n"
f'\n' # f'\n'
f' |_runtime: ._task{ds}{self._task!r}\n' f' |_runtime: ._task{ds}{self._task!r}\n'
f' _spawn_method{ds}{self._spawn_method}\n' f' _spawn_method{ds}{self._spawn_method}\n'
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n' f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
f' _forkserver_info{ds}{self._forkserver_info}\n' f' _forkserver_info{ds}{self._forkserver_info}\n'
f'\n' # f'\n'
f' |_state: "TODO: .repr_state()"\n' f' |_state: "TODO: .repr_state()"\n'
f' _cancel_complete{ds}{self._cancel_complete}\n' f' _cancel_complete{ds}{self._cancel_complete}\n'
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n' f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
f' _cancel_called{ds}{self._cancel_called}\n' f' _cancel_called{ds}{self._cancel_called}\n'
) )
return ( _repr: str = (
'<Actor(\n' '<Actor(\n'
+ +
fmtstr fmtstr
+ +
')>\n' ')>\n'
) )
if indent:
_repr: str = textwrap.indent(
text=_repr,
prefix=' '*indent,
)
return _repr
__repr__ = pformat __repr__ = pformat
@ -1654,10 +1688,15 @@ async def async_main(
'-> All peer channels are complete\n' '-> All peer channels are complete\n'
) )
op_nested_actor_repr: str = _pformat.nest_from_op(
input_op=')> ',
tree_str=actor.pformat(),
nest_prefix='|_',
back_from_op=2,
)
teardown_report += ( teardown_report += (
'Actor runtime exiting\n' 'Actor runtime exited\n'
f'>)\n' f'{op_nested_actor_repr}\n'
f'|_{actor}\n'
) )
log.info(teardown_report) log.info(teardown_report)

View File

@ -40,8 +40,10 @@ from ._state import current_actor, is_main_process
from .log import get_logger, get_loglevel from .log import get_logger, get_loglevel
from ._runtime import Actor from ._runtime import Actor
from ._portal import Portal from ._portal import Portal
from ._exceptions import ( from .trionics import (
is_multi_cancelled, is_multi_cancelled,
)
from ._exceptions import (
ContextCancelled, ContextCancelled,
) )
from ._root import ( from ._root import (

View File

@ -59,7 +59,7 @@ from tractor._state import (
debug_mode, debug_mode,
) )
from tractor.log import get_logger from tractor.log import get_logger
from tractor._exceptions import ( from tractor.trionics import (
is_multi_cancelled, is_multi_cancelled,
) )
from ._trace import ( from ._trace import (

View File

@ -328,6 +328,8 @@ def nest_from_op(
# NOTE: so move back-from-the-left of the `input_op` by # NOTE: so move back-from-the-left of the `input_op` by
# this amount. # this amount.
back_from_op: int = 0, back_from_op: int = 0,
nest_prefix: str = ''
) -> str: ) -> str:
''' '''
Depth-increment the input (presumably hierarchy/supervision) Depth-increment the input (presumably hierarchy/supervision)
@ -336,15 +338,22 @@ def nest_from_op(
`tree_str` to nest content aligned with the ops last char. `tree_str` to nest content aligned with the ops last char.
''' '''
indented_tree_str: str = textwrap.indent(
tree_str,
prefix=' ' *(
len(input_op)
-
(back_from_op + 1)
),
)
# inject any provided nesting-prefix chars
# into the head of the first line.
if nest_prefix:
indented_tree_str: str = (
f'{nest_prefix}'
f'{indented_tree_str[len(nest_prefix):]}'
)
return ( return (
f'{input_op}\n' f'{input_op}\n'
+ f'{indented_tree_str}'
textwrap.indent(
tree_str,
prefix=(
len(input_op)
-
(back_from_op + 1)
) * ' ',
)
) )

View File

@ -38,7 +38,6 @@ from typing import (
import tractor import tractor
from tractor._exceptions import ( from tractor._exceptions import (
InternalError, InternalError,
is_multi_cancelled,
TrioTaskExited, TrioTaskExited,
TrioCancelled, TrioCancelled,
AsyncioTaskExited, AsyncioTaskExited,
@ -59,6 +58,9 @@ from tractor.log import (
# from tractor.msg import ( # from tractor.msg import (
# pretty_struct, # pretty_struct,
# ) # )
from tractor.trionics import (
is_multi_cancelled,
)
from tractor.trionics._broadcast import ( from tractor.trionics._broadcast import (
broadcast_receiver, broadcast_receiver,
BroadcastReceiver, BroadcastReceiver,

View File

@ -32,4 +32,5 @@ from ._broadcast import (
from ._beg import ( from ._beg import (
collapse_eg as collapse_eg, collapse_eg as collapse_eg,
maybe_collapse_eg as maybe_collapse_eg, maybe_collapse_eg as maybe_collapse_eg,
is_multi_cancelled as is_multi_cancelled,
) )

View File

@ -22,6 +22,11 @@ first-class-`trio` from a historical perspective B)
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
) )
from typing import (
Literal,
)
import trio
def maybe_collapse_eg( def maybe_collapse_eg(
@ -56,3 +61,62 @@ async def collapse_eg():
raise exc raise exc
raise beg raise beg
def is_multi_cancelled(
beg: BaseException|BaseExceptionGroup,
ignore_nested: set[BaseException] = set(),
) -> Literal[False]|BaseExceptionGroup:
'''
Predicate to determine if an `BaseExceptionGroup` only contains
some (maybe nested) set of sub-grouped exceptions (like only
`trio.Cancelled`s which get swallowed silently by default) and is
thus the result of "gracefully cancelling" a collection of
sub-tasks (or other conc primitives) and receiving a "cancelled
ACK" from each after termination.
Docs:
----
- https://docs.python.org/3/library/exceptions.html#exception-groups
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
'''
if (
not ignore_nested
or
trio.Cancelled not in ignore_nested
# XXX always count-in `trio`'s native signal
):
ignore_nested.update({trio.Cancelled})
if isinstance(beg, BaseExceptionGroup):
# https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
# |_ "The condition can be an exception type or tuple of
# exception types, in which case each exception is checked
# for a match using the same check that is used in an
# except clause. The condition can also be a callable
# (other than a type object) that accepts an exception as
# its single argument and returns true for the exceptions
# that should be in the subgroup."
matched_exc: BaseExceptionGroup|None = beg.subgroup(
tuple(ignore_nested),
# ??TODO, complain about why not allowed to use
# named arg style calling???
# XD .. wtf?
# condition=tuple(ignore_nested),
)
if matched_exc is not None:
return matched_exc
# NOTE, IFF no excs types match (throughout the error-tree)
# -> return `False`, OW return the matched sub-eg.
#
# IOW, for the inverse of ^ for the purpose of
# maybe-enter-REPL--logic: "only debug when the err-tree contains
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
# we fallthrough and return `False` here.
return False

View File

@ -40,6 +40,8 @@ from typing import (
import trio import trio
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
# from ._beg import collapse_eg
if TYPE_CHECKING: if TYPE_CHECKING:
from tractor import ActorNursery from tractor import ActorNursery
@ -111,17 +113,19 @@ async def gather_contexts(
None, None,
]: ]:
''' '''
Concurrently enter a sequence of async context managers (acms), Concurrently enter a sequence of async context managers (`acm`s),
each from a separate `trio` task and deliver the unwrapped each scheduled in a separate `trio.Task` and deliver their
`yield`-ed values in the same order once all managers have entered. unwrapped `yield`-ed values in the same order once all `@acm`s
in every task have entered.
On exit, all acms are subsequently and concurrently exited. On exit, all `acm`s are subsequently and concurrently exited with
**no order guarantees**.
This function is somewhat similar to a batch of non-blocking This function is somewhat similar to a batch of non-blocking
calls to `contextlib.AsyncExitStack.enter_async_context()` calls to `contextlib.AsyncExitStack.enter_async_context()`
(inside a loop) *in combo with* a `asyncio.gather()` to get the (inside a loop) *in combo with* a `asyncio.gather()` to get the
`.__aenter__()`-ed values, except the managers are both `.__aenter__()`-ed values, except the managers are both
concurrently entered and exited and *cancellation just works*(R). concurrently entered and exited and *cancellation-just-works*.
''' '''
seed: int = id(mngrs) seed: int = id(mngrs)
@ -141,16 +145,20 @@ async def gather_contexts(
if not mngrs: if not mngrs:
raise ValueError( raise ValueError(
'`.trionics.gather_contexts()` input mngrs is empty?\n' '`.trionics.gather_contexts()` input mngrs is empty?\n'
'\n'
'Did try to use inline generator syntax?\n' 'Did try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence type intead!' 'Use a non-lazy iterator or sequence-type intead!\n'
) )
async with trio.open_nursery( async with (
strict_exception_groups=False, # collapse_eg(),
# ^XXX^ TODO? soo roll our own then ?? trio.open_nursery(
# -> since we kinda want the "if only one `.exception` then # strict_exception_groups=False,
# just raise that" interface? # ^XXX^ TODO? soo roll our own then ??
) as tn: # -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn,
):
for mngr in mngrs: for mngr in mngrs:
tn.start_soon( tn.start_soon(
_enter_and_wait, _enter_and_wait,
@ -167,7 +175,7 @@ async def gather_contexts(
try: try:
yield tuple(unwrapped.values()) yield tuple(unwrapped.values())
finally: finally:
# NOTE: this is ABSOLUTELY REQUIRED to avoid # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
# the following wacky bug: # the following wacky bug:
# <tractorbugurlhere> # <tractorbugurlhere>
parent_exit.set() parent_exit.set()