diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 9c80bfae..554f0199 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -44,8 +44,10 @@ import trio from trio_typing import TaskStatus import tractor from pendulum import ( + Interval, DateTime, Duration, + duration as mk_duration, from_timestamp, ) import numpy as np @@ -214,7 +216,8 @@ async def maybe_fill_null_segments( # pair, immediately stop backfilling? if ( start_dt - and end_dt < start_dt + and + end_dt < start_dt ): await tractor.pause() break @@ -262,6 +265,7 @@ async def maybe_fill_null_segments( except tractor.ContextCancelled: # log.exception await tractor.pause() + raise null_segs_detected.set() # RECHECK for more null-gaps @@ -349,7 +353,7 @@ async def maybe_fill_null_segments( async def start_backfill( get_hist, - frame_types: dict[str, Duration] | None, + def_frame_duration: Duration, mod: ModuleType, mkt: MktPair, shm: ShmArray, @@ -379,22 +383,23 @@ async def start_backfill( update_start_on_prepend: bool = False if backfill_until_dt is None: - # TODO: drop this right and just expose the backfill - # limits inside a [storage] section in conf.toml? - # when no tsdb "last datum" is provided, we just load - # some near-term history. - # periods = { - # 1: {'days': 1}, - # 60: {'days': 14}, - # } - - # do a decently sized backfill and load it into storage. + # TODO: per-provider default history-durations? + # -[ ] inside the `open_history_client()` config allow + # declaring the history duration limits instead of + # guessing and/or applying the same limits to all? + # + # -[ ] allow declaring (default) per-provider backfill + # limits inside a [storage] sub-section in conf.toml? + # + # NOTE, when no tsdb "last datum" is provided, we just + # load some near-term history by presuming a "decently + # large" 60s duration limit and a much shorter 1s range. periods = { 1: {'days': 2}, 60: {'years': 6}, } period_duration: int = periods[timeframe] - update_start_on_prepend = True + update_start_on_prepend: bool = True # NOTE: manually set the "latest" datetime which we intend to # backfill history "until" so as to adhere to the history @@ -416,7 +421,6 @@ async def start_backfill( f'backfill_until_dt: {backfill_until_dt}\n' f'last_start_dt: {last_start_dt}\n' ) - try: ( array, @@ -426,48 +430,58 @@ async def start_backfill( timeframe, end_dt=last_start_dt, ) - except NoData as _daterr: - # 3 cases: - # - frame in the middle of a legit venue gap - # - history actually began at the `last_start_dt` - # - some other unknown error (ib blocking the - # history bc they don't want you seeing how they - # cucked all the tinas..) - if ( - frame_types - and - (dur := frame_types.get(timeframe)) - ): + orig_last_start_dt: datetime = last_start_dt + gap_report: str = ( + f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' + f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' + f'last_start_dt: {orig_last_start_dt}\n\n' + f'bf_until: {backfill_until_dt}\n' + ) + # EMPTY FRAME signal with 3 (likely) causes: + # + # 1. range contains legit gap in venue history + # 2. history actually (edge case) **began** at the + # value `last_start_dt` + # 3. some other unknown error (ib blocking the + # history-query bc they don't want you seeing how + # they cucked all the tinas.. like with options + # hist) + # + if def_frame_duration: # decrement by a duration's (frame) worth of time # as maybe indicated by the backend to see if we # can get older data before this possible # "history gap". - orig_last_start_dt = last_start_dt - last_start_dt = last_start_dt.subtract( - seconds=dur.total_seconds() + last_start_dt: datetime = last_start_dt.subtract( + seconds=def_frame_duration.total_seconds() ) - log.warning( - f'{mod.name} -> EMPTY FRAME for end_dt?\n' - f'tf@fqme: {timeframe}@{mkt.fqme}\n' - f'Decrementing `end_dt` by {dur} and retry..\n\n' - - f'orig_last_start_dt: {orig_last_start_dt}\n' - f'dur subtracted last_start_dt: {last_start_dt}\n' - f'bf_until: {backfill_until_dt}\n' + gap_report += ( + f'Decrementing `end_dt` and retrying with,\n' + f'def_frame_duration: {def_frame_duration}\n' + f'(new) last_start_dt: {last_start_dt}\n' ) + log.warning(gap_report) + # skip writing to shm/tsdb and try the next + # duration's worth of prior history. continue - raise + else: + # await tractor.pause() + raise DataUnavailable(gap_report) # broker says there never was or is no more history to pull - except DataUnavailable: + except DataUnavailable as due: + message: str = due.args[0] log.warning( - f'NO-MORE-DATA in range?\n' - f'`{mod.name}` halted history:\n' - f'tf@fqme: {timeframe}@{mkt.fqme}\n' - 'bf_until <- last_start_dt:\n' - f'{backfill_until_dt} <- {last_start_dt}\n' + f'Provider {mod.name!r} halted backfill due to,\n\n' + + f'{message}\n' + + f'fqme: {mkt.fqme}\n' + f'timeframe: {timeframe}\n' + f'last_start_dt: {last_start_dt}\n' + f'bf_until: {backfill_until_dt}\n' ) # UGH: what's a better way? # TODO: backends are responsible for being correct on @@ -476,34 +490,54 @@ async def start_backfill( # to halt the request loop until the condition is # resolved or should the backend be entirely in # charge of solving such faults? yes, right? - # if timeframe > 1: - # await tractor.pause() return + time: np.ndarray = array['time'] assert ( - array['time'][0] + time[0] == next_start_dt.timestamp() ) - diff = last_start_dt - next_start_dt - frame_time_diff_s = diff.seconds + assert time[-1] == next_end_dt.timestamp() + + expected_dur: Interval = last_start_dt - next_start_dt # frame's worth of sample-period-steps, in seconds frame_size_s: float = len(array) * timeframe - expected_frame_size_s: float = frame_size_s + timeframe - if frame_time_diff_s > expected_frame_size_s: - + recv_frame_dur: Duration = ( + from_timestamp(array[-1]['time']) + - + from_timestamp(array[0]['time']) + ) + if ( + (lt_frame := (recv_frame_dur < expected_dur)) + or + (null_frame := (frame_size_s == 0)) + # ^XXX, should NEVER hit now! + ): # XXX: query result includes a start point prior to our # expected "frame size" and thus is likely some kind of # history gap (eg. market closed period, outage, etc.) # so just report it to console for now. + if lt_frame: + reason = 'Possible GAP (or first-datum)' + else: + assert null_frame + reason = 'NULL-FRAME' + + missing_dur: Interval = expected_dur.end - recv_frame_dur.end log.warning( - 'GAP DETECTED:\n' - f'last_start_dt: {last_start_dt}\n' - f'diff: {diff}\n' - f'frame_time_diff_s: {frame_time_diff_s}\n' + f'{timeframe}s-series {reason} detected!\n' + f'fqme: {mkt.fqme}\n' + f'last_start_dt: {last_start_dt}\n\n' + f'recv interval: {recv_frame_dur}\n' + f'expected interval: {expected_dur}\n\n' + + f'Missing duration of history of {missing_dur.in_words()!r}\n' + f'{missing_dur}\n' ) + # await tractor.pause() to_push = diff_history( array, @@ -578,7 +612,8 @@ async def start_backfill( # long-term storage. if ( storage is not None - and write_tsdb + and + write_tsdb ): log.info( f'Writing {ln} frame to storage:\n' @@ -699,7 +734,7 @@ async def back_load_from_tsdb( last_tsdb_dt and latest_start_dt ): - backfilled_size_s = ( + backfilled_size_s: Duration = ( latest_start_dt - last_tsdb_dt ).seconds # if the shm buffer len is not large enough to contain @@ -922,6 +957,8 @@ async def tsdb_backfill( f'{pformat(config)}\n' ) + # concurrently load the provider's most-recent-frame AND any + # pre-existing tsdb history already saved in `piker` storage. dt_eps: list[DateTime, DateTime] = [] async with trio.open_nursery() as tn: tn.start_soon( @@ -932,7 +969,6 @@ async def tsdb_backfill( timeframe, config, ) - tsdb_entry: tuple = await load_tsdb_hist( storage, mkt, @@ -961,6 +997,25 @@ async def tsdb_backfill( mr_end_dt, ) = dt_eps + first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds + calced_frame_size: Duration = mk_duration( + seconds=first_frame_dur_s, + ) + # NOTE, attempt to use the backend declared default frame + # sizing (as allowed by their time-series query APIs) and + # if not provided try to construct a default from the + # first frame received above. + def_frame_durs: dict[ + int, + Duration, + ]|None = config.get('frame_types', None) + if def_frame_durs: + def_frame_size: Duration = def_frame_durs[timeframe] + assert def_frame_size == calced_frame_size + else: + # use what we calced from first frame above. + def_frame_size = calced_frame_size + # NOTE: when there's no offline data, there's 2 cases: # - data backend doesn't support timeframe/sample # period (in which case `dt_eps` should be `None` and @@ -991,7 +1046,7 @@ async def tsdb_backfill( partial( start_backfill, get_hist=get_hist, - frame_types=config.get('frame_types', None), + def_frame_duration=def_frame_size, mod=mod, mkt=mkt, shm=shm,