diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py
index 3384e516..0db071fe 100644
--- a/piker/tsp/_history.py
+++ b/piker/tsp/_history.py
@@ -49,6 +49,7 @@ from pendulum import (
     Duration,
     duration as mk_duration,
     from_timestamp,
+    timezone,
 )
 import numpy as np
 import polars as pl
@@ -57,9 +58,7 @@ from piker.brokers import NoData
 from piker.accounting import (
     MktPair,
 )
-from piker.data._util import (
-    log,
-)
+from piker.log import get_logger
 from ..data._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
@@ -97,6 +96,9 @@ if TYPE_CHECKING:
     # from .feed import _FeedsBus
 
 
+log = get_logger()
+
+
 # `ShmArray` buffer sizing configuration:
 _mins_in_day = int(60 * 24)
 # how much is probably dependent on lifestyle
@@ -401,7 +403,9 @@ async def start_backfill(
 
     # based on the sample step size, maybe load a certain amount history
     update_start_on_prepend: bool = False
-    if backfill_until_dt is None:
+    if (
+        _until_was_none := (backfill_until_dt is None)
+    ):
 
         # TODO: per-provider default history-durations?
         # -[ ] inside the `open_history_client()` config allow
@@ -435,6 +439,8 @@ async def start_backfill(
     last_start_dt: datetime = backfill_from_dt
     next_prepend_index: int = backfill_from_shm_index
 
+    est = timezone('EST')
+
     while last_start_dt > backfill_until_dt:
         log.info(
             f'Requesting {timeframe}s frame:\n'
@@ -448,9 +454,10 @@ async def start_backfill(
                 next_end_dt,
             ) = await get_hist(
                 timeframe,
-                end_dt=last_start_dt,
+                end_dt=(end_dt_param := last_start_dt),
             )
-        except NoData as _daterr:
+        except NoData as nodata:
+            _nodata = nodata
             orig_last_start_dt: datetime = last_start_dt
             gap_report: str = (
                 f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
@@ -518,8 +525,32 @@ async def start_backfill(
             == next_start_dt.timestamp()
         )
 
+        assert (
+            (last_time := time[-1])
+            ==
+            next_end_dt.timestamp()
+        )
 
-        assert time[-1] == next_end_dt.timestamp()
+        frame_last_dt = from_timestamp(last_time)
+        if (
+            frame_last_dt.add(seconds=timeframe)
+            <
+            end_dt_param
+        ):
+            est_frame_last_dt = est.convert(frame_last_dt)
+            est_end_dt_param = est.convert(end_dt_param)
+            log.warning(
+                f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
+                f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
+                f'end_dt_param (EST): {est_end_dt_param!r}\n'
+                f'\n'
+                f'Likely contains,\n'
+                f'- a venue closure.\n'
+                f'- (maybe?) missing data ?\n'
+            )
+            # ?TODO, check against venue closure hours
+            # if/when provided by backend?
+            await tractor.pause()
 
         expected_dur: Interval = (
             last_start_dt.subtract(
@@ -581,10 +612,11 @@ async def start_backfill(
                '0 BARS TO PUSH after diff!?\n'
                f'{next_start_dt} -> {last_start_dt}'
            )
+            await tractor.pause()
 
        # Check if we're about to exceed buffer capacity BEFORE
        # attempting the push
-        if next_prepend_index - ln < 0:
+        if (next_prepend_index - ln) < 0:
            log.warning(
                f'Backfill would exceed buffer capacity!\n'
                f'next_prepend_index: {next_prepend_index}\n'
@@ -655,7 +687,7 @@ async def start_backfill(
                },
            })
 
-            # can't push the entire frame? so
+            # XXX, can't push the entire frame? so
            # push only the amount that can fit..
            break
 
@@ -715,8 +747,8 @@ async def start_backfill(
        ) = dedupe(df)
        if diff:
            log.warning(
-                f'Found {diff} duplicates in tsdb, '
-                f'overwriting with deduped data\n'
+                f'Found {diff!r} duplicates in tsdb! '
+                f'=> Overwriting with `deduped` data !! <=\n'
            )
            await storage.write_ohlcv(
                col_sym_key,
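
[reviewer note, not part of the patch]
A minimal standalone sketch of the short-frame check introduced in the
`@@ -518,8 +525,32 @@` hunk above, using only `pendulum` calls. The 60s
`timeframe` and both timestamps are made-up illustration values; in the
patch itself `last_time` comes from the frame's 'time' column and
`end_dt_param` is captured from the `get_hist()` call via the walrus
assignment:

    import pendulum
    from pendulum import from_timestamp, timezone

    est = timezone('EST')
    timeframe: int = 60  # seconds-per-bar, illustration value

    # hypothetical stamps: the provider's frame ends hours before the
    # requested end (eg. due to a venue close)..
    end_dt_param = pendulum.datetime(2024, 1, 2, 21)  # requested end (UTC)
    last_time: float = pendulum.datetime(2024, 1, 2, 17).timestamp()

    frame_last_dt = from_timestamp(last_time)
    if (
        # frame's last bar plus one step still ends short of the
        # requested end => a "short" frame was delivered.
        frame_last_dt.add(seconds=timeframe)
        <
        end_dt_param
    ):
        # render both stamps in EST to eyeball against venue hours
        print(
            f'frame_last_dt (EST): {est.convert(frame_last_dt)!r}\n'
            f'end_dt_param (EST): {est.convert(end_dt_param)!r}\n'
        )

The EST conversion is presumably for eyeballing gaps against (NY quoted)
equity-venue session hours; the patch's own TODO notes a proper check
against backend-provided venue closure hours as future work.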