Compare commits
	
		
			10 Commits 
		
	
	
		
			326b258fd5
			...
			da9bc1237d
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						da9bc1237d | |
| 
							
							
								 | 
						ab11ee4fbe | |
| 
							
							
								 | 
						466dce8aed | |
| 
							
							
								 | 
						808dd9d73c | |
| 
							
							
								 | 
						aef306465d | |
| 
							
							
								 | 
						7459a4127c | |
| 
							
							
								 | 
						fc77e6eca5 | |
| 
							
							
								 | 
						26526b86c3 | |
| 
							
							
								 | 
						d079675dd4 | |
| 
							
							
								 | 
						c2acc4f55c | 
| 
						 | 
				
			
			@ -0,0 +1,113 @@
 | 
			
		|||
'''
 | 
			
		||||
Unit-ish tests for specific IPC transport protocol backends.
 | 
			
		||||
 | 
			
		||||
'''
 | 
			
		||||
from __future__ import annotations
 | 
			
		||||
from pathlib import Path
 | 
			
		||||
 | 
			
		||||
import pytest
 | 
			
		||||
import trio
 | 
			
		||||
import tractor
 | 
			
		||||
from tractor import (
 | 
			
		||||
    Actor,
 | 
			
		||||
    _state,
 | 
			
		||||
    _addr,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@pytest.fixture
 | 
			
		||||
def bindspace_dir_str() -> str:
 | 
			
		||||
 | 
			
		||||
    bs_dir_str: str = '/run/user/1000/doggy'
 | 
			
		||||
    bs_dir = Path(bs_dir_str)
 | 
			
		||||
    assert not bs_dir.is_dir()
 | 
			
		||||
 | 
			
		||||
    yield bs_dir_str
 | 
			
		||||
 | 
			
		||||
    # delete it on suite teardown.
 | 
			
		||||
    # ?TODO? should we support this internally
 | 
			
		||||
    # or is leaking it ok?
 | 
			
		||||
    if bs_dir.is_dir():
 | 
			
		||||
        bs_dir.rmdir()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_uds_bindspace_created_implicitly(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    bindspace_dir_str: str,
 | 
			
		||||
):
 | 
			
		||||
    registry_addr: tuple = (
 | 
			
		||||
        f'{bindspace_dir_str}',
 | 
			
		||||
        'registry@doggy.sock',
 | 
			
		||||
    )
 | 
			
		||||
    bs_dir_str: str = registry_addr[0]
 | 
			
		||||
 | 
			
		||||
    # XXX, ensure bindspace-dir DNE beforehand!
 | 
			
		||||
    assert not Path(bs_dir_str).is_dir()
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery(
 | 
			
		||||
            enable_transports=['uds'],
 | 
			
		||||
            registry_addrs=[registry_addr],
 | 
			
		||||
            debug_mode=debug_mode,
 | 
			
		||||
        ) as _an:
 | 
			
		||||
 | 
			
		||||
            # XXX MUST be created implicitly by
 | 
			
		||||
            # `.ipc._uds.start_listener()`!
 | 
			
		||||
            assert Path(bs_dir_str).is_dir()
 | 
			
		||||
 | 
			
		||||
            root: Actor = tractor.current_actor()
 | 
			
		||||
            assert root.is_registrar
 | 
			
		||||
 | 
			
		||||
            assert registry_addr in root.reg_addrs
 | 
			
		||||
            assert (
 | 
			
		||||
                registry_addr
 | 
			
		||||
                in
 | 
			
		||||
                _state._runtime_vars['_registry_addrs']
 | 
			
		||||
            )
 | 
			
		||||
            assert (
 | 
			
		||||
                _addr.wrap_address(registry_addr)
 | 
			
		||||
                in
 | 
			
		||||
                root.registry_addrs
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_uds_double_listen_raises_connerr(
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    bindspace_dir_str: str,
 | 
			
		||||
):
 | 
			
		||||
    registry_addr: tuple = (
 | 
			
		||||
        f'{bindspace_dir_str}',
 | 
			
		||||
        'registry@doggy.sock',
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery(
 | 
			
		||||
            enable_transports=['uds'],
 | 
			
		||||
            registry_addrs=[registry_addr],
 | 
			
		||||
            debug_mode=debug_mode,
 | 
			
		||||
        ) as _an:
 | 
			
		||||
 | 
			
		||||
            # runtime up
 | 
			
		||||
            root: Actor = tractor.current_actor()
 | 
			
		||||
 | 
			
		||||
            from tractor.ipc._uds import (
 | 
			
		||||
                start_listener,
 | 
			
		||||
                UDSAddress,
 | 
			
		||||
            )
 | 
			
		||||
            ya_bound_addr: UDSAddress = root.registry_addrs[0]
 | 
			
		||||
            try:
 | 
			
		||||
                await start_listener(
 | 
			
		||||
                    addr=ya_bound_addr,
 | 
			
		||||
                )
 | 
			
		||||
            except ConnectionError as connerr:
 | 
			
		||||
                assert type(src_exc := connerr.__context__) is OSError
 | 
			
		||||
                assert 'Address already in use' in src_exc.args
 | 
			
		||||
                # complete, exit test.
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
                pytest.fail('It dint raise a connerr !?')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			@ -573,14 +573,16 @@ def test_basic_interloop_channel_stream(
 | 
			
		|||
    fan_out: bool,
 | 
			
		||||
):
 | 
			
		||||
    async def main():
 | 
			
		||||
        async with tractor.open_nursery() as an:
 | 
			
		||||
            portal = await an.run_in_actor(
 | 
			
		||||
                stream_from_aio,
 | 
			
		||||
                infect_asyncio=True,
 | 
			
		||||
                fan_out=fan_out,
 | 
			
		||||
            )
 | 
			
		||||
            # should raise RAE diectly
 | 
			
		||||
            await portal.result()
 | 
			
		||||
        # TODO, figure out min timeout here!
 | 
			
		||||
        with trio.fail_after(6):
 | 
			
		||||
            async with tractor.open_nursery() as an:
 | 
			
		||||
                portal = await an.run_in_actor(
 | 
			
		||||
                    stream_from_aio,
 | 
			
		||||
                    infect_asyncio=True,
 | 
			
		||||
                    fan_out=fan_out,
 | 
			
		||||
                )
 | 
			
		||||
                # should raise RAE diectly
 | 
			
		||||
                await portal.result()
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1088,6 +1090,108 @@ def test_sigint_closes_lifetime_stack(
 | 
			
		|||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# ?TODO asyncio.Task fn-deco?
 | 
			
		||||
# -[ ] do sig checkingat import time like @context?
 | 
			
		||||
# -[ ] maybe name it @aio_task ??
 | 
			
		||||
# -[ ] chan: to_asyncio.InterloopChannel ??
 | 
			
		||||
async def raise_before_started(
 | 
			
		||||
    # from_trio: asyncio.Queue,
 | 
			
		||||
    # to_trio: trio.abc.SendChannel,
 | 
			
		||||
    chan: to_asyncio.LinkedTaskChannel,
 | 
			
		||||
 | 
			
		||||
) -> None:
 | 
			
		||||
    '''
 | 
			
		||||
    `asyncio.Task` entry point which RTEs before calling
 | 
			
		||||
    `to_trio.send_nowait()`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    await asyncio.sleep(0.2)
 | 
			
		||||
    raise RuntimeError('Some shite went wrong before `.send_nowait()`!!')
 | 
			
		||||
 | 
			
		||||
    # to_trio.send_nowait('Uhh we shouldve RTE-d ^^ ??')
 | 
			
		||||
    chan.started_nowait('Uhh we shouldve RTE-d ^^ ??')
 | 
			
		||||
    await asyncio.sleep(float('inf'))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@tractor.context
 | 
			
		||||
async def caching_ep(
 | 
			
		||||
    ctx: tractor.Context,
 | 
			
		||||
):
 | 
			
		||||
 | 
			
		||||
    log = tractor.log.get_logger('caching_ep')
 | 
			
		||||
    log.info('syncing via `ctx.started()`')
 | 
			
		||||
    await ctx.started()
 | 
			
		||||
 | 
			
		||||
    # XXX, allocate the `open_channel_from()` inside
 | 
			
		||||
    # a `.trionics.maybe_open_context()`.
 | 
			
		||||
    chan: to_asyncio.LinkedTaskChannel
 | 
			
		||||
    async with (
 | 
			
		||||
        tractor.trionics.maybe_open_context(
 | 
			
		||||
            acm_func=tractor.to_asyncio.open_channel_from,
 | 
			
		||||
            kwargs={
 | 
			
		||||
                'target': raise_before_started,
 | 
			
		||||
                # ^XXX, kwarg to `open_channel_from()`
 | 
			
		||||
            },
 | 
			
		||||
 | 
			
		||||
            # lock around current actor task access
 | 
			
		||||
            key=tractor.current_actor().uid,
 | 
			
		||||
 | 
			
		||||
        ) as (cache_hit, (clients, chan)),
 | 
			
		||||
    ):
 | 
			
		||||
        if cache_hit:
 | 
			
		||||
            log.error(
 | 
			
		||||
                'Re-using cached `.open_from_channel()` call!\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            log.info(
 | 
			
		||||
                'Allocating SHOULD-FAIL `.open_from_channel()`\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        await trio.sleep_forever()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_aio_side_raises_before_started(
 | 
			
		||||
    reg_addr: tuple[str, int],
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    loglevel: str,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    Simulates connection-err from `piker.brokers.ib.api`..
 | 
			
		||||
 | 
			
		||||
    Ensure any error raised by child-`asyncio.Task` BEFORE
 | 
			
		||||
    `chan.started()`
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    # delay = 999 if debug_mode else 1
 | 
			
		||||
    async def main():
 | 
			
		||||
        with trio.fail_after(3):
 | 
			
		||||
            an: tractor.ActorNursery
 | 
			
		||||
            async with tractor.open_nursery(
 | 
			
		||||
                debug_mode=debug_mode,
 | 
			
		||||
                loglevel=loglevel,
 | 
			
		||||
            ) as an:
 | 
			
		||||
                p: tractor.Portal = await an.start_actor(
 | 
			
		||||
                    'lchan_cacher_that_raises_fast',
 | 
			
		||||
                    enable_modules=[__name__],
 | 
			
		||||
                    infect_asyncio=True,
 | 
			
		||||
                )
 | 
			
		||||
                async with p.open_context(
 | 
			
		||||
                    caching_ep,
 | 
			
		||||
                ) as (ctx, first):
 | 
			
		||||
                    assert not first
 | 
			
		||||
 | 
			
		||||
    with pytest.raises(
 | 
			
		||||
        expected_exception=(RemoteActorError),
 | 
			
		||||
    ) as excinfo:
 | 
			
		||||
        trio.run(main)
 | 
			
		||||
 | 
			
		||||
    # ensure `asyncio.Task` exception is bubbled
 | 
			
		||||
    # allll the way erp!!
 | 
			
		||||
    rae = excinfo.value
 | 
			
		||||
    assert rae.boxed_type is RuntimeError
 | 
			
		||||
 | 
			
		||||
# TODO: debug_mode tests once we get support for `asyncio`!
 | 
			
		||||
#
 | 
			
		||||
# -[ ] need tests to wrap both scripts:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2265,7 +2265,7 @@ async def open_context_from_portal(
 | 
			
		|||
        # await debug.pause()
 | 
			
		||||
        # log.cancel(
 | 
			
		||||
        match scope_err:
 | 
			
		||||
            case trio.Cancelled:
 | 
			
		||||
            case trio.Cancelled():
 | 
			
		||||
                logmeth = log.cancel
 | 
			
		||||
 | 
			
		||||
            # XXX explicitly report on any non-graceful-taskc cases
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -185,7 +185,9 @@ class Channel:
 | 
			
		|||
            addr,
 | 
			
		||||
            **kwargs,
 | 
			
		||||
        )
 | 
			
		||||
        assert transport.raddr == addr
 | 
			
		||||
        # XXX, for UDS *no!* since we recv the peer-pid and build out
 | 
			
		||||
        # a new addr..
 | 
			
		||||
        # assert transport.raddr == addr
 | 
			
		||||
        chan = Channel(transport=transport)
 | 
			
		||||
 | 
			
		||||
        # ?TODO, compact this into adapter level-methods?
 | 
			
		||||
| 
						 | 
				
			
			@ -301,7 +303,7 @@ class Channel:
 | 
			
		|||
        self,
 | 
			
		||||
        payload: Any,
 | 
			
		||||
 | 
			
		||||
        hide_tb: bool = True,
 | 
			
		||||
        hide_tb: bool = False,
 | 
			
		||||
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -103,8 +103,6 @@ class UDSAddress(
 | 
			
		|||
            self.filedir
 | 
			
		||||
            or
 | 
			
		||||
            self.def_bindspace
 | 
			
		||||
            # or
 | 
			
		||||
            # get_rt_dir()
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
| 
						 | 
				
			
			@ -230,7 +228,14 @@ async def start_listener(
 | 
			
		|||
    addr: UDSAddress,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
) -> SocketListener:
 | 
			
		||||
    # sock = addr._sock = socket.socket(
 | 
			
		||||
    '''
 | 
			
		||||
    Start listening for inbound connections via
 | 
			
		||||
    a `trio.SocketListener` (task) which `socket.bind()`s on `addr`.
 | 
			
		||||
 | 
			
		||||
    Note, if the `UDSAddress.bindspace: Path` directory dne it is
 | 
			
		||||
    implicitly created.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    sock = socket.socket(
 | 
			
		||||
        socket.AF_UNIX,
 | 
			
		||||
        socket.SOCK_STREAM
 | 
			
		||||
| 
						 | 
				
			
			@ -241,7 +246,17 @@ async def start_listener(
 | 
			
		|||
        f'|_{addr}\n'
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # ?TODO? should we use the `actor.lifetime_stack`
 | 
			
		||||
    # to rm on shutdown?
 | 
			
		||||
    bindpath: Path = addr.sockpath
 | 
			
		||||
    if not (bs := addr.bindspace).is_dir():
 | 
			
		||||
        log.info(
 | 
			
		||||
            'Creating bindspace dir in file-sys\n'
 | 
			
		||||
            f'>{{\n'
 | 
			
		||||
            f'|_{bs!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
        bs.mkdir()
 | 
			
		||||
 | 
			
		||||
    with _reraise_as_connerr(
 | 
			
		||||
        src_excs=(
 | 
			
		||||
            FileNotFoundError,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -130,6 +130,7 @@ class LinkedTaskChannel(
 | 
			
		|||
    _trio_task: trio.Task
 | 
			
		||||
    _aio_task_complete: trio.Event
 | 
			
		||||
 | 
			
		||||
    _closed_by_aio_task: bool = False
 | 
			
		||||
    _suppress_graceful_exits: bool = True
 | 
			
		||||
 | 
			
		||||
    _trio_err: BaseException|None = None
 | 
			
		||||
| 
						 | 
				
			
			@ -208,10 +209,15 @@ class LinkedTaskChannel(
 | 
			
		|||
    async def aclose(self) -> None:
 | 
			
		||||
        await self._from_aio.aclose()
 | 
			
		||||
 | 
			
		||||
    def started(
 | 
			
		||||
    # ?TODO? async version of this?
 | 
			
		||||
    def started_nowait(
 | 
			
		||||
        self,
 | 
			
		||||
        val: Any = None,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        '''
 | 
			
		||||
        Synchronize aio-sde with its trio-parent.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        self._aio_started_val = val
 | 
			
		||||
        return self._to_trio.send_nowait(val)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -242,6 +248,7 @@ class LinkedTaskChannel(
 | 
			
		|||
            # cycle on the trio side?
 | 
			
		||||
            # await trio.lowlevel.checkpoint()
 | 
			
		||||
            return await self._from_aio.receive()
 | 
			
		||||
 | 
			
		||||
        except BaseException as err:
 | 
			
		||||
            async with translate_aio_errors(
 | 
			
		||||
                chan=self,
 | 
			
		||||
| 
						 | 
				
			
			@ -319,7 +326,7 @@ def _run_asyncio_task(
 | 
			
		|||
    qsize: int = 1,
 | 
			
		||||
    provide_channels: bool = False,
 | 
			
		||||
    suppress_graceful_exits: bool = True,
 | 
			
		||||
    hide_tb: bool = False,
 | 
			
		||||
    hide_tb: bool = True,
 | 
			
		||||
    **kwargs,
 | 
			
		||||
 | 
			
		||||
) -> LinkedTaskChannel:
 | 
			
		||||
| 
						 | 
				
			
			@ -347,18 +354,6 @@ def _run_asyncio_task(
 | 
			
		|||
        # value otherwise it would just return ;P
 | 
			
		||||
        assert qsize > 1
 | 
			
		||||
 | 
			
		||||
    if provide_channels:
 | 
			
		||||
        assert 'to_trio' in args
 | 
			
		||||
 | 
			
		||||
    # allow target func to accept/stream results manually by name
 | 
			
		||||
    if 'to_trio' in args:
 | 
			
		||||
        kwargs['to_trio'] = to_trio
 | 
			
		||||
 | 
			
		||||
    if 'from_trio' in args:
 | 
			
		||||
        kwargs['from_trio'] = from_trio
 | 
			
		||||
 | 
			
		||||
    coro = func(**kwargs)
 | 
			
		||||
 | 
			
		||||
    trio_task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
    trio_cs = trio.CancelScope()
 | 
			
		||||
    aio_task_complete = trio.Event()
 | 
			
		||||
| 
						 | 
				
			
			@ -373,6 +368,25 @@ def _run_asyncio_task(
 | 
			
		|||
        _suppress_graceful_exits=suppress_graceful_exits,
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    # allow target func to accept/stream results manually by name
 | 
			
		||||
    if 'to_trio' in args:
 | 
			
		||||
        kwargs['to_trio'] = to_trio
 | 
			
		||||
 | 
			
		||||
    if 'from_trio' in args:
 | 
			
		||||
        kwargs['from_trio'] = from_trio
 | 
			
		||||
 | 
			
		||||
    if 'chan' in args:
 | 
			
		||||
        kwargs['chan'] = chan
 | 
			
		||||
 | 
			
		||||
    if provide_channels:
 | 
			
		||||
        assert (
 | 
			
		||||
            'to_trio' in args
 | 
			
		||||
            or
 | 
			
		||||
            'chan' in args
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    coro = func(**kwargs)
 | 
			
		||||
 | 
			
		||||
    async def wait_on_coro_final_result(
 | 
			
		||||
        to_trio: trio.MemorySendChannel,
 | 
			
		||||
        coro: Awaitable,
 | 
			
		||||
| 
						 | 
				
			
			@ -445,9 +459,15 @@ def _run_asyncio_task(
 | 
			
		|||
                        f'Task exited with final result: {result!r}\n'
 | 
			
		||||
                    )
 | 
			
		||||
 | 
			
		||||
                # only close the sender side which will relay
 | 
			
		||||
                # a `trio.EndOfChannel` to the trio (consumer) side.
 | 
			
		||||
                # only close the aio (child) side which will relay
 | 
			
		||||
                # a `trio.EndOfChannel` to the trio (parent) side.
 | 
			
		||||
                #
 | 
			
		||||
                # XXX NOTE, that trio-side MUST then in such cases
 | 
			
		||||
                # check for a `chan._aio_err` and raise it!!
 | 
			
		||||
                to_trio.close()
 | 
			
		||||
                # specially mark the closure as due to the
 | 
			
		||||
                # asyncio.Task terminating!
 | 
			
		||||
                chan._closed_by_aio_task = True
 | 
			
		||||
 | 
			
		||||
            aio_task_complete.set()
 | 
			
		||||
            log.runtime(
 | 
			
		||||
| 
						 | 
				
			
			@ -645,8 +665,9 @@ def _run_asyncio_task(
 | 
			
		|||
                not trio_cs.cancel_called
 | 
			
		||||
            ):
 | 
			
		||||
                log.cancel(
 | 
			
		||||
                    f'Cancelling `trio` side due to aio-side src exc\n'
 | 
			
		||||
                    f'{curr_aio_err}\n'
 | 
			
		||||
                    f'Cancelling trio-side due to aio-side src exc\n'
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'{curr_aio_err!r}\n'
 | 
			
		||||
                    f'\n'
 | 
			
		||||
                    f'(c>\n'
 | 
			
		||||
                    f'  |_{trio_task}\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -758,6 +779,7 @@ async def translate_aio_errors(
 | 
			
		|||
    aio_done_before_trio: bool = aio_task.done()
 | 
			
		||||
    assert aio_task
 | 
			
		||||
    trio_err: BaseException|None = None
 | 
			
		||||
    eoc: trio.EndOfChannel|None = None
 | 
			
		||||
    try:
 | 
			
		||||
        yield  # back to one of the cross-loop apis
 | 
			
		||||
    except trio.Cancelled as taskc:
 | 
			
		||||
| 
						 | 
				
			
			@ -789,12 +811,50 @@ async def translate_aio_errors(
 | 
			
		|||
        # )
 | 
			
		||||
        # raise
 | 
			
		||||
 | 
			
		||||
    # XXX always passthrough EoC since this translator is often
 | 
			
		||||
    # called from `LinkedTaskChannel.receive()` which we want
 | 
			
		||||
    # passthrough and further we have no special meaning for it in
 | 
			
		||||
    # terms of relaying errors or signals from the aio side!
 | 
			
		||||
    except trio.EndOfChannel as eoc:
 | 
			
		||||
        trio_err = chan._trio_err = eoc
 | 
			
		||||
    # XXX EoC is a special SIGNAL from the aio-side here!
 | 
			
		||||
    # There are 2 cases to handle:
 | 
			
		||||
    # 1. the "EoC passthrough" case.
 | 
			
		||||
    #   - the aio-task actually closed the channel "gracefully" and
 | 
			
		||||
    #     the trio-task should unwind any ongoing channel
 | 
			
		||||
    #     iteration/receiving,
 | 
			
		||||
    #  |_this exc-translator wraps calls to `LinkedTaskChannel.receive()`
 | 
			
		||||
    #    in which case we want to relay the actual "end-of-chan" for
 | 
			
		||||
    #    iteration purposes.
 | 
			
		||||
    #
 | 
			
		||||
    # 2. relaying the "asyncio.Task termination" case.
 | 
			
		||||
    #   - if the aio-task terminates, maybe with an error, AND the
 | 
			
		||||
    #    `open_channel_from()` API was used, it will always signal
 | 
			
		||||
    #    that termination.
 | 
			
		||||
    #  |_`wait_on_coro_final_result()` always calls
 | 
			
		||||
    #    `to_trio.close()` when `provide_channels=True` so we need to
 | 
			
		||||
    #    always check if there is an aio-side exc which needs to be
 | 
			
		||||
    #    relayed to the parent trio side!
 | 
			
		||||
    #  |_in this case the special `chan._closed_by_aio_task` is
 | 
			
		||||
    #    ALWAYS set.
 | 
			
		||||
    #
 | 
			
		||||
    except trio.EndOfChannel as _eoc:
 | 
			
		||||
        eoc = _eoc
 | 
			
		||||
        if (
 | 
			
		||||
            chan._closed_by_aio_task
 | 
			
		||||
            and
 | 
			
		||||
            aio_err
 | 
			
		||||
        ):
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                f'The asyncio-child task terminated due to error\n'
 | 
			
		||||
                f'{aio_err!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            chan._trio_to_raise = aio_err
 | 
			
		||||
            trio_err = chan._trio_err = eoc
 | 
			
		||||
            #
 | 
			
		||||
            # await tractor.pause(shield=True)
 | 
			
		||||
            #
 | 
			
		||||
            # ?TODO?, raise something like a,
 | 
			
		||||
            # chan._trio_to_raise = AsyncioErrored()
 | 
			
		||||
            # BUT, with the tb rewritten to reflect the underlying
 | 
			
		||||
            # call stack?
 | 
			
		||||
        else:
 | 
			
		||||
            trio_err = chan._trio_err = eoc
 | 
			
		||||
 | 
			
		||||
        raise eoc
 | 
			
		||||
 | 
			
		||||
    # NOTE ALSO SEE the matching note in the `cancel_trio()` asyncio
 | 
			
		||||
| 
						 | 
				
			
			@ -1047,7 +1107,7 @@ async def translate_aio_errors(
 | 
			
		|||
        #
 | 
			
		||||
        if wait_on_aio_task:
 | 
			
		||||
            await chan._aio_task_complete.wait()
 | 
			
		||||
            log.info(
 | 
			
		||||
            log.debug(
 | 
			
		||||
                'asyncio-task is done and unblocked trio-side!\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -1064,11 +1124,17 @@ async def translate_aio_errors(
 | 
			
		|||
        trio_to_raise: (
 | 
			
		||||
            AsyncioCancelled|
 | 
			
		||||
            AsyncioTaskExited|
 | 
			
		||||
            Exception|  # relayed from aio-task
 | 
			
		||||
            None
 | 
			
		||||
        ) = chan._trio_to_raise
 | 
			
		||||
 | 
			
		||||
        raise_from: Exception = (
 | 
			
		||||
            trio_err if (aio_err is trio_to_raise)
 | 
			
		||||
            else aio_err
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        if not suppress_graceful_exits:
 | 
			
		||||
            raise trio_to_raise from (aio_err or trio_err)
 | 
			
		||||
            raise trio_to_raise from raise_from
 | 
			
		||||
 | 
			
		||||
        if trio_to_raise:
 | 
			
		||||
            match (
 | 
			
		||||
| 
						 | 
				
			
			@ -1101,7 +1167,7 @@ async def translate_aio_errors(
 | 
			
		|||
                        )
 | 
			
		||||
                        return
 | 
			
		||||
                case _:
 | 
			
		||||
                    raise trio_to_raise from (aio_err or trio_err)
 | 
			
		||||
                    raise trio_to_raise from raise_from
 | 
			
		||||
 | 
			
		||||
        # Check if the asyncio-side is the cause of the trio-side
 | 
			
		||||
        # error.
 | 
			
		||||
| 
						 | 
				
			
			@ -1167,7 +1233,6 @@ async def run_task(
 | 
			
		|||
 | 
			
		||||
@acm
 | 
			
		||||
async def open_channel_from(
 | 
			
		||||
 | 
			
		||||
    target: Callable[..., Any],
 | 
			
		||||
    suppress_graceful_exits: bool = True,
 | 
			
		||||
    **target_kwargs,
 | 
			
		||||
| 
						 | 
				
			
			@ -1201,7 +1266,6 @@ async def open_channel_from(
 | 
			
		|||
                    # deliver stream handle upward
 | 
			
		||||
                    yield first, chan
 | 
			
		||||
            except trio.Cancelled as taskc:
 | 
			
		||||
                # await tractor.pause(shield=True)  # ya it worx ;)
 | 
			
		||||
                if cs.cancel_called:
 | 
			
		||||
                    if isinstance(chan._trio_to_raise, AsyncioCancelled):
 | 
			
		||||
                        log.cancel(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -97,11 +97,11 @@ def get_collapsed_eg(
 | 
			
		|||
async def collapse_eg(
 | 
			
		||||
    hide_tb: bool = True,
 | 
			
		||||
 | 
			
		||||
    # XXX, for ex. will always show begs containing single taskc
 | 
			
		||||
    ignore: set[Type[BaseException]] = {
 | 
			
		||||
        # trio.Cancelled,
 | 
			
		||||
    },
 | 
			
		||||
    add_notes: bool = True,
 | 
			
		||||
    bp: bool = False,
 | 
			
		||||
):
 | 
			
		||||
    '''
 | 
			
		||||
    If `BaseExceptionGroup` raised in the body scope is
 | 
			
		||||
| 
						 | 
				
			
			@ -115,30 +115,6 @@ async def collapse_eg(
 | 
			
		|||
        yield
 | 
			
		||||
    except BaseExceptionGroup as _beg:
 | 
			
		||||
        beg = _beg
 | 
			
		||||
 | 
			
		||||
    # TODO, remove this rant..
 | 
			
		||||
    #
 | 
			
		||||
    # except* BaseException as beg:
 | 
			
		||||
    # ^XXX WOW.. good job cpython. ^
 | 
			
		||||
    # like, never ever EVER use this!! XD
 | 
			
		||||
    #
 | 
			
		||||
    # turns out rasing from an `except*`-block has the opposite
 | 
			
		||||
    # behaviour of normal `except` and further can *never* be used to
 | 
			
		||||
    # get the equiv of,
 | 
			
		||||
    # `trio.open_nursery(strict_exception_groups=False)`
 | 
			
		||||
    #
 | 
			
		||||
    # ------ docs ------
 | 
			
		||||
    # https://docs.python.org/3/reference/compound_stmts.html#except-star
 | 
			
		||||
    #
 | 
			
		||||
    # > Any remaining exceptions that were not handled by any
 | 
			
		||||
    # > except* clause are re-raised at the end, along with all
 | 
			
		||||
    # > exceptions that were raised from within the except*
 | 
			
		||||
    # > clauses. If this list contains more than one exception to
 | 
			
		||||
    # > reraise, they are combined into an exception group.
 | 
			
		||||
        if bp:
 | 
			
		||||
            from tractor.devx import pause
 | 
			
		||||
            await pause(shield=True)
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
            (exc := get_collapsed_eg(beg))
 | 
			
		||||
            and
 | 
			
		||||
| 
						 | 
				
			
			@ -158,16 +134,17 @@ async def collapse_eg(
 | 
			
		|||
                ):
 | 
			
		||||
                    exc.add_note(from_group_note)
 | 
			
		||||
 | 
			
		||||
            raise exc
 | 
			
		||||
 | 
			
		||||
            # ?TODO? not needed right?
 | 
			
		||||
            # if cause := exc.__cause__:
 | 
			
		||||
            #     raise exc# from cause
 | 
			
		||||
            # else:
 | 
			
		||||
            #     # raise exc from beg
 | 
			
		||||
            #     # suppress "during handling of <the beg>"
 | 
			
		||||
            #     # output in tb/console.
 | 
			
		||||
            #     raise exc from None
 | 
			
		||||
            # raise exc
 | 
			
		||||
            # ^^ this will leave the orig beg tb above with the
 | 
			
		||||
            # "during the handling of <beg> the following.."
 | 
			
		||||
            # So, instead do..
 | 
			
		||||
            #
 | 
			
		||||
            if cause := exc.__cause__:
 | 
			
		||||
                raise exc from cause
 | 
			
		||||
            else:
 | 
			
		||||
                # suppress "during handling of <the beg>"
 | 
			
		||||
                # output in tb/console.
 | 
			
		||||
                raise exc from None
 | 
			
		||||
 | 
			
		||||
        # keep original
 | 
			
		||||
        raise # beg
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -41,6 +41,9 @@ import trio
 | 
			
		|||
from tractor._state import current_actor
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
# from ._beg import collapse_eg
 | 
			
		||||
# from ._taskc import (
 | 
			
		||||
#     maybe_raise_from_masking_exc,
 | 
			
		||||
# )
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if TYPE_CHECKING:
 | 
			
		||||
| 
						 | 
				
			
			@ -106,6 +109,9 @@ async def _enter_and_wait(
 | 
			
		|||
async def gather_contexts(
 | 
			
		||||
    mngrs: Sequence[AsyncContextManager[T]],
 | 
			
		||||
 | 
			
		||||
    # caller can provide their own scope
 | 
			
		||||
    tn: trio.Nursery|None = None,
 | 
			
		||||
 | 
			
		||||
) -> AsyncGenerator[
 | 
			
		||||
    tuple[
 | 
			
		||||
        T | None,
 | 
			
		||||
| 
						 | 
				
			
			@ -148,39 +154,45 @@ async def gather_contexts(
 | 
			
		|||
            '`.trionics.gather_contexts()` input mngrs is empty?\n'
 | 
			
		||||
            '\n'
 | 
			
		||||
            'Did try to use inline generator syntax?\n'
 | 
			
		||||
            'Use a non-lazy iterator or sequence-type intead!\n'
 | 
			
		||||
            'Check that list({mngrs}) works!\n'
 | 
			
		||||
            # 'or sequence-type intead!\n'
 | 
			
		||||
            # 'Use a non-lazy iterator or sequence-type intead!\n'
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async with (
 | 
			
		||||
        # collapse_eg(),
 | 
			
		||||
        trio.open_nursery(
 | 
			
		||||
            strict_exception_groups=False,
 | 
			
		||||
            # ^XXX^ TODO? soo roll our own then ??
 | 
			
		||||
            # -> since we kinda want the "if only one `.exception` then
 | 
			
		||||
            # just raise that" interface?
 | 
			
		||||
        ) as tn,
 | 
			
		||||
    ):
 | 
			
		||||
        for mngr in mngrs:
 | 
			
		||||
            tn.start_soon(
 | 
			
		||||
                _enter_and_wait,
 | 
			
		||||
                mngr,
 | 
			
		||||
                unwrapped,
 | 
			
		||||
                all_entered,
 | 
			
		||||
                parent_exit,
 | 
			
		||||
                seed,
 | 
			
		||||
            )
 | 
			
		||||
    try:
 | 
			
		||||
        async with (
 | 
			
		||||
            #
 | 
			
		||||
            # ?TODO, does including these (eg-collapsing,
 | 
			
		||||
            # taskc-unmasking) improve tb noise-reduction/legibility?
 | 
			
		||||
            #
 | 
			
		||||
            # collapse_eg(),
 | 
			
		||||
            maybe_open_nursery(
 | 
			
		||||
                nursery=tn,
 | 
			
		||||
            ) as tn,
 | 
			
		||||
            # maybe_raise_from_masking_exc(),
 | 
			
		||||
        ):
 | 
			
		||||
            for mngr in mngrs:
 | 
			
		||||
                tn.start_soon(
 | 
			
		||||
                    _enter_and_wait,
 | 
			
		||||
                    mngr,
 | 
			
		||||
                    unwrapped,
 | 
			
		||||
                    all_entered,
 | 
			
		||||
                    parent_exit,
 | 
			
		||||
                    seed,
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        # deliver control once all managers have started up
 | 
			
		||||
        await all_entered.wait()
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            # deliver control to caller once all ctx-managers have
 | 
			
		||||
            # started (yielded back to us).
 | 
			
		||||
            await all_entered.wait()
 | 
			
		||||
            yield tuple(unwrapped.values())
 | 
			
		||||
        finally:
 | 
			
		||||
            # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
 | 
			
		||||
            # the following wacky bug:
 | 
			
		||||
            # <tractorbugurlhere>
 | 
			
		||||
            parent_exit.set()
 | 
			
		||||
 | 
			
		||||
    finally:
 | 
			
		||||
        # XXX NOTE: this is ABSOLUTELY REQUIRED to avoid
 | 
			
		||||
        # the following wacky bug:
 | 
			
		||||
        # <tractorbugurlhere>
 | 
			
		||||
        parent_exit.set()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Per actor task caching helpers.
 | 
			
		||||
# Further potential examples of interest:
 | 
			
		||||
| 
						 | 
				
			
			@ -233,6 +245,9 @@ async def maybe_open_context(
 | 
			
		|||
    kwargs: dict = {},
 | 
			
		||||
    key: Hashable | Callable[..., Hashable] = None,
 | 
			
		||||
 | 
			
		||||
    # caller can provide their own scope
 | 
			
		||||
    tn: trio.Nursery|None = None,
 | 
			
		||||
 | 
			
		||||
) -> AsyncIterator[tuple[bool, T]]:
 | 
			
		||||
    '''
 | 
			
		||||
    Maybe open an async-context-manager (acm) if there is not already
 | 
			
		||||
| 
						 | 
				
			
			@ -265,7 +280,23 @@ async def maybe_open_context(
 | 
			
		|||
    # have it not be closed until all consumers have exited (which is
 | 
			
		||||
    # currently difficult to implement any other way besides using our
 | 
			
		||||
    # pre-allocated runtime instance..)
 | 
			
		||||
    service_n: trio.Nursery = current_actor()._service_n
 | 
			
		||||
    if tn:
 | 
			
		||||
        # TODO, assert tn is eventual parent of this task!
 | 
			
		||||
        task: trio.Task = trio.lowlevel.current_task()
 | 
			
		||||
        task_tn: trio.Nursery = task.parent_nursery
 | 
			
		||||
        if not tn._cancel_status.encloses(
 | 
			
		||||
            task_tn._cancel_status
 | 
			
		||||
        ):
 | 
			
		||||
            raise RuntimeError(
 | 
			
		||||
                f'Mis-nesting of task under provided {tn} !?\n'
 | 
			
		||||
                f'Current task is NOT a child(-ish)!!\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'task: {task}\n'
 | 
			
		||||
                f'task_tn: {task_tn}\n'
 | 
			
		||||
            )
 | 
			
		||||
        service_n = tn
 | 
			
		||||
    else:
 | 
			
		||||
        service_n: trio.Nursery = current_actor()._service_n
 | 
			
		||||
 | 
			
		||||
    # TODO: is there any way to allocate
 | 
			
		||||
    # a 'stays-open-till-last-task-finshed nursery?
 | 
			
		||||
| 
						 | 
				
			
			@ -273,39 +304,33 @@ async def maybe_open_context(
 | 
			
		|||
    # async with maybe_open_nursery(_Cache.service_n) as service_n:
 | 
			
		||||
    #     _Cache.service_n = service_n
 | 
			
		||||
 | 
			
		||||
    cache_miss_ke: KeyError|None = None
 | 
			
		||||
    maybe_taskc: trio.Cancelled|None = None
 | 
			
		||||
    try:
 | 
			
		||||
        # **critical section** that should prevent other tasks from
 | 
			
		||||
        # checking the _Cache until complete otherwise the scheduler
 | 
			
		||||
        # may switch and by accident we create more then one resource.
 | 
			
		||||
        yielded = _Cache.values[ctx_key]
 | 
			
		||||
 | 
			
		||||
    except KeyError:
 | 
			
		||||
        log.debug(
 | 
			
		||||
            f'Allocating new @acm-func entry\n'
 | 
			
		||||
            f'ctx_key={ctx_key}\n'
 | 
			
		||||
            f'acm_func={acm_func}\n'
 | 
			
		||||
        )
 | 
			
		||||
        mngr = acm_func(**kwargs)
 | 
			
		||||
        resources = _Cache.resources
 | 
			
		||||
        assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
 | 
			
		||||
        resources[ctx_key] = (service_n, trio.Event())
 | 
			
		||||
 | 
			
		||||
        # sync up to the mngr's yielded value
 | 
			
		||||
    except KeyError as _ke:
 | 
			
		||||
        # XXX, stay mutexed up to cache-miss yield
 | 
			
		||||
        try:
 | 
			
		||||
            cache_miss_ke = _ke
 | 
			
		||||
            log.debug(
 | 
			
		||||
                f'Allocating new @acm-func entry\n'
 | 
			
		||||
                f'ctx_key={ctx_key}\n'
 | 
			
		||||
                f'acm_func={acm_func}\n'
 | 
			
		||||
            )
 | 
			
		||||
            mngr = acm_func(**kwargs)
 | 
			
		||||
            resources = _Cache.resources
 | 
			
		||||
            assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
 | 
			
		||||
            resources[ctx_key] = (service_n, trio.Event())
 | 
			
		||||
            yielded: Any = await service_n.start(
 | 
			
		||||
                _Cache.run_ctx,
 | 
			
		||||
                mngr,
 | 
			
		||||
                ctx_key,
 | 
			
		||||
            )
 | 
			
		||||
            _Cache.users += 1
 | 
			
		||||
        except trio.Cancelled as taskc:
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                f'Cancelled during caching?\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'ctx_key: {ctx_key!r}\n'
 | 
			
		||||
                f'mngr: {mngr!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            raise taskc
 | 
			
		||||
        finally:
 | 
			
		||||
            # XXX, since this runs from an `except` it's a checkpoint
 | 
			
		||||
            # whih can be `trio.Cancelled`-masked.
 | 
			
		||||
| 
						 | 
				
			
			@ -318,10 +343,27 @@ async def maybe_open_context(
 | 
			
		|||
            # SO just always unlock!
 | 
			
		||||
            lock.release()
 | 
			
		||||
 | 
			
		||||
        yield (
 | 
			
		||||
            False,  # cache_hit = "no"
 | 
			
		||||
            yielded,
 | 
			
		||||
        )
 | 
			
		||||
        try:
 | 
			
		||||
            yield (
 | 
			
		||||
                False,  # cache_hit = "no"
 | 
			
		||||
                yielded,
 | 
			
		||||
            )
 | 
			
		||||
        except trio.Cancelled as taskc:
 | 
			
		||||
            maybe_taskc = taskc
 | 
			
		||||
            log.cancel(
 | 
			
		||||
                f'Cancelled from cache-miss entry\n'
 | 
			
		||||
                f'\n'
 | 
			
		||||
                f'ctx_key: {ctx_key!r}\n'
 | 
			
		||||
                f'mngr: {mngr!r}\n'
 | 
			
		||||
            )
 | 
			
		||||
            # XXX, always unset ke from cancelled context
 | 
			
		||||
            # since we never consider it a masked exc case!
 | 
			
		||||
            # - bc this can be called directly ty `._rpc._invoke()`?
 | 
			
		||||
            #
 | 
			
		||||
            if maybe_taskc.__context__ is cache_miss_ke:
 | 
			
		||||
                maybe_taskc.__context__ = None
 | 
			
		||||
 | 
			
		||||
            raise taskc
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        _Cache.users += 1
 | 
			
		||||
| 
						 | 
				
			
			@ -341,6 +383,13 @@ async def maybe_open_context(
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
    finally:
 | 
			
		||||
        if lock.locked():
 | 
			
		||||
            stats: trio.LockStatistics = lock.statistics()
 | 
			
		||||
            log.error(
 | 
			
		||||
                f'Lock left locked by last owner !?\n'
 | 
			
		||||
                f'{stats}\n'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        _Cache.users -= 1
 | 
			
		||||
 | 
			
		||||
        if yielded is not None:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,7 +22,10 @@ from __future__ import annotations
 | 
			
		|||
from contextlib import (
 | 
			
		||||
    asynccontextmanager as acm,
 | 
			
		||||
)
 | 
			
		||||
from typing import TYPE_CHECKING
 | 
			
		||||
from typing import (
 | 
			
		||||
    Type,
 | 
			
		||||
    TYPE_CHECKING,
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
import trio
 | 
			
		||||
from tractor.log import get_logger
 | 
			
		||||
| 
						 | 
				
			
			@ -80,9 +83,19 @@ async def maybe_raise_from_masking_exc(
 | 
			
		|||
        # ^TODO? other cases?
 | 
			
		||||
    ),
 | 
			
		||||
 | 
			
		||||
    always_warn_on: tuple[BaseException] = (
 | 
			
		||||
    always_warn_on: tuple[Type[BaseException]] = (
 | 
			
		||||
        trio.Cancelled,
 | 
			
		||||
    ),
 | 
			
		||||
 | 
			
		||||
    # don't ever unmask or warn on any masking pair,
 | 
			
		||||
    # {<masked-excT-key> -> <masking-excT-value>}
 | 
			
		||||
    never_warn_on: dict[
 | 
			
		||||
        Type[BaseException],
 | 
			
		||||
        Type[BaseException],
 | 
			
		||||
    ] = {
 | 
			
		||||
        KeyboardInterrupt: trio.Cancelled,
 | 
			
		||||
        trio.Cancelled: trio.Cancelled,
 | 
			
		||||
    },
 | 
			
		||||
    # ^XXX, special case(s) where we warn-log bc likely
 | 
			
		||||
    # there will be no operational diff since the exc
 | 
			
		||||
    # is always expected to be consumed.
 | 
			
		||||
| 
						 | 
				
			
			@ -144,7 +157,10 @@ async def maybe_raise_from_masking_exc(
 | 
			
		|||
            maybe_masker=exc_match,
 | 
			
		||||
            unmask_from=set(unmask_from),
 | 
			
		||||
        ):
 | 
			
		||||
            masked.append((exc_ctx, exc_match))
 | 
			
		||||
            masked.append((
 | 
			
		||||
                exc_ctx,
 | 
			
		||||
                exc_match,
 | 
			
		||||
            ))
 | 
			
		||||
            boxed_maybe_exc.value = exc_match
 | 
			
		||||
            note: str = (
 | 
			
		||||
                f'\n'
 | 
			
		||||
| 
						 | 
				
			
			@ -156,18 +172,36 @@ async def maybe_raise_from_masking_exc(
 | 
			
		|||
                    f'\n'
 | 
			
		||||
                    f'{extra_note}\n'
 | 
			
		||||
                )
 | 
			
		||||
            exc_ctx.add_note(note)
 | 
			
		||||
 | 
			
		||||
            if type(exc_match) in always_warn_on:
 | 
			
		||||
            do_warn: bool = (
 | 
			
		||||
                never_warn_on.get(
 | 
			
		||||
                    type(exc_ctx)  # masking type
 | 
			
		||||
                )
 | 
			
		||||
                is not
 | 
			
		||||
                type(exc_match)  # masked type
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            if do_warn:
 | 
			
		||||
                exc_ctx.add_note(note)
 | 
			
		||||
 | 
			
		||||
            if (
 | 
			
		||||
                do_warn
 | 
			
		||||
                and
 | 
			
		||||
                type(exc_match) in always_warn_on
 | 
			
		||||
            ):
 | 
			
		||||
                log.warning(note)
 | 
			
		||||
 | 
			
		||||
            if raise_unmasked:
 | 
			
		||||
 | 
			
		||||
            if (
 | 
			
		||||
                do_warn
 | 
			
		||||
                and
 | 
			
		||||
                raise_unmasked
 | 
			
		||||
            ):
 | 
			
		||||
                if len(masked) < 2:
 | 
			
		||||
                    raise exc_ctx from exc_match
 | 
			
		||||
 | 
			
		||||
                # ??TODO, see above but, possibly unmasking sub-exc
 | 
			
		||||
                # entries if there are > 1
 | 
			
		||||
                # else:
 | 
			
		||||
                #     # ?TODO, see above but, possibly unmasking sub-exc
 | 
			
		||||
                #     # entries if there are > 1
 | 
			
		||||
                #     await pause(shield=True)
 | 
			
		||||
    else:
 | 
			
		||||
        raise
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue