Compare commits
	
		
			10 Commits 
		
	
	
		
			6bf571a124
			...
			cd16748598
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						cd16748598 | |
| 
							
							
								 | 
						1af35f8170 | |
| 
							
							
								 | 
						4569d11052 | |
| 
							
							
								 | 
						6ba76ab700 | |
| 
							
							
								 | 
						734dda35e9 | |
| 
							
							
								 | 
						b7e04525cc | |
| 
							
							
								 | 
						35977dcebb | |
| 
							
							
								 | 
						e1f26f9611 | |
| 
							
							
								 | 
						63c5b7696a | |
| 
							
							
								 | 
						5f94f52226 | 
| 
						 | 
				
			
			@ -252,7 +252,7 @@ def test_simple_context(
 | 
			
		|||
            pass
 | 
			
		||||
        except BaseExceptionGroup as beg:
 | 
			
		||||
            # XXX: on windows it seems we may have to expect the group error
 | 
			
		||||
            from tractor._exceptions import is_multi_cancelled
 | 
			
		||||
            from tractor.trionics import is_multi_cancelled
 | 
			
		||||
            assert is_multi_cancelled(beg)
 | 
			
		||||
    else:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,237 @@
 | 
			
		|||
'''
 | 
			
		||||
Special case testing for issues not (dis)covered in the primary
 | 
			
		||||
`Context` related functional/scenario suites.
 | 
			
		||||
 | 
			
		||||
**NOTE: this mod is a WIP** space for handling
 | 
			
		||||
odd/rare/undiscovered/not-yet-revealed faults which either
 | 
			
		||||
loudly (ideal case) breakl our supervision protocol
 | 
			
		||||
or (worst case) result in distributed sys hangs.
 | 
			
		||||
 | 
			
		||||
Suites here further try to clarify (if [partially] ill-defined) and
 | 
			
		||||
verify our edge case semantics for inter-actor-relayed-exceptions
 | 
			
		||||
including,
 | 
			
		||||
 | 
			
		||||
- lowlevel: what remote obj-data is interchanged for IPC and what is
 | 
			
		||||
  native-obj form is expected from unpacking in the the new
 | 
			
		||||
  mem-domain.
 | 
			
		||||
 | 
			
		||||
- which kinds of `RemoteActorError` (and its derivs) are expected by which
 | 
			
		||||
  (types of) peers (parent, child, sibling, etc) with what
 | 
			
		||||
  particular meta-data set such as,
 | 
			
		||||
 | 
			
		||||
  - `.src_uid`: the original (maybe) peer who raised.
 | 
			
		||||
  - `.relay_uid`: the next-hop-peer who sent it.
 | 
			
		||||
  - `.relay_path`: the sequence of peer actor hops.
 | 
			
		||||
  - `.is_inception`: a predicate that denotes multi-hop remote errors.
 | 
			
		||||
 | 
			
		||||
- when should `ExceptionGroup`s be relayed from a particular
 | 
			
		||||
  remote endpoint, they should never be caused by implicit `._rpc`
 | 
			
		||||
  nursery machinery!
 | 
			
		||||
 | 
			
		||||
- various special `trio` edge cases around its cancellation semantics
 | 
			
		||||
  and how we (currently) leverage `trio.Cancelled` as a signal for
 | 
			
		||||
  whether a `Context` task should raise `ContextCancelled` (ctx).
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
import pytest
 | 
			
		||||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor import (  # typing
 | 
			
		||||
    ActorNursery,
 | 
			
		||||
    Portal,
 | 
			
		||||
    Context,
 | 
			
		||||
    ContextCancelled,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def sleep_n_chkpt_in_finally(
 | 
			
		||||
    ctx: Context,
 | 
			
		||||
    sleep_n_raise: bool,
 | 
			
		||||
 | 
			
		||||
    chld_raise_delay: float,
 | 
			
		||||
    chld_finally_delay: float,
 | 
			
		||||
 | 
			
		||||
    rent_cancels: bool,
 | 
			
		||||
    rent_ctxc_delay: float,
 | 
			
		||||
 | 
			
		||||
    expect_exc: str|None = None,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Sync, open a tn, then wait for cancel, run a chkpt inside
 | 
			
		||||
    the user's `finally:` teardown.
 | 
			
		||||
 | 
			
		||||
    This covers a footgun case that `trio` core doesn't seem to care about
 | 
			
		||||
    wherein an exc can be masked by a `trio.Cancelled` raised inside a tn emedded
 | 
			
		||||
    `finally:`.
 | 
			
		||||
 | 
			
		||||
    Also see `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
 | 
			
		||||
    for the down and gritty details.
 | 
			
		||||
 | 
			
		||||
    Since a `@context` endpoint fn can also contain code like this,
 | 
			
		||||
    **and** bc we currently have no easy way other then
 | 
			
		||||
    `trio.Cancelled` to signal cancellation on each side of an IPC `Context`,
 | 
			
		||||
    the footgun issue can compound itself as demonstrated in this suite..
 | 
			
		||||
 | 
			
		||||
    Here are some edge cases codified with our WIP "sclang" syntax
 | 
			
		||||
    (note the parent(rent)/child(chld) naming here is just
 | 
			
		||||
    pragmatism, generally these most of these cases can occurr
 | 
			
		||||
    regardless of the distributed-task's supervision hiearchy),
 | 
			
		||||
 | 
			
		||||
    - rent c)=> chld.raises-then-taskc-in-finally
 | 
			
		||||
     |_ chld's body raises an `exc: BaseException`.
 | 
			
		||||
      _ in its `finally:` block it runs a chkpoint
 | 
			
		||||
        which raises a taskc (`trio.Cancelled`) which
 | 
			
		||||
        masks `exc` instead raising taskc up to the first tn.
 | 
			
		||||
      _ the embedded/chld tn captures the masking taskc and then
 | 
			
		||||
        raises it up to the ._rpc-ep-tn instead of `exc`.
 | 
			
		||||
      _ the rent thinks the child ctxc-ed instead of errored..
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
 | 
			
		||||
    if expect_exc:
 | 
			
		||||
        expect_exc: BaseException = tractor._exceptions.get_err_type(
 | 
			
		||||
            type_name=expect_exc,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    berr: BaseException|None = None
 | 
			
		||||
    try:
 | 
			
		||||
        if not sleep_n_raise:
 | 
			
		||||
            await trio.sleep_forever()
 | 
			
		||||
        elif sleep_n_raise:
 | 
			
		||||
 | 
			
		||||
            # XXX this sleep is less then the sleep the parent
 | 
			
		||||
            # does before calling `ctx.cancel()`
 | 
			
		||||
            await trio.sleep(chld_raise_delay)
 | 
			
		||||
 | 
			
		||||
            # XXX this will be masked by a taskc raised in
 | 
			
		||||
            # the `finally:` if this fn doesn't terminate
 | 
			
		||||
            # before any ctxc-req arrives AND a checkpoint is hit
 | 
			
		||||
            # in that `finally:`.
 | 
			
		||||
            raise RuntimeError('my app krurshed..')
 | 
			
		||||
 | 
			
		||||
    except BaseException as _berr:
 | 
			
		||||
        berr = _berr
 | 
			
		||||
 | 
			
		||||
        # TODO: it'd sure be nice to be able to inject our own
 | 
			
		||||
        # `ContextCancelled` here instead of of `trio.Cancelled`
 | 
			
		||||
        # so that our runtime can expect it and this "user code"
 | 
			
		||||
        # would be able to tell the diff between a generic trio
 | 
			
		||||
        # cancel and a tractor runtime-IPC cancel.
 | 
			
		||||
        if expect_exc:
 | 
			
		||||
            if not isinstance(
 | 
			
		||||
                berr,
 | 
			
		||||
                expect_exc,
 | 
			
		||||
            ):
 | 
			
		||||
                raise ValueError(
 | 
			
		||||
                    f'Unexpected exc type ??\n'
 | 
			
		||||
                    f'{berr!r}\n'
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'Expected a {expect_exc!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        raise berr
 | 
			
		||||
 | 
			
		||||
    # simulate what user code might try even though
 | 
			
		||||
    # it's a known boo-boo..
 | 
			
		||||
    finally:
 | 
			
		||||
        # maybe wait for rent ctxc to arrive
 | 
			
		||||
        with trio.CancelScope(shield=True):
 | 
			
		||||
            await trio.sleep(chld_finally_delay)
 | 
			
		||||
 | 
			
		||||
        # !!XXX this will raise `trio.Cancelled` which
 | 
			
		||||
        # will mask the RTE from above!!!
 | 
			
		||||
        #
 | 
			
		||||
        # YES, it's the same case as our extant
 | 
			
		||||
        # `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
 | 
			
		||||
        try:
 | 
			
		||||
            await trio.lowlevel.checkpoint()
 | 
			
		||||
        except trio.Cancelled as taskc:
 | 
			
		||||
            if (scope_err := taskc.__context__):
 | 
			
		||||
                print(
 | 
			
		||||
                    f'XXX MASKED REMOTE ERROR XXX\n'
 | 
			
		||||
                    f'ENDPOINT exception -> {scope_err!r}\n'
 | 
			
		||||
                    f'will be masked by -> {taskc!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
                # await tractor.pause(shield=True)
 | 
			
		||||
 | 
			
		||||
            raise taskc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'chld_callspec',
 | 
			
		||||
    [
 | 
			
		||||
        dict(
 | 
			
		||||
            sleep_n_raise=None,
 | 
			
		||||
            chld_raise_delay=0.1,
 | 
			
		||||
            chld_finally_delay=0.1,
 | 
			
		||||
            expect_exc='Cancelled',
 | 
			
		||||
            rent_cancels=True,
 | 
			
		||||
            rent_ctxc_delay=0.1,
 | 
			
		||||
        ),
 | 
			
		||||
        dict(
 | 
			
		||||
            sleep_n_raise='RuntimeError',
 | 
			
		||||
            chld_raise_delay=0.1,
 | 
			
		||||
            chld_finally_delay=1,
 | 
			
		||||
            expect_exc='RuntimeError',
 | 
			
		||||
            rent_cancels=False,
 | 
			
		||||
            rent_ctxc_delay=0.1,
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
    ids=lambda item: f'chld_callspec={item!r}'
 | 
			
		||||
)
 | 
			
		||||
def test_unmasked_remote_exc(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    chld_callspec: dict,
 | 
			
		||||
    tpt_proto: str,
 | 
			
		||||
):
 | 
			
		||||
    expect_exc_str: str|None = chld_callspec['sleep_n_raise']
 | 
			
		||||
    rent_ctxc_delay: float|None = chld_callspec['rent_ctxc_delay']
 | 
			
		||||
    async def main():
 | 
			
		||||
        an: ActorNursery
 | 
			
		||||
        async with tractor.open_nursery(
 | 
			
		||||
            debug_mode=debug_mode,
 | 
			
		||||
            enable_transports=[tpt_proto],
 | 
			
		||||
        ) as an:
 | 
			
		||||
            ptl: Portal = await an.start_actor(
 | 
			
		||||
                'cancellee',
 | 
			
		||||
                enable_modules=[__name__],
 | 
			
		||||
            )
 | 
			
		||||
            ctx: Context
 | 
			
		||||
            async with (
 | 
			
		||||
                ptl.open_context(
 | 
			
		||||
                    sleep_n_chkpt_in_finally,
 | 
			
		||||
                    **chld_callspec,
 | 
			
		||||
                ) as (ctx, sent),
 | 
			
		||||
            ):
 | 
			
		||||
                assert not sent
 | 
			
		||||
                await trio.sleep(rent_ctxc_delay)
 | 
			
		||||
                await ctx.cancel()
 | 
			
		||||
 | 
			
		||||
                # recv error or result from chld
 | 
			
		||||
                ctxc: ContextCancelled = await ctx.wait_for_result()
 | 
			
		||||
                assert (
 | 
			
		||||
                    ctxc is ctx.outcome
 | 
			
		||||
                    and
 | 
			
		||||
                    isinstance(ctxc, ContextCancelled)
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # always graceful terminate the sub in non-error cases
 | 
			
		||||
            await an.cancel()
 | 
			
		||||
 | 
			
		||||
    if expect_exc_str:
 | 
			
		||||
        expect_exc: BaseException = tractor._exceptions.get_err_type(
 | 
			
		||||
            type_name=expect_exc_str,
 | 
			
		||||
        )
 | 
			
		||||
        with pytest.raises(
 | 
			
		||||
            expected_exception=tractor.RemoteActorError,
 | 
			
		||||
        ) as excinfo:
 | 
			
		||||
            trio.run(main)
 | 
			
		||||
 | 
			
		||||
        rae = excinfo.value
 | 
			
		||||
        assert expect_exc == rae.boxed_type
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			@ -112,55 +112,11 @@ def test_acm_embedded_nursery_propagates_enter_err(
 | 
			
		|||
    '''
 | 
			
		||||
    import tractor
 | 
			
		||||
 | 
			
		||||
    @acm
 | 
			
		||||
    async def maybe_raise_from_masking_exc(
 | 
			
		||||
        tn: trio.Nursery,
 | 
			
		||||
        unmask_from: BaseException|None = trio.Cancelled
 | 
			
		||||
 | 
			
		||||
        # TODO, maybe offer a collection?
 | 
			
		||||
        # unmask_from: set[BaseException] = {
 | 
			
		||||
        #     trio.Cancelled,
 | 
			
		||||
        # },
 | 
			
		||||
    ):
 | 
			
		||||
        if not unmask_from:
 | 
			
		||||
            yield
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            yield
 | 
			
		||||
        except* unmask_from as be_eg:
 | 
			
		||||
 | 
			
		||||
            # TODO, if we offer `unmask_from: set`
 | 
			
		||||
            # for masker_exc_type in unmask_from:
 | 
			
		||||
 | 
			
		||||
            matches, rest = be_eg.split(unmask_from)
 | 
			
		||||
            if not matches:
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
            for exc_match in be_eg.exceptions:
 | 
			
		||||
                if (
 | 
			
		||||
                    (exc_ctx := exc_match.__context__)
 | 
			
		||||
                    and
 | 
			
		||||
                    type(exc_ctx) not in {
 | 
			
		||||
                        # trio.Cancelled,  # always by default?
 | 
			
		||||
                        unmask_from,
 | 
			
		||||
                    }
 | 
			
		||||
                ):
 | 
			
		||||
                    exc_ctx.add_note(
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'WARNING: the above error was masked by a {unmask_from!r} !?!\n'
 | 
			
		||||
                        f'Are you always cancelling? Say from a `finally:` ?\n\n'
 | 
			
		||||
 | 
			
		||||
                        f'{tn!r}'
 | 
			
		||||
                    )
 | 
			
		||||
                    raise exc_ctx from exc_match
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    @acm
 | 
			
		||||
    async def wraps_tn_that_always_cancels():
 | 
			
		||||
        async with (
 | 
			
		||||
            trio.open_nursery() as tn,
 | 
			
		||||
            maybe_raise_from_masking_exc(
 | 
			
		||||
            tractor.trionics.maybe_raise_from_masking_exc(
 | 
			
		||||
                tn=tn,
 | 
			
		||||
                unmask_from=(
 | 
			
		||||
                    trio.Cancelled
 | 
			
		||||
| 
						 | 
				
			
			@ -202,3 +158,60 @@ def test_acm_embedded_nursery_propagates_enter_err(
 | 
			
		|||
    assert_eg, rest_eg = eg.split(AssertionError)
 | 
			
		||||
 | 
			
		||||
    assert len(assert_eg.exceptions) == 1
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_gatherctxs_with_memchan_breaks_multicancelled(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
 | 
			
		||||
    will break a strict-eg-tn's multi-cancelled absorption..
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    from tractor import (
 | 
			
		||||
        trionics,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    @acm
 | 
			
		||||
    async def open_memchan() -> trio.abc.ReceiveChannel:
 | 
			
		||||
 | 
			
		||||
        task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
        print(
 | 
			
		||||
            f'Opening {task!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # 1 to force eager sending
 | 
			
		||||
        send, recv = trio.open_memory_channel(16)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            async with send:
 | 
			
		||||
                yield recv
 | 
			
		||||
        finally:
 | 
			
		||||
            print(
 | 
			
		||||
                f'Closed {task!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with (
 | 
			
		||||
            # XXX should ensure ONLY the KBI
 | 
			
		||||
            # is relayed upward
 | 
			
		||||
            trionics.collapse_eg(),
 | 
			
		||||
            trio.open_nursery(
 | 
			
		||||
                # strict_exception_groups=False,
 | 
			
		||||
            ), # as tn,
 | 
			
		||||
 | 
			
		||||
            trionics.gather_contexts([
 | 
			
		||||
                open_memchan(),
 | 
			
		||||
                open_memchan(),
 | 
			
		||||
            ]) as recv_chans,
 | 
			
		||||
        ):
 | 
			
		||||
            assert len(recv_chans) == 2
 | 
			
		||||
 | 
			
		||||
            await trio.sleep(1)
 | 
			
		||||
            raise KeyboardInterrupt
 | 
			
		||||
            # tn.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(KeyboardInterrupt):
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1246,55 +1246,6 @@ def unpack_error(
 | 
			
		|||
    return exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_multi_cancelled(
 | 
			
		||||
    exc: BaseException|BaseExceptionGroup,
 | 
			
		||||
 | 
			
		||||
    ignore_nested: set[BaseException] = set(),
 | 
			
		||||
 | 
			
		||||
) -> bool|BaseExceptionGroup:
 | 
			
		||||
    '''
 | 
			
		||||
    Predicate to determine if an `BaseExceptionGroup` only contains
 | 
			
		||||
    some (maybe nested) set of sub-grouped exceptions (like only
 | 
			
		||||
    `trio.Cancelled`s which get swallowed silently by default) and is
 | 
			
		||||
    thus the result of "gracefully cancelling" a collection of
 | 
			
		||||
    sub-tasks (or other conc primitives) and receiving a "cancelled
 | 
			
		||||
    ACK" from each after termination.
 | 
			
		||||
 | 
			
		||||
    Docs:
 | 
			
		||||
    ----
 | 
			
		||||
    - https://docs.python.org/3/library/exceptions.html#exception-groups
 | 
			
		||||
    - https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
 | 
			
		||||
    if (
 | 
			
		||||
        not ignore_nested
 | 
			
		||||
        or
 | 
			
		||||
        trio.Cancelled in ignore_nested
 | 
			
		||||
        # XXX always count-in `trio`'s native signal
 | 
			
		||||
    ):
 | 
			
		||||
        ignore_nested.update({trio.Cancelled})
 | 
			
		||||
 | 
			
		||||
    if isinstance(exc, BaseExceptionGroup):
 | 
			
		||||
        matched_exc: BaseExceptionGroup|None = exc.subgroup(
 | 
			
		||||
            tuple(ignore_nested),
 | 
			
		||||
 | 
			
		||||
            # TODO, complain about why not allowed XD
 | 
			
		||||
            # condition=tuple(ignore_nested),
 | 
			
		||||
        )
 | 
			
		||||
        if matched_exc is not None:
 | 
			
		||||
            return matched_exc
 | 
			
		||||
 | 
			
		||||
    # NOTE, IFF no excs types match (throughout the error-tree)
 | 
			
		||||
    # -> return `False`, OW return the matched sub-eg.
 | 
			
		||||
    #
 | 
			
		||||
    # IOW, for the inverse of ^ for the purpose of
 | 
			
		||||
    # maybe-enter-REPL--logic: "only debug when the err-tree contains
 | 
			
		||||
    # at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
 | 
			
		||||
    # we fallthrough and return `False` here.
 | 
			
		||||
    return False
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _raise_from_unexpected_msg(
 | 
			
		||||
    ctx: Context,
 | 
			
		||||
    msg: MsgType,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -61,9 +61,11 @@ from ._addr import (
 | 
			
		|||
    mk_uuid,
 | 
			
		||||
    wrap_address,
 | 
			
		||||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
)
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
    RuntimeFailure,
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,6 +37,7 @@ import warnings
 | 
			
		|||
 | 
			
		||||
import trio
 | 
			
		||||
from trio import (
 | 
			
		||||
    Cancelled,
 | 
			
		||||
    CancelScope,
 | 
			
		||||
    Nursery,
 | 
			
		||||
    TaskStatus,
 | 
			
		||||
| 
						 | 
				
			
			@ -52,10 +53,14 @@ from ._exceptions import (
 | 
			
		|||
    ModuleNotExposed,
 | 
			
		||||
    MsgTypeError,
 | 
			
		||||
    TransportClosed,
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
    pack_error,
 | 
			
		||||
    unpack_error,
 | 
			
		||||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
    maybe_raise_from_masking_exc,
 | 
			
		||||
)
 | 
			
		||||
from .devx import (
 | 
			
		||||
    debug,
 | 
			
		||||
    add_div,
 | 
			
		||||
| 
						 | 
				
			
			@ -250,7 +255,7 @@ async def _errors_relayed_via_ipc(
 | 
			
		|||
    ctx: Context,
 | 
			
		||||
    is_rpc: bool,
 | 
			
		||||
 | 
			
		||||
    hide_tb: bool = False,
 | 
			
		||||
    hide_tb: bool = True,
 | 
			
		||||
    debug_kbis: bool = False,
 | 
			
		||||
    task_status: TaskStatus[
 | 
			
		||||
        Context | BaseException
 | 
			
		||||
| 
						 | 
				
			
			@ -375,9 +380,9 @@ async def _errors_relayed_via_ipc(
 | 
			
		|||
    # they can be individually ccancelled.
 | 
			
		||||
    finally:
 | 
			
		||||
 | 
			
		||||
        # if the error is not from user code and instead a failure
 | 
			
		||||
        # of a runtime RPC or transport failure we do prolly want to
 | 
			
		||||
        # show this frame
 | 
			
		||||
        # if the error is not from user code and instead a failure of
 | 
			
		||||
        # an internal-runtime-RPC or IPC-connection, we do (prolly) want
 | 
			
		||||
        # to show this frame!
 | 
			
		||||
        if (
 | 
			
		||||
            rpc_err
 | 
			
		||||
            and (
 | 
			
		||||
| 
						 | 
				
			
			@ -616,32 +621,40 @@ async def _invoke(
 | 
			
		|||
        #  -> the below scope is never exposed to the
 | 
			
		||||
        #     `@context` marked RPC function.
 | 
			
		||||
        # - `._portal` is never set.
 | 
			
		||||
        scope_err: BaseException|None = None
 | 
			
		||||
        try:
 | 
			
		||||
            tn: trio.Nursery
 | 
			
		||||
            # TODO: better `trionics` primitive/tooling usage here!
 | 
			
		||||
            # -[ ] should would be nice to have our `TaskMngr`
 | 
			
		||||
            #   nursery here!
 | 
			
		||||
            # -[ ] payload value checking like we do with
 | 
			
		||||
            #   `.started()` such that the debbuger can engage
 | 
			
		||||
            #   here in the child task instead of waiting for the
 | 
			
		||||
            #   parent to crash with it's own MTE..
 | 
			
		||||
            #
 | 
			
		||||
            tn: Nursery
 | 
			
		||||
            rpc_ctx_cs: CancelScope
 | 
			
		||||
            async with (
 | 
			
		||||
                trio.open_nursery(
 | 
			
		||||
                    strict_exception_groups=False,
 | 
			
		||||
                    # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
 | 
			
		||||
 | 
			
		||||
                ) as tn,
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as tn,
 | 
			
		||||
                msgops.maybe_limit_plds(
 | 
			
		||||
                    ctx=ctx,
 | 
			
		||||
                    spec=ctx_meta.get('pld_spec'),
 | 
			
		||||
                    dec_hook=ctx_meta.get('dec_hook'),
 | 
			
		||||
                ),
 | 
			
		||||
 | 
			
		||||
                # XXX NOTE, this being the "most embedded"
 | 
			
		||||
                # scope ensures unasking of the `await coro` below
 | 
			
		||||
                # *should* never be interfered with!!
 | 
			
		||||
                maybe_raise_from_masking_exc(
 | 
			
		||||
                    tn=tn,
 | 
			
		||||
                    unmask_from=Cancelled,
 | 
			
		||||
                ) as _mbme,  # maybe boxed masked exc
 | 
			
		||||
            ):
 | 
			
		||||
                ctx._scope_nursery = tn
 | 
			
		||||
                rpc_ctx_cs = ctx._scope = tn.cancel_scope
 | 
			
		||||
                task_status.started(ctx)
 | 
			
		||||
 | 
			
		||||
                # TODO: better `trionics` tooling:
 | 
			
		||||
                # -[ ] should would be nice to have our `TaskMngr`
 | 
			
		||||
                #   nursery here!
 | 
			
		||||
                # -[ ] payload value checking like we do with
 | 
			
		||||
                #   `.started()` such that the debbuger can engage
 | 
			
		||||
                #   here in the child task instead of waiting for the
 | 
			
		||||
                #   parent to crash with it's own MTE..
 | 
			
		||||
                # invoke user endpoint fn.
 | 
			
		||||
                res: Any|PayloadT = await coro
 | 
			
		||||
                return_msg: Return|CancelAck = return_msg_type(
 | 
			
		||||
                    cid=cid,
 | 
			
		||||
| 
						 | 
				
			
			@ -744,38 +757,48 @@ async def _invoke(
 | 
			
		|||
            BaseException,
 | 
			
		||||
            trio.Cancelled,
 | 
			
		||||
 | 
			
		||||
        ) as scope_error:
 | 
			
		||||
        ) as _scope_err:
 | 
			
		||||
            scope_err = _scope_err
 | 
			
		||||
            if (
 | 
			
		||||
                isinstance(scope_error, RuntimeError)
 | 
			
		||||
                and scope_error.args
 | 
			
		||||
                and 'Cancel scope stack corrupted' in scope_error.args[0]
 | 
			
		||||
                isinstance(scope_err, RuntimeError)
 | 
			
		||||
                and
 | 
			
		||||
                scope_err.args
 | 
			
		||||
                and
 | 
			
		||||
                'Cancel scope stack corrupted' in scope_err.args[0]
 | 
			
		||||
            ):
 | 
			
		||||
                log.exception('Cancel scope stack corrupted!?\n')
 | 
			
		||||
                # debug.mk_pdb().set_trace()
 | 
			
		||||
 | 
			
		||||
            # always set this (child) side's exception as the
 | 
			
		||||
            # local error on the context
 | 
			
		||||
            ctx._local_error: BaseException = scope_error
 | 
			
		||||
            ctx._local_error: BaseException = scope_err
 | 
			
		||||
            # ^-TODO-^ question,
 | 
			
		||||
            # does this matter other then for
 | 
			
		||||
            # consistentcy/testing?
 | 
			
		||||
            # |_ no user code should be in this scope at this point
 | 
			
		||||
            #    AND we already set this in the block below?
 | 
			
		||||
 | 
			
		||||
            # if a remote error was set then likely the
 | 
			
		||||
            # exception group was raised due to that, so
 | 
			
		||||
            # XXX if a remote error was set then likely the
 | 
			
		||||
            # exc group was raised due to that, so
 | 
			
		||||
            # and we instead raise that error immediately!
 | 
			
		||||
            ctx.maybe_raise()
 | 
			
		||||
            maybe_re: (
 | 
			
		||||
                ContextCancelled|RemoteActorError
 | 
			
		||||
            ) = ctx.maybe_raise()
 | 
			
		||||
            if maybe_re:
 | 
			
		||||
                log.cancel(
 | 
			
		||||
                    f'Suppressing remote-exc from peer,\n'
 | 
			
		||||
                    f'{maybe_re!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # maybe TODO: pack in come kinda
 | 
			
		||||
            # `trio.Cancelled.__traceback__` here so they can be
 | 
			
		||||
            # unwrapped and displayed on the caller side? no se..
 | 
			
		||||
            raise
 | 
			
		||||
            raise scope_err
 | 
			
		||||
 | 
			
		||||
        # `@context` entrypoint task bookeeping.
 | 
			
		||||
        # i.e. only pop the context tracking if used ;)
 | 
			
		||||
        finally:
 | 
			
		||||
            assert chan.uid
 | 
			
		||||
            assert chan.aid
 | 
			
		||||
 | 
			
		||||
            # don't pop the local context until we know the
 | 
			
		||||
            # associated child isn't in debug any more
 | 
			
		||||
| 
						 | 
				
			
			@ -802,6 +825,9 @@ async def _invoke(
 | 
			
		|||
                    descr_str += (
 | 
			
		||||
                        f'\n{merr!r}\n'  # needed?
 | 
			
		||||
                        f'{tb_str}\n'
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'scope_error:\n'
 | 
			
		||||
                        f'{scope_err!r}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                else:
 | 
			
		||||
                    descr_str += f'\n{merr!r}\n'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -40,8 +40,10 @@ from ._state import current_actor, is_main_process
 | 
			
		|||
from .log import get_logger, get_loglevel
 | 
			
		||||
from ._runtime import Actor
 | 
			
		||||
from ._portal import Portal
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
)
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
    ContextCancelled,
 | 
			
		||||
)
 | 
			
		||||
from ._root import (
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -59,7 +59,7 @@ from tractor._state import (
 | 
			
		|||
    debug_mode,
 | 
			
		||||
)
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
from tractor._exceptions import (
 | 
			
		||||
from tractor.trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
)
 | 
			
		||||
from ._trace import (
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -38,7 +38,6 @@ from typing import (
 | 
			
		|||
import tractor
 | 
			
		||||
from tractor._exceptions import (
 | 
			
		||||
    InternalError,
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
    TrioTaskExited,
 | 
			
		||||
    TrioCancelled,
 | 
			
		||||
    AsyncioTaskExited,
 | 
			
		||||
| 
						 | 
				
			
			@ -59,6 +58,9 @@ from tractor.log import (
 | 
			
		|||
# from tractor.msg import (
 | 
			
		||||
#     pretty_struct,
 | 
			
		||||
# )
 | 
			
		||||
from tractor.trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
)
 | 
			
		||||
from tractor.trionics._broadcast import (
 | 
			
		||||
    broadcast_receiver,
 | 
			
		||||
    BroadcastReceiver,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -32,4 +32,8 @@ from ._broadcast import (
 | 
			
		|||
from ._beg import (
 | 
			
		||||
    collapse_eg as collapse_eg,
 | 
			
		||||
    maybe_collapse_eg as maybe_collapse_eg,
 | 
			
		||||
    is_multi_cancelled as is_multi_cancelled,
 | 
			
		||||
)
 | 
			
		||||
from ._taskc import (
 | 
			
		||||
    maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,6 +22,11 @@ first-class-`trio` from a historical perspective B)
 | 
			
		|||
from contextlib import (
 | 
			
		||||
    asynccontextmanager as acm,
 | 
			
		||||
)
 | 
			
		||||
from typing import (
 | 
			
		||||
    Literal,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def maybe_collapse_eg(
 | 
			
		||||
| 
						 | 
				
			
			@ -56,3 +61,62 @@ async def collapse_eg():
 | 
			
		|||
            raise exc
 | 
			
		||||
 | 
			
		||||
        raise beg
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_multi_cancelled(
 | 
			
		||||
    beg: BaseException|BaseExceptionGroup,
 | 
			
		||||
 | 
			
		||||
    ignore_nested: set[BaseException] = set(),
 | 
			
		||||
 | 
			
		||||
) -> Literal[False]|BaseExceptionGroup:
 | 
			
		||||
    '''
 | 
			
		||||
    Predicate to determine if an `BaseExceptionGroup` only contains
 | 
			
		||||
    some (maybe nested) set of sub-grouped exceptions (like only
 | 
			
		||||
    `trio.Cancelled`s which get swallowed silently by default) and is
 | 
			
		||||
    thus the result of "gracefully cancelling" a collection of
 | 
			
		||||
    sub-tasks (or other conc primitives) and receiving a "cancelled
 | 
			
		||||
    ACK" from each after termination.
 | 
			
		||||
 | 
			
		||||
    Docs:
 | 
			
		||||
    ----
 | 
			
		||||
    - https://docs.python.org/3/library/exceptions.html#exception-groups
 | 
			
		||||
    - https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
 | 
			
		||||
    if (
 | 
			
		||||
        not ignore_nested
 | 
			
		||||
        or
 | 
			
		||||
        trio.Cancelled not in ignore_nested
 | 
			
		||||
        # XXX always count-in `trio`'s native signal
 | 
			
		||||
    ):
 | 
			
		||||
        ignore_nested.update({trio.Cancelled})
 | 
			
		||||
 | 
			
		||||
    if isinstance(beg, BaseExceptionGroup):
 | 
			
		||||
        # https://docs.python.org/3/library/exceptions.html#BaseExceptionGroup.subgroup
 | 
			
		||||
        # |_ "The condition can be an exception type or tuple of
 | 
			
		||||
        #   exception types, in which case each exception is checked
 | 
			
		||||
        #   for a match using the same check that is used in an
 | 
			
		||||
        #   except clause. The condition can also be a callable
 | 
			
		||||
        #   (other than a type object) that accepts an exception as
 | 
			
		||||
        #   its single argument and returns true for the exceptions
 | 
			
		||||
        #   that should be in the subgroup."
 | 
			
		||||
        matched_exc: BaseExceptionGroup|None = beg.subgroup(
 | 
			
		||||
            tuple(ignore_nested),
 | 
			
		||||
 | 
			
		||||
            # ??TODO, complain about why not allowed to use
 | 
			
		||||
            # named arg style calling???
 | 
			
		||||
            # XD .. wtf?
 | 
			
		||||
            # condition=tuple(ignore_nested),
 | 
			
		||||
        )
 | 
			
		||||
        if matched_exc is not None:
 | 
			
		||||
            return matched_exc
 | 
			
		||||
 | 
			
		||||
    # NOTE, IFF no excs types match (throughout the error-tree)
 | 
			
		||||
    # -> return `False`, OW return the matched sub-eg.
 | 
			
		||||
    #
 | 
			
		||||
    # IOW, for the inverse of ^ for the purpose of
 | 
			
		||||
    # maybe-enter-REPL--logic: "only debug when the err-tree contains
 | 
			
		||||
    # at least one exc-type NOT in `ignore_nested`" ; i.e. the case where
 | 
			
		||||
    # we fallthrough and return `False` here.
 | 
			
		||||
    return False
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -40,6 +40,8 @@ from typing import (
 | 
			
		|||
import trio
 | 
			
		||||
from tractor._state import current_actor
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
# from ._beg import collapse_eg
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from tractor import ActorNursery
 | 
			
		||||
| 
						 | 
				
			
			@ -112,17 +114,19 @@ async def gather_contexts(
 | 
			
		|||
    None,
 | 
			
		||||
]:
 | 
			
		||||
    '''
 | 
			
		||||
    Concurrently enter a sequence of async context managers (acms),
 | 
			
		||||
    each from a separate `trio` task and deliver the unwrapped
 | 
			
		||||
    `yield`-ed values in the same order once all managers have entered.
 | 
			
		||||
    Concurrently enter a sequence of async context managers (`acm`s),
 | 
			
		||||
    each scheduled in a separate `trio.Task` and deliver their
 | 
			
		||||
    unwrapped `yield`-ed values in the same order once all `@acm`s
 | 
			
		||||
    in every task have entered.
 | 
			
		||||
 | 
			
		||||
    On exit, all acms are subsequently and concurrently exited.
 | 
			
		||||
    On exit, all `acm`s are subsequently and concurrently exited with
 | 
			
		||||
    **no order guarantees**.
 | 
			
		||||
 | 
			
		||||
    This function is somewhat similar to a batch of non-blocking
 | 
			
		||||
    calls to `contextlib.AsyncExitStack.enter_async_context()`
 | 
			
		||||
    (inside a loop) *in combo with* a `asyncio.gather()` to get the
 | 
			
		||||
    `.__aenter__()`-ed values, except the managers are both
 | 
			
		||||
    concurrently entered and exited and *cancellation just works*(R).
 | 
			
		||||
    concurrently entered and exited and *cancellation-just-works™*.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    seed: int = id(mngrs)
 | 
			
		||||
| 
						 | 
				
			
			@ -142,16 +146,20 @@ async def gather_contexts(
 | 
			
		|||
    if not mngrs:
 | 
			
		||||
        raise ValueError(
 | 
			
		||||
            '`.trionics.gather_contexts()` input mngrs is empty?\n'
 | 
			
		||||
            '\n'
 | 
			
		||||
            'Did try to use inline generator syntax?\n'
 | 
			
		||||
            'Use a non-lazy iterator or sequence type intead!'
 | 
			
		||||
            'Use a non-lazy iterator or sequence-type intead!\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async with trio.open_nursery(
 | 
			
		||||
        strict_exception_groups=False,
 | 
			
		||||
        # ^XXX^ TODO? soo roll our own then ??
 | 
			
		||||
        # -> since we kinda want the "if only one `.exception` then
 | 
			
		||||
        # just raise that" interface?
 | 
			
		||||
    ) as tn:
 | 
			
		||||
    async with (
 | 
			
		||||
        # collapse_eg(),
 | 
			
		||||
        trio.open_nursery(
 | 
			
		||||
            strict_exception_groups=False,
 | 
			
		||||
            # ^XXX^ TODO? soo roll our own then ??
 | 
			
		||||
            # -> since we kinda want the "if only one `.exception` then
 | 
			
		||||
            # just raise that" interface?
 | 
			
		||||
        ) as tn,
 | 
			
		||||
    ):
 | 
			
		||||
        for mngr in mngrs:
 | 
			
		||||
            tn.start_soon(
 | 
			
		||||
                _enter_and_wait,
 | 
			
		||||
| 
						 | 
				
			
			@ -168,7 +176,7 @@ async def gather_contexts(
 | 
			
		|||
        try:
 | 
			
		||||
            yield tuple(unwrapped.values())
 | 
			
		||||
        finally:
 | 
			
		||||
            # NOTE: this is ABSOLUTELY REQUIRED to avoid
 | 
			
		||||
            # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
 | 
			
		||||
            # the following wacky bug:
 | 
			
		||||
            # <tractorbugurlhere>
 | 
			
		||||
            parent_exit.set()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,184 @@
 | 
			
		|||
# tractor: structured concurrent "actors".
 | 
			
		||||
# Copyright 2018-eternity Tyler Goodlet.
 | 
			
		||||
 | 
			
		||||
# This program is free software: you can redistribute it and/or modify
 | 
			
		||||
# it under the terms of the GNU Affero General Public License as published by
 | 
			
		||||
# the Free Software Foundation, either version 3 of the License, or
 | 
			
		||||
# (at your option) any later version.
 | 
			
		||||
 | 
			
		||||
# This program is distributed in the hope that it will be useful,
 | 
			
		||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 | 
			
		||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 | 
			
		||||
# GNU Affero General Public License for more details.
 | 
			
		||||
 | 
			
		||||
# You should have received a copy of the GNU Affero General Public License
 | 
			
		||||
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
`trio.Task` cancellation helpers, extensions and "holsters".
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
from contextlib import (
 | 
			
		||||
    asynccontextmanager as acm,
 | 
			
		||||
)
 | 
			
		||||
from typing import TYPE_CHECKING
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
 | 
			
		||||
log = get_logger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from tractor.devx.debug import BoxedMaybeException
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def find_masked_excs(
 | 
			
		||||
    maybe_masker: BaseException,
 | 
			
		||||
    unmask_from: set[BaseException],
 | 
			
		||||
) -> BaseException|None:
 | 
			
		||||
    ''''
 | 
			
		||||
    Deliver any `maybe_masker.__context__` provided
 | 
			
		||||
    it a declared masking exc-type entry in `unmask_from`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    if (
 | 
			
		||||
        type(maybe_masker) in unmask_from
 | 
			
		||||
        and
 | 
			
		||||
        (exc_ctx := maybe_masker.__context__)
 | 
			
		||||
 | 
			
		||||
        # TODO? what about any cases where
 | 
			
		||||
        # they could be the same type but not same instance?
 | 
			
		||||
        # |_i.e. a cancel masking a cancel ??
 | 
			
		||||
        # or (
 | 
			
		||||
        #     exc_ctx is not maybe_masker
 | 
			
		||||
        # )
 | 
			
		||||
    ):
 | 
			
		||||
        return exc_ctx
 | 
			
		||||
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# XXX, relevant discussion @ `trio`-core,
 | 
			
		||||
# https://github.com/python-trio/trio/issues/455
 | 
			
		||||
#
 | 
			
		||||
@acm
 | 
			
		||||
async def maybe_raise_from_masking_exc(
 | 
			
		||||
    tn: trio.Nursery|None = None,
 | 
			
		||||
    unmask_from: (
 | 
			
		||||
        BaseException|
 | 
			
		||||
        tuple[BaseException]
 | 
			
		||||
    ) = (trio.Cancelled,),
 | 
			
		||||
 | 
			
		||||
    raise_unmasked: bool = True,
 | 
			
		||||
    extra_note: str = (
 | 
			
		||||
        'This can occurr when,\n'
 | 
			
		||||
        ' - a `trio.Nursery` scope embeds a `finally:`-block '
 | 
			
		||||
        'which executes a checkpoint!'
 | 
			
		||||
        #
 | 
			
		||||
        # ^TODO? other cases?
 | 
			
		||||
    ),
 | 
			
		||||
 | 
			
		||||
    always_warn_on: tuple[BaseException] = (
 | 
			
		||||
        trio.Cancelled,
 | 
			
		||||
    ),
 | 
			
		||||
    # ^XXX, special case(s) where we warn-log bc likely
 | 
			
		||||
    # there will be no operational diff since the exc
 | 
			
		||||
    # is always expected to be consumed.
 | 
			
		||||
) -> BoxedMaybeException:
 | 
			
		||||
    '''
 | 
			
		||||
    Maybe un-mask and re-raise exception(s) suppressed by a known
 | 
			
		||||
    error-used-as-signal type (cough namely `trio.Cancelled`).
 | 
			
		||||
 | 
			
		||||
    Though this unmasker targets cancelleds, it can be used more
 | 
			
		||||
    generally to capture and unwrap masked excs detected as
 | 
			
		||||
    `.__context__` values which were suppressed by any error type
 | 
			
		||||
    passed in `unmask_from`.
 | 
			
		||||
 | 
			
		||||
    -------------
 | 
			
		||||
    STILL-TODO ??
 | 
			
		||||
    -------------
 | 
			
		||||
    -[ ] support for egs which have multiple masked entries in
 | 
			
		||||
        `maybe_eg.exceptions`, in which case we should unmask the
 | 
			
		||||
        individual sub-excs but maintain the eg-parent's form right?
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    from tractor.devx.debug import (
 | 
			
		||||
        BoxedMaybeException,
 | 
			
		||||
        pause,
 | 
			
		||||
    )
 | 
			
		||||
    boxed_maybe_exc = BoxedMaybeException(
 | 
			
		||||
        raise_on_exit=raise_unmasked,
 | 
			
		||||
    )
 | 
			
		||||
    matching: list[BaseException]|None = None
 | 
			
		||||
    maybe_eg: ExceptionGroup|None
 | 
			
		||||
 | 
			
		||||
    if tn:
 | 
			
		||||
        try:  # handle egs
 | 
			
		||||
            yield boxed_maybe_exc
 | 
			
		||||
            return
 | 
			
		||||
        except* unmask_from as _maybe_eg:
 | 
			
		||||
            maybe_eg = _maybe_eg
 | 
			
		||||
            matches: ExceptionGroup
 | 
			
		||||
            matches, _ = maybe_eg.split(
 | 
			
		||||
                unmask_from
 | 
			
		||||
            )
 | 
			
		||||
            if not matches:
 | 
			
		||||
                raise
 | 
			
		||||
 | 
			
		||||
            matching: list[BaseException] = matches.exceptions
 | 
			
		||||
    else:
 | 
			
		||||
        try:  # handle non-egs
 | 
			
		||||
            yield boxed_maybe_exc
 | 
			
		||||
            return
 | 
			
		||||
        except unmask_from as _maybe_exc:
 | 
			
		||||
            maybe_exc = _maybe_exc
 | 
			
		||||
            matching: list[BaseException] = [
 | 
			
		||||
                maybe_exc
 | 
			
		||||
            ]
 | 
			
		||||
 | 
			
		||||
        # XXX, only unmask-ed for debuggin!
 | 
			
		||||
        # TODO, remove eventually..
 | 
			
		||||
        except BaseException as _berr:
 | 
			
		||||
            berr = _berr
 | 
			
		||||
            await pause(shield=True)
 | 
			
		||||
            raise berr
 | 
			
		||||
 | 
			
		||||
    if matching is None:
 | 
			
		||||
        raise
 | 
			
		||||
 | 
			
		||||
    masked: list[tuple[BaseException, BaseException]] = []
 | 
			
		||||
    for exc_match in matching:
 | 
			
		||||
 | 
			
		||||
        if exc_ctx := find_masked_excs(
 | 
			
		||||
            maybe_masker=exc_match,
 | 
			
		||||
            unmask_from={unmask_from},
 | 
			
		||||
        ):
 | 
			
		||||
            masked.append((exc_ctx, exc_match))
 | 
			
		||||
            boxed_maybe_exc.value = exc_match
 | 
			
		||||
            note: str = (
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            if extra_note:
 | 
			
		||||
                note += (
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'{extra_note}\n'
 | 
			
		||||
                )
 | 
			
		||||
            exc_ctx.add_note(note)
 | 
			
		||||
 | 
			
		||||
            if type(exc_match) in always_warn_on:
 | 
			
		||||
                log.warning(note)
 | 
			
		||||
 | 
			
		||||
            # await tractor.pause(shield=True)
 | 
			
		||||
            if raise_unmasked:
 | 
			
		||||
 | 
			
		||||
                if len(masked) < 2:
 | 
			
		||||
                    raise exc_ctx from exc_match
 | 
			
		||||
                else:
 | 
			
		||||
                    # ?TODO, see above but, possibly unmasking sub-exc
 | 
			
		||||
                    # entries if there are > 1
 | 
			
		||||
                    await pause(shield=True)
 | 
			
		||||
    else:
 | 
			
		||||
        raise
 | 
			
		||||
		Loading…
	
		Reference in New Issue