diff --git a/piker/data/history.py b/piker/data/history.py
index 0c2ecc25..dbd7c26b 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -56,7 +56,13 @@ from ._source import def_iohlcv_fields
 from ._sampling import (
     open_sample_stream,
 )
-from . import tsp
+from .tsp import (
+    dedupe,
+    get_null_segs,
+    sort_diff,
+    Frame,
+    Seq,
+)
 from ..brokers._util import (
     DataUnavailable,
 )
@@ -158,6 +164,100 @@ async def shm_push_in_between(
     # await tractor.pause()


+async def maybe_fill_null_segments(
+    shm: ShmArray,
+    timeframe: float,
+    get_hist: Callable,
+    sampler_stream: tractor.MsgStream,
+    mkt: MktPair,
+
+) -> None:
+
+    frame: Frame = shm.array
+
+    null_segs: tuple | None = get_null_segs(
+        frame,
+        period=timeframe,
+    )
+    if null_segs:
+        absi_pairs_zsegs: list[list[int]]
+        izeros: Seq
+        zero_t: Frame
+        (
+            absi_pairs_zsegs,
+            izeros,
+            zero_t,
+        ) = null_segs
+
+        absi_first: int = frame[0]['index']
+        for absi_start, absi_end in absi_pairs_zsegs:
+            # await tractor.pause()
+            fi_start = absi_start - absi_first
+            fi_end = absi_end - absi_first
+            start_row: Seq = frame[fi_start]
+            end_row: Seq = frame[fi_end]
+
+            start_t: float = start_row['time']
+            end_t: float = end_row['time']
+
+            start_dt = from_timestamp(start_t)
+            end_dt = from_timestamp(end_t)
+
+            # if we get a badly ordered timestamp
+            # pair, immediately stop backfilling.
+            if end_dt < start_dt:
+                break
+
+            (
+                array,
+                next_start_dt,
+                next_end_dt,
+            ) = await get_hist(
+                timeframe,
+                start_dt=start_dt,
+                end_dt=end_dt,
+            )
+
+            # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
+            # and mnq.cme.ib this causes a Qt crash XXDDD
+
+            # make sure we don't overrun the buffer start
+            len_to_push: int = min(absi_end, array.size)
+            to_push: np.ndarray = array[-len_to_push:]
+
+            await shm_push_in_between(
+                shm,
+                to_push,
+                prepend_index=absi_end,
+                update_start_on_prepend=False,
+            )
+
+            # TODO: UI side needs IPC event to update..
+            # - make sure the UI actually always handles
+            #   this update!
+            # - remember that on the display side, only refresh this
+            #   if the respective history is actually "in view".
+            # loop
+            await sampler_stream.send({
+                'broadcast_all': {
+                    'backfilling': (mkt.fqme, timeframe),
+                },
+            })
+
+    # RECHECK for more null-gaps
+    frame: Frame = shm.array
+    null_segs: tuple | None = get_null_segs(
+        frame,
+        period=timeframe,
+    )
+    if (
+        null_segs
+        and
+        len(null_segs[-1])
+    ):
+        await tractor.pause()
+
+
 async def start_backfill(
     get_hist,
     mod: ModuleType,
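For reference, the per-segment math above translates buffer-absolute endpoints into frame-relative offsets before any row lookup. A minimal standalone sketch of that translation (the `index`/`time` fields mirror the shm struct-array schema; all values are made up):

```python
import numpy as np

# stand-in for the shm buffer's struct-array frame (invented schema/values)
frame = np.zeros(8, dtype=[('index', 'i8'), ('time', 'f8')])
frame['index'] = np.arange(100, 108)       # absolute buffer indices
frame['time'] = 1.7e9 + 60 * np.arange(8)  # 60s-step epoch stamps

absi_first = frame[0]['index']  # -> 100

# a (start, end) pair as would come back in `absi_pairs_zsegs`
absi_start, absi_end = 102, 105

# absolute -> frame-relative translation, as done per-segment above
fi_start = absi_start - absi_first  # -> 2
fi_end = absi_end - absi_first      # -> 5

assert frame[fi_start]['index'] == absi_start
assert frame[fi_end]['index'] == absi_end
```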
@@ -224,16 +324,20 @@ async def start_backfill(
         # per time stamp.
         # starts: Counter[datetime] = Counter()

-        # conduct "backward history gap filling" where we push to
-        # the shm buffer until we have history back until the
-        # latest entry loaded from the tsdb's table B)
+        # STAGE NOTE: "backward history gap filling":
+        # - we push to the shm buffer until we have history back
+        #   to the latest entry loaded from the tsdb's table B)
+        # - after this loop, continue to check for other gaps in the
+        #   (tsdb) history and (at least report, maybe fill) them
+        #   from new frame queries to the backend?
         last_start_dt: datetime = backfill_from_dt
         next_prepend_index: int = backfill_from_shm_index

         while last_start_dt > backfill_until_dt:
-
-            log.debug(
-                f'Requesting {timeframe}s frame ending in {last_start_dt}'
+            log.info(
+                f'Requesting {timeframe}s frame:\n'
+                f'backfill_until_dt: {backfill_until_dt}\n'
+                f'last_start_dt: {last_start_dt}\n'
             )

             try:
@@ -261,36 +365,18 @@ async def start_backfill(
                 await tractor.pause()
                 return

-            # TODO: drop this? see todo above..
-            # if (
-            #     next_start_dt in starts
-            #     and starts[next_start_dt] <= 6
-            # ):
-            #     start_dt = min(starts)
-            #     log.warning(
-            #         f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
-            #     )
-            #     starts[start_dt] += 1
-            #     await tractor.pause()
-            #     continue
-
-            # elif starts[next_start_dt] > 6:
-            #     log.warning(
-            #         f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
-            #     )
-            #     return
-
-            # # only update new start point if not-yet-seen
-            # starts[next_start_dt] += 1
-
-            assert array['time'][0] == next_start_dt.timestamp()
+            assert (
+                array['time'][0]
+                ==
+                next_start_dt.timestamp()
+            )

             diff = last_start_dt - next_start_dt
             frame_time_diff_s = diff.seconds

             # frame's worth of sample-period-steps, in seconds
-            frame_size_s = len(array) * timeframe
-            expected_frame_size_s = frame_size_s + timeframe
+            frame_size_s: float = len(array) * timeframe
+            expected_frame_size_s: float = frame_size_s + timeframe
+
             if frame_time_diff_s > expected_frame_size_s:

                 # XXX: query result includes a start point prior to our
                 # expected "frame size" and thus is likely some kind of
                 # history gap (eg. market closed period, outage, etc.)
                 # so just report it to console for now.
                 log.warning(
-                    f'History frame ending @ {last_start_dt} appears to have a gap:\n'
-                    f'{diff} ~= {frame_time_diff_s} seconds'
+                    'GAP DETECTED:\n'
+                    f'last_start_dt: {last_start_dt}\n'
+                    f'diff: {diff}\n'
+                    f'frame_time_diff_s: {frame_time_diff_s}\n'
                 )

             to_push = diff_history(
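The gap check in the hunk above is plain frame-length arithmetic: a returned frame of N rows can cover at most `N * timeframe` seconds, plus one period of tolerance. A worked example with invented numbers; note also that `timedelta.seconds` excludes whole days, which could understate multi-day gaps:

```python
from datetime import timedelta

timeframe = 60      # sample period in seconds (assumed)
array_len = 1_000   # rows returned by a backend frame query (assumed)

frame_size_s = array_len * timeframe              # 60_000s of coverage
expected_frame_size_s = frame_size_s + timeframe  # + one period tolerance

# a requested window spanning more wall-clock time than the frame
# can cover implies the backend skipped over a gap (venue close,
# outage, ..) which gets reported via `log.warning()`
diff = timedelta(seconds=61_000)
frame_time_diff_s = diff.seconds  # NB: `.seconds` drops whole days!
assert frame_time_diff_s > expected_frame_size_s  # => GAP DETECTED
```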
@@ -415,74 +503,32 @@ async def start_backfill(
                     gaps,
                     deduped,
                     diff,
-                ) = tsp.dedupe(df)
+                ) = dedupe(df)

                 if diff:
-                    tsp.sort_diff(df)
+                    sort_diff(df)

             else:
                 # finally filled gap
                 log.info(
                     f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
                 )

-            # conduct tsdb timestamp gap detection and backfill any
-            # seemingly missing sequence segments..
-            # TODO: ideally these never exist but somehow it seems
-            # sometimes we're writing zero-ed segments on certain
-            # (teardown) cases?
-            from .tsp import detect_null_time_gap
-            gap_indices: tuple | None = detect_null_time_gap(shm)
-            while gap_indices:
-                (
-                    istart,
-                    start,
-                    end,
-                    iend,
-                ) = gap_indices
+            # NOTE: conduct tsdb timestamp gap detection and backfill any
+            # seemingly missing (null-time) segments..

-                start_dt = from_timestamp(start)
-                end_dt = from_timestamp(end)
-
-                # if we get a baddly ordered timestamp
-                # pair, imeeditately stop backfilling.
-                if end_dt < start_dt:
-                    break
-
-                (
-                    array,
-                    next_start_dt,
-                    next_end_dt,
-                ) = await get_hist(
-                    timeframe,
-                    start_dt=start_dt,
-                    end_dt=end_dt,
-                )
-
-                # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
-                # and mnq.cme.ib this causes a Qt crash XXDDD
-
-                # make sure we don't overrun the buffer start
-                len_to_push: int = min(iend, array.size)
-                to_push: np.ndarray = array[-len_to_push:]
-                await shm_push_in_between(
-                    shm,
-                    to_push,
-                    prepend_index=iend,
-                    update_start_on_prepend=False,
-                )
-
-                # TODO: UI side needs IPC event to update..
-                # - make sure the UI actually always handles
-                #   this update!
-                # - remember that in the display side, only refersh this
-                #   if the respective history is actually "in view".
-                # loop
-                await sampler_stream.send({
-                    'broadcast_all': {
-                        'backfilling': (mkt.fqme, timeframe),
-                    },
-                })
-                gap_indices: tuple | None = detect_null_time_gap(shm)
+            # TODO: ideally these can never exist!
+            # -[ ] somehow it seems sometimes we're writing zero-ed
+            #      segments to tsdbs during teardown?
+            # -[ ] can we ensure that the backfiller tasks do this
+            #      work PREVENTIVELY instead?
+            # -[ ] fill in non-zero epoch time values ALWAYS!
+            await maybe_fill_null_segments(
+                shm=shm,
+                timeframe=timeframe,
+                get_hist=get_hist,
+                sampler_stream=sampler_stream,
+                mkt=mkt,
+            )

         # XXX: extremely important, there can be no checkpoints
         # in the block above to avoid entering new ``frames``
@@ -679,29 +725,12 @@ async def tsdb_backfill(
         # there's no backfilling possible.
         except DataUnavailable:
             task_status.started()
-            await tractor.pause()
+
+            if timeframe > 1:
+                await tractor.pause()
+
             return

-        # TODO: fill in non-zero epoch time values ALWAYS!
-        # hist_shm._array['time'] = np.arange(
-        #     start=
-
-        # NOTE: removed for now since it'll always break
-        # on the first 60s of the venue open..
-        # times: np.ndarray = array['time']
-        # # sample period step size in seconds
-        # step_size_s = (
-        #     from_timestamp(times[-1])
-        #     - from_timestamp(times[-2])
-        # ).seconds
-
-        # if step_size_s not in (1, 60):
-        #     log.error(f'Last 2 sample period is off!? -> {step_size_s}')
-        #     step_size_s = (
-        #         from_timestamp(times[-2])
-        #         - from_timestamp(times[-3])
-        #     ).seconds
-
         # NOTE: on the first history, most recent history
         # frame we PREPEND from the current shm ._last index
         # and thus a gap between the earliest datum loaded here
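Throughout `start_backfill()` and `maybe_fill_null_segments()` the backend's `get_hist` is awaited with the same shape: a timeframe plus a datetime window in, the frame array and the actual endpoints served back out. A hypothetical stub honoring that contract (the dtype is reduced to `index`/`time` for illustration; real backends return full OHLCV fields):

```python
import numpy as np
from datetime import datetime, timedelta

async def get_hist(
    timeframe: float,
    start_dt: datetime | None = None,
    end_dt: datetime | None = None,
) -> tuple[np.ndarray, datetime, datetime]:
    '''
    Serve a `(frame, start_dt, end_dt)` triple for the requested
    window, mirroring the awaited call sites above.

    '''
    end_dt = end_dt or datetime.now()
    start_dt = start_dt or end_dt - timedelta(seconds=1_000 * timeframe)

    n = int((end_dt - start_dt).total_seconds() // timeframe)
    frame = np.zeros(n, dtype=[('index', 'i8'), ('time', 'f8')])
    frame['index'] = np.arange(n)
    # first stamp matches `start_dt` so the caller's
    # `array['time'][0] == next_start_dt.timestamp()` assert holds
    frame['time'] = start_dt.timestamp() + timeframe * np.arange(n)
    return frame, start_dt, end_dt
```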
diff --git a/piker/data/tsp.py b/piker/data/tsp.py
index 2b1ada05..34cc794a 100644
--- a/piker/data/tsp.py
+++ b/piker/data/tsp.py
@@ -34,7 +34,6 @@ from typing import Literal
 import numpy as np
 import polars as pl

-from ._sharedmem import ShmArray
 from ..toolz.profile import (
     Profiler,
     pg_profile_enabled,
@@ -53,6 +52,14 @@ get_console_log = partial(
     name=subsys,
 )

+# NOTE: union type-defs to handle generic `numpy` and `polars` types
+# side-by-side Bo
+# |_ TODO: schema spec typing?
+#    -[ ] nptyping!
+#    -[ ] wtv we can with polars?
+Frame = pl.DataFrame | np.ndarray
+Seq = pl.Series | np.ndarray
+

 def slice_from_time(
     arr: np.ndarray,
@@ -209,51 +216,126 @@ def slice_from_time(
     return read_slc


-def detect_null_time_gap(
-    shm: ShmArray,
+def get_null_segs(
+    frame: Frame,
+    period: float,  # sampling step in seconds
     imargin: int = 1,
+    col: str = 'time',

-) -> tuple[float, float] | None:
+) -> tuple[
+    Seq,
+    Seq,
+    Frame
+] | None:
     '''
-    Detect if there are any zero-epoch stamped rows in
-    the presumed 'time' field-column.
+    Detect if there are any zero(-epoch stamped) valued
+    rows for the provided `col: str` column; by default
+    presume the 'time' field/column.

-    Filter to the gap and return a surrounding index range.
+    Filter to all such zero (time) segments and return
+    the corresponding frame's zeroed segments as a triple:

-    NOTE: for now presumes only ONE gap XD
+    - the gaps' absolute (in buffer terms) index-pair endpoints as
+      `absi_zsegs`,
+    - the absolute indices of all rows with zeroed `col` values as
+      `absi_zeros`,
+    - the corresponding frame's row-entries (view) which are zeroed
+      for the `col` as `zero_t`.

     '''
+    # TODO: remove this?
+    # NOTE: ensure we read buffer state only once so that ShmArray rt
+    # circular-buffer updates don't cause an indexing/size mismatch.
+    # frame: np.ndarray = shm.array

-    array: np.ndarray = shm.array
-    zero_pred: np.ndarray = array['time'] == 0
-    zero_t: np.ndarray = array[zero_pred]
+    times: Seq = frame['time']
+    zero_pred: Seq = (times == 0)

-    if zero_t.size:
-        istart, iend = zero_t['index'][[0, -1]]
-        start, end = shm._array['time'][
-            [istart - imargin, iend + imargin]
-        ]
-        return (
-            istart - imargin,
-            start,
-            end,
-            iend + imargin,
+    # NOTE: `.any()` works for both `np.ndarray` and `pl.Series`
+    # inputs, so no per-type branching is needed here.
+    tis_zeros: bool = bool(zero_pred.any())
+    if not tis_zeros:
+        return None
+
+    absi_zsegs: list[list[int]] = []
+
+    if isinstance(frame, np.ndarray):
+        # ifirst: int = frame[0]['index']
+        zero_t: np.ndarray = frame[zero_pred]
+
+        absi_zeros = zero_t['index']
+        # relative frame-indexes of zeros
+        # fizeros: np.ndarray = zero_t['index'] - ifirst
+        absi_zdiff: np.ndarray = np.diff(absi_zeros)
+        fi_zgaps = np.argwhere(
+            absi_zdiff > 1
+            # OR null / inf?
+            # OR is 0? for first zero-row entry?
         )
+        fi_zseg_start_rows = zero_t[fi_zgaps]

-    return None
+    else:  # pl.DataFrame case
+        izeros: pl.Series = zero_pred.arg_true()
+        zero_t: pl.DataFrame = frame[izeros]

+        absi_zeros = zero_t['index']
+        absi_zdiff: pl.Series = absi_zeros.diff()
+        fi_zgaps = (absi_zdiff > 1).arg_true()

-t_unit: Literal = Literal[
-    'days',
-    'hours',
-    'minutes',
-    'seconds',
-    'miliseconds',
-    'microseconds',
-    'nanoseconds',
-]
+    # select out slice index pairs for each null-segment
+    # portion detected throughout entire input frame.
+    if not len(fi_zgaps):
+        # TODO: use ndarray for this!
+        absi_zsegs = [[
+            absi_zeros[0],  # - 1,  # - ifirst,
+            # TODO: need the + 1 or no?
+            absi_zeros[-1] + 1,  # - ifirst,
+        ]]
+    else:
+        absi_zsegs.append([
+            absi_zeros[0] - 1,  # - ifirst,
+            None,
+        ])
+
+        # TODO: can we do it with vec ops?
+        for i, (
+            fi,
+            zseg_start_row,
+        ) in enumerate(zip(
+            fi_zgaps,
+            fi_zseg_start_rows,
+            # fi_zgaps,
+            # start=1,
+        )):
+            assert (zseg_start_row == zero_t[fi]).all()
+
+            absi: int = zseg_start_row['index'][0]
+            # row = zero_t[fi]
+            # absi_pre_zseg = row['index'][0] - 1
+            absi_pre_zseg = absi - 1
+
+            if i > 0:
+                prev_zseg_row = zero_t[fi - 1]
+                absi_post_zseg = prev_zseg_row['index'][0] + 1
+                absi_zsegs[i - 1][1] = absi_post_zseg
+
+            if (i + 1) < len(fi_zgaps):
+                absi_zsegs.append([
+                    absi,
+                    None,
+                ])
+            else:
+                for start, end in absi_zsegs:
+                    assert end
+                    assert start < end
+
+    # import pdbp; pdbp.set_trace()
+    return (
+        absi_zsegs,  # absolute index pairs of null segments
+        absi_zeros,
+        zero_t,
+    )


 def with_dts(
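A quick sanity run of the new `get_null_segs()` on the `np.ndarray` path, assuming the patched `piker.data.tsp` is importable; a single contiguous zeroed run (rows 3..5) takes the no-inter-gap branch and yields one `[start, end + 1]` endpoint pair:

```python
import numpy as np
from piker.data.tsp import get_null_segs

frame = np.zeros(10, dtype=[('index', 'i8'), ('time', 'f8')])
frame['index'] = np.arange(10)
frame['time'] = 1.7e9 + 60 * np.arange(10)
frame['time'][3:6] = 0  # a single null (zero-time) segment

absi_zsegs, absi_zeros, zero_t = get_null_segs(frame, period=60)
assert absi_zsegs == [[3, 6]]          # segment endpoint pair
assert list(absi_zeros) == [3, 4, 5]   # all zeroed-row indices
assert zero_t.size == 3                # the zeroed rows (view)
```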
@@ -292,6 +374,17 @@ def dedup_dt(
     )


+t_unit: Literal = Literal[
+    'days',
+    'hours',
+    'minutes',
+    'seconds',
+    'miliseconds',
+    'microseconds',
+    'nanoseconds',
+]
+
+
 def detect_time_gaps(
     df: pl.DataFrame,
@@ -406,10 +499,6 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
         f'Gaps found:\n{gaps}\n'
         f'deduped Gaps found:\n{deduped_gaps}'
     )
-    # TODO: rewrite this in polars and/or convert to
-    # ndarray to detect and remove?
-    # null_gaps = detect_null_time_gap()
-
     return (
         df,
         gaps,
@@ -428,14 +517,19 @@ def sort_diff(
     list[int],  # indices of segments that are out-of-order
 ]:
     ser: pl.Series = src_df[col]
-
-    diff: pl.Series = ser.diff()
     sortd: pl.DataFrame = ser.sort()
+    diff: pl.Series = ser.diff()
+
     sortd_diff: pl.Series = sortd.diff()
     i_step_diff = (diff != sortd_diff).arg_true()
-    if i_step_diff.len():
-        import pdbp
-        pdbp.set_trace()
+    frame_reorders: int = i_step_diff.len()
+    if frame_reorders:
+        log.warning(
+            f'Out-of-order rows detected on col: {col}\n'
+            f'n reorders: {frame_reorders}'
+        )
+        # import pdbp; pdbp.set_trace()


 # NOTE: thanks to this SO answer for the below conversion routines
 # to go from numpy struct-arrays to polars dataframes and back:
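The core of `sort_diff()`'s detection is comparing a series' step-diffs against its sorted counterpart's; standalone in polars with invented values:

```python
import polars as pl

ser = pl.Series('time', [1, 2, 4, 3, 5])

diff = ser.diff()               # [null, 1, 2, -1, 2]
sortd_diff = ser.sort().diff()  # [null, 1, 1, 1, 1]

# indices where the unsorted step-size disagrees with sorted order
i_step_diff = (diff != sortd_diff).arg_true()
assert i_step_diff.to_list() == [2, 3, 4]  # out-of-order detected
```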