From ba575d93ea39a47c398715ad2aee2111b7c52cbe Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 5 Feb 2026 17:31:59 -0500 Subject: [PATCH] Add venue-closure gap-detection in `.ib.api.Client.bars()` With all detection logic coming from our new `.ib.venues` helpers allowing use to verify IB's OHLC bars frames don't contain unexpected time-gaps. `Client.bars()` new checking deats, - add `is_venue_open()`, `has_weekend()`, `sesh_times()`, and `is_venue_closure()` checks when `last_dt < end_dt` - always calc gap-period in local tz via `ContractDetails.timeZoneId`. - log warnings on invalid non-closure gaps, debug on closures for now. - change recursion case to just `log.error()` + `breakpoint()`; we might end up tossing it since i don't think i could ever get it to be reliable.. * mask-out recursive `.bars()` call (likely unnecessary). - flip `start_dt`/`end_dt` param defaults to `None` vs epoch `str`. - update docstring to clarify no `start_dt` support by IB - add mod level `_iso8601_epoch_in_est` const to keep track of orig param default value. - add multiline style to return type-annot, type all `pendulum` objects. Also, - uppercase `Crypto.symbol` for PAXOS contracts in `.find_contracts()`, tho now we're getting a weird new API error i left in a todo-comment.. (this commit msg was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/brokers/ib/api.py | 181 +++++++++++++++++++++++++++++++--------- 1 file changed, 142 insertions(+), 39 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 4a63a0f1..02e26a34 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -50,10 +50,11 @@ import tractor from tractor import to_asyncio from tractor import trionics from pendulum import ( - from_timestamp, DateTime, Duration, duration as mk_duration, + from_timestamp, + Interval, ) from eventkit import Event from ib_insync import ( @@ -260,6 +261,16 @@ def remove_handler_on_err( event.disconnect(handler) +# (originally?) i thot that, +# > "EST in ISO 8601 format is required.." +# +# XXX, but see `ib_async`'s impl, +# - `ib_async.ib.IB.reqHistoricalDataAsync()` +# - `ib_async.util.formatIBDatetime()` +# below is EPOCH. +_iso8601_epoch_in_est: str = "1970-01-01T00:00:00.000000-05:00" + + class Client: ''' IB wrapped for our broker backend API. @@ -333,9 +344,11 @@ class Client: self, fqme: str, - # EST in ISO 8601 format is required... below is EPOCH - start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00", - end_dt: datetime|str = "", + # EST in ISO 8601 format is required.. + # XXX, see `ib_async.ib.IB.reqHistoricalDataAsync()` + # below is EPOCH. + start_dt: datetime|None = None, # _iso8601_epoch_in_est, + end_dt: datetime|None = None, # ohlc sample period in seconds sample_period_s: int = 1, @@ -346,9 +359,17 @@ class Client: **kwargs, - ) -> tuple[BarDataList, np.ndarray, Duration]: + ) -> tuple[ + BarDataList, + np.ndarray, + Duration, + ]: ''' - Retreive OHLCV bars for a fqme over a range to the present. + Retreive the `fqme`'s OHLCV-bars for the time-range "until `end_dt`". + + Notes: + - IB's api doesn't support a `start_dt` (which is why default + is null) so we only use it for bar-frame duration checking. ''' # See API docs here: @@ -363,13 +384,19 @@ class Client: dt_duration: Duration = ( duration - or default_dt_duration + or + default_dt_duration ) # TODO: maybe remove all this? global _enters - if not end_dt: - end_dt = '' + if end_dt is None: + end_dt: str = '' + + else: + est_end_dt = end_dt.in_tz('EST') + if est_end_dt != end_dt: + breakpoint() _enters += 1 @@ -438,58 +465,127 @@ class Client: + query_info ) - # TODO: we could maybe raise ``NoData`` instead if we + # TODO: we could maybe raise `NoData` instead if we # rewrite the method in the first case? # right now there's no way to detect a timeout.. return [], np.empty(0), dt_duration log.info(query_info) + + # ------ GAP-DETECTION ------ # NOTE XXX: ensure minimum duration in bars? # => recursively call this method until we get at least as # many bars such that they sum in aggregate to the the # desired total time (duration) at most. # - if you query over a gap and get no data # that may short circuit the history - if ( - # XXX XXX XXX - # => WHY DID WE EVEN NEED THIS ORIGINALLY!? <= - # XXX XXX XXX - False - and end_dt - ): + if end_dt: nparr: np.ndarray = bars_to_np(bars) times: np.ndarray = nparr['time'] first: float = times[0] - tdiff: float = times[-1] - first + last: float = times[-1] + # frame_dur: float = times[-1] - first + + first_dt: DateTime = from_timestamp(first) + last_dt: DateTime = from_timestamp(last) + tdiff: int = ( + last_dt + - + first_dt + ).in_seconds() + sample_period_s + + # XXX, do gap detections. + if ( + last_dt.add(seconds=sample_period_s) + < + end_dt + ): + details: ContractDetails = ( + await self.ib.reqContractDetailsAsync(contract) + )[0] + from .venues import ( + is_venue_open, + has_weekend, + sesh_times, + is_venue_closure, + ) + _open_now: bool = is_venue_open( + con_deats=details, + ) + open_time, close_time = sesh_times(details) + # XXX, always calc gap in mkt-venue-local timezone + tz: str = details.timeZoneId + gap: Interval = ( + end_dt.in_tz(tz) + - + last_dt.in_tz(tz) + ) + + if ( + not has_weekend(gap) + and + # XXX NOT outside venue closures. + # !TODO, replace with, + # `not is_venue_closure()` + # per below assert on inverse case! + gap.end.time() != open_time + and + gap.start.time() != close_time + ): + breakpoint() + log.warning( + f'Invalid non-closure gap for {fqme!r} ?!?\n' + f'is-open-now: {_open_now}\n' + f'\n' + f'{gap}\n' + ) + else: + assert is_venue_closure( + gap=gap, + con_deats=details, + ) + log.debug( + f'Detected venue closure gap (weekend),\n' + f'{gap}\n' + ) if ( + start_dt is None + and + tdiff + < + dt_duration.in_seconds() + # and # len(bars) * sample_period_s) < dt_duration.in_seconds() - tdiff < dt_duration.in_seconds() - # and False ): end_dt: DateTime = from_timestamp(first) - log.warning( + log.error( f'Frame result was shorter then {dt_duration}!?\n' - 'Recursing for more bars:\n' f'end_dt: {end_dt}\n' f'dt_duration: {dt_duration}\n' + # f'\n' + # f'Recursing for more bars:\n' ) - ( - r_bars, - r_arr, - r_duration, - ) = await self.bars( - fqme, - start_dt=start_dt, - end_dt=end_dt, - sample_period_s=sample_period_s, + breakpoint() + # XXX ? TODO? recursively try to re-request? + # => i think *NO* right? + # + # ( + # r_bars, + # r_arr, + # r_duration, + # ) = await self.bars( + # fqme, + # start_dt=start_dt, + # end_dt=end_dt, + # sample_period_s=sample_period_s, - # TODO: make a table for Duration to - # the ib str values in order to use this? - # duration=duration, - ) - r_bars.extend(bars) - bars = r_bars + # # TODO: make a table for Duration to + # # the ib str values in order to use this? + # # duration=duration, + # ) + # r_bars.extend(bars) + # bars = r_bars nparr: np.ndarray = bars_to_np(bars) @@ -784,9 +880,16 @@ class Client: # crypto$ elif exch == 'PAXOS': # btc.paxos con = Crypto( - symbol=symbol, - currency=currency, + symbol=symbol.upper(), + currency='USD', + exchange='PAXOS', ) + # XXX, on `ib_insync` when first tried this, + # > Error 10299, reqId 141: Expected what to show is + # > AGGTRADES, please use that instead of TRADES., + # > contract: Crypto(conId=479624278, symbol='BTC', + # > exchange='PAXOS', currency='USD', + # > localSymbol='BTC.USD', tradingClass='BTC') # stonks else: