tsp_gaps: fixes for fault-less OHLCV time-series loads #35
			
				
			
		
		
		
	|  | @ -386,6 +386,8 @@ def ldshm( | |||
|             open_annot_ctl() as actl, | ||||
|         ): | ||||
|             shm_df: pl.DataFrame | None = None | ||||
|             tf2aids: dict[float, dict] = {} | ||||
| 
 | ||||
|             for ( | ||||
|                 shmfile, | ||||
|                 shm, | ||||
|  | @ -526,16 +528,17 @@ def ldshm( | |||
|                             new_df, | ||||
|                             step_gaps, | ||||
|                         ) | ||||
| 
 | ||||
|                         # last chance manual overwrites in REPL | ||||
|                         await tractor.pause() | ||||
|                         # await tractor.pause() | ||||
|                         assert aids | ||||
|                         tf2aids[period_s] = aids | ||||
| 
 | ||||
|                 else: | ||||
|                     # allow interaction even when no ts problems. | ||||
|                     await tractor.pause() | ||||
|                     # assert not diff | ||||
|                     assert not diff | ||||
| 
 | ||||
|             await tractor.pause() | ||||
|             log.info('Exiting TSP shm anal-izer!') | ||||
| 
 | ||||
|             if shm_df is None: | ||||
|                 log.error( | ||||
|  |  | |||
|  | @ -161,7 +161,13 @@ class NativeStorageClient: | |||
| 
 | ||||
|     def index_files(self): | ||||
|         for path in self._datadir.iterdir(): | ||||
|             if path.name in {'borked', 'expired',}: | ||||
|             if ( | ||||
|                 path.is_dir() | ||||
|                 or | ||||
|                 '.parquet' not in str(path) | ||||
|                 # or | ||||
|                 # path.name in {'borked', 'expired',} | ||||
|             ): | ||||
|                 continue | ||||
| 
 | ||||
|             key: str = path.name.rstrip('.parquet') | ||||
|  |  | |||
|  | @ -44,8 +44,10 @@ import trio | |||
| from trio_typing import TaskStatus | ||||
| import tractor | ||||
| from pendulum import ( | ||||
|     Interval, | ||||
|     DateTime, | ||||
|     Duration, | ||||
|     duration as mk_duration, | ||||
|     from_timestamp, | ||||
| ) | ||||
| import numpy as np | ||||
|  | @ -214,7 +216,8 @@ async def maybe_fill_null_segments( | |||
|         # pair, immediately stop backfilling? | ||||
|         if ( | ||||
|             start_dt | ||||
|             and end_dt < start_dt | ||||
|             and | ||||
|             end_dt < start_dt | ||||
|         ): | ||||
|             await tractor.pause() | ||||
|             break | ||||
|  | @ -262,6 +265,7 @@ async def maybe_fill_null_segments( | |||
|         except tractor.ContextCancelled: | ||||
|             # log.exception | ||||
|             await tractor.pause() | ||||
|             raise | ||||
| 
 | ||||
|     null_segs_detected.set() | ||||
|     # RECHECK for more null-gaps | ||||
|  | @ -349,7 +353,7 @@ async def maybe_fill_null_segments( | |||
| 
 | ||||
| async def start_backfill( | ||||
|     get_hist, | ||||
|     frame_types: dict[str, Duration] | None, | ||||
|     def_frame_duration: Duration, | ||||
|     mod: ModuleType, | ||||
|     mkt: MktPair, | ||||
|     shm: ShmArray, | ||||
|  | @ -379,22 +383,23 @@ async def start_backfill( | |||
|         update_start_on_prepend: bool = False | ||||
|         if backfill_until_dt is None: | ||||
| 
 | ||||
|             # TODO: drop this right and just expose the backfill | ||||
|             # limits inside a [storage] section in conf.toml? | ||||
|             # when no tsdb "last datum" is provided, we just load | ||||
|             # some near-term history. | ||||
|             # periods = { | ||||
|             #     1: {'days': 1}, | ||||
|             #     60: {'days': 14}, | ||||
|             # } | ||||
| 
 | ||||
|             # do a decently sized backfill and load it into storage. | ||||
|             # TODO: per-provider default history-durations? | ||||
|             # -[ ] inside the `open_history_client()` config allow | ||||
|             #    declaring the history duration limits instead of | ||||
|             #    guessing and/or applying the same limits to all? | ||||
|             # | ||||
|             # -[ ] allow declaring (default) per-provider backfill | ||||
|             #     limits inside a [storage] sub-section in conf.toml? | ||||
|             # | ||||
|             # NOTE, when no tsdb "last datum" is provided, we just | ||||
|             # load some near-term history by presuming a "decently | ||||
|             # large" 60s duration limit and a much shorter 1s range. | ||||
|             periods = { | ||||
|                 1: {'days': 2}, | ||||
|                 60: {'years': 6}, | ||||
|             } | ||||
|             period_duration: int = periods[timeframe] | ||||
|             update_start_on_prepend = True | ||||
|             update_start_on_prepend: bool = True | ||||
| 
 | ||||
|             # NOTE: manually set the "latest" datetime which we intend to | ||||
|             # backfill history "until" so as to adhere to the history | ||||
|  | @ -416,7 +421,6 @@ async def start_backfill( | |||
|                 f'backfill_until_dt: {backfill_until_dt}\n' | ||||
|                 f'last_start_dt: {last_start_dt}\n' | ||||
|             ) | ||||
| 
 | ||||
|             try: | ||||
|                 ( | ||||
|                     array, | ||||
|  | @ -426,71 +430,114 @@ async def start_backfill( | |||
|                     timeframe, | ||||
|                     end_dt=last_start_dt, | ||||
|                 ) | ||||
| 
 | ||||
|             except NoData as _daterr: | ||||
|                 # 3 cases: | ||||
|                 # - frame in the middle of a legit venue gap | ||||
|                 # - history actually began at the `last_start_dt` | ||||
|                 # - some other unknown error (ib blocking the | ||||
|                 #   history bc they don't want you seeing how they | ||||
|                 #   cucked all the tinas..) | ||||
|                 if dur := frame_types.get(timeframe): | ||||
|                     # decrement by a frame's worth of duration and | ||||
|                     # retry a few times. | ||||
|                     last_start_dt.subtract( | ||||
|                         seconds=dur.total_seconds() | ||||
|                 orig_last_start_dt: datetime = last_start_dt | ||||
|                 gap_report: str = ( | ||||
|                     f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' | ||||
|                     f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                     f'last_start_dt: {orig_last_start_dt}\n\n' | ||||
|                     f'bf_until: {backfill_until_dt}\n' | ||||
|                 ) | ||||
|                     log.warning( | ||||
|                         f'{mod.name} -> EMPTY FRAME for end_dt?\n' | ||||
|                         f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                         'bf_until <- last_start_dt:\n' | ||||
|                         f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                         f'Decrementing `end_dt` by {dur} and retry..\n' | ||||
|                 # EMPTY FRAME signal with 3 (likely) causes: | ||||
|                 # | ||||
|                 # 1. range contains legit gap in venue history | ||||
|                 # 2. history actually (edge case) **began** at the | ||||
|                 #    value `last_start_dt` | ||||
|                 # 3. some other unknown error (ib blocking the | ||||
|                 #    history-query bc they don't want you seeing how | ||||
|                 #    they cucked all the tinas.. like with options | ||||
|                 #    hist) | ||||
|                 # | ||||
|                 if def_frame_duration: | ||||
|                     # decrement by a duration's (frame) worth of time | ||||
|                     # as maybe indicated by the backend to see if we | ||||
|                     # can get older data before this possible | ||||
|                     # "history gap". | ||||
|                     last_start_dt: datetime = last_start_dt.subtract( | ||||
|                         seconds=def_frame_duration.total_seconds() | ||||
|                     ) | ||||
|                     gap_report += ( | ||||
|                         f'Decrementing `end_dt` and retrying with,\n' | ||||
|                         f'def_frame_duration: {def_frame_duration}\n' | ||||
|                         f'(new) last_start_dt: {last_start_dt}\n' | ||||
|                     ) | ||||
|                     log.warning(gap_report) | ||||
|                     # skip writing to shm/tsdb and try the next | ||||
|                     # duration's worth of prior history. | ||||
|                     continue | ||||
| 
 | ||||
|             # broker says there never was or is no more history to pull | ||||
|             except DataUnavailable: | ||||
|                 log.warning( | ||||
|                     f'NO-MORE-DATA in range?\n' | ||||
|                     f'`{mod.name}` halted history:\n' | ||||
|                     f'tf@fqme: {timeframe}@{mkt.fqme}\n' | ||||
|                     'bf_until <- last_start_dt:\n' | ||||
|                     f'{backfill_until_dt} <- {last_start_dt}\n' | ||||
|                 ) | ||||
|                 else: | ||||
|                     # await tractor.pause() | ||||
|                     raise DataUnavailable(gap_report) | ||||
| 
 | ||||
|                 # ugh, what's a better way? | ||||
|                 # TODO: fwiw, we probably want a way to signal a throttle | ||||
|                 # condition (eg. with ib) so that we can halt the | ||||
|                 # request loop until the condition is resolved? | ||||
|                 if timeframe > 1: | ||||
|                     await tractor.pause() | ||||
|             # broker says there never was or is no more history to pull | ||||
|             except DataUnavailable as due: | ||||
|                 message: str = due.args[0] | ||||
|                 log.warning( | ||||
|                     f'Provider {mod.name!r} halted backfill due to,\n\n' | ||||
| 
 | ||||
|                     f'{message}\n' | ||||
| 
 | ||||
|                     f'fqme: {mkt.fqme}\n' | ||||
|                     f'timeframe: {timeframe}\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n' | ||||
|                     f'bf_until: {backfill_until_dt}\n' | ||||
|                 ) | ||||
|                 # UGH: what's a better way? | ||||
|                 # TODO: backends are responsible for being correct on | ||||
|                 # this right!? | ||||
|                 # -[ ] in the `ib` case we could maybe offer some way | ||||
|                 #     to halt the request loop until the condition is | ||||
|                 #     resolved or should the backend be entirely in | ||||
|                 #     charge of solving such faults? yes, right? | ||||
|                 return | ||||
| 
 | ||||
|             time: np.ndarray = array['time'] | ||||
|             assert ( | ||||
|                 array['time'][0] | ||||
|                 time[0] | ||||
|                 == | ||||
|                 next_start_dt.timestamp() | ||||
|             ) | ||||
| 
 | ||||
|             diff = last_start_dt - next_start_dt | ||||
|             frame_time_diff_s = diff.seconds | ||||
|             assert time[-1] == next_end_dt.timestamp() | ||||
| 
 | ||||
|             expected_dur: Interval = last_start_dt - next_start_dt | ||||
| 
 | ||||
|             # frame's worth of sample-period-steps, in seconds | ||||
|             frame_size_s: float = len(array) * timeframe | ||||
|             expected_frame_size_s: float = frame_size_s + timeframe | ||||
|             if frame_time_diff_s > expected_frame_size_s: | ||||
| 
 | ||||
|             recv_frame_dur: Duration = ( | ||||
|                 from_timestamp(array[-1]['time']) | ||||
|                 - | ||||
|                 from_timestamp(array[0]['time']) | ||||
|             ) | ||||
|             if ( | ||||
|                 (lt_frame := (recv_frame_dur < expected_dur)) | ||||
|                 or | ||||
|                 (null_frame := (frame_size_s == 0)) | ||||
|                 # ^XXX, should NEVER hit now! | ||||
|             ): | ||||
|                 # XXX: query result includes a start point prior to our | ||||
|                 # expected "frame size" and thus is likely some kind of | ||||
|                 # history gap (eg. market closed period, outage, etc.) | ||||
|                 # so just report it to console for now. | ||||
|                 if lt_frame: | ||||
|                     reason = 'Possible GAP (or first-datum)' | ||||
|                 else: | ||||
|                     assert null_frame | ||||
|                     reason = 'NULL-FRAME' | ||||
| 
 | ||||
|                 missing_dur: Interval = expected_dur.end - recv_frame_dur.end | ||||
|                 log.warning( | ||||
|                     'GAP DETECTED:\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n' | ||||
|                     f'diff: {diff}\n' | ||||
|                     f'frame_time_diff_s: {frame_time_diff_s}\n' | ||||
|                     f'{timeframe}s-series {reason} detected!\n' | ||||
|                     f'fqme: {mkt.fqme}\n' | ||||
|                     f'last_start_dt: {last_start_dt}\n\n' | ||||
|                     f'recv interval: {recv_frame_dur}\n' | ||||
|                     f'expected interval: {expected_dur}\n\n' | ||||
| 
 | ||||
|                     f'Missing duration of history of {missing_dur.in_words()!r}\n' | ||||
|                     f'{missing_dur}\n' | ||||
|                 ) | ||||
|                 # await tractor.pause() | ||||
| 
 | ||||
|             to_push = diff_history( | ||||
|                 array, | ||||
|  | @ -565,22 +612,27 @@ async def start_backfill( | |||
|             # long-term storage. | ||||
|             if ( | ||||
|                 storage is not None | ||||
|                 and write_tsdb | ||||
|                 and | ||||
|                 write_tsdb | ||||
|             ): | ||||
|                 log.info( | ||||
|                     f'Writing {ln} frame to storage:\n' | ||||
|                     f'{next_start_dt} -> {last_start_dt}' | ||||
|                 ) | ||||
| 
 | ||||
|                 # always drop the src asset token for | ||||
|                 # NOTE, always drop the src asset token for | ||||
|                 # non-currency-pair like market types (for now) | ||||
|                 # | ||||
|                 # THAT IS, for now our table key schema is NOT | ||||
|                 # including the dst[/src] source asset token. SO, | ||||
|                 # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for | ||||
|                 # historical reasons ONLY. | ||||
|                 if mkt.dst.atype not in { | ||||
|                     'crypto', | ||||
|                     'crypto_currency', | ||||
|                     'fiat',  # a "forex pair" | ||||
|                     'perpetual_future',  # stupid "perps" from cex land | ||||
|                 }: | ||||
|                     # for now, our table key schema is not including | ||||
|                     # the dst[/src] source asset token. | ||||
|                     col_sym_key: str = mkt.get_fqme( | ||||
|                         delim_char='', | ||||
|                         without_src=True, | ||||
|  | @ -685,7 +737,7 @@ async def back_load_from_tsdb( | |||
|         last_tsdb_dt | ||||
|         and latest_start_dt | ||||
|     ): | ||||
|         backfilled_size_s = ( | ||||
|         backfilled_size_s: Duration = ( | ||||
|             latest_start_dt - last_tsdb_dt | ||||
|         ).seconds | ||||
|         # if the shm buffer len is not large enough to contain | ||||
|  | @ -908,6 +960,8 @@ async def tsdb_backfill( | |||
|             f'{pformat(config)}\n' | ||||
|         ) | ||||
| 
 | ||||
|         # concurrently load the provider's most-recent-frame AND any | ||||
|         # pre-existing tsdb history already saved in `piker` storage. | ||||
|         dt_eps: list[DateTime, DateTime] = [] | ||||
|         async with trio.open_nursery() as tn: | ||||
|             tn.start_soon( | ||||
|  | @ -918,7 +972,6 @@ async def tsdb_backfill( | |||
|                 timeframe, | ||||
|                 config, | ||||
|             ) | ||||
| 
 | ||||
|             tsdb_entry: tuple = await load_tsdb_hist( | ||||
|                 storage, | ||||
|                 mkt, | ||||
|  | @ -947,6 +1000,25 @@ async def tsdb_backfill( | |||
|                 mr_end_dt, | ||||
|             ) = dt_eps | ||||
| 
 | ||||
|             first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds | ||||
|             calced_frame_size: Duration = mk_duration( | ||||
|                 seconds=first_frame_dur_s, | ||||
|             ) | ||||
|             # NOTE, attempt to use the backend declared default frame | ||||
|             # sizing (as allowed by their time-series query APIs) and | ||||
|             # if not provided try to construct a default from the | ||||
|             # first frame received above. | ||||
|             def_frame_durs: dict[ | ||||
|                 int, | ||||
|                 Duration, | ||||
|             ]|None = config.get('frame_types', None) | ||||
|             if def_frame_durs: | ||||
|                 def_frame_size: Duration = def_frame_durs[timeframe] | ||||
|                 assert def_frame_size == calced_frame_size | ||||
|             else: | ||||
|                 # use what we calced from first frame above. | ||||
|                 def_frame_size = calced_frame_size | ||||
| 
 | ||||
|             # NOTE: when there's no offline data, there's 2 cases: | ||||
|             # - data backend doesn't support timeframe/sample | ||||
|             #   period (in which case `dt_eps` should be `None` and | ||||
|  | @ -977,7 +1049,7 @@ async def tsdb_backfill( | |||
|                     partial( | ||||
|                         start_backfill, | ||||
|                         get_hist=get_hist, | ||||
|                         frame_types=config.get('frame_types', None), | ||||
|                         def_frame_duration=def_frame_size, | ||||
|                         mod=mod, | ||||
|                         mkt=mkt, | ||||
|                         shm=shm, | ||||
|  |  | |||
|  | @ -616,6 +616,18 @@ def detect_price_gaps( | |||
|     # ]) | ||||
|     ... | ||||
| 
 | ||||
| # TODO: probably just use the null_segs impl above? | ||||
| def detect_vlm_gaps( | ||||
|     df: pl.DataFrame, | ||||
|     col: str = 'volume', | ||||
| 
 | ||||
| ) -> pl.DataFrame: | ||||
| 
 | ||||
|     vnull: pl.DataFrame = w_dts.filter( | ||||
|         pl.col(col) == 0 | ||||
|     ) | ||||
|     return vnull | ||||
| 
 | ||||
| 
 | ||||
| def dedupe( | ||||
|     src_df: pl.DataFrame, | ||||
|  | @ -626,7 +638,6 @@ def dedupe( | |||
| 
 | ||||
| ) -> tuple[ | ||||
|     pl.DataFrame,  # with dts | ||||
|     pl.DataFrame,  # gaps | ||||
|     pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal) | ||||
|     int,  # len diff between input and deduped | ||||
| ]: | ||||
|  | @ -639,19 +650,22 @@ def dedupe( | |||
|     ''' | ||||
|     wdts: pl.DataFrame = with_dts(src_df) | ||||
| 
 | ||||
|     # maybe sort on any time field | ||||
|     if sort: | ||||
|         wdts = wdts.sort(by='time') | ||||
|         # TODO: detect out-of-order segments which were corrected! | ||||
|         # -[ ] report in log msg | ||||
|         # -[ ] possibly return segment sections which were moved? | ||||
|     deduped = wdts | ||||
| 
 | ||||
|     # remove duplicated datetime samples/sections | ||||
|     deduped: pl.DataFrame = wdts.unique( | ||||
|         subset=['dt'], | ||||
|         # subset=['dt'], | ||||
|         subset=['time'], | ||||
|         maintain_order=True, | ||||
|     ) | ||||
| 
 | ||||
|     # maybe sort on any time field | ||||
|     if sort: | ||||
|         deduped = deduped.sort(by='time') | ||||
|         # TODO: detect out-of-order segments which were corrected! | ||||
|         # -[ ] report in log msg | ||||
|         # -[ ] possibly return segment sections which were moved? | ||||
| 
 | ||||
|     diff: int = ( | ||||
|         wdts.height | ||||
|         - | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue