Compare commits
	
		
			10 Commits 
		
	
	
		
			d4ca1a15a5
			...
			7e49ac678b
		
	
	| Author | SHA1 | Date | 
|---|---|---|
| 
							
							
								 | 
						7e49ac678b | |
| 
							
							
								 | 
						7a075494f1 | |
| 
							
							
								 | 
						c3aa29e7fa | |
| 
							
							
								 | 
						9f6acf9ac3 | |
| 
							
							
								 | 
						2a69d179e6 | |
| 
							
							
								 | 
						c51a49b045 | |
| 
							
							
								 | 
						6627a3bfda | |
| 
							
							
								 | 
						285ebba4b1 | |
| 
							
							
								 | 
						20628cc0b8 | |
| 
							
							
								 | 
						2536c5b3d2 | 
| 
						 | 
				
			
			@ -72,11 +72,13 @@ def test_resource_only_entered_once(key_on):
 | 
			
		|||
        with trio.move_on_after(0.5):
 | 
			
		||||
            async with (
 | 
			
		||||
                tractor.open_root_actor(),
 | 
			
		||||
                trio.open_nursery() as n,
 | 
			
		||||
                trio.open_nursery() as tn,
 | 
			
		||||
            ):
 | 
			
		||||
 | 
			
		||||
                for i in range(10):
 | 
			
		||||
                    n.start_soon(enter_cached_mngr, f'task_{i}')
 | 
			
		||||
                    tn.start_soon(
 | 
			
		||||
                        enter_cached_mngr,
 | 
			
		||||
                        f'task_{i}',
 | 
			
		||||
                    )
 | 
			
		||||
                    await trio.sleep(0.001)
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			@ -98,23 +100,34 @@ async def streamer(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
@acm
 | 
			
		||||
async def open_stream() -> Awaitable[tractor.MsgStream]:
 | 
			
		||||
 | 
			
		||||
async def open_stream() -> Awaitable[
 | 
			
		||||
    tuple[
 | 
			
		||||
        tractor.ActorNursery,
 | 
			
		||||
        tractor.MsgStream,
 | 
			
		||||
    ]
 | 
			
		||||
]:
 | 
			
		||||
    try:
 | 
			
		||||
        async with tractor.open_nursery() as an:
 | 
			
		||||
            portal = await an.start_actor(
 | 
			
		||||
                'streamer',
 | 
			
		||||
                enable_modules=[__name__],
 | 
			
		||||
            )
 | 
			
		||||
            async with (
 | 
			
		||||
                portal.open_context(streamer) as (ctx, first),
 | 
			
		||||
                ctx.open_stream() as stream,
 | 
			
		||||
            ):
 | 
			
		||||
                yield stream
 | 
			
		||||
            try:
 | 
			
		||||
                async with (
 | 
			
		||||
                    portal.open_context(streamer) as (ctx, first),
 | 
			
		||||
                    ctx.open_stream() as stream,
 | 
			
		||||
                ):
 | 
			
		||||
                    print('Entered open_stream() caller')
 | 
			
		||||
                    yield an, stream
 | 
			
		||||
                    print('Exited open_stream() caller')
 | 
			
		||||
 | 
			
		||||
            print('Cancelling streamer')
 | 
			
		||||
            await portal.cancel_actor()
 | 
			
		||||
            print('Cancelled streamer')
 | 
			
		||||
            finally:
 | 
			
		||||
                print(
 | 
			
		||||
                    'Cancelling streamer with,\n'
 | 
			
		||||
                    '=> `Portal.cancel_actor()`'
 | 
			
		||||
                )
 | 
			
		||||
                await portal.cancel_actor()
 | 
			
		||||
                print('Cancelled streamer')
 | 
			
		||||
 | 
			
		||||
    except Exception as err:
 | 
			
		||||
        print(
 | 
			
		||||
| 
						 | 
				
			
			@ -130,8 +143,12 @@ async def maybe_open_stream(taskname: str):
 | 
			
		|||
    async with tractor.trionics.maybe_open_context(
 | 
			
		||||
        # NOTE: all secondary tasks should cache hit on the same key
 | 
			
		||||
        acm_func=open_stream,
 | 
			
		||||
    ) as (cache_hit, stream):
 | 
			
		||||
 | 
			
		||||
    ) as (
 | 
			
		||||
        cache_hit,
 | 
			
		||||
        (an, stream)
 | 
			
		||||
    ):
 | 
			
		||||
        # when the actor + portal + ctx + stream has already been
 | 
			
		||||
        # allocated we want to just bcast to this task.
 | 
			
		||||
        if cache_hit:
 | 
			
		||||
            print(f'{taskname} loaded from cache')
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -139,10 +156,43 @@ async def maybe_open_stream(taskname: str):
 | 
			
		|||
            # if this feed is already allocated by the first
 | 
			
		||||
            # task that entereed
 | 
			
		||||
            async with stream.subscribe() as bstream:
 | 
			
		||||
                yield bstream
 | 
			
		||||
                yield an, bstream
 | 
			
		||||
                print(
 | 
			
		||||
                    f'cached task exited\n'
 | 
			
		||||
                    f')>\n'
 | 
			
		||||
                    f' |_{taskname}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
            # we should always unreg the "cloned" bcrc for this
 | 
			
		||||
            # consumer-task
 | 
			
		||||
            assert id(bstream) not in bstream._state.subs
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            # yield the actual stream
 | 
			
		||||
            yield stream
 | 
			
		||||
            try:
 | 
			
		||||
                yield an, stream
 | 
			
		||||
            finally:
 | 
			
		||||
                print(
 | 
			
		||||
                    f'NON-cached task exited\n'
 | 
			
		||||
                    f')>\n'
 | 
			
		||||
                    f' |_{taskname}\n'
 | 
			
		||||
                )
 | 
			
		||||
 | 
			
		||||
        first_bstream = stream._broadcaster
 | 
			
		||||
        bcrx_state = first_bstream._state
 | 
			
		||||
        subs: dict[int, int] = bcrx_state.subs
 | 
			
		||||
        if len(subs) == 1:
 | 
			
		||||
            assert id(first_bstream) in subs
 | 
			
		||||
            # ^^TODO! the bcrx should always de-allocate all subs,
 | 
			
		||||
            # including the implicit first one allocated on entry
 | 
			
		||||
            # by the first subscribing peer task, no?
 | 
			
		||||
            #
 | 
			
		||||
            # -[ ] adjust `MsgStream.subscribe()` to do this mgmt!
 | 
			
		||||
            #  |_ allows reverting `MsgStream.receive()` to the
 | 
			
		||||
            #    non-bcaster method.
 | 
			
		||||
            #  |_ we can decide whether to reset `._broadcaster`?
 | 
			
		||||
            #
 | 
			
		||||
            # await tractor.pause(shield=True)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def test_open_local_sub_to_stream(
 | 
			
		||||
| 
						 | 
				
			
			@ -159,16 +209,24 @@ def test_open_local_sub_to_stream(
 | 
			
		|||
 | 
			
		||||
    if debug_mode:
 | 
			
		||||
        timeout = 999
 | 
			
		||||
        print(f'IN debug_mode, setting large timeout={timeout!r}..')
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
        full = list(range(1000))
 | 
			
		||||
        an: tractor.ActorNursery|None = None
 | 
			
		||||
        num_tasks: int = 10
 | 
			
		||||
 | 
			
		||||
        async def get_sub_and_pull(taskname: str):
 | 
			
		||||
 | 
			
		||||
            nonlocal an
 | 
			
		||||
 | 
			
		||||
            stream: tractor.MsgStream
 | 
			
		||||
            async with (
 | 
			
		||||
                maybe_open_stream(taskname) as stream,
 | 
			
		||||
                maybe_open_stream(taskname) as (
 | 
			
		||||
                    an,
 | 
			
		||||
                    stream,
 | 
			
		||||
                ),
 | 
			
		||||
            ):
 | 
			
		||||
                if '0' in taskname:
 | 
			
		||||
                    assert isinstance(stream, tractor.MsgStream)
 | 
			
		||||
| 
						 | 
				
			
			@ -180,34 +238,70 @@ def test_open_local_sub_to_stream(
 | 
			
		|||
 | 
			
		||||
                first = await stream.receive()
 | 
			
		||||
                print(f'{taskname} started with value {first}')
 | 
			
		||||
                seq = []
 | 
			
		||||
                seq: list[int] = []
 | 
			
		||||
                async for msg in stream:
 | 
			
		||||
                    seq.append(msg)
 | 
			
		||||
 | 
			
		||||
                assert set(seq).issubset(set(full))
 | 
			
		||||
 | 
			
		||||
            # end of @acm block
 | 
			
		||||
            print(f'{taskname} finished')
 | 
			
		||||
 | 
			
		||||
        root: tractor.Actor
 | 
			
		||||
        with trio.fail_after(timeout) as cs:
 | 
			
		||||
            # TODO: turns out this isn't multi-task entrant XD
 | 
			
		||||
            # We probably need an indepotent entry semantic?
 | 
			
		||||
            async with tractor.open_root_actor(
 | 
			
		||||
                debug_mode=debug_mode,
 | 
			
		||||
            ):
 | 
			
		||||
                # maybe_enable_greenback=True,
 | 
			
		||||
                #
 | 
			
		||||
                # ^TODO? doesn't seem to mk breakpoint() usage work
 | 
			
		||||
                # bc each bg task needs to open a portal??
 | 
			
		||||
                # - [ ] we should consider making this part of
 | 
			
		||||
                #      our taskman defaults?
 | 
			
		||||
                #   |_see https://github.com/goodboy/tractor/pull/363
 | 
			
		||||
                #
 | 
			
		||||
            ) as root:
 | 
			
		||||
                assert root.is_registrar
 | 
			
		||||
 | 
			
		||||
                async with (
 | 
			
		||||
                    trio.open_nursery() as tn,
 | 
			
		||||
                ):
 | 
			
		||||
                    for i in range(10):
 | 
			
		||||
                    for i in range(num_tasks):
 | 
			
		||||
                        tn.start_soon(
 | 
			
		||||
                            get_sub_and_pull,
 | 
			
		||||
                            f'task_{i}',
 | 
			
		||||
                        )
 | 
			
		||||
                        await trio.sleep(0.001)
 | 
			
		||||
 | 
			
		||||
                print('all consumer tasks finished')
 | 
			
		||||
                print('all consumer tasks finished!')
 | 
			
		||||
 | 
			
		||||
                # ?XXX, ensure actor-nursery is shutdown or we might
 | 
			
		||||
                # hang here due to a minor task deadlock/race-condition?
 | 
			
		||||
                #
 | 
			
		||||
                # - seems that all we need is a checkpoint to ensure
 | 
			
		||||
                #   the last suspended task, which is inside
 | 
			
		||||
                #   `.maybe_open_context()`, can do the
 | 
			
		||||
                #   `Portal.cancel_actor()` call?
 | 
			
		||||
                #
 | 
			
		||||
                # - if that bg task isn't resumed, then this blocks
 | 
			
		||||
                #   timeout might hit before that?
 | 
			
		||||
                #
 | 
			
		||||
                if root.ipc_server.has_peers():
 | 
			
		||||
                    await trio.lowlevel.checkpoint()
 | 
			
		||||
 | 
			
		||||
                    # alt approach, cancel the entire `an`
 | 
			
		||||
                    # await tractor.pause()
 | 
			
		||||
                    # await an.cancel()
 | 
			
		||||
 | 
			
		||||
            # end of runtime scope
 | 
			
		||||
            print('root actor terminated.')
 | 
			
		||||
 | 
			
		||||
        if cs.cancelled_caught:
 | 
			
		||||
            pytest.fail(
 | 
			
		||||
                'Should NOT time out in `open_root_actor()` ?'
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
        print('exiting main.')
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -67,7 +67,6 @@ async def ensure_sequence(
 | 
			
		|||
 | 
			
		||||
@acm
 | 
			
		||||
async def open_sequence_streamer(
 | 
			
		||||
 | 
			
		||||
    sequence: list[int],
 | 
			
		||||
    reg_addr: tuple[str, int],
 | 
			
		||||
    start_method: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -96,39 +95,43 @@ async def open_sequence_streamer(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
def test_stream_fan_out_to_local_subscriptions(
 | 
			
		||||
    reg_addr,
 | 
			
		||||
    debug_mode: bool,
 | 
			
		||||
    reg_addr: tuple,
 | 
			
		||||
    start_method,
 | 
			
		||||
):
 | 
			
		||||
 | 
			
		||||
    sequence = list(range(1000))
 | 
			
		||||
 | 
			
		||||
    async def main():
 | 
			
		||||
        with trio.fail_after(9):
 | 
			
		||||
            async with open_sequence_streamer(
 | 
			
		||||
                sequence,
 | 
			
		||||
                reg_addr,
 | 
			
		||||
                start_method,
 | 
			
		||||
            ) as stream:
 | 
			
		||||
 | 
			
		||||
        async with open_sequence_streamer(
 | 
			
		||||
            sequence,
 | 
			
		||||
            reg_addr,
 | 
			
		||||
            start_method,
 | 
			
		||||
        ) as stream:
 | 
			
		||||
                async with (
 | 
			
		||||
                    collapse_eg(),
 | 
			
		||||
                    trio.open_nursery() as tn,
 | 
			
		||||
                ):
 | 
			
		||||
                    for i in range(10):
 | 
			
		||||
                        tn.start_soon(
 | 
			
		||||
                            ensure_sequence,
 | 
			
		||||
                            stream,
 | 
			
		||||
                            sequence.copy(),
 | 
			
		||||
                            name=f'consumer_{i}',
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
            async with trio.open_nursery() as n:
 | 
			
		||||
                for i in range(10):
 | 
			
		||||
                    n.start_soon(
 | 
			
		||||
                        ensure_sequence,
 | 
			
		||||
                        stream,
 | 
			
		||||
                        sequence.copy(),
 | 
			
		||||
                        name=f'consumer_{i}',
 | 
			
		||||
                    )
 | 
			
		||||
                    await stream.send(tuple(sequence))
 | 
			
		||||
 | 
			
		||||
                await stream.send(tuple(sequence))
 | 
			
		||||
                    async for value in stream:
 | 
			
		||||
                        print(f'source stream rx: {value}')
 | 
			
		||||
                        assert value == sequence[0]
 | 
			
		||||
                        sequence.remove(value)
 | 
			
		||||
 | 
			
		||||
                async for value in stream:
 | 
			
		||||
                    print(f'source stream rx: {value}')
 | 
			
		||||
                    assert value == sequence[0]
 | 
			
		||||
                    sequence.remove(value)
 | 
			
		||||
 | 
			
		||||
                    if not sequence:
 | 
			
		||||
                        # fully consumed
 | 
			
		||||
                        break
 | 
			
		||||
                        if not sequence:
 | 
			
		||||
                            # fully consumed
 | 
			
		||||
                            break
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -151,67 +154,69 @@ def test_consumer_and_parent_maybe_lag(
 | 
			
		|||
        sequence = list(range(300))
 | 
			
		||||
        parent_delay, sub_delay = task_delays
 | 
			
		||||
 | 
			
		||||
        async with open_sequence_streamer(
 | 
			
		||||
            sequence,
 | 
			
		||||
            reg_addr,
 | 
			
		||||
            start_method,
 | 
			
		||||
        ) as stream:
 | 
			
		||||
        # TODO, maybe mak a cm-deco for main()s?
 | 
			
		||||
        with trio.fail_after(3):
 | 
			
		||||
            async with open_sequence_streamer(
 | 
			
		||||
                sequence,
 | 
			
		||||
                reg_addr,
 | 
			
		||||
                start_method,
 | 
			
		||||
            ) as stream:
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                async with (
 | 
			
		||||
                    collapse_eg(),
 | 
			
		||||
                    trio.open_nursery() as tn,
 | 
			
		||||
                ):
 | 
			
		||||
                try:
 | 
			
		||||
                    async with (
 | 
			
		||||
                        collapse_eg(),
 | 
			
		||||
                        trio.open_nursery() as tn,
 | 
			
		||||
                    ):
 | 
			
		||||
 | 
			
		||||
                    tn.start_soon(
 | 
			
		||||
                        ensure_sequence,
 | 
			
		||||
                        stream,
 | 
			
		||||
                        sequence.copy(),
 | 
			
		||||
                        sub_delay,
 | 
			
		||||
                        name='consumer_task',
 | 
			
		||||
                    )
 | 
			
		||||
                        tn.start_soon(
 | 
			
		||||
                            ensure_sequence,
 | 
			
		||||
                            stream,
 | 
			
		||||
                            sequence.copy(),
 | 
			
		||||
                            sub_delay,
 | 
			
		||||
                            name='consumer_task',
 | 
			
		||||
                        )
 | 
			
		||||
 | 
			
		||||
                    await stream.send(tuple(sequence))
 | 
			
		||||
                        await stream.send(tuple(sequence))
 | 
			
		||||
 | 
			
		||||
                    # async for value in stream:
 | 
			
		||||
                    lagged = False
 | 
			
		||||
                    lag_count = 0
 | 
			
		||||
                        # async for value in stream:
 | 
			
		||||
                        lagged = False
 | 
			
		||||
                        lag_count = 0
 | 
			
		||||
 | 
			
		||||
                    while True:
 | 
			
		||||
                        try:
 | 
			
		||||
                            value = await stream.receive()
 | 
			
		||||
                            print(f'source stream rx: {value}')
 | 
			
		||||
                        while True:
 | 
			
		||||
                            try:
 | 
			
		||||
                                value = await stream.receive()
 | 
			
		||||
                                print(f'source stream rx: {value}')
 | 
			
		||||
 | 
			
		||||
                            if lagged:
 | 
			
		||||
                                # re set the sequence starting at our last
 | 
			
		||||
                                # value
 | 
			
		||||
                                sequence = sequence[sequence.index(value) + 1:]
 | 
			
		||||
                            else:
 | 
			
		||||
                                assert value == sequence[0]
 | 
			
		||||
                                sequence.remove(value)
 | 
			
		||||
                                if lagged:
 | 
			
		||||
                                    # re set the sequence starting at our last
 | 
			
		||||
                                    # value
 | 
			
		||||
                                    sequence = sequence[sequence.index(value) + 1:]
 | 
			
		||||
                                else:
 | 
			
		||||
                                    assert value == sequence[0]
 | 
			
		||||
                                    sequence.remove(value)
 | 
			
		||||
 | 
			
		||||
                            lagged = False
 | 
			
		||||
                                lagged = False
 | 
			
		||||
 | 
			
		||||
                        except Lagged:
 | 
			
		||||
                            lagged = True
 | 
			
		||||
                            print(f'source stream lagged after {value}')
 | 
			
		||||
                            lag_count += 1
 | 
			
		||||
                            continue
 | 
			
		||||
                            except Lagged:
 | 
			
		||||
                                lagged = True
 | 
			
		||||
                                print(f'source stream lagged after {value}')
 | 
			
		||||
                                lag_count += 1
 | 
			
		||||
                                continue
 | 
			
		||||
 | 
			
		||||
                        # lag the parent
 | 
			
		||||
                        await trio.sleep(parent_delay)
 | 
			
		||||
                            # lag the parent
 | 
			
		||||
                            await trio.sleep(parent_delay)
 | 
			
		||||
 | 
			
		||||
                        if not sequence:
 | 
			
		||||
                            # fully consumed
 | 
			
		||||
                            break
 | 
			
		||||
                    print(f'parent + source stream lagged: {lag_count}')
 | 
			
		||||
                            if not sequence:
 | 
			
		||||
                                # fully consumed
 | 
			
		||||
                                break
 | 
			
		||||
                        print(f'parent + source stream lagged: {lag_count}')
 | 
			
		||||
 | 
			
		||||
                    if parent_delay > sub_delay:
 | 
			
		||||
                        assert lag_count > 0
 | 
			
		||||
                        if parent_delay > sub_delay:
 | 
			
		||||
                            assert lag_count > 0
 | 
			
		||||
 | 
			
		||||
            except Lagged:
 | 
			
		||||
                # child was lagged
 | 
			
		||||
                assert parent_delay < sub_delay
 | 
			
		||||
                except Lagged:
 | 
			
		||||
                    # child was lagged
 | 
			
		||||
                    assert parent_delay < sub_delay
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -285,7 +290,11 @@ def test_faster_task_to_recv_is_cancelled_by_slower(
 | 
			
		|||
 | 
			
		||||
 | 
			
		||||
def test_subscribe_errors_after_close():
 | 
			
		||||
    '''
 | 
			
		||||
    Verify after calling `BroadcastReceiver.aclose()` you can't
 | 
			
		||||
    "re-open" it via `.subscribe()`.
 | 
			
		||||
 | 
			
		||||
    '''
 | 
			
		||||
    async def main():
 | 
			
		||||
 | 
			
		||||
        size = 1
 | 
			
		||||
| 
						 | 
				
			
			@ -293,6 +302,8 @@ def test_subscribe_errors_after_close():
 | 
			
		|||
        async with broadcast_receiver(rx, size) as brx:
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
        assert brx.key not in brx._state.subs
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            # open and close
 | 
			
		||||
            async with brx.subscribe():
 | 
			
		||||
| 
						 | 
				
			
			@ -302,7 +313,7 @@ def test_subscribe_errors_after_close():
 | 
			
		|||
            assert brx.key not in brx._state.subs
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            assert 0
 | 
			
		||||
            pytest.fail('brx.subscribe() never raised!?')
 | 
			
		||||
 | 
			
		||||
    trio.run(main)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -300,7 +300,7 @@ class Portal:
 | 
			
		|||
        )
 | 
			
		||||
 | 
			
		||||
        # XXX the one spot we set it?
 | 
			
		||||
        self.channel._cancel_called: bool = True
 | 
			
		||||
        chan._cancel_called: bool = True
 | 
			
		||||
        try:
 | 
			
		||||
            # send cancel cmd - might not get response
 | 
			
		||||
            # XXX: sure would be nice to make this work with
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -552,6 +552,14 @@ class Actor:
 | 
			
		|||
            )
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
    # ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
 | 
			
		||||
    # - _get_rpc_func(),
 | 
			
		||||
    # - _deliver_ctx_payload(),
 | 
			
		||||
    # - get_context(),
 | 
			
		||||
    # - start_remote_task(),
 | 
			
		||||
    # - cancel_rpc_tasks(),
 | 
			
		||||
    # - _cancel_task(),
 | 
			
		||||
    #
 | 
			
		||||
    def _get_rpc_func(self, ns, funcname):
 | 
			
		||||
        '''
 | 
			
		||||
        Try to lookup and return a target RPC func from the
 | 
			
		||||
| 
						 | 
				
			
			@ -1119,14 +1127,6 @@ class Actor:
 | 
			
		|||
        self._cancel_complete.set()
 | 
			
		||||
        return True
 | 
			
		||||
 | 
			
		||||
    # XXX: hard kill logic if needed?
 | 
			
		||||
    # def _hard_mofo_kill(self):
 | 
			
		||||
    #     # If we're the root actor or zombied kill everything
 | 
			
		||||
    #     if self._parent_chan is None:  # TODO: more robust check
 | 
			
		||||
    #         root = trio.lowlevel.current_root_task()
 | 
			
		||||
    #         for n in root.child_nurseries:
 | 
			
		||||
    #             n.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
    async def _cancel_task(
 | 
			
		||||
        self,
 | 
			
		||||
        cid: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -1361,25 +1361,13 @@ class Actor:
 | 
			
		|||
        '''
 | 
			
		||||
        return self.accept_addrs[0]
 | 
			
		||||
 | 
			
		||||
    def get_parent(self) -> Portal:
 | 
			
		||||
        '''
 | 
			
		||||
        Return a `Portal` to our parent.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        assert self._parent_chan, "No parent channel for this actor?"
 | 
			
		||||
        return Portal(self._parent_chan)
 | 
			
		||||
 | 
			
		||||
    def get_chans(
 | 
			
		||||
        self,
 | 
			
		||||
        uid: tuple[str, str],
 | 
			
		||||
 | 
			
		||||
    ) -> list[Channel]:
 | 
			
		||||
        '''
 | 
			
		||||
        Return all IPC channels to the actor with provided `uid`.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._ipc_server._peers[uid]
 | 
			
		||||
 | 
			
		||||
    # TODO, this should delegate ONLY to the
 | 
			
		||||
    # `._spawn_spec._runtime_vars: dict` / `._state` APIs?
 | 
			
		||||
    #
 | 
			
		||||
    # XXX, AH RIGHT that's why..
 | 
			
		||||
    #   it's bc we pass this as a CLI flag to the child.py precisely
 | 
			
		||||
    #   bc we need the bootstrapping pre `async_main()`.. but maybe
 | 
			
		||||
    #   keep this as an impl deat and not part of the pub iface impl?
 | 
			
		||||
    def is_infected_aio(self) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
        If `True`, this actor is running `trio` in guest mode on
 | 
			
		||||
| 
						 | 
				
			
			@ -1390,6 +1378,23 @@ class Actor:
 | 
			
		|||
        '''
 | 
			
		||||
        return self._infected_aio
 | 
			
		||||
 | 
			
		||||
    # ?TODO, is this the right type for this method?
 | 
			
		||||
    def get_parent(self) -> Portal:
 | 
			
		||||
        '''
 | 
			
		||||
        Return a `Portal` to our parent.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        assert self._parent_chan, "No parent channel for this actor?"
 | 
			
		||||
        return Portal(self._parent_chan)
 | 
			
		||||
 | 
			
		||||
    # XXX: hard kill logic if needed?
 | 
			
		||||
    # def _hard_mofo_kill(self):
 | 
			
		||||
    #     # If we're the root actor or zombied kill everything
 | 
			
		||||
    #     if self._parent_chan is None:  # TODO: more robust check
 | 
			
		||||
    #         root = trio.lowlevel.current_root_task()
 | 
			
		||||
    #         for n in root.child_nurseries:
 | 
			
		||||
    #             n.cancel_scope.cancel()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
async def async_main(
 | 
			
		||||
    actor: Actor,
 | 
			
		||||
| 
						 | 
				
			
			@ -1755,9 +1760,7 @@ async def async_main(
 | 
			
		|||
                f'   {pformat(ipc_server._peers)}'
 | 
			
		||||
            )
 | 
			
		||||
            log.runtime(teardown_report)
 | 
			
		||||
            await ipc_server.wait_for_no_more_peers(
 | 
			
		||||
                shield=True,
 | 
			
		||||
            )
 | 
			
		||||
            await ipc_server.wait_for_no_more_peers()
 | 
			
		||||
 | 
			
		||||
        teardown_report += (
 | 
			
		||||
            '-]> all peer channels are complete.\n'
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -102,6 +102,9 @@ class MsgStream(trio.abc.Channel):
 | 
			
		|||
        self._eoc: bool|trio.EndOfChannel = False
 | 
			
		||||
        self._closed: bool|trio.ClosedResourceError = False
 | 
			
		||||
 | 
			
		||||
    def is_eoc(self) -> bool|trio.EndOfChannel:
 | 
			
		||||
        return self._eoc
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def ctx(self) -> Context:
 | 
			
		||||
        '''
 | 
			
		||||
| 
						 | 
				
			
			@ -188,7 +191,14 @@ class MsgStream(trio.abc.Channel):
 | 
			
		|||
 | 
			
		||||
        return pld
 | 
			
		||||
 | 
			
		||||
    async def receive(
 | 
			
		||||
    # XXX NOTE, this is left private because in `.subscribe()` usage
 | 
			
		||||
    # we rebind the public `.recieve()` to a `BroadcastReceiver` but
 | 
			
		||||
    # on `.subscribe().__aexit__()`, for the first task which enters,
 | 
			
		||||
    # we want to revert to this msg-stream-instance's method since
 | 
			
		||||
    # mult-task-tracking provided by the b-caster is then no longer
 | 
			
		||||
    # necessary.
 | 
			
		||||
    #
 | 
			
		||||
    async def _receive(
 | 
			
		||||
        self,
 | 
			
		||||
        hide_tb: bool = False,
 | 
			
		||||
    ):
 | 
			
		||||
| 
						 | 
				
			
			@ -313,6 +323,8 @@ class MsgStream(trio.abc.Channel):
 | 
			
		|||
 | 
			
		||||
            raise src_err
 | 
			
		||||
 | 
			
		||||
    receive = _receive
 | 
			
		||||
 | 
			
		||||
    async def aclose(self) -> list[Exception|dict]:
 | 
			
		||||
        '''
 | 
			
		||||
        Cancel associated remote actor task and local memory channel on
 | 
			
		||||
| 
						 | 
				
			
			@ -528,10 +540,15 @@ class MsgStream(trio.abc.Channel):
 | 
			
		|||
        receiver wrapper.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        # NOTE: This operation is indempotent and non-reversible, so be
 | 
			
		||||
        # sure you can deal with any (theoretical) overhead of the the
 | 
			
		||||
        # allocated ``BroadcastReceiver`` before calling this method for
 | 
			
		||||
        # the first time.
 | 
			
		||||
        # XXX NOTE, This operation was originally implemented as
 | 
			
		||||
        # indempotent and non-reversible, so you had to be **VERY**
 | 
			
		||||
        # aware of any (theoretical) overhead from the allocated
 | 
			
		||||
        # `BroadcastReceiver.receive()`.
 | 
			
		||||
        #
 | 
			
		||||
        # HOWEVER, NOw we do revert and de-alloc the ._broadcaster
 | 
			
		||||
        # when the final caller (task) exits.
 | 
			
		||||
        #
 | 
			
		||||
        bcast: BroadcastReceiver|None = None
 | 
			
		||||
        if self._broadcaster is None:
 | 
			
		||||
 | 
			
		||||
            bcast = self._broadcaster = broadcast_receiver(
 | 
			
		||||
| 
						 | 
				
			
			@ -541,29 +558,60 @@ class MsgStream(trio.abc.Channel):
 | 
			
		|||
 | 
			
		||||
                # TODO: can remove this kwarg right since
 | 
			
		||||
                # by default behaviour is to do this anyway?
 | 
			
		||||
                receive_afunc=self.receive,
 | 
			
		||||
                receive_afunc=self._receive,
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
            # NOTE: we override the original stream instance's receive
 | 
			
		||||
            # method to now delegate to the broadcaster's ``.receive()``
 | 
			
		||||
            # such that new subscribers will be copied received values
 | 
			
		||||
            # and this stream doesn't have to expect it's original
 | 
			
		||||
            # consumer(s) to get a new broadcast rx handle.
 | 
			
		||||
            # XXX NOTE, we override the original stream instance's
 | 
			
		||||
            # receive method to instead delegate to the broadcaster's
 | 
			
		||||
            # `.receive()` such that new subscribers (multiple
 | 
			
		||||
            # `trio.Task`s) will be copied received values and the
 | 
			
		||||
            # *first* task to enter here doesn't have to expect its original consumer(s)
 | 
			
		||||
            # to get a new broadcast rx handle; everything happens
 | 
			
		||||
            # underneath this iface seemlessly.
 | 
			
		||||
            #
 | 
			
		||||
            self.receive = bcast.receive  # type: ignore
 | 
			
		||||
            # seems there's no graceful way to type this with ``mypy``?
 | 
			
		||||
            # seems there's no graceful way to type this with `mypy`?
 | 
			
		||||
            # https://github.com/python/mypy/issues/708
 | 
			
		||||
 | 
			
		||||
        async with self._broadcaster.subscribe() as bstream:
 | 
			
		||||
            assert bstream.key != self._broadcaster.key
 | 
			
		||||
            assert bstream._recv == self._broadcaster._recv
 | 
			
		||||
        # TODO, prevent re-entrant sub scope?
 | 
			
		||||
        # if self._broadcaster._closed:
 | 
			
		||||
        #     raise RuntimeError(
 | 
			
		||||
        #         'This stream
 | 
			
		||||
 | 
			
		||||
            # NOTE: we patch on a `.send()` to the bcaster so that the
 | 
			
		||||
            # caller can still conduct 2-way streaming using this
 | 
			
		||||
            # ``bstream`` handle transparently as though it was the msg
 | 
			
		||||
            # stream instance.
 | 
			
		||||
            bstream.send = self.send  # type: ignore
 | 
			
		||||
        try:
 | 
			
		||||
            aenter = self._broadcaster.subscribe()
 | 
			
		||||
            async with aenter as bstream:
 | 
			
		||||
                # ?TODO, move into test suite?
 | 
			
		||||
                assert bstream.key != self._broadcaster.key
 | 
			
		||||
                assert bstream._recv == self._broadcaster._recv
 | 
			
		||||
 | 
			
		||||
            yield bstream
 | 
			
		||||
                # NOTE: we patch on a `.send()` to the bcaster so that the
 | 
			
		||||
                # caller can still conduct 2-way streaming using this
 | 
			
		||||
                # ``bstream`` handle transparently as though it was the msg
 | 
			
		||||
                # stream instance.
 | 
			
		||||
                bstream.send = self.send  # type: ignore
 | 
			
		||||
 | 
			
		||||
                # newly-allocated instance
 | 
			
		||||
                yield bstream
 | 
			
		||||
 | 
			
		||||
        finally:
 | 
			
		||||
            # XXX, the first-enterer task should, like all other
 | 
			
		||||
            # subs, close the first allocated bcrx, which adjusts the
 | 
			
		||||
            # common `bcrx.state`
 | 
			
		||||
            with trio.CancelScope(shield=True):
 | 
			
		||||
                if bcast is not None:
 | 
			
		||||
                    await bcast.aclose()
 | 
			
		||||
 | 
			
		||||
                # XXX, when the bcrx.state reports there are no more subs
 | 
			
		||||
                # we can revert to this obj's method, removing any
 | 
			
		||||
                # delegation overhead!
 | 
			
		||||
                if (
 | 
			
		||||
                    (orig_bcast := self._broadcaster)
 | 
			
		||||
                    and
 | 
			
		||||
                    not orig_bcast.state.subs
 | 
			
		||||
                ):
 | 
			
		||||
                    self.receive = self._receive
 | 
			
		||||
                    # self._broadcaster = None
 | 
			
		||||
 | 
			
		||||
    async def send(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -117,7 +117,6 @@ class ActorNursery:
 | 
			
		|||
            ]
 | 
			
		||||
        ] = {}
 | 
			
		||||
 | 
			
		||||
        self.cancelled: bool = False
 | 
			
		||||
        self._join_procs = trio.Event()
 | 
			
		||||
        self._at_least_one_child_in_debug: bool = False
 | 
			
		||||
        self.errors = errors
 | 
			
		||||
| 
						 | 
				
			
			@ -135,10 +134,53 @@ class ActorNursery:
 | 
			
		|||
        # TODO: remove the `.run_in_actor()` API and thus this 2ndary
 | 
			
		||||
        # nursery when that API get's moved outside this primitive!
 | 
			
		||||
        self._ria_nursery = ria_nursery
 | 
			
		||||
 | 
			
		||||
        # TODO, factor this into a .hilevel api!
 | 
			
		||||
        #
 | 
			
		||||
        # portals spawned with ``run_in_actor()`` are
 | 
			
		||||
        # cancelled when their "main" result arrives
 | 
			
		||||
        self._cancel_after_result_on_exit: set = set()
 | 
			
		||||
 | 
			
		||||
        # trio.Nursery-like cancel (request) statuses
 | 
			
		||||
        self._cancelled_caught: bool = False
 | 
			
		||||
        self._cancel_called: bool = False
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def cancel_called(self) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
        Records whether cancellation has been requested for this
 | 
			
		||||
        actor-nursery by a call to  `.cancel()` either due to,
 | 
			
		||||
        - an explicit call by some actor-local-task,
 | 
			
		||||
        - an implicit call due to an error/cancel emited inside
 | 
			
		||||
          the `tractor.open_nursery()` block.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._cancel_called
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def cancelled_caught(self) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
        Set when this nursery was able to cance all spawned subactors
 | 
			
		||||
        gracefully via an (implicit) call to `.cancel()`.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._cancelled_caught
 | 
			
		||||
 | 
			
		||||
    # TODO! remove internal/test-suite usage!
 | 
			
		||||
    @property
 | 
			
		||||
    def cancelled(self) -> bool:
 | 
			
		||||
        warnings.warn(
 | 
			
		||||
            "`ActorNursery.cancelled` is now deprecated, use "
 | 
			
		||||
            " `.cancel_called` instead.",
 | 
			
		||||
            DeprecationWarning,
 | 
			
		||||
            stacklevel=2,
 | 
			
		||||
        )
 | 
			
		||||
        return (
 | 
			
		||||
            self._cancel_called
 | 
			
		||||
            # and
 | 
			
		||||
            # self._cancelled_caught
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    async def start_actor(
 | 
			
		||||
        self,
 | 
			
		||||
        name: str,
 | 
			
		||||
| 
						 | 
				
			
			@ -316,7 +358,7 @@ class ActorNursery:
 | 
			
		|||
 | 
			
		||||
        '''
 | 
			
		||||
        __runtimeframe__: int = 1  # noqa
 | 
			
		||||
        self.cancelled = True
 | 
			
		||||
        self._cancel_called = True
 | 
			
		||||
 | 
			
		||||
        # TODO: impl a repr for spawn more compact
 | 
			
		||||
        # then `._children`..
 | 
			
		||||
| 
						 | 
				
			
			@ -394,6 +436,8 @@ class ActorNursery:
 | 
			
		|||
            ) in children.values():
 | 
			
		||||
                log.warning(f"Hard killing process {proc}")
 | 
			
		||||
                proc.terminate()
 | 
			
		||||
        else:
 | 
			
		||||
            self._cancelled_caught
 | 
			
		||||
 | 
			
		||||
        # mark ourselves as having (tried to have) cancelled all subactors
 | 
			
		||||
        self._join_procs.set()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -101,11 +101,27 @@ class Channel:
 | 
			
		|||
        # ^XXX! ONLY set if a remote actor sends an `Error`-msg
 | 
			
		||||
        self._closed: bool = False
 | 
			
		||||
 | 
			
		||||
        # flag set by ``Portal.cancel_actor()`` indicating remote
 | 
			
		||||
        # (possibly peer) cancellation of the far end actor
 | 
			
		||||
        # runtime.
 | 
			
		||||
        # flag set by `Portal.cancel_actor()` indicating remote
 | 
			
		||||
        # (possibly peer) cancellation of the far end actor runtime.
 | 
			
		||||
        self._cancel_called: bool = False
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def closed(self) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
        Was `.aclose()` successfully called?
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._closed
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def cancel_called(self) -> bool:
 | 
			
		||||
        '''
 | 
			
		||||
        Set when `Portal.cancel_actor()` is called on a portal which
 | 
			
		||||
        wraps this IPC channel.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._cancel_called
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def uid(self) -> tuple[str, str]:
 | 
			
		||||
        '''
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -814,10 +814,14 @@ class Server(Struct):
 | 
			
		|||
 | 
			
		||||
    async def wait_for_no_more_peers(
 | 
			
		||||
        self,
 | 
			
		||||
        shield: bool = False,
 | 
			
		||||
        # XXX, should this even be allowed?
 | 
			
		||||
        # -> i've seen it cause hangs on teardown
 | 
			
		||||
        #    in `test_resource_cache.py`
 | 
			
		||||
        # _shield: bool = False,
 | 
			
		||||
    ) -> None:
 | 
			
		||||
        with trio.CancelScope(shield=shield):
 | 
			
		||||
            await self._no_more_peers.wait()
 | 
			
		||||
        await self._no_more_peers.wait()
 | 
			
		||||
        # with trio.CancelScope(shield=_shield):
 | 
			
		||||
        #     await self._no_more_peers.wait()
 | 
			
		||||
 | 
			
		||||
    async def wait_for_peer(
 | 
			
		||||
        self,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -100,6 +100,32 @@ class Lagged(trio.TooSlowError):
 | 
			
		|||
    '''
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def wrap_rx_for_eoc(
 | 
			
		||||
    rx: AsyncReceiver,
 | 
			
		||||
) -> AsyncReceiver:
 | 
			
		||||
 | 
			
		||||
    match rx:
 | 
			
		||||
        case trio.MemoryReceiveChannel():
 | 
			
		||||
 | 
			
		||||
            # XXX, taken verbatim from .receive_nowait()
 | 
			
		||||
            def is_eoc() -> bool:
 | 
			
		||||
                if not rx._state.open_send_channels:
 | 
			
		||||
                    return trio.EndOfChannel
 | 
			
		||||
 | 
			
		||||
                return False
 | 
			
		||||
 | 
			
		||||
            rx.is_eoc = is_eoc
 | 
			
		||||
 | 
			
		||||
        case _:
 | 
			
		||||
            # XXX, ensure we define a private field!
 | 
			
		||||
            # case tractor.MsgStream:
 | 
			
		||||
            assert (
 | 
			
		||||
                getattr(rx, '_eoc', False) is not None
 | 
			
		||||
            )
 | 
			
		||||
 | 
			
		||||
    return rx
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class BroadcastState(Struct):
 | 
			
		||||
    '''
 | 
			
		||||
    Common state to all receivers of a broadcast.
 | 
			
		||||
| 
						 | 
				
			
			@ -186,11 +212,23 @@ class BroadcastReceiver(ReceiveChannel):
 | 
			
		|||
        state.subs[self.key] = -1
 | 
			
		||||
 | 
			
		||||
        # underlying for this receiver
 | 
			
		||||
        self._rx = rx_chan
 | 
			
		||||
        self._rx = wrap_rx_for_eoc(rx_chan)
 | 
			
		||||
        self._recv = receive_afunc or rx_chan.receive
 | 
			
		||||
        self._closed: bool = False
 | 
			
		||||
        self._raise_on_lag = raise_on_lag
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def state(self) -> BroadcastState:
 | 
			
		||||
        '''
 | 
			
		||||
        Read-only access to this receivers internal `._state`
 | 
			
		||||
        instance ref.
 | 
			
		||||
 | 
			
		||||
        If you just want to read the high-level state metrics,
 | 
			
		||||
        use `.state.statistics()`.
 | 
			
		||||
 | 
			
		||||
        '''
 | 
			
		||||
        return self._state
 | 
			
		||||
 | 
			
		||||
    def receive_nowait(
 | 
			
		||||
        self,
 | 
			
		||||
        _key: int | None = None,
 | 
			
		||||
| 
						 | 
				
			
			@ -215,7 +253,23 @@ class BroadcastReceiver(ReceiveChannel):
 | 
			
		|||
        try:
 | 
			
		||||
            seq = state.subs[key]
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            # from tractor import pause_from_sync
 | 
			
		||||
            # pause_from_sync(shield=True)
 | 
			
		||||
            if (
 | 
			
		||||
                (rx_eoc := self._rx.is_eoc())
 | 
			
		||||
                or
 | 
			
		||||
                self.state.eoc
 | 
			
		||||
            ):
 | 
			
		||||
                raise trio.EndOfChannel(
 | 
			
		||||
                    'Broadcast-Rx underlying already ended!'
 | 
			
		||||
                ) from rx_eoc
 | 
			
		||||
 | 
			
		||||
            if self._closed:
 | 
			
		||||
                # if (rx_eoc := self._rx._eoc):
 | 
			
		||||
                #     raise trio.EndOfChannel(
 | 
			
		||||
                #         'Broadcast-Rx underlying already ended!'
 | 
			
		||||
                #     ) from rx_eoc
 | 
			
		||||
 | 
			
		||||
                raise trio.ClosedResourceError
 | 
			
		||||
 | 
			
		||||
            raise RuntimeError(
 | 
			
		||||
| 
						 | 
				
			
			@ -453,8 +507,9 @@ class BroadcastReceiver(ReceiveChannel):
 | 
			
		|||
        self._closed = True
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# NOTE, this can we use as an `@acm` since `BroadcastReceiver`
 | 
			
		||||
# derives from `ReceiveChannel`.
 | 
			
		||||
def broadcast_receiver(
 | 
			
		||||
 | 
			
		||||
    recv_chan: AsyncReceiver,
 | 
			
		||||
    max_buffer_size: int,
 | 
			
		||||
    receive_afunc: Callable[[], Awaitable[Any]]|None = None,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -289,7 +289,10 @@ async def maybe_open_context(
 | 
			
		|||
        )
 | 
			
		||||
        _Cache.users += 1
 | 
			
		||||
        lock.release()
 | 
			
		||||
        yield False, yielded
 | 
			
		||||
        yield (
 | 
			
		||||
            False,  # cache_hit = "no"
 | 
			
		||||
            yielded,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    else:
 | 
			
		||||
        _Cache.users += 1
 | 
			
		||||
| 
						 | 
				
			
			@ -303,7 +306,10 @@ async def maybe_open_context(
 | 
			
		|||
            # f'{ctx_key!r} -> {yielded!r}\n'
 | 
			
		||||
        )
 | 
			
		||||
        lock.release()
 | 
			
		||||
        yield True, yielded
 | 
			
		||||
        yield (
 | 
			
		||||
            True,  # cache_hit = "yes"
 | 
			
		||||
            yielded,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    finally:
 | 
			
		||||
        _Cache.users -= 1
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue