forked from goodboy/tractor
1
0
Fork 0

Merge pull request #254 from goodboy/graceful_gather

Change to `gather_contexts()`, use event for graceful exit
trionics
goodboy 2021-10-25 10:14:01 -04:00 committed by GitHub
commit 925af28092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 9 deletions

View File

@ -3,7 +3,7 @@ import itertools
import trio import trio
import tractor import tractor
from tractor import open_actor_cluster from tractor import open_actor_cluster
from tractor.trionics import async_enter_all from tractor.trionics import gather_contexts
from conftest import tractor_test from conftest import tractor_test
@ -25,10 +25,10 @@ async def worker(ctx: tractor.Context) -> None:
async def test_streaming_to_actor_cluster() -> None: async def test_streaming_to_actor_cluster() -> None:
async with ( async with (
open_actor_cluster(modules=[__name__]) as portals, open_actor_cluster(modules=[__name__]) as portals,
async_enter_all( gather_contexts(
mngrs=[p.open_context(worker) for p in portals.values()], mngrs=[p.open_context(worker) for p in portals.values()],
) as contexts, ) as contexts,
async_enter_all( gather_contexts(
mngrs=[ctx[0].open_stream() for ctx in contexts], mngrs=[ctx[0].open_stream() for ctx in contexts],
) as streams, ) as streams,
): ):

View File

@ -2,12 +2,12 @@
Sugary patterns for trio + tractor designs. Sugary patterns for trio + tractor designs.
''' '''
from ._mngrs import async_enter_all from ._mngrs import gather_contexts
from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged
__all__ = [ __all__ = [
'async_enter_all', 'gather_contexts',
'broadcast_receiver', 'broadcast_receiver',
'BroadcastReceiver', 'BroadcastReceiver',
'Lagged', 'Lagged',

View File

@ -14,11 +14,15 @@ T = TypeVar("T")
async def _enter_and_wait( async def _enter_and_wait(
mngr: AsyncContextManager[T], mngr: AsyncContextManager[T],
unwrapped: dict[int, T], unwrapped: dict[int, T],
all_entered: trio.Event, all_entered: trio.Event,
parent_exit: trio.Event,
) -> None: ) -> None:
'''Open the async context manager deliver it's value '''
Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled. to this task's spawner and sleep until cancelled.
''' '''
@ -28,16 +32,31 @@ async def _enter_and_wait(
if all(unwrapped.values()): if all(unwrapped.values()):
all_entered.set() all_entered.set()
await trio.sleep_forever() await parent_exit.wait()
@acm @acm
async def async_enter_all( async def gather_contexts(
mngrs: Sequence[AsyncContextManager[T]], mngrs: Sequence[AsyncContextManager[T]],
) -> AsyncGenerator[tuple[T, ...], None]: ) -> AsyncGenerator[tuple[T, ...], None]:
'''
Concurrently enter a sequence of async context managers, each in
a separate ``trio`` task and deliver the unwrapped values in the
same order once all managers have entered. On exit all contexts are
subsequently and concurrently exited.
This function is somewhat similar to common usage of
``contextlib.AsyncExitStack.enter_async_context()`` (in a loop) in
combo with ``asyncio.gather()`` except the managers are concurrently
entered and exited cancellation just works.
'''
unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs) unwrapped = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event() all_entered = trio.Event()
parent_exit = trio.Event()
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for mngr in mngrs: for mngr in mngrs:
@ -46,6 +65,7 @@ async def async_enter_all(
mngr, mngr,
unwrapped, unwrapped,
all_entered, all_entered,
parent_exit,
) )
# deliver control once all managers have started up # deliver control once all managers have started up
@ -53,4 +73,6 @@ async def async_enter_all(
yield tuple(unwrapped.values()) yield tuple(unwrapped.values())
n.cancel_scope.cancel() # we don't need a try/finally since cancellation will be triggered
# by the surrounding nursery on error.
parent_exit.set()