Re-implement client caching using `maybe_open_ctx`
parent
c3682348fe
commit
2df16e11ed
|
@ -79,59 +79,6 @@ def async_lifo_cache(maxsize=128):
|
||||||
_cache: dict[str, 'Client'] = {} # noqa
|
_cache: dict[str, 'Client'] = {} # noqa
|
||||||
|
|
||||||
|
|
||||||
# XXX: this mis mostly an alt-implementation of
|
|
||||||
# maybe_open_ctx() below except it uses an async exit statck.
|
|
||||||
# ideally wer pick one or the other.
|
|
||||||
@asynccontextmanager
|
|
||||||
async def open_cached_client(
|
|
||||||
brokername: str,
|
|
||||||
) -> 'Client': # noqa
|
|
||||||
'''Get a cached broker client from the current actor's local vars.
|
|
||||||
|
|
||||||
If one has not been setup do it and cache it.
|
|
||||||
'''
|
|
||||||
global _cache
|
|
||||||
|
|
||||||
clients = _cache.setdefault('clients', {'_lock': trio.Lock()})
|
|
||||||
|
|
||||||
# global cache task lock
|
|
||||||
lock = clients['_lock']
|
|
||||||
|
|
||||||
client = None
|
|
||||||
|
|
||||||
try:
|
|
||||||
log.info(f"Loading existing `{brokername}` client")
|
|
||||||
|
|
||||||
async with lock:
|
|
||||||
client = clients[brokername]
|
|
||||||
client._consumers += 1
|
|
||||||
|
|
||||||
yield client
|
|
||||||
|
|
||||||
except KeyError:
|
|
||||||
log.info(f"Creating new client for broker {brokername}")
|
|
||||||
|
|
||||||
async with lock:
|
|
||||||
brokermod = get_brokermod(brokername)
|
|
||||||
exit_stack = AsyncExitStack()
|
|
||||||
|
|
||||||
client = await exit_stack.enter_async_context(
|
|
||||||
brokermod.get_client()
|
|
||||||
)
|
|
||||||
client._consumers = 0
|
|
||||||
client._exit_stack = exit_stack
|
|
||||||
clients[brokername] = client
|
|
||||||
|
|
||||||
yield client
|
|
||||||
|
|
||||||
finally:
|
|
||||||
if client is not None:
|
|
||||||
# if no more consumers, teardown the client
|
|
||||||
client._consumers -= 1
|
|
||||||
if client._consumers <= 0:
|
|
||||||
await client._exit_stack.aclose()
|
|
||||||
|
|
||||||
|
|
||||||
class cache:
|
class cache:
|
||||||
'''Globally (processs wide) cached, task access to a
|
'''Globally (processs wide) cached, task access to a
|
||||||
kept-alive-while-in-use async resource.
|
kept-alive-while-in-use async resource.
|
||||||
|
@ -227,3 +174,20 @@ async def maybe_open_ctx(
|
||||||
|
|
||||||
# terminate mngr nursery
|
# terminate mngr nursery
|
||||||
cache.no_more_users.set()
|
cache.no_more_users.set()
|
||||||
|
|
||||||
|
|
||||||
|
@asynccontextmanager
|
||||||
|
async def open_cached_client(
|
||||||
|
brokername: str,
|
||||||
|
) -> 'Client': # noqa
|
||||||
|
'''Get a cached broker client from the current actor's local vars.
|
||||||
|
|
||||||
|
If one has not been setup do it and cache it.
|
||||||
|
|
||||||
|
'''
|
||||||
|
brokermod = get_brokermod(brokername)
|
||||||
|
async with maybe_open_ctx(
|
||||||
|
key=brokername,
|
||||||
|
mngr=brokermod.get_client(),
|
||||||
|
) as (cache_hit, client):
|
||||||
|
yield client
|
||||||
|
|
Loading…
Reference in New Issue