forked from goodboy/tractor
1
0
Fork 0

Mk `gather_contexts()` support `@acm`s yielding `None`

We were using a `all(<yielded values>)` condition which obviously won't
work if the batched managers yield any non-truthy value. So instead see
the `unwrapped: dict` with the `id(mngrs)` and only unblock once all
values have been filled in to be something that is not that value.
multihomed
Tyler Goodlet 2023-09-27 14:05:22 -04:00
parent 22c14e235e
commit ee151b00af
1 changed files with 21 additions and 5 deletions

View File

@ -70,6 +70,7 @@ async def _enter_and_wait(
unwrapped: dict[int, T], unwrapped: dict[int, T],
all_entered: trio.Event, all_entered: trio.Event,
parent_exit: trio.Event, parent_exit: trio.Event,
seed: int,
) -> None: ) -> None:
''' '''
@ -80,7 +81,10 @@ async def _enter_and_wait(
async with mngr as value: async with mngr as value:
unwrapped[id(mngr)] = value unwrapped[id(mngr)] = value
if all(unwrapped.values()): if all(
val != seed
for val in unwrapped.values()
):
all_entered.set() all_entered.set()
await parent_exit.wait() await parent_exit.wait()
@ -91,7 +95,13 @@ async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]], mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[Optional[T], ...], None]: ) -> AsyncGenerator[
tuple[
T | None,
...
],
None,
]:
''' '''
Concurrently enter a sequence of async context managers, each in Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the a separate ``trio`` task and deliver the unwrapped values in the
@ -104,7 +114,11 @@ async def gather_contexts(
entered and exited, and cancellation just works. entered and exited, and cancellation just works.
''' '''
unwrapped: dict[int, Optional[T]] = {}.fromkeys(id(mngr) for mngr in mngrs) seed: int = id(mngrs)
unwrapped: dict[int, T | None] = {}.fromkeys(
(id(mngr) for mngr in mngrs),
seed,
)
all_entered = trio.Event() all_entered = trio.Event()
parent_exit = trio.Event() parent_exit = trio.Event()
@ -116,8 +130,9 @@ async def gather_contexts(
if not mngrs: if not mngrs:
raise ValueError( raise ValueError(
'input mngrs is empty?\n' '`.trionics.gather_contexts()` input mngrs is empty?\n'
'Did try to use inline generator syntax?' 'Did try to use inline generator syntax?\n'
'Use a non-lazy iterator or sequence type intead!'
) )
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
@ -128,6 +143,7 @@ async def gather_contexts(
unwrapped, unwrapped,
all_entered, all_entered,
parent_exit, parent_exit,
seed,
) )
# deliver control once all managers have started up # deliver control once all managers have started up