Add venue-closure gap-detection in `.ib.api.Client.bars()`

With all detection logic coming from our new `.ib.venues` helpers
allowing use to verify IB's OHLC bars frames don't contain unexpected
time-gaps.

`Client.bars()` new checking deats,
- add `is_venue_open()`, `has_weekend()`, `sesh_times()`, and
  `is_venue_closure()` checks when `last_dt < end_dt`
- always calc gap-period in local tz via `ContractDetails.timeZoneId`.
- log warnings on invalid non-closure gaps, debug on closures for now.
- change recursion case to just `log.error()` + `breakpoint()`; we might end
  up tossing it since i don't think i could ever get it to be reliable..
  * mask-out recursive `.bars()` call (likely unnecessary).
- flip `start_dt`/`end_dt` param defaults to `None` vs epoch `str`.
- update docstring to clarify no `start_dt` support by IB
- add mod level `_iso8601_epoch_in_est` const to keep track of orig
  param default value.
- add multiline style to return type-annot, type all `pendulum` objects.

Also,
- uppercase `Crypto.symbol` for PAXOS contracts in `.find_contracts()`,
  tho now we're getting a weird new API error i left in a todo-comment..

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
cont_hist_fixes
Gud Boi 2026-02-05 17:31:59 -05:00
parent f484726a8c
commit ba575d93ea
1 changed files with 142 additions and 39 deletions

View File

@ -50,10 +50,11 @@ import tractor
from tractor import to_asyncio
from tractor import trionics
from pendulum import (
from_timestamp,
DateTime,
Duration,
duration as mk_duration,
from_timestamp,
Interval,
)
from eventkit import Event
from ib_insync import (
@ -260,6 +261,16 @@ def remove_handler_on_err(
event.disconnect(handler)
# (originally?) i thot that,
# > "EST in ISO 8601 format is required.."
#
# XXX, but see `ib_async`'s impl,
# - `ib_async.ib.IB.reqHistoricalDataAsync()`
# - `ib_async.util.formatIBDatetime()`
# below is EPOCH.
_iso8601_epoch_in_est: str = "1970-01-01T00:00:00.000000-05:00"
class Client:
'''
IB wrapped for our broker backend API.
@ -333,9 +344,11 @@ class Client:
self,
fqme: str,
# EST in ISO 8601 format is required... below is EPOCH
start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00",
end_dt: datetime|str = "",
# EST in ISO 8601 format is required..
# XXX, see `ib_async.ib.IB.reqHistoricalDataAsync()`
# below is EPOCH.
start_dt: datetime|None = None, # _iso8601_epoch_in_est,
end_dt: datetime|None = None,
# ohlc sample period in seconds
sample_period_s: int = 1,
@ -346,9 +359,17 @@ class Client:
**kwargs,
) -> tuple[BarDataList, np.ndarray, Duration]:
) -> tuple[
BarDataList,
np.ndarray,
Duration,
]:
'''
Retreive OHLCV bars for a fqme over a range to the present.
Retreive the `fqme`'s OHLCV-bars for the time-range "until `end_dt`".
Notes:
- IB's api doesn't support a `start_dt` (which is why default
is null) so we only use it for bar-frame duration checking.
'''
# See API docs here:
@ -363,13 +384,19 @@ class Client:
dt_duration: Duration = (
duration
or default_dt_duration
or
default_dt_duration
)
# TODO: maybe remove all this?
global _enters
if not end_dt:
end_dt = ''
if end_dt is None:
end_dt: str = ''
else:
est_end_dt = end_dt.in_tz('EST')
if est_end_dt != end_dt:
breakpoint()
_enters += 1
@ -438,58 +465,127 @@ class Client:
+ query_info
)
# TODO: we could maybe raise ``NoData`` instead if we
# TODO: we could maybe raise `NoData` instead if we
# rewrite the method in the first case?
# right now there's no way to detect a timeout..
return [], np.empty(0), dt_duration
log.info(query_info)
# ------ GAP-DETECTION ------
# NOTE XXX: ensure minimum duration in bars?
# => recursively call this method until we get at least as
# many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
# - if you query over a gap and get no data
# that may short circuit the history
if (
# XXX XXX XXX
# => WHY DID WE EVEN NEED THIS ORIGINALLY!? <=
# XXX XXX XXX
False
and end_dt
):
if end_dt:
nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time']
first: float = times[0]
tdiff: float = times[-1] - first
last: float = times[-1]
# frame_dur: float = times[-1] - first
first_dt: DateTime = from_timestamp(first)
last_dt: DateTime = from_timestamp(last)
tdiff: int = (
last_dt
-
first_dt
).in_seconds() + sample_period_s
# XXX, do gap detections.
if (
last_dt.add(seconds=sample_period_s)
<
end_dt
):
details: ContractDetails = (
await self.ib.reqContractDetailsAsync(contract)
)[0]
from .venues import (
is_venue_open,
has_weekend,
sesh_times,
is_venue_closure,
)
_open_now: bool = is_venue_open(
con_deats=details,
)
open_time, close_time = sesh_times(details)
# XXX, always calc gap in mkt-venue-local timezone
tz: str = details.timeZoneId
gap: Interval = (
end_dt.in_tz(tz)
-
last_dt.in_tz(tz)
)
if (
not has_weekend(gap)
and
# XXX NOT outside venue closures.
# !TODO, replace with,
# `not is_venue_closure()`
# per below assert on inverse case!
gap.end.time() != open_time
and
gap.start.time() != close_time
):
breakpoint()
log.warning(
f'Invalid non-closure gap for {fqme!r} ?!?\n'
f'is-open-now: {_open_now}\n'
f'\n'
f'{gap}\n'
)
else:
assert is_venue_closure(
gap=gap,
con_deats=details,
)
log.debug(
f'Detected venue closure gap (weekend),\n'
f'{gap}\n'
)
if (
start_dt is None
and
tdiff
<
dt_duration.in_seconds()
# and
# len(bars) * sample_period_s) < dt_duration.in_seconds()
tdiff < dt_duration.in_seconds()
# and False
):
end_dt: DateTime = from_timestamp(first)
log.warning(
log.error(
f'Frame result was shorter then {dt_duration}!?\n'
'Recursing for more bars:\n'
f'end_dt: {end_dt}\n'
f'dt_duration: {dt_duration}\n'
# f'\n'
# f'Recursing for more bars:\n'
)
(
r_bars,
r_arr,
r_duration,
) = await self.bars(
fqme,
start_dt=start_dt,
end_dt=end_dt,
sample_period_s=sample_period_s,
breakpoint()
# XXX ? TODO? recursively try to re-request?
# => i think *NO* right?
#
# (
# r_bars,
# r_arr,
# r_duration,
# ) = await self.bars(
# fqme,
# start_dt=start_dt,
# end_dt=end_dt,
# sample_period_s=sample_period_s,
# TODO: make a table for Duration to
# the ib str values in order to use this?
# duration=duration,
)
r_bars.extend(bars)
bars = r_bars
# # TODO: make a table for Duration to
# # the ib str values in order to use this?
# # duration=duration,
# )
# r_bars.extend(bars)
# bars = r_bars
nparr: np.ndarray = bars_to_np(bars)
@ -784,9 +880,16 @@ class Client:
# crypto$
elif exch == 'PAXOS': # btc.paxos
con = Crypto(
symbol=symbol,
currency=currency,
symbol=symbol.upper(),
currency='USD',
exchange='PAXOS',
)
# XXX, on `ib_insync` when first tried this,
# > Error 10299, reqId 141: Expected what to show is
# > AGGTRADES, please use that instead of TRADES.,
# > contract: Crypto(conId=479624278, symbol='BTC',
# > exchange='PAXOS', currency='USD',
# > localSymbol='BTC.USD', tradingClass='BTC')
# stonks
else: