diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 6ef6e915..734ecc39 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -816,7 +816,10 @@ async def stream_quotes( proxy: MethodProxy mkt: MktPair details: ibis.ContractDetails - async with open_data_client() as proxy: + async with ( + open_data_client() as proxy, + trio.open_nursery() as tn, + ): mkt, details = await get_mkt_info( sym, proxy=proxy, # passed to avoid implicit client load @@ -844,18 +847,32 @@ async def stream_quotes( f'{pformat(first_quote)}' ) - # TODO: we should instead spawn a task that waits on a feed to start - # and let it wait indefinitely..instead of this hard coded stuff. + # TODO: we should instead spawn a task that waits on a feed + # to start and let it wait indefinitely..instead of this + # hard coded stuff. + # async def wait_for_first_quote(): + # with trio.CancelScope() as cs: + with trio.move_on_after(1): first_ticker = await proxy.get_quote( contract=con, raise_on_timeout=True, ) - # it might be outside regular trading hours so see if we can at - # least grab history. + # NOTE: it might be outside regular trading hours for + # assets with "standard venue operating hours" so we + # only "pretend the feed is live" when the dst asset + # type is NOT within the NON-NORMAL-venue set: aka not + # commodities, forex or crypto currencies which CAN + # always return a NaN on a snap quote request during + # normal venue hours. In the case of a closed venue + # (equitiies, futes, bonds etc.) we at least try to + # grab the OHLC history. if ( - isnan(first_ticker.last) # last quote price value is nan + isnan(first_ticker.last) + # SO, if the last quote price value is NaN we ONLY + # "pretend to do" `feed_is_live.set()` if it's a known + # dst asset venue with a lot of closed operating hours. and mkt.dst.atype not in { 'commodity', 'fiat', @@ -895,7 +912,7 @@ async def stream_quotes( # only on first entry at feed boot up if startup: - startup = False + startup: bool = False task_status.started(( init_msgs, first_quote, @@ -923,7 +940,7 @@ async def stream_quotes( # generally speaking these feeds don't # include vlm data. - atype = mkt.dst.atype + atype: str = mkt.dst.atype log.info( f'No-vlm {mkt.fqme}@{atype}, skipping quote poll' ) @@ -959,7 +976,8 @@ async def stream_quotes( quote = normalize(ticker) log.debug(f"First ticker received {quote}") - # tell caller quotes are now coming in live + # tell data-layer spawner-caller that live + # quotes are now streaming. feed_is_live.set() # last = time.time()