diff --git a/piker/data/flows.py b/piker/data/flows.py
index 23fa4207..9bb27230 100644
--- a/piker/data/flows.py
+++ b/piker/data/flows.py
@@ -21,6 +21,7 @@ real-time data processing data-structures.
 "Streams, flumes, cascades and flows.."
 
 """
+from __future__ import annotations
 from contextlib import asynccontextmanager as acm
 from functools import partial
 from typing import (
@@ -45,7 +46,7 @@ from ._sharedmem import (
     _Token,
 )
 from ._sampling import (
-    iter_ohlc_periods,
+    open_sample_stream,
 )
 
 if TYPE_CHECKING:
@@ -149,7 +150,7 @@ class Flume(Struct):
     @acm
     async def index_stream(
         self,
-        delay_s: int = 1,
+        delay_s: float = 1,
 
     ) -> AsyncIterator[int]:
 
@@ -163,21 +164,8 @@ class Flume(Struct):
         # XXX: this should be singleton on a host,
         # a lone broker-daemon per provider should be
         # created for all practical purposes
-        async with maybe_open_context(
-            acm_func=partial(
-                portal.open_context,
-                iter_ohlc_periods,
-            ),
-            kwargs={'delay_s': delay_s},
-        ) as (cache_hit, (ctx, first)):
-            async with ctx.open_stream() as istream:
-                if cache_hit:
-                    # add a new broadcast subscription for the quote stream
-                    # if this feed is likely already in use
-                    async with istream.subscribe() as bistream:
-                        yield bistream
-                else:
-                    yield istream
+        async with open_sample_stream(float(delay_s)) as stream:
+            yield stream
 
     def get_ds_info(
         self,
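
For reference, below is a minimal, self-contained sketch of the fan-out pattern that the removed block implemented inline and that ``open_sample_stream()`` is assumed to encapsulate: one sampler clock per ``delay_s``, shared by all callers, where each later caller gets its own subscription instead of opening a second upstream stream. It uses asyncio stand-ins rather than piker's trio/tractor machinery, and every name in it is hypothetical, not piker's actual API.

# Hedged sketch only: hypothetical names, asyncio instead of trio/tractor.
from __future__ import annotations
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class _SharedTicker:
    '''One clock task per sample period, broadcast to N subscriber queues.'''
    def __init__(self, delay_s: float) -> None:
        self.delay_s = delay_s
        self.subs: set[asyncio.Queue[int]] = set()
        self.task: asyncio.Task | None = None

    async def _run(self) -> None:
        # emit a monotonically increasing "index" every ``delay_s`` seconds
        i = 0
        while True:
            await asyncio.sleep(self.delay_s)
            i += 1
            for q in self.subs:
                q.put_nowait(i)


_tickers: dict[float, _SharedTicker] = {}


@asynccontextmanager
async def open_shared_index_stream(
    delay_s: float = 1,
) -> AsyncIterator[AsyncIterator[int]]:
    '''Yield an async iterator of index ticks, sharing one underlying
    clock task per ``delay_s`` across all concurrent callers.'''
    ticker = _tickers.get(delay_s)
    if ticker is None:
        # first caller for this period: start the singleton clock
        ticker = _tickers[delay_s] = _SharedTicker(delay_s)
        ticker.task = asyncio.create_task(ticker._run())

    # every caller (first or later) gets its own subscription queue
    q: asyncio.Queue[int] = asyncio.Queue()
    ticker.subs.add(q)

    async def _iter() -> AsyncIterator[int]:
        while True:
            yield await q.get()

    try:
        yield _iter()
    finally:
        ticker.subs.discard(q)
        if not ticker.subs and ticker.task is not None:
            # last subscriber left: tear down the shared clock
            ticker.task.cancel()
            _tickers.pop(delay_s, None)

A caller would consume it the same way ``Flume.index_stream()`` is consumed after this diff, e.g. ``async with open_shared_index_stream(1.0) as istream: async for i in istream: ...``.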