Factor out context cacher to `tractor.trionics`

fspd_cluster
Tyler Goodlet 2021-10-28 09:51:02 -04:00
parent 162c58a8d8
commit 8f023cd66f
3 changed files with 7 additions and 125 deletions

View File

@ -18,30 +18,18 @@
Cacheing apis and toolz. Cacheing apis and toolz.
""" """
# further examples of interest:
# https://gist.github.com/njsmith/cf6fc0a97f53865f2c671659c88c1798#file-cache-py-L8
from collections import OrderedDict from collections import OrderedDict
from typing import (
Any,
Hashable,
Optional,
TypeVar,
AsyncContextManager,
)
from contextlib import ( from contextlib import (
asynccontextmanager, asynccontextmanager,
) )
import trio from tractor.trionics import maybe_open_context
from trio_typing import TaskStatus
import tractor
from .brokers import get_brokermod from .brokers import get_brokermod
from .log import get_logger from .log import get_logger
T = TypeVar('T')
log = get_logger(__name__) log = get_logger(__name__)
@ -74,112 +62,6 @@ def async_lifo_cache(maxsize=128):
return decorator return decorator
_cache: dict[str, 'Client'] = {} # noqa
class cache:
'''Globally (processs wide) cached, task access to a
kept-alive-while-in-use async resource.
'''
lock = trio.Lock()
users: int = 0
values: dict[Any, Any] = {}
resources: dict[
int,
Optional[tuple[trio.Nursery, trio.Event]]
] = {}
no_more_users: Optional[trio.Event] = None
@classmethod
async def run_ctx(
cls,
mng,
key,
task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED,
) -> None:
async with mng as value:
_, no_more_users = cls.resources[id(mng)]
cls.values[key] = value
task_status.started(value)
try:
await no_more_users.wait()
finally:
value = cls.values.pop(key)
# discard nursery ref so it won't be re-used (an error)
cls.resources.pop(id(mng))
@asynccontextmanager
async def maybe_open_ctx(
key: Hashable,
mngr: AsyncContextManager[T],
) -> (bool, T):
'''Maybe open a context manager if there is not already a cached
version for the provided ``key``. Return the cached instance on
a cache hit.
'''
await cache.lock.acquire()
ctx_key = id(mngr)
value = None
try:
# lock feed acquisition around task racing / ``trio``'s
# scheduler protocol
value = cache.values[key]
log.info(f'Reusing cached resource for {key}')
cache.users += 1
cache.lock.release()
yield True, value
except KeyError:
log.info(f'Allocating new resource for {key}')
# **critical section** that should prevent other tasks from
# checking the cache until complete otherwise the scheduler
# may switch and by accident we create more then one feed.
# TODO: avoid pulling from ``tractor`` internals and
# instead offer a "root nursery" in piker actors?
service_n = tractor.current_actor()._service_n
# TODO: does this need to be a tractor "root nursery"?
ln = cache.resources.get(ctx_key)
assert not ln
ln, _ = cache.resources[ctx_key] = (service_n, trio.Event())
value = await ln.start(cache.run_ctx, mngr, key)
cache.users += 1
cache.lock.release()
yield False, value
finally:
cache.users -= 1
if cache.lock.locked():
cache.lock.release()
if value is not None:
# if no more consumers, teardown the client
if cache.users <= 0:
log.warning(f'De-allocating resource for {key}')
# terminate mngr nursery
entry = cache.resources.get(ctx_key)
if entry:
_, no_more_users = entry
no_more_users.set()
@asynccontextmanager @asynccontextmanager
async def open_cached_client( async def open_cached_client(
brokername: str, brokername: str,
@ -190,7 +72,7 @@ async def open_cached_client(
''' '''
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
async with maybe_open_ctx( async with maybe_open_context(
key=brokername, key=brokername,
mngr=brokermod.get_client(), mngr=brokermod.get_client(),
) as (cache_hit, client): ) as (cache_hit, client):

View File

@ -37,7 +37,7 @@ import tractor
from pydantic import BaseModel from pydantic import BaseModel
from ..brokers import get_brokermod from ..brokers import get_brokermod
from .._cacheables import maybe_open_ctx from .._cacheables import maybe_open_context
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from .._daemon import ( from .._daemon import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
@ -368,7 +368,7 @@ async def open_sample_step_stream(
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
# created for all practical purposes # created for all practical purposes
async with maybe_open_ctx( async with maybe_open_context(
key=delay_s, key=delay_s,
mngr=portal.open_stream_from( mngr=portal.open_stream_from(
iter_ohlc_periods, iter_ohlc_periods,
@ -590,7 +590,7 @@ async def maybe_open_feed(
''' '''
sym = symbols[0].lower() sym = symbols[0].lower()
async with maybe_open_ctx( async with maybe_open_context(
key=(brokername, sym), key=(brokername, sym),
mngr=open_feed( mngr=open_feed(
brokername, brokername,

View File

@ -35,7 +35,7 @@ import tractor
import trio import trio
from .. import brokers from .. import brokers
from .._cacheables import maybe_open_ctx from .._cacheables import maybe_open_context
from ..trionics import async_enter_all from ..trionics import async_enter_all
from ..data.feed import open_feed, Feed from ..data.feed import open_feed, Feed
from ._chart import ( from ._chart import (
@ -555,7 +555,7 @@ async def maybe_open_fsp_cluster(
) -> AsyncGenerator[int, dict[str, tractor.Portal]]: ) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
uid = tractor.current_actor().uid uid = tractor.current_actor().uid
async with maybe_open_ctx( async with maybe_open_context(
key=uid, # for now make a cluster per client? key=uid, # for now make a cluster per client?
mngr=open_fsp_cluster( mngr=open_fsp_cluster(
workers, workers,