diff --git a/tractor/trionics/__init__.py b/tractor/trionics/__init__.py index 3d1b9bd..2730834 100644 --- a/tractor/trionics/__init__.py +++ b/tractor/trionics/__init__.py @@ -2,8 +2,15 @@ Sugary patterns for trio + tractor designs. ''' -from ._mngrs import gather_contexts -from ._broadcast import broadcast_receiver, BroadcastReceiver, Lagged +from ._mngrs import ( + gather_contexts, + maybe_open_context, +) +from ._broadcast import ( + broadcast_receiver, + BroadcastReceiver, + Lagged, +) __all__ = [ @@ -11,4 +18,5 @@ __all__ = [ 'broadcast_receiver', 'BroadcastReceiver', 'Lagged', + 'maybe_open_context', ] diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index d31f1e0..29be832 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -2,12 +2,25 @@ Async context manager primitives with hard ``trio``-aware semantics ''' -from typing import AsyncContextManager, AsyncGenerator -from typing import TypeVar, Sequence from contextlib import asynccontextmanager as acm +from typing import ( + Any, + AsyncContextManager, + AsyncGenerator, + Hashable, + Optional, + Sequence, + TypeVar, +) import trio +from trio_typing import TaskStatus +from ..log import get_logger +from .._state import current_actor + + +log = get_logger(__name__) # A regular invariant generic type T = TypeVar("T") @@ -76,3 +89,111 @@ async def gather_contexts( # we don't need a try/finally since cancellation will be triggered # by the surrounding nursery on error. parent_exit.set() + + +# Per actor task caching helpers. +# Further potential examples of interest: +# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 + +class cache: + ''' + Globally (processs wide) cached, task access to a + kept-alive-while-in-use async resource. + + ''' + lock = trio.Lock() + users: int = 0 + values: dict[Any, Any] = {} + resources: dict[ + int, + Optional[tuple[trio.Nursery, trio.Event]] + ] = {} + no_more_users: Optional[trio.Event] = None + + @classmethod + async def run_ctx( + cls, + mng, + key, + task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + + ) -> None: + async with mng as value: + + _, no_more_users = cls.resources[id(mng)] + cls.values[key] = value + task_status.started(value) + try: + await no_more_users.wait() + finally: + value = cls.values.pop(key) + # discard nursery ref so it won't be re-used (an error) + cls.resources.pop(id(mng)) + + +@acm +async def maybe_open_context( + + key: Hashable, + mngr: AsyncContextManager[T], + +) -> (bool, T): + ''' + Maybe open a context manager if there is not already a cached + version for the provided ``key``. Return the cached instance on + a cache hit. + + ''' + await cache.lock.acquire() + + ctx_key = id(mngr) + + value = None + try: + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached resource for {key}') + cache.users += 1 + cache.lock.release() + yield True, value + + except KeyError: + log.info(f'Allocating new resource for {key}') + + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. + + # TODO: avoid pulling from ``tractor`` internals and + # instead offer a "root nursery" in piker actors? + service_n = current_actor()._service_n + + # TODO: does this need to be a tractor "root nursery"? + ln = cache.resources.get(ctx_key) + assert not ln + + ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) + + value = await ln.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): + cache.lock.release() + + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.info(f'De-allocating resource for {key}') + + # terminate mngr nursery + entry = cache.resources.get(ctx_key) + if entry: + _, no_more_users = entry + no_more_users.set()