Compare commits
4 Commits
7b05547fcc
...
00583b7671
Author | SHA1 | Date |
---|---|---|
|
00583b7671 | |
|
78beeebe8f | |
|
333fde39ad | |
|
502c7a1dc6 |
|
@ -252,7 +252,7 @@ def test_simple_context(
|
||||||
pass
|
pass
|
||||||
except BaseExceptionGroup as beg:
|
except BaseExceptionGroup as beg:
|
||||||
# XXX: on windows it seems we may have to expect the group error
|
# XXX: on windows it seems we may have to expect the group error
|
||||||
from tractor._exceptions import is_multi_cancelled
|
from tractor.trionics import is_multi_cancelled
|
||||||
assert is_multi_cancelled(beg)
|
assert is_multi_cancelled(beg)
|
||||||
else:
|
else:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
|
@ -1069,9 +1069,25 @@ class Context:
|
||||||
|RemoteActorError # stream overrun caused and ignored by us
|
|RemoteActorError # stream overrun caused and ignored by us
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Maybe raise a remote error depending on the type of error
|
Maybe raise a remote error depending on the type of error and
|
||||||
and *who* (i.e. which task from which actor) requested
|
*who*, i.e. which side of the task pair across actors,
|
||||||
a cancellation (if any).
|
requested a cancellation (if any).
|
||||||
|
|
||||||
|
Depending on the input config-params suppress raising
|
||||||
|
certain remote excs:
|
||||||
|
|
||||||
|
- if `remote_error: ContextCancelled` (ctxc) AND this side's
|
||||||
|
task is the "requester", it at somem point called
|
||||||
|
`Context.cancel()`, then the peer's ctxc is treated
|
||||||
|
as a "cancel ack".
|
||||||
|
|
||||||
|
|_ this behaves exactly like how `trio.Nursery.cancel_scope`
|
||||||
|
absorbs any `BaseExceptionGroup[trio.Cancelled]` wherein the
|
||||||
|
owning parent task never will raise a `trio.Cancelled`
|
||||||
|
if `CancelScope.cancel_called == True`.
|
||||||
|
|
||||||
|
- `remote_error: StreamOverrrun` (overrun) AND
|
||||||
|
`raise_overrun_from_self` is set.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
|
@ -1113,18 +1129,19 @@ class Context:
|
||||||
# for this ^, NO right?
|
# for this ^, NO right?
|
||||||
|
|
||||||
) or (
|
) or (
|
||||||
# NOTE: whenever this context is the cause of an
|
# NOTE: whenever this side is the cause of an
|
||||||
# overrun on the remote side (aka we sent msgs too
|
# overrun on the peer side, i.e. we sent msgs too
|
||||||
# fast that the remote task was overrun according
|
# fast and the peer task was overrun according
|
||||||
# to `MsgStream` buffer settings) AND the caller
|
# to `MsgStream` buffer settings, AND this was
|
||||||
# has requested to not raise overruns this side
|
# called with `raise_overrun_from_self=True` (the
|
||||||
# caused, we also silently absorb any remotely
|
# default), silently absorb any `StreamOverrun`.
|
||||||
# boxed `StreamOverrun`. This is mostly useful for
|
#
|
||||||
# supressing such faults during
|
# XXX, this is namely useful for supressing such faults
|
||||||
# cancellation/error/final-result handling inside
|
# during cancellation/error/final-result handling inside
|
||||||
# `msg._ops.drain_to_final_msg()` such that we do not
|
# `.msg._ops.drain_to_final_msg()` such that we do not
|
||||||
# raise such errors particularly in the case where
|
# raise during a cancellation-request, i.e. when
|
||||||
# `._cancel_called == True`.
|
# `._cancel_called == True`.
|
||||||
|
#
|
||||||
not raise_overrun_from_self
|
not raise_overrun_from_self
|
||||||
and isinstance(remote_error, RemoteActorError)
|
and isinstance(remote_error, RemoteActorError)
|
||||||
and remote_error.boxed_type is StreamOverrun
|
and remote_error.boxed_type is StreamOverrun
|
||||||
|
|
|
@ -1246,55 +1246,6 @@ def unpack_error(
|
||||||
return exc
|
return exc
|
||||||
|
|
||||||
|
|
||||||
def is_multi_cancelled(
|
|
||||||
exc: BaseException|BaseExceptionGroup,
|
|
||||||
|
|
||||||
ignore_nested: set[BaseException] = set(),
|
|
||||||
|
|
||||||
) -> bool|BaseExceptionGroup:
|
|
||||||
'''
|
|
||||||
Predicate to determine if an `BaseExceptionGroup` only contains
|
|
||||||
some (maybe nested) set of sub-grouped exceptions (like only
|
|
||||||
`trio.Cancelled`s which get swallowed silently by default) and is
|
|
||||||
thus the result of "gracefully cancelling" a collection of
|
|
||||||
sub-tasks (or other conc primitives) and receiving a "cancelled
|
|
||||||
ACK" from each after termination.
|
|
||||||
|
|
||||||
Docs:
|
|
||||||
----
|
|
||||||
- https://docs.python.org/3/library/exceptions.html#exception-groups
|
|
||||||
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
if (
|
|
||||||
not ignore_nested
|
|
||||||
or
|
|
||||||
trio.Cancelled in ignore_nested
|
|
||||||
# XXX always count-in `trio`'s native signal
|
|
||||||
):
|
|
||||||
ignore_nested.update({trio.Cancelled})
|
|
||||||
|
|
||||||
if isinstance(exc, BaseExceptionGroup):
|
|
||||||
matched_exc: BaseExceptionGroup|None = exc.subgroup(
|
|
||||||
tuple(ignore_nested),
|
|
||||||
|
|
||||||
# TODO, complain about why not allowed XD
|
|
||||||
# condition=tuple(ignore_nested),
|
|
||||||
)
|
|
||||||
if matched_exc is not None:
|
|
||||||
return matched_exc
|
|
||||||
|
|
||||||
# NOTE, IFF no excs types match (throughout the error-tree)
|
|
||||||
# -> return `False`, OW return the matched sub-eg.
|
|
||||||
#
|
|
||||||
# IOW, for the inverse of ^ for the purpose of
|
|
||||||
# maybe-enter-REPL--logic: "only debug when the err-tree contains
|
|
||||||
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
|
|
||||||
# we fallthrough and return `False` here.
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def _raise_from_unexpected_msg(
|
def _raise_from_unexpected_msg(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
|
|
|
@ -47,6 +47,7 @@ from ._runtime import (
|
||||||
from .devx import (
|
from .devx import (
|
||||||
debug,
|
debug,
|
||||||
_frame_stack,
|
_frame_stack,
|
||||||
|
pformat as _pformat,
|
||||||
)
|
)
|
||||||
from . import _spawn
|
from . import _spawn
|
||||||
from . import _state
|
from . import _state
|
||||||
|
@ -61,9 +62,11 @@ from ._addr import (
|
||||||
mk_uuid,
|
mk_uuid,
|
||||||
wrap_address,
|
wrap_address,
|
||||||
)
|
)
|
||||||
|
from .trionics import (
|
||||||
|
is_multi_cancelled,
|
||||||
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
RuntimeFailure,
|
RuntimeFailure,
|
||||||
is_multi_cancelled,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@ -511,10 +514,14 @@ async def open_root_actor(
|
||||||
# for an in nurseries:
|
# for an in nurseries:
|
||||||
# tempn.start_soon(an.exited.wait)
|
# tempn.start_soon(an.exited.wait)
|
||||||
|
|
||||||
|
op_nested_actor_repr: str = _pformat.nest_from_op(
|
||||||
|
input_op='>) ',
|
||||||
|
tree_str=actor.pformat(),
|
||||||
|
nest_prefix='|_',
|
||||||
|
)
|
||||||
logger.info(
|
logger.info(
|
||||||
f'Closing down root actor\n'
|
f'Closing down root actor\n'
|
||||||
f'>)\n'
|
f'{op_nested_actor_repr}\n'
|
||||||
f'|_{actor}\n'
|
|
||||||
)
|
)
|
||||||
await actor.cancel(None) # self cancel
|
await actor.cancel(None) # self cancel
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -55,6 +55,7 @@ from typing import (
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
import uuid
|
import uuid
|
||||||
|
import textwrap
|
||||||
from types import ModuleType
|
from types import ModuleType
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
@ -97,7 +98,10 @@ from ._exceptions import (
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
)
|
)
|
||||||
from .devx import debug
|
from .devx import (
|
||||||
|
debug,
|
||||||
|
pformat as _pformat
|
||||||
|
)
|
||||||
from ._discovery import get_registry
|
from ._discovery import get_registry
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from . import _state
|
from . import _state
|
||||||
|
@ -339,46 +343,76 @@ class Actor:
|
||||||
def pid(self) -> int:
|
def pid(self) -> int:
|
||||||
return self._aid.pid
|
return self._aid.pid
|
||||||
|
|
||||||
def pformat(self) -> str:
|
def pformat(
|
||||||
ds: str = '='
|
self,
|
||||||
|
ds: str = ':',
|
||||||
|
indent: int = 0,
|
||||||
|
) -> str:
|
||||||
|
fields_sect_prefix: str = ' |_'
|
||||||
parent_uid: tuple|None = None
|
parent_uid: tuple|None = None
|
||||||
if rent_chan := self._parent_chan:
|
if rent_chan := self._parent_chan:
|
||||||
parent_uid = rent_chan.uid
|
parent_uid = rent_chan.uid
|
||||||
|
|
||||||
peers: list = []
|
peers: list = []
|
||||||
server: _server.IPCServer = self.ipc_server
|
server: _server.IPCServer = self.ipc_server
|
||||||
|
ipc_server_sect: str = ''
|
||||||
if server:
|
if server:
|
||||||
peers: list[tuple] = list(server._peer_connected)
|
peers: list[tuple] = list(server._peer_connected)
|
||||||
|
|
||||||
|
# create field ln as a key-header indented under
|
||||||
|
# and up to the section's key prefix.
|
||||||
|
# field_ln_header: str = textwrap.indent(
|
||||||
|
# text=f"ipc_server{ds}",
|
||||||
|
# prefix=' '*len(fields_sect_prefix),
|
||||||
|
# )
|
||||||
|
# ^XXX if we were to indent `repr(Server)` to
|
||||||
|
# '<key>: '
|
||||||
|
# _here_^
|
||||||
|
server_repr: str = textwrap.indent(
|
||||||
|
text=self._ipc_server.pformat(),
|
||||||
|
# prefix=' '*len(field_ln_header),
|
||||||
|
prefix=' '*len(fields_sect_prefix),
|
||||||
|
)
|
||||||
|
ipc_server_sect: str = (
|
||||||
|
# f'{field_ln_header}\n'
|
||||||
|
f'{server_repr}'
|
||||||
|
)
|
||||||
|
|
||||||
fmtstr: str = (
|
fmtstr: str = (
|
||||||
f' |_id: {self.aid!r}\n'
|
f' |_id: {self.aid!r}\n'
|
||||||
# f" aid{ds}{self.aid!r}\n"
|
# f" aid{ds}{self.aid!r}\n"
|
||||||
f" parent{ds}{parent_uid}\n"
|
f" parent{ds}{parent_uid}\n"
|
||||||
f'\n'
|
# f'\n'
|
||||||
f' |_ipc: {len(peers)!r} connected peers\n'
|
f' |_ipc: {len(peers)!r} connected peers\n'
|
||||||
f" peers{ds}{peers!r}\n"
|
f" peers{ds}{peers!r}\n"
|
||||||
f" ipc_server{ds}{self._ipc_server}\n"
|
f"{ipc_server_sect}"
|
||||||
f'\n'
|
# f'\n'
|
||||||
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
|
f' |_rpc: {len(self._rpc_tasks)} tasks\n'
|
||||||
f" ctxs{ds}{len(self._contexts)}\n"
|
f" ctxs{ds}{len(self._contexts)}\n"
|
||||||
f'\n'
|
# f'\n'
|
||||||
f' |_runtime: ._task{ds}{self._task!r}\n'
|
f' |_runtime: ._task{ds}{self._task!r}\n'
|
||||||
f' _spawn_method{ds}{self._spawn_method}\n'
|
f' _spawn_method{ds}{self._spawn_method}\n'
|
||||||
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
|
f' _actoruid2nursery{ds}{self._actoruid2nursery}\n'
|
||||||
f' _forkserver_info{ds}{self._forkserver_info}\n'
|
f' _forkserver_info{ds}{self._forkserver_info}\n'
|
||||||
f'\n'
|
# f'\n'
|
||||||
f' |_state: "TODO: .repr_state()"\n'
|
f' |_state: "TODO: .repr_state()"\n'
|
||||||
f' _cancel_complete{ds}{self._cancel_complete}\n'
|
f' _cancel_complete{ds}{self._cancel_complete}\n'
|
||||||
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
|
f' _cancel_called_by_remote{ds}{self._cancel_called_by_remote}\n'
|
||||||
f' _cancel_called{ds}{self._cancel_called}\n'
|
f' _cancel_called{ds}{self._cancel_called}\n'
|
||||||
)
|
)
|
||||||
return (
|
_repr: str = (
|
||||||
'<Actor(\n'
|
'<Actor(\n'
|
||||||
+
|
+
|
||||||
fmtstr
|
fmtstr
|
||||||
+
|
+
|
||||||
')>\n'
|
')>\n'
|
||||||
)
|
)
|
||||||
|
if indent:
|
||||||
|
_repr: str = textwrap.indent(
|
||||||
|
text=_repr,
|
||||||
|
prefix=' '*indent,
|
||||||
|
)
|
||||||
|
return _repr
|
||||||
|
|
||||||
__repr__ = pformat
|
__repr__ = pformat
|
||||||
|
|
||||||
|
@ -1654,10 +1688,15 @@ async def async_main(
|
||||||
'-> All peer channels are complete\n'
|
'-> All peer channels are complete\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
op_nested_actor_repr: str = _pformat.nest_from_op(
|
||||||
|
input_op=')> ',
|
||||||
|
tree_str=actor.pformat(),
|
||||||
|
nest_prefix='|_',
|
||||||
|
back_from_op=2,
|
||||||
|
)
|
||||||
teardown_report += (
|
teardown_report += (
|
||||||
'Actor runtime exiting\n'
|
'Actor runtime exited\n'
|
||||||
f'>)\n'
|
f'{op_nested_actor_repr}\n'
|
||||||
f'|_{actor}\n'
|
|
||||||
)
|
)
|
||||||
log.info(teardown_report)
|
log.info(teardown_report)
|
||||||
|
|
||||||
|
|
|
@ -40,8 +40,10 @@ from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._exceptions import (
|
from .trionics import (
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
|
)
|
||||||
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
from ._root import (
|
from ._root import (
|
||||||
|
|
|
@ -59,7 +59,7 @@ from tractor._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._exceptions import (
|
from tractor.trionics import (
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
)
|
)
|
||||||
from ._trace import (
|
from ._trace import (
|
||||||
|
|
|
@ -328,6 +328,8 @@ def nest_from_op(
|
||||||
# NOTE: so move back-from-the-left of the `input_op` by
|
# NOTE: so move back-from-the-left of the `input_op` by
|
||||||
# this amount.
|
# this amount.
|
||||||
back_from_op: int = 0,
|
back_from_op: int = 0,
|
||||||
|
nest_prefix: str = ''
|
||||||
|
|
||||||
) -> str:
|
) -> str:
|
||||||
'''
|
'''
|
||||||
Depth-increment the input (presumably hierarchy/supervision)
|
Depth-increment the input (presumably hierarchy/supervision)
|
||||||
|
@ -336,15 +338,22 @@ def nest_from_op(
|
||||||
`tree_str` to nest content aligned with the ops last char.
|
`tree_str` to nest content aligned with the ops last char.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
indented_tree_str: str = textwrap.indent(
|
||||||
|
tree_str,
|
||||||
|
prefix=' ' *(
|
||||||
|
len(input_op)
|
||||||
|
-
|
||||||
|
(back_from_op + 1)
|
||||||
|
),
|
||||||
|
)
|
||||||
|
# inject any provided nesting-prefix chars
|
||||||
|
# into the head of the first line.
|
||||||
|
if nest_prefix:
|
||||||
|
indented_tree_str: str = (
|
||||||
|
f'{nest_prefix}'
|
||||||
|
f'{indented_tree_str[len(nest_prefix):]}'
|
||||||
|
)
|
||||||
return (
|
return (
|
||||||
f'{input_op}\n'
|
f'{input_op}\n'
|
||||||
+
|
f'{indented_tree_str}'
|
||||||
textwrap.indent(
|
|
||||||
tree_str,
|
|
||||||
prefix=(
|
|
||||||
len(input_op)
|
|
||||||
-
|
|
||||||
(back_from_op + 1)
|
|
||||||
) * ' ',
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -38,7 +38,6 @@ from typing import (
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
InternalError,
|
InternalError,
|
||||||
is_multi_cancelled,
|
|
||||||
TrioTaskExited,
|
TrioTaskExited,
|
||||||
TrioCancelled,
|
TrioCancelled,
|
||||||
AsyncioTaskExited,
|
AsyncioTaskExited,
|
||||||
|
@ -59,6 +58,9 @@ from tractor.log import (
|
||||||
# from tractor.msg import (
|
# from tractor.msg import (
|
||||||
# pretty_struct,
|
# pretty_struct,
|
||||||
# )
|
# )
|
||||||
|
from tractor.trionics import (
|
||||||
|
is_multi_cancelled,
|
||||||
|
)
|
||||||
from tractor.trionics._broadcast import (
|
from tractor.trionics._broadcast import (
|
||||||
broadcast_receiver,
|
broadcast_receiver,
|
||||||
BroadcastReceiver,
|
BroadcastReceiver,
|
||||||
|
|
|
@ -32,4 +32,5 @@ from ._broadcast import (
|
||||||
from ._beg import (
|
from ._beg import (
|
||||||
collapse_eg as collapse_eg,
|
collapse_eg as collapse_eg,
|
||||||
maybe_collapse_eg as maybe_collapse_eg,
|
maybe_collapse_eg as maybe_collapse_eg,
|
||||||
|
is_multi_cancelled as is_multi_cancelled,
|
||||||
)
|
)
|
||||||
|
|
|
@ -22,6 +22,11 @@ first-class-`trio` from a historical perspective B)
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from typing import (
|
||||||
|
Literal,
|
||||||
|
)
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
def maybe_collapse_eg(
|
def maybe_collapse_eg(
|
||||||
|
@ -56,3 +61,62 @@ async def collapse_eg():
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
raise beg
|
raise beg
|
||||||
|
|
||||||
|
|
||||||
|
def is_multi_cancelled(
|
||||||
|
beg: BaseException|BaseExceptionGroup,
|
||||||
|
|
||||||
|
ignore_nested: set[BaseException] = set(),
|
||||||
|
|
||||||
|
) -> Literal[False]|BaseExceptionGroup:
|
||||||
|
'''
|
||||||
|
Predicate to determine if an `BaseExceptionGroup` only contains
|
||||||
|
some (maybe nested) set of sub-grouped exceptions (like only
|
||||||
|
`trio.Cancelled`s which get swallowed silently by default) and is
|
||||||
|
thus the result of "gracefully cancelling" a collection of
|
||||||
|
sub-tasks (or other conc primitives) and receiving a "cancelled
|
||||||
|
ACK" from each after termination.
|
||||||
|
|
||||||
|
Docs:
|
||||||
|
----
|
||||||
|
- https://docs.python.org/3/library/exceptions.html#exception-groups
|
||||||
|
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
if (
|
||||||
|
not ignore_nested
|
||||||
|
or
|
||||||
|
trio.Cancelled not in ignore_nested
|
||||||
|
# XXX always count-in `trio`'s native signal
|
||||||
|
):
|
||||||
|
ignore_nested.update({trio.Cancelled})
|
||||||
|
|
||||||
|
if isinstance(beg, BaseExceptionGroup):
|
||||||
|
# https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
|
||||||
|
# |_ "The condition can be an exception type or tuple of
|
||||||
|
# exception types, in which case each exception is checked
|
||||||
|
# for a match using the same check that is used in an
|
||||||
|
# except clause. The condition can also be a callable
|
||||||
|
# (other than a type object) that accepts an exception as
|
||||||
|
# its single argument and returns true for the exceptions
|
||||||
|
# that should be in the subgroup."
|
||||||
|
matched_exc: BaseExceptionGroup|None = beg.subgroup(
|
||||||
|
tuple(ignore_nested),
|
||||||
|
|
||||||
|
# ??TODO, complain about why not allowed to use
|
||||||
|
# named arg style calling???
|
||||||
|
# XD .. wtf?
|
||||||
|
# condition=tuple(ignore_nested),
|
||||||
|
)
|
||||||
|
if matched_exc is not None:
|
||||||
|
return matched_exc
|
||||||
|
|
||||||
|
# NOTE, IFF no excs types match (throughout the error-tree)
|
||||||
|
# -> return `False`, OW return the matched sub-eg.
|
||||||
|
#
|
||||||
|
# IOW, for the inverse of ^ for the purpose of
|
||||||
|
# maybe-enter-REPL--logic: "only debug when the err-tree contains
|
||||||
|
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
|
||||||
|
# we fallthrough and return `False` here.
|
||||||
|
return False
|
||||||
|
|
|
@ -40,6 +40,8 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
# from ._beg import collapse_eg
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from tractor import ActorNursery
|
from tractor import ActorNursery
|
||||||
|
@ -111,17 +113,19 @@ async def gather_contexts(
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Concurrently enter a sequence of async context managers (acms),
|
Concurrently enter a sequence of async context managers (`acm`s),
|
||||||
each from a separate `trio` task and deliver the unwrapped
|
each scheduled in a separate `trio.Task` and deliver their
|
||||||
`yield`-ed values in the same order once all managers have entered.
|
unwrapped `yield`-ed values in the same order once all `@acm`s
|
||||||
|
in every task have entered.
|
||||||
|
|
||||||
On exit, all acms are subsequently and concurrently exited.
|
On exit, all `acm`s are subsequently and concurrently exited with
|
||||||
|
**no order guarantees**.
|
||||||
|
|
||||||
This function is somewhat similar to a batch of non-blocking
|
This function is somewhat similar to a batch of non-blocking
|
||||||
calls to `contextlib.AsyncExitStack.enter_async_context()`
|
calls to `contextlib.AsyncExitStack.enter_async_context()`
|
||||||
(inside a loop) *in combo with* a `asyncio.gather()` to get the
|
(inside a loop) *in combo with* a `asyncio.gather()` to get the
|
||||||
`.__aenter__()`-ed values, except the managers are both
|
`.__aenter__()`-ed values, except the managers are both
|
||||||
concurrently entered and exited and *cancellation just works*(R).
|
concurrently entered and exited and *cancellation-just-works™*.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
seed: int = id(mngrs)
|
seed: int = id(mngrs)
|
||||||
|
@ -141,16 +145,20 @@ async def gather_contexts(
|
||||||
if not mngrs:
|
if not mngrs:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
'`.trionics.gather_contexts()` input mngrs is empty?\n'
|
'`.trionics.gather_contexts()` input mngrs is empty?\n'
|
||||||
|
'\n'
|
||||||
'Did try to use inline generator syntax?\n'
|
'Did try to use inline generator syntax?\n'
|
||||||
'Use a non-lazy iterator or sequence type intead!'
|
'Use a non-lazy iterator or sequence-type intead!\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
# collapse_eg(),
|
||||||
# ^XXX^ TODO? soo roll our own then ??
|
trio.open_nursery(
|
||||||
# -> since we kinda want the "if only one `.exception` then
|
# strict_exception_groups=False,
|
||||||
# just raise that" interface?
|
# ^XXX^ TODO? soo roll our own then ??
|
||||||
) as tn:
|
# -> since we kinda want the "if only one `.exception` then
|
||||||
|
# just raise that" interface?
|
||||||
|
) as tn,
|
||||||
|
):
|
||||||
for mngr in mngrs:
|
for mngr in mngrs:
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
_enter_and_wait,
|
_enter_and_wait,
|
||||||
|
@ -167,7 +175,7 @@ async def gather_contexts(
|
||||||
try:
|
try:
|
||||||
yield tuple(unwrapped.values())
|
yield tuple(unwrapped.values())
|
||||||
finally:
|
finally:
|
||||||
# NOTE: this is ABSOLUTELY REQUIRED to avoid
|
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
|
||||||
# the following wacky bug:
|
# the following wacky bug:
|
||||||
# <tractorbugurlhere>
|
# <tractorbugurlhere>
|
||||||
parent_exit.set()
|
parent_exit.set()
|
||||||
|
|
Loading…
Reference in New Issue