diff --git a/piker/data/feed.py b/piker/data/feed.py index 44f93a72..a519b395 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -355,6 +355,31 @@ async def attach_feed_bus( bus._subscribers[symbol].remove(sub) +@asynccontextmanager +async def open_sample_step_stream( + portal: tractor.Portal, + delay_s: int, + +) -> tractor.ReceiveMsgStream: + # XXX: this should be singleton on a host, + # a lone broker-daemon per provider should be + # created for all practical purposes + async with maybe_open_ctx( + key=delay_s, + mngr=portal.open_stream_from( + iter_ohlc_periods, + delay_s=delay_s, # must be kwarg + ), + ) as (cache_hit, istream): + if cache_hit: + # add a new broadcast subscription for the quote stream + # if this feed is likely already in use + async with istream.subscribe() as bistream: + yield bistream + else: + yield istream + + @dataclass class Feed: """A data feed for client-side interaction with far-process# }}} @@ -371,7 +396,6 @@ class Feed: stream: trio.abc.ReceiveChannel[dict[str, Any]] _brokerd_portal: tractor._portal.Portal - _index_stream: Optional[AsyncIterator[int]] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _max_sample_rate: int = 0 @@ -391,20 +415,13 @@ class Feed: ) -> AsyncIterator[int]: - if not self._index_stream: - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with self._brokerd_portal.open_stream_from( + delay_s = delay_s or self._max_sample_rate - iter_ohlc_periods, - delay_s=delay_s or self._max_sample_rate, - - ) as self._index_stream: - - yield self._index_stream - else: - yield self._index_stream + async with open_sample_step_stream( + self._brokerd_portal, + delay_s, + ) as istream: + yield istream async def pause(self) -> None: await self.stream.send('pause')