From 917d94dcbbbc9f0f8e82131054130af14bbcc98e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 11:02:51 -0400 Subject: [PATCH] Start `trionics` sub-pkg with `async_enter_all()` Since it seems we're building out more and more higher level primitives in order to support certain parallel style actor trees and messaging patterns (eg. task broadcast channels), we might as well start a new sub-package for purely `trio` constructions. We hereby dub this the realm of `trionics` (like electronics but for trios instead of electrons). To kick things off, add an `async_enter_all()` concurrent exit-stack-like context manager API which will concurrently spawn a sequence of provided async context managers and deliver their ordered results but with proper support for `trio` cancellation semantics. The stdlib's `AsyncExitStack` is not compatible with nurseries not `trio` tasks (which are cancelled) since as task will be suspended on the stack after push and does not ever hit a checkpoint until the stack is closed. --- tractor/trionics/__init__.py | 10 ++++++ tractor/trionics/_mngrs.py | 64 ++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 tractor/trionics/__init__.py create mode 100644 tractor/trionics/_mngrs.py diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py new file mode 100644 index 0000000..620d25a --- /dev/null +++ b/tractor/trionics/__init__.py @@ -0,0 +1,10 @@ +''' +Sugary patterns for trio + tractor designs. + +''' +from ._mngrs import async_enter_all + + +__all__ = [ + 'async_enter_all', +] diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py new file mode 100644 index 0000000..4e9a86a --- /dev/null +++ b/tractor/trionics/_mngrs.py @@ -0,0 +1,64 @@ +''' +Async context manager primitives with hard ``trio``-aware semantics + +''' +from typing import AsyncContextManager +from typing import TypeVar +from contextlib import asynccontextmanager as acm + +import trio + + +# A regular invariant generic type +T = TypeVar("T") + + +async def _enter_and_sleep( + + mngr: AsyncContextManager[T], + to_yield: dict[int, T], + all_entered: trio.Event, + # task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + +) -> T: + '''Open the async context manager deliver it's value + to this task's spawner and sleep until cancelled. + + ''' + async with mngr as value: + to_yield[id(mngr)] = value + + if all(to_yield.values()): + all_entered.set() + + # sleep until cancelled + await trio.sleep_forever() + + +@acm +async def async_enter_all( + + *mngrs: list[AsyncContextManager[T]], + +) -> tuple[T]: + + to_yield = {}.fromkeys(id(mngr) for mngr in mngrs) + + all_entered = trio.Event() + + async with trio.open_nursery() as n: + for mngr in mngrs: + n.start_soon( + _enter_and_sleep, + mngr, + to_yield, + all_entered, + ) + + # deliver control once all managers have started up + await all_entered.wait() + yield tuple(to_yield.values()) + + # tear down all sleeper tasks thus triggering individual + # mngr ``__aexit__()``s. + n.cancel_scope.cancel()