diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index cb934187..67e63f70 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -401,7 +401,15 @@ class Client: # => we recursively call this method until we get at least # as many bars such that they sum in aggregate to the the # desired total time (duration) at most. - if end_dt: + # XXX XXX XXX + # WHY DID WE EVEN NEED THIS ORIGINALLY!? + # XXX XXX XXX + # - if you query over a gap and get no data + # that may short circuit the history + if ( + end_dt + and False + ): nparr: np.ndarray = bars_to_np(bars) times: np.ndarray = nparr['time'] first: float = times[0] @@ -410,6 +418,7 @@ class Client: if ( # len(bars) * sample_period_s) < dt_duration.in_seconds() tdiff < dt_duration.in_seconds() + # and False ): end_dt: DateTime = from_timestamp(first) log.warning( @@ -859,6 +868,9 @@ class Client: timeout=timeout, ) except TimeoutError: + import pdbp + pdbp.set_trace() + if raise_on_timeout: raise return None diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 99b8aef9..a175209d 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -174,8 +174,15 @@ async def open_history_client( start_dt: datetime | None = None, ) -> tuple[np.ndarray, str]: + nonlocal max_timeout, mean, count + if ( + start_dt + and start_dt.timestamp() == 0 + ): + await tractor.pause() + query_start = time.time() out, timedout = await get_bars( proxy, @@ -403,35 +410,55 @@ async def get_bars( bars, bars_array, dt_duration = out - # not enough bars signal, likely due to venue - # operational gaps. - too_little: bool = False - if ( - end_dt - and ( - not bars - or (too_little := - start_dt - and (len(bars) * timeframe) - < dt_duration.in_seconds() - ) - ) - ): - if ( - end_dt - or too_little - ): - log.warning( - f'History is blank for {dt_duration} from {end_dt}' - ) - end_dt -= dt_duration - continue - - raise NoData(f'{end_dt}') - if bars_array is None: raise SymbolNotFound(fqme) + # not enough bars signal, likely due to venue + # operational gaps. + # too_little: bool = False + if end_dt: + if not bars: + # no data returned? + log.warning( + 'History frame is blank?\n' + f'start_dt: {start_dt}\n' + f'end_dt: {end_dt}\n' + f'duration: {dt_duration}\n' + ) + raise NoData(f'{end_dt}') + + else: + dur_s: float = len(bars) * timeframe + bars_dur = pendulum.Duration(seconds=dur_s) + dt_dur_s: float = dt_duration.in_seconds() + if dur_s < dt_dur_s: + log.warning( + 'History frame is shorter then expected?\n' + f'start_dt: {start_dt}\n' + f'end_dt: {end_dt}\n' + f'duration: {dt_dur_s}\n' + f'frame duration seconds: {dur_s}\n' + f'dur diff: {dt_duration - bars_dur}\n' + ) + # NOTE: we used to try to get a minimal + # set of bars by recursing but this ran + # into possible infinite query loops + # when logic in the `Client.bars()` dt + # diffing went bad. So instead for now + # we just return the + # shorter-then-expected history with + # a warning. + # TODO: in the future it prolly makes + # the most send to do venue operating + # hours lookup and + # timestamp-in-operating-range set + # checking to know for sure if we can + # safely and quickly ignore non-uniform history + # frame timestamp gaps.. + # end_dt -= dt_duration + # continue + # await tractor.pause() + first_dt = pendulum.from_timestamp( bars[0].date.timestamp()) @@ -854,7 +881,13 @@ async def stream_quotes( init_msgs.append(init_msg) con: Contract = details.contract - first_ticker: Ticker = await proxy.get_quote(contract=con) + first_ticker: Ticker | None = None + with trio.move_on_after(1): + first_ticker: Ticker = await proxy.get_quote( + contract=con, + raise_on_timeout=False, + ) + if first_ticker: first_quote: dict = normalize(first_ticker) log.info( @@ -862,18 +895,6 @@ async def stream_quotes( f'{pformat(first_quote)}' ) - # TODO: we should instead spawn a task that waits on a feed - # to start and let it wait indefinitely..instead of this - # hard coded stuff. - # async def wait_for_first_quote(): - # with trio.CancelScope() as cs: - - with trio.move_on_after(1): - first_ticker = await proxy.get_quote( - contract=con, - raise_on_timeout=True, - ) - # NOTE: it might be outside regular trading hours for # assets with "standard venue operating hours" so we # only "pretend the feed is live" when the dst asset @@ -884,6 +905,8 @@ async def stream_quotes( # (equitiies, futes, bonds etc.) we at least try to # grab the OHLC history. if ( + first_ticker + and isnan(first_ticker.last) # SO, if the last quote price value is NaN we ONLY # "pretend to do" `feed_is_live.set()` if it's a known @@ -907,6 +930,19 @@ async def stream_quotes( await trio.sleep_forever() return # we never expect feed to come up? + # TODO: we should instead spawn a task that waits on a feed + # to start and let it wait indefinitely..instead of this + # hard coded stuff. + # async def wait_for_first_quote(): + # with trio.CancelScope() as cs: + + # XXX: MUST acquire a ticker + first quote before starting + # the live quotes loop! + # with trio.move_on_after(1): + first_ticker = await proxy.get_quote( + contract=con, + raise_on_timeout=True, + ) cs: trio.CancelScope | None = None startup: bool = True while (