Compare commits
	
		
			10 Commits 
		
	
	
		
			69bba30557
			...
			23240c31e3
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						23240c31e3 | |
| 
							
							
								 | 
						6a82bab627 | |
| 
							
							
								 | 
						b485297411 | |
| 
							
							
								 | 
						dd23ef1d95 | |
| 
							
							
								 | 
						2ec3ff46cd | |
| 
							
							
								 | 
						967d0e4836 | |
| 
							
							
								 | 
						5ccb36af57 | |
| 
							
							
								 | 
						28f8546ac5 | |
| 
							
							
								 | 
						0ff0971aca | |
| 
							
							
								 | 
						dc1091016b | 
| 
						 | 
				
			
			@ -284,20 +284,32 @@ async def test_cancel_infinite_streamer(start_method):
 | 
			
		|||
    ],
 | 
			
		||||
)
 | 
			
		||||
@tractor_test
 | 
			
		||||
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		||||
    """Verify a subset of failed subactors causes all others in
 | 
			
		||||
async def test_some_cancels_all(
 | 
			
		||||
    num_actors_and_errs: tuple,
 | 
			
		||||
    start_method: str,
 | 
			
		||||
    loglevel: str,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Verify a subset of failed subactors causes all others in
 | 
			
		||||
    the nursery to be cancelled just like the strategy in trio.
 | 
			
		||||
 | 
			
		||||
    This is the first and only supervisory strategy at the moment.
 | 
			
		||||
    """
 | 
			
		||||
    num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    (
 | 
			
		||||
        num_actors,
 | 
			
		||||
        first_err,
 | 
			
		||||
        err_type,
 | 
			
		||||
        ria_func,
 | 
			
		||||
        da_func,
 | 
			
		||||
    ) = num_actors_and_errs
 | 
			
		||||
    try:
 | 
			
		||||
        async with tractor.open_nursery() as n:
 | 
			
		||||
        async with tractor.open_nursery() as an:
 | 
			
		||||
 | 
			
		||||
            # spawn the same number of deamon actors which should be cancelled
 | 
			
		||||
            dactor_portals = []
 | 
			
		||||
            for i in range(num_actors):
 | 
			
		||||
                dactor_portals.append(await n.start_actor(
 | 
			
		||||
                dactor_portals.append(await an.start_actor(
 | 
			
		||||
                    f'deamon_{i}',
 | 
			
		||||
                    enable_modules=[__name__],
 | 
			
		||||
                ))
 | 
			
		||||
| 
						 | 
				
			
			@ -307,7 +319,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		|||
            for i in range(num_actors):
 | 
			
		||||
                # start actor(s) that will fail immediately
 | 
			
		||||
                riactor_portals.append(
 | 
			
		||||
                    await n.run_in_actor(
 | 
			
		||||
                    await an.run_in_actor(
 | 
			
		||||
                        func,
 | 
			
		||||
                        name=f'actor_{i}',
 | 
			
		||||
                        **kwargs
 | 
			
		||||
| 
						 | 
				
			
			@ -337,7 +349,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		|||
 | 
			
		||||
        # should error here with a ``RemoteActorError`` or ``MultiError``
 | 
			
		||||
 | 
			
		||||
    except first_err as err:
 | 
			
		||||
    except first_err as _err:
 | 
			
		||||
        err = _err
 | 
			
		||||
        if isinstance(err, BaseExceptionGroup):
 | 
			
		||||
            assert len(err.exceptions) == num_actors
 | 
			
		||||
            for exc in err.exceptions:
 | 
			
		||||
| 
						 | 
				
			
			@ -348,8 +361,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		|||
        elif isinstance(err, tractor.RemoteActorError):
 | 
			
		||||
            assert err.boxed_type == err_type
 | 
			
		||||
 | 
			
		||||
        assert n.cancelled is True
 | 
			
		||||
        assert not n._children
 | 
			
		||||
        assert an.cancelled is True
 | 
			
		||||
        assert not an._children
 | 
			
		||||
    else:
 | 
			
		||||
        pytest.fail("Should have gotten a remote assertion error?")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -559,8 +572,10 @@ def test_cancel_while_childs_child_in_sync_sleep(
 | 
			
		|||
 | 
			
		||||
    async def main():
 | 
			
		||||
        with trio.fail_after(2):
 | 
			
		||||
            async with tractor.open_nursery() as tn:
 | 
			
		||||
                await tn.run_in_actor(
 | 
			
		||||
            async with (
 | 
			
		||||
                tractor.open_nursery() as an
 | 
			
		||||
            ):
 | 
			
		||||
                await an.run_in_actor(
 | 
			
		||||
                    spawn,
 | 
			
		||||
                    name='spawn',
 | 
			
		||||
                )
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
 | 
			
		|||
        await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
    async def _trio_main():
 | 
			
		||||
        # with trio.fail_after(2):
 | 
			
		||||
        with trio.fail_after(999):
 | 
			
		||||
        with trio.fail_after(2 if not debug_mode else 999):
 | 
			
		||||
            first: str
 | 
			
		||||
            chan: to_asyncio.LinkedTaskChannel
 | 
			
		||||
            aio_ev = asyncio.Event()
 | 
			
		||||
| 
						 | 
				
			
			@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
 | 
			
		|||
                        ):
 | 
			
		||||
                            aio_ev.set()
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(
 | 
			
		||||
        expected_exception=ExceptionGroup,
 | 
			
		||||
    ) as excinfo:
 | 
			
		||||
        tractor.to_asyncio.run_as_asyncio_guest(
 | 
			
		||||
            trio_main=_trio_main,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    eg = excinfo.value
 | 
			
		||||
    rte_eg, rest_eg = eg.split(RuntimeError)
 | 
			
		||||
 | 
			
		||||
    # ensure the trio-task's error bubbled despite the aio-side
 | 
			
		||||
    # having (maybe) errored first.
 | 
			
		||||
    if aio_err_trigger in (
 | 
			
		||||
        'after_trio_task_starts',
 | 
			
		||||
        'after_start_point',
 | 
			
		||||
    ):
 | 
			
		||||
        assert len(errs := rest_eg.exceptions) == 1
 | 
			
		||||
        typerr = errs[0]
 | 
			
		||||
        assert (
 | 
			
		||||
            type(typerr) is TypeError
 | 
			
		||||
            and
 | 
			
		||||
            'trio-side' in typerr.args
 | 
			
		||||
        )
 | 
			
		||||
        patt: str = 'trio-side'
 | 
			
		||||
        expect_exc = TypeError
 | 
			
		||||
 | 
			
		||||
    # when aio errors BEFORE (last) trio task is scheduled, we should
 | 
			
		||||
    # never see anythinb but the aio-side.
 | 
			
		||||
    else:
 | 
			
		||||
        assert len(rtes := rte_eg.exceptions) == 1
 | 
			
		||||
        assert 'asyncio-side' in rtes[0].args[0]
 | 
			
		||||
        patt: str = 'asyncio-side'
 | 
			
		||||
        expect_exc = RuntimeError
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(expect_exc) as excinfo:
 | 
			
		||||
        tractor.to_asyncio.run_as_asyncio_guest(
 | 
			
		||||
            trio_main=_trio_main,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    caught_exc = excinfo.value
 | 
			
		||||
    assert patt in caught_exc.args
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1011,7 +1011,6 @@ class Context:
 | 
			
		|||
                else:
 | 
			
		||||
                    log.cancel(
 | 
			
		||||
                        f'Timed out on cancel request of remote task?\n'
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'{reminfo}'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1492,6 +1491,12 @@ class Context:
 | 
			
		|||
                ):
 | 
			
		||||
                    status = 'peer-cancelled'
 | 
			
		||||
 | 
			
		||||
            case (
 | 
			
		||||
                Unresolved,
 | 
			
		||||
                trio.Cancelled(),  # any error-type
 | 
			
		||||
            ) if self.canceller:
 | 
			
		||||
                status = 'actor-cancelled'
 | 
			
		||||
 | 
			
		||||
            # (remote) error condition
 | 
			
		||||
            case (
 | 
			
		||||
                Unresolved,
 | 
			
		||||
| 
						 | 
				
			
			@ -2273,7 +2278,7 @@ async def open_context_from_portal(
 | 
			
		|||
                logmeth = log.exception
 | 
			
		||||
 | 
			
		||||
        logmeth(
 | 
			
		||||
            f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()}\n'
 | 
			
		||||
            f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if debug_mode():
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -661,7 +661,7 @@ async def _invoke(
 | 
			
		|||
            tn: Nursery
 | 
			
		||||
            rpc_ctx_cs: CancelScope
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                collapse_eg(hide_tb=False),
 | 
			
		||||
                trio.open_nursery() as tn,
 | 
			
		||||
                msgops.maybe_limit_plds(
 | 
			
		||||
                    ctx=ctx,
 | 
			
		||||
| 
						 | 
				
			
			@ -854,24 +854,44 @@ async def _invoke(
 | 
			
		|||
                f'after having {ctx.repr_state!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            if merr:
 | 
			
		||||
 | 
			
		||||
                logmeth: Callable = log.error
 | 
			
		||||
                if isinstance(merr, ContextCancelled):
 | 
			
		||||
                    logmeth: Callable = log.runtime
 | 
			
		||||
                if (
 | 
			
		||||
                    # ctxc: by `Context.cancel()`
 | 
			
		||||
                    isinstance(merr, ContextCancelled)
 | 
			
		||||
 | 
			
		||||
                if not isinstance(merr, RemoteActorError):
 | 
			
		||||
                    tb_str: str = ''.join(traceback.format_exception(merr))
 | 
			
		||||
                    # out-of-layer cancellation, one of:
 | 
			
		||||
                    # - actorc: by `Portal.cancel_actor()`
 | 
			
		||||
                    # - OSc: by SIGINT or `Process.signal()`
 | 
			
		||||
                    or (
 | 
			
		||||
                        isinstance(merr, trio.Cancelled)
 | 
			
		||||
                        and
 | 
			
		||||
                        ctx.canceller
 | 
			
		||||
                    )
 | 
			
		||||
                ):
 | 
			
		||||
                    logmeth: Callable = log.cancel
 | 
			
		||||
                    descr_str += (
 | 
			
		||||
                        f' with {merr!r}\n'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                elif (
 | 
			
		||||
                    not isinstance(merr, RemoteActorError)
 | 
			
		||||
                ):
 | 
			
		||||
                    tb_str: str = ''.join(
 | 
			
		||||
                        traceback.format_exception(merr)
 | 
			
		||||
                    )
 | 
			
		||||
                    descr_str += (
 | 
			
		||||
                        f'\n{merr!r}\n'  # needed?
 | 
			
		||||
                        f'{tb_str}\n'
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'scope_error:\n'
 | 
			
		||||
                        f'{scope_err!r}\n'
 | 
			
		||||
                    )
 | 
			
		||||
                else:
 | 
			
		||||
                    descr_str += f'\n{merr!r}\n'
 | 
			
		||||
                    descr_str += (
 | 
			
		||||
                        f'{merr!r}\n'
 | 
			
		||||
                    )
 | 
			
		||||
            else:
 | 
			
		||||
                descr_str += f'\nwith final result {ctx.outcome!r}\n'
 | 
			
		||||
                descr_str += (
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'with final result {ctx.outcome!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            logmeth(
 | 
			
		||||
                f'{message}\n'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -236,10 +236,6 @@ async def hard_kill(
 | 
			
		|||
    # whilst also hacking on it XD
 | 
			
		||||
    # terminate_after: int = 99999,
 | 
			
		||||
 | 
			
		||||
    # NOTE: for mucking with `.pause()`-ing inside the runtime
 | 
			
		||||
    # whilst also hacking on it XD
 | 
			
		||||
    # terminate_after: int = 99999,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    Un-gracefully terminate an OS level `trio.Process` after timeout.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -237,9 +237,9 @@ def enable_stack_on_sig(
 | 
			
		|||
    try:
 | 
			
		||||
        import stackscope
 | 
			
		||||
    except ImportError:
 | 
			
		||||
        log.error(
 | 
			
		||||
            '`stackscope` not installed for use in debug mode!\n'
 | 
			
		||||
            '`Ignoring {enable_stack_on_sig!r} call!\n'
 | 
			
		||||
        log.warning(
 | 
			
		||||
            'The `stackscope` lib is not installed!\n'
 | 
			
		||||
            '`Ignoring enable_stack_on_sig() call!\n'
 | 
			
		||||
        )
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -250,7 +250,7 @@ async def _maybe_enter_pm(
 | 
			
		|||
    *,
 | 
			
		||||
    tb: TracebackType|None = None,
 | 
			
		||||
    api_frame: FrameType|None = None,
 | 
			
		||||
    hide_tb: bool = False,
 | 
			
		||||
    hide_tb: bool = True,
 | 
			
		||||
 | 
			
		||||
    # only enter debugger REPL when returns `True`
 | 
			
		||||
    debug_filter: Callable[
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -58,6 +58,7 @@ from tractor._context import Context
 | 
			
		|||
from tractor import _state
 | 
			
		||||
from tractor._exceptions import (
 | 
			
		||||
    NoRuntime,
 | 
			
		||||
    InternalError,
 | 
			
		||||
)
 | 
			
		||||
from tractor._state import (
 | 
			
		||||
    current_actor,
 | 
			
		||||
| 
						 | 
				
			
			@ -79,6 +80,9 @@ from ._sigint import (
 | 
			
		|||
    sigint_shield as sigint_shield,
 | 
			
		||||
    _ctlc_ignore_header as _ctlc_ignore_header
 | 
			
		||||
)
 | 
			
		||||
from ..pformat import (
 | 
			
		||||
    ppfmt,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from trio.lowlevel import Task
 | 
			
		||||
| 
						 | 
				
			
			@ -1153,9 +1157,10 @@ def pause_from_sync(
 | 
			
		|||
                        'use_greenback',
 | 
			
		||||
                        False,
 | 
			
		||||
                ):
 | 
			
		||||
                    raise RuntimeError(
 | 
			
		||||
                        '`greenback` was never initialized in this actor!?\n\n'
 | 
			
		||||
                        f'{_state._runtime_vars}\n'
 | 
			
		||||
                    raise InternalError(
 | 
			
		||||
                        f'`greenback` was never initialized in this actor?\n'
 | 
			
		||||
                        f'\n'
 | 
			
		||||
                        f'{ppfmt(_state._runtime_vars)}\n'
 | 
			
		||||
                    ) from rte
 | 
			
		||||
 | 
			
		||||
                raise
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -430,20 +430,24 @@ class MsgpackTransport(MsgTransport):
 | 
			
		|||
                return await self.stream.send_all(size + bytes_data)
 | 
			
		||||
            except (
 | 
			
		||||
                trio.BrokenResourceError,
 | 
			
		||||
            ) as bre:
 | 
			
		||||
                trans_err = bre
 | 
			
		||||
            ) as _re:
 | 
			
		||||
                trans_err = _re
 | 
			
		||||
                tpt_name: str = f'{type(self).__name__!r}'
 | 
			
		||||
 | 
			
		||||
                match trans_err:
 | 
			
		||||
 | 
			
		||||
                    # XXX, specifc to UDS transport and its,
 | 
			
		||||
                    # well, "speediness".. XD
 | 
			
		||||
                    # |_ likely todo with races related to how fast
 | 
			
		||||
                    #    the socket is setup/torn-down on linux
 | 
			
		||||
                    #    as it pertains to rando pings from the
 | 
			
		||||
                    #    `.discovery` subsys and protos.
 | 
			
		||||
                    case trio.BrokenResourceError() if (
 | 
			
		||||
                        '[Errno 32] Broken pipe' in trans_err.args[0]
 | 
			
		||||
                        # ^XXX, specifc to UDS transport and its,
 | 
			
		||||
                        # well, "speediness".. XD
 | 
			
		||||
                        # |_ likely todo with races related to how fast
 | 
			
		||||
                        #    the socket is setup/torn-down on linux
 | 
			
		||||
                        #    as it pertains to rando pings from the
 | 
			
		||||
                        #    `.discovery` subsys and protos.
 | 
			
		||||
                        '[Errno 32] Broken pipe'
 | 
			
		||||
                        in
 | 
			
		||||
                        trans_err.args[0]
 | 
			
		||||
                    ):
 | 
			
		||||
                        raise TransportClosed.from_src_exc(
 | 
			
		||||
                        tpt_closed = TransportClosed.from_src_exc(
 | 
			
		||||
                            message=(
 | 
			
		||||
                                f'{tpt_name} already closed by peer\n'
 | 
			
		||||
                            ),
 | 
			
		||||
| 
						 | 
				
			
			@ -451,14 +455,15 @@ class MsgpackTransport(MsgTransport):
 | 
			
		|||
                            src_exc=trans_err,
 | 
			
		||||
                            raise_on_report=True,
 | 
			
		||||
                            loglevel='transport',
 | 
			
		||||
                        ) from bre
 | 
			
		||||
                        )
 | 
			
		||||
                        raise tpt_closed from trans_err
 | 
			
		||||
 | 
			
		||||
                    # unless the disconnect condition falls under "a
 | 
			
		||||
                    # normal operation breakage" we usualy console warn
 | 
			
		||||
                    # about it.
 | 
			
		||||
                    case _:
 | 
			
		||||
                        log.exception(
 | 
			
		||||
                            '{tpt_name} layer failed pre-send ??\n'
 | 
			
		||||
                            f'{tpt_name} layer failed pre-send ??\n'
 | 
			
		||||
                        )
 | 
			
		||||
                        raise trans_err
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -503,7 +508,7 @@ class MsgpackTransport(MsgTransport):
 | 
			
		|||
    def pformat(self) -> str:
 | 
			
		||||
        return (
 | 
			
		||||
            f'<{type(self).__name__}(\n'
 | 
			
		||||
            f' |_peers: 2\n'
 | 
			
		||||
            f' |_peers: 1\n'
 | 
			
		||||
            f'   laddr: {self._laddr}\n'
 | 
			
		||||
            f'   raddr: {self._raddr}\n'
 | 
			
		||||
            # f'\n'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -223,16 +223,18 @@ class _Cache:
 | 
			
		|||
        task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        async with mng as value:
 | 
			
		||||
            _, no_more_users = cls.resources[ctx_key]
 | 
			
		||||
            cls.values[ctx_key] = value
 | 
			
		||||
            task_status.started(value)
 | 
			
		||||
            try:
 | 
			
		||||
                await no_more_users.wait()
 | 
			
		||||
            finally:
 | 
			
		||||
                # discard nursery ref so it won't be re-used (an error)?
 | 
			
		||||
                value = cls.values.pop(ctx_key)
 | 
			
		||||
                cls.resources.pop(ctx_key)
 | 
			
		||||
        try:
 | 
			
		||||
            async with mng as value:
 | 
			
		||||
                _, no_more_users = cls.resources[ctx_key]
 | 
			
		||||
                try:
 | 
			
		||||
                    cls.values[ctx_key] = value
 | 
			
		||||
                    task_status.started(value)
 | 
			
		||||
                    await no_more_users.wait()
 | 
			
		||||
                finally:
 | 
			
		||||
                    value = cls.values.pop(ctx_key)
 | 
			
		||||
        finally:
 | 
			
		||||
            # discard nursery ref so it won't be re-used (an error)?
 | 
			
		||||
            cls.resources.pop(ctx_key)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue