diff --git a/piker/data/feed.py b/piker/data/feed.py
index 0cfdb848..775e8fc6 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -226,6 +226,7 @@ async def allocate_persistent_feed(
 
     loglevel: str,
     start_stream: bool = True,
+    init_timeout: float = 616,
 
     task_status: TaskStatus[FeedInit] = trio.TASK_STATUS_IGNORED,
 
@@ -267,22 +268,23 @@ async def allocate_persistent_feed(
     # TODO: probably make a struct msg type for this as well
     # since eventually we do want to have more efficient IPC..
     first_quote: dict[str, Any]
-    (
-        init_msgs,
-        first_quote,
-    ) = await bus.nursery.start(
-        partial(
-            mod.stream_quotes,
-            send_chan=send,
-            feed_is_live=feed_is_live,
+    with trio.fail_after(init_timeout):
+        (
+            init_msgs,
+            first_quote,
+        ) = await bus.nursery.start(
+            partial(
+                mod.stream_quotes,
+                send_chan=send,
+                feed_is_live=feed_is_live,
 
-            # NOTE / TODO: eventualy we may support providing more then
-            # one input here such that a datad daemon can multiplex
-            # multiple live feeds from one task, instead of getting
-            # a new request (and thus new task) for each subscription.
-            symbols=[symstr],
+                # NOTE / TODO: eventually we may support providing more than
+                # one input here such that a datad daemon can multiplex
+                # multiple live feeds from one task, instead of getting
+                # a new request (and thus new task) for each subscription.
+                symbols=[symstr],
+            )
         )
-    )
 
     # TODO: this is indexed by symbol for now since we've planned (for
     # some time) to expect backends to handle single
@@ -908,6 +910,7 @@ async def open_feed(
 
         for fqme, flume_msg in flumes_msg_dict.items():
             flume = Flume.from_msg(flume_msg)
+            # assert flume.mkt.fqme == fqme
             feed.flumes[fqme] = flume
 
 
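For reference, here is a minimal standalone sketch of the pattern the first two hunks introduce: wrapping a `trio.Nursery.start()` call in `trio.fail_after()` so a backend task that never signals readiness raises `trio.TooSlowError` instead of hanging the feed-allocation task. The `spawn_quote_stream()` task and its quote payload are hypothetical stand-ins, not piker code.

```python
# Sketch of bounding a `nursery.start()` wait with `trio.fail_after()`;
# names and payloads here are hypothetical, not part of piker.
import trio


async def spawn_quote_stream(
    task_status=trio.TASK_STATUS_IGNORED,
) -> None:
    # Simulate backend init work; `.started()` is what unblocks the
    # parent's `await nursery.start(...)` call.
    await trio.sleep(1)
    task_status.started({'symbol': 'xyz.venue', 'last': 1.0})
    await trio.sleep_forever()


async def main() -> None:
    async with trio.open_nursery() as nursery:
        try:
            # Bound how long we wait for the backend task to report
            # readiness; a hung provider raises instead of stalling
            # the caller forever.
            with trio.fail_after(2):
                first_quote = await nursery.start(spawn_quote_stream)
            print('backend ready:', first_quote)
        except trio.TooSlowError:
            print('backend init timed out')
        nursery.cancel_scope.cancel()


trio.run(main)
```

If the timeout expires before the child calls `task_status.started()`, trio cancels that child and converts the cancellation into `TooSlowError` at the `with` block, which mirrors how the new `init_timeout` bounds `mod.stream_quotes` startup in `allocate_persistent_feed()`.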