Merge pull request #387 from goodboy/the_finally_footgun
Coping with "`finally` footguns": avoiding `trio.Cancelled` exc masking as best we can..main
commit
cd16748598
|
@ -252,7 +252,7 @@ def test_simple_context(
|
||||||
pass
|
pass
|
||||||
except BaseExceptionGroup as beg:
|
except BaseExceptionGroup as beg:
|
||||||
# XXX: on windows it seems we may have to expect the group error
|
# XXX: on windows it seems we may have to expect the group error
|
||||||
from tractor._exceptions import is_multi_cancelled
|
from tractor.trionics import is_multi_cancelled
|
||||||
assert is_multi_cancelled(beg)
|
assert is_multi_cancelled(beg)
|
||||||
else:
|
else:
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
|
@ -0,0 +1,237 @@
|
||||||
|
'''
|
||||||
|
Special case testing for issues not (dis)covered in the primary
|
||||||
|
`Context` related functional/scenario suites.
|
||||||
|
|
||||||
|
**NOTE: this mod is a WIP** space for handling
|
||||||
|
odd/rare/undiscovered/not-yet-revealed faults which either
|
||||||
|
loudly (ideal case) breakl our supervision protocol
|
||||||
|
or (worst case) result in distributed sys hangs.
|
||||||
|
|
||||||
|
Suites here further try to clarify (if [partially] ill-defined) and
|
||||||
|
verify our edge case semantics for inter-actor-relayed-exceptions
|
||||||
|
including,
|
||||||
|
|
||||||
|
- lowlevel: what remote obj-data is interchanged for IPC and what is
|
||||||
|
native-obj form is expected from unpacking in the the new
|
||||||
|
mem-domain.
|
||||||
|
|
||||||
|
- which kinds of `RemoteActorError` (and its derivs) are expected by which
|
||||||
|
(types of) peers (parent, child, sibling, etc) with what
|
||||||
|
particular meta-data set such as,
|
||||||
|
|
||||||
|
- `.src_uid`: the original (maybe) peer who raised.
|
||||||
|
- `.relay_uid`: the next-hop-peer who sent it.
|
||||||
|
- `.relay_path`: the sequence of peer actor hops.
|
||||||
|
- `.is_inception`: a predicate that denotes multi-hop remote errors.
|
||||||
|
|
||||||
|
- when should `ExceptionGroup`s be relayed from a particular
|
||||||
|
remote endpoint, they should never be caused by implicit `._rpc`
|
||||||
|
nursery machinery!
|
||||||
|
|
||||||
|
- various special `trio` edge cases around its cancellation semantics
|
||||||
|
and how we (currently) leverage `trio.Cancelled` as a signal for
|
||||||
|
whether a `Context` task should raise `ContextCancelled` (ctx).
|
||||||
|
|
||||||
|
'''
|
||||||
|
import pytest
|
||||||
|
import trio
|
||||||
|
import tractor
|
||||||
|
from tractor import ( # typing
|
||||||
|
ActorNursery,
|
||||||
|
Portal,
|
||||||
|
Context,
|
||||||
|
ContextCancelled,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@tractor.context
|
||||||
|
async def sleep_n_chkpt_in_finally(
|
||||||
|
ctx: Context,
|
||||||
|
sleep_n_raise: bool,
|
||||||
|
|
||||||
|
chld_raise_delay: float,
|
||||||
|
chld_finally_delay: float,
|
||||||
|
|
||||||
|
rent_cancels: bool,
|
||||||
|
rent_ctxc_delay: float,
|
||||||
|
|
||||||
|
expect_exc: str|None = None,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Sync, open a tn, then wait for cancel, run a chkpt inside
|
||||||
|
the user's `finally:` teardown.
|
||||||
|
|
||||||
|
This covers a footgun case that `trio` core doesn't seem to care about
|
||||||
|
wherein an exc can be masked by a `trio.Cancelled` raised inside a tn emedded
|
||||||
|
`finally:`.
|
||||||
|
|
||||||
|
Also see `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
|
||||||
|
for the down and gritty details.
|
||||||
|
|
||||||
|
Since a `@context` endpoint fn can also contain code like this,
|
||||||
|
**and** bc we currently have no easy way other then
|
||||||
|
`trio.Cancelled` to signal cancellation on each side of an IPC `Context`,
|
||||||
|
the footgun issue can compound itself as demonstrated in this suite..
|
||||||
|
|
||||||
|
Here are some edge cases codified with our WIP "sclang" syntax
|
||||||
|
(note the parent(rent)/child(chld) naming here is just
|
||||||
|
pragmatism, generally these most of these cases can occurr
|
||||||
|
regardless of the distributed-task's supervision hiearchy),
|
||||||
|
|
||||||
|
- rent c)=> chld.raises-then-taskc-in-finally
|
||||||
|
|_ chld's body raises an `exc: BaseException`.
|
||||||
|
_ in its `finally:` block it runs a chkpoint
|
||||||
|
which raises a taskc (`trio.Cancelled`) which
|
||||||
|
masks `exc` instead raising taskc up to the first tn.
|
||||||
|
_ the embedded/chld tn captures the masking taskc and then
|
||||||
|
raises it up to the ._rpc-ep-tn instead of `exc`.
|
||||||
|
_ the rent thinks the child ctxc-ed instead of errored..
|
||||||
|
|
||||||
|
'''
|
||||||
|
await ctx.started()
|
||||||
|
|
||||||
|
if expect_exc:
|
||||||
|
expect_exc: BaseException = tractor._exceptions.get_err_type(
|
||||||
|
type_name=expect_exc,
|
||||||
|
)
|
||||||
|
|
||||||
|
berr: BaseException|None = None
|
||||||
|
try:
|
||||||
|
if not sleep_n_raise:
|
||||||
|
await trio.sleep_forever()
|
||||||
|
elif sleep_n_raise:
|
||||||
|
|
||||||
|
# XXX this sleep is less then the sleep the parent
|
||||||
|
# does before calling `ctx.cancel()`
|
||||||
|
await trio.sleep(chld_raise_delay)
|
||||||
|
|
||||||
|
# XXX this will be masked by a taskc raised in
|
||||||
|
# the `finally:` if this fn doesn't terminate
|
||||||
|
# before any ctxc-req arrives AND a checkpoint is hit
|
||||||
|
# in that `finally:`.
|
||||||
|
raise RuntimeError('my app krurshed..')
|
||||||
|
|
||||||
|
except BaseException as _berr:
|
||||||
|
berr = _berr
|
||||||
|
|
||||||
|
# TODO: it'd sure be nice to be able to inject our own
|
||||||
|
# `ContextCancelled` here instead of of `trio.Cancelled`
|
||||||
|
# so that our runtime can expect it and this "user code"
|
||||||
|
# would be able to tell the diff between a generic trio
|
||||||
|
# cancel and a tractor runtime-IPC cancel.
|
||||||
|
if expect_exc:
|
||||||
|
if not isinstance(
|
||||||
|
berr,
|
||||||
|
expect_exc,
|
||||||
|
):
|
||||||
|
raise ValueError(
|
||||||
|
f'Unexpected exc type ??\n'
|
||||||
|
f'{berr!r}\n'
|
||||||
|
f'\n'
|
||||||
|
f'Expected a {expect_exc!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
raise berr
|
||||||
|
|
||||||
|
# simulate what user code might try even though
|
||||||
|
# it's a known boo-boo..
|
||||||
|
finally:
|
||||||
|
# maybe wait for rent ctxc to arrive
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await trio.sleep(chld_finally_delay)
|
||||||
|
|
||||||
|
# !!XXX this will raise `trio.Cancelled` which
|
||||||
|
# will mask the RTE from above!!!
|
||||||
|
#
|
||||||
|
# YES, it's the same case as our extant
|
||||||
|
# `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
|
||||||
|
try:
|
||||||
|
await trio.lowlevel.checkpoint()
|
||||||
|
except trio.Cancelled as taskc:
|
||||||
|
if (scope_err := taskc.__context__):
|
||||||
|
print(
|
||||||
|
f'XXX MASKED REMOTE ERROR XXX\n'
|
||||||
|
f'ENDPOINT exception -> {scope_err!r}\n'
|
||||||
|
f'will be masked by -> {taskc!r}\n'
|
||||||
|
)
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
|
||||||
|
raise taskc
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'chld_callspec',
|
||||||
|
[
|
||||||
|
dict(
|
||||||
|
sleep_n_raise=None,
|
||||||
|
chld_raise_delay=0.1,
|
||||||
|
chld_finally_delay=0.1,
|
||||||
|
expect_exc='Cancelled',
|
||||||
|
rent_cancels=True,
|
||||||
|
rent_ctxc_delay=0.1,
|
||||||
|
),
|
||||||
|
dict(
|
||||||
|
sleep_n_raise='RuntimeError',
|
||||||
|
chld_raise_delay=0.1,
|
||||||
|
chld_finally_delay=1,
|
||||||
|
expect_exc='RuntimeError',
|
||||||
|
rent_cancels=False,
|
||||||
|
rent_ctxc_delay=0.1,
|
||||||
|
),
|
||||||
|
],
|
||||||
|
ids=lambda item: f'chld_callspec={item!r}'
|
||||||
|
)
|
||||||
|
def test_unmasked_remote_exc(
|
||||||
|
debug_mode: bool,
|
||||||
|
chld_callspec: dict,
|
||||||
|
tpt_proto: str,
|
||||||
|
):
|
||||||
|
expect_exc_str: str|None = chld_callspec['sleep_n_raise']
|
||||||
|
rent_ctxc_delay: float|None = chld_callspec['rent_ctxc_delay']
|
||||||
|
async def main():
|
||||||
|
an: ActorNursery
|
||||||
|
async with tractor.open_nursery(
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
enable_transports=[tpt_proto],
|
||||||
|
) as an:
|
||||||
|
ptl: Portal = await an.start_actor(
|
||||||
|
'cancellee',
|
||||||
|
enable_modules=[__name__],
|
||||||
|
)
|
||||||
|
ctx: Context
|
||||||
|
async with (
|
||||||
|
ptl.open_context(
|
||||||
|
sleep_n_chkpt_in_finally,
|
||||||
|
**chld_callspec,
|
||||||
|
) as (ctx, sent),
|
||||||
|
):
|
||||||
|
assert not sent
|
||||||
|
await trio.sleep(rent_ctxc_delay)
|
||||||
|
await ctx.cancel()
|
||||||
|
|
||||||
|
# recv error or result from chld
|
||||||
|
ctxc: ContextCancelled = await ctx.wait_for_result()
|
||||||
|
assert (
|
||||||
|
ctxc is ctx.outcome
|
||||||
|
and
|
||||||
|
isinstance(ctxc, ContextCancelled)
|
||||||
|
)
|
||||||
|
|
||||||
|
# always graceful terminate the sub in non-error cases
|
||||||
|
await an.cancel()
|
||||||
|
|
||||||
|
if expect_exc_str:
|
||||||
|
expect_exc: BaseException = tractor._exceptions.get_err_type(
|
||||||
|
type_name=expect_exc_str,
|
||||||
|
)
|
||||||
|
with pytest.raises(
|
||||||
|
expected_exception=tractor.RemoteActorError,
|
||||||
|
) as excinfo:
|
||||||
|
trio.run(main)
|
||||||
|
|
||||||
|
rae = excinfo.value
|
||||||
|
assert expect_exc == rae.boxed_type
|
||||||
|
|
||||||
|
else:
|
||||||
|
trio.run(main)
|
|
@ -112,55 +112,11 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
'''
|
'''
|
||||||
import tractor
|
import tractor
|
||||||
|
|
||||||
@acm
|
|
||||||
async def maybe_raise_from_masking_exc(
|
|
||||||
tn: trio.Nursery,
|
|
||||||
unmask_from: BaseException|None = trio.Cancelled
|
|
||||||
|
|
||||||
# TODO, maybe offer a collection?
|
|
||||||
# unmask_from: set[BaseException] = {
|
|
||||||
# trio.Cancelled,
|
|
||||||
# },
|
|
||||||
):
|
|
||||||
if not unmask_from:
|
|
||||||
yield
|
|
||||||
return
|
|
||||||
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
except* unmask_from as be_eg:
|
|
||||||
|
|
||||||
# TODO, if we offer `unmask_from: set`
|
|
||||||
# for masker_exc_type in unmask_from:
|
|
||||||
|
|
||||||
matches, rest = be_eg.split(unmask_from)
|
|
||||||
if not matches:
|
|
||||||
raise
|
|
||||||
|
|
||||||
for exc_match in be_eg.exceptions:
|
|
||||||
if (
|
|
||||||
(exc_ctx := exc_match.__context__)
|
|
||||||
and
|
|
||||||
type(exc_ctx) not in {
|
|
||||||
# trio.Cancelled, # always by default?
|
|
||||||
unmask_from,
|
|
||||||
}
|
|
||||||
):
|
|
||||||
exc_ctx.add_note(
|
|
||||||
f'\n'
|
|
||||||
f'WARNING: the above error was masked by a {unmask_from!r} !?!\n'
|
|
||||||
f'Are you always cancelling? Say from a `finally:` ?\n\n'
|
|
||||||
|
|
||||||
f'{tn!r}'
|
|
||||||
)
|
|
||||||
raise exc_ctx from exc_match
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def wraps_tn_that_always_cancels():
|
async def wraps_tn_that_always_cancels():
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
maybe_raise_from_masking_exc(
|
tractor.trionics.maybe_raise_from_masking_exc(
|
||||||
tn=tn,
|
tn=tn,
|
||||||
unmask_from=(
|
unmask_from=(
|
||||||
trio.Cancelled
|
trio.Cancelled
|
||||||
|
@ -202,3 +158,60 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
assert_eg, rest_eg = eg.split(AssertionError)
|
assert_eg, rest_eg = eg.split(AssertionError)
|
||||||
|
|
||||||
assert len(assert_eg.exceptions) == 1
|
assert len(assert_eg.exceptions) == 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
|
||||||
|
will break a strict-eg-tn's multi-cancelled absorption..
|
||||||
|
|
||||||
|
'''
|
||||||
|
from tractor import (
|
||||||
|
trionics,
|
||||||
|
)
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def open_memchan() -> trio.abc.ReceiveChannel:
|
||||||
|
|
||||||
|
task: trio.Task = trio.lowlevel.current_task()
|
||||||
|
print(
|
||||||
|
f'Opening {task!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# 1 to force eager sending
|
||||||
|
send, recv = trio.open_memory_channel(16)
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with send:
|
||||||
|
yield recv
|
||||||
|
finally:
|
||||||
|
print(
|
||||||
|
f'Closed {task!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
async with (
|
||||||
|
# XXX should ensure ONLY the KBI
|
||||||
|
# is relayed upward
|
||||||
|
trionics.collapse_eg(),
|
||||||
|
trio.open_nursery(
|
||||||
|
# strict_exception_groups=False,
|
||||||
|
), # as tn,
|
||||||
|
|
||||||
|
trionics.gather_contexts([
|
||||||
|
open_memchan(),
|
||||||
|
open_memchan(),
|
||||||
|
]) as recv_chans,
|
||||||
|
):
|
||||||
|
assert len(recv_chans) == 2
|
||||||
|
|
||||||
|
await trio.sleep(1)
|
||||||
|
raise KeyboardInterrupt
|
||||||
|
# tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
with pytest.raises(KeyboardInterrupt):
|
||||||
|
trio.run(main)
|
||||||
|
|
|
@ -1246,55 +1246,6 @@ def unpack_error(
|
||||||
return exc
|
return exc
|
||||||
|
|
||||||
|
|
||||||
def is_multi_cancelled(
|
|
||||||
exc: BaseException|BaseExceptionGroup,
|
|
||||||
|
|
||||||
ignore_nested: set[BaseException] = set(),
|
|
||||||
|
|
||||||
) -> bool|BaseExceptionGroup:
|
|
||||||
'''
|
|
||||||
Predicate to determine if an `BaseExceptionGroup` only contains
|
|
||||||
some (maybe nested) set of sub-grouped exceptions (like only
|
|
||||||
`trio.Cancelled`s which get swallowed silently by default) and is
|
|
||||||
thus the result of "gracefully cancelling" a collection of
|
|
||||||
sub-tasks (or other conc primitives) and receiving a "cancelled
|
|
||||||
ACK" from each after termination.
|
|
||||||
|
|
||||||
Docs:
|
|
||||||
----
|
|
||||||
- https://docs.python.org/3/library/exceptions.html#exception-groups
|
|
||||||
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
|
|
||||||
|
|
||||||
'''
|
|
||||||
|
|
||||||
if (
|
|
||||||
not ignore_nested
|
|
||||||
or
|
|
||||||
trio.Cancelled in ignore_nested
|
|
||||||
# XXX always count-in `trio`'s native signal
|
|
||||||
):
|
|
||||||
ignore_nested.update({trio.Cancelled})
|
|
||||||
|
|
||||||
if isinstance(exc, BaseExceptionGroup):
|
|
||||||
matched_exc: BaseExceptionGroup|None = exc.subgroup(
|
|
||||||
tuple(ignore_nested),
|
|
||||||
|
|
||||||
# TODO, complain about why not allowed XD
|
|
||||||
# condition=tuple(ignore_nested),
|
|
||||||
)
|
|
||||||
if matched_exc is not None:
|
|
||||||
return matched_exc
|
|
||||||
|
|
||||||
# NOTE, IFF no excs types match (throughout the error-tree)
|
|
||||||
# -> return `False`, OW return the matched sub-eg.
|
|
||||||
#
|
|
||||||
# IOW, for the inverse of ^ for the purpose of
|
|
||||||
# maybe-enter-REPL--logic: "only debug when the err-tree contains
|
|
||||||
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
|
|
||||||
# we fallthrough and return `False` here.
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def _raise_from_unexpected_msg(
|
def _raise_from_unexpected_msg(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
msg: MsgType,
|
msg: MsgType,
|
||||||
|
|
|
@ -61,9 +61,11 @@ from ._addr import (
|
||||||
mk_uuid,
|
mk_uuid,
|
||||||
wrap_address,
|
wrap_address,
|
||||||
)
|
)
|
||||||
|
from .trionics import (
|
||||||
|
is_multi_cancelled,
|
||||||
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
RuntimeFailure,
|
RuntimeFailure,
|
||||||
is_multi_cancelled,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from trio import (
|
from trio import (
|
||||||
|
Cancelled,
|
||||||
CancelScope,
|
CancelScope,
|
||||||
Nursery,
|
Nursery,
|
||||||
TaskStatus,
|
TaskStatus,
|
||||||
|
@ -52,10 +53,14 @@ from ._exceptions import (
|
||||||
ModuleNotExposed,
|
ModuleNotExposed,
|
||||||
MsgTypeError,
|
MsgTypeError,
|
||||||
TransportClosed,
|
TransportClosed,
|
||||||
is_multi_cancelled,
|
|
||||||
pack_error,
|
pack_error,
|
||||||
unpack_error,
|
unpack_error,
|
||||||
)
|
)
|
||||||
|
from .trionics import (
|
||||||
|
collapse_eg,
|
||||||
|
is_multi_cancelled,
|
||||||
|
maybe_raise_from_masking_exc,
|
||||||
|
)
|
||||||
from .devx import (
|
from .devx import (
|
||||||
debug,
|
debug,
|
||||||
add_div,
|
add_div,
|
||||||
|
@ -250,7 +255,7 @@ async def _errors_relayed_via_ipc(
|
||||||
ctx: Context,
|
ctx: Context,
|
||||||
is_rpc: bool,
|
is_rpc: bool,
|
||||||
|
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = True,
|
||||||
debug_kbis: bool = False,
|
debug_kbis: bool = False,
|
||||||
task_status: TaskStatus[
|
task_status: TaskStatus[
|
||||||
Context | BaseException
|
Context | BaseException
|
||||||
|
@ -375,9 +380,9 @@ async def _errors_relayed_via_ipc(
|
||||||
# they can be individually ccancelled.
|
# they can be individually ccancelled.
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
# if the error is not from user code and instead a failure
|
# if the error is not from user code and instead a failure of
|
||||||
# of a runtime RPC or transport failure we do prolly want to
|
# an internal-runtime-RPC or IPC-connection, we do (prolly) want
|
||||||
# show this frame
|
# to show this frame!
|
||||||
if (
|
if (
|
||||||
rpc_err
|
rpc_err
|
||||||
and (
|
and (
|
||||||
|
@ -616,32 +621,40 @@ async def _invoke(
|
||||||
# -> the below scope is never exposed to the
|
# -> the below scope is never exposed to the
|
||||||
# `@context` marked RPC function.
|
# `@context` marked RPC function.
|
||||||
# - `._portal` is never set.
|
# - `._portal` is never set.
|
||||||
|
scope_err: BaseException|None = None
|
||||||
try:
|
try:
|
||||||
tn: trio.Nursery
|
# TODO: better `trionics` primitive/tooling usage here!
|
||||||
|
# -[ ] should would be nice to have our `TaskMngr`
|
||||||
|
# nursery here!
|
||||||
|
# -[ ] payload value checking like we do with
|
||||||
|
# `.started()` such that the debbuger can engage
|
||||||
|
# here in the child task instead of waiting for the
|
||||||
|
# parent to crash with it's own MTE..
|
||||||
|
#
|
||||||
|
tn: Nursery
|
||||||
rpc_ctx_cs: CancelScope
|
rpc_ctx_cs: CancelScope
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
collapse_eg(),
|
||||||
strict_exception_groups=False,
|
trio.open_nursery() as tn,
|
||||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
|
||||||
|
|
||||||
) as tn,
|
|
||||||
msgops.maybe_limit_plds(
|
msgops.maybe_limit_plds(
|
||||||
ctx=ctx,
|
ctx=ctx,
|
||||||
spec=ctx_meta.get('pld_spec'),
|
spec=ctx_meta.get('pld_spec'),
|
||||||
dec_hook=ctx_meta.get('dec_hook'),
|
dec_hook=ctx_meta.get('dec_hook'),
|
||||||
),
|
),
|
||||||
|
|
||||||
|
# XXX NOTE, this being the "most embedded"
|
||||||
|
# scope ensures unasking of the `await coro` below
|
||||||
|
# *should* never be interfered with!!
|
||||||
|
maybe_raise_from_masking_exc(
|
||||||
|
tn=tn,
|
||||||
|
unmask_from=Cancelled,
|
||||||
|
) as _mbme, # maybe boxed masked exc
|
||||||
):
|
):
|
||||||
ctx._scope_nursery = tn
|
ctx._scope_nursery = tn
|
||||||
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
||||||
task_status.started(ctx)
|
task_status.started(ctx)
|
||||||
|
|
||||||
# TODO: better `trionics` tooling:
|
# invoke user endpoint fn.
|
||||||
# -[ ] should would be nice to have our `TaskMngr`
|
|
||||||
# nursery here!
|
|
||||||
# -[ ] payload value checking like we do with
|
|
||||||
# `.started()` such that the debbuger can engage
|
|
||||||
# here in the child task instead of waiting for the
|
|
||||||
# parent to crash with it's own MTE..
|
|
||||||
res: Any|PayloadT = await coro
|
res: Any|PayloadT = await coro
|
||||||
return_msg: Return|CancelAck = return_msg_type(
|
return_msg: Return|CancelAck = return_msg_type(
|
||||||
cid=cid,
|
cid=cid,
|
||||||
|
@ -744,38 +757,48 @@ async def _invoke(
|
||||||
BaseException,
|
BaseException,
|
||||||
trio.Cancelled,
|
trio.Cancelled,
|
||||||
|
|
||||||
) as scope_error:
|
) as _scope_err:
|
||||||
|
scope_err = _scope_err
|
||||||
if (
|
if (
|
||||||
isinstance(scope_error, RuntimeError)
|
isinstance(scope_err, RuntimeError)
|
||||||
and scope_error.args
|
and
|
||||||
and 'Cancel scope stack corrupted' in scope_error.args[0]
|
scope_err.args
|
||||||
|
and
|
||||||
|
'Cancel scope stack corrupted' in scope_err.args[0]
|
||||||
):
|
):
|
||||||
log.exception('Cancel scope stack corrupted!?\n')
|
log.exception('Cancel scope stack corrupted!?\n')
|
||||||
# debug.mk_pdb().set_trace()
|
# debug.mk_pdb().set_trace()
|
||||||
|
|
||||||
# always set this (child) side's exception as the
|
# always set this (child) side's exception as the
|
||||||
# local error on the context
|
# local error on the context
|
||||||
ctx._local_error: BaseException = scope_error
|
ctx._local_error: BaseException = scope_err
|
||||||
# ^-TODO-^ question,
|
# ^-TODO-^ question,
|
||||||
# does this matter other then for
|
# does this matter other then for
|
||||||
# consistentcy/testing?
|
# consistentcy/testing?
|
||||||
# |_ no user code should be in this scope at this point
|
# |_ no user code should be in this scope at this point
|
||||||
# AND we already set this in the block below?
|
# AND we already set this in the block below?
|
||||||
|
|
||||||
# if a remote error was set then likely the
|
# XXX if a remote error was set then likely the
|
||||||
# exception group was raised due to that, so
|
# exc group was raised due to that, so
|
||||||
# and we instead raise that error immediately!
|
# and we instead raise that error immediately!
|
||||||
ctx.maybe_raise()
|
maybe_re: (
|
||||||
|
ContextCancelled|RemoteActorError
|
||||||
|
) = ctx.maybe_raise()
|
||||||
|
if maybe_re:
|
||||||
|
log.cancel(
|
||||||
|
f'Suppressing remote-exc from peer,\n'
|
||||||
|
f'{maybe_re!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
# maybe TODO: pack in come kinda
|
# maybe TODO: pack in come kinda
|
||||||
# `trio.Cancelled.__traceback__` here so they can be
|
# `trio.Cancelled.__traceback__` here so they can be
|
||||||
# unwrapped and displayed on the caller side? no se..
|
# unwrapped and displayed on the caller side? no se..
|
||||||
raise
|
raise scope_err
|
||||||
|
|
||||||
# `@context` entrypoint task bookeeping.
|
# `@context` entrypoint task bookeeping.
|
||||||
# i.e. only pop the context tracking if used ;)
|
# i.e. only pop the context tracking if used ;)
|
||||||
finally:
|
finally:
|
||||||
assert chan.uid
|
assert chan.aid
|
||||||
|
|
||||||
# don't pop the local context until we know the
|
# don't pop the local context until we know the
|
||||||
# associated child isn't in debug any more
|
# associated child isn't in debug any more
|
||||||
|
@ -802,6 +825,9 @@ async def _invoke(
|
||||||
descr_str += (
|
descr_str += (
|
||||||
f'\n{merr!r}\n' # needed?
|
f'\n{merr!r}\n' # needed?
|
||||||
f'{tb_str}\n'
|
f'{tb_str}\n'
|
||||||
|
f'\n'
|
||||||
|
f'scope_error:\n'
|
||||||
|
f'{scope_err!r}\n'
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
descr_str += f'\n{merr!r}\n'
|
descr_str += f'\n{merr!r}\n'
|
||||||
|
|
|
@ -40,8 +40,10 @@ from ._state import current_actor, is_main_process
|
||||||
from .log import get_logger, get_loglevel
|
from .log import get_logger, get_loglevel
|
||||||
from ._runtime import Actor
|
from ._runtime import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from ._exceptions import (
|
from .trionics import (
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
|
)
|
||||||
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
)
|
)
|
||||||
from ._root import (
|
from ._root import (
|
||||||
|
|
|
@ -59,7 +59,7 @@ from tractor._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
)
|
)
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor._exceptions import (
|
from tractor.trionics import (
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
)
|
)
|
||||||
from ._trace import (
|
from ._trace import (
|
||||||
|
|
|
@ -38,7 +38,6 @@ from typing import (
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import (
|
||||||
InternalError,
|
InternalError,
|
||||||
is_multi_cancelled,
|
|
||||||
TrioTaskExited,
|
TrioTaskExited,
|
||||||
TrioCancelled,
|
TrioCancelled,
|
||||||
AsyncioTaskExited,
|
AsyncioTaskExited,
|
||||||
|
@ -59,6 +58,9 @@ from tractor.log import (
|
||||||
# from tractor.msg import (
|
# from tractor.msg import (
|
||||||
# pretty_struct,
|
# pretty_struct,
|
||||||
# )
|
# )
|
||||||
|
from tractor.trionics import (
|
||||||
|
is_multi_cancelled,
|
||||||
|
)
|
||||||
from tractor.trionics._broadcast import (
|
from tractor.trionics._broadcast import (
|
||||||
broadcast_receiver,
|
broadcast_receiver,
|
||||||
BroadcastReceiver,
|
BroadcastReceiver,
|
||||||
|
|
|
@ -32,4 +32,8 @@ from ._broadcast import (
|
||||||
from ._beg import (
|
from ._beg import (
|
||||||
collapse_eg as collapse_eg,
|
collapse_eg as collapse_eg,
|
||||||
maybe_collapse_eg as maybe_collapse_eg,
|
maybe_collapse_eg as maybe_collapse_eg,
|
||||||
|
is_multi_cancelled as is_multi_cancelled,
|
||||||
|
)
|
||||||
|
from ._taskc import (
|
||||||
|
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
|
||||||
)
|
)
|
||||||
|
|
|
@ -22,6 +22,11 @@ first-class-`trio` from a historical perspective B)
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from typing import (
|
||||||
|
Literal,
|
||||||
|
)
|
||||||
|
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
def maybe_collapse_eg(
|
def maybe_collapse_eg(
|
||||||
|
@ -56,3 +61,62 @@ async def collapse_eg():
|
||||||
raise exc
|
raise exc
|
||||||
|
|
||||||
raise beg
|
raise beg
|
||||||
|
|
||||||
|
|
||||||
|
def is_multi_cancelled(
|
||||||
|
beg: BaseException|BaseExceptionGroup,
|
||||||
|
|
||||||
|
ignore_nested: set[BaseException] = set(),
|
||||||
|
|
||||||
|
) -> Literal[False]|BaseExceptionGroup:
|
||||||
|
'''
|
||||||
|
Predicate to determine if an `BaseExceptionGroup` only contains
|
||||||
|
some (maybe nested) set of sub-grouped exceptions (like only
|
||||||
|
`trio.Cancelled`s which get swallowed silently by default) and is
|
||||||
|
thus the result of "gracefully cancelling" a collection of
|
||||||
|
sub-tasks (or other conc primitives) and receiving a "cancelled
|
||||||
|
ACK" from each after termination.
|
||||||
|
|
||||||
|
Docs:
|
||||||
|
----
|
||||||
|
- https://docs.python.org/3/library/exceptions.html#exception-groups
|
||||||
|
- https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
|
||||||
|
|
||||||
|
'''
|
||||||
|
|
||||||
|
if (
|
||||||
|
not ignore_nested
|
||||||
|
or
|
||||||
|
trio.Cancelled not in ignore_nested
|
||||||
|
# XXX always count-in `trio`'s native signal
|
||||||
|
):
|
||||||
|
ignore_nested.update({trio.Cancelled})
|
||||||
|
|
||||||
|
if isinstance(beg, BaseExceptionGroup):
|
||||||
|
# https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
|
||||||
|
# |_ "The condition can be an exception type or tuple of
|
||||||
|
# exception types, in which case each exception is checked
|
||||||
|
# for a match using the same check that is used in an
|
||||||
|
# except clause. The condition can also be a callable
|
||||||
|
# (other than a type object) that accepts an exception as
|
||||||
|
# its single argument and returns true for the exceptions
|
||||||
|
# that should be in the subgroup."
|
||||||
|
matched_exc: BaseExceptionGroup|None = beg.subgroup(
|
||||||
|
tuple(ignore_nested),
|
||||||
|
|
||||||
|
# ??TODO, complain about why not allowed to use
|
||||||
|
# named arg style calling???
|
||||||
|
# XD .. wtf?
|
||||||
|
# condition=tuple(ignore_nested),
|
||||||
|
)
|
||||||
|
if matched_exc is not None:
|
||||||
|
return matched_exc
|
||||||
|
|
||||||
|
# NOTE, IFF no excs types match (throughout the error-tree)
|
||||||
|
# -> return `False`, OW return the matched sub-eg.
|
||||||
|
#
|
||||||
|
# IOW, for the inverse of ^ for the purpose of
|
||||||
|
# maybe-enter-REPL--logic: "only debug when the err-tree contains
|
||||||
|
# at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
|
||||||
|
# we fallthrough and return `False` here.
|
||||||
|
return False
|
||||||
|
|
|
@ -40,6 +40,8 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
# from ._beg import collapse_eg
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from tractor import ActorNursery
|
from tractor import ActorNursery
|
||||||
|
@ -112,17 +114,19 @@ async def gather_contexts(
|
||||||
None,
|
None,
|
||||||
]:
|
]:
|
||||||
'''
|
'''
|
||||||
Concurrently enter a sequence of async context managers (acms),
|
Concurrently enter a sequence of async context managers (`acm`s),
|
||||||
each from a separate `trio` task and deliver the unwrapped
|
each scheduled in a separate `trio.Task` and deliver their
|
||||||
`yield`-ed values in the same order once all managers have entered.
|
unwrapped `yield`-ed values in the same order once all `@acm`s
|
||||||
|
in every task have entered.
|
||||||
|
|
||||||
On exit, all acms are subsequently and concurrently exited.
|
On exit, all `acm`s are subsequently and concurrently exited with
|
||||||
|
**no order guarantees**.
|
||||||
|
|
||||||
This function is somewhat similar to a batch of non-blocking
|
This function is somewhat similar to a batch of non-blocking
|
||||||
calls to `contextlib.AsyncExitStack.enter_async_context()`
|
calls to `contextlib.AsyncExitStack.enter_async_context()`
|
||||||
(inside a loop) *in combo with* a `asyncio.gather()` to get the
|
(inside a loop) *in combo with* a `asyncio.gather()` to get the
|
||||||
`.__aenter__()`-ed values, except the managers are both
|
`.__aenter__()`-ed values, except the managers are both
|
||||||
concurrently entered and exited and *cancellation just works*(R).
|
concurrently entered and exited and *cancellation-just-works™*.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
seed: int = id(mngrs)
|
seed: int = id(mngrs)
|
||||||
|
@ -142,16 +146,20 @@ async def gather_contexts(
|
||||||
if not mngrs:
|
if not mngrs:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
'`.trionics.gather_contexts()` input mngrs is empty?\n'
|
'`.trionics.gather_contexts()` input mngrs is empty?\n'
|
||||||
|
'\n'
|
||||||
'Did try to use inline generator syntax?\n'
|
'Did try to use inline generator syntax?\n'
|
||||||
'Use a non-lazy iterator or sequence type intead!'
|
'Use a non-lazy iterator or sequence-type intead!\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
# collapse_eg(),
|
||||||
# ^XXX^ TODO? soo roll our own then ??
|
trio.open_nursery(
|
||||||
# -> since we kinda want the "if only one `.exception` then
|
strict_exception_groups=False,
|
||||||
# just raise that" interface?
|
# ^XXX^ TODO? soo roll our own then ??
|
||||||
) as tn:
|
# -> since we kinda want the "if only one `.exception` then
|
||||||
|
# just raise that" interface?
|
||||||
|
) as tn,
|
||||||
|
):
|
||||||
for mngr in mngrs:
|
for mngr in mngrs:
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
_enter_and_wait,
|
_enter_and_wait,
|
||||||
|
@ -168,7 +176,7 @@ async def gather_contexts(
|
||||||
try:
|
try:
|
||||||
yield tuple(unwrapped.values())
|
yield tuple(unwrapped.values())
|
||||||
finally:
|
finally:
|
||||||
# NOTE: this is ABSOLUTELY REQUIRED to avoid
|
# XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
|
||||||
# the following wacky bug:
|
# the following wacky bug:
|
||||||
# <tractorbugurlhere>
|
# <tractorbugurlhere>
|
||||||
parent_exit.set()
|
parent_exit.set()
|
||||||
|
|
|
@ -0,0 +1,184 @@
|
||||||
|
# tractor: structured concurrent "actors".
|
||||||
|
# Copyright 2018-eternity Tyler Goodlet.
|
||||||
|
|
||||||
|
# This program is free software: you can redistribute it and/or modify
|
||||||
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
|
# (at your option) any later version.
|
||||||
|
|
||||||
|
# This program is distributed in the hope that it will be useful,
|
||||||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
'''
|
||||||
|
`trio.Task` cancellation helpers, extensions and "holsters".
|
||||||
|
|
||||||
|
'''
|
||||||
|
from __future__ import annotations
|
||||||
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
)
|
||||||
|
from typing import TYPE_CHECKING
|
||||||
|
|
||||||
|
import trio
|
||||||
|
from tractor.log import get_logger
|
||||||
|
|
||||||
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
if TYPE_CHECKING:
|
||||||
|
from tractor.devx.debug import BoxedMaybeException
|
||||||
|
|
||||||
|
|
||||||
|
def find_masked_excs(
|
||||||
|
maybe_masker: BaseException,
|
||||||
|
unmask_from: set[BaseException],
|
||||||
|
) -> BaseException|None:
|
||||||
|
''''
|
||||||
|
Deliver any `maybe_masker.__context__` provided
|
||||||
|
it a declared masking exc-type entry in `unmask_from`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if (
|
||||||
|
type(maybe_masker) in unmask_from
|
||||||
|
and
|
||||||
|
(exc_ctx := maybe_masker.__context__)
|
||||||
|
|
||||||
|
# TODO? what about any cases where
|
||||||
|
# they could be the same type but not same instance?
|
||||||
|
# |_i.e. a cancel masking a cancel ??
|
||||||
|
# or (
|
||||||
|
# exc_ctx is not maybe_masker
|
||||||
|
# )
|
||||||
|
):
|
||||||
|
return exc_ctx
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
# XXX, relevant discussion @ `trio`-core,
|
||||||
|
# https://github.com/python-trio/trio/issues/455
|
||||||
|
#
|
||||||
|
@acm
|
||||||
|
async def maybe_raise_from_masking_exc(
|
||||||
|
tn: trio.Nursery|None = None,
|
||||||
|
unmask_from: (
|
||||||
|
BaseException|
|
||||||
|
tuple[BaseException]
|
||||||
|
) = (trio.Cancelled,),
|
||||||
|
|
||||||
|
raise_unmasked: bool = True,
|
||||||
|
extra_note: str = (
|
||||||
|
'This can occurr when,\n'
|
||||||
|
' - a `trio.Nursery` scope embeds a `finally:`-block '
|
||||||
|
'which executes a checkpoint!'
|
||||||
|
#
|
||||||
|
# ^TODO? other cases?
|
||||||
|
),
|
||||||
|
|
||||||
|
always_warn_on: tuple[BaseException] = (
|
||||||
|
trio.Cancelled,
|
||||||
|
),
|
||||||
|
# ^XXX, special case(s) where we warn-log bc likely
|
||||||
|
# there will be no operational diff since the exc
|
||||||
|
# is always expected to be consumed.
|
||||||
|
) -> BoxedMaybeException:
|
||||||
|
'''
|
||||||
|
Maybe un-mask and re-raise exception(s) suppressed by a known
|
||||||
|
error-used-as-signal type (cough namely `trio.Cancelled`).
|
||||||
|
|
||||||
|
Though this unmasker targets cancelleds, it can be used more
|
||||||
|
generally to capture and unwrap masked excs detected as
|
||||||
|
`.__context__` values which were suppressed by any error type
|
||||||
|
passed in `unmask_from`.
|
||||||
|
|
||||||
|
-------------
|
||||||
|
STILL-TODO ??
|
||||||
|
-------------
|
||||||
|
-[ ] support for egs which have multiple masked entries in
|
||||||
|
`maybe_eg.exceptions`, in which case we should unmask the
|
||||||
|
individual sub-excs but maintain the eg-parent's form right?
|
||||||
|
|
||||||
|
'''
|
||||||
|
from tractor.devx.debug import (
|
||||||
|
BoxedMaybeException,
|
||||||
|
pause,
|
||||||
|
)
|
||||||
|
boxed_maybe_exc = BoxedMaybeException(
|
||||||
|
raise_on_exit=raise_unmasked,
|
||||||
|
)
|
||||||
|
matching: list[BaseException]|None = None
|
||||||
|
maybe_eg: ExceptionGroup|None
|
||||||
|
|
||||||
|
if tn:
|
||||||
|
try: # handle egs
|
||||||
|
yield boxed_maybe_exc
|
||||||
|
return
|
||||||
|
except* unmask_from as _maybe_eg:
|
||||||
|
maybe_eg = _maybe_eg
|
||||||
|
matches: ExceptionGroup
|
||||||
|
matches, _ = maybe_eg.split(
|
||||||
|
unmask_from
|
||||||
|
)
|
||||||
|
if not matches:
|
||||||
|
raise
|
||||||
|
|
||||||
|
matching: list[BaseException] = matches.exceptions
|
||||||
|
else:
|
||||||
|
try: # handle non-egs
|
||||||
|
yield boxed_maybe_exc
|
||||||
|
return
|
||||||
|
except unmask_from as _maybe_exc:
|
||||||
|
maybe_exc = _maybe_exc
|
||||||
|
matching: list[BaseException] = [
|
||||||
|
maybe_exc
|
||||||
|
]
|
||||||
|
|
||||||
|
# XXX, only unmask-ed for debuggin!
|
||||||
|
# TODO, remove eventually..
|
||||||
|
except BaseException as _berr:
|
||||||
|
berr = _berr
|
||||||
|
await pause(shield=True)
|
||||||
|
raise berr
|
||||||
|
|
||||||
|
if matching is None:
|
||||||
|
raise
|
||||||
|
|
||||||
|
masked: list[tuple[BaseException, BaseException]] = []
|
||||||
|
for exc_match in matching:
|
||||||
|
|
||||||
|
if exc_ctx := find_masked_excs(
|
||||||
|
maybe_masker=exc_match,
|
||||||
|
unmask_from={unmask_from},
|
||||||
|
):
|
||||||
|
masked.append((exc_ctx, exc_match))
|
||||||
|
boxed_maybe_exc.value = exc_match
|
||||||
|
note: str = (
|
||||||
|
f'\n'
|
||||||
|
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
|
||||||
|
)
|
||||||
|
if extra_note:
|
||||||
|
note += (
|
||||||
|
f'\n'
|
||||||
|
f'{extra_note}\n'
|
||||||
|
)
|
||||||
|
exc_ctx.add_note(note)
|
||||||
|
|
||||||
|
if type(exc_match) in always_warn_on:
|
||||||
|
log.warning(note)
|
||||||
|
|
||||||
|
# await tractor.pause(shield=True)
|
||||||
|
if raise_unmasked:
|
||||||
|
|
||||||
|
if len(masked) < 2:
|
||||||
|
raise exc_ctx from exc_match
|
||||||
|
else:
|
||||||
|
# ?TODO, see above but, possibly unmasking sub-exc
|
||||||
|
# entries if there are > 1
|
||||||
|
await pause(shield=True)
|
||||||
|
else:
|
||||||
|
raise
|
Loading…
Reference in New Issue