diff --git a/piker/data/history.py b/piker/tsp/__init__.py
similarity index 93%
rename from piker/data/history.py
rename to piker/tsp/__init__.py
index 65514a28..17652ee5 100644
--- a/piker/data/history.py
+++ b/piker/tsp/__init__.py
@@ -32,6 +32,7 @@ from __future__ import annotations
 from datetime import datetime
 from functools import partial
 from pathlib import Path
+from pprint import pformat
 from types import ModuleType
 from typing import (
     Callable,
@@ -53,25 +54,61 @@ import polars as pl
 from ..accounting import (
     MktPair,
 )
-from ._util import (
+from ..data._util import (
     log,
 )
-from ._sharedmem import (
+from ..data._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
 )
-from ._source import def_iohlcv_fields
-from ._sampling import (
+from ..data._source import def_iohlcv_fields
+from ..data._sampling import (
     open_sample_stream,
 )
-from .tsp import (
-    dedupe,
+from ._anal import (
+    get_null_segs,
     iter_null_segs,
-    sort_diff,
     Frame,
-    # Seq,
+    Seq,
+
+    # codec-ish
+    np2pl,
+    pl2np,
+
+    # `numpy` only
+    slice_from_time,
+
+    # `polars` specific
+    dedupe,
+    with_dts,
+    detect_time_gaps,
+    sort_diff,
+
+    # TODO:
+    detect_price_gaps
 )
+
+__all__: list[str] = [
+    'dedupe',
+    'get_null_segs',
+    'iter_null_segs',
+    'sort_diff',
+    'slice_from_time',
+    'Frame',
+    'Seq',
+
+    'np2pl',
+    'pl2np',
+
+    'with_dts',
+    'detect_time_gaps',
+
+    # TODO:
+    'detect_price_gaps'
+]
+
+# TODO: break up all this shite into submods!
 from ..brokers._util import (
     DataUnavailable,
 )
@@ -252,35 +292,65 @@ async def maybe_fill_null_segments(
         and
         len(null_segs[-1])
     ):
-        await tractor.pause()
+        (
+            iabs_slices,
+            iabs_zero_rows,
+            zero_t,
+        ) = null_segs
+        log.warning(
+            f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
+            f'{pformat(iabs_slices)}'
+        )
 
-        array = shm.array
-        zeros = array[array['low'] == 0]
-
-        # always backfill gaps with the earliest (price) datum's
-        # value to avoid the y-ranger including zeros and completely
-        # stretching the y-axis..
-        if 0 < zeros.size:
-            zeros[[
+        # TODO: always backfill gaps with the earliest (price) datum's
+        # value to avoid the y-ranger including zeros and completely
+        # stretching the y-axis..
+        # array: np.ndarray = shm.array
+        # zeros = array[array['low'] == 0]
+        ohlc_fields: list[str] = [
             'open',
             'high',
             'low',
             'close',
-            ]] = shm._array[zeros['index'][0] - 1]['close']
+        ]
 
-            # TODO: interatively step through any remaining
-            # time-gaps/null-segments and spawn piecewise backfiller
-            # tasks in a nursery?
-            # -[ ] not sure that's going to work so well on the ib
-            #     backend but worth a shot?
-            # -[ ] mk new history connections to make it properly
-            #     parallel possible no matter the backend?
-            # -[ ] fill algo: do queries in alternating "latest, then
-            #     earliest, then latest.. etc?"
-            # if (
-            #     next_end_dt not in frame[
-            # ):
-            #     pass
+        for istart, istop in iabs_slices:
+
+            # get view into buffer for null-segment
+            gap: np.ndarray = shm._array[istart:istop]
+
+            # copy the oldest OHLC samples forward
+            gap[ohlc_fields] = shm._array[istart]['close']
+
+            start_t: float = shm._array[istart]['time']
+            t_diff: float = (istop - istart)*timeframe
+            gap['time'] = np.arange(
+                start=start_t,
+                stop=start_t + t_diff,
+                step=timeframe,
+            )
+
+        await sampler_stream.send({
+            'broadcast_all': {
+
+                # XXX NOTE XXX: see the
+                # `.ui._display.increment_history_view()` if block
+                # that looks for this info to FORCE a hard viz
+                # redraw!
+                'backfilling': (mkt.fqme, timeframe),
+            },
+        })
+
+        # TODO: iteratively step through any remaining
+        # time-gaps/null-segments and spawn piecewise backfiller
+        # tasks in a nursery?
+        # -[ ] not sure that's going to work so well on the ib
+        #     backend but worth a shot?
+        # -[ ] mk new history connections to make it properly
+        #     parallel possible no matter the backend?
+        # -[ ] fill algo: do queries in alternating "latest, then
+        #     earliest, then latest.. etc?"
+        # await tractor.pause()
 
 
 async def start_backfill(
@@ -1252,8 +1322,8 @@ def iter_dfs_from_shms(
         assert not opened
         ohlcv = shm.array
 
-        from ..data import tsp
-        df: pl.DataFrame = tsp.np2pl(ohlcv)
+        from ._anal import np2pl
+        df: pl.DataFrame = np2pl(ohlcv)
 
         yield (
             shmfile,
diff --git a/piker/data/tsp.py b/piker/tsp/_anal.py
similarity index 97%
rename from piker/data/tsp.py
rename to piker/tsp/_anal.py
index 0776d3d8..35b64dcb 100644
--- a/piker/data/tsp.py
+++ b/piker/tsp/_anal.py
@@ -319,9 +319,8 @@ def get_null_segs(
     if num_gaps < 1:
         if absi_zeros.size > 1:
             absi_zsegs = [[
-                # see `get_hist()` in backend, should ALWAYS be
-                # able to handle a `start_dt=None`!
-                # None,
+                # TODO: maybe mk these max()/min() limits func
+                # consts instead of called more than once?
                 max(
                     absi_zeros[0] - 1,
                     0,
@@ -359,7 +358,10 @@ def get_null_segs(
         # corresponding to the first zero-segment's row, we add it
         # manually here.
         absi_zsegs.append([
-            absi_zeros[0] - 1,
+            max(
+                absi_zeros[0] - 1,
+                0,
+            ),
             None,
         ])
 
@@ -400,14 +402,18 @@ def get_null_segs(
 
     else:
         if 0 < num_gaps < 2:
-            absi_zsegs[-1][1] = absi_zeros[-1] + 1
+            absi_zsegs[-1][1] = min(
+                absi_zeros[-1] + 1,
+                frame['index'][-1],
+            )
 
     iabs_first: int = frame['index'][0]
     for start, end in absi_zsegs:
+
         ts_start: float = times[start - iabs_first]
         ts_end: float = times[end - iabs_first]
         if (
-            ts_start == 0
+            (ts_start == 0 and not start == 0)
             or
             ts_end == 0
         ):
@@ -451,11 +457,13 @@ def iter_null_segs(
     ],
     None,
 ]:
-    if null_segs is None:
-        null_segs: tuple = get_null_segs(
+    if not (
+        null_segs := get_null_segs(
             frame,
             period=timeframe,
         )
+    ):
+        return
 
     absi_pairs_zsegs: list[list[float, float]]
     izeros: Seq
@@ -502,6 +510,7 @@ def iter_null_segs(
     )
 
 
+# TODO: move to ._pl_anal
 def with_dts(
     df: pl.DataFrame,
     time_col: str = 'time',
@@ -525,19 +534,6 @@ def with_dts(
     # )
 
 
-def dedup_dt(
-    df: pl.DataFrame,
-) -> pl.DataFrame:
-    '''
-    Drop duplicate date-time rows (normally from an OHLC frame).
-
-    '''
-    return df.unique(
-        subset=['dt'],
-        maintain_order=True,
-    )
-
-
 t_unit: Literal = Literal[
     'days',
     'hours',
@@ -651,7 +647,11 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
     )
 
     # remove duplicated datetime samples/sections
-    deduped: pl.DataFrame = dedup_dt(df)
+    deduped: pl.DataFrame = df.unique(
+        subset=['dt'],
+        maintain_order=True,
+    )
+
     deduped_gaps = detect_time_gaps(deduped)
 
     diff: int = (
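
Reviewer note: to sanity-check the gap-fill hunk added to
`maybe_fill_null_segments()` above without a live shm buffer, here is a
minimal, self-contained sketch of the same technique against a plain
`numpy` structured array. The `iohlc` dtype, the `fill_null_segments()`
helper and the toy frame are hypothetical scaffolding for illustration,
not `piker` APIs; only the fill loop mirrors the patch.

import numpy as np

# toy OHLC dtype standing in for piker's `def_iohlcv_fields`
iohlc = np.dtype([
    ('index', 'i8'),
    ('time', 'f8'),
    ('open', 'f8'),
    ('high', 'f8'),
    ('low', 'f8'),
    ('close', 'f8'),
])


def fill_null_segments(
    arr: np.ndarray,  # structured array of dtype `iohlc`
    iabs_slices: list[tuple[int, int]],
    timeframe: float,  # sample period in seconds
) -> None:
    # each (istart, istop) pair brackets a null segment where `istart`
    # indexes the last *valid* row before the zeroed rows; this is why
    # `get_null_segs()` clamps the start via `max(absi_zeros[0] - 1, 0)`
    # and why filling from `arr[istart]['close']` copies a real price
    # rather than a zero.
    for istart, istop in iabs_slices:

        # slicing a structured array yields an in-place view, no copy
        gap = arr[istart:istop]

        # forward-fill all price fields from the last valid close so
        # the y-ranger never auto-scales against zeros (the patch does
        # this with a single multi-field assignment)
        for fld in ('open', 'high', 'low', 'close'):
            gap[fld] = arr[istart]['close']

        # rebuild a monotonic time column across the gap using the
        # same `np.arange()` construction as the patch
        start_t = arr[istart]['time']
        gap['time'] = np.arange(
            start=start_t,
            stop=start_t + (istop - istart) * timeframe,
            step=timeframe,
        )


# quick check: a 1s-period frame with two zeroed rows in the middle
a = np.zeros(5, dtype=iohlc)
a['index'] = np.arange(5)
a['time'] = [10., 11., 0., 0., 14.]
for fld in ('open', 'high', 'low', 'close'):
    a[fld][:2] = 1.5

fill_null_segments(a, [(1, 4)], timeframe=1.0)
assert (a['close'][1:4] == 1.5).all()
assert (np.diff(a['time']) == 1.0).all()

One wrinkle worth flagging in review: float `np.arange()` can emit one
sample too many (or too few) when `start`/`step` are not exactly
representable, so if gap edges ever look off-by-one the equivalent
`start_t + np.arange(istop - istart) * timeframe` is the more robust
construction.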