Compare commits
	
		
			23 Commits 
		
	
	
		
			b32121f540
			...
			0cbf02bf2e
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						0cbf02bf2e | |
| 
							
							
								 | 
						4a0b78d447 | |
| 
							
							
								 | 
						718d0887c9 | |
| 
							
							
								 | 
						40a40f613a | |
| 
							
							
								 | 
						c692b30e7f | |
| 
							
							
								 | 
						c91bf6dcc2 | |
| 
							
							
								 | 
						5afa9407c9 | |
| 
							
							
								 | 
						9b1dd7b279 | |
| 
							
							
								 | 
						910257bb46 | |
| 
							
							
								 | 
						ca37d8ed91 | |
| 
							
							
								 | 
						072cdd0c66 | |
| 
							
							
								 | 
						30302b051d | |
| 
							
							
								 | 
						bb04e55d5f | |
| 
							
							
								 | 
						226d06dbfa | |
| 
							
							
								 | 
						1b502736d6 | |
| 
							
							
								 | 
						51992ef546 | |
| 
							
							
								 | 
						6da470918a | |
| 
							
							
								 | 
						93d97c3ff9 | |
| 
							
							
								 | 
						abaea4de8e | |
| 
							
							
								 | 
						4b3ba35cd6 | |
| 
							
							
								 | 
						70a508e9d7 | |
| 
							
							
								 | 
						59822ff093 | |
| 
							
							
								 | 
						ca427aec7e | 
| 
						 | 
				
			
			@ -0,0 +1,145 @@
 | 
			
		|||
from contextlib import (
 | 
			
		||||
    contextmanager as cm,
 | 
			
		||||
    # TODO, any diff in async case(s)??
 | 
			
		||||
    # asynccontextmanager as acm,
 | 
			
		||||
)
 | 
			
		||||
from functools import partial
 | 
			
		||||
 | 
			
		||||
import tractor
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
log = tractor.log.get_logger(__name__)
 | 
			
		||||
tractor.log.get_console_log('info')
 | 
			
		||||
 | 
			
		||||
@cm
 | 
			
		||||
def teardown_on_exc(
 | 
			
		||||
    raise_from_handler: bool = False,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    You could also have a teardown handler which catches any exc and
 | 
			
		||||
    does some required teardown. In this case the problem is
 | 
			
		||||
    compounded UNLESS you ensure the handler's scope is OUTSIDE the
 | 
			
		||||
    `ux.aclose()`.. that is in the caller's enclosing scope.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    try:
 | 
			
		||||
        yield
 | 
			
		||||
    except BaseException as _berr:
 | 
			
		||||
        berr = _berr
 | 
			
		||||
        log.exception(
 | 
			
		||||
            f'Handling termination teardown in child due to,\n'
 | 
			
		||||
            f'{berr!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
        if raise_from_handler:
 | 
			
		||||
            # XXX teardown ops XXX
 | 
			
		||||
            # on termination these steps say need to be run to
 | 
			
		||||
            # ensure wider system consistency (like the state of
 | 
			
		||||
            # remote connections/services).
 | 
			
		||||
            #
 | 
			
		||||
            # HOWEVER, any bug in this teardown code is also
 | 
			
		||||
            # masked by the `tx.aclose()`!
 | 
			
		||||
            # this is also true if `_tn.cancel_scope` is
 | 
			
		||||
            # `.cancel_called` by the parent in a graceful
 | 
			
		||||
            # request case..
 | 
			
		||||
 | 
			
		||||
            # simulate a bug in teardown handler.
 | 
			
		||||
            raise RuntimeError(
 | 
			
		||||
                'woopsie teardown bug!'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        raise  # no teardown bug.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def finite_stream_to_rent(
 | 
			
		||||
    tx: trio.abc.SendChannel,
 | 
			
		||||
    child_errors_mid_stream: bool,
 | 
			
		||||
 | 
			
		||||
    task_status: trio.TaskStatus[
 | 
			
		||||
        trio.CancelScope,
 | 
			
		||||
    ] = trio.TASK_STATUS_IGNORED,
 | 
			
		||||
):
 | 
			
		||||
    async with (
 | 
			
		||||
        # XXX without this unmasker the mid-streaming RTE is never
 | 
			
		||||
        # reported since it is masked by the `tx.aclose()`
 | 
			
		||||
        # call which in turn raises `Cancelled`!
 | 
			
		||||
        #
 | 
			
		||||
        # NOTE, this is WITHOUT doing any exception handling
 | 
			
		||||
        # inside the child  task!
 | 
			
		||||
        #
 | 
			
		||||
        # TODO, uncomment next LoC to see the supprsessed beg[RTE]!
 | 
			
		||||
        # tractor.trionics.maybe_raise_from_masking_exc(),
 | 
			
		||||
 | 
			
		||||
        tx as tx,  # .aclose() is the guilty masker chkpt!
 | 
			
		||||
        trio.open_nursery() as _tn,
 | 
			
		||||
    ):
 | 
			
		||||
        # pass our scope back to parent for supervision\
 | 
			
		||||
        # control.
 | 
			
		||||
        task_status.started(_tn.cancel_scope)
 | 
			
		||||
 | 
			
		||||
        with teardown_on_exc(
 | 
			
		||||
            raise_from_handler=not child_errors_mid_stream,
 | 
			
		||||
        ):
 | 
			
		||||
            for i in range(100):
 | 
			
		||||
                log.info(
 | 
			
		||||
                    f'Child tx {i!r}\n'
 | 
			
		||||
                )
 | 
			
		||||
                if (
 | 
			
		||||
                    child_errors_mid_stream
 | 
			
		||||
                    and
 | 
			
		||||
                    i == 66
 | 
			
		||||
                ):
 | 
			
		||||
                    # oh wait but WOOPS there's a bug
 | 
			
		||||
                    # in that teardown code!?
 | 
			
		||||
                    raise RuntimeError(
 | 
			
		||||
                        'woopsie, a mid-streaming bug!?'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                await tx.send(i)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def main(
 | 
			
		||||
    # TODO! toggle this for the 2 cases!
 | 
			
		||||
    # 1. child errors mid-stream while parent is also requesting
 | 
			
		||||
    #   (graceful) cancel of that child streamer.
 | 
			
		||||
    #
 | 
			
		||||
    # 2. child contains a teardown handler which contains a
 | 
			
		||||
    #   bug and raises.
 | 
			
		||||
    #
 | 
			
		||||
    child_errors_mid_stream: bool,
 | 
			
		||||
):
 | 
			
		||||
    tx, rx = trio.open_memory_channel(1)
 | 
			
		||||
 | 
			
		||||
    async with (
 | 
			
		||||
        trio.open_nursery() as tn,
 | 
			
		||||
        rx as rx,
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        _child_cs = await tn.start(
 | 
			
		||||
            partial(
 | 
			
		||||
                finite_stream_to_rent,
 | 
			
		||||
                child_errors_mid_stream=child_errors_mid_stream,
 | 
			
		||||
                tx=tx,
 | 
			
		||||
            )
 | 
			
		||||
        )
 | 
			
		||||
        async for msg in rx:
 | 
			
		||||
            log.info(
 | 
			
		||||
                f'Rent rx {msg!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # simulate some external cancellation
 | 
			
		||||
            # request **JUST BEFORE** the child errors.
 | 
			
		||||
            if msg == 65:
 | 
			
		||||
                log.cancel(
 | 
			
		||||
                    f'Cancelling parent on,\n'
 | 
			
		||||
                    f'msg={msg}\n'
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'Simulates OOB cancel request!\n'
 | 
			
		||||
                )
 | 
			
		||||
                tn.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if __name__ == '__main__':
 | 
			
		||||
 | 
			
		||||
    for case in [True, False]:
 | 
			
		||||
        trio.run(main, case)
 | 
			
		||||
| 
						 | 
				
			
			@ -317,7 +317,6 @@ def test_subactor_breakpoint(
 | 
			
		|||
 | 
			
		||||
    assert in_prompt_msg(
 | 
			
		||||
        child, [
 | 
			
		||||
        'MessagingError:',
 | 
			
		||||
        'RemoteActorError:',
 | 
			
		||||
         "('breakpoint_forever'",
 | 
			
		||||
         'bdb.BdbQuit',
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -284,20 +284,32 @@ async def test_cancel_infinite_streamer(start_method):
 | 
			
		|||
    ],
 | 
			
		||||
)
 | 
			
		||||
@tractor_test
 | 
			
		||||
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		||||
    """Verify a subset of failed subactors causes all others in
 | 
			
		||||
async def test_some_cancels_all(
 | 
			
		||||
    num_actors_and_errs: tuple,
 | 
			
		||||
    start_method: str,
 | 
			
		||||
    loglevel: str,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Verify a subset of failed subactors causes all others in
 | 
			
		||||
    the nursery to be cancelled just like the strategy in trio.
 | 
			
		||||
 | 
			
		||||
    This is the first and only supervisory strategy at the moment.
 | 
			
		||||
    """
 | 
			
		||||
    num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    (
 | 
			
		||||
        num_actors,
 | 
			
		||||
        first_err,
 | 
			
		||||
        err_type,
 | 
			
		||||
        ria_func,
 | 
			
		||||
        da_func,
 | 
			
		||||
    ) = num_actors_and_errs
 | 
			
		||||
    try:
 | 
			
		||||
        async with tractor.open_nursery() as n:
 | 
			
		||||
        async with tractor.open_nursery() as an:
 | 
			
		||||
 | 
			
		||||
            # spawn the same number of deamon actors which should be cancelled
 | 
			
		||||
            dactor_portals = []
 | 
			
		||||
            for i in range(num_actors):
 | 
			
		||||
                dactor_portals.append(await n.start_actor(
 | 
			
		||||
                dactor_portals.append(await an.start_actor(
 | 
			
		||||
                    f'deamon_{i}',
 | 
			
		||||
                    enable_modules=[__name__],
 | 
			
		||||
                ))
 | 
			
		||||
| 
						 | 
				
			
			@ -307,7 +319,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		|||
            for i in range(num_actors):
 | 
			
		||||
                # start actor(s) that will fail immediately
 | 
			
		||||
                riactor_portals.append(
 | 
			
		||||
                    await n.run_in_actor(
 | 
			
		||||
                    await an.run_in_actor(
 | 
			
		||||
                        func,
 | 
			
		||||
                        name=f'actor_{i}',
 | 
			
		||||
                        **kwargs
 | 
			
		||||
| 
						 | 
				
			
			@ -337,7 +349,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		|||
 | 
			
		||||
        # should error here with a ``RemoteActorError`` or ``MultiError``
 | 
			
		||||
 | 
			
		||||
    except first_err as err:
 | 
			
		||||
    except first_err as _err:
 | 
			
		||||
        err = _err
 | 
			
		||||
        if isinstance(err, BaseExceptionGroup):
 | 
			
		||||
            assert len(err.exceptions) == num_actors
 | 
			
		||||
            for exc in err.exceptions:
 | 
			
		||||
| 
						 | 
				
			
			@ -348,8 +361,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
 | 
			
		|||
        elif isinstance(err, tractor.RemoteActorError):
 | 
			
		||||
            assert err.boxed_type == err_type
 | 
			
		||||
 | 
			
		||||
        assert n.cancelled is True
 | 
			
		||||
        assert not n._children
 | 
			
		||||
        assert an.cancelled is True
 | 
			
		||||
        assert not an._children
 | 
			
		||||
    else:
 | 
			
		||||
        pytest.fail("Should have gotten a remote assertion error?")
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -533,38 +546,123 @@ def test_cancel_via_SIGINT_other_task(
 | 
			
		|||
 | 
			
		||||
async def spin_for(period=3):
 | 
			
		||||
    "Sync sleep."
 | 
			
		||||
    print(f'sync sleeping in sub-sub for {period}\n')
 | 
			
		||||
    time.sleep(period)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def spawn():
 | 
			
		||||
    async with tractor.open_nursery() as tn:
 | 
			
		||||
        await tn.run_in_actor(
 | 
			
		||||
async def spawn_sub_with_sync_blocking_task():
 | 
			
		||||
    async with tractor.open_nursery() as an:
 | 
			
		||||
        print('starting sync blocking subactor..\n')
 | 
			
		||||
        await an.run_in_actor(
 | 
			
		||||
            spin_for,
 | 
			
		||||
            name='sleeper',
 | 
			
		||||
        )
 | 
			
		||||
        print('exiting first subactor layer..\n')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.mark.parametrize(
 | 
			
		||||
    'man_cancel_outer',
 | 
			
		||||
    [
 | 
			
		||||
        False,  # passes if delay != 2
 | 
			
		||||
 | 
			
		||||
        # always causes an unexpected eg-w-embedded-assert-err?
 | 
			
		||||
        pytest.param(True,
 | 
			
		||||
             marks=pytest.mark.xfail(
 | 
			
		||||
                 reason=(
 | 
			
		||||
                    'always causes an unexpected eg-w-embedded-assert-err?'
 | 
			
		||||
                )
 | 
			
		||||
            ),
 | 
			
		||||
        ),
 | 
			
		||||
    ],
 | 
			
		||||
)
 | 
			
		||||
@no_windows
 | 
			
		||||
def test_cancel_while_childs_child_in_sync_sleep(
 | 
			
		||||
    loglevel,
 | 
			
		||||
    start_method,
 | 
			
		||||
    spawn_backend,
 | 
			
		||||
    loglevel: str,
 | 
			
		||||
    start_method: str,
 | 
			
		||||
    spawn_backend: str,
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    reg_addr: tuple,
 | 
			
		||||
    man_cancel_outer: bool,
 | 
			
		||||
):
 | 
			
		||||
    """Verify that a child cancelled while executing sync code is torn
 | 
			
		||||
    '''
 | 
			
		||||
    Verify that a child cancelled while executing sync code is torn
 | 
			
		||||
    down even when that cancellation is triggered by the parent
 | 
			
		||||
    2 nurseries "up".
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    Though the grandchild should stay blocking its actor runtime, its
 | 
			
		||||
    parent should issue a "zombie reaper" to hard kill it after
 | 
			
		||||
    sufficient timeout.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    if start_method == 'forkserver':
 | 
			
		||||
        pytest.skip("Forksever sux hard at resuming from sync sleep...")
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        with trio.fail_after(2):
 | 
			
		||||
            async with tractor.open_nursery() as tn:
 | 
			
		||||
                await tn.run_in_actor(
 | 
			
		||||
                    spawn,
 | 
			
		||||
                    name='spawn',
 | 
			
		||||
        #
 | 
			
		||||
        # XXX BIG TODO NOTE XXX
 | 
			
		||||
        #
 | 
			
		||||
        # it seems there's a strange race that can happen
 | 
			
		||||
        # where where the fail-after will trigger outer scope
 | 
			
		||||
        # .cancel() which then causes the inner scope to raise,
 | 
			
		||||
        #
 | 
			
		||||
        # BaseExceptionGroup('Exceptions from Trio nursery', [
 | 
			
		||||
        #   BaseExceptionGroup('Exceptions from Trio nursery',
 | 
			
		||||
        #   [
 | 
			
		||||
        #       Cancelled(),
 | 
			
		||||
        #       Cancelled(),
 | 
			
		||||
        #   ]
 | 
			
		||||
        #   ),
 | 
			
		||||
        #   AssertionError('assert 0')
 | 
			
		||||
        # ])
 | 
			
		||||
        #
 | 
			
		||||
        # WHY THIS DOESN'T MAKE SENSE:
 | 
			
		||||
        # ---------------------------
 | 
			
		||||
        # - it should raise too-slow-error when too slow..
 | 
			
		||||
        #  * verified that using simple-cs and manually cancelling
 | 
			
		||||
        #    you get same outcome -> indicates that the fail-after
 | 
			
		||||
        #    can have its TooSlowError overriden!
 | 
			
		||||
        #  |_ to check this it's easy, simplly decrease the timeout
 | 
			
		||||
        #     as per the var below.
 | 
			
		||||
        #
 | 
			
		||||
        # - when using the manual simple-cs the outcome is different
 | 
			
		||||
        #   DESPITE the `assert 0` which means regardless of the
 | 
			
		||||
        #   inner scope effectively failing in the same way, the
 | 
			
		||||
        #   bubbling up **is NOT the same**.
 | 
			
		||||
        #
 | 
			
		||||
        # delays trigger diff outcomes..
 | 
			
		||||
        # ---------------------------
 | 
			
		||||
        # as seen by uncommenting various lines below there is from
 | 
			
		||||
        # my POV an unexpected outcome due to the delay=2 case.
 | 
			
		||||
        #
 | 
			
		||||
        # delay = 1  # no AssertionError in eg, TooSlowError raised.
 | 
			
		||||
        # delay = 2  # is AssertionError in eg AND no TooSlowError !?
 | 
			
		||||
        delay = 4  # is AssertionError in eg AND no _cs cancellation.
 | 
			
		||||
 | 
			
		||||
        with trio.fail_after(delay) as _cs:
 | 
			
		||||
        # with trio.CancelScope() as cs:
 | 
			
		||||
        # ^XXX^ can be used instead to see same outcome.
 | 
			
		||||
 | 
			
		||||
            async with (
 | 
			
		||||
                # tractor.trionics.collapse_eg(),  # doesn't help
 | 
			
		||||
                tractor.open_nursery(
 | 
			
		||||
                    hide_tb=False,
 | 
			
		||||
                    debug_mode=debug_mode,
 | 
			
		||||
                    registry_addrs=[reg_addr],
 | 
			
		||||
                ) as an,
 | 
			
		||||
            ):
 | 
			
		||||
                await an.run_in_actor(
 | 
			
		||||
                    spawn_sub_with_sync_blocking_task,
 | 
			
		||||
                    name='sync_blocking_sub',
 | 
			
		||||
                )
 | 
			
		||||
                await trio.sleep(1)
 | 
			
		||||
 | 
			
		||||
                if man_cancel_outer:
 | 
			
		||||
                    print('Cancelling manually in root')
 | 
			
		||||
                    _cs.cancel()
 | 
			
		||||
 | 
			
		||||
                # trigger exc-srced taskc down
 | 
			
		||||
                # the actor tree.
 | 
			
		||||
                print('RAISING IN ROOT')
 | 
			
		||||
                assert 0
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(AssertionError):
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,26 +13,24 @@ MESSAGE = 'tractoring at full speed'
 | 
			
		|||
def test_empty_mngrs_input_raises() -> None:
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        with trio.fail_after(1):
 | 
			
		||||
        with trio.fail_after(3):
 | 
			
		||||
            async with (
 | 
			
		||||
                open_actor_cluster(
 | 
			
		||||
                    modules=[__name__],
 | 
			
		||||
 | 
			
		||||
                    # NOTE: ensure we can passthrough runtime opts
 | 
			
		||||
                    loglevel='info',
 | 
			
		||||
                    # debug_mode=True,
 | 
			
		||||
                    loglevel='cancel',
 | 
			
		||||
                    debug_mode=False,
 | 
			
		||||
 | 
			
		||||
                ) as portals,
 | 
			
		||||
 | 
			
		||||
                gather_contexts(
 | 
			
		||||
                    # NOTE: it's the use of inline-generator syntax
 | 
			
		||||
                    # here that causes the empty input.
 | 
			
		||||
                    mngrs=(
 | 
			
		||||
                        p.open_context(worker) for p in portals.values()
 | 
			
		||||
                    ),
 | 
			
		||||
                ),
 | 
			
		||||
                gather_contexts(mngrs=()),
 | 
			
		||||
            ):
 | 
			
		||||
                assert 0
 | 
			
		||||
                # should fail before this?
 | 
			
		||||
                assert portals
 | 
			
		||||
 | 
			
		||||
                # test should fail if we mk it here!
 | 
			
		||||
                assert 0, 'Should have raised val-err !?'
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(ValueError):
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture(scope='module')
 | 
			
		||||
def time_quad_ex(reg_addr, ci_env, spawn_backend):
 | 
			
		||||
def time_quad_ex(
 | 
			
		||||
    reg_addr: tuple,
 | 
			
		||||
    ci_env: bool,
 | 
			
		||||
    spawn_backend: str,
 | 
			
		||||
):
 | 
			
		||||
    if spawn_backend == 'mp':
 | 
			
		||||
        """no idea but the  mp *nix runs are flaking out here often...
 | 
			
		||||
        """
 | 
			
		||||
        '''
 | 
			
		||||
        no idea but the  mp *nix runs are flaking out here often...
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        pytest.skip("Test is too flaky on mp in CI")
 | 
			
		||||
 | 
			
		||||
    timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
 | 
			
		||||
| 
						 | 
				
			
			@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
 | 
			
		|||
    return results, diff
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
 | 
			
		||||
    """This also serves as a kind of "we'd like to be this fast test"."""
 | 
			
		||||
def test_a_quadruple_example(
 | 
			
		||||
    time_quad_ex: tuple,
 | 
			
		||||
    ci_env: bool,
 | 
			
		||||
    spawn_backend: str,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    This also serves as a kind of "we'd like to be this fast test".
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    results, diff = time_quad_ex
 | 
			
		||||
    assert results
 | 
			
		||||
    this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
 | 
			
		||||
    this_fast = (
 | 
			
		||||
        6 if platform.system() in (
 | 
			
		||||
            'Windows',
 | 
			
		||||
            'Darwin',
 | 
			
		||||
        )
 | 
			
		||||
        else 3
 | 
			
		||||
    )
 | 
			
		||||
    assert diff < this_fast
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
 | 
			
		|||
        await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
    async def _trio_main():
 | 
			
		||||
        # with trio.fail_after(2):
 | 
			
		||||
        with trio.fail_after(999):
 | 
			
		||||
        with trio.fail_after(2 if not debug_mode else 999):
 | 
			
		||||
            first: str
 | 
			
		||||
            chan: to_asyncio.LinkedTaskChannel
 | 
			
		||||
            aio_ev = asyncio.Event()
 | 
			
		||||
| 
						 | 
				
			
			@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
 | 
			
		|||
                        ):
 | 
			
		||||
                            aio_ev.set()
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(
 | 
			
		||||
        expected_exception=ExceptionGroup,
 | 
			
		||||
    ) as excinfo:
 | 
			
		||||
        tractor.to_asyncio.run_as_asyncio_guest(
 | 
			
		||||
            trio_main=_trio_main,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    eg = excinfo.value
 | 
			
		||||
    rte_eg, rest_eg = eg.split(RuntimeError)
 | 
			
		||||
 | 
			
		||||
    # ensure the trio-task's error bubbled despite the aio-side
 | 
			
		||||
    # having (maybe) errored first.
 | 
			
		||||
    if aio_err_trigger in (
 | 
			
		||||
        'after_trio_task_starts',
 | 
			
		||||
        'after_start_point',
 | 
			
		||||
    ):
 | 
			
		||||
        assert len(errs := rest_eg.exceptions) == 1
 | 
			
		||||
        typerr = errs[0]
 | 
			
		||||
        assert (
 | 
			
		||||
            type(typerr) is TypeError
 | 
			
		||||
            and
 | 
			
		||||
            'trio-side' in typerr.args
 | 
			
		||||
        )
 | 
			
		||||
        patt: str = 'trio-side'
 | 
			
		||||
        expect_exc = TypeError
 | 
			
		||||
 | 
			
		||||
    # when aio errors BEFORE (last) trio task is scheduled, we should
 | 
			
		||||
    # never see anythinb but the aio-side.
 | 
			
		||||
    else:
 | 
			
		||||
        assert len(rtes := rte_eg.exceptions) == 1
 | 
			
		||||
        assert 'asyncio-side' in rtes[0].args[0]
 | 
			
		||||
        patt: str = 'asyncio-side'
 | 
			
		||||
        expect_exc = RuntimeError
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(expect_exc) as excinfo:
 | 
			
		||||
        tractor.to_asyncio.run_as_asyncio_guest(
 | 
			
		||||
            trio_main=_trio_main,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    caught_exc = excinfo.value
 | 
			
		||||
    assert patt in caught_exc.args
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -55,10 +55,17 @@ async def open_actor_cluster(
 | 
			
		|||
        raise ValueError(
 | 
			
		||||
            'Number of names is {len(names)} but count it {count}')
 | 
			
		||||
 | 
			
		||||
    async with tractor.open_nursery(
 | 
			
		||||
        **runtime_kwargs,
 | 
			
		||||
    ) as an:
 | 
			
		||||
        async with trio.open_nursery() as n:
 | 
			
		||||
    async with (
 | 
			
		||||
        # tractor.trionics.collapse_eg(),
 | 
			
		||||
        tractor.open_nursery(
 | 
			
		||||
            **runtime_kwargs,
 | 
			
		||||
        ) as an
 | 
			
		||||
    ):
 | 
			
		||||
        async with (
 | 
			
		||||
            # tractor.trionics.collapse_eg(),
 | 
			
		||||
            trio.open_nursery() as tn,
 | 
			
		||||
            tractor.trionics.maybe_raise_from_masking_exc()
 | 
			
		||||
        ):
 | 
			
		||||
            uid = tractor.current_actor().uid
 | 
			
		||||
 | 
			
		||||
            async def _start(name: str) -> None:
 | 
			
		||||
| 
						 | 
				
			
			@ -69,9 +76,8 @@ async def open_actor_cluster(
 | 
			
		|||
                )
 | 
			
		||||
 | 
			
		||||
            for name in names:
 | 
			
		||||
                n.start_soon(_start, name)
 | 
			
		||||
                tn.start_soon(_start, name)
 | 
			
		||||
 | 
			
		||||
        assert len(portals) == count
 | 
			
		||||
        yield portals
 | 
			
		||||
 | 
			
		||||
        await an.cancel(hard_kill=hard_kill)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -101,6 +101,9 @@ from ._state import (
 | 
			
		|||
    debug_mode,
 | 
			
		||||
    _ctxvar_Context,
 | 
			
		||||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
# ------ - ------
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from ._portal import Portal
 | 
			
		||||
| 
						 | 
				
			
			@ -942,7 +945,7 @@ class Context:
 | 
			
		|||
        self.cancel_called = True
 | 
			
		||||
 | 
			
		||||
        header: str = (
 | 
			
		||||
            f'Cancelling ctx from {side.upper()}-side\n'
 | 
			
		||||
            f'Cancelling ctx from {side!r}-side\n'
 | 
			
		||||
        )
 | 
			
		||||
        reminfo: str = (
 | 
			
		||||
            # ' =>\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -950,7 +953,7 @@ class Context:
 | 
			
		|||
            f'\n'
 | 
			
		||||
            f'c)=> {self.chan.uid}\n'
 | 
			
		||||
            f'   |_[{self.dst_maddr}\n'
 | 
			
		||||
            f'     >>{self.repr_rpc}\n'
 | 
			
		||||
            f'     >> {self.repr_rpc}\n'
 | 
			
		||||
            # f'    >> {self._nsf}() -> {codec}[dict]:\n\n'
 | 
			
		||||
            # TODO: pull msg-type from spec re #320
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -2025,10 +2028,8 @@ async def open_context_from_portal(
 | 
			
		|||
    ctxc_from_callee: ContextCancelled|None = None
 | 
			
		||||
    try:
 | 
			
		||||
        async with (
 | 
			
		||||
            trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
            ) as tn,
 | 
			
		||||
 | 
			
		||||
            collapse_eg(),
 | 
			
		||||
            trio.open_nursery() as tn,
 | 
			
		||||
            msgops.maybe_limit_plds(
 | 
			
		||||
                ctx=ctx,
 | 
			
		||||
                spec=ctx_meta.get('pld_spec'),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,7 +28,10 @@ from typing import (
 | 
			
		|||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
from .trionics import gather_contexts
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    gather_contexts,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from .ipc import _connect_chan, Channel
 | 
			
		||||
from ._addr import (
 | 
			
		||||
    UnwrappedAddress,
 | 
			
		||||
| 
						 | 
				
			
			@ -87,7 +90,6 @@ async def get_registry(
 | 
			
		|||
            yield regstr_ptl
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def get_root(
 | 
			
		||||
    **kwargs,
 | 
			
		||||
| 
						 | 
				
			
			@ -253,9 +255,12 @@ async def find_actor(
 | 
			
		|||
        for addr in registry_addrs
 | 
			
		||||
    )
 | 
			
		||||
    portals: list[Portal]
 | 
			
		||||
    async with gather_contexts(
 | 
			
		||||
        mngrs=maybe_portals,
 | 
			
		||||
    ) as portals:
 | 
			
		||||
    async with (
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        gather_contexts(
 | 
			
		||||
            mngrs=maybe_portals,
 | 
			
		||||
        ) as portals,
 | 
			
		||||
    ):
 | 
			
		||||
        # log.runtime(
 | 
			
		||||
        #     'Gathered portals:\n'
 | 
			
		||||
        #     f'{portals}'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -39,7 +39,10 @@ import warnings
 | 
			
		|||
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
from .trionics import maybe_open_nursery
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    maybe_open_nursery,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from ._state import (
 | 
			
		||||
    current_actor,
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -583,14 +586,13 @@ async def open_portal(
 | 
			
		|||
    assert actor
 | 
			
		||||
    was_connected: bool = False
 | 
			
		||||
 | 
			
		||||
    async with maybe_open_nursery(
 | 
			
		||||
        tn,
 | 
			
		||||
        shield=shield,
 | 
			
		||||
        strict_exception_groups=False,
 | 
			
		||||
        # ^XXX^ TODO? soo roll our own then ??
 | 
			
		||||
        # -> since we kinda want the "if only one `.exception` then
 | 
			
		||||
        # just raise that" interface?
 | 
			
		||||
    ) as tn:
 | 
			
		||||
    async with (
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        maybe_open_nursery(
 | 
			
		||||
            tn,
 | 
			
		||||
            shield=shield,
 | 
			
		||||
        ) as tn,
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        if not channel.connected():
 | 
			
		||||
            await channel.connect()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,13 +37,7 @@ import warnings
 | 
			
		|||
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
from ._runtime import (
 | 
			
		||||
    Actor,
 | 
			
		||||
    Arbiter,
 | 
			
		||||
    # TODO: rename and make a non-actor subtype?
 | 
			
		||||
    # Arbiter as Registry,
 | 
			
		||||
    async_main,
 | 
			
		||||
)
 | 
			
		||||
from . import _runtime
 | 
			
		||||
from .devx import (
 | 
			
		||||
    debug,
 | 
			
		||||
    _frame_stack,
 | 
			
		||||
| 
						 | 
				
			
			@ -64,6 +58,7 @@ from ._addr import (
 | 
			
		|||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
    RuntimeFailure,
 | 
			
		||||
| 
						 | 
				
			
			@ -102,7 +97,7 @@ async def maybe_block_bp(
 | 
			
		|||
    ):
 | 
			
		||||
        logger.info(
 | 
			
		||||
            f'Found `greenback` installed @ {maybe_mod}\n'
 | 
			
		||||
            'Enabling `tractor.pause_from_sync()` support!\n'
 | 
			
		||||
            f'Enabling `tractor.pause_from_sync()` support!\n'
 | 
			
		||||
        )
 | 
			
		||||
        os.environ['PYTHONBREAKPOINT'] = (
 | 
			
		||||
            'tractor.devx.debug._sync_pause_from_builtin'
 | 
			
		||||
| 
						 | 
				
			
			@ -197,9 +192,13 @@ async def open_root_actor(
 | 
			
		|||
    # read-only state to sublayers?
 | 
			
		||||
    # extra_rt_vars: dict|None = None,
 | 
			
		||||
 | 
			
		||||
) -> Actor:
 | 
			
		||||
) -> _runtime.Actor:
 | 
			
		||||
    '''
 | 
			
		||||
    Runtime init entry point for ``tractor``.
 | 
			
		||||
    Initialize the `tractor` runtime by starting a "root actor" in
 | 
			
		||||
    a parent-most Python process.
 | 
			
		||||
 | 
			
		||||
    All (disjoint) actor-process-trees-as-programs are created via
 | 
			
		||||
    this entrypoint.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # XXX NEVER allow nested actor-trees!
 | 
			
		||||
| 
						 | 
				
			
			@ -397,7 +396,7 @@ async def open_root_actor(
 | 
			
		|||
                f'Registry(s) seem(s) to exist @ {ponged_addrs}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            actor = Actor(
 | 
			
		||||
            actor = _runtime.Actor(
 | 
			
		||||
                name=name or 'anonymous',
 | 
			
		||||
                uuid=mk_uuid(),
 | 
			
		||||
                registry_addrs=ponged_addrs,
 | 
			
		||||
| 
						 | 
				
			
			@ -436,7 +435,8 @@ async def open_root_actor(
 | 
			
		|||
            # https://github.com/goodboy/tractor/pull/348
 | 
			
		||||
            # https://github.com/goodboy/tractor/issues/296
 | 
			
		||||
 | 
			
		||||
            actor = Arbiter(
 | 
			
		||||
            # TODO: rename as `RootActor` or is that even necessary?
 | 
			
		||||
            actor = _runtime.Arbiter(
 | 
			
		||||
                name=name or 'registrar',
 | 
			
		||||
                uuid=mk_uuid(),
 | 
			
		||||
                registry_addrs=registry_addrs,
 | 
			
		||||
| 
						 | 
				
			
			@ -471,18 +471,21 @@ async def open_root_actor(
 | 
			
		|||
                    '-> Opening new registry @ '
 | 
			
		||||
                    +
 | 
			
		||||
                    '\n'.join(
 | 
			
		||||
                        f'@{addr}' for addr in reg_addrs
 | 
			
		||||
                        f'{addr}' for addr in reg_addrs
 | 
			
		||||
                    )
 | 
			
		||||
                )
 | 
			
		||||
            logger.info(f'{report}\n')
 | 
			
		||||
 | 
			
		||||
            # start the actor runtime in a new task
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
                # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
 | 
			
		||||
            ) as nursery:
 | 
			
		||||
            # start runtime in a bg sub-task, yield to caller.
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as root_tn,
 | 
			
		||||
 | 
			
		||||
                # ``_runtime.async_main()`` creates an internal nursery
 | 
			
		||||
                # XXX, finally-footgun below?
 | 
			
		||||
                # -> see note on why shielding.
 | 
			
		||||
                # maybe_raise_from_masking_exc(),
 | 
			
		||||
            ):
 | 
			
		||||
                # `_runtime.async_main()` creates an internal nursery
 | 
			
		||||
                # and blocks here until any underlying actor(-process)
 | 
			
		||||
                # tree has terminated thereby conducting so called
 | 
			
		||||
                # "end-to-end" structured concurrency throughout an
 | 
			
		||||
| 
						 | 
				
			
			@ -490,9 +493,9 @@ async def open_root_actor(
 | 
			
		|||
                # "actor runtime" primitives are SC-compat and thus all
 | 
			
		||||
                # transitively spawned actors/processes must be as
 | 
			
		||||
                # well.
 | 
			
		||||
                await nursery.start(
 | 
			
		||||
                await root_tn.start(
 | 
			
		||||
                    partial(
 | 
			
		||||
                        async_main,
 | 
			
		||||
                        _runtime.async_main,
 | 
			
		||||
                        actor,
 | 
			
		||||
                        accept_addrs=trans_bind_addrs,
 | 
			
		||||
                        parent_addr=None
 | 
			
		||||
| 
						 | 
				
			
			@ -540,7 +543,7 @@ async def open_root_actor(
 | 
			
		|||
                    raise
 | 
			
		||||
 | 
			
		||||
                finally:
 | 
			
		||||
                    # NOTE: not sure if we'll ever need this but it's
 | 
			
		||||
                    # NOTE/TODO?, not sure if we'll ever need this but it's
 | 
			
		||||
                    # possibly better for even more determinism?
 | 
			
		||||
                    # logger.cancel(
 | 
			
		||||
                    #     f'Waiting on {len(nurseries)} nurseries in root..')
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -765,7 +765,6 @@ async def _invoke(
 | 
			
		|||
            BaseExceptionGroup,
 | 
			
		||||
            BaseException,
 | 
			
		||||
            trio.Cancelled,
 | 
			
		||||
 | 
			
		||||
        ) as _scope_err:
 | 
			
		||||
            scope_err = _scope_err
 | 
			
		||||
            if (
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -74,6 +74,9 @@ from tractor.msg import (
 | 
			
		|||
    pretty_struct,
 | 
			
		||||
    types as msgtypes,
 | 
			
		||||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from .ipc import (
 | 
			
		||||
    Channel,
 | 
			
		||||
    # IPCServer,  # causes cycles atm..
 | 
			
		||||
| 
						 | 
				
			
			@ -359,7 +362,7 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
    def pformat(
 | 
			
		||||
        self,
 | 
			
		||||
        ds: str = ':',
 | 
			
		||||
        ds: str = ': ',
 | 
			
		||||
        indent: int = 0,
 | 
			
		||||
        privates: bool = False,
 | 
			
		||||
    ) -> str:
 | 
			
		||||
| 
						 | 
				
			
			@ -549,6 +552,14 @@ class Actor:
 | 
			
		|||
            )
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
    # ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
 | 
			
		||||
    # - _get_rpc_func(),
 | 
			
		||||
    # - _deliver_ctx_payload(),
 | 
			
		||||
    # - get_context(),
 | 
			
		||||
    # - start_remote_task(),
 | 
			
		||||
    # - cancel_rpc_tasks(),
 | 
			
		||||
    # - _cancel_task(),
 | 
			
		||||
    #
 | 
			
		||||
    def _get_rpc_func(self, ns, funcname):
 | 
			
		||||
        '''
 | 
			
		||||
        Try to lookup and return a target RPC func from the
 | 
			
		||||
| 
						 | 
				
			
			@ -1116,14 +1127,6 @@ class Actor:
 | 
			
		|||
        self._cancel_complete.set()
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    # XXX: hard kill logic if needed?
 | 
			
		||||
    # def _hard_mofo_kill(self):
 | 
			
		||||
    #     # If we're the root actor or zombied kill everything
 | 
			
		||||
    #     if self._parent_chan is None:  # TODO: more robust check
 | 
			
		||||
    #         root = trio.lowlevel.current_root_task()
 | 
			
		||||
    #         for n in root.child_nurseries:
 | 
			
		||||
    #             n.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
    async def _cancel_task(
 | 
			
		||||
        self,
 | 
			
		||||
        cid: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -1358,25 +1361,13 @@ class Actor:
 | 
			
		|||
        '''
 | 
			
		||||
        return self.accept_addrs[0]
 | 
			
		||||
 | 
			
		||||
    def get_parent(self) -> Portal:
 | 
			
		||||
        '''
 | 
			
		||||
        Return a `Portal` to our parent.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        assert self._parent_chan, "No parent channel for this actor?"
 | 
			
		||||
        return Portal(self._parent_chan)
 | 
			
		||||
 | 
			
		||||
    def get_chans(
 | 
			
		||||
        self,
 | 
			
		||||
        uid: tuple[str, str],
 | 
			
		||||
 | 
			
		||||
    ) -> list[Channel]:
 | 
			
		||||
        '''
 | 
			
		||||
        Return all IPC channels to the actor with provided `uid`.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._ipc_server._peers[uid]
 | 
			
		||||
 | 
			
		||||
    # TODO, this should delegate ONLY to the
 | 
			
		||||
    # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
 | 
			
		||||
    #
 | 
			
		||||
    # XXX, AH RIGHT that's why..
 | 
			
		||||
    #   it's bc we pass this as a CLI flag to the child.py precisely
 | 
			
		||||
    #   bc we need the bootstrapping pre `async_main()`.. but maybe
 | 
			
		||||
    #   keep this as an impl deat and not part of the pub iface impl?
 | 
			
		||||
    def is_infected_aio(self) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
        If `True`, this actor is running `trio` in guest mode on
 | 
			
		||||
| 
						 | 
				
			
			@ -1387,6 +1378,23 @@ class Actor:
 | 
			
		|||
        '''
 | 
			
		||||
        return self._infected_aio
 | 
			
		||||
 | 
			
		||||
    # ?TODO, is this the right type for this method?
 | 
			
		||||
    def get_parent(self) -> Portal:
 | 
			
		||||
        '''
 | 
			
		||||
        Return a `Portal` to our parent.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        assert self._parent_chan, "No parent channel for this actor?"
 | 
			
		||||
        return Portal(self._parent_chan)
 | 
			
		||||
 | 
			
		||||
    # XXX: hard kill logic if needed?
 | 
			
		||||
    # def _hard_mofo_kill(self):
 | 
			
		||||
    #     # If we're the root actor or zombied kill everything
 | 
			
		||||
    #     if self._parent_chan is None:  # TODO: more robust check
 | 
			
		||||
    #         root = trio.lowlevel.current_root_task()
 | 
			
		||||
    #         for n in root.child_nurseries:
 | 
			
		||||
    #             n.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def async_main(
 | 
			
		||||
    actor: Actor,
 | 
			
		||||
| 
						 | 
				
			
			@ -1466,17 +1474,18 @@ async def async_main(
 | 
			
		|||
        # parent is kept alive as a resilient service until
 | 
			
		||||
        # cancellation steps have (mostly) occurred in
 | 
			
		||||
        # a deterministic way.
 | 
			
		||||
        async with trio.open_nursery(
 | 
			
		||||
            strict_exception_groups=False,
 | 
			
		||||
        ) as root_nursery:
 | 
			
		||||
            actor._root_n = root_nursery
 | 
			
		||||
        root_tn: trio.Nursery
 | 
			
		||||
        async with (
 | 
			
		||||
            collapse_eg(),
 | 
			
		||||
            trio.open_nursery() as root_tn,
 | 
			
		||||
        ):
 | 
			
		||||
            actor._root_n = root_tn
 | 
			
		||||
            assert actor._root_n
 | 
			
		||||
 | 
			
		||||
            ipc_server: _server.IPCServer
 | 
			
		||||
            async with (
 | 
			
		||||
                trio.open_nursery(
 | 
			
		||||
                    strict_exception_groups=False,
 | 
			
		||||
                ) as service_nursery,
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as service_nursery,
 | 
			
		||||
                _server.open_ipc_server(
 | 
			
		||||
                    parent_tn=service_nursery,
 | 
			
		||||
                    stream_handler_tn=service_nursery,
 | 
			
		||||
| 
						 | 
				
			
			@ -1600,7 +1609,7 @@ async def async_main(
 | 
			
		|||
                # start processing parent requests until our channel
 | 
			
		||||
                # server is 100% up and running.
 | 
			
		||||
                if actor._parent_chan:
 | 
			
		||||
                    await root_nursery.start(
 | 
			
		||||
                    await root_tn.start(
 | 
			
		||||
                        partial(
 | 
			
		||||
                            _rpc.process_messages,
 | 
			
		||||
                            chan=actor._parent_chan,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -44,6 +44,7 @@ from ._runtime import Actor
 | 
			
		|||
from ._portal import Portal
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
    ContextCancelled,
 | 
			
		||||
| 
						 | 
				
			
			@ -326,9 +327,10 @@ class ActorNursery:
 | 
			
		|||
        server: IPCServer = self._actor.ipc_server
 | 
			
		||||
 | 
			
		||||
        with trio.move_on_after(3) as cs:
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
            ) as tn:
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as tn,
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
                subactor: Actor
 | 
			
		||||
                proc: trio.Process
 | 
			
		||||
| 
						 | 
				
			
			@ -421,10 +423,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
    # `ActorNursery.start_actor()`).
 | 
			
		||||
 | 
			
		||||
    # errors from this daemon actor nursery bubble up to caller
 | 
			
		||||
    async with trio.open_nursery(
 | 
			
		||||
        strict_exception_groups=False,
 | 
			
		||||
        # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
 | 
			
		||||
    ) as da_nursery:
 | 
			
		||||
    async with (
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        trio.open_nursery() as da_nursery,
 | 
			
		||||
    ):
 | 
			
		||||
        try:
 | 
			
		||||
            # This is the inner level "run in actor" nursery. It is
 | 
			
		||||
            # awaited first since actors spawned in this way (using
 | 
			
		||||
| 
						 | 
				
			
			@ -434,11 +436,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
            # immediately raised for handling by a supervisor strategy.
 | 
			
		||||
            # As such if the strategy propagates any error(s) upwards
 | 
			
		||||
            # the above "daemon actor" nursery will be notified.
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
                # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
 | 
			
		||||
            ) as ria_nursery:
 | 
			
		||||
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as ria_nursery,
 | 
			
		||||
            ):
 | 
			
		||||
                an = ActorNursery(
 | 
			
		||||
                    actor,
 | 
			
		||||
                    ria_nursery,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -31,7 +31,7 @@ from ._broadcast import (
 | 
			
		|||
)
 | 
			
		||||
from ._beg import (
 | 
			
		||||
    collapse_eg as collapse_eg,
 | 
			
		||||
    maybe_collapse_eg as maybe_collapse_eg,
 | 
			
		||||
    get_collapsed_eg as get_collapsed_eg,
 | 
			
		||||
    is_multi_cancelled as is_multi_cancelled,
 | 
			
		||||
)
 | 
			
		||||
from ._taskc import (
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,8 +15,9 @@
 | 
			
		|||
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
`BaseExceptionGroup` related utils and helpers pertaining to
 | 
			
		||||
first-class-`trio` from a historical perspective B)
 | 
			
		||||
`BaseExceptionGroup` utils and helpers pertaining to
 | 
			
		||||
first-class-`trio` from a "historical" perspective, like "loose
 | 
			
		||||
exception group" task-nurseries.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from contextlib import (
 | 
			
		||||
| 
						 | 
				
			
			@ -24,27 +25,84 @@ from contextlib import (
 | 
			
		|||
)
 | 
			
		||||
from typing import (
 | 
			
		||||
    Literal,
 | 
			
		||||
    Type,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
# from trio._core._concat_tb import (
 | 
			
		||||
#     concat_tb,
 | 
			
		||||
# )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def maybe_collapse_eg(
 | 
			
		||||
    beg: BaseExceptionGroup,
 | 
			
		||||
# XXX NOTE
 | 
			
		||||
# taken verbatim from `trio._core._run` except,
 | 
			
		||||
# - remove the NONSTRICT_EXCEPTIONGROUP_NOTE deprecation-note
 | 
			
		||||
#   guard-check; we know we want an explicit collapse.
 | 
			
		||||
# - mask out tb rewriting in collapse case, i don't think it really
 | 
			
		||||
#   matters?
 | 
			
		||||
#
 | 
			
		||||
def collapse_exception_group(
 | 
			
		||||
    excgroup: BaseExceptionGroup[BaseException],
 | 
			
		||||
) -> BaseException:
 | 
			
		||||
    """Recursively collapse any single-exception groups into that single contained
 | 
			
		||||
    exception.
 | 
			
		||||
 | 
			
		||||
    """
 | 
			
		||||
    exceptions = list(excgroup.exceptions)
 | 
			
		||||
    modified = False
 | 
			
		||||
    for i, exc in enumerate(exceptions):
 | 
			
		||||
        if isinstance(exc, BaseExceptionGroup):
 | 
			
		||||
            new_exc = collapse_exception_group(exc)
 | 
			
		||||
            if new_exc is not exc:
 | 
			
		||||
                modified = True
 | 
			
		||||
                exceptions[i] = new_exc
 | 
			
		||||
 | 
			
		||||
    if (
 | 
			
		||||
        len(exceptions) == 1
 | 
			
		||||
        and isinstance(excgroup, BaseExceptionGroup)
 | 
			
		||||
 | 
			
		||||
        # XXX trio's loose-setting condition..
 | 
			
		||||
        # and NONSTRICT_EXCEPTIONGROUP_NOTE in getattr(excgroup, "__notes__", ())
 | 
			
		||||
    ):
 | 
			
		||||
        # exceptions[0].__traceback__ = concat_tb(
 | 
			
		||||
        #     excgroup.__traceback__,
 | 
			
		||||
        #     exceptions[0].__traceback__,
 | 
			
		||||
        # )
 | 
			
		||||
        return exceptions[0]
 | 
			
		||||
    elif modified:
 | 
			
		||||
        return excgroup.derive(exceptions)
 | 
			
		||||
    else:
 | 
			
		||||
        return excgroup
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def get_collapsed_eg(
 | 
			
		||||
    beg: BaseExceptionGroup,
 | 
			
		||||
 | 
			
		||||
) -> BaseException|None:
 | 
			
		||||
    '''
 | 
			
		||||
    If the input beg can collapse to a single non-eg sub-exception,
 | 
			
		||||
    return it instead.
 | 
			
		||||
    If the input beg can collapse to a single sub-exception which is
 | 
			
		||||
    itself **not** an eg, return it.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    if len(excs := beg.exceptions) == 1:
 | 
			
		||||
        return excs[0]
 | 
			
		||||
    maybe_exc = collapse_exception_group(beg)
 | 
			
		||||
    if maybe_exc is beg:
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
    return beg
 | 
			
		||||
    return maybe_exc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def collapse_eg():
 | 
			
		||||
async def collapse_eg(
 | 
			
		||||
    hide_tb: bool = True,
 | 
			
		||||
 | 
			
		||||
    # XXX, for ex. will always show begs containing single taskc
 | 
			
		||||
    ignore: set[Type[BaseException]] = {
 | 
			
		||||
        # trio.Cancelled,
 | 
			
		||||
    },
 | 
			
		||||
    add_notes: bool = True,
 | 
			
		||||
 | 
			
		||||
    bp: bool = False,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    If `BaseExceptionGroup` raised in the body scope is
 | 
			
		||||
    "collapse-able" (in the same way that
 | 
			
		||||
| 
						 | 
				
			
			@ -52,15 +110,58 @@ async def collapse_eg():
 | 
			
		|||
    only raise the lone emedded non-eg in in place.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    __tracebackhide__: bool = hide_tb
 | 
			
		||||
    try:
 | 
			
		||||
        yield
 | 
			
		||||
    except* BaseException as beg:
 | 
			
		||||
        if (
 | 
			
		||||
            exc := maybe_collapse_eg(beg)
 | 
			
		||||
        ) is not beg:
 | 
			
		||||
            raise exc
 | 
			
		||||
    except BaseExceptionGroup as _beg:
 | 
			
		||||
        beg = _beg
 | 
			
		||||
 | 
			
		||||
        raise beg
 | 
			
		||||
        if (
 | 
			
		||||
            bp
 | 
			
		||||
            and
 | 
			
		||||
            len(beg.exceptions) > 1
 | 
			
		||||
        ):
 | 
			
		||||
            import tractor
 | 
			
		||||
            if tractor.current_actor(
 | 
			
		||||
                err_on_no_runtime=False,
 | 
			
		||||
            ):
 | 
			
		||||
                await tractor.pause(shield=True)
 | 
			
		||||
            else:
 | 
			
		||||
                breakpoint()
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            (exc := get_collapsed_eg(beg))
 | 
			
		||||
            and
 | 
			
		||||
            type(exc) not in ignore
 | 
			
		||||
        ):
 | 
			
		||||
 | 
			
		||||
            # TODO? report number of nested groups it was collapsed
 | 
			
		||||
            # *from*?
 | 
			
		||||
            if add_notes:
 | 
			
		||||
                from_group_note: str = (
 | 
			
		||||
                    '( ^^^ this exc was collapsed from a group ^^^ )\n'
 | 
			
		||||
                )
 | 
			
		||||
                if (
 | 
			
		||||
                    from_group_note
 | 
			
		||||
                    not in
 | 
			
		||||
                    getattr(exc, "__notes__", ())
 | 
			
		||||
                ):
 | 
			
		||||
                    exc.add_note(from_group_note)
 | 
			
		||||
 | 
			
		||||
            # raise exc
 | 
			
		||||
            # ^^ this will leave the orig beg tb above with the
 | 
			
		||||
            # "during the handling of <beg> the following.."
 | 
			
		||||
            # So, instead do..
 | 
			
		||||
            #
 | 
			
		||||
            if cause := exc.__cause__:
 | 
			
		||||
                raise exc from cause
 | 
			
		||||
            else:
 | 
			
		||||
                # suppress "during handling of <the beg>"
 | 
			
		||||
                # output in tb/console.
 | 
			
		||||
                raise exc from None
 | 
			
		||||
 | 
			
		||||
        # keep original
 | 
			
		||||
        raise # beg
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def is_multi_cancelled(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue