diff --git a/piker/_cacheables.py b/piker/_cacheables.py index d4e6ccfa..d86b477d 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -23,19 +23,20 @@ Cacheing apis and toolz. from collections import OrderedDict from typing import ( - Optional, + Any, Hashable, + Optional, TypeVar, AsyncContextManager, - AsyncIterable, ) from contextlib import ( asynccontextmanager, AsyncExitStack, - contextmanager, ) import trio +from trio_typing import TaskStatus +from tractor._portal import maybe_open_nursery from .brokers import get_brokermod from .log import get_logger @@ -132,14 +133,35 @@ async def open_cached_client( class cache: '''Globally (processs wide) cached, task access to a - kept-alive-while-in-use data feed. + kept-alive-while-in-use async resource. ''' lock = trio.Lock() users: int = 0 - ctxs: dict[tuple[str, str], AsyncIterable] = {} + values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {} + nurseries: dict[int, Optional[trio.Nursery]] = {} no_more_users: Optional[trio.Event] = None + @classmethod + async def run_ctx( + cls, + mng, + key, + task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + + ) -> None: + async with mng as value: + + cls.no_more_users = trio.Event() + cls.values[key] = value + task_status.started(value) + try: + await cls.no_more_users.wait() + finally: + value = cls.values.pop(key) + # discard nursery ref so it won't be re-used (an error) + cls.nurseries.pop(id(mng)) + @asynccontextmanager async def maybe_open_ctx( @@ -153,51 +175,48 @@ async def maybe_open_ctx( a cache hit. ''' - @contextmanager - def get_and_use() -> AsyncIterable[T]: - # key error must bubble here - value = cache.ctxs[key] - log.info(f'Reusing cached feed for {key}') - try: - cache.users += 1 - yield value - finally: - cache.users -= 1 - if cache.users == 0: - # signal to original allocator task feed use is complete - cache.no_more_users.set() - try: - with get_and_use() as value: - yield True, value - except KeyError: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol - await cache.lock.acquire() + await cache.lock.acquire() + + ctx_key = id(mngr) + + # TODO: does this need to be a tractor "root nursery"? + async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n: + cache.nurseries[ctx_key] = n + + value = None try: - with get_and_use() as value: - cache.lock.release() - yield True, value - return + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached feed for {key}') + cache.users += 1 + cache.lock.release() + yield True, value except KeyError: + log.info(f'Allocating new feed for {key}') + # **critical section** that should prevent other tasks from # checking the cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. - cache.no_more_users = trio.Event() - log.info(f'Allocating new feed for {key}') - # TODO: eventually support N-brokers - async with mngr as value: - cache.ctxs[key] = value + value = await n.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): cache.lock.release() - try: - yield True, value - finally: - # don't tear down the feed until there are zero - # users of it left. - if cache.users > 0: - await cache.no_more_users.wait() - log.warning('De-allocating feed for {key}') - cache.ctxs.pop(key) + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.warning(f'De-allocating feed for {key}') + + # terminate mngr nursery + cache.no_more_users.set()