Fix clients map typing annot
parent
5acd780eb6
commit
2bf95d7ec7
|
@ -28,6 +28,7 @@ from ..log import get_logger
|
|||
|
||||
log = get_logger(__name__)
|
||||
|
||||
_clients: Dict[str, 'Client'] = {}
|
||||
|
||||
@asynccontextmanager
|
||||
async def get_cached_client(
|
||||
|
@ -39,29 +40,38 @@ async def get_cached_client(
|
|||
|
||||
If one has not been setup do it and cache it.
|
||||
"""
|
||||
# check if a cached client is in the local actor's statespace
|
||||
ss = tractor.current_actor().statespace
|
||||
global _clients
|
||||
|
||||
clients = ss.setdefault('clients', {'_lock': trio.Lock()})
|
||||
lock = clients['_lock']
|
||||
|
||||
client = None
|
||||
|
||||
try:
|
||||
log.info(f"Loading existing `{brokername}` daemon")
|
||||
log.info(f"Loading existing `{brokername}` client")
|
||||
|
||||
async with lock:
|
||||
client = clients[brokername]
|
||||
client._consumers += 1
|
||||
|
||||
yield client
|
||||
|
||||
except KeyError:
|
||||
log.info(f"Creating new client for broker {brokername}")
|
||||
|
||||
async with lock:
|
||||
brokermod = get_brokermod(brokername)
|
||||
exit_stack = AsyncExitStack()
|
||||
|
||||
client = await exit_stack.enter_async_context(
|
||||
brokermod.get_client()
|
||||
)
|
||||
client._consumers = 0
|
||||
client._exit_stack = exit_stack
|
||||
clients[brokername] = client
|
||||
|
||||
yield client
|
||||
|
||||
finally:
|
||||
client._consumers -= 1
|
||||
if client._consumers <= 0:
|
||||
|
|
Loading…
Reference in New Issue