diff --git a/piker/data/feed.py b/piker/data/feed.py index 55f8b9b9..75a37545 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -257,6 +257,7 @@ async def open_feed_bus( symbol: str, loglevel: str, tick_throttle: Optional[float] = None, + start_stream: bool = True, ) -> None: @@ -305,6 +306,9 @@ async def open_feed_bus( # deliver initial info message a first quote asap await ctx.started((init_msg, first_quotes)) + if not start_stream: + await trio.sleep_forever() + async with ( ctx.open_stream() as stream, trio.open_nursery() as n, @@ -490,6 +494,8 @@ async def open_feed( symbols: Sequence[str], loglevel: Optional[str] = None, + backpressure: bool = True, + start_stream: bool = True, tick_throttle: Optional[float] = None, # Hz ) -> Feed: @@ -518,7 +524,7 @@ async def open_feed( brokername=brokername, symbol=sym, loglevel=loglevel, - + start_stream=start_stream, tick_throttle=tick_throttle, ) as (ctx, (init_msg, first_quotes)), @@ -527,7 +533,7 @@ async def open_feed( # XXX: be explicit about stream backpressure since we should # **never** overrun on feeds being too fast, which will # pretty much always happen with HFT XD - backpressure=True + backpressure=backpressure, ) as stream, ): @@ -607,6 +613,9 @@ async def maybe_open_feed( 'symbols': [sym], 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), + 'backpressure': kwargs.get('backpressure'), + 'backpressure': kwargs.get('backpressure'), + 'start_stream': kwargs.get('start_stream'), }, key=sym, ) as (cache_hit, feed):