.tsp._history: add gap detection in backfill loop

Add frame-gap detection when `frame_last_dt < end_dt_param` to
warn about potential venue closures or missing data during the
backfill loop in `start_backfill()`.

Deats,
- add `frame_last_dt < end_dt_param` check after frame recv (with one
  `timeframe` step of tolerance; see the sketch below)
- log warnings with EST-converted timestamps for clarity
- add `await tractor.pause()` for REPL-investigation on gaps
- add TODO comment about venue closure hour checking
- capture `_until_was_none` walrus var for null-check clarity
- add `last_time` walrus-capture assertion for `time[-1] == next_end_dt.timestamp()`
- rename `_daterr` to `nodata` with `_nodata` capture
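
For reference, here's a minimal, self-contained sketch of the new gap
check (the `frame_ends_early()` name and standalone-function shape are
illustrative only; in the diff the logic is inlined in `start_backfill()`
and followed by an `await tractor.pause()` so a gap can be poked at from
the REPL):

```python
# illustrative sketch only, NOT the literal diff code; the real
# logic is inlined in `start_backfill()` per the diff below.
from pendulum import DateTime, from_timestamp, timezone

est = timezone('EST')


def frame_ends_early(
    last_time: float,        # epoch-secs of the frame's last row
    end_dt_param: DateTime,  # the `end_dt` passed to `get_hist()`
    timeframe: int,          # sample period in seconds
) -> bool:
    '''
    Detect when a received frame ends more than one `timeframe`
    step before the requested `end_dt`, implying a venue closure
    (or missing data) at the tail of the frame.

    '''
    frame_last_dt: DateTime = from_timestamp(last_time)
    if frame_last_dt.add(seconds=timeframe) < end_dt_param:
        print(
            f'frame ends early!\n'
            f'frame_last_dt (EST): {est.convert(frame_last_dt)}\n'
            f'end_dt_param (EST): {est.convert(end_dt_param)}\n'
        )
        return True
    return False
```

The `.add(seconds=timeframe)` grants one sample-step of tolerance so a
frame whose last bar lands exactly one step before the requested end
doesn't trip the warning.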

Also,
- import `pendulum.timezone` and create an `est` tz instance (quick demo below)
- swap the `log` import from `.data._util` for `get_logger()` from `.log`
  (with a module-level `log = get_logger()`)
- add parens around `(next_prepend_index - ln) < 0` check
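
A quick demo of the `timezone` conversion used by the new warning log
('EST' is a fixed UTC-5 entry in the tz database, so no DST shifting):

```python
from pendulum import from_timestamp, timezone

est = timezone('EST')
utc_dt = from_timestamp(1_700_000_000)  # `pendulum` defaults to UTC
print(utc_dt)               # 2023-11-14T22:13:20+00:00
print(est.convert(utc_dt))  # 2023-11-14T17:13:20-05:00
```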

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
cont_hist_fixes
Gud Boi 2026-02-05 17:48:52 -05:00
parent f502851999
commit ce1f038b53
1 changed file with 43 additions and 11 deletions


@@ -49,6 +49,7 @@ from pendulum import (
     Duration,
     duration as mk_duration,
     from_timestamp,
+    timezone,
 )
 import numpy as np
 import polars as pl
@@ -57,9 +58,7 @@ from piker.brokers import NoData
 from piker.accounting import (
     MktPair,
 )
-from piker.data._util import (
-    log,
-)
+from piker.log import get_logger
 from ..data._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
@@ -97,6 +96,9 @@ if TYPE_CHECKING:
     # from .feed import _FeedsBus

+log = get_logger()
+
+
 # `ShmArray` buffer sizing configuration:
 _mins_in_day = int(60 * 24)
 # how much is probably dependent on lifestyle
@@ -401,7 +403,9 @@ async def start_backfill(
     # based on the sample step size, maybe load a certain amount history
     update_start_on_prepend: bool = False
-    if backfill_until_dt is None:
+    if (
+        _until_was_none := (backfill_until_dt is None)
+    ):

         # TODO: per-provider default history-durations?
         # -[ ] inside the `open_history_client()` config allow
@@ -435,6 +439,8 @@ async def start_backfill(
     last_start_dt: datetime = backfill_from_dt
     next_prepend_index: int = backfill_from_shm_index

+    est = timezone('EST')
+
     while last_start_dt > backfill_until_dt:
         log.info(
             f'Requesting {timeframe}s frame:\n'
@@ -448,9 +454,10 @@ async def start_backfill(
                 next_end_dt,
             ) = await get_hist(
                 timeframe,
-                end_dt=last_start_dt,
+                end_dt=(end_dt_param := last_start_dt),
             )
-        except NoData as _daterr:
+        except NoData as nodata:
+            _nodata = nodata
             orig_last_start_dt: datetime = last_start_dt
             gap_report: str = (
                 f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
@@ -518,8 +525,32 @@ async def start_backfill(
             ==
             next_start_dt.timestamp()
         )
-        assert time[-1] == next_end_dt.timestamp()
+        assert (
+            (last_time := time[-1])
+            ==
+            next_end_dt.timestamp()
+        )
+        frame_last_dt = from_timestamp(last_time)
+        if (
+            frame_last_dt.add(seconds=timeframe)
+            <
+            end_dt_param
+        ):
+            est_frame_last_dt = est.convert(frame_last_dt)
+            est_end_dt_param = est.convert(end_dt_param)
+            log.warning(
+                f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
+                f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
+                f'end_dt_param (EST): {est_end_dt_param!r}\n'
+                f'\n'
+                f'Likely contains,\n'
+                f'- a venue closure.\n'
+                f'- (maybe?) missing data ?\n'
+            )
+            # ?TODO, check against venue closure hours
+            # if/when provided by backend?
+            await tractor.pause()

         expected_dur: Interval = (
             last_start_dt.subtract(
@@ -581,10 +612,11 @@ async def start_backfill(
                 '0 BARS TO PUSH after diff!?\n'
                 f'{next_start_dt} -> {last_start_dt}'
             )
+            await tractor.pause()

         # Check if we're about to exceed buffer capacity BEFORE
         # attempting the push
-        if next_prepend_index - ln < 0:
+        if (next_prepend_index - ln) < 0:
             log.warning(
                 f'Backfill would exceed buffer capacity!\n'
                 f'next_prepend_index: {next_prepend_index}\n'
@@ -655,7 +687,7 @@ async def start_backfill(
                 },
             })

-            # can't push the entire frame? so
+            # XXX, can't push the entire frame? so
             # push only the amount that can fit..
             break
@@ -715,8 +747,8 @@ async def start_backfill(
         ) = dedupe(df)
         if diff:
             log.warning(
-                f'Found {diff} duplicates in tsdb, '
-                f'overwriting with deduped data\n'
+                f'Found {diff!r} duplicates in tsdb! '
+                f'=> Overwriting with `deduped` data !! <=\n'
             )
             await storage.write_ohlcv(
                 col_sym_key,
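
For context on the `dedupe()` call site in that last hunk, a rough
`polars` sketch of the dedupe-and-count shape being consumed; the real
`piker.tsp` helper's signature and dedup key are assumptions here, not
taken from this diff:

```python
import polars as pl


def dedupe(df: pl.DataFrame) -> tuple[pl.DataFrame, int]:
    '''
    Drop rows sharing a `time` stamp (keeping the first occurrence
    in order) and report how many rows were removed; assumed shape
    only, the actual helper returns more fields.

    '''
    deduped: pl.DataFrame = df.unique(
        subset='time',
        keep='first',
        maintain_order=True,
    )
    return deduped, df.height - deduped.height
```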