Change to `gather_contexts()`, use event for graceful exit

The api we've made here is actually closer to `asyncio.gather()` but
with opening async context managers instead of funcs. Use another event
to allow for graceful teardown of children on non-cancellation exits
and add a doc string.
graceful_gather
Tyler Goodlet 2021-10-24 13:48:36 -04:00
parent ebf080b8a2
commit d0f5c7a5e2
3 changed files with 31 additions and 9 deletions

View File

@ -3,7 +3,7 @@ import itertools
import trio
import tractor
from tractor import open_actor_cluster
from tractor.trionics import async_enter_all
from tractor.trionics import gather_contexts
from conftest import tractor_test
@ -25,10 +25,10 @@ async def worker(ctx: tractor.Context) -> None:
async def test_streaming_to_actor_cluster() -> None:
async with (
open_actor_cluster(modules=[__name__]) as portals,
async_enter_all(
gather_contexts(
mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts,
async_enter_all(
gather_contexts(
mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams,
):

View File

@ -2,12 +2,12 @@
Sugary patterns for trio + tractor designs.
'''
from ._mngrs import async_enter_all
from ._mngrs import gather_contexts
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
__all__ = [
'async_enter_all',
'gather_contexts',
'broadcast_receiver',
'BroadcastReceiver',
'Lagged',

View File

@ -14,11 +14,15 @@ T = TypeVar("T")
async def _enter_and_wait(
mngr: AsyncContextManager[T],
unwrapped: dict[int, T],
all_entered: trio.Event,
parent_exit: trio.Event,
) -> None:
'''Open the async context manager deliver it's value
'''
Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.
'''
@ -28,16 +32,31 @@ async def _enter_and_wait(
if all(unwrapped.values()):
all_entered.set()
await trio.sleep_forever()
await parent_exit.wait()
@acm
async def async_enter_all(
async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited cancellation just works.
'''
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event()
parent_exit = trio.Event()
async with trio.open_nursery() as n:
for mngr in mngrs:
@ -46,6 +65,7 @@ async def async_enter_all(
mngr,
unwrapped,
all_entered,
parent_exit,
)
# deliver control once all managers have started up
@ -53,4 +73,6 @@ async def async_enter_all(
yield tuple(unwrapped.values())
n.cancel_scope.cancel()
# we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.
parent_exit.set()