Fix up `@async_lifo_cache` typing, add TODOs for move to `tractor`

rekt_pps
Tyler Goodlet 2023-04-19 15:08:10 -04:00
parent 1b50bff625
commit bcf355e2c8
1 changed files with 56 additions and 11 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -14,15 +14,21 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
'''
Cacheing apis and toolz.
"""
'''
from collections import OrderedDict
from contextlib import (
asynccontextmanager as acm,
)
from typing import (
Awaitable,
Callable,
ParamSpec,
TypeVar,
)
from tractor.trionics import maybe_open_context
@ -32,19 +38,54 @@ from .log import get_logger
log = get_logger(__name__)
T = TypeVar("T")
P = ParamSpec("P")
def async_lifo_cache(maxsize=128):
"""Async ``cache`` with a LIFO policy.
# TODO: move this to `tractor.trionics`..
# - egs. to replicate for tests: https://github.com/aio-libs/async-lru#usage
# - their suite as well:
# https://github.com/aio-libs/async-lru/tree/master/tests
# - asked trio_util about it too:
# https://github.com/groove-x/trio-util/issues/21
def async_lifo_cache(
maxsize=128,
# NOTE: typing style was learned from:
# https://stackoverflow.com/a/71132186
) -> Callable[
Callable[P, Awaitable[T]],
Callable[
Callable[P, Awaitable[T]],
Callable[P, Awaitable[T]],
],
]:
'''
Async ``cache`` with a LIFO policy.
Implemented my own since no one else seems to have
a standard. I'll wait for the smarter people to come
up with one, but until then...
"""
NOTE: when decorating, due to this simple/naive implementation, you
MUST call the decorator like,
.. code:: python
@async_lifo_cache()
async def cache_target():
'''
cache = OrderedDict()
def decorator(fn):
def decorator(
fn: Callable[P, Awaitable[T]],
) -> Callable[P, Awaitable[T]]:
async def wrapper(*args):
async def decorated(
*args: P.args,
**kwargs: P.kwargs,
) -> T:
key = args
try:
return cache[key]
@ -53,15 +94,19 @@ def async_lifo_cache(maxsize=128):
# discard last added new entry
cache.popitem()
# do it
cache[key] = await fn(*args)
# call underlying
cache[key] = await fn(
*args,
**kwargs,
)
return cache[key]
return wrapper
return decorated
return decorator
# TODO: move this to `.brokers.utils`..
@acm
async def open_cached_client(
brokername: str,