diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 47ffa6a4..d15e7c45 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -79,59 +79,6 @@ def async_lifo_cache(maxsize=128): _cache: dict[str, 'Client'] = {} # noqa -# XXX: this mis mostly an alt-implementation of -# maybe_open_ctx() below except it uses an async exit statck. -# ideally wer pick one or the other. -@asynccontextmanager -async def open_cached_client( - brokername: str, -) -> 'Client': # noqa - '''Get a cached broker client from the current actor's local vars. - - If one has not been setup do it and cache it. - ''' - global _cache - - clients = _cache.setdefault('clients', {'_lock': trio.Lock()}) - - # global cache task lock - lock = clients['_lock'] - - client = None - - try: - log.info(f"Loading existing `{brokername}` client") - - async with lock: - client = clients[brokername] - client._consumers += 1 - - yield client - - except KeyError: - log.info(f"Creating new client for broker {brokername}") - - async with lock: - brokermod = get_brokermod(brokername) - exit_stack = AsyncExitStack() - - client = await exit_stack.enter_async_context( - brokermod.get_client() - ) - client._consumers = 0 - client._exit_stack = exit_stack - clients[brokername] = client - - yield client - - finally: - if client is not None: - # if no more consumers, teardown the client - client._consumers -= 1 - if client._consumers <= 0: - await client._exit_stack.aclose() - - class cache: '''Globally (processs wide) cached, task access to a kept-alive-while-in-use async resource. @@ -227,3 +174,20 @@ async def maybe_open_ctx( # terminate mngr nursery cache.no_more_users.set() + + +@asynccontextmanager +async def open_cached_client( + brokername: str, +) -> 'Client': # noqa + '''Get a cached broker client from the current actor's local vars. + + If one has not been setup do it and cache it. + + ''' + brokermod = get_brokermod(brokername) + async with maybe_open_ctx( + key=brokername, + mngr=brokermod.get_client(), + ) as (cache_hit, client): + yield client