Add a configurable timeout around backend live feed startup

For now make it a larger value but ideally in the long run we can tune
it to specific backends and expose it in the config(s).
basic_buy_bot
Tyler Goodlet 2023-06-08 11:53:37 -04:00
parent fda7111305
commit 34dd6ffc22
1 changed files with 17 additions and 14 deletions

View File

@ -226,6 +226,7 @@ async def allocate_persistent_feed(
loglevel: str, loglevel: str,
start_stream: bool = True, start_stream: bool = True,
init_timeout: float = 616,
task_status: TaskStatus[FeedInit] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[FeedInit] = trio.TASK_STATUS_IGNORED,
@ -267,22 +268,23 @@ async def allocate_persistent_feed(
# TODO: probably make a struct msg type for this as well # TODO: probably make a struct msg type for this as well
# since eventually we do want to have more efficient IPC.. # since eventually we do want to have more efficient IPC..
first_quote: dict[str, Any] first_quote: dict[str, Any]
( with trio.fail_after(init_timeout):
init_msgs, (
first_quote, init_msgs,
) = await bus.nursery.start( first_quote,
partial( ) = await bus.nursery.start(
mod.stream_quotes, partial(
send_chan=send, mod.stream_quotes,
feed_is_live=feed_is_live, send_chan=send,
feed_is_live=feed_is_live,
# NOTE / TODO: eventualy we may support providing more then # NOTE / TODO: eventualy we may support providing more then
# one input here such that a datad daemon can multiplex # one input here such that a datad daemon can multiplex
# multiple live feeds from one task, instead of getting # multiple live feeds from one task, instead of getting
# a new request (and thus new task) for each subscription. # a new request (and thus new task) for each subscription.
symbols=[symstr], symbols=[symstr],
)
) )
)
# TODO: this is indexed by symbol for now since we've planned (for # TODO: this is indexed by symbol for now since we've planned (for
# some time) to expect backends to handle single # some time) to expect backends to handle single
@ -908,6 +910,7 @@ async def open_feed(
for fqme, flume_msg in flumes_msg_dict.items(): for fqme, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg) flume = Flume.from_msg(flume_msg)
# assert flume.mkt.fqme == fqme # assert flume.mkt.fqme == fqme
feed.flumes[fqme] = flume feed.flumes[fqme] = flume