Start `trionics` sub-pkg with `async_enter_all()`

Since it seems we're building out more and more higher level primitives
in order to support certain parallel style actor trees and messaging
patterns (eg. task broadcast channels), we might as well start a new
sub-package for purely `trio` constructions. We hereby dub this
the realm of `trionics` (like electronics but for trios instead of
electrons).

To kick things off, add an `async_enter_all()` concurrent
exit-stack-like context manager API which will concurrently spawn
a sequence of provided async context managers and deliver their ordered
results but with proper support for `trio` cancellation semantics.
The stdlib's `AsyncExitStack` is not compatible with nurseries not
`trio` tasks (which are cancelled) since as task will be suspended on
the stack after push and does not ever hit a checkpoint until the stack
is closed.
graceful_gather
Tyler Goodlet 2021-10-04 11:02:51 -04:00
parent 340ddba4ae
commit 680a841282
2 changed files with 74 additions and 0 deletions

View File

@ -0,0 +1,10 @@
'''
Sugary patterns for trio + tractor designs.
'''
from ._mngrs import async_enter_all
__all__ = [
'async_enter_all',
]

View File

@ -0,0 +1,64 @@
'''
Async context manager primitives with hard ``trio``-aware semantics
'''
from typing import AsyncContextManager
from typing import TypeVar
from contextlib import asynccontextmanager as acm
import trio
# A regular invariant generic type
T = TypeVar("T")
async def _enter_and_sleep(
mngr: AsyncContextManager[T],
to_yield: dict[int, T],
all_entered: trio.Event,
# task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> T:
'''Open the async context manager deliver it's value
to this task's spawner and sleep until cancelled.
'''
async with mngr as value:
to_yield[id(mngr)] = value
if all(to_yield.values()):
all_entered.set()
# sleep until cancelled
await trio.sleep_forever()
@acm
async def async_enter_all(
*mngrs: list[AsyncContextManager[T]],
) -> tuple[T]:
to_yield = {}.fromkeys(id(mngr) for mngr in mngrs)
all_entered = trio.Event()
async with trio.open_nursery() as n:
for mngr in mngrs:
n.start_soon(
_enter_and_sleep,
mngr,
to_yield,
all_entered,
)
# deliver control once all managers have started up
await all_entered.wait()
yield tuple(to_yield.values())
# tear down all sleeper tasks thus triggering individual
# mngr ``__aexit__()``s.
n.cancel_scope.cancel()