Compare commits


No commits in common. "00583b7671f952c070ceed1806948614e3c9cb0e" and "7b05547fccbf60b75a8feead8db10e69c6c325ab" have entirely different histories.

12 changed files with 104 additions and 204 deletions

View File

@@ -252,7 +252,7 @@ def test_simple_context(
 pass
 except BaseExceptionGroup as beg:
 # XXX: on windows it seems we may have to expect the group error
-from tractor.trionics import is_multi_cancelled
+from tractor._exceptions import is_multi_cancelled
 assert is_multi_cancelled(beg)
 else:
 trio.run(main)

View File

@@ -1069,25 +1069,9 @@ class Context:
 |RemoteActorError # stream overrun caused and ignored by us
 ):
 '''
-Maybe raise a remote error depending on the type of error and
-*who*, i.e. which side of the task pair across actors,
-requested a cancellation (if any).
-Depending on the input config-params suppress raising
-certain remote excs:
-- if `remote_error: ContextCancelled` (ctxc) AND this side's
-task is the "requester", it at somem point called
-`Context.cancel()`, then the peer's ctxc is treated
-as a "cancel ack".
-|_ this behaves exactly like how `trio.Nursery.cancel_scope`
-absorbs any `BaseExceptionGroup[trio.Cancelled]` wherein the
-owning parent task never will raise a `trio.Cancelled`
-if `CancelScope.cancel_called == True`.
-- `remote_error: StreamOverrrun` (overrun) AND
-`raise_overrun_from_self` is set.
+Maybe raise a remote error depending on the type of error
+and *who* (i.e. which task from which actor) requested
+a cancellation (if any).
 '''
 __tracebackhide__: bool = hide_tb
@@ -1129,19 +1113,18 @@ class Context:
 # for this ^, NO right?
 ) or (
-# NOTE: whenever this side is the cause of an
-# overrun on the peer side, i.e. we sent msgs too
-# fast and the peer task was overrun according
-# to `MsgStream` buffer settings, AND this was
-# called with `raise_overrun_from_self=True` (the
-# default), silently absorb any `StreamOverrun`.
-#
-# XXX, this is namely useful for supressing such faults
-# during cancellation/error/final-result handling inside
-# `.msg._ops.drain_to_final_msg()` such that we do not
-# raise during a cancellation-request, i.e. when
+# NOTE: whenever this context is the cause of an
+# overrun on the remote side (aka we sent msgs too
+# fast that the remote task was overrun according
+# to `MsgStream` buffer settings) AND the caller
+# has requested to not raise overruns this side
+# caused, we also silently absorb any remotely
+# boxed `StreamOverrun`. This is mostly useful for
+# supressing such faults during
+# cancellation/error/final-result handling inside
+# `msg._ops.drain_to_final_msg()` such that we do not
+# raise such errors particularly in the case where
 # `._cancel_called == True`.
-#
 not raise_overrun_from_self
 and isinstance(remote_error, RemoteActorError)
 and remote_error.boxed_type is StreamOverrun
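
For reference, the `trio` cancel-scope absorption that the docstring removed in the first hunk above compares against looks roughly like this minimal sketch (plain `trio`, not `tractor` code; assumes any recent `trio` release):

    import trio

    async def main():
        with trio.CancelScope() as cs:
            cs.cancel()                 # we are the "requester" of our own cancellation
            await trio.sleep_forever()  # next checkpoint raises trio.Cancelled internally
        # the scope absorbs the Cancelled because it requested it,
        # analogous to treating a peer's ctxc as a "cancel ack"
        assert cs.cancel_called and cs.cancelled_caught

    trio.run(main)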

View File

@@ -1246,6 +1246,55 @@ def unpack_error(
 return exc
+def is_multi_cancelled(
+exc: BaseException|BaseExceptionGroup,
+ignore_nested: set[BaseException] = set(),
+) -> bool|BaseExceptionGroup:
+'''
+Predicate to determine if an `BaseExceptionGroup` only contains
+some (maybe nested) set of sub-grouped exceptions (like only
+`trio.Cancelled`s which get swallowed silently by default) and is
+thus the result of "gracefully cancelling" a collection of
+sub-tasks (or other conc primitives) and receiving a "cancelled
+ACK" from each after termination.
+Docs:
+----
+- https://docs.python.org/3/library/exceptions.html#exception-groups
+- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
+'''
+if (
+not ignore_nested
+or
+trio.Cancelled in ignore_nested
+# XXX always count-in `trio`'s native signal
+):
+ignore_nested.update({trio.Cancelled})
+if isinstance(exc, BaseExceptionGroup):
+matched_exc: BaseExceptionGroup|None = exc.subgroup(
+tuple(ignore_nested),
+# TODO, complain about why not allowed XD
+# condition=tuple(ignore_nested),
+)
+if matched_exc is not None:
+return matched_exc
+# NOTE, IFF no excs types match (throughout the error-tree)
+# -> return `False`, OW return the matched sub-eg.
+#
+# IOW, for the inverse of ^ for the purpose of
+# maybe-enter-REPL--logic: "only debug when the err-tree contains
+# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
+# we fallthrough and return `False` here.
+return False
 def _raise_from_unexpected_msg(
 ctx: Context,
 msg: MsgType,
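
A hedged usage sketch of the relocated predicate as defined in the hunk above (import path per the right-hand commit of this compare; the example exception groups are made up):

    from tractor._exceptions import is_multi_cancelled  # location per the right-hand commit

    # a group made only of types we want treated as "cancel ACKs":
    # `.subgroup()` finds a match, so the matched sub-group (truthy) comes back
    beg = BaseExceptionGroup(
        'graceful teardown',
        [KeyboardInterrupt(), KeyboardInterrupt()],
    )
    assert is_multi_cancelled(beg, ignore_nested={KeyboardInterrupt})

    # a group of "real" errors never matches the default `{trio.Cancelled}`
    # filter, so `False` is returned and the caller should (re)raise
    assert is_multi_cancelled(BaseExceptionGroup('boom', [ValueError()])) is False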

View File

@@ -47,7 +47,6 @@ from ._runtime import (
 from .devx import (
 debug,
 _frame_stack,
-pformat as _pformat,
 )
 from . import _spawn
 from . import _state
@@ -62,11 +61,9 @@ from ._addr import (
 mk_uuid,
 wrap_address,
 )
-from .trionics import (
-is_multi_cancelled,
-)
 from ._exceptions import (
 RuntimeFailure,
+is_multi_cancelled,
 )
@@ -514,14 +511,10 @@ async def open_root_actor(
 # for an in nurseries:
 # tempn.start_soon(an.exited.wait)
-op_nested_actor_repr: str = _pformat.nest_from_op(
-input_op='>) ',
-tree_str=actor.pformat(),
-nest_prefix='|_',
-)
 logger.info(
 f'Closing down root actor\n'
-f'{op_nested_actor_repr}\n'
+f'>)\n'
+f'|_{actor}\n'
 )
 await actor.cancel(None) # self cancel
 finally:

View File

@@ -55,7 +55,6 @@ from typing import (
 TYPE_CHECKING,
 )
 import uuid
-import textwrap
 from types import ModuleType
 import warnings
@@ -98,10 +97,7 @@ from ._exceptions import (
 MsgTypeError,
 unpack_error,
 )
-from .devx import (
-debug,
-pformat as _pformat
-)
+from .devx import debug
 from ._discovery import get_registry
 from ._portal import Portal
 from . import _state
@@ -343,76 +339,46 @@ class Actor:
 def pid(self) -> int:
 return self._aid.pid
-def pformat(
-self,
-ds: str = ':',
-indent: int = 0,
-) -> str:
-fields_sect_prefix: str = ' |_'
+def pformat(self) -> str:
+ds: str = '='
 parent_uid: tuple|None = None
 if rent_chan := self._parent_chan:
 parent_uid = rent_chan.uid
 peers: list = []
 server: _server.IPCServer = self.ipc_server
-ipc_server_sect: str = ''
 if server:
 peers: list[tuple] = list(server._peer_connected)
-# create field ln as a key-header indented under
-# and up to the section's key prefix.
-# field_ln_header: str = textwrap.indent(
-# text=f"ipc_server{ds}",
-# prefix=' '*len(fields_sect_prefix),
-# )
-# ^XXX if we were to indent `repr(Server)` to
-# '<key>: '
-# _here_^
-server_repr: str = textwrap.indent(
-text=self._ipc_server.pformat(),
-# prefix=' '*len(field_ln_header),
-prefix=' '*len(fields_sect_prefix),
-)
-ipc_server_sect: str = (
-# f'{field_ln_header}\n'
-f'{server_repr}'
-)
 fmtstr: str = (
 f' |_id: {self.aid!r}\n'
 # f" aid{ds}{self.aid!r}\n"
 f" parent{ds}{parent_uid}\n"
-# f'\n'
+f'\n'
 f' |_ipc: {len(peers)!r} connected peers\n'
 f" peers{ds}{peers!r}\n"
-f"{ipc_server_sect}"
-# f'\n'
+f" ipc_server{ds}{self._ipc_server}\n"
+f'\n'
 f' |_rpc: {len(self._rpc_tasks)} tasks\n'
 f" ctxs{ds}{len(self._contexts)}\n"
-# f'\n'
+f'\n'
 f' |_runtime: ._task{ds}{self._task!r}\n'
 f' _spawn_method{ds}{self._spawn_method}\n'
 f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
 f' _forkserver_info{ds}{self._forkserver_info}\n'
-# f'\n'
+f'\n'
 f' |_state: "TODO: .repr_state()"\n'
 f' _cancel_complete{ds}{self._cancel_complete}\n'
 f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
 f' _cancel_called{ds}{self._cancel_called}\n'
 )
-_repr: str = (
+return (
 '<Actor(\n'
 +
 fmtstr
 +
 ')>\n'
 )
-if indent:
-_repr: str = textwrap.indent(
-text=_repr,
-prefix=' '*indent,
-)
-return _repr
 __repr__ = pformat
@@ -1688,15 +1654,10 @@ async def async_main(
 '-> All peer channels are complete\n'
 )
-op_nested_actor_repr: str = _pformat.nest_from_op(
-input_op=')> ',
-tree_str=actor.pformat(),
-nest_prefix='|_',
-back_from_op=2,
-)
 teardown_report += (
-'Actor runtime exited\n'
-f'{op_nested_actor_repr}\n'
+'Actor runtime exiting\n'
+f'>)\n'
+f'|_{actor}\n'
 )
 log.info(teardown_report)

View File

@@ -40,10 +40,8 @@ from ._state import current_actor, is_main_process
 from .log import get_logger, get_loglevel
 from ._runtime import Actor
 from ._portal import Portal
-from .trionics import (
-is_multi_cancelled,
-)
 from ._exceptions import (
+is_multi_cancelled,
 ContextCancelled,
 )
 from ._root import (

View File

@@ -59,7 +59,7 @@ from tractor._state import (
 debug_mode,
 )
 from tractor.log import get_logger
-from tractor.trionics import (
+from tractor._exceptions import (
 is_multi_cancelled,
 )
 from ._trace import (

View File

@@ -328,8 +328,6 @@ def nest_from_op(
 # NOTE: so move back-from-the-left of the `input_op` by
 # this amount.
 back_from_op: int = 0,
-nest_prefix: str = ''
 ) -> str:
 '''
 Depth-increment the input (presumably hierarchy/supervision)
@@ -338,22 +336,15 @@ def nest_from_op(
 `tree_str` to nest content aligned with the ops last char.
 '''
-indented_tree_str: str = textwrap.indent(
-tree_str,
-prefix=' ' *(
-len(input_op)
--
-(back_from_op + 1)
-),
-)
-# inject any provided nesting-prefix chars
-# into the head of the first line.
-if nest_prefix:
-indented_tree_str: str = (
-f'{nest_prefix}'
-f'{indented_tree_str[len(nest_prefix):]}'
-)
 return (
 f'{input_op}\n'
-f'{indented_tree_str}'
+ +
+textwrap.indent(
+tree_str,
+prefix=(
+len(input_op)
+-
+(back_from_op + 1)
+) * ' ',
+)
 )
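
The right-hand body of `nest_from_op()` reduces to a plain `textwrap.indent()` alignment; a standalone sketch of that math (the sample `input_op`/`tree_str` strings are made up, not from this compare):

    import textwrap

    def nest_from_op_sketch(
        input_op: str,          # e.g. '>) ' for "closing down"
        tree_str: str,          # multi-line repr of the nested object
        back_from_op: int = 0,  # shift the nested block back-left by N columns
    ) -> str:
        return (
            f'{input_op}\n'
            +
            textwrap.indent(
                tree_str,
                prefix=(len(input_op) - (back_from_op + 1)) * ' ',
            )
        )

    print(nest_from_op_sketch('>) ', '<Actor(\n  uid: ...\n)>'))
    # >)
    #   <Actor(
    #     uid: ...
    #   )>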

View File

@@ -38,6 +38,7 @@ from typing import (
 import tractor
 from tractor._exceptions import (
 InternalError,
+is_multi_cancelled,
 TrioTaskExited,
 TrioCancelled,
 AsyncioTaskExited,
@@ -58,9 +59,6 @@ from tractor.log import (
 # from tractor.msg import (
 # pretty_struct,
 # )
-from tractor.trionics import (
-is_multi_cancelled,
-)
 from tractor.trionics._broadcast import (
 broadcast_receiver,
 BroadcastReceiver,

View File

@@ -32,5 +32,4 @@ from ._broadcast import (
 from ._beg import (
 collapse_eg as collapse_eg,
 maybe_collapse_eg as maybe_collapse_eg,
-is_multi_cancelled as is_multi_cancelled,
 )

View File

@@ -22,11 +22,6 @@ first-class-`trio` from a historical perspective B)
 from contextlib import (
 asynccontextmanager as acm,
 )
-from typing import (
-Literal,
-)
-import trio
 def maybe_collapse_eg(
@@ -61,62 +56,3 @@ async def collapse_eg():
 raise exc
 raise beg
-def is_multi_cancelled(
-beg: BaseException|BaseExceptionGroup,
-ignore_nested: set[BaseException] = set(),
-) -> Literal[False]|BaseExceptionGroup:
-'''
-Predicate to determine if an `BaseExceptionGroup` only contains
-some (maybe nested) set of sub-grouped exceptions (like only
-`trio.Cancelled`s which get swallowed silently by default) and is
-thus the result of "gracefully cancelling" a collection of
-sub-tasks (or other conc primitives) and receiving a "cancelled
-ACK" from each after termination.
-Docs:
-----
-- https://docs.python.org/3/library/exceptions.html#exception-groups
-- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
-'''
-if (
-not ignore_nested
-or
-trio.Cancelled not in ignore_nested
-# XXX always count-in `trio`'s native signal
-):
-ignore_nested.update({trio.Cancelled})
-if isinstance(beg, BaseExceptionGroup):
-# https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
-# |_ "The condition can be an exception type or tuple of
-# exception types, in which case each exception is checked
-# for a match using the same check that is used in an
-# except clause. The condition can also be a callable
-# (other than a type object) that accepts an exception as
-# its single argument and returns true for the exceptions
-# that should be in the subgroup."
-matched_exc: BaseExceptionGroup|None = beg.subgroup(
-tuple(ignore_nested),
-# ??TODO, complain about why not allowed to use
-# named arg style calling???
-# XD .. wtf?
-# condition=tuple(ignore_nested),
-)
-if matched_exc is not None:
-return matched_exc
-# NOTE, IFF no excs types match (throughout the error-tree)
-# -> return `False`, OW return the matched sub-eg.
-#
-# IOW, for the inverse of ^ for the purpose of
-# maybe-enter-REPL--logic: "only debug when the err-tree contains
-# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
-# we fallthrough and return `False` here.
-return False
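
Both copies of the predicate hinge on `BaseExceptionGroup.subgroup()`; a tiny stdlib-only sketch of the call the quoted docs describe (the example exceptions are made up):

    # stdlib-only (Python 3.11+), mirrors what `.subgroup()` does for the predicate
    beg = BaseExceptionGroup(
        'mixed',
        [ValueError('boom'), BaseExceptionGroup('inner', [KeyError('k')])],
    )

    # a tuple of exc types as the condition: returns the matching sub-tree
    # (nesting preserved), or `None` when nothing matches anywhere
    matched = beg.subgroup((KeyError,))
    assert matched is not None
    assert matched.exceptions[0].exceptions[0].args == ('k',)
    assert beg.subgroup((OSError,)) is None

    # passing it by keyword (`condition=...`) raises a TypeError, which is
    # presumably why the `condition=tuple(ignore_nested)` line stays commented out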

View File

@@ -40,8 +40,6 @@ from typing import (
 import trio
 from tractor._state import current_actor
 from tractor.log import get_logger
-# from ._beg import collapse_eg
 if TYPE_CHECKING:
 from tractor import ActorNursery
@@ -113,19 +111,17 @@ async def gather_contexts(
 None,
 ]:
 '''
-Concurrently enter a sequence of async context managers (`acm`s),
-each scheduled in a separate `trio.Task` and deliver their
-unwrapped `yield`-ed values in the same order once all `@acm`s
-in every task have entered.
+Concurrently enter a sequence of async context managers (acms),
+each from a separate `trio` task and deliver the unwrapped
+`yield`-ed values in the same order once all managers have entered.
-On exit, all `acm`s are subsequently and concurrently exited with
-**no order guarantees**.
+On exit, all acms are subsequently and concurrently exited.
 This function is somewhat similar to a batch of non-blocking
 calls to `contextlib.AsyncExitStack.enter_async_context()`
 (inside a loop) *in combo with* a `asyncio.gather()` to get the
 `.__aenter__()`-ed values, except the managers are both
-concurrently entered and exited and *cancellation-just-works*.
+concurrently entered and exited and *cancellation just works*(R).
 '''
 seed: int = id(mngrs)
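
A hedged usage sketch of `gather_contexts()` as the rewritten docstring describes it (the `numbered()` acm is made up; the import path assumes the `trionics` package layout shown in this compare):

    from contextlib import asynccontextmanager as acm

    import trio
    from tractor.trionics import gather_contexts

    @acm
    async def numbered(i: int):
        # stand-in for any real async resource setup/teardown
        yield i

    async def main():
        # a non-lazy sequence, per the ValueError guard in the next hunk
        async with gather_contexts([numbered(i) for i in range(3)]) as values:
            # unwrapped values arrive in input order once every acm has entered
            assert values == (0, 1, 2)

    trio.run(main)
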
@@ -145,20 +141,16 @@ async def gather_contexts(
 if not mngrs:
 raise ValueError(
 '`.trionics.gather_contexts()` input mngrs is empty?\n'
-'\n'
 'Did try to use inline generator syntax?\n'
-'Use a non-lazy iterator or sequence-type intead!\n'
+'Use a non-lazy iterator or sequence type intead!'
 )
-async with (
-# collapse_eg(),
-trio.open_nursery(
-# strict_exception_groups=False,
-# ^XXX^ TODO? soo roll our own then ??
-# -> since we kinda want the "if only one `.exception` then
-# just raise that" interface?
-) as tn,
-):
+async with trio.open_nursery(
+strict_exception_groups=False,
+# ^XXX^ TODO? soo roll our own then ??
+# -> since we kinda want the "if only one `.exception` then
+# just raise that" interface?
+) as tn:
 for mngr in mngrs:
 tn.start_soon(
 _enter_and_wait,
@@ -175,7 +167,7 @@ async def gather_contexts(
 try:
 yield tuple(unwrapped.values())
 finally:
-# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
+# NOTE: this is ABSOLUTELY REQUIRED to avoid
 # the following wacky bug:
 # <tractorbugurlhere>
 parent_exit.set()
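
The `strict_exception_groups=False` / `collapse_eg()` churn above is chasing the "if only one exception, just raise that" interface; a hedged sketch of that collapsing pattern in plain `trio` (not the repo's `collapse_eg` implementation, and assuming a `trio` where nurseries raise groups by default):

    import trio

    async def boom():
        raise RuntimeError('lone failure')

    async def main():
        try:
            async with trio.open_nursery() as tn:  # strict groups by default
                tn.start_soon(boom)
        except BaseExceptionGroup as beg:
            # collapse a single-member, non-nested group back to the bare exc
            if (
                len(beg.exceptions) == 1
                and not isinstance(beg.exceptions[0], BaseExceptionGroup)
            ):
                raise beg.exceptions[0] from beg
            raise

    try:
        trio.run(main)
    except RuntimeError as exc:
        assert exc.args == ('lone failure',)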