diff --git a/piker/_cacheables.py b/piker/_cacheables.py index f9d42f0a..995efbd3 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -146,9 +146,8 @@ async def maybe_open_ctx( key: Hashable, mngr: AsyncContextManager[T], - loglevel: str, -) -> T: +) -> (bool, T): '''Maybe open a context manager if there is not already a cached version for the provided ``key``. Return the cached instance on a cache hit. @@ -161,7 +160,7 @@ async def maybe_open_ctx( log.info(f'Reusing cached feed for {key}') try: cache.users += 1 - yield feed + yield True, feed finally: cache.users -= 1 if cache.users == 0: @@ -170,7 +169,7 @@ async def maybe_open_ctx( try: with get_and_use() as feed: - yield feed + yield True, feed except KeyError: # lock feed acquisition around task racing / ``trio``'s # scheduler protocol @@ -193,7 +192,7 @@ async def maybe_open_ctx( cache.ctxs[key] = value cache.lock.release() try: - yield value + yield True, value finally: # don't tear down the feed until there are zero # users of it left. diff --git a/piker/data/feed.py b/piker/data/feed.py index ed24a095..c56c0720 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -31,11 +31,14 @@ from typing import ( ) import trio +from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor +from tractor import _broadcast from pydantic import BaseModel from ..brokers import get_brokermod +from .._cacheables import maybe_open_ctx from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, @@ -345,10 +348,10 @@ class Feed: memory buffer orchestration. """ name: str - stream: AsyncIterator[dict[str, Any]] shm: ShmArray mod: ModuleType first_quote: dict + stream: trio.abc.ReceiveChannel[dict[str, Any]] _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None @@ -362,7 +365,7 @@ class Feed: symbols: dict[str, Symbol] = field(default_factory=dict) async def receive(self) -> dict: - return await self.stream.__anext__() + return await self.stream.receive() @asynccontextmanager async def index_stream( @@ -376,8 +379,10 @@ class Feed: # a lone broker-daemon per provider should be # created for all practical purposes async with self._brokerd_portal.open_stream_from( + iter_ohlc_periods, delay_s=delay_s or self._max_sample_rate, + ) as self._index_stream: yield self._index_stream @@ -395,7 +400,7 @@ def sym_to_shm_key( @asynccontextmanager async def install_brokerd_search( - portal: tractor._portal.Portal, + portal: tractor.Portal, brokermod: ModuleType, ) -> None: @@ -434,34 +439,21 @@ async def open_feed( loglevel: Optional[str] = None, tick_throttle: Optional[float] = None, # Hz + shielded_stream: bool = False, -) -> AsyncIterator[dict[str, Any]]: +) -> ReceiveChannel[dict[str, Any]]: ''' Open a "data feed" which provides streamed real-time quotes. ''' sym = symbols[0].lower() - # TODO: feed cache locking, right now this is causing - # issues when reconnecting to a long running emsd? - # global _searcher_cache - - # async with _cache_lock: - # feed = _searcher_cache.get((brokername, sym)) - - # # if feed is not None and sym in feed.symbols: - # if feed is not None: - # yield feed - # # short circuit - # return - try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) # no feed for broker exists so maybe spawn a data brokerd - async with ( maybe_spawn_brokerd( @@ -480,21 +472,25 @@ async def open_feed( ) as (ctx, (init_msg, first_quote)), - ctx.open_stream() as stream, - ): + ctx.open_stream(shield=shielded_stream) as stream, + ): # we can only read from shm shm = attach_shm_array( token=init_msg[sym]['shm_token'], readonly=True, ) + bstream = _broadcast.broadcast_receiver( + stream, + 2**10, + ) feed = Feed( name=brokername, - stream=stream, shm=shm, mod=mod, first_quote=first_quote, + stream=bstream, #brx_stream, _brokerd_portal=portal, ) ohlc_sample_rates = [] @@ -526,7 +522,43 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) try: - yield feed + yield feed, bstream finally: # drop the infinite stream connection await ctx.cancel() + + +@asynccontextmanager +async def maybe_open_feed( + + brokername: str, + symbols: Sequence[str], + loglevel: Optional[str] = None, + + tick_throttle: Optional[float] = None, # Hz + shielded_stream: bool = False, + +) -> ReceiveChannel[dict[str, Any]]: + '''Maybe open a data to a ``brokerd`` daemon only if there is no + local one for the broker-symbol pair, if one is cached use it wrapped + in a tractor broadcast receiver. + + ''' + sym = symbols[0].lower() + + async with maybe_open_ctx( + key=(brokername, sym), + mngr=open_feed( + brokername, + [sym], + loglevel=loglevel, + ), + ) as (cache_hit, (feed, stream)): + + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with stream.subscribe() as bstream: + yield feed, bstream + else: + yield feed, stream