From 8f023cd66f398f5993e34a64530ca4e17de932ee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 28 Oct 2021 09:51:02 -0400 Subject: [PATCH] Factor out context cacher to `tractor.trionics` --- piker/_cacheables.py | 122 +------------------------------------------ piker/data/feed.py | 6 +-- piker/ui/_display.py | 4 +- 3 files changed, 7 insertions(+), 125 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 07ad2319..02ac9240 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -18,30 +18,18 @@ Cacheing apis and toolz. """ -# further examples of interest: -# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8 from collections import OrderedDict -from typing import ( - Any, - Hashable, - Optional, - TypeVar, - AsyncContextManager, -) from contextlib import ( asynccontextmanager, ) -import trio -from trio_typing import TaskStatus -import tractor +from tractor.trionics import maybe_open_context from .brokers import get_brokermod from .log import get_logger -T = TypeVar('T') log = get_logger(__name__) @@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128): return decorator -_cache: dict[str, 'Client'] = {} # noqa - - -class cache: - '''Globally (processs wide) cached, task access to a - kept-alive-while-in-use async resource. - - ''' - lock = trio.Lock() - users: int = 0 - values: dict[Any, Any] = {} - resources: dict[ - int, - Optional[tuple[trio.Nursery, trio.Event]] - ] = {} - no_more_users: Optional[trio.Event] = None - - @classmethod - async def run_ctx( - cls, - mng, - key, - task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, - - ) -> None: - async with mng as value: - - _, no_more_users = cls.resources[id(mng)] - cls.values[key] = value - task_status.started(value) - try: - await no_more_users.wait() - finally: - value = cls.values.pop(key) - # discard nursery ref so it won't be re-used (an error) - cls.resources.pop(id(mng)) - - -@asynccontextmanager -async def maybe_open_ctx( - - key: Hashable, - mngr: AsyncContextManager[T], - -) -> (bool, T): - '''Maybe open a context manager if there is not already a cached - version for the provided ``key``. Return the cached instance on - a cache hit. - - ''' - - await cache.lock.acquire() - - ctx_key = id(mngr) - - value = None - try: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol - value = cache.values[key] - log.info(f'Reusing cached resource for {key}') - cache.users += 1 - cache.lock.release() - yield True, value - - except KeyError: - log.info(f'Allocating new resource for {key}') - - # **critical section** that should prevent other tasks from - # checking the cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. - - # TODO: avoid pulling from ``tractor`` internals and - # instead offer a "root nursery" in piker actors? - service_n = tractor.current_actor()._service_n - - # TODO: does this need to be a tractor "root nursery"? - ln = cache.resources.get(ctx_key) - assert not ln - - ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) - - value = await ln.start(cache.run_ctx, mngr, key) - cache.users += 1 - cache.lock.release() - - yield False, value - - finally: - cache.users -= 1 - - if cache.lock.locked(): - cache.lock.release() - - if value is not None: - # if no more consumers, teardown the client - if cache.users <= 0: - log.warning(f'De-allocating resource for {key}') - - # terminate mngr nursery - entry = cache.resources.get(ctx_key) - if entry: - _, no_more_users = entry - no_more_users.set() - - @asynccontextmanager async def open_cached_client( brokername: str, @@ -190,7 +72,7 @@ async def open_cached_client( ''' brokermod = get_brokermod(brokername) - async with maybe_open_ctx( + async with maybe_open_context( key=brokername, mngr=brokermod.get_client(), ) as (cache_hit, client): diff --git a/piker/data/feed.py b/piker/data/feed.py index f8cb0e9b..337a625c 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -37,7 +37,7 @@ import tractor from pydantic import BaseModel from ..brokers import get_brokermod -from .._cacheables import maybe_open_ctx +from .._cacheables import maybe_open_context from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, @@ -368,7 +368,7 @@ async def open_sample_step_stream( # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes - async with maybe_open_ctx( + async with maybe_open_context( key=delay_s, mngr=portal.open_stream_from( iter_ohlc_periods, @@ -590,7 +590,7 @@ async def maybe_open_feed( ''' sym = symbols[0].lower() - async with maybe_open_ctx( + async with maybe_open_context( key=(brokername, sym), mngr=open_feed( brokername, diff --git a/piker/ui/_display.py b/piker/ui/_display.py index a6379d46..463f243d 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -35,7 +35,7 @@ import tractor import trio from .. import brokers -from .._cacheables import maybe_open_ctx +from .._cacheables import maybe_open_context from ..trionics import async_enter_all from ..data.feed import open_feed, Feed from ._chart import ( @@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster( ) -> AsyncGenerator[int, dict[str, tractor.Portal]]: uid = tractor.current_actor().uid - async with maybe_open_ctx( + async with maybe_open_context( key=uid, # for now make a cluster per client? mngr=open_fsp_cluster( workers,