piker/piker/_cacheables.py

205 lines
5.5 KiB
Python
Raw Normal View History

2020-11-06 17:23:14 +00:00
# piker: trading gear for hackers
2021-02-21 17:32:40 +00:00
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
2020-11-06 17:23:14 +00:00
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
2020-08-21 18:28:02 +00:00
"""
2021-08-09 15:43:45 +00:00
Cacheing apis and toolz.
2021-02-21 17:32:40 +00:00
2020-08-21 18:28:02 +00:00
"""
2021-08-10 12:51:03 +00:00
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
from collections import OrderedDict
from contextlib import (
    asynccontextmanager,
    AsyncExitStack,
    contextmanager,
)
from functools import wraps
from typing import (
    Optional,
    Hashable,
    TypeVar,
    AsyncContextManager,
    AsyncIterable,
)
2020-08-21 18:28:02 +00:00
import trio
2021-08-09 15:43:45 +00:00
from .brokers import get_brokermod
from .log import get_logger
2020-08-21 18:28:02 +00:00
2021-08-09 23:27:42 +00:00
# generic type var for the value yielded by a cached context manager
T = TypeVar('T')

# module-scoped logger named after this module
log = get_logger(__name__)
def async_lifo_cache(maxsize=128):
    '''Async ``cache`` decorator with a LIFO eviction policy.

    Memoizes an async function on its positional args. When the cache
    is full, the *most recently added* entry is discarded (LIFO), so
    long-lived early entries are retained.

    Implemented my own since no one else seems to have a standard.
    I'll wait for the smarter people to come up with one, but until
    then...

    '''
    cache = OrderedDict()

    def decorator(fn):

        @wraps(fn)  # preserve wrapped fn's name/doc for introspection
        async def wrapper(*args):
            key = args
            try:
                return cache[key]
            except KeyError:
                if len(cache) >= maxsize:
                    # discard the most recently added entry (LIFO)
                    cache.popitem()

                # run the wrapped coroutine and cache its result
                cache[key] = await fn(*args)
                return cache[key]

        return wrapper

    return decorator
# process-wide cache of broker ``Client`` instances.
# NOTE(review): despite the annotation, ``open_cached_client()`` below
# stores a *nested* dict under the ``'clients'`` key (holding a shared
# ``trio.Lock`` plus per-broker clients) — verify annotation vs. usage.
_cache: dict[str, 'Client'] = {}  # noqa
2020-08-21 18:28:02 +00:00
2021-08-09 23:27:42 +00:00
# XXX: this is mostly an alt-implementation of
# maybe_open_ctx() below except it uses an async exit stack.
# ideally we pick one or the other.
2020-08-21 18:28:02 +00:00
@asynccontextmanager
async def open_cached_client(

    brokername: str,

) -> 'Client':  # noqa
    '''Get a cached broker client from the current actor's local vars.

    If one has not been setup do it and cache it. The client is
    reference counted: it is torn down (its exit stack closed and the
    cache entry dropped) only when the last concurrent consumer exits.

    '''
    global _cache
    clients = _cache.setdefault('clients', {'_lock': trio.Lock()})

    # global cache task lock
    lock = clients['_lock']

    client = None

    try:
        # **critical section**: serialize lookup + allocation so
        # concurrent tasks can't race and create duplicate clients
        # for the same broker. NB: the ``except KeyError`` covers
        # *only* the cache lookup — a ``KeyError`` raised by the
        # consumer body (during ``yield``) must propagate, not
        # trigger a bogus re-allocation.
        async with lock:
            try:
                log.info(f"Loading existing `{brokername}` client")
                client = clients[brokername]
            except KeyError:
                log.info(f"Creating new client for broker {brokername}")
                brokermod = get_brokermod(brokername)
                exit_stack = AsyncExitStack()

                client = await exit_stack.enter_async_context(
                    brokermod.get_client()
                )
                # start at 0; the creator counts itself below just
                # like any cache-hit consumer does.
                client._consumers = 0
                client._exit_stack = exit_stack
                clients[brokername] = client

            client._consumers += 1

        yield client

    finally:
        if client is not None:
            # if no more consumers, teardown the client
            client._consumers -= 1
            if client._consumers <= 0:
                # drop the (about to be dead) client from the cache
                # so later callers don't receive a closed instance.
                clients.pop(brokername, None)
                await client._exit_stack.aclose()
class cache:
    '''Globally (process wide) cached, task access to a
    kept-alive-while-in-use data feed.

    '''
    # serializes cache-miss allocation across racing tasks
    lock = trio.Lock()

    # count of tasks currently using a cached ctx value
    users: int = 0

    # cached context values keyed by the user-provided ``key``
    ctxs: dict[tuple[str, str], AsyncIterable] = {}

    # set when the last user releases, so the allocator task
    # knows it may tear down the underlying context
    no_more_users: Optional[trio.Event] = None
@asynccontextmanager
async def maybe_open_ctx(

    key: Hashable,
    mngr: AsyncContextManager[T],
    loglevel: str,  # NOTE(review): currently unused — confirm intent

) -> T:
    '''Maybe open a context manager if there is not already a cached
    version for the provided ``key``. Return the cached instance on
    a cache hit.

    The first (allocator) task enters ``mngr`` and keeps it open;
    it blocks teardown until every other user task has exited.

    '''

    @contextmanager
    def get_and_use() -> AsyncIterable[T]:
        # key error must bubble here
        feed = cache.ctxs[key]

        log.info(f'Reusing cached feed for {key}')
        try:
            cache.users += 1
            yield feed
        finally:
            cache.users -= 1
            if cache.users == 0:
                # signal to original allocator task feed use is complete
                cache.no_more_users.set()

    # fast path: membership test (instead of catching ``KeyError``
    # around the ``yield``) so a ``KeyError`` raised by the consumer
    # body propagates instead of triggering a bogus re-allocation.
    if key in cache.ctxs:
        with get_and_use() as feed:
            yield feed
        return

    # lock feed acquisition around task racing / ``trio``'s
    # scheduler protocol
    await cache.lock.acquire()

    # re-check under the lock: another task may have allocated
    # while we were waiting to acquire.
    if key in cache.ctxs:
        with get_and_use() as feed:
            cache.lock.release()
            yield feed
        return

    # **critical section** that should prevent other tasks from
    # checking the cache until complete otherwise the scheduler
    # may switch and by accident we create more then one feed.
    cache.no_more_users = trio.Event()

    log.info(f'Allocating new feed for {key}')
    # TODO: eventually support N-brokers
    released = False
    try:
        async with mngr as value:
            cache.ctxs[key] = value
            cache.lock.release()
            released = True
            try:
                yield value
            finally:
                # don't tear down the feed until there are zero
                # users of it left.
                if cache.users > 0:
                    await cache.no_more_users.wait()

                log.warning(f'De-allocating feed for {key}')
                cache.ctxs.pop(key)
    finally:
        # never leak the lock if ``mngr.__aenter__`` raised before
        # we got the chance to release it above.
        if not released:
            cache.lock.release()