Compare commits
	
		
			10 Commits 
		
	
	
		
			4bc443ccae
			...
			8c90521562
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						8c90521562 | |
| 
							
							
								 | 
						f23ee3cd22 | |
| 
							
							
								 | 
						9295af929c | |
| 
							
							
								 | 
						83f53fd0c5 | |
| 
							
							
								 | 
						9b3af1fa16 | |
| 
							
							
								 | 
						f8e4d12494 | |
| 
							
							
								 | 
						af3c14b250 | |
| 
							
							
								 | 
						7de7fd0afd | |
| 
							
							
								 | 
						79888a31a4 | |
| 
							
							
								 | 
						de16a9ac6f | 
| 
						 | 
				
			
			@ -310,7 +310,6 @@ def test_subactor_breakpoint(
 | 
			
		|||
 | 
			
		||||
    assert in_prompt_msg(
 | 
			
		||||
        child, [
 | 
			
		||||
        'MessagingError:',
 | 
			
		||||
        'RemoteActorError:',
 | 
			
		||||
         "('breakpoint_forever'",
 | 
			
		||||
         'bdb.BdbQuit',
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -410,7 +410,6 @@ def test_peer_canceller(
 | 
			
		|||
    '''
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery(
 | 
			
		||||
            # NOTE: to halt the peer tasks on ctxc, uncomment this.
 | 
			
		||||
            debug_mode=debug_mode,
 | 
			
		||||
        ) as an:
 | 
			
		||||
            canceller: Portal = await an.start_actor(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -101,6 +101,9 @@ from ._state import (
 | 
			
		|||
    debug_mode,
 | 
			
		||||
    _ctxvar_Context,
 | 
			
		||||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
# ------ - ------
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
    from ._portal import Portal
 | 
			
		||||
| 
						 | 
				
			
			@ -940,7 +943,7 @@ class Context:
 | 
			
		|||
        self.cancel_called = True
 | 
			
		||||
 | 
			
		||||
        header: str = (
 | 
			
		||||
            f'Cancelling ctx from {side.upper()}-side\n'
 | 
			
		||||
            f'Cancelling ctx from {side!r}-side\n'
 | 
			
		||||
        )
 | 
			
		||||
        reminfo: str = (
 | 
			
		||||
            # ' =>\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -948,7 +951,7 @@ class Context:
 | 
			
		|||
            f'\n'
 | 
			
		||||
            f'c)=> {self.chan.uid}\n'
 | 
			
		||||
            f'   |_[{self.dst_maddr}\n'
 | 
			
		||||
            f'     >>{self.repr_rpc}\n'
 | 
			
		||||
            f'     >> {self.repr_rpc}\n'
 | 
			
		||||
            # f'    >> {self._nsf}() -> {codec}[dict]:\n\n'
 | 
			
		||||
            # TODO: pull msg-type from spec re #320
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -2023,10 +2026,8 @@ async def open_context_from_portal(
 | 
			
		|||
    ctxc_from_callee: ContextCancelled|None = None
 | 
			
		||||
    try:
 | 
			
		||||
        async with (
 | 
			
		||||
            trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
            ) as tn,
 | 
			
		||||
 | 
			
		||||
            collapse_eg(),
 | 
			
		||||
            trio.open_nursery() as tn,
 | 
			
		||||
            msgops.maybe_limit_plds(
 | 
			
		||||
                ctx=ctx,
 | 
			
		||||
                spec=ctx_meta.get('pld_spec'),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -28,7 +28,10 @@ from typing import (
 | 
			
		|||
from contextlib import asynccontextmanager as acm
 | 
			
		||||
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
from .trionics import gather_contexts
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    gather_contexts,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from .ipc import _connect_chan, Channel
 | 
			
		||||
from ._addr import (
 | 
			
		||||
    UnwrappedAddress,
 | 
			
		||||
| 
						 | 
				
			
			@ -88,7 +91,6 @@ async def get_registry(
 | 
			
		|||
            yield regstr_ptl
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def get_root(
 | 
			
		||||
    **kwargs,
 | 
			
		||||
| 
						 | 
				
			
			@ -249,9 +251,12 @@ async def find_actor(
 | 
			
		|||
        for addr in registry_addrs
 | 
			
		||||
    )
 | 
			
		||||
    portals: list[Portal]
 | 
			
		||||
    async with gather_contexts(
 | 
			
		||||
        mngrs=maybe_portals,
 | 
			
		||||
    ) as portals:
 | 
			
		||||
    async with (
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        gather_contexts(
 | 
			
		||||
            mngrs=maybe_portals,
 | 
			
		||||
        ) as portals,
 | 
			
		||||
    ):
 | 
			
		||||
        # log.runtime(
 | 
			
		||||
        #     'Gathered portals:\n'
 | 
			
		||||
        #     f'{portals}'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -39,7 +39,10 @@ import warnings
 | 
			
		|||
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
from .trionics import maybe_open_nursery
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    maybe_open_nursery,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from ._state import (
 | 
			
		||||
    current_actor,
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -558,14 +561,13 @@ async def open_portal(
 | 
			
		|||
    assert actor
 | 
			
		||||
    was_connected: bool = False
 | 
			
		||||
 | 
			
		||||
    async with maybe_open_nursery(
 | 
			
		||||
        tn,
 | 
			
		||||
        shield=shield,
 | 
			
		||||
        strict_exception_groups=False,
 | 
			
		||||
        # ^XXX^ TODO? soo roll our own then ??
 | 
			
		||||
        # -> since we kinda want the "if only one `.exception` then
 | 
			
		||||
        # just raise that" interface?
 | 
			
		||||
    ) as tn:
 | 
			
		||||
    async with (
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        maybe_open_nursery(
 | 
			
		||||
            tn,
 | 
			
		||||
            shield=shield,
 | 
			
		||||
        ) as tn,
 | 
			
		||||
    ):
 | 
			
		||||
 | 
			
		||||
        if not channel.connected():
 | 
			
		||||
            await channel.connect()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -37,13 +37,7 @@ import warnings
 | 
			
		|||
 | 
			
		||||
import trio
 | 
			
		||||
 | 
			
		||||
from ._runtime import (
 | 
			
		||||
    Actor,
 | 
			
		||||
    Arbiter,
 | 
			
		||||
    # TODO: rename and make a non-actor subtype?
 | 
			
		||||
    # Arbiter as Registry,
 | 
			
		||||
    async_main,
 | 
			
		||||
)
 | 
			
		||||
from . import _runtime
 | 
			
		||||
from .devx import (
 | 
			
		||||
    debug,
 | 
			
		||||
    _frame_stack,
 | 
			
		||||
| 
						 | 
				
			
			@ -64,6 +58,7 @@ from ._addr import (
 | 
			
		|||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
    RuntimeFailure,
 | 
			
		||||
| 
						 | 
				
			
			@ -197,9 +192,13 @@ async def open_root_actor(
 | 
			
		|||
    # read-only state to sublayers?
 | 
			
		||||
    # extra_rt_vars: dict|None = None,
 | 
			
		||||
 | 
			
		||||
) -> Actor:
 | 
			
		||||
) -> _runtime.Actor:
 | 
			
		||||
    '''
 | 
			
		||||
    Runtime init entry point for ``tractor``.
 | 
			
		||||
    Initialize the `tractor` runtime by starting a "root actor" in
 | 
			
		||||
    a parent-most Python process.
 | 
			
		||||
 | 
			
		||||
    All (disjoint) actor-process-trees-as-programs are created via
 | 
			
		||||
    this entrypoint.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # XXX NEVER allow nested actor-trees!
 | 
			
		||||
| 
						 | 
				
			
			@ -379,7 +378,7 @@ async def open_root_actor(
 | 
			
		|||
                f'Registry(s) seem(s) to exist @ {ponged_addrs}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            actor = Actor(
 | 
			
		||||
            actor = _runtime.Actor(
 | 
			
		||||
                name=name or 'anonymous',
 | 
			
		||||
                uuid=mk_uuid(),
 | 
			
		||||
                registry_addrs=ponged_addrs,
 | 
			
		||||
| 
						 | 
				
			
			@ -414,7 +413,8 @@ async def open_root_actor(
 | 
			
		|||
            # https://github.com/goodboy/tractor/pull/348
 | 
			
		||||
            # https://github.com/goodboy/tractor/issues/296
 | 
			
		||||
 | 
			
		||||
            actor = Arbiter(
 | 
			
		||||
            # TODO: rename as `RootActor` or is that even necessary?
 | 
			
		||||
            actor = _runtime.Arbiter(
 | 
			
		||||
                name=name or 'registrar',
 | 
			
		||||
                uuid=mk_uuid(),
 | 
			
		||||
                registry_addrs=registry_addrs,
 | 
			
		||||
| 
						 | 
				
			
			@ -441,13 +441,13 @@ async def open_root_actor(
 | 
			
		|||
                f'{ml_addrs_str}'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # start the actor runtime in a new task
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
                # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
 | 
			
		||||
            ) as nursery:
 | 
			
		||||
            # start runtime in a bg sub-task, yield to caller.
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as root_tn,
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
                # ``_runtime.async_main()`` creates an internal nursery
 | 
			
		||||
                # `_runtime.async_main()` creates an internal nursery
 | 
			
		||||
                # and blocks here until any underlying actor(-process)
 | 
			
		||||
                # tree has terminated thereby conducting so called
 | 
			
		||||
                # "end-to-end" structured concurrency throughout an
 | 
			
		||||
| 
						 | 
				
			
			@ -455,9 +455,9 @@ async def open_root_actor(
 | 
			
		|||
                # "actor runtime" primitives are SC-compat and thus all
 | 
			
		||||
                # transitively spawned actors/processes must be as
 | 
			
		||||
                # well.
 | 
			
		||||
                await nursery.start(
 | 
			
		||||
                await root_tn.start(
 | 
			
		||||
                    partial(
 | 
			
		||||
                        async_main,
 | 
			
		||||
                        _runtime.async_main,
 | 
			
		||||
                        actor,
 | 
			
		||||
                        accept_addrs=trans_bind_addrs,
 | 
			
		||||
                        parent_addr=None
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -756,7 +756,6 @@ async def _invoke(
 | 
			
		|||
            BaseExceptionGroup,
 | 
			
		||||
            BaseException,
 | 
			
		||||
            trio.Cancelled,
 | 
			
		||||
 | 
			
		||||
        ) as _scope_err:
 | 
			
		||||
            scope_err = _scope_err
 | 
			
		||||
            if (
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -74,6 +74,9 @@ from tractor.msg import (
 | 
			
		|||
    pretty_struct,
 | 
			
		||||
    types as msgtypes,
 | 
			
		||||
)
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from .ipc import (
 | 
			
		||||
    Channel,
 | 
			
		||||
    # IPCServer,  # causes cycles atm..
 | 
			
		||||
| 
						 | 
				
			
			@ -345,7 +348,7 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
    def pformat(
 | 
			
		||||
        self,
 | 
			
		||||
        ds: str = ':',
 | 
			
		||||
        ds: str = ': ',
 | 
			
		||||
        indent: int = 0,
 | 
			
		||||
    ) -> str:
 | 
			
		||||
        fields_sect_prefix: str = ' |_'
 | 
			
		||||
| 
						 | 
				
			
			@ -1054,6 +1057,7 @@ class Actor:
 | 
			
		|||
        cid: str,
 | 
			
		||||
        parent_chan: Channel,
 | 
			
		||||
        requesting_uid: tuple[str, str]|None,
 | 
			
		||||
        # ^^TODO! use the `Aid` directly here!
 | 
			
		||||
 | 
			
		||||
        ipc_msg: dict|None|bool = False,
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1099,9 +1103,12 @@ class Actor:
 | 
			
		|||
 | 
			
		||||
        log.cancel(
 | 
			
		||||
            'Rxed cancel request for RPC task\n'
 | 
			
		||||
            f'<=c) {requesting_uid}\n'
 | 
			
		||||
            f' |_{ctx._task}\n'
 | 
			
		||||
            f'    >> {ctx.repr_rpc}\n'
 | 
			
		||||
            f'{ctx._task!r} <=c) {requesting_uid}\n'
 | 
			
		||||
            f'|_>> {ctx.repr_rpc}\n'
 | 
			
		||||
 | 
			
		||||
            # f'|_{ctx._task}\n'
 | 
			
		||||
            # f'   >> {ctx.repr_rpc}\n'
 | 
			
		||||
 | 
			
		||||
            # f'=> {ctx._task}\n'
 | 
			
		||||
            # f'  >> Actor._cancel_task() => {ctx._task}\n'
 | 
			
		||||
            # f'  |_ {ctx._task}\n\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -1386,10 +1393,12 @@ async def async_main(
 | 
			
		|||
        # parent is kept alive as a resilient service until
 | 
			
		||||
        # cancellation steps have (mostly) occurred in
 | 
			
		||||
        # a deterministic way.
 | 
			
		||||
        async with trio.open_nursery(
 | 
			
		||||
            strict_exception_groups=False,
 | 
			
		||||
        ) as root_nursery:
 | 
			
		||||
            actor._root_n = root_nursery
 | 
			
		||||
        root_tn: trio.Nursery
 | 
			
		||||
        async with (
 | 
			
		||||
            collapse_eg(),
 | 
			
		||||
            trio.open_nursery() as root_tn,
 | 
			
		||||
        ):
 | 
			
		||||
            actor._root_n = root_tn
 | 
			
		||||
            assert actor._root_n
 | 
			
		||||
 | 
			
		||||
            ipc_server: _server.IPCServer
 | 
			
		||||
| 
						 | 
				
			
			@ -1488,7 +1497,7 @@ async def async_main(
 | 
			
		|||
                # their root actor over that channel.
 | 
			
		||||
                if _state._runtime_vars['_is_root']:
 | 
			
		||||
                    for addr in accept_addrs:
 | 
			
		||||
                        waddr = wrap_address(addr)
 | 
			
		||||
                        waddr: Address = wrap_address(addr)
 | 
			
		||||
                        if waddr == waddr.get_root():
 | 
			
		||||
                            _state._runtime_vars['_root_mailbox'] = addr
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1533,7 +1542,7 @@ async def async_main(
 | 
			
		|||
                # start processing parent requests until our channel
 | 
			
		||||
                # server is 100% up and running.
 | 
			
		||||
                if actor._parent_chan:
 | 
			
		||||
                    await root_nursery.start(
 | 
			
		||||
                    await root_tn.start(
 | 
			
		||||
                        partial(
 | 
			
		||||
                            _rpc.process_messages,
 | 
			
		||||
                            chan=actor._parent_chan,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -42,6 +42,7 @@ from ._runtime import Actor
 | 
			
		|||
from ._portal import Portal
 | 
			
		||||
from .trionics import (
 | 
			
		||||
    is_multi_cancelled,
 | 
			
		||||
    collapse_eg,
 | 
			
		||||
)
 | 
			
		||||
from ._exceptions import (
 | 
			
		||||
    ContextCancelled,
 | 
			
		||||
| 
						 | 
				
			
			@ -324,9 +325,10 @@ class ActorNursery:
 | 
			
		|||
        server: IPCServer = self._actor.ipc_server
 | 
			
		||||
 | 
			
		||||
        with trio.move_on_after(3) as cs:
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
            ) as tn:
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as tn,
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
                subactor: Actor
 | 
			
		||||
                proc: trio.Process
 | 
			
		||||
| 
						 | 
				
			
			@ -419,10 +421,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
    # `ActorNursery.start_actor()`).
 | 
			
		||||
 | 
			
		||||
    # errors from this daemon actor nursery bubble up to caller
 | 
			
		||||
    async with trio.open_nursery(
 | 
			
		||||
        strict_exception_groups=False,
 | 
			
		||||
        # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
 | 
			
		||||
    ) as da_nursery:
 | 
			
		||||
    async with (
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        trio.open_nursery() as da_nursery,
 | 
			
		||||
    ):
 | 
			
		||||
        try:
 | 
			
		||||
            # This is the inner level "run in actor" nursery. It is
 | 
			
		||||
            # awaited first since actors spawned in this way (using
 | 
			
		||||
| 
						 | 
				
			
			@ -432,11 +434,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
 | 
			
		|||
            # immediately raised for handling by a supervisor strategy.
 | 
			
		||||
            # As such if the strategy propagates any error(s) upwards
 | 
			
		||||
            # the above "daemon actor" nursery will be notified.
 | 
			
		||||
            async with trio.open_nursery(
 | 
			
		||||
                strict_exception_groups=False,
 | 
			
		||||
                # ^XXX^ TODO? instead unpack any RAE as per "loose" style?
 | 
			
		||||
            ) as ria_nursery:
 | 
			
		||||
 | 
			
		||||
            async with (
 | 
			
		||||
                collapse_eg(),
 | 
			
		||||
                trio.open_nursery() as ria_nursery,
 | 
			
		||||
            ):
 | 
			
		||||
                an = ActorNursery(
 | 
			
		||||
                    actor,
 | 
			
		||||
                    ria_nursery,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -238,7 +238,8 @@ def enable_stack_on_sig(
 | 
			
		|||
        import stackscope
 | 
			
		||||
    except ImportError:
 | 
			
		||||
        log.error(
 | 
			
		||||
            '`stackscope` not installed for use in debug mode!'
 | 
			
		||||
            '`stackscope` not installed for use in debug mode!\n'
 | 
			
		||||
            '`Ignoring {enable_stack_on_sig!r} call!\n'
 | 
			
		||||
        )
 | 
			
		||||
        return None
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -143,7 +143,7 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
        log.cancel(
 | 
			
		||||
            'Waiting on cancel request to peer..\n'
 | 
			
		||||
            f'c)=>\n'
 | 
			
		||||
            f'  |_{chan.uid}\n'
 | 
			
		||||
            f'  |_{chan.aid}\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        # XXX: this is a soft wait on the channel (and its
 | 
			
		||||
| 
						 | 
				
			
			@ -156,7 +156,7 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
        # local runtime here is now cancelled while
 | 
			
		||||
        # (presumably) in the middle of msg loop processing.
 | 
			
		||||
        chan_info: str = (
 | 
			
		||||
            f'{chan.uid}\n'
 | 
			
		||||
            f'{chan.aid}\n'
 | 
			
		||||
            f'|_{chan}\n'
 | 
			
		||||
            f'  |_{chan.transport}\n\n'
 | 
			
		||||
        )
 | 
			
		||||
| 
						 | 
				
			
			@ -279,7 +279,7 @@ async def maybe_wait_on_canced_subs(
 | 
			
		|||
                    log.runtime(
 | 
			
		||||
                        f'Peer IPC broke but subproc is alive?\n\n'
 | 
			
		||||
 | 
			
		||||
                        f'<=x {chan.uid}@{chan.raddr}\n'
 | 
			
		||||
                        f'<=x {chan.aid}@{chan.raddr}\n'
 | 
			
		||||
                        f'   |_{proc}\n'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -460,7 +460,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
        # drop ref to channel so it can be gc-ed and disconnected
 | 
			
		||||
        con_teardown_status: str = (
 | 
			
		||||
            f'IPC channel disconnected:\n'
 | 
			
		||||
            f'<=x uid: {chan.uid}\n'
 | 
			
		||||
            f'<=x uid: {chan.aid}\n'
 | 
			
		||||
            f'   |_{pformat(chan)}\n\n'
 | 
			
		||||
        )
 | 
			
		||||
        chans.remove(chan)
 | 
			
		||||
| 
						 | 
				
			
			@ -468,7 +468,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
        # TODO: do we need to be this pedantic?
 | 
			
		||||
        if not chans:
 | 
			
		||||
            con_teardown_status += (
 | 
			
		||||
                f'-> No more channels with {chan.uid}'
 | 
			
		||||
                f'-> No more channels with {chan.aid}'
 | 
			
		||||
            )
 | 
			
		||||
            server._peers.pop(uid, None)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -519,7 +519,7 @@ async def handle_stream_from_peer(
 | 
			
		|||
                    and
 | 
			
		||||
                    (ctx_in_debug := pdb_lock.ctx_in_debug)
 | 
			
		||||
                    and
 | 
			
		||||
                    (pdb_user_uid := ctx_in_debug.chan.uid)
 | 
			
		||||
                    (pdb_user_uid := ctx_in_debug.chan.aid)
 | 
			
		||||
                ):
 | 
			
		||||
                    entry: tuple|None = local_nursery._children.get(
 | 
			
		||||
                        tuple(pdb_user_uid)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -40,7 +40,7 @@ from typing import (
 | 
			
		|||
import trio
 | 
			
		||||
from tractor._state import current_actor
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
# from ._beg import collapse_eg
 | 
			
		||||
from ._beg import collapse_eg
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
| 
						 | 
				
			
			@ -151,13 +151,8 @@ async def gather_contexts(
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
    async with (
 | 
			
		||||
        # collapse_eg(),
 | 
			
		||||
        trio.open_nursery(
 | 
			
		||||
            # strict_exception_groups=False,
 | 
			
		||||
            # ^XXX^ TODO? soo roll our own then ??
 | 
			
		||||
            # -> since we kinda want the "if only one `.exception` then
 | 
			
		||||
            # just raise that" interface?
 | 
			
		||||
        ) as tn,
 | 
			
		||||
        collapse_eg(),
 | 
			
		||||
        trio.open_nursery() as tn,
 | 
			
		||||
    ):
 | 
			
		||||
        for mngr in mngrs:
 | 
			
		||||
            tn.start_soon(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue