diff --git a/piker/data/history.py b/piker/data/history.py index dbd7c26b..4b7737b0 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -59,9 +59,10 @@ from ._sampling import ( from .tsp import ( dedupe, get_null_segs, + iter_null_segs, sort_diff, Frame, - Seq, + # Seq, ) from ..brokers._util import ( DataUnavailable, @@ -174,75 +175,71 @@ async def maybe_fill_null_segments( ) -> list[Frame]: frame: Frame = shm.array - - null_segs: tuple | None = get_null_segs( + async for ( + absi_start, absi_end, + fi_start, fi_end, + start_t, end_t, + start_dt, end_dt, + ) in iter_null_segs( frame, - period=timeframe, - ) - if null_segs: - absi_pairs_zsegs: list[list[float, float]] - izeros: Seq - zero_t: Frame + timeframe=timeframe, + ): + + # XXX NOTE: ?if we get a badly ordered timestamp + # pair, immediately stop backfilling? + if ( + start_dt + and end_dt < start_dt + ): + break + ( - absi_pairs_zsegs, - izeros, - zero_t, - ) = null_segs + array, + next_start_dt, + next_end_dt, + ) = await get_hist( + timeframe, + start_dt=start_dt, + end_dt=end_dt, + ) - absi_first: int = frame[0]['index'] - for absi_start, absi_end in absi_pairs_zsegs: - # await tractor.pause() - fi_start = absi_start - absi_first - fi_end = absi_end - absi_first - start_row: Seq = frame[fi_start] - end_row: Seq = frame[fi_end] + # XXX TODO: pretty sure if i plot tsla, btcusdt.binance + # and mnq.cme.ib this causes a Qt crash XXDDD - start_t: float = start_row['time'] - end_t: float = end_row['time'] + # make sure we don't overrun the buffer start + len_to_push: int = min(absi_end, array.size) + to_push: np.ndarray = array[-len_to_push:] - start_dt = from_timestamp(start_t) - end_dt = from_timestamp(end_t) + await shm_push_in_between( + shm, + to_push, + prepend_index=absi_end, + update_start_on_prepend=False, + ) + # TODO: UI side needs IPC event to update.. + # - make sure the UI actually always handles + # this update! + # - remember that in the display side, only refersh this + # if the respective history is actually "in view". + # loop + await sampler_stream.send({ + 'broadcast_all': { - # if we get a badly ordered timestamp - # pair, immediately stop backfilling. - if end_dt < start_dt: - break + # XXX NOTE XXX: see the + # `.ui._display.increment_history_view()` if block + # that looks for this info to FORCE a hard viz + # redraw! + 'backfilling': (mkt.fqme, timeframe), + }, + }) - ( - array, - next_start_dt, - next_end_dt, - ) = await get_hist( - timeframe, - start_dt=start_dt, - end_dt=end_dt, - ) + await tractor.pause() - # XXX TODO: pretty sure if i plot tsla, btcusdt.binance - # and mnq.cme.ib this causes a Qt crash XXDDD - - # make sure we don't overrun the buffer start - len_to_push: int = min(absi_end, array.size) - to_push: np.ndarray = array[-len_to_push:] - - await shm_push_in_between( - shm, - to_push, - prepend_index=absi_end, - update_start_on_prepend=False, - ) - - # TODO: UI side needs IPC event to update.. - # - make sure the UI actually always handles - # this update! - # - remember that in the display side, only refersh this - # if the respective history is actually "in view". - # loop - await sampler_stream.send({ - 'broadcast_all': { - 'backfilling': (mkt.fqme, timeframe), - }, - }) + # TODO: interatively step through any remaining time gaps? + # if ( + # next_end_dt not in frame[ + # ): + # pass # RECHECK for more null-gaps frame: Frame = shm.array diff --git a/piker/data/tsp.py b/piker/data/tsp.py index 34cc794a..8027908b 100644 --- a/piker/data/tsp.py +++ b/piker/data/tsp.py @@ -29,10 +29,17 @@ from math import ( floor, ) import time -from typing import Literal +from typing import ( + Literal, + AsyncGenerator, +) import numpy as np import polars as pl +from pendulum import ( + DateTime, + from_timestamp, +) from ..toolz.profile import ( Profiler, @@ -223,7 +230,10 @@ def get_null_segs( col: str = 'time', ) -> tuple[ - Seq, + # Seq, # TODO: can we make it an array-type instead? + list[ + list[int, int], + ], Seq, Frame ] | None: @@ -285,13 +295,27 @@ def get_null_segs( # select out slice index pairs for each null-segment # portion detected throughout entire input frame. + # import pdbp; pdbp.set_trace() + + # only one null-segment in entire frame? if not fi_zgaps.size: + + # check for number null rows # TODO: use ndarray for this! - absi_zsegs = [[ - absi_zeros[0], # - 1, # - ifirst, - # TODO: need the + 1 or no? - absi_zeros[-1] + 1, # - ifirst, - ]] + if absi_zeros.size > 1: + absi_zsegs = [[ + absi_zeros[0], # - 1, # - ifirst, + # TODO: need the + 1 or no? + absi_zeros[-1] + 1, # - ifirst, + ]] + else: + absi_zsegs = [[ + # absi_zeros[0] + 1, + # see `get_hist()` in backend, should ALWAYS be + # able to handle a `start_dt=None`! + None, + absi_zeros[0] + 1, + ]] else: absi_zsegs.append([ absi_zeros[0] - 1, # - ifirst, @@ -305,15 +329,12 @@ def get_null_segs( ) in enumerate(zip( fi_zgaps, fi_zseg_start_rows, - # fi_zgaps, - # start=1, )): assert (zseg_start_row == zero_t[fi]).all() - absi: int = zseg_start_row['index'][0] # row = zero_t[fi] # absi_pre_zseg = row['index'][0] - 1 - absi_pre_zseg = absi - 1 + # absi_pre_zseg = absi - 1 if i > 0: prev_zseg_row = zero_t[fi - 1] @@ -330,7 +351,6 @@ def get_null_segs( assert end assert start < end - # import pdbp; pdbp.set_trace() return ( absi_zsegs, # start indices of null absi_zeros, @@ -338,6 +358,69 @@ def get_null_segs( ) +async def iter_null_segs( + frame: Frame, + timeframe: float, +) -> AsyncGenerator[ + tuple[ + int, int, + int, int, + float, float, + float, float, + + # Seq, # TODO: can we make it an array-type instead? + # list[ + # list[int, int], + # ], + # Seq, + # Frame + ], + None, +]: + if null_segs := get_null_segs( + frame, + period=timeframe, + ): + absi_pairs_zsegs: list[list[float, float]] + izeros: Seq + zero_t: Frame + ( + absi_pairs_zsegs, + izeros, + zero_t, + ) = null_segs + + absi_first: int = frame[0]['index'] + for ( + absi_start, + absi_end, + ) in absi_pairs_zsegs: + + fi_end: int = absi_end - absi_first + end_row: Seq = frame[fi_end] + end_t: float = end_row['time'] + end_dt: DateTime = from_timestamp(end_t) + + if absi_start is not None: + fi_start: int = absi_start - absi_first + start_row: Seq = frame[fi_start] + start_t: float = start_row['time'] + start_dt: DateTime = from_timestamp(start_t) + + else: + fi_start = None + start_row = None + start_t = None + start_dt = None + + yield ( + absi_start, absi_end, # abs indices + fi_start, fi_end, # relative "frame" indices + start_t, end_t, + start_dt, end_dt, + ) + + def with_dts( df: pl.DataFrame, time_col: str = 'time',