Support no realtime stream sending with feed bus

async_hist_loading
Tyler Goodlet 2022-02-18 12:16:07 -05:00
parent 7252094f90
commit c2a13c474c
1 changed files with 11 additions and 2 deletions

View File

@ -257,6 +257,7 @@ async def open_feed_bus(
symbol: str,
loglevel: str,
tick_throttle: Optional[float] = None,
start_stream: bool = True,
) -> None:
@ -305,6 +306,9 @@ async def open_feed_bus(
# deliver initial info message a first quote asap
await ctx.started((init_msg, first_quotes))
if not start_stream:
await trio.sleep_forever()
async with (
ctx.open_stream() as stream,
trio.open_nursery() as n,
@ -490,6 +494,8 @@ async def open_feed(
symbols: Sequence[str],
loglevel: Optional[str] = None,
backpressure: bool = True,
start_stream: bool = True,
tick_throttle: Optional[float] = None, # Hz
) -> Feed:
@ -518,7 +524,7 @@ async def open_feed(
brokername=brokername,
symbol=sym,
loglevel=loglevel,
start_stream=start_stream,
tick_throttle=tick_throttle,
) as (ctx, (init_msg, first_quotes)),
@ -527,7 +533,7 @@ async def open_feed(
# XXX: be explicit about stream backpressure since we should
# **never** overrun on feeds being too fast, which will
# pretty much always happen with HFT XD
backpressure=True
backpressure=backpressure,
) as stream,
):
@ -607,6 +613,9 @@ async def maybe_open_feed(
'symbols': [sym],
'loglevel': loglevel,
'tick_throttle': kwargs.get('tick_throttle'),
'backpressure': kwargs.get('backpressure'),
'backpressure': kwargs.get('backpressure'),
'start_stream': kwargs.get('start_stream'),
},
key=sym,
) as (cache_hit, feed):