diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 6890192d..1c8ff11b 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -386,6 +386,8 @@ def ldshm(
         open_annot_ctl() as actl,
     ):
         shm_df: pl.DataFrame | None = None
+        tf2aids: dict[float, dict] = {}
+
         for (
             shmfile,
             shm,
@@ -526,16 +528,17 @@ def ldshm(
                         new_df,
                         step_gaps,
                     )
-                    # last chance manual overwrites in REPL
-                    await tractor.pause()
+                    # await tractor.pause()
                     assert aids
+                    tf2aids[period_s] = aids
 
                 else:
                     # allow interaction even when no ts problems.
-                    await tractor.pause()
-                    # assert not diff
+                    assert not diff
+                    await tractor.pause()
+
+        log.info('Exiting TSP shm anal-izer!')
 
         if shm_df is None:
             log.error(
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index bc7f10e3..8a948cab 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -161,7 +161,13 @@ class NativeStorageClient:
     def index_files(self):
         for path in self._datadir.iterdir():
-            if path.name in {'borked', 'expired',}:
+            if (
+                path.is_dir()
+                or
+                '.parquet' not in str(path)
+                # or
+                # path.name in {'borked', 'expired',}
+            ):
                 continue
 
             key: str = path.name.rstrip('.parquet')
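
The `index_files()` guard now skips directories and any non-parquet path outright instead of blacklisting known subdir names. A minimal sketch of the new predicate (file names here are hypothetical):

    from pathlib import Path

    def is_indexable(path: Path) -> bool:
        # mirrors the patched guard: only paths that look like
        # parquet files survive to be indexed.
        return not (
            path.is_dir()
            or '.parquet' not in str(path)
        )

    assert is_indexable(Path('btcusdt.binance.parquet'))
    assert not is_indexable(Path('borked'))  # dir-name, no suffix

Worth noting: the unchanged `path.name.rstrip('.parquet')` context line strips a trailing character set, not a suffix; `path.name.removesuffix('.parquet')` (py3.9+) is the strict spelling, though the two only differ for keys ending in those characters.
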
diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py
index adbe484e..f10f0f75 100644
--- a/piker/tsp/__init__.py
+++ b/piker/tsp/__init__.py
@@ -44,8 +44,10 @@ import trio
 from trio_typing import TaskStatus
 import tractor
 from pendulum import (
+    Interval,
     DateTime,
     Duration,
+    duration as mk_duration,
     from_timestamp,
 )
 import numpy as np
@@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
         # pair, immediately stop backfilling?
         if (
             start_dt
-            and end_dt < start_dt
+            and
+            end_dt < start_dt
         ):
             await tractor.pause()
             break
@@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
         except tractor.ContextCancelled:
             # log.exception
             await tractor.pause()
+            raise
 
     null_segs_detected.set()
     # RECHECK for more null-gaps
@@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
 
 async def start_backfill(
     get_hist,
-    frame_types: dict[str, Duration] | None,
+    def_frame_duration: Duration,
     mod: ModuleType,
     mkt: MktPair,
     shm: ShmArray,
@@ -379,22 +383,23 @@ async def start_backfill(
     update_start_on_prepend: bool = False
     if backfill_until_dt is None:
 
-        # TODO: drop this right and just expose the backfill
-        # limits inside a [storage] section in conf.toml?
-        # when no tsdb "last datum" is provided, we just load
-        # some near-term history.
-        # periods = {
-        #     1: {'days': 1},
-        #     60: {'days': 14},
-        # }
-
-        # do a decently sized backfill and load it into storage.
+        # TODO: per-provider default history-durations?
+        # -[ ] inside the `open_history_client()` config allow
+        #    declaring the history duration limits instead of
+        #    guessing and/or applying the same limits to all?
+        #
+        # -[ ] allow declaring (default) per-provider backfill
+        #    limits inside a [storage] sub-section in conf.toml?
+        #
+        # NOTE, when no tsdb "last datum" is provided, we just
+        # load some near-term history by presuming a "decently
+        # large" 60s duration limit and a much shorter 1s range.
         periods = {
             1: {'days': 2},
             60: {'years': 6},
         }
         period_duration: int = periods[timeframe]
-        update_start_on_prepend = True
+        update_start_on_prepend: bool = True
 
         # NOTE: manually set the "latest" datetime which we intend to
         # backfill history "until" so as to adhere to the history
@@ -416,7 +421,6 @@ async def start_backfill(
                 f'backfill_until_dt: {backfill_until_dt}\n'
                 f'last_start_dt: {last_start_dt}\n'
             )
-
             try:
                 (
                     array,
@@ -426,71 +430,114 @@ async def start_backfill(
                     timeframe,
                     end_dt=last_start_dt,
                 )
-
             except NoData as _daterr:
-                # 3 cases:
-                # - frame in the middle of a legit venue gap
-                # - history actually began at the `last_start_dt`
-                # - some other unknown error (ib blocking the
-                #   history bc they don't want you seeing how they
-                #   cucked all the tinas..)
-                if dur := frame_types.get(timeframe):
-                    # decrement by a frame's worth of duration and
-                    # retry a few times.
-                    last_start_dt.subtract(
-                        seconds=dur.total_seconds()
+                orig_last_start_dt: datetime = last_start_dt
+                gap_report: str = (
+                    f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
+                    f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
+                    f'last_start_dt: {orig_last_start_dt}\n\n'
+                    f'bf_until: {backfill_until_dt}\n'
+                )
+                # EMPTY FRAME signal with 3 (likely) causes:
+                #
+                # 1. range contains legit gap in venue history
+                # 2. history actually (edge case) **began** at the
+                #    value `last_start_dt`
+                # 3. some other unknown error (ib blocking the
+                #    history-query bc they don't want you seeing how
+                #    they cucked all the tinas.. like with options
+                #    hist)
+                #
+                if def_frame_duration:
+                    # decrement by a duration's (frame) worth of time
+                    # as maybe indicated by the backend to see if we
+                    # can get older data before this possible
+                    # "history gap".
+                    last_start_dt: datetime = last_start_dt.subtract(
+                        seconds=def_frame_duration.total_seconds()
                     )
-                    log.warning(
-                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
-                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
-                        'bf_until <- last_start_dt:\n'
-                        f'{backfill_until_dt} <- {last_start_dt}\n'
-                        f'Decrementing `end_dt` by {dur} and retry..\n'
+                    gap_report += (
+                        f'Decrementing `end_dt` and retrying with,\n'
+                        f'def_frame_duration: {def_frame_duration}\n'
+                        f'(new) last_start_dt: {last_start_dt}\n'
                     )
+                    log.warning(gap_report)
+                    # skip writing to shm/tsdb and try the next
+                    # duration's worth of prior history.
                     continue
 
-            # broker says there never was or is no more history to pull
-            except DataUnavailable:
-                log.warning(
-                    f'NO-MORE-DATA in range?\n'
-                    f'`{mod.name}` halted history:\n'
-                    f'tf@fqme: {timeframe}@{mkt.fqme}\n'
-                    'bf_until <- last_start_dt:\n'
-                    f'{backfill_until_dt} <- {last_start_dt}\n'
-                )
+                else:
+                    # await tractor.pause()
+                    raise DataUnavailable(gap_report)
 
-                # ugh, what's a better way?
-                # TODO: fwiw, we probably want a way to signal a throttle
-                # condition (eg. with ib) so that we can halt the
-                # request loop until the condition is resolved?
-                if timeframe > 1:
-                    await tractor.pause()
+            # broker says there never was or is no more history to pull
+            except DataUnavailable as due:
+                message: str = due.args[0]
+                log.warning(
+                    f'Provider {mod.name!r} halted backfill due to,\n\n'
+
+                    f'{message}\n'
+
+                    f'fqme: {mkt.fqme}\n'
+                    f'timeframe: {timeframe}\n'
+                    f'last_start_dt: {last_start_dt}\n'
+                    f'bf_until: {backfill_until_dt}\n'
+                )
+                # UGH: what's a better way?
+                # TODO: backends are responsible for being correct on
+                # this right!?
+                # -[ ] in the `ib` case we could maybe offer some way
+                #    to halt the request loop until the condition is
+                #    resolved or should the backend be entirely in
+                #    charge of solving such faults? yes, right?
                 return
 
+            time: np.ndarray = array['time']
             assert (
-                array['time'][0]
+                time[0]
                 ==
                 next_start_dt.timestamp()
            )
 
-            diff = last_start_dt - next_start_dt
-            frame_time_diff_s = diff.seconds
+            assert time[-1] == next_end_dt.timestamp()
+
+            expected_dur: Interval = last_start_dt - next_start_dt
 
             # frame's worth of sample-period-steps, in seconds
             frame_size_s: float = len(array) * timeframe
-            expected_frame_size_s: float = frame_size_s + timeframe
-            if frame_time_diff_s > expected_frame_size_s:
-
+            recv_frame_dur: Duration = (
+                from_timestamp(array[-1]['time'])
+                -
+                from_timestamp(array[0]['time'])
+            )
+            if (
+                (lt_frame := (recv_frame_dur < expected_dur))
+                or
+                (null_frame := (frame_size_s == 0))
+                # ^XXX, should NEVER hit now!
+            ):
                 # XXX: query result includes a start point prior to our
                 # expected "frame size" and thus is likely some kind of
                 # history gap (eg. market closed period, outage, etc.)
                 # so just report it to console for now.
+                if lt_frame:
+                    reason = 'Possible GAP (or first-datum)'
+                else:
+                    assert null_frame
+                    reason = 'NULL-FRAME'
+
+                missing_dur: Interval = expected_dur.end - recv_frame_dur.end
                 log.warning(
-                    'GAP DETECTED:\n'
-                    f'last_start_dt: {last_start_dt}\n'
-                    f'diff: {diff}\n'
-                    f'frame_time_diff_s: {frame_time_diff_s}\n'
+                    f'{timeframe}s-series {reason} detected!\n'
+                    f'fqme: {mkt.fqme}\n'
+                    f'last_start_dt: {last_start_dt}\n\n'
+                    f'recv interval: {recv_frame_dur}\n'
+                    f'expected interval: {expected_dur}\n\n'
+
+                    f'Missing duration of history of {missing_dur.in_words()!r}\n'
+                    f'{missing_dur}\n'
                )
+                # await tractor.pause()
 
             to_push = diff_history(
                 array,
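
The empty-frame handling above is plain `pendulum` arithmetic: each `NoData` walks the query window back by one default frame's worth of seconds, and the accumulated `gap_report` is raised as `DataUnavailable` once no frame duration is known. A minimal sketch of the decrement step (the dates and duration are invented):

    import pendulum

    last_start_dt = pendulum.datetime(2024, 1, 8)
    def_frame_duration = pendulum.duration(days=7)  # hypothetical backend limit

    # one "empty frame" retry: probe one frame further back in time
    last_start_dt = last_start_dt.subtract(
        seconds=def_frame_duration.total_seconds(),
    )
    assert last_start_dt == pendulum.datetime(2024, 1, 1)

Note the patch also re-binds `last_start_dt` to the `.subtract()` result; pendulum datetimes are immutable, so the removed code's unassigned call was a silent no-op.
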
@@ -565,22 +612,27 @@ async def start_backfill(
             # long-term storage.
             if (
                 storage is not None
-                and write_tsdb
+                and
+                write_tsdb
             ):
                 log.info(
                     f'Writing {ln} frame to storage:\n'
                     f'{next_start_dt} -> {last_start_dt}'
                 )
 
-                # always drop the src asset token for
+                # NOTE, always drop the src asset token for
                 # non-currency-pair like market types (for now)
+                #
+                # THAT IS, for now our table key schema is NOT
+                # including the dst[/src] source asset token. SO,
+                # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
+                # historical reasons ONLY.
                 if mkt.dst.atype not in {
                     'crypto',
                     'crypto_currency',
                     'fiat',  # a "forex pair"
+                    'perpetual_future',  # stupid "perps" from cex land
                 }:
-                    # for now, our table key schema is not including
-                    # the dst[/src] source asset token.
                     col_sym_key: str = mkt.get_fqme(
                         delim_char='',
                         without_src=True,
@@ -685,7 +737,7 @@ async def back_load_from_tsdb(
         last_tsdb_dt
         and latest_start_dt
     ):
-        backfilled_size_s = (
+        backfilled_size_s: Duration = (
             latest_start_dt - last_tsdb_dt
         ).seconds
         # if the shm buffer len is not large enough to contain
@@ -908,6 +960,8 @@ async def tsdb_backfill(
             f'{pformat(config)}\n'
         )
 
+    # concurrently load the provider's most-recent-frame AND any
+    # pre-existing tsdb history already saved in `piker` storage.
     dt_eps: list[DateTime, DateTime] = []
     async with trio.open_nursery() as tn:
         tn.start_soon(
@@ -918,7 +972,6 @@ async def tsdb_backfill(
             timeframe,
             config,
         )
-
         tsdb_entry: tuple = await load_tsdb_hist(
             storage,
             mkt,
@@ -947,6 +1000,25 @@ async def tsdb_backfill(
             mr_end_dt,
         ) = dt_eps
 
+        first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
+        calced_frame_size: Duration = mk_duration(
+            seconds=first_frame_dur_s,
+        )
+        # NOTE, attempt to use the backend declared default frame
+        # sizing (as allowed by their time-series query APIs) and
+        # if not provided try to construct a default from the
+        # first frame received above.
+        def_frame_durs: dict[
+            int,
+            Duration,
+        ]|None = config.get('frame_types', None)
+        if def_frame_durs:
+            def_frame_size: Duration = def_frame_durs[timeframe]
+            assert def_frame_size == calced_frame_size
+        else:
+            # use what we calced from first frame above.
+            def_frame_size = calced_frame_size
+
         # NOTE: when there's no offline data, there's 2 cases:
         # - data backend doesn't support timeframe/sample
         #   period (in which case `dt_eps` should be `None` and
@@ -977,7 +1049,7 @@ async def tsdb_backfill(
             partial(
                 start_backfill,
                 get_hist=get_hist,
-                frame_types=config.get('frame_types', None),
+                def_frame_duration=def_frame_size,
                 mod=mod,
                 mkt=mkt,
                 shm=shm,
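
With the call-site change above, `start_backfill()` no longer receives the whole `frame_types` table; `tsdb_backfill()` resolves a single default `Duration` up front, falling back to the span of the first received frame when a backend declares nothing. A sketch of that selection logic (the config values are invented):

    from pendulum import duration as mk_duration

    calced_frame_size = mk_duration(seconds=172_800)  # span of first frame
    config: dict = {}  # backend declared no 'frame_types'

    def_frame_durs = config.get('frame_types', None)
    if def_frame_durs:  # keyed by timeframe in seconds, e.g. 1 or 60
        def_frame_size = def_frame_durs[60]
    else:
        def_frame_size = calced_frame_size  # fall back to the calced size

When a backend does declare `frame_types`, the `assert def_frame_size == calced_frame_size` above will raise on any mismatch between the declared duration and the first frame actually received, so it acts as a loud consistency check rather than a silent fallback.
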
diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py
index c34a0c3a..ea78c46a 100644
--- a/piker/tsp/_anal.py
+++ b/piker/tsp/_anal.py
@@ -616,6 +616,18 @@ def detect_price_gaps(
     # ])
     ...
 
+# TODO: probably just use the null_segs impl above?
+def detect_vlm_gaps(
+    df: pl.DataFrame,
+    col: str = 'volume',
+
+) -> pl.DataFrame:
+
+    vnull: pl.DataFrame = df.filter(
+        pl.col(col) == 0
+    )
+    return vnull
+
 
 def dedupe(
     src_df: pl.DataFrame,
@@ -626,7 +638,6 @@ def dedupe(
 
 ) -> tuple[
     pl.DataFrame,  # with dts
-    pl.DataFrame,  # gaps
     pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
     int,  # len diff between input and deduped
 ]:
@@ -639,19 +650,22 @@ def dedupe(
 
     '''
     wdts: pl.DataFrame = with_dts(src_df)
 
-    # maybe sort on any time field
-    if sort:
-        wdts = wdts.sort(by='time')
-        # TODO: detect out-of-order segments which were corrected!
-        # -[ ] report in log msg
-        # -[ ] possibly return segment sections which were moved?
+    deduped = wdts
 
     # remove duplicated datetime samples/sections
     deduped: pl.DataFrame = wdts.unique(
-        subset=['dt'],
+        # subset=['dt'],
+        subset=['time'],
         maintain_order=True,
     )
 
+    # maybe sort on any time field
+    if sort:
+        deduped = deduped.sort(by='time')
+        # TODO: detect out-of-order segments which were corrected!
+        # -[ ] report in log msg
+        # -[ ] possibly return segment sections which were moved?
+
     diff: int = (
         wdts.height
         -
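
The dedupe change keys uniqueness on the raw epoch `time` column instead of the derived `dt` column, and sorting now happens on the deduped frame after the uniqueness pass. A self-contained polars sketch of both this and the new `detect_vlm_gaps()` zero-volume filter (sample data is fabricated):

    import polars as pl

    df = pl.DataFrame({
        'time': [3.0, 1.0, 1.0, 2.0],
        'volume': [7.0, 5.0, 5.0, 0.0],
    })

    # drop repeated samples keyed on the raw epoch-time column,
    # then (optionally) sort, mirroring the `sort=True` branch.
    deduped = df.unique(subset=['time'], maintain_order=True)
    deduped = deduped.sort(by='time')
    assert deduped.height == 3

    # `detect_vlm_gaps()`-style filter for zero-volume rows:
    vlm_nulls = df.filter(pl.col('volume') == 0)
    assert vlm_nulls.height == 1
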