Add lifo cache to new module; drop "utils", bleh

pause_feeds_on_sym_switch
Tyler Goodlet 2021-08-09 14:34:26 -04:00
parent 0ce8057823
commit 68ce5b3550
3 changed files with 31 additions and 68 deletions

View File

@ -1,66 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Async utils no one seems to have built into a core lib (yet).
"""
from typing import AsyncContextManager
from collections import OrderedDict
from contextlib import asynccontextmanager
def async_lifo_cache(maxsize=128):
"""Async ``cache`` with a LIFO policy.
Implemented my own since no one else seems to have
a standard. I'll wait for the smarter people to come
up with one, but until then...
"""
cache = OrderedDict()
def decorator(fn):
async def wrapper(*args):
key = args
try:
return cache[key]
except KeyError:
if len(cache) >= maxsize:
# discard last added new entry
cache.popitem()
# do it
cache[key] = await fn(*args)
return cache[key]
return wrapper
return decorator
@asynccontextmanager
async def _just_none():
# noop -> skip entering context
yield None
@asynccontextmanager
async def maybe_with_if(
predicate: bool,
context: AsyncContextManager,
) -> AsyncContextManager:
async with context if predicate else _just_none() as output:
yield output

View File

@ -18,6 +18,7 @@
Cacheing apis and toolz. Cacheing apis and toolz.
""" """
from collections import OrderedDict
from typing import Optional from typing import Optional
from contextlib import ( from contextlib import (
asynccontextmanager, asynccontextmanager,
@ -36,6 +37,35 @@ from .data.feed import Feed
log = get_logger(__name__) log = get_logger(__name__)
def async_lifo_cache(maxsize=128):
"""Async ``cache`` with a LIFO policy.
Implemented my own since no one else seems to have
a standard. I'll wait for the smarter people to come
up with one, but until then...
"""
cache = OrderedDict()
def decorator(fn):
async def wrapper(*args):
key = args
try:
return cache[key]
except KeyError:
if len(cache) >= maxsize:
# discard last added new entry
cache.popitem()
# do it
cache[key] = await fn(*args)
return cache[key]
return wrapper
return decorator
_cache: dict[str, 'Client'] = {} # noqa _cache: dict[str, 'Client'] = {} # noqa

View File

@ -42,11 +42,10 @@ import wrapt
import asks import asks
from ..calc import humanize, percent_change from ..calc import humanize, percent_change
from .._cacheables import open_cached_client from .._cacheables import open_cached_client, async_lifo_cache
from . import config from . import config
from ._util import resproc, BrokerError, SymbolNotFound from ._util import resproc, BrokerError, SymbolNotFound
from ..log import get_logger, colorize_json, get_console_log from ..log import get_logger, colorize_json, get_console_log
from .._async_utils import async_lifo_cache
from . import get_brokermod from . import get_brokermod
from . import api from . import api