diff --git a/piker/_cacheables.py b/piker/_cacheables.py index 1b756f8f..be1934ca 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -19,7 +19,7 @@ Cacheing apis and toolz. """ from collections import OrderedDict -from typing import Optional +from typing import Optional, Hashable, TypeVar, AsyncContextManager from contextlib import ( asynccontextmanager, AsyncExitStack, @@ -30,10 +30,10 @@ import trio from .brokers import get_brokermod from .log import get_logger -from . import data from .data.feed import Feed +T = TypeVar('T') log = get_logger(__name__) @@ -69,16 +69,17 @@ def async_lifo_cache(maxsize=128): _cache: dict[str, 'Client'] = {} # noqa +# XXX: this mis mostly an alt-implementation of +# maybe_open_ctx() below except it uses an async exit statck. +# ideally wer pick one or the other. @asynccontextmanager async def open_cached_client( brokername: str, - *args, - **kwargs, ) -> 'Client': # noqa - """Get a cached broker client from the current actor's local vars. + '''Get a cached broker client from the current actor's local vars. If one has not been setup do it and cache it. - """ + ''' global _cache clients = _cache.setdefault('clients', {'_lock': trio.Lock()}) @@ -128,25 +129,27 @@ class cache: ''' lock = trio.Lock() users: int = 0 - feeds: dict[tuple[str, str], Feed] = {} + ctxs: dict[tuple[str, str], Feed] = {} no_more_users: Optional[trio.Event] = None @asynccontextmanager -async def maybe_open_feed( +async def maybe_open_ctx( - broker: str, - symbol: str, + key: Hashable, + mngr: AsyncContextManager[T], loglevel: str, -) -> Feed: - - key = (broker, symbol) +) -> T: + '''Maybe open a context manager if there is not already a cached + version for the provided ``key``. Return the cached instance on + a cache hit. + ''' @contextmanager def get_and_use() -> Feed: # key error must bubble here - feed = cache.feeds[key] + feed = cache.ctxs[key] log.info(f'Reusing cached feed for {key}') try: cache.users += 1 @@ -174,22 +177,15 @@ async def maybe_open_feed( # **critical section** that should prevent other tasks from # checking the cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. - cache.no_more_users = trio.Event() log.info(f'Allocating new feed for {key}') # TODO: eventually support N-brokers - async with ( - data.open_feed( - broker, - [symbol], - loglevel=loglevel, - ) as feed, - ): - cache.feeds[key] = feed + async with mngr as value: + cache.ctxs[key] = value cache.lock.release() try: - yield feed + yield value finally: # don't tear down the feed until there are zero # users of it left. @@ -197,4 +193,4 @@ async def maybe_open_feed( await cache.no_more_users.wait() log.warning('De-allocating feed for {key}') - cache.feeds.pop(key) + cache.ctxs.pop(key) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index ad5c6cf4..06536859 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,9 +32,9 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks -from ..data.feed import Feed +from ..data.feed import Feed, open_feed from .._daemon import maybe_spawn_brokerd -from .._cacheables import maybe_open_feed +from .._cacheables import maybe_open_ctx from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -959,9 +959,13 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_feed( - broker, - symbol, + maybe_open_ctx( + key=(broker, symbol), + mngr=open_feed( + broker, + [symbol], + loglevel=loglevel, + ), loglevel=loglevel, ) as feed, ):