From 14ac351a650fcdc3113c7356e514a9f64f14c09d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Sep 2025 16:41:08 -0400 Subject: [PATCH 01/12] Factor to a new `.tsp._history` sub-mod Cleaning out the `piker.tsp` pkg-mod to be only the (re)exports needed for `._anal`/`._history` refs-use elsewhere! --- piker/tsp/__init__.py | 1429 +-------------------------------------- piker/tsp/_history.py | 1471 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1477 insertions(+), 1423 deletions(-) create mode 100644 piker/tsp/_history.py diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 121fcbb7..3c49e71b 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -28,1435 +28,18 @@ Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic fo stored offline (in a tsdb). ''' -from __future__ import annotations -from datetime import datetime -from functools import partial -from pathlib import Path -from pprint import pformat -from types import ModuleType -from typing import ( - Callable, - Generator, - TYPE_CHECKING, -) - -import trio -from trio_typing import TaskStatus -import tractor -from pendulum import ( - Interval, - DateTime, - Duration, - duration as mk_duration, - from_timestamp, -) -import numpy as np -import polars as pl - -from piker.brokers import NoData -from piker.accounting import ( - MktPair, -) -from piker.data._util import ( - log, -) -from ..data._sharedmem import ( - maybe_open_shm_array, - ShmArray, -) -from ..data._source import def_iohlcv_fields -from ..data._sampling import ( - open_sample_stream, -) from ._anal import ( - get_null_segs as get_null_segs, - iter_null_segs as iter_null_segs, - Frame as Frame, - Seq as Seq, - # codec-ish - np2pl as np2pl, + # `polars` specific + dedupe as dedupe, + detect_time_gaps as detect_time_gaps, pl2np as pl2np, # `numpy` only slice_from_time as slice_from_time, - - # `polars` specific - dedupe as dedupe, - with_dts as with_dts, - detect_time_gaps as detect_time_gaps, - sort_diff as sort_diff, - - # TODO: - detect_price_gaps as detect_price_gaps ) - -# TODO: break up all this shite into submods! -from ..brokers._util import ( - DataUnavailable, +from ._history import ( + iter_dfs_from_shms as iter_dfs_from_shms, + manage_history as manage_history, ) -from ..storage import TimeseriesNotFound - -if TYPE_CHECKING: - from bidict import bidict - from ..service.marketstore import StorageClient - # from .feed import _FeedsBus - - -# `ShmArray` buffer sizing configuration: -_mins_in_day = int(60 * 24) -# how much is probably dependent on lifestyle -# but we reco a buncha times (but only on a -# run-every-other-day kinda week). -_secs_in_day = int(60 * _mins_in_day) -_days_in_week: int = 7 - -_days_worth: int = 3 -_default_hist_size: int = 6 * 365 * _mins_in_day -_hist_buffer_start = int( - _default_hist_size - round(7 * _mins_in_day) -) - -_default_rt_size: int = _days_worth * _secs_in_day -# NOTE: start the append index in rt buffer such that 1 day's worth -# can be appenened before overrun. -_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) - - -def diff_history( - array: np.ndarray, - append_until_dt: datetime | None = None, - prepend_until_dt: datetime | None = None, - -) -> np.ndarray: - - # no diffing with tsdb dt index possible.. 
- if ( - prepend_until_dt is None - and append_until_dt is None - ): - return array - - times = array['time'] - - if append_until_dt: - return array[times < append_until_dt.timestamp()] - else: - return array[times >= prepend_until_dt.timestamp()] - - -# TODO: can't we just make this a sync func now? -async def shm_push_in_between( - shm: ShmArray, - to_push: np.ndarray, - prepend_index: int, - - update_start_on_prepend: bool = False, - -) -> int: - # XXX: extremely important, there can be no checkpoints - # in the body of this func to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... - shm.push( - to_push, - prepend=True, - - # XXX: only update the ._first index if no tsdb - # segment was previously prepended by the - # parent task. - update_first=update_start_on_prepend, - - # XXX: only prepend from a manually calculated shm - # index if there was already a tsdb history - # segment prepended (since then the - # ._first.value is going to be wayyy in the - # past!) - start=( - prepend_index - if not update_start_on_prepend - else None - ), - ) - - -async def maybe_fill_null_segments( - shm: ShmArray, - timeframe: float, - get_hist: Callable, - sampler_stream: tractor.MsgStream, - mkt: MktPair, - - task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, - -) -> list[Frame]: - - null_segs_detected = trio.Event() - task_status.started(null_segs_detected) - - frame: Frame = shm.array - - null_segs: tuple | None = get_null_segs( - frame, - period=timeframe, - ) - for ( - absi_start, absi_end, - fi_start, fi_end, - start_t, end_t, - start_dt, end_dt, - ) in iter_null_segs( - null_segs=null_segs, - frame=frame, - timeframe=timeframe, - ): - - # XXX NOTE: ?if we get a badly ordered timestamp - # pair, immediately stop backfilling? - if ( - start_dt - and - end_dt < start_dt - ): - await tractor.pause() - break - - ( - array, - next_start_dt, - next_end_dt, - ) = await get_hist( - timeframe, - start_dt=start_dt, - end_dt=end_dt, - ) - - # XXX TODO: pretty sure if i plot tsla, btcusdt.binance - # and mnq.cme.ib this causes a Qt crash XXDDD - - # make sure we don't overrun the buffer start - len_to_push: int = min(absi_end, array.size) - to_push: np.ndarray = array[-len_to_push:] - - await shm_push_in_between( - shm, - to_push, - prepend_index=absi_end, - update_start_on_prepend=False, - ) - # TODO: UI side needs IPC event to update.. - # - make sure the UI actually always handles - # this update! - # - remember that in the display side, only refersh this - # if the respective history is actually "in view". - # loop - try: - await sampler_stream.send({ - 'broadcast_all': { - - # XXX NOTE XXX: see the - # `.ui._display.increment_history_view()` if block - # that looks for this info to FORCE a hard viz - # redraw! - 'backfilling': (mkt.fqme, timeframe), - }, - }) - except tractor.ContextCancelled: - # log.exception - await tractor.pause() - raise - - null_segs_detected.set() - # RECHECK for more null-gaps - frame: Frame = shm.array - null_segs: tuple | None = get_null_segs( - frame, - period=timeframe, - ) - if ( - null_segs - and - len(null_segs[-1]) - ): - ( - iabs_slices, - iabs_zero_rows, - zero_t, - ) = null_segs - log.warning( - f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n' - f'{pformat(iabs_slices)}' - ) - - # TODO: always backfill gaps with the earliest (price) datum's - # value to avoid the y-ranger including zeros and completely - # stretching the y-axis.. 
- # array: np.ndarray = shm.array - # zeros = array[array['low'] == 0] - ohlc_fields: list[str] = [ - 'open', - 'high', - 'low', - 'close', - ] - - for istart, istop in iabs_slices: - - # get view into buffer for null-segment - gap: np.ndarray = shm._array[istart:istop] - - # copy the oldest OHLC samples forward - cls: float = shm._array[istart]['close'] - - # TODO: how can we mark this range as being a gap tho? - # -[ ] maybe pg finally supports nulls in ndarray to - # show empty space somehow? - # -[ ] we could put a special value in the vlm or - # another col/field to denote? - gap[ohlc_fields] = cls - - start_t: float = shm._array[istart]['time'] - t_diff: float = (istop - istart)*timeframe - - gap['time'] = np.arange( - start=start_t, - stop=start_t + t_diff, - step=timeframe, - ) - - # TODO: reimpl using the new `.ui._remote_ctl` ctx - # ideally using some kinda decent - # tractory-reverse-lookup-connnection from some other - # `Context` type thingy? - await sampler_stream.send({ - 'broadcast_all': { - - # XXX NOTE XXX: see the - # `.ui._display.increment_history_view()` if block - # that looks for this info to FORCE a hard viz - # redraw! - 'backfilling': (mkt.fqme, timeframe), - }, - }) - - # TODO: interatively step through any remaining - # time-gaps/null-segments and spawn piecewise backfiller - # tasks in a nursery? - # -[ ] not sure that's going to work so well on the ib - # backend but worth a shot? - # -[ ] mk new history connections to make it properly - # parallel possible no matter the backend? - # -[ ] fill algo: do queries in alternating "latest, then - # earliest, then latest.. etc?" - - -async def start_backfill( - get_hist, - def_frame_duration: Duration, - mod: ModuleType, - mkt: MktPair, - shm: ShmArray, - timeframe: float, - - backfill_from_shm_index: int, - backfill_from_dt: datetime, - - sampler_stream: tractor.MsgStream, - - backfill_until_dt: datetime | None = None, - storage: StorageClient | None = None, - - write_tsdb: bool = True, - - task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, - -) -> int: - - # let caller unblock and deliver latest history frame - # and use to signal that backfilling the shm gap until - # the tsdb end is complete! - bf_done = trio.Event() - task_status.started(bf_done) - - # based on the sample step size, maybe load a certain amount history - update_start_on_prepend: bool = False - if backfill_until_dt is None: - - # TODO: per-provider default history-durations? - # -[ ] inside the `open_history_client()` config allow - # declaring the history duration limits instead of - # guessing and/or applying the same limits to all? - # - # -[ ] allow declaring (default) per-provider backfill - # limits inside a [storage] sub-section in conf.toml? - # - # NOTE, when no tsdb "last datum" is provided, we just - # load some near-term history by presuming a "decently - # large" 60s duration limit and a much shorter 1s range. - periods = { - 1: {'days': 2}, - 60: {'years': 6}, - } - period_duration: int = periods[timeframe] - update_start_on_prepend: bool = True - - # NOTE: manually set the "latest" datetime which we intend to - # backfill history "until" so as to adhere to the history - # settings above when the tsdb is detected as being empty. 
- backfill_until_dt = backfill_from_dt.subtract(**period_duration) - - # STAGE NOTE: "backward history gap filling": - # - we push to the shm buffer until we have history back - # until the latest entry loaded from the tsdb's table B) - # - after this loop continue to check for other gaps in the - # (tsdb) history and (at least report) maybe fill them - # from new frame queries to the backend? - last_start_dt: datetime = backfill_from_dt - next_prepend_index: int = backfill_from_shm_index - - while last_start_dt > backfill_until_dt: - log.info( - f'Requesting {timeframe}s frame:\n' - f'backfill_until_dt: {backfill_until_dt}\n' - f'last_start_dt: {last_start_dt}\n' - ) - try: - ( - array, - next_start_dt, - next_end_dt, - ) = await get_hist( - timeframe, - end_dt=last_start_dt, - ) - except NoData as _daterr: - orig_last_start_dt: datetime = last_start_dt - gap_report: str = ( - f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' - f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' - f'last_start_dt: {orig_last_start_dt}\n\n' - f'bf_until: {backfill_until_dt}\n' - ) - # EMPTY FRAME signal with 3 (likely) causes: - # - # 1. range contains legit gap in venue history - # 2. history actually (edge case) **began** at the - # value `last_start_dt` - # 3. some other unknown error (ib blocking the - # history-query bc they don't want you seeing how - # they cucked all the tinas.. like with options - # hist) - # - if def_frame_duration: - # decrement by a duration's (frame) worth of time - # as maybe indicated by the backend to see if we - # can get older data before this possible - # "history gap". - last_start_dt: datetime = last_start_dt.subtract( - seconds=def_frame_duration.total_seconds() - ) - gap_report += ( - f'Decrementing `end_dt` and retrying with,\n' - f'def_frame_duration: {def_frame_duration}\n' - f'(new) last_start_dt: {last_start_dt}\n' - ) - log.warning(gap_report) - # skip writing to shm/tsdb and try the next - # duration's worth of prior history. - continue - - else: - # await tractor.pause() - raise DataUnavailable(gap_report) - - # broker says there never was or is no more history to pull - except DataUnavailable as due: - message: str = due.args[0] - log.warning( - f'Provider {mod.name!r} halted backfill due to,\n\n' - - f'{message}\n' - - f'fqme: {mkt.fqme}\n' - f'timeframe: {timeframe}\n' - f'last_start_dt: {last_start_dt}\n' - f'bf_until: {backfill_until_dt}\n' - ) - # UGH: what's a better way? - # TODO: backends are responsible for being correct on - # this right!? - # -[ ] in the `ib` case we could maybe offer some way - # to halt the request loop until the condition is - # resolved or should the backend be entirely in - # charge of solving such faults? yes, right? - return - - time: np.ndarray = array['time'] - assert ( - time[0] - == - next_start_dt.timestamp() - ) - - assert time[-1] == next_end_dt.timestamp() - - expected_dur: Interval = last_start_dt - next_start_dt - - # frame's worth of sample-period-steps, in seconds - frame_size_s: float = len(array) * timeframe - recv_frame_dur: Duration = ( - from_timestamp(array[-1]['time']) - - - from_timestamp(array[0]['time']) - ) - if ( - (lt_frame := (recv_frame_dur < expected_dur)) - or - (null_frame := (frame_size_s == 0)) - # ^XXX, should NEVER hit now! - ): - # XXX: query result includes a start point prior to our - # expected "frame size" and thus is likely some kind of - # history gap (eg. market closed period, outage, etc.) - # so just report it to console for now. 
- if lt_frame: - reason = 'Possible GAP (or first-datum)' - else: - assert null_frame - reason = 'NULL-FRAME' - - missing_dur: Interval = expected_dur.end - recv_frame_dur.end - log.warning( - f'{timeframe}s-series {reason} detected!\n' - f'fqme: {mkt.fqme}\n' - f'last_start_dt: {last_start_dt}\n\n' - f'recv interval: {recv_frame_dur}\n' - f'expected interval: {expected_dur}\n\n' - - f'Missing duration of history of {missing_dur.in_words()!r}\n' - f'{missing_dur}\n' - ) - # await tractor.pause() - - to_push = diff_history( - array, - prepend_until_dt=backfill_until_dt, - ) - ln: int = len(to_push) - if ln: - log.info( - f'{ln} bars for {next_start_dt} -> {last_start_dt}' - ) - - else: - log.warning( - '0 BARS TO PUSH after diff!?\n' - f'{next_start_dt} -> {last_start_dt}' - ) - - # bail gracefully on shm allocation overrun/full - # condition - try: - await shm_push_in_between( - shm, - to_push, - prepend_index=next_prepend_index, - update_start_on_prepend=update_start_on_prepend, - ) - await sampler_stream.send({ - 'broadcast_all': { - 'backfilling': (mkt.fqme, timeframe), - }, - }) - - # decrement next prepend point - next_prepend_index = next_prepend_index - ln - last_start_dt = next_start_dt - - except ValueError as ve: - _ve = ve - log.error( - f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?' - ) - - if next_prepend_index < ln: - log.warning( - f'Shm buffer can only hold {next_prepend_index} more rows..\n' - f'Appending those from recent {ln}-sized frame, no more!' - ) - - to_push = to_push[-next_prepend_index + 1:] - await shm_push_in_between( - shm, - to_push, - prepend_index=next_prepend_index, - update_start_on_prepend=update_start_on_prepend, - ) - await sampler_stream.send({ - 'broadcast_all': { - 'backfilling': (mkt.fqme, timeframe), - }, - }) - - # can't push the entire frame? so - # push only the amount that can fit.. - break - - log.info( - f'Shm pushed {ln} frame:\n' - f'{next_start_dt} -> {last_start_dt}' - ) - - # FINALLY, maybe write immediately to the tsdb backend for - # long-term storage. - if ( - storage is not None - and - write_tsdb - ): - log.info( - f'Writing {ln} frame to storage:\n' - f'{next_start_dt} -> {last_start_dt}' - ) - - # NOTE, always drop the src asset token for - # non-currency-pair like market types (for now) - # - # THAT IS, for now our table key schema is NOT - # including the dst[/src] source asset token. SO, - # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for - # historical reasons ONLY. - if mkt.dst.atype not in { - 'crypto', - 'crypto_currency', - 'fiat', # a "forex pair" - 'perpetual_future', # stupid "perps" from cex land - }: - col_sym_key: str = mkt.get_fqme( - delim_char='', - without_src=True, - ) - else: - col_sym_key: str = mkt.get_fqme( - delim_char='', - ) - - await storage.write_ohlcv( - col_sym_key, - shm.array, - timeframe, - ) - df: pl.DataFrame = await storage.as_df( - fqme=mkt.fqme, - period=timeframe, - load_from_offline=False, - ) - ( - wdts, - deduped, - diff, - ) = dedupe(df) - # if diff: - # sort_diff(df) - - else: - # finally filled gap - log.info( - f'Finished filling gap to tsdb start @ {backfill_until_dt}!' - ) - - # XXX: extremely important, there can be no checkpoints - # in the block above to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... 
- # await sampler_stream.send('broadcast_all') - - # short-circuit (for now) - bf_done.set() - - -# NOTE: originally this was used to cope with a tsdb (marketstore) -# which could not delivery very large frames of history over gRPC -# (thanks goolag) due to corruption issues. NOW, using apache -# parquet (by default in the local filesys) we don't have this -# requirement since the files can be loaded very quickly in -# entirety to memory via -async def back_load_from_tsdb( - storemod: ModuleType, - storage: StorageClient, - - fqme: str, - - tsdb_history: np.ndarray, - - last_tsdb_dt: datetime, - latest_start_dt: datetime, - latest_end_dt: datetime, - - bf_done: trio.Event, - - timeframe: int, - shm: ShmArray, -): - assert len(tsdb_history) - - # sync to backend history task's query/load completion - # if bf_done: - # await bf_done.wait() - - # TODO: eventually it'd be nice to not require a shm array/buffer - # to accomplish this.. maybe we can do some kind of tsdb direct to - # graphics format eventually in a child-actor? - if storemod.name == 'nativedb': - return - - await tractor.pause() - assert shm._first.value == 0 - - array = shm.array - - # if timeframe == 1: - # times = shm.array['time'] - # assert (times[1] - times[0]) == 1 - - if len(array): - shm_last_dt = from_timestamp( - shm.array[0]['time'] - ) - else: - shm_last_dt = None - - if last_tsdb_dt: - assert shm_last_dt >= last_tsdb_dt - - # do diff against start index of last frame of history and only - # fill in an amount of datums from tsdb allows for most recent - # to be loaded into mem *before* tsdb data. - if ( - last_tsdb_dt - and latest_start_dt - ): - backfilled_size_s: Duration = ( - latest_start_dt - last_tsdb_dt - ).seconds - # if the shm buffer len is not large enough to contain - # all missing data between the most recent backend-queried frame - # and the most recent dt-index in the db we warn that we only - # want to load a portion of the next tsdb query to fill that - # space. - log.info( - f'{backfilled_size_s} seconds worth of {timeframe}s loaded' - ) - - # Load TSDB history into shm buffer (for display) if there is - # remaining buffer space. - - time_key: str = 'time' - if getattr(storemod, 'ohlc_key_map', False): - keymap: bidict = storemod.ohlc_key_map - time_key: str = keymap.inverse['time'] - - # if ( - # not len(tsdb_history) - # ): - # return - - tsdb_last_frame_start: datetime = last_tsdb_dt - # load as much from storage into shm possible (depends on - # user's shm size settings). - while shm._first.value > 0: - - tsdb_history = await storage.read_ohlcv( - fqme, - timeframe=timeframe, - end=tsdb_last_frame_start, - ) - - # # empty query - # if not len(tsdb_history): - # break - - next_start = tsdb_history[time_key][0] - if next_start >= tsdb_last_frame_start: - # no earlier data detected - break - - else: - tsdb_last_frame_start = next_start - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - prepend_start = shm._first.value - - to_push = tsdb_history[-prepend_start:] - shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. 
- prepend=True, - # update_first=False, - # start=prepend_start, - field_map=storemod.ohlc_key_map, - ) - - log.info(f'Loaded {to_push.shape} datums from storage') - tsdb_last_frame_start = tsdb_history[time_key][0] - - # manually trigger step update to update charts/fsps - # which need an incremental update. - # NOTE: the way this works is super duper - # un-intuitive right now: - # - the broadcaster fires a msg to the fsp subsystem. - # - fsp subsys then checks for a sample step diff and - # possibly recomputes prepended history. - # - the fsp then sends back to the parent actor - # (usually a chart showing graphics for said fsp) - # which tells the chart to conduct a manual full - # graphics loop cycle. - # await sampler_stream.send('broadcast_all') - - -async def push_latest_frame( - # box-type only that should get packed with the datetime - # objects received for the latest history frame - dt_eps: list[DateTime, DateTime], - shm: ShmArray, - get_hist: Callable[ - [int, datetime, datetime], - tuple[np.ndarray, str] - ], - timeframe: float, - config: dict, - - task_status: TaskStatus[ - Exception | list[datetime, datetime] - ] = trio.TASK_STATUS_IGNORED, - -) -> list[datetime, datetime] | None: - # get latest query's worth of history all the way - # back to what is recorded in the tsdb - try: - ( - array, - mr_start_dt, - mr_end_dt, - ) = await get_hist( - timeframe, - end_dt=None, - ) - # so caller can access these ep values - dt_eps.extend([ - mr_start_dt, - mr_end_dt, - ]) - task_status.started(dt_eps) - - # XXX: timeframe not supported for backend (since - # above exception type), terminate immediately since - # there's no backfilling possible. - except DataUnavailable: - task_status.started(None) - - if timeframe > 1: - await tractor.pause() - - # prolly tf not supported - return None - - # NOTE: on the first history, most recent history - # frame we PREPEND from the current shm ._last index - # and thus a gap between the earliest datum loaded here - # and the latest loaded from the tsdb may exist! - log.info(f'Pushing {array.size} to shm!') - shm.push( - array, - prepend=True, # append on first frame - ) - - return dt_eps - - -async def load_tsdb_hist( - storage: StorageClient, - mkt: MktPair, - timeframe: float, - - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, - -) -> tuple[ - np.ndarray, - DateTime, - DateTime, -] | None: - # loads a (large) frame of data from the tsdb depending - # on the db's query size limit; our "nativedb" (using - # parquet) generally can load the entire history into mem - # but if not then below the remaining history can be lazy - # loaded? - fqme: str = mkt.fqme - tsdb_entry: tuple[ - np.ndarray, - DateTime, - DateTime, - ] - try: - tsdb_entry: tuple | None = await storage.load( - fqme, - timeframe=timeframe, - ) - return tsdb_entry - - except TimeseriesNotFound: - log.warning( - f'No timeseries yet for {timeframe}@{fqme}' - ) - return None - - -async def tsdb_backfill( - mod: ModuleType, - storemod: ModuleType, - - storage: StorageClient, - mkt: MktPair, - shm: ShmArray, - timeframe: float, - - sampler_stream: tractor.MsgStream, - - task_status: TaskStatus[ - tuple[ShmArray, ShmArray] - ] = trio.TASK_STATUS_IGNORED, - -) -> None: - - if timeframe not in (1, 60): - raise ValueError( - '`piker` only needs to support 1m and 1s sampling ' - 'but ur api is trying to deliver a longer ' - f'timeframe of {timeframe} seconds..\n' - 'So yuh.. dun do dat brudder.' 
- ) - - get_hist: Callable[ - [int, datetime, datetime], - tuple[np.ndarray, str] - ] - config: dict[str, int] - async with ( - mod.open_history_client( - mkt, - ) as (get_hist, config), - - # NOTE: this sub-nursery splits to tasks for the given - # sampling rate to concurrently load offline tsdb - # timeseries as well as new data from the venue backend! - ): - log.info( - f'`{mod}` history client returned backfill config:\n' - f'{pformat(config)}\n' - ) - - # concurrently load the provider's most-recent-frame AND any - # pre-existing tsdb history already saved in `piker` storage. - dt_eps: list[DateTime, DateTime] = [] - async with ( - tractor.trionics.collapse_eg(), - trio.open_nursery() as tn - ): - tn.start_soon( - push_latest_frame, - dt_eps, - shm, - get_hist, - timeframe, - config, - ) - tsdb_entry: tuple = await load_tsdb_hist( - storage, - mkt, - timeframe, - ) - - # tell parent task to continue - # TODO: really we'd want this the other way with the - # tsdb load happening asap and the since the latest - # frame query will normally be the main source of - # latency? - task_status.started() - - # NOTE: iabs to start backfilling from, reverse chronological, - # ONLY AFTER the first history frame has been pushed to - # mem! - backfill_gap_from_shm_index: int = shm._first.value + 1 - - # Prepend any tsdb history into the rt-shm-buffer which - # should NOW be getting filled with the most recent history - # pulled from the data-backend. - if dt_eps: - # well then, unpack the latest (gap) backfilled frame dts - ( - mr_start_dt, - mr_end_dt, - ) = dt_eps - - first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds - calced_frame_size: Duration = mk_duration( - seconds=first_frame_dur_s, - ) - # NOTE, attempt to use the backend declared default frame - # sizing (as allowed by their time-series query APIs) and - # if not provided try to construct a default from the - # first frame received above. - def_frame_durs: dict[ - int, - Duration, - ]|None = config.get('frame_types', None) - - if def_frame_durs: - def_frame_size: Duration = def_frame_durs[timeframe] - - if def_frame_size != calced_frame_size: - log.warning( - f'Expected frame size {def_frame_size}\n' - f'Rxed frame {calced_frame_size}\n' - ) - # await tractor.pause() - else: - # use what we calced from first frame above. - def_frame_size = calced_frame_size - - # NOTE: when there's no offline data, there's 2 cases: - # - data backend doesn't support timeframe/sample - # period (in which case `dt_eps` should be `None` and - # we shouldn't be here!), or - # - no prior history has been stored (yet) and we need - # todo full backfill of the history now. - if tsdb_entry is None: - # indicate to backfill task to fill the whole - # shm buffer as much as it can! - last_tsdb_dt = None - - # there's existing tsdb history from (offline) storage - # so only backfill the gap between the - # most-recent-frame (mrf) and that latest sample. 
- else: - ( - tsdb_history, - first_tsdb_dt, - last_tsdb_dt, - ) = tsdb_entry - - # if there is a gap to backfill from the first - # history frame until the last datum loaded from the tsdb - # continue that now in the background - async with trio.open_nursery( - strict_exception_groups=False, - ) as tn: - - bf_done = await tn.start( - partial( - start_backfill, - get_hist=get_hist, - def_frame_duration=def_frame_size, - mod=mod, - mkt=mkt, - shm=shm, - timeframe=timeframe, - - backfill_from_shm_index=backfill_gap_from_shm_index, - backfill_from_dt=mr_start_dt, - - sampler_stream=sampler_stream, - backfill_until_dt=last_tsdb_dt, - - storage=storage, - write_tsdb=True, - ) - ) - nulls_detected: trio.Event | None = None - if last_tsdb_dt is not None: - # calc the index from which the tsdb data should be - # prepended, presuming there is a gap between the - # latest frame (loaded/read above) and the latest - # sample loaded from the tsdb. - backfill_diff: Duration = mr_start_dt - last_tsdb_dt - offset_s: float = backfill_diff.in_seconds() - - # XXX EDGE CASEs: the most recent frame overlaps with - # prior tsdb history!! - # - so the latest frame's start time is earlier then - # the tsdb's latest sample. - # - alternatively this may also more generally occur - # when the venue was closed (say over the weeknd) - # causing a timeseries gap, AND the query frames size - # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS - # GREATER THAN the current venue-market's operating - # session (time) we will receive datums from BEFORE THE - # CLOSURE GAP and thus the `offset_s` value will be - # NEGATIVE! In this case we need to ensure we don't try - # to push datums that have already been recorded in the - # tsdb. In this case we instead only retreive and push - # the series portion missing from the db's data set. - # if offset_s < 0: - # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt - # non_overlap_offset_s: float = backfill_diff.in_seconds() - - offset_samples: int = round(offset_s / timeframe) - - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - if offset_s > 0: - # NOTE XXX: ONLY when there is an actual gap - # between the earliest sample in the latest history - # frame do we want to NOT stick the latest tsdb - # history adjacent to that latest frame! - prepend_start = shm._first.value - offset_samples + 1 - to_push = tsdb_history[-prepend_start:] - else: - # when there is overlap we want to remove the - # overlapping samples from the tsdb portion (taking - # instead the latest frame's values since THEY - # SHOULD BE THE SAME) and prepend DIRECTLY adjacent - # to the latest frame! - # TODO: assert the overlap segment array contains - # the same values!?! - prepend_start = shm._first.value - to_push = tsdb_history[-(shm._first.value):offset_samples - 1] - - # tsdb history is so far in the past we can't fit it in - # shm buffer space so simply don't load it! - if prepend_start > 0: - shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - start=prepend_start, - field_map=storemod.ohlc_key_map, - ) - - log.info(f'Loaded {to_push.shape} datums from storage') - - # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any - # seemingly missing (null-time) segments.. - # TODO: ideally these can never exist! 
- # -[ ] somehow it seems sometimes we're writing zero-ed - # segments to tsdbs during teardown? - # -[ ] can we ensure that the backcfiller tasks do this - # work PREVENTAVELY instead? - # -[ ] fill in non-zero epoch time values ALWAYS! - # await maybe_fill_null_segments( - nulls_detected: trio.Event = await tn.start(partial( - maybe_fill_null_segments, - - shm=shm, - timeframe=timeframe, - get_hist=get_hist, - sampler_stream=sampler_stream, - mkt=mkt, - )) - - # 2nd nursery END - - # TODO: who would want to? - if nulls_detected: - await nulls_detected.wait() - - await bf_done.wait() - # TODO: maybe start history anal and load missing "history - # gaps" via backend.. - - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. - - # backload any further data from tsdb (concurrently per - # timeframe) if not all data was able to be loaded (in memory) - # from the ``StorageClient.load()`` call above. - await trio.sleep_forever() - - # XXX NOTE: this is legacy from when we were using - # marketstore and we needed to continue backloading - # incrementally from the tsdb client.. (bc it couldn't - # handle a single large query with gRPC for some - # reason.. classic goolag pos) - # tn.start_soon( - # back_load_from_tsdb, - - # storemod, - # storage, - # fqme, - - # tsdb_history, - # last_tsdb_dt, - # mr_start_dt, - # mr_end_dt, - # bf_done, - - # timeframe, - # shm, - # ) - - -async def manage_history( - mod: ModuleType, - mkt: MktPair, - some_data_ready: trio.Event, - feed_is_live: trio.Event, - timeframe: float = 60, # in seconds - - task_status: TaskStatus[ - tuple[ShmArray, ShmArray] - ] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Load and manage historical data including the loading of any - available series from any connected tsdb as well as conduct - real-time update of both that existing db and the allocated - shared memory buffer. - - Init sequence: - - allocate shm (numpy array) buffers for 60s & 1s sample rates - - configure "zero index" for each buffer: the index where - history will prepended *to* and new live data will be - appened *from*. - - open a ``.storage.StorageClient`` and load any existing tsdb - history as well as (async) start a backfill task which loads - missing (newer) history from the data provider backend: - - tsdb history is loaded first and pushed to shm ASAP. - - the backfill task loads the most recent history before - unblocking its parent task, so that the `ShmArray._last` is - up to date to allow the OHLC sampler to begin writing new - samples as the correct buffer index once the provider feed - engages. - - ''' - # TODO: is there a way to make each shm file key - # actor-tree-discovery-addr unique so we avoid collisions - # when doing tests which also allocate shms for certain instruments - # that may be in use on the system by some other running daemons? 
- # from tractor._state import _runtime_vars - # port = _runtime_vars['_root_mailbox'][1] - - uid: tuple = tractor.current_actor().uid - name, uuid = uid - service: str = name.rstrip(f'.{mod.name}') - fqme: str = mkt.get_fqme(delim_char='') - - # (maybe) allocate shm array for this broker/symbol which will - # be used for fast near-term history capture and processing. - hist_shm, opened = maybe_open_shm_array( - size=_default_hist_size, - append_start_index=_hist_buffer_start, - - key=f'piker.{service}[{uuid[:16]}].{fqme}.hist', - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), - - # we expect the sub-actor to write - readonly=False, - ) - hist_zero_index = hist_shm.index - 1 - - # TODO: history validation - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) - - rt_shm, opened = maybe_open_shm_array( - size=_default_rt_size, - append_start_index=_rt_buffer_start, - key=f'piker.{service}[{uuid[:16]}].{fqme}.rt', - - # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), - - # we expect the sub-actor to write - readonly=False, - ) - - # (for now) set the rt (hft) shm array with space to prepend - # only a few days worth of 1s history. - days: int = 2 - start_index: int = days*_secs_in_day - rt_shm._first.value = start_index - rt_shm._last.value = start_index - rt_zero_index = rt_shm.index - 1 - - if not opened: - raise RuntimeError( - "Persistent shm for sym was already open?!" - ) - - open_history_client = getattr( - mod, - 'open_history_client', - ) - assert open_history_client - - # TODO: maybe it should be a subpkg of `.data`? - from piker import storage - - async with ( - storage.open_storage_client() as (storemod, client), - - # NOTE: this nursery spawns a task per "timeframe" (aka - # sampling period) data set since normally differently - # sampled timeseries can be loaded / process independently - # ;) - tractor.trionics.collapse_eg(), - trio.open_nursery() as tn, - ): - log.info( - f'Connecting to storage backend `{storemod.name}`:\n' - f'location: {client.address}\n' - f'db cardinality: {client.cardinality}\n' - # TODO: show backend config, eg: - # - network settings - # - storage size with compression - # - number of loaded time series? - ) - - # NOTE: this call ONLY UNBLOCKS once the latest-most frame - # (i.e. history just before the live feed latest datum) of - # history has been loaded and written to the shm buffer: - # - the backfiller task can write in reverse chronological - # to the shm and tsdb - # - the tsdb data can be loaded immediately and the - # backfiller can do a single append from it's end datum and - # then prepends backward to that from the current time - # step. - tf2mem: dict = { - 1: rt_shm, - 60: hist_shm, - } - async with open_sample_stream( - period_s=1., - shms_by_period={ - 1.: rt_shm.token, - 60.: hist_shm.token, - }, - - # NOTE: we want to only open a stream for doing - # broadcasts on backfill operations, not receive the - # sample index-stream (since there's no code in this - # data feed layer that needs to consume it). 
- open_index_stream=True, - sub_for_broadcasts=False, - - ) as sample_stream: - # register 1s and 1m buffers with the global - # incrementer task - log.info(f'Connected to sampler stream: {sample_stream}') - - for timeframe in [60, 1]: - await tn.start(partial( - tsdb_backfill, - mod=mod, - storemod=storemod, - storage=client, - mkt=mkt, - shm=tf2mem[timeframe], - timeframe=timeframe, - sampler_stream=sample_stream, - )) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. - some_data_ready.set() - - # wait for a live feed before starting the sampler. - await feed_is_live.wait() - - # yield back after client connect with filled shm - task_status.started(( - hist_zero_index, - hist_shm, - rt_zero_index, - rt_shm, - )) - - # history retreival loop depending on user interaction - # and thus a small RPC-prot for remotely controllinlg - # what data is loaded for viewing. - await trio.sleep_forever() - - -def iter_dfs_from_shms( - fqme: str -) -> Generator[ - tuple[Path, ShmArray, pl.DataFrame], - None, - None, -]: - # shm buffer size table based on known sample rates - sizes: dict[str, int] = { - 'hist': _default_hist_size, - 'rt': _default_rt_size, - } - - # load all detected shm buffer files which have the - # passed FQME pattern in the file name. - shmfiles: list[Path] = [] - shmdir = Path('/dev/shm/') - - for shmfile in shmdir.glob(f'*{fqme}*'): - filename: str = shmfile.name - - # skip index files - if ( - '_first' in filename - or '_last' in filename - ): - continue - - assert shmfile.is_file() - log.debug(f'Found matching shm buffer file: {filename}') - shmfiles.append(shmfile) - - for shmfile in shmfiles: - - # lookup array buffer size based on file suffix - # being either .rt or .hist - key: str = shmfile.name.rsplit('.')[-1] - - # skip FSP buffers for now.. - if key not in sizes: - continue - - size: int = sizes[key] - - # attach to any shm buffer, load array into polars df, - # write to local parquet file. - shm, opened = maybe_open_shm_array( - key=shmfile.name, - size=size, - dtype=def_iohlcv_fields, - readonly=True, - ) - assert not opened - ohlcv: np.ndarray = shm.array - df: pl.DataFrame = np2pl(ohlcv) - - yield ( - shmfile, - shm, - df, - ) diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py new file mode 100644 index 00000000..a4ee04c2 --- /dev/null +++ b/piker/tsp/_history.py @@ -0,0 +1,1471 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public +# License as published by the Free Software Foundation, either +# version 3 of the License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public +# License along with this program. If not, see +# . + +''' +Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for, + +- hi-level biz logics using the `.storage` subpkg APIs for (I/O) + orchestration and mgmt of tsdb data sets. +- core data-provider history backfilling middleware (as task-funcs) via + (what will eventually be `datad`, but are rn is the) `.brokers` backend + APIs. 
+- various data set cleaning, repairing and issue-detection/analysis + routines to ensure consistent series whether in shm or when + stored offline (in a tsdb). + +''' +from __future__ import annotations +from datetime import datetime +from functools import partial +from pathlib import Path +from pprint import pformat +from types import ModuleType +from typing import ( + Callable, + Generator, + TYPE_CHECKING, +) + +import trio +from trio_typing import TaskStatus +import tractor +from pendulum import ( + Interval, + DateTime, + Duration, + duration as mk_duration, + from_timestamp, +) +import numpy as np +import polars as pl + +from piker.brokers import NoData +from piker.accounting import ( + MktPair, +) +from piker.data._util import ( + log, +) +from ..data._sharedmem import ( + maybe_open_shm_array, + ShmArray, +) +from ..data._source import def_iohlcv_fields +from ..data._sampling import ( + open_sample_stream, +) + + +from piker.brokers._util import ( + DataUnavailable, +) +from piker.storage import TimeseriesNotFound +from ._anal import ( + + dedupe, + get_null_segs, + iter_null_segs, + Frame, + + # codec-ish + np2pl as np2pl, + + # `polars` specific + # with_dts, + # sort_diff, + + # TODO, use this to correct conc-issues during backfill? + # detect_price_gaps, +) + +if TYPE_CHECKING: + from bidict import bidict + from ..service.marketstore import StorageClient + # from .feed import _FeedsBus + + +# `ShmArray` buffer sizing configuration: +_mins_in_day = int(60 * 24) +# how much is probably dependent on lifestyle +# but we reco a buncha times (but only on a +# run-every-other-day kinda week). +_secs_in_day = int(60 * _mins_in_day) +_days_in_week: int = 7 + +_days_worth: int = 3 +_default_hist_size: int = 6 * 365 * _mins_in_day +_hist_buffer_start = int( + _default_hist_size - round(7 * _mins_in_day) +) + +_default_rt_size: int = _days_worth * _secs_in_day +# NOTE: start the append index in rt buffer such that 1 day's worth +# can be appenened before overrun. +_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) + + +def diff_history( + array: np.ndarray, + append_until_dt: datetime | None = None, + prepend_until_dt: datetime | None = None, + +) -> np.ndarray: + + # no diffing with tsdb dt index possible.. + if ( + prepend_until_dt is None + and append_until_dt is None + ): + return array + + times = array['time'] + + if append_until_dt: + return array[times < append_until_dt.timestamp()] + else: + return array[times >= prepend_until_dt.timestamp()] + + +# TODO: can't we just make this a sync func now? +async def shm_push_in_between( + shm: ShmArray, + to_push: np.ndarray, + prepend_index: int, + + update_start_on_prepend: bool = False, + +) -> int: + # XXX: extremely important, there can be no checkpoints + # in the body of this func to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... + shm.push( + to_push, + prepend=True, + + # XXX: only update the ._first index if no tsdb + # segment was previously prepended by the + # parent task. + update_first=update_start_on_prepend, + + # XXX: only prepend from a manually calculated shm + # index if there was already a tsdb history + # segment prepended (since then the + # ._first.value is going to be wayyy in the + # past!) 
+ start=( + prepend_index + if not update_start_on_prepend + else None + ), + ) + + +async def maybe_fill_null_segments( + shm: ShmArray, + timeframe: float, + get_hist: Callable, + sampler_stream: tractor.MsgStream, + mkt: MktPair, + + task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, + +) -> list[Frame]: + + null_segs_detected = trio.Event() + task_status.started(null_segs_detected) + + frame: Frame = shm.array + + null_segs: tuple | None = get_null_segs( + frame, + period=timeframe, + ) + for ( + absi_start, absi_end, + fi_start, fi_end, + start_t, end_t, + start_dt, end_dt, + ) in iter_null_segs( + null_segs=null_segs, + frame=frame, + timeframe=timeframe, + ): + + # XXX NOTE: ?if we get a badly ordered timestamp + # pair, immediately stop backfilling? + if ( + start_dt + and + end_dt < start_dt + ): + await tractor.pause() + break + + ( + array, + next_start_dt, + next_end_dt, + ) = await get_hist( + timeframe, + start_dt=start_dt, + end_dt=end_dt, + ) + + # XXX TODO: pretty sure if i plot tsla, btcusdt.binance + # and mnq.cme.ib this causes a Qt crash XXDDD + + # make sure we don't overrun the buffer start + len_to_push: int = min(absi_end, array.size) + to_push: np.ndarray = array[-len_to_push:] + + await shm_push_in_between( + shm, + to_push, + prepend_index=absi_end, + update_start_on_prepend=False, + ) + # TODO: UI side needs IPC event to update.. + # - make sure the UI actually always handles + # this update! + # - remember that in the display side, only refersh this + # if the respective history is actually "in view". + # loop + try: + await sampler_stream.send({ + 'broadcast_all': { + + # XXX NOTE XXX: see the + # `.ui._display.increment_history_view()` if block + # that looks for this info to FORCE a hard viz + # redraw! + 'backfilling': (mkt.fqme, timeframe), + }, + }) + except tractor.ContextCancelled: + # log.exception + await tractor.pause() + raise + + null_segs_detected.set() + # RECHECK for more null-gaps + frame: Frame = shm.array + null_segs: tuple | None = get_null_segs( + frame, + period=timeframe, + ) + if ( + null_segs + and + len(null_segs[-1]) + ): + ( + iabs_slices, + iabs_zero_rows, + zero_t, + ) = null_segs + log.warning( + f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n' + f'{pformat(iabs_slices)}' + ) + + # TODO: always backfill gaps with the earliest (price) datum's + # value to avoid the y-ranger including zeros and completely + # stretching the y-axis.. + # array: np.ndarray = shm.array + # zeros = array[array['low'] == 0] + ohlc_fields: list[str] = [ + 'open', + 'high', + 'low', + 'close', + ] + + for istart, istop in iabs_slices: + + # get view into buffer for null-segment + gap: np.ndarray = shm._array[istart:istop] + + # copy the oldest OHLC samples forward + cls: float = shm._array[istart]['close'] + + # TODO: how can we mark this range as being a gap tho? + # -[ ] maybe pg finally supports nulls in ndarray to + # show empty space somehow? + # -[ ] we could put a special value in the vlm or + # another col/field to denote? + gap[ohlc_fields] = cls + + start_t: float = shm._array[istart]['time'] + t_diff: float = (istop - istart)*timeframe + + gap['time'] = np.arange( + start=start_t, + stop=start_t + t_diff, + step=timeframe, + ) + + # TODO: reimpl using the new `.ui._remote_ctl` ctx + # ideally using some kinda decent + # tractory-reverse-lookup-connnection from some other + # `Context` type thingy? 
+ await sampler_stream.send({ + 'broadcast_all': { + + # XXX NOTE XXX: see the + # `.ui._display.increment_history_view()` if block + # that looks for this info to FORCE a hard viz + # redraw! + 'backfilling': (mkt.fqme, timeframe), + }, + }) + + # TODO: interatively step through any remaining + # time-gaps/null-segments and spawn piecewise backfiller + # tasks in a nursery? + # -[ ] not sure that's going to work so well on the ib + # backend but worth a shot? + # -[ ] mk new history connections to make it properly + # parallel possible no matter the backend? + # -[ ] fill algo: do queries in alternating "latest, then + # earliest, then latest.. etc?" + + +async def start_backfill( + get_hist, + def_frame_duration: Duration, + mod: ModuleType, + mkt: MktPair, + shm: ShmArray, + timeframe: float, + + backfill_from_shm_index: int, + backfill_from_dt: datetime, + + sampler_stream: tractor.MsgStream, + + backfill_until_dt: datetime | None = None, + storage: StorageClient | None = None, + + write_tsdb: bool = True, + + task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, + +) -> int: + + # let caller unblock and deliver latest history frame + # and use to signal that backfilling the shm gap until + # the tsdb end is complete! + bf_done = trio.Event() + task_status.started(bf_done) + + # based on the sample step size, maybe load a certain amount history + update_start_on_prepend: bool = False + if backfill_until_dt is None: + + # TODO: per-provider default history-durations? + # -[ ] inside the `open_history_client()` config allow + # declaring the history duration limits instead of + # guessing and/or applying the same limits to all? + # + # -[ ] allow declaring (default) per-provider backfill + # limits inside a [storage] sub-section in conf.toml? + # + # NOTE, when no tsdb "last datum" is provided, we just + # load some near-term history by presuming a "decently + # large" 60s duration limit and a much shorter 1s range. + periods = { + 1: {'days': 2}, + 60: {'years': 6}, + } + period_duration: int = periods[timeframe] + update_start_on_prepend: bool = True + + # NOTE: manually set the "latest" datetime which we intend to + # backfill history "until" so as to adhere to the history + # settings above when the tsdb is detected as being empty. + backfill_until_dt = backfill_from_dt.subtract(**period_duration) + + # STAGE NOTE: "backward history gap filling": + # - we push to the shm buffer until we have history back + # until the latest entry loaded from the tsdb's table B) + # - after this loop continue to check for other gaps in the + # (tsdb) history and (at least report) maybe fill them + # from new frame queries to the backend? + last_start_dt: datetime = backfill_from_dt + next_prepend_index: int = backfill_from_shm_index + + while last_start_dt > backfill_until_dt: + log.info( + f'Requesting {timeframe}s frame:\n' + f'backfill_until_dt: {backfill_until_dt}\n' + f'last_start_dt: {last_start_dt}\n' + ) + try: + ( + array, + next_start_dt, + next_end_dt, + ) = await get_hist( + timeframe, + end_dt=last_start_dt, + ) + except NoData as _daterr: + orig_last_start_dt: datetime = last_start_dt + gap_report: str = ( + f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' + f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n' + f'last_start_dt: {orig_last_start_dt}\n\n' + f'bf_until: {backfill_until_dt}\n' + ) + # EMPTY FRAME signal with 3 (likely) causes: + # + # 1. range contains legit gap in venue history + # 2. history actually (edge case) **began** at the + # value `last_start_dt` + # 3. 
some other unknown error (ib blocking the + # history-query bc they don't want you seeing how + # they cucked all the tinas.. like with options + # hist) + # + if def_frame_duration: + # decrement by a duration's (frame) worth of time + # as maybe indicated by the backend to see if we + # can get older data before this possible + # "history gap". + last_start_dt: datetime = last_start_dt.subtract( + seconds=def_frame_duration.total_seconds() + ) + gap_report += ( + f'Decrementing `end_dt` and retrying with,\n' + f'def_frame_duration: {def_frame_duration}\n' + f'(new) last_start_dt: {last_start_dt}\n' + ) + log.warning(gap_report) + # skip writing to shm/tsdb and try the next + # duration's worth of prior history. + continue + + else: + # await tractor.pause() + raise DataUnavailable(gap_report) + + # broker says there never was or is no more history to pull + except DataUnavailable as due: + message: str = due.args[0] + log.warning( + f'Provider {mod.name!r} halted backfill due to,\n\n' + + f'{message}\n' + + f'fqme: {mkt.fqme}\n' + f'timeframe: {timeframe}\n' + f'last_start_dt: {last_start_dt}\n' + f'bf_until: {backfill_until_dt}\n' + ) + # UGH: what's a better way? + # TODO: backends are responsible for being correct on + # this right!? + # -[ ] in the `ib` case we could maybe offer some way + # to halt the request loop until the condition is + # resolved or should the backend be entirely in + # charge of solving such faults? yes, right? + return + + time: np.ndarray = array['time'] + assert ( + time[0] + == + next_start_dt.timestamp() + ) + + assert time[-1] == next_end_dt.timestamp() + + expected_dur: Interval = last_start_dt - next_start_dt + + # frame's worth of sample-period-steps, in seconds + frame_size_s: float = len(array) * timeframe + recv_frame_dur: Duration = ( + from_timestamp(array[-1]['time']) + - + from_timestamp(array[0]['time']) + ) + if ( + (lt_frame := (recv_frame_dur < expected_dur)) + or + (null_frame := (frame_size_s == 0)) + # ^XXX, should NEVER hit now! + ): + # XXX: query result includes a start point prior to our + # expected "frame size" and thus is likely some kind of + # history gap (eg. market closed period, outage, etc.) + # so just report it to console for now. 
+ if lt_frame: + reason = 'Possible GAP (or first-datum)' + else: + assert null_frame + reason = 'NULL-FRAME' + + missing_dur: Interval = expected_dur.end - recv_frame_dur.end + log.warning( + f'{timeframe}s-series {reason} detected!\n' + f'fqme: {mkt.fqme}\n' + f'last_start_dt: {last_start_dt}\n\n' + f'recv interval: {recv_frame_dur}\n' + f'expected interval: {expected_dur}\n\n' + + f'Missing duration of history of {missing_dur.in_words()!r}\n' + f'{missing_dur}\n' + ) + # await tractor.pause() + + to_push = diff_history( + array, + prepend_until_dt=backfill_until_dt, + ) + ln: int = len(to_push) + if ln: + log.info( + f'{ln} bars for {next_start_dt} -> {last_start_dt}' + ) + + else: + log.warning( + '0 BARS TO PUSH after diff!?\n' + f'{next_start_dt} -> {last_start_dt}' + ) + + # bail gracefully on shm allocation overrun/full + # condition + try: + await shm_push_in_between( + shm, + to_push, + prepend_index=next_prepend_index, + update_start_on_prepend=update_start_on_prepend, + ) + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': (mkt.fqme, timeframe), + }, + }) + + # decrement next prepend point + next_prepend_index = next_prepend_index - ln + last_start_dt = next_start_dt + + except ValueError as ve: + _ve = ve + log.error( + f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?' + ) + + if next_prepend_index < ln: + log.warning( + f'Shm buffer can only hold {next_prepend_index} more rows..\n' + f'Appending those from recent {ln}-sized frame, no more!' + ) + + to_push = to_push[-next_prepend_index + 1:] + await shm_push_in_between( + shm, + to_push, + prepend_index=next_prepend_index, + update_start_on_prepend=update_start_on_prepend, + ) + await sampler_stream.send({ + 'broadcast_all': { + 'backfilling': (mkt.fqme, timeframe), + }, + }) + + # can't push the entire frame? so + # push only the amount that can fit.. + break + + log.info( + f'Shm pushed {ln} frame:\n' + f'{next_start_dt} -> {last_start_dt}' + ) + + # FINALLY, maybe write immediately to the tsdb backend for + # long-term storage. + if ( + storage is not None + and + write_tsdb + ): + log.info( + f'Writing {ln} frame to storage:\n' + f'{next_start_dt} -> {last_start_dt}' + ) + + # NOTE, always drop the src asset token for + # non-currency-pair like market types (for now) + # + # THAT IS, for now our table key schema is NOT + # including the dst[/src] source asset token. SO, + # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for + # historical reasons ONLY. + if mkt.dst.atype not in { + 'crypto', + 'crypto_currency', + 'fiat', # a "forex pair" + 'perpetual_future', # stupid "perps" from cex land + }: + col_sym_key: str = mkt.get_fqme( + delim_char='', + without_src=True, + ) + else: + col_sym_key: str = mkt.get_fqme( + delim_char='', + ) + + await storage.write_ohlcv( + col_sym_key, + shm.array, + timeframe, + ) + df: pl.DataFrame = await storage.as_df( + fqme=mkt.fqme, + period=timeframe, + load_from_offline=False, + ) + ( + wdts, + deduped, + diff, + ) = dedupe(df) + # if diff: + # sort_diff(df) + + else: + # finally filled gap + log.info( + f'Finished filling gap to tsdb start @ {backfill_until_dt}!' + ) + + # XXX: extremely important, there can be no checkpoints + # in the block above to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... 
+ # await sampler_stream.send('broadcast_all') + + # short-circuit (for now) + bf_done.set() + + +# NOTE: originally this was used to cope with a tsdb (marketstore) +# which could not delivery very large frames of history over gRPC +# (thanks goolag) due to corruption issues. +# +# NOW, using apache parquet (by default in the local filesys) we +# don't have this requirement since the files can be loaded very +# quickly in entirety to memory via `polars.read_parquet()`. +# +async def back_load_from_tsdb( + storemod: ModuleType, + storage: StorageClient, + + fqme: str, + + tsdb_history: np.ndarray, + + last_tsdb_dt: datetime, + latest_start_dt: datetime, + latest_end_dt: datetime, + + bf_done: trio.Event, + + timeframe: int, + shm: ShmArray, +): + assert len(tsdb_history) + + # sync to backend history task's query/load completion + # if bf_done: + # await bf_done.wait() + + # TODO: eventually it'd be nice to not require a shm array/buffer + # to accomplish this.. maybe we can do some kind of tsdb direct to + # graphics format eventually in a child-actor? + if storemod.name == 'nativedb': + return + + await tractor.pause() + assert shm._first.value == 0 + + array = shm.array + + # if timeframe == 1: + # times = shm.array['time'] + # assert (times[1] - times[0]) == 1 + + if len(array): + shm_last_dt = from_timestamp( + shm.array[0]['time'] + ) + else: + shm_last_dt = None + + if last_tsdb_dt: + assert shm_last_dt >= last_tsdb_dt + + # do diff against start index of last frame of history and only + # fill in an amount of datums from tsdb allows for most recent + # to be loaded into mem *before* tsdb data. + if ( + last_tsdb_dt + and latest_start_dt + ): + backfilled_size_s: Duration = ( + latest_start_dt - last_tsdb_dt + ).seconds + # if the shm buffer len is not large enough to contain + # all missing data between the most recent backend-queried frame + # and the most recent dt-index in the db we warn that we only + # want to load a portion of the next tsdb query to fill that + # space. + log.info( + f'{backfilled_size_s} seconds worth of {timeframe}s loaded' + ) + + # Load TSDB history into shm buffer (for display) if there is + # remaining buffer space. + + time_key: str = 'time' + if getattr(storemod, 'ohlc_key_map', False): + keymap: bidict = storemod.ohlc_key_map + time_key: str = keymap.inverse['time'] + + # if ( + # not len(tsdb_history) + # ): + # return + + tsdb_last_frame_start: datetime = last_tsdb_dt + # load as much from storage into shm possible (depends on + # user's shm size settings). + while shm._first.value > 0: + + tsdb_history = await storage.read_ohlcv( + fqme, + timeframe=timeframe, + end=tsdb_last_frame_start, + ) + + # # empty query + # if not len(tsdb_history): + # break + + next_start = tsdb_history[time_key][0] + if next_start >= tsdb_last_frame_start: + # no earlier data detected + break + + else: + tsdb_last_frame_start = next_start + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + prepend_start = shm._first.value + + to_push = tsdb_history[-prepend_start:] + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. 
+ prepend=True, + # update_first=False, + # start=prepend_start, + field_map=storemod.ohlc_key_map, + ) + + log.info(f'Loaded {to_push.shape} datums from storage') + tsdb_last_frame_start = tsdb_history[time_key][0] + + # manually trigger step update to update charts/fsps + # which need an incremental update. + # NOTE: the way this works is super duper + # un-intuitive right now: + # - the broadcaster fires a msg to the fsp subsystem. + # - fsp subsys then checks for a sample step diff and + # possibly recomputes prepended history. + # - the fsp then sends back to the parent actor + # (usually a chart showing graphics for said fsp) + # which tells the chart to conduct a manual full + # graphics loop cycle. + # await sampler_stream.send('broadcast_all') + + +async def push_latest_frame( + # box-type only that should get packed with the datetime + # objects received for the latest history frame + dt_eps: list[DateTime, DateTime], + shm: ShmArray, + get_hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ], + timeframe: float, + config: dict, + + task_status: TaskStatus[ + Exception | list[datetime, datetime] + ] = trio.TASK_STATUS_IGNORED, + +) -> list[datetime, datetime] | None: + # get latest query's worth of history all the way + # back to what is recorded in the tsdb + try: + ( + array, + mr_start_dt, + mr_end_dt, + ) = await get_hist( + timeframe, + end_dt=None, + ) + # so caller can access these ep values + dt_eps.extend([ + mr_start_dt, + mr_end_dt, + ]) + task_status.started(dt_eps) + + # XXX: timeframe not supported for backend (since + # above exception type), terminate immediately since + # there's no backfilling possible. + except DataUnavailable: + task_status.started(None) + + if timeframe > 1: + await tractor.pause() + + # prolly tf not supported + return None + + # NOTE: on the first history, most recent history + # frame we PREPEND from the current shm ._last index + # and thus a gap between the earliest datum loaded here + # and the latest loaded from the tsdb may exist! + log.info(f'Pushing {array.size} to shm!') + shm.push( + array, + prepend=True, # append on first frame + ) + + return dt_eps + + +async def load_tsdb_hist( + storage: StorageClient, + mkt: MktPair, + timeframe: float, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + +) -> tuple[ + np.ndarray, + DateTime, + DateTime, +] | None: + # loads a (large) frame of data from the tsdb depending + # on the db's query size limit; our "nativedb" (using + # parquet) generally can load the entire history into mem + # but if not then below the remaining history can be lazy + # loaded? + fqme: str = mkt.fqme + tsdb_entry: tuple[ + np.ndarray, + DateTime, + DateTime, + ] + try: + tsdb_entry: tuple | None = await storage.load( + fqme, + timeframe=timeframe, + ) + return tsdb_entry + + except TimeseriesNotFound: + log.warning( + f'No timeseries yet for {timeframe}@{fqme}' + ) + return None + + +async def tsdb_backfill( + mod: ModuleType, + storemod: ModuleType, + + storage: StorageClient, + mkt: MktPair, + shm: ShmArray, + timeframe: float, + + sampler_stream: tractor.MsgStream, + + task_status: TaskStatus[ + tuple[ShmArray, ShmArray] + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + + if timeframe not in (1, 60): + raise ValueError( + '`piker` only needs to support 1m and 1s sampling ' + 'but ur api is trying to deliver a longer ' + f'timeframe of {timeframe} seconds..\n' + 'So yuh.. dun do dat brudder.' 
+ ) + + get_hist: Callable[ + [int, datetime, datetime], + tuple[np.ndarray, str] + ] + config: dict[str, int] + async with ( + mod.open_history_client( + mkt, + ) as (get_hist, config), + + # NOTE: this sub-nursery splits to tasks for the given + # sampling rate to concurrently load offline tsdb + # timeseries as well as new data from the venue backend! + ): + log.info( + f'`{mod}` history client returned backfill config:\n' + f'{pformat(config)}\n' + ) + + # concurrently load the provider's most-recent-frame AND any + # pre-existing tsdb history already saved in `piker` storage. + dt_eps: list[DateTime, DateTime] = [] + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): + tn.start_soon( + push_latest_frame, + dt_eps, + shm, + get_hist, + timeframe, + config, + ) + tsdb_entry: tuple = await load_tsdb_hist( + storage, + mkt, + timeframe, + ) + + # tell parent task to continue + # TODO: really we'd want this the other way with the + # tsdb load happening asap and the since the latest + # frame query will normally be the main source of + # latency? + task_status.started() + + # NOTE: iabs to start backfilling from, reverse chronological, + # ONLY AFTER the first history frame has been pushed to + # mem! + backfill_gap_from_shm_index: int = shm._first.value + 1 + + # Prepend any tsdb history into the rt-shm-buffer which + # should NOW be getting filled with the most recent history + # pulled from the data-backend. + if dt_eps: + # well then, unpack the latest (gap) backfilled frame dts + ( + mr_start_dt, + mr_end_dt, + ) = dt_eps + + first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds + calced_frame_size: Duration = mk_duration( + seconds=first_frame_dur_s, + ) + # NOTE, attempt to use the backend declared default frame + # sizing (as allowed by their time-series query APIs) and + # if not provided try to construct a default from the + # first frame received above. + def_frame_durs: dict[ + int, + Duration, + ]|None = config.get('frame_types', None) + + if def_frame_durs: + def_frame_size: Duration = def_frame_durs[timeframe] + + if def_frame_size != calced_frame_size: + log.warning( + f'Expected frame size {def_frame_size}\n' + f'Rxed frame {calced_frame_size}\n' + ) + # await tractor.pause() + else: + # use what we calced from first frame above. + def_frame_size = calced_frame_size + + # NOTE: when there's no offline data, there's 2 cases: + # - data backend doesn't support timeframe/sample + # period (in which case `dt_eps` should be `None` and + # we shouldn't be here!), or + # - no prior history has been stored (yet) and we need + # todo full backfill of the history now. + if tsdb_entry is None: + # indicate to backfill task to fill the whole + # shm buffer as much as it can! + last_tsdb_dt = None + + # there's existing tsdb history from (offline) storage + # so only backfill the gap between the + # most-recent-frame (mrf) and that latest sample. 
+ else: + ( + tsdb_history, + first_tsdb_dt, + last_tsdb_dt, + ) = tsdb_entry + + # if there is a gap to backfill from the first + # history frame until the last datum loaded from the tsdb + # continue that now in the background + async with trio.open_nursery( + strict_exception_groups=False, + ) as tn: + + bf_done = await tn.start( + partial( + start_backfill, + get_hist=get_hist, + def_frame_duration=def_frame_size, + mod=mod, + mkt=mkt, + shm=shm, + timeframe=timeframe, + + backfill_from_shm_index=backfill_gap_from_shm_index, + backfill_from_dt=mr_start_dt, + + sampler_stream=sampler_stream, + backfill_until_dt=last_tsdb_dt, + + storage=storage, + write_tsdb=True, + ) + ) + nulls_detected: trio.Event | None = None + if last_tsdb_dt is not None: + # calc the index from which the tsdb data should be + # prepended, presuming there is a gap between the + # latest frame (loaded/read above) and the latest + # sample loaded from the tsdb. + backfill_diff: Duration = mr_start_dt - last_tsdb_dt + offset_s: float = backfill_diff.in_seconds() + + # XXX EDGE CASEs: the most recent frame overlaps with + # prior tsdb history!! + # - so the latest frame's start time is earlier then + # the tsdb's latest sample. + # - alternatively this may also more generally occur + # when the venue was closed (say over the weeknd) + # causing a timeseries gap, AND the query frames size + # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS + # GREATER THAN the current venue-market's operating + # session (time) we will receive datums from BEFORE THE + # CLOSURE GAP and thus the `offset_s` value will be + # NEGATIVE! In this case we need to ensure we don't try + # to push datums that have already been recorded in the + # tsdb. In this case we instead only retreive and push + # the series portion missing from the db's data set. + # if offset_s < 0: + # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt + # non_overlap_offset_s: float = backfill_diff.in_seconds() + + offset_samples: int = round(offset_s / timeframe) + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + if offset_s > 0: + # NOTE XXX: ONLY when there is an actual gap + # between the earliest sample in the latest history + # frame do we want to NOT stick the latest tsdb + # history adjacent to that latest frame! + prepend_start = shm._first.value - offset_samples + 1 + to_push = tsdb_history[-prepend_start:] + else: + # when there is overlap we want to remove the + # overlapping samples from the tsdb portion (taking + # instead the latest frame's values since THEY + # SHOULD BE THE SAME) and prepend DIRECTLY adjacent + # to the latest frame! + # TODO: assert the overlap segment array contains + # the same values!?! + prepend_start = shm._first.value + to_push = tsdb_history[-(shm._first.value):offset_samples - 1] + + # tsdb history is so far in the past we can't fit it in + # shm buffer space so simply don't load it! + if prepend_start > 0: + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + start=prepend_start, + field_map=storemod.ohlc_key_map, + ) + + log.info(f'Loaded {to_push.shape} datums from storage') + + # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any + # seemingly missing (null-time) segments.. + # TODO: ideally these can never exist! 
+ # -[ ] somehow it seems sometimes we're writing zero-ed + # segments to tsdbs during teardown? + # -[ ] can we ensure that the backcfiller tasks do this + # work PREVENTAVELY instead? + # -[ ] fill in non-zero epoch time values ALWAYS! + # await maybe_fill_null_segments( + nulls_detected: trio.Event = await tn.start(partial( + maybe_fill_null_segments, + + shm=shm, + timeframe=timeframe, + get_hist=get_hist, + sampler_stream=sampler_stream, + mkt=mkt, + )) + + # 2nd nursery END + + # TODO: who would want to? + if nulls_detected: + await nulls_detected.wait() + + await bf_done.wait() + # TODO: maybe start history anal and load missing "history + # gaps" via backend.. + + # if len(hist_shm.array) < 2: + # TODO: there's an edge case here to solve where if the last + # frame before market close (at least on ib) was pushed and + # there was only "1 new" row pushed from the first backfill + # query-iteration, then the sample step sizing calcs will + # break upstream from here since you can't diff on at least + # 2 steps... probably should also add logic to compute from + # the tsdb series and stash that somewhere as meta data on + # the shm buffer?.. no se. + + # backload any further data from tsdb (concurrently per + # timeframe) if not all data was able to be loaded (in memory) + # from the ``StorageClient.load()`` call above. + await trio.sleep_forever() + + # XXX NOTE: this is legacy from when we were using + # marketstore and we needed to continue backloading + # incrementally from the tsdb client.. (bc it couldn't + # handle a single large query with gRPC for some + # reason.. classic goolag pos) + # tn.start_soon( + # back_load_from_tsdb, + + # storemod, + # storage, + # fqme, + + # tsdb_history, + # last_tsdb_dt, + # mr_start_dt, + # mr_end_dt, + # bf_done, + + # timeframe, + # shm, + # ) + + +async def manage_history( + mod: ModuleType, + mkt: MktPair, + some_data_ready: trio.Event, + feed_is_live: trio.Event, + timeframe: float = 60, # in seconds + + task_status: TaskStatus[ + tuple[ShmArray, ShmArray] + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Load historical series data from offline-storage (tsdb) and any + missing new datums from data provider(s). + + This is the primary "backfilling service" `trio.Task` entrypoint + and conducts, + + - time-series retreival for offline-data previously stored in + any (connected) tsdb, + + - queries for missing new datums (compared with the latest found + from ^) onward to the present by pulling from available + `datad`-provider backends. + + - real-time update of both the existing tsdb-records and the + allocated shared-memory-buffer as required by downstream + `piker.data`-layer consumer-wares. + + Init sequence: + ------------- + - allocate shm (numpy array) buffers for 60s & 1s sample rates + - configure "zero index" for each buffer: the index where + history will prepended *to* and new live data will be + appened *from*. + - open a ``.storage.StorageClient`` and load any existing tsdb + history as well as (async) start a backfill task which loads + missing (newer) history from the data provider backend: + - tsdb history is loaded first and pushed to shm ASAP. + - the backfill task loads the most recent history before + unblocking its parent task, so that the `ShmArray._last` is + up to date to allow the OHLC sampler to begin writing new + samples as the correct buffer index once the provider feed + engages. 
+ + ''' + # TODO: is there a way to make each shm file key + # actor-tree-discovery-addr unique so we avoid collisions + # when doing tests which also allocate shms for certain instruments + # that may be in use on the system by some other running daemons? + # from tractor._state import _runtime_vars + # port = _runtime_vars['_root_mailbox'][1] + + uid: tuple = tractor.current_actor().uid + name, uuid = uid + service: str = name.rstrip(f'.{mod.name}') + fqme: str = mkt.get_fqme(delim_char='') + + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. + hist_shm, opened = maybe_open_shm_array( + size=_default_hist_size, + append_start_index=_hist_buffer_start, + + key=f'piker.{service}[{uuid[:16]}].{fqme}.hist', + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), + + # we expect the sub-actor to write + readonly=False, + ) + hist_zero_index = hist_shm.index - 1 + + # TODO: history validation + if not opened: + raise RuntimeError( + "Persistent shm for sym was already open?!" + ) + + rt_shm, opened = maybe_open_shm_array( + size=_default_rt_size, + append_start_index=_rt_buffer_start, + key=f'piker.{service}[{uuid[:16]}].{fqme}.rt', + + # use any broker defined ohlc dtype: + dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), + + # we expect the sub-actor to write + readonly=False, + ) + + # (for now) set the rt (hft) shm array with space to prepend + # only a few days worth of 1s history. + days: int = 2 + start_index: int = days*_secs_in_day + rt_shm._first.value = start_index + rt_shm._last.value = start_index + rt_zero_index = rt_shm.index - 1 + + if not opened: + raise RuntimeError( + "Persistent shm for sym was already open?!" + ) + + open_history_client = getattr( + mod, + 'open_history_client', + ) + assert open_history_client + + # TODO: maybe it should be a subpkg of `.data`? + from piker import storage + + async with ( + storage.open_storage_client() as (storemod, client), + + # NOTE: this nursery spawns a task per "timeframe" (aka + # sampling period) data set since normally differently + # sampled timeseries can be loaded / process independently + # ;) + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn, + ): + log.info( + f'Connecting to storage backend `{storemod.name}`:\n' + f'location: {client.address}\n' + f'db cardinality: {client.cardinality}\n' + # TODO: show backend config, eg: + # - network settings + # - storage size with compression + # - number of loaded time series? + ) + + # NOTE: this call ONLY UNBLOCKS once the latest-most frame + # (i.e. history just before the live feed latest datum) of + # history has been loaded and written to the shm buffer: + # - the backfiller task can write in reverse chronological + # to the shm and tsdb + # - the tsdb data can be loaded immediately and the + # backfiller can do a single append from it's end datum and + # then prepends backward to that from the current time + # step. + tf2mem: dict = { + 1: rt_shm, + 60: hist_shm, + } + async with open_sample_stream( + period_s=1., + shms_by_period={ + 1.: rt_shm.token, + 60.: hist_shm.token, + }, + + # NOTE: we want to only open a stream for doing + # broadcasts on backfill operations, not receive the + # sample index-stream (since there's no code in this + # data feed layer that needs to consume it). 
+ open_index_stream=True, + sub_for_broadcasts=False, + + ) as sample_stream: + # register 1s and 1m buffers with the global + # incrementer task + log.info(f'Connected to sampler stream: {sample_stream}') + + for timeframe in [60, 1]: + await tn.start(partial( + tsdb_backfill, + mod=mod, + storemod=storemod, + storage=client, + mkt=mkt, + shm=tf2mem[timeframe], + timeframe=timeframe, + sampler_stream=sample_stream, + )) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + + # wait for a live feed before starting the sampler. + await feed_is_live.wait() + + # yield back after client connect with filled shm + task_status.started(( + hist_zero_index, + hist_shm, + rt_zero_index, + rt_shm, + )) + + # history retreival loop depending on user interaction + # and thus a small RPC-prot for remotely controllinlg + # what data is loaded for viewing. + await trio.sleep_forever() + + +def iter_dfs_from_shms( + fqme: str +) -> Generator[ + tuple[Path, ShmArray, pl.DataFrame], + None, + None, +]: + # shm buffer size table based on known sample rates + sizes: dict[str, int] = { + 'hist': _default_hist_size, + 'rt': _default_rt_size, + } + + # load all detected shm buffer files which have the + # passed FQME pattern in the file name. + shmfiles: list[Path] = [] + shmdir = Path('/dev/shm/') + + for shmfile in shmdir.glob(f'*{fqme}*'): + filename: str = shmfile.name + + # skip index files + if ( + '_first' in filename + or '_last' in filename + ): + continue + + assert shmfile.is_file() + log.debug(f'Found matching shm buffer file: {filename}') + shmfiles.append(shmfile) + + for shmfile in shmfiles: + + # lookup array buffer size based on file suffix + # being either .rt or .hist + key: str = shmfile.name.rsplit('.')[-1] + + # skip FSP buffers for now.. + if key not in sizes: + continue + + size: int = sizes[key] + + # attach to any shm buffer, load array into polars df, + # write to local parquet file. + shm, opened = maybe_open_shm_array( + key=shmfile.name, + size=size, + dtype=def_iohlcv_fields, + readonly=True, + ) + assert not opened + ohlcv: np.ndarray = shm.array + df: pl.DataFrame = np2pl(ohlcv) + + yield ( + shmfile, + shm, + df, + ) -- 2.34.1 From d6d4fec6664adb31348d50f88317ab9661eba18d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Oct 2025 14:14:28 -0400 Subject: [PATCH 02/12] Woops, keep `np2pl` exposed from `.tsp` --- piker/tsp/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 3c49e71b..1df0a554 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -35,6 +35,7 @@ from ._anal import ( dedupe as dedupe, detect_time_gaps as detect_time_gaps, pl2np as pl2np, + np2pl as np2pl, # `numpy` only slice_from_time as slice_from_time, -- 2.34.1 From 108646fdfb467c4f7e944b6e071ca87d9e194b71 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Oct 2025 19:53:08 -0400 Subject: [PATCH 03/12] `.tsp._history`: drop `feed_is_live` syncing, another seg flag The `await feed_is_live.wait()` is more or less pointless and would only cause slower startup afaig (as-far-as-i-grok) so i'm masking it here. This also removes the final `strict_exception_groups=False` use from the non-tests code base, flipping to the `tractor.trionics` collapser once and for all! 
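In code terms the swap is roughly the following sketch (the real
hunk is in `tsdb_backfill()` below); i.e. the `collapse_eg()` acm
takes over what the `strict_exception_groups=False` flag was doing:

```python
import trio
import tractor


async def _sketch() -> None:
    # previously: silence eg-wrapping via the `trio` flag,
    #
    # async with trio.open_nursery(
    #     strict_exception_groups=False,
    # ) as tn:
    #     ...

    # now: collapse single-exception groups via `tractor`'s acm,
    async with (
        tractor.trionics.collapse_eg(),
        trio.open_nursery() as tn,
    ):
        tn.start_soon(trio.sleep, 0)
```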
--- piker/tsp/_history.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py index a4ee04c2..b6b15e72 100644 --- a/piker/tsp/_history.py +++ b/piker/tsp/_history.py @@ -886,7 +886,7 @@ async def load_tsdb_hist( np.ndarray, DateTime, DateTime, -] | None: +]|None: # loads a (large) frame of data from the tsdb depending # on the db's query size limit; our "nativedb" (using # parquet) generally can load the entire history into mem @@ -899,7 +899,7 @@ async def load_tsdb_hist( DateTime, ] try: - tsdb_entry: tuple | None = await storage.load( + tsdb_entry: tuple|None = await storage.load( fqme, timeframe=timeframe, ) @@ -1046,12 +1046,15 @@ async def tsdb_backfill( last_tsdb_dt, ) = tsdb_entry + # await tractor.pause() + # if there is a gap to backfill from the first # history frame until the last datum loaded from the tsdb # continue that now in the background - async with trio.open_nursery( - strict_exception_groups=False, - ) as tn: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn, + ): bf_done = await tn.start( partial( @@ -1322,8 +1325,14 @@ async def manage_history( # TODO: maybe it should be a subpkg of `.data`? from piker import storage + storemod: ModuleType + client: StorageClient + tn: trio.Nursery async with ( - storage.open_storage_client() as (storemod, client), + storage.open_storage_client() as ( + storemod, + client, + ), # NOTE: this nursery spawns a task per "timeframe" (aka # sampling period) data set since normally differently @@ -1392,7 +1401,7 @@ async def manage_history( some_data_ready.set() # wait for a live feed before starting the sampler. - await feed_is_live.wait() + # await feed_is_live.wait() # yield back after client connect with filled shm task_status.started(( -- 2.34.1 From 534b13f755a9ae6e1e8adf2b593e9bb74b969773 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Oct 2025 20:00:05 -0400 Subject: [PATCH 04/12] `.storage.__init__`: code styling updates --- piker/storage/__init__.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/piker/storage/__init__.py b/piker/storage/__init__.py index f32f40b6..361eaadc 100644 --- a/piker/storage/__init__.py +++ b/piker/storage/__init__.py @@ -43,7 +43,6 @@ from typing import ( import numpy as np - from .. import config from ..service import ( check_for_service, @@ -152,7 +151,10 @@ class StorageConnectionError(ConnectionError): ''' -def get_storagemod(name: str) -> ModuleType: +def get_storagemod( + name: str, + +) -> ModuleType: mod: ModuleType = import_module( '.' + name, 'piker.storage', @@ -165,9 +167,12 @@ def get_storagemod(name: str) -> ModuleType: @acm async def open_storage_client( - backend: str | None = None, + backend: str|None = None, -) -> tuple[ModuleType, StorageClient]: +) -> tuple[ + ModuleType, + StorageClient, +]: ''' Load the ``StorageClient`` for named backend. 
@@ -267,7 +272,10 @@ async def open_tsdb_client( from ..data.feed import maybe_open_feed async with ( - open_storage_client() as (_, storage), + open_storage_client() as ( + _, + storage, + ), maybe_open_feed( [fqme], @@ -275,7 +283,7 @@ async def open_tsdb_client( ) as feed, ): - profiler(f'opened feed for {fqme}') + profiler(f'opened feed for {fqme!r}') # to_append = feed.hist_shm.array # to_prepend = None -- 2.34.1 From 4bfdd388bb3ba53b39680abd9716851095da6b0e Mon Sep 17 00:00:00 2001 From: goodboy Date: Sun, 18 Jan 2026 14:19:51 -0500 Subject: [PATCH 05/12] Fix polars 1.36.0 duration API Polars tightened type safety for `.dt` accessor methods requiring `total_*` methods for duration types vs datetime component accessors like `day()` which now only work on datetime dtypes. `detect_time_gaps()` in `.tsp._anal` was calling `.dt.day()` on `dt_diff` column (a duration from `.diff()`) which throws `InvalidOperationError` on modern polars. Changes: - use f-string to add pluralization to map time unit strings to `total_s` form for the new duration API. - Handle singular/plural forms: 'day' -> 'days' -> 'total_days' - Ensure trailing 's' before applying 'total_' prefix Also updates inline comments explaining the polars type distinction between datetime components vs duration totals. Fixes `piker store ldshm` crashes on datasets with time gaps. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/tsp/_anal.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py index 42c3aa6c..26c3740e 100644 --- a/piker/tsp/_anal.py +++ b/piker/tsp/_anal.py @@ -578,11 +578,22 @@ def detect_time_gaps( # NOTE: this flag is to indicate that on this (sampling) time # scale we expect to only be filtering against larger venue # closures-scale time gaps. + # + # Map to total_ method since `dt_diff` is a duration type, + # not datetime - modern polars requires `total_*` methods + # for duration types (e.g. `total_days()` not `day()`) + # Ensure plural form for polars API (e.g. 'day' -> 'days') + unit_plural: str = ( + gap_dt_unit + if gap_dt_unit.endswith('s') + else f'{gap_dt_unit}s' + ) + duration_method: str = f'total_{unit_plural}' return step_gaps.filter( # Second by an arbitrary dt-unit step size getattr( pl.col('dt_diff').dt, - gap_dt_unit, + duration_method, )().abs() > gap_thresh ) -- 2.34.1 From 192fe0dc7390ee2fb4ceecd9a201f1065b2734d6 Mon Sep 17 00:00:00 2001 From: goodboy Date: Sun, 18 Jan 2026 18:18:34 -0500 Subject: [PATCH 06/12] Add `pexpect`-based `pdbp`-REPL offline helper Add a new `snippets/claude_debug_helper.py` to provide a programmatic interface to `tractor.pause()` debugger sessions for incremental data inspection matching the interactive UX but able to be run by `claude` "offline" since it can't seem to feed stdin (so it claims) to the `pdb` instance due to lack of ability to allocate a tty internally. The script-wrapper is based on `tractor`'s `tests/devx/` suite's use of `pexpect` patterns for driving `pdbp` prompts and thus enables automated-offline execution of REPL-inspection commands **without** using incremental-realtime output capture (like a human would use it). 
Features: - `run_pdb_commands()`: batch command execution - `InteractivePdbSession`: context manager for step-by-step REPL interaction - `expect()` wrapper: timeout handling with buffer display - Proper stdin/stdout handling via `pexpect.spawn()` Example usage: ```python from debug_helper import InteractivePdbSession with InteractivePdbSession( cmd='piker store ldshm zecusdt.usdtm.perp.binance' ) as session: session.run('deduped.shape') session.run('step_gaps.shape') ``` (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- snippets/claude_debug_helper.py | 256 ++++++++++++++++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100755 snippets/claude_debug_helper.py diff --git a/snippets/claude_debug_helper.py b/snippets/claude_debug_helper.py new file mode 100755 index 00000000..97467d8a --- /dev/null +++ b/snippets/claude_debug_helper.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python +''' +Programmatic debugging helper for `pdbp` REPL human-like +interaction but built to allow `claude` to interact with +crashes and `tractor.pause()` breakpoints along side a human dev. + +Originally written by `clauded` during a backfiller inspection +session with @goodboy trying to resolve duplicate/gappy ohlcv ts +issues discovered while testing the new `nativedb` tsdb. + +Allows `claude` to run `pdb` commands and capture output in an "offline" +manner but generating similar output as if it was iteracting with +the debug REPL. + +The use of `pexpect` is heavily based on tractor's REPL UX test +suite(s), namely various `tests/devx/test_debugger.py` patterns. + +''' +import sys +import os +import time + +import pexpect +from pexpect.exceptions import ( + TIMEOUT, + EOF, +) + + +PROMPT: str = r'\(Pdb\+\)' + + +def expect( + child: pexpect.spawn, + patt: str, + **kwargs, +) -> None: + ''' + Expect wrapper that prints last console data before failing. + + ''' + try: + child.expect( + patt, + **kwargs, + ) + except TIMEOUT: + before: str = ( + str(child.before.decode()) + if isinstance(child.before, bytes) + else str(child.before) + ) + print( + f'TIMEOUT waiting for pattern: {patt}\n' + f'Last seen output:\n{before}' + ) + raise + + +def run_pdb_commands( + commands: list[str], + initial_cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance', + timeout: int = 30, + print_output: bool = True, +) -> dict[str, str]: + ''' + Spawn piker process, wait for pdb prompt, execute commands. + + Returns dict mapping command -> output. 
+ + ''' + results: dict[str, str] = {} + + # Disable colored output for easier parsing + os.environ['PYTHON_COLORS'] = '0' + + # Spawn the process + if print_output: + print(f'Spawning: {initial_cmd}') + + child: pexpect.spawn = pexpect.spawn( + initial_cmd, + timeout=timeout, + encoding='utf-8', + echo=False, + ) + + # Wait for pdb prompt + try: + expect(child, PROMPT, timeout=timeout) + if print_output: + print('Reached pdb prompt!') + + # Execute each command + for cmd in commands: + if print_output: + print(f'\n>>> {cmd}') + + child.sendline(cmd) + time.sleep(0.1) + + # Wait for next prompt + expect(child, PROMPT, timeout=timeout) + + # Capture output (everything before the prompt) + output: str = ( + str(child.before.decode()) + if isinstance(child.before, bytes) + else str(child.before) + ) + results[cmd] = output + + if print_output: + print(output) + + # Quit debugger gracefully + child.sendline('quit') + try: + child.expect(EOF, timeout=5) + except (TIMEOUT, EOF): + pass + + except TIMEOUT as e: + print(f'Timeout: {e}') + if child.before: + before: str = ( + str(child.before.decode()) + if isinstance(child.before, bytes) + else str(child.before) + ) + print(f'Buffer:\n{before}') + results['_error'] = str(e) + + finally: + if child.isalive(): + child.close(force=True) + + return results + + +class InteractivePdbSession: + ''' + Interactive pdb session manager for incremental debugging. + + ''' + def __init__( + self, + cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance', + timeout: int = 30, + ): + self.cmd: str = cmd + self.timeout: int = timeout + self.child: pexpect.spawn|None = None + self.history: list[tuple[str, str]] = [] + + def start(self) -> None: + ''' + Start the piker process and wait for first prompt. + + ''' + os.environ['PYTHON_COLORS'] = '0' + + print(f'Starting: {self.cmd}') + self.child = pexpect.spawn( + self.cmd, + timeout=self.timeout, + encoding='utf-8', + echo=False, + ) + + # Wait for initial prompt + expect(self.child, PROMPT, timeout=self.timeout) + print('Ready at pdb prompt!') + + def run( + self, + cmd: str, + print_output: bool = True, + ) -> str: + ''' + Execute a single pdb command and return output. + + ''' + if not self.child or not self.child.isalive(): + raise RuntimeError('Session not started or dead') + + if print_output: + print(f'\n>>> {cmd}') + + self.child.sendline(cmd) + time.sleep(0.1) + + # Wait for next prompt + expect(self.child, PROMPT, timeout=self.timeout) + + output: str = ( + str(self.child.before.decode()) + if isinstance(self.child.before, bytes) + else str(self.child.before) + ) + self.history.append((cmd, output)) + + if print_output: + print(output) + + return output + + def quit(self) -> None: + ''' + Exit the debugger and cleanup. 
+ + ''' + if self.child and self.child.isalive(): + self.child.sendline('quit') + try: + self.child.expect(EOF, timeout=5) + except (TIMEOUT, EOF): + pass + self.child.close(force=True) + + def __enter__(self): + self.start() + return self + + def __exit__(self, *args): + self.quit() + + +if __name__ == '__main__': + # Example inspection commands + inspect_cmds: list[str] = [ + 'locals().keys()', + 'type(deduped)', + 'deduped.shape', + ( + 'step_gaps.shape ' + 'if "step_gaps" in locals() ' + 'else "N/A"' + ), + ( + 'venue_gaps.shape ' + 'if "venue_gaps" in locals() ' + 'else "N/A"' + ), + ] + + # Allow commands from CLI args + if len(sys.argv) > 1: + inspect_cmds = sys.argv[1:] + + # Interactive session example + with InteractivePdbSession() as session: + for cmd in inspect_cmds: + session.run(cmd) + + print('\n=== Session Complete ===') -- 2.34.1 From a1048c847b9f9d6a341cde508fa6232d908854d1 Mon Sep 17 00:00:00 2001 From: goodboy Date: Sun, 18 Jan 2026 21:00:17 -0500 Subject: [PATCH 07/12] Add vlm-based "smart" OHLCV de-duping & bar validation Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter" duplicate bars by attempting to distinguish between erroneous bars partially written during concurrent backfill race conditions vs. **actual** data quality issues from historical providers. Problem: -------- Concurrent writes (live updates vs. backfilling) can result in create duplicate timestamped ohlcv vars with different values. Some potential scenarios include, - a market live feed is cancelled during live update resulting in the "last" datum being partially updated with all the ticks for the time step. - when the feed is rebooted during charting, the backfiller will not finalize this bar since rn it presumes it should only fill data for time steps not already in the tsdb storage. Our current naive `.unique()` approach obvi keeps the incomplete bar and a "smarter" approach is to compare the provider's final vlm amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from the provider likely indicates the cancelled-during-live-write and **not** a datum discrepancy from said data provider. Analysis (with `claude`) of `zecusdt` data revealed: - 1000 duplicate timestamps - 999 identical bars (pure duplicates from 2022 backfill overlap) - 1 volume-monotonic conflict (live partial vs backfill complete) A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()` which: - sorts by vlm **before** deduplication and keep the most complete bar based on vlm monotonicity as well as the following OHLCV validation assumptions: * volume should always increase * high should be non-decreasing, * low should be non-increasing * open should be identical - Separates valid race conditions from provider data quality issues and reports and returns both dfs. Change summary by `claude`: - `.tsp._dedupe_smart`: new module with validation logic - `.tsp.__init__`: expose `dedupe_ohlcv_smart()` - `.storage.cli`: integrate smart dedupe, add logging for: * duplicate counts (identical vs monotonic races) * data quality violations (non-monotonic, invalid OHLC ranges) * warnings for provider data issues - Remove `assert not diff` (duplicates are valid now) Verified on `zecusdt`: correctly keeps index 3143645 (volume=287.777) over 3143644 (volume=140.299) for conflicting 2026-01-16 18:54 UTC bar. `claude`'s Summary of reasoning ------------------------------- - volume monotonicity is critical: a bar's volume only increases during its time window. 
- a backfilled bar should always have volume >= live updated. - violations indicate any of: * Provider data corruption * Non-OHLCV aggregation semantics * Timestamp misalignment (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/storage/cli.py | 42 +++++++- piker/tsp/__init__.py | 3 + piker/tsp/_dedupe_smart.py | 206 +++++++++++++++++++++++++++++++++++++ 3 files changed, 246 insertions(+), 5 deletions(-) create mode 100644 piker/tsp/_dedupe_smart.py diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 1c8ff11b..5c087898 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -441,11 +441,37 @@ def ldshm( wdts, deduped, diff, - ) = tsp.dedupe( + valid_races, + dq_issues, + ) = tsp.dedupe_ohlcv_smart( shm_df, - period=period_s, ) + # Report duplicate analysis + if diff > 0: + log.info( + f'Removed {diff} duplicate timestamp(s)\n' + ) + if valid_races is not None: + identical: int = ( + valid_races + .filter(pl.col('identical_bars')) + .height + ) + monotonic: int = valid_races.height - identical + log.info( + f'Valid race conditions: {valid_races.height}\n' + f' - Identical bars: {identical}\n' + f' - Volume monotonic: {monotonic}\n' + ) + + if dq_issues is not None: + log.warning( + f'DATA QUALITY ISSUES from provider: ' + f'{dq_issues.height} timestamp(s)\n' + f'{dq_issues}\n' + ) + # detect gaps from in expected (uniform OHLC) sample period step_gaps: pl.DataFrame = tsp.detect_time_gaps( deduped, @@ -460,7 +486,8 @@ def ldshm( # TODO: actually pull the exact duration # expected for each venue operational period? - gap_dt_unit='days', + # gap_dt_unit='day', + gap_dt_unit='day', gap_thresh=1, ) @@ -534,8 +561,13 @@ def ldshm( tf2aids[period_s] = aids else: - # allow interaction even when no ts problems. - assert not diff + # No significant gaps to handle, but may have had + # duplicates removed (valid race conditions are ok) + if diff > 0 and dq_issues is not None: + log.warning( + 'Found duplicates with data quality issues ' + 'but no significant time gaps!\n' + ) await tractor.pause() log.info('Exiting TSP shm anal-izer!') diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 1df0a554..81274ed8 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -40,6 +40,9 @@ from ._anal import ( # `numpy` only slice_from_time as slice_from_time, ) +from ._dedupe_smart import ( + dedupe_ohlcv_smart as dedupe_ohlcv_smart, +) from ._history import ( iter_dfs_from_shms as iter_dfs_from_shms, manage_history as manage_history, diff --git a/piker/tsp/_dedupe_smart.py b/piker/tsp/_dedupe_smart.py new file mode 100644 index 00000000..8c0ac55a --- /dev/null +++ b/piker/tsp/_dedupe_smart.py @@ -0,0 +1,206 @@ +''' +Smart OHLCV deduplication with data quality validation. + +Handles concurrent write conflicts by keeping the most complete bar +(highest volume) while detecting data quality anomalies. + +''' +import polars as pl + +from ._anal import with_dts + + +def dedupe_ohlcv_smart( + src_df: pl.DataFrame, + time_col: str = 'time', + volume_col: str = 'volume', + sort: bool = True, + +) -> tuple[ + pl.DataFrame, # with dts + pl.DataFrame, # deduped (keeping higher volume bars) + int, # count of dupes removed + pl.DataFrame|None, # valid race conditions + pl.DataFrame|None, # data quality violations +]: + ''' + Smart OHLCV deduplication keeping most complete bars. 
+ + For duplicate timestamps, keeps bar with highest volume under + the assumption that higher volume indicates more complete/final + data from backfill vs partial live updates. + + Returns + ------- + Tuple of: + - wdts: original dataframe with datetime columns added + - deduped: deduplicated frame keeping highest-volume bars + - diff: number of duplicate rows removed + - valid_races: duplicates meeting expected race condition pattern + (volume monotonic, OHLC ranges valid) + - data_quality_issues: duplicates violating expected relationships + indicating provider data problems + + ''' + wdts: pl.DataFrame = with_dts(src_df) + + # Find duplicate timestamps + dupes: pl.DataFrame = wdts.filter( + pl.col(time_col).is_duplicated() + ) + + if dupes.is_empty(): + # No duplicates, return as-is + return (wdts, wdts, 0, None, None) + + # Analyze duplicate groups for validation + dupe_analysis: pl.DataFrame = ( + dupes + .sort([time_col, 'index']) + .group_by(time_col, maintain_order=True) + .agg([ + pl.col('index').alias('indices'), + pl.col('volume').alias('volumes'), + pl.col('high').alias('highs'), + pl.col('low').alias('lows'), + pl.col('open').alias('opens'), + pl.col('close').alias('closes'), + pl.col('dt').first().alias('dt'), + pl.len().alias('count'), + ]) + ) + + # Validate OHLCV monotonicity for each duplicate group + def check_ohlcv_validity(row) -> dict[str, bool]: + ''' + Check if duplicate bars follow expected race condition pattern. + + For a valid live-update → backfill race: + - volume should be monotonically increasing + - high should be monotonically non-decreasing + - low should be monotonically non-increasing + - open should be identical (fixed at bar start) + + Returns dict of violation flags. + + ''' + vols: list = row['volumes'] + highs: list = row['highs'] + lows: list = row['lows'] + opens: list = row['opens'] + + violations: dict[str, bool] = { + 'volume_non_monotonic': False, + 'high_decreased': False, + 'low_increased': False, + 'open_mismatch': False, + 'identical_bars': False, + } + + # Check if all bars are identical (pure duplicate) + if ( + len(set(vols)) == 1 + and len(set(highs)) == 1 + and len(set(lows)) == 1 + and len(set(opens)) == 1 + ): + violations['identical_bars'] = True + return violations + + # Check volume monotonicity + for i in range(1, len(vols)): + if vols[i] < vols[i-1]: + violations['volume_non_monotonic'] = True + break + + # Check high monotonicity (can only increase or stay same) + for i in range(1, len(highs)): + if highs[i] < highs[i-1]: + violations['high_decreased'] = True + break + + # Check low monotonicity (can only decrease or stay same) + for i in range(1, len(lows)): + if lows[i] > lows[i-1]: + violations['low_increased'] = True + break + + # Check open consistency (should be fixed) + if len(set(opens)) > 1: + violations['open_mismatch'] = True + + return violations + + # Apply validation + dupe_analysis = dupe_analysis.with_columns([ + pl.struct(['volumes', 'highs', 'lows', 'opens']) + .map_elements( + check_ohlcv_validity, + return_dtype=pl.Struct([ + pl.Field('volume_non_monotonic', pl.Boolean), + pl.Field('high_decreased', pl.Boolean), + pl.Field('low_increased', pl.Boolean), + pl.Field('open_mismatch', pl.Boolean), + pl.Field('identical_bars', pl.Boolean), + ]) + ) + .alias('validity') + ]) + + # Unnest validity struct + dupe_analysis = dupe_analysis.unnest('validity') + + # Separate valid races from data quality issues + valid_races: pl.DataFrame|None = ( + dupe_analysis + .filter( + # Valid if no violations OR just identical 
bars + ~pl.col('volume_non_monotonic') + & ~pl.col('high_decreased') + & ~pl.col('low_increased') + & ~pl.col('open_mismatch') + ) + ) + if valid_races.is_empty(): + valid_races = None + + data_quality_issues: pl.DataFrame|None = ( + dupe_analysis + .filter( + # Issues if any non-identical violation exists + ( + pl.col('volume_non_monotonic') + | pl.col('high_decreased') + | pl.col('low_increased') + | pl.col('open_mismatch') + ) + & ~pl.col('identical_bars') + ) + ) + if data_quality_issues.is_empty(): + data_quality_issues = None + + # Deduplicate: keep highest volume bar for each timestamp + deduped: pl.DataFrame = ( + wdts + .sort([time_col, volume_col]) + .unique( + subset=[time_col], + keep='last', + maintain_order=False, + ) + ) + + # Re-sort by time or index + if sort: + deduped = deduped.sort(by=time_col) + + diff: int = wdts.height - deduped.height + + return ( + wdts, + deduped, + diff, + valid_races, + data_quality_issues, + ) -- 2.34.1 From d4b46e0edafa4d8c57ecbe87250fe0699077cea9 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 21 Jan 2026 20:02:10 -0500 Subject: [PATCH 08/12] Fix `Qt6` types for new sub-namespaces --- piker/ui/_l1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/ui/_l1.py b/piker/ui/_l1.py index 8d29d90c..f557de4b 100644 --- a/piker/ui/_l1.py +++ b/piker/ui/_l1.py @@ -237,8 +237,8 @@ class LevelLabel(YAxisLabel): class L1Label(LevelLabel): text_flags = ( - QtCore.Qt.TextDontClip - | QtCore.Qt.AlignLeft + QtCore.Qt.TextFlag.TextDontClip + | QtCore.Qt.AlignmentFlag.AlignLeft ) def set_label_str( -- 2.34.1 From caf2cc5a5b7371c7e34b71217cdd6c6e4df14bb6 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 21 Jan 2026 20:05:07 -0500 Subject: [PATCH 09/12] ib: up API timeout default for remote host conns --- piker/brokers/ib/api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 5bcc7336..4a63a0f1 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1187,7 +1187,7 @@ async def load_aio_clients( # the API TCP in `ib_insync` connection can be flaky af so instead # retry a few times to get the client going.. connect_retries: int = 3, - connect_timeout: float = 10, + connect_timeout: float = 30, # in case a remote-host disconnect_on_exit: bool = True, ) -> dict[str, Client]: -- 2.34.1 From a8e4e1b2c59ec42c8204c1c15c0c5785b52df0d3 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 21 Jan 2026 21:34:45 -0500 Subject: [PATCH 10/12] Tolerate various "bad data" cases in `markup_gaps()` Namely such that when the previous-df-row by our shm-abs-'index' doesn't exist we ignore certain cases which are likely due to borked-but-benign samples written to the tsdb or rt shm buffers prior. Particularly we now ignore, - any `dt`/`prev_dt` values which are UNIX-epoch timestamped (val of 0). - any row-is-first-row in the df; there is no previous. - any missing previous datum by 'index', in which case we lookup the `wdts` prior row and use that instead. * this would indicate a missing sample for the time-step but we can still detect a "gap" by looking at the prior row, by df-abs-index `i`, and use its timestamp to determine the period/size of missing samples (which need to likely still be retrieved). * in this case i'm leaving in a pause-point for introspecting these rarer cases when `--pdb` is passed via CLI. Relatedly in the `piker store` CLI ep, - add `--pdb` flag to `piker store`, pass it verbatim as `debug_mode`. 
- when `times` has only a single row, don't calc a `period_s` median. - only trace `null_segs` when in debug mode. - always markup/dedupe gaps for `period_s==60` --- piker/storage/cli.py | 77 ++++++++++++++++++++++++++++++-------------- 1 file changed, 53 insertions(+), 24 deletions(-) diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 5c087898..e97f4023 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -242,6 +242,7 @@ def anal( trio.run(main) +# TODO, move to `.tsp._annotate` async def markup_gaps( fqme: str, timeframe: float, @@ -288,18 +289,38 @@ async def markup_gaps( ) # XXX: probably a gap in the (newly sorted or de-duplicated) # dt-df, so we might need to re-index first.. + dt: pl.Series = row['dt'] + dt_prev: pl.Series = row['dt_prev'] if prev_r.is_empty(): - await tractor.pause() + + # XXX, filter out any special ignore cases, + # - UNIX-epoch stamped datums + # - first row + if ( + dt_prev.dt.epoch()[0] == 0 + or + dt.dt.epoch()[0] == 0 + ): + log.warning('Skipping row with UNIX epoch timestamp ??') + continue + + if wdts[0]['index'][0] == iend: # first row + log.warning('Skipping first-row (has no previous obvi) !!') + continue + + # XXX, if the previous-row by shm-index is missing, + # meaning there is a missing sample (set), get the prior + # row by df index and attempt to use it? + i_wdts: pl.DataFrame = wdts.with_row_index(name='i') + i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0] + prev_row_by_i = wdts[i_row] + prev_r: pl.DataFrame = prev_row_by_i + + # debug any missing pre-row + if tractor._state.is_debug_mode(): + await tractor.pause() istart: int = prev_r['index'][0] - # dt_start_t: float = dt_prev.timestamp() - - # start_t: float = prev_r['time'] - # assert ( - # dt_start_t - # == - # start_t - # ) # TODO: implement px-col width measure # and ensure at least as many px-cols @@ -358,6 +379,7 @@ def ldshm( fqme: str, write_parquet: bool = True, reload_parquet_to_shm: bool = True, + pdb: bool = False, # --pdb passed? ) -> None: ''' @@ -377,7 +399,7 @@ def ldshm( open_piker_runtime( 'polars_boi', enable_modules=['piker.data._sharedmem'], - debug_mode=True, + debug_mode=pdb, ), open_storage_client() as ( mod, @@ -397,17 +419,19 @@ def ldshm( times: np.ndarray = shm.array['time'] d1: float = float(times[-1] - times[-2]) - d2: float = float(times[-2] - times[-3]) - med: float = np.median(np.diff(times)) - if ( - d1 < 1. - and d2 < 1. - and med < 1. - ): - raise ValueError( - f'Something is wrong with time period for {shm}:\n{times}' - ) - + d2: float = 0 + # XXX, take a median sample rate if sufficient data + if times.size > 2: + d2: float = float(times[-2] - times[-3]) + med: float = np.median(np.diff(times)) + if ( + d1 < 1. + and d2 < 1. + and med < 1. + ): + raise ValueError( + f'Something is wrong with time period for {shm}:\n{times}' + ) period_s: float = float(max(d1, d2, med)) null_segs: tuple = tsp.get_null_segs( @@ -417,7 +441,9 @@ def ldshm( # TODO: call null-seg fixer somehow? if null_segs: - await tractor.pause() + + if tractor._state.is_debug_mode(): + await tractor.pause() # async with ( # trio.open_nursery() as tn, # mod.open_history_client( @@ -498,8 +524,11 @@ def ldshm( if ( not venue_gaps.is_empty() or ( - period_s < 60 - and not step_gaps.is_empty() + not step_gaps.is_empty() + # XXX, i presume i put this bc i was guarding + # for ib venue gaps? + # and + # period_s < 60 ) ): # write repaired ts to parquet-file? 
-- 2.34.1 From cd6bc105de892165d0efd57065c53ff2289fffd9 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 21 Jan 2026 22:31:30 -0500 Subject: [PATCH 11/12] Enable tracing back insert backfills Namely insertion writes which over-fill the shm buffer past the latest tsdb sample via `.tsp._history.shm_push_in_between()`. Deats, - check earliest `to_push` timestamp and enter pause point if it's earlier then the tsdb's `backfill_until_dt` stamp. - requires actually passing the `backfill_until_dt: datetime` thru, * `get_null_segs()` * `maybe_fill_null_segments()` * `shm_push_in_between()` (obvi XD) --- piker/tsp/_history.py | 58 +++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py index b6b15e72..361b0e23 100644 --- a/piker/tsp/_history.py +++ b/piker/tsp/_history.py @@ -75,7 +75,6 @@ from piker.brokers._util import ( ) from piker.storage import TimeseriesNotFound from ._anal import ( - dedupe, get_null_segs, iter_null_segs, @@ -120,15 +119,16 @@ _rt_buffer_start = int((_days_worth - 1) * _secs_in_day) def diff_history( array: np.ndarray, - append_until_dt: datetime | None = None, - prepend_until_dt: datetime | None = None, + append_until_dt: datetime|None = None, + prepend_until_dt: datetime|None = None, ) -> np.ndarray: # no diffing with tsdb dt index possible.. if ( prepend_until_dt is None - and append_until_dt is None + and + append_until_dt is None ): return array @@ -140,15 +140,26 @@ def diff_history( return array[times >= prepend_until_dt.timestamp()] -# TODO: can't we just make this a sync func now? async def shm_push_in_between( shm: ShmArray, to_push: np.ndarray, prepend_index: int, + backfill_until_dt: datetime, update_start_on_prepend: bool = False, ) -> int: + + # XXX, try to catch bad inserts by peeking at the first/last + # times and ensure we don't violate order. + f_times: np.ndarray = to_push['time'] + f_start: float = f_times[0] + f_start_dt = from_timestamp(f_start) + if ( + f_start_dt < backfill_until_dt + ): + await tractor.pause() + # XXX: extremely important, there can be no checkpoints # in the body of this func to avoid entering new ``frames`` # values while we're pipelining the current ones to @@ -181,6 +192,7 @@ async def maybe_fill_null_segments( get_hist: Callable, sampler_stream: tractor.MsgStream, mkt: MktPair, + backfill_until_dt: datetime, task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, @@ -191,7 +203,11 @@ async def maybe_fill_null_segments( frame: Frame = shm.array - null_segs: tuple | None = get_null_segs( + # TODO, put in parent task/daemon root! + import greenback + await greenback.ensure_portal() + + null_segs: tuple|None = get_null_segs( frame, period=timeframe, ) @@ -237,6 +253,7 @@ async def maybe_fill_null_segments( shm, to_push, prepend_index=absi_end, + backfill_until_dt=backfill_until_dt, update_start_on_prepend=False, ) # TODO: UI side needs IPC event to update.. 
@@ -352,15 +369,12 @@ async def start_backfill(
     mkt: MktPair,
     shm: ShmArray,
     timeframe: float,
-    backfill_from_shm_index: int,
     backfill_from_dt: datetime,
 
-
     sampler_stream: tractor.MsgStream,
 
-    backfill_until_dt: datetime | None = None,
-    storage: StorageClient | None = None,
-
+    backfill_until_dt: datetime|None = None,
+    storage: StorageClient|None = None,
     write_tsdb: bool = True,
 
     task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
@@ -495,7 +509,14 @@ async def start_backfill(
 
         assert time[-1] == next_end_dt.timestamp()
 
-        expected_dur: Interval = last_start_dt - next_start_dt
+        expected_dur: Interval = (
+            last_start_dt.subtract(
+                seconds=timeframe
+                # ^XXX, always "up to" the bar *before*
+            )
+            -
+            next_start_dt
+        )
 
         # frame's worth of sample-period-steps, in seconds
         frame_size_s: float = len(array) * timeframe
@@ -556,6 +577,7 @@ async def start_backfill(
                 shm,
                 to_push,
                 prepend_index=next_prepend_index,
+                backfill_until_dt=backfill_until_dt,
                 update_start_on_prepend=update_start_on_prepend,
             )
             await sampler_stream.send({
@@ -585,6 +607,7 @@ async def start_backfill(
                 shm,
                 to_push,
                 prepend_index=next_prepend_index,
+                backfill_until_dt=backfill_until_dt,
                 update_start_on_prepend=update_start_on_prepend,
             )
             await sampler_stream.send({
@@ -899,7 +922,7 @@ async def load_tsdb_hist(
         DateTime,
     ]
     try:
-        tsdb_entry: tuple|None = await storage.load(
+        tsdb_entry: tuple|None = await storage.load(
             fqme,
             timeframe=timeframe,
         )
@@ -1056,7 +1079,7 @@ async def tsdb_backfill(
         trio.open_nursery() as tn,
     ):
 
-        bf_done = await tn.start(
+        bf_done: trio.Event = await tn.start(
             partial(
                 start_backfill,
                 get_hist=get_hist,
@@ -1076,8 +1099,10 @@ async def tsdb_backfill(
                 write_tsdb=True,
             )
         )
-        nulls_detected: trio.Event | None = None
+        nulls_detected: trio.Event|None = None
+
         if last_tsdb_dt is not None:
+
             # calc the index from which the tsdb data should be
             # prepended, presuming there is a gap between the
             # latest frame (loaded/read above) and the latest
@@ -1148,7 +1173,7 @@ async def tsdb_backfill(
             # TODO: ideally these can never exist!
             # -[ ] somehow it seems sometimes we're writing zero-ed
             #    segments to tsdbs during teardown?
-            # -[ ] can we ensure that the backcfiller tasks do this
+            # -[ ] can we ensure that the backfiller tasks do this
             #    work PREVENTAVELY instead?
             # -[ ] fill in non-zero epoch time values ALWAYS!
             # await maybe_fill_null_segments(
@@ -1160,6 +1185,7 @@ async def tsdb_backfill(
                 get_hist=get_hist,
                 sampler_stream=sampler_stream,
                 mkt=mkt,
+                backfill_until_dt=last_tsdb_dt,
             ))
 
         # 2nd nursery END
-- 
2.34.1


From ad299789db9ef964246a3f76510fd1dc87d2ed15 Mon Sep 17 00:00:00 2001
From: goodboy
Date: Wed, 21 Jan 2026 23:52:12 -0500
Subject: [PATCH 12/12] Mv `markup_gaps()` to new `.tsp._annotate` mod

---
 piker/storage/cli.py   | 141 +---------------------------------
 piker/tsp/__init__.py  |   3 +
 piker/tsp/_annotate.py | 166 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 170 insertions(+), 140 deletions(-)
 create mode 100644 piker/tsp/_annotate.py

diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index e97f4023..90d5baed 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -19,16 +19,10 @@ Storage middle-ware CLIs.
 
 """
 from __future__ import annotations
-# from datetime import datetime
-# from contextlib import (
-#     AsyncExitStack,
-# )
 from pathlib import Path
-from math import copysign
 import time
 from types import ModuleType
 from typing import (
-    Any,
     TYPE_CHECKING,
 )
 
@@ -47,7 +41,6 @@ from piker.data import (
     ShmArray,
 )
 from piker import tsp
-from piker.data._formatters import BGM
 from . import log
 from . import (
     __tsdbs__,
@@ -242,138 +235,6 @@ def anal(
     trio.run(main)
 
 
-# TODO, move to `.tsp._annotate`
-async def markup_gaps(
-    fqme: str,
-    timeframe: float,
-    actl: AnnotCtl,
-    wdts: pl.DataFrame,
-    gaps: pl.DataFrame,
-
-) -> dict[int, dict]:
-    '''
-    Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
-    with rectangles.
-
-    '''
-    aids: dict[int] = {}
-    for i in range(gaps.height):
-
-        row: pl.DataFrame = gaps[i]
-
-        # the gap's RIGHT-most bar's OPEN value
-        # at that time (sample) step.
-        iend: int = row['index'][0]
-        # dt: datetime = row['dt'][0]
-        # dt_prev: datetime = row['dt_prev'][0]
-        # dt_end_t: float = dt.timestamp()
-
-
-        # TODO: can we eventually remove this
-        # once we figure out why the epoch cols
-        # don't match?
-        # TODO: FIX HOW/WHY these aren't matching
-        # and are instead off by 4hours (EST
-        # vs. UTC?!?!)
-        # end_t: float = row['time']
-        # assert (
-        #     dt.timestamp()
-        #     ==
-        #     end_t
-        # )
-
-        # the gap's LEFT-most bar's CLOSE value
-        # at that time (sample) step.
-        prev_r: pl.DataFrame = wdts.filter(
-            pl.col('index') == iend - 1
-        )
-        # XXX: probably a gap in the (newly sorted or de-duplicated)
-        # dt-df, so we might need to re-index first..
-        dt: pl.Series = row['dt']
-        dt_prev: pl.Series = row['dt_prev']
-        if prev_r.is_empty():
-
-            # XXX, filter out any special ignore cases,
-            # - UNIX-epoch stamped datums
-            # - first row
-            if (
-                dt_prev.dt.epoch()[0] == 0
-                or
-                dt.dt.epoch()[0] == 0
-            ):
-                log.warning('Skipping row with UNIX epoch timestamp ??')
-                continue
-
-            if wdts[0]['index'][0] == iend:  # first row
-                log.warning('Skipping first-row (has no previous obvi) !!')
-                continue
-
-            # XXX, if the previous-row by shm-index is missing,
-            # meaning there is a missing sample (set), get the prior
-            # row by df index and attempt to use it?
-            i_wdts: pl.DataFrame = wdts.with_row_index(name='i')
-            i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
-            prev_row_by_i = wdts[i_row]
-            prev_r: pl.DataFrame = prev_row_by_i
-
-            # debug any missing pre-row
-            if tractor._state.is_debug_mode():
-                await tractor.pause()
-
-        istart: int = prev_r['index'][0]
-
-        # TODO: implement px-col width measure
-        # and ensure at least as many px-cols
-        # shown per rect as configured by user.
-        # gap_w: float = abs((iend - istart))
-        # if gap_w < 6:
-        #     margin: float = 6
-        #     iend += margin
-        #     istart -= margin
-
-        rect_gap: float = BGM*3/8
-        opn: float = row['open'][0]
-        ro: tuple[float, float] = (
-            # dt_end_t,
-            iend + rect_gap + 1,
-            opn,
-        )
-        cls: float = prev_r['close'][0]
-        lc: tuple[float, float] = (
-            # dt_start_t,
-            istart - rect_gap,  # + 1 ,
-            cls,
-        )
-
-        color: str = 'dad_blue'
-        diff: float = cls - opn
-        sgn: float = copysign(1, diff)
-        color: str = {
-            -1: 'buy_green',
-            1: 'sell_red',
-        }[sgn]
-
-        rect_kwargs: dict[str, Any] = dict(
-            fqme=fqme,
-            timeframe=timeframe,
-            start_pos=lc,
-            end_pos=ro,
-            color=color,
-        )
-
-        aid: int = await actl.add_rect(**rect_kwargs)
-        assert aid
-        aids[aid] = rect_kwargs
-
-    # tell chart to redraw all its
-    # graphics view layers Bo
-    await actl.redraw(
-        fqme=fqme,
-        timeframe=timeframe,
-    )
-    return aids
-
-
 @store.command()
 def ldshm(
     fqme: str,
@@ -577,7 +438,7 @@ def ldshm(
                 do_markup_gaps: bool = True
                 if do_markup_gaps:
                     new_df: pl.DataFrame = tsp.np2pl(new)
-                    aids: dict = await markup_gaps(
+                    aids: dict = await tsp._annotate.markup_gaps(
                         fqme,
                         period_s,
                         actl,
diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py
index 81274ed8..baa28c82 100644
--- a/piker/tsp/__init__.py
+++ b/piker/tsp/__init__.py
@@ -47,3 +47,6 @@ from ._history import (
     iter_dfs_from_shms as iter_dfs_from_shms,
     manage_history as manage_history,
 )
+from ._annotate import (
+    markup_gaps as markup_gaps,
+)
diff --git a/piker/tsp/_annotate.py b/piker/tsp/_annotate.py
new file mode 100644
index 00000000..797c38cf
--- /dev/null
+++ b/piker/tsp/_annotate.py
@@ -0,0 +1,166 @@
+# piker: trading gear for hackers
+# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+Time-series (remote) annotation APIs.
+
+"""
+from __future__ import annotations
+from math import copysign
+from typing import (
+    Any,
+    TYPE_CHECKING,
+)
+
+import polars as pl
+import tractor
+
+from piker.data._formatters import BGM
+from piker.storage import log
+
+if TYPE_CHECKING:
+    from piker.ui._remote_ctl import AnnotCtl
+
+
+async def markup_gaps(
+    fqme: str,
+    timeframe: float,
+    actl: AnnotCtl,
+    wdts: pl.DataFrame,
+    gaps: pl.DataFrame,
+
+) -> dict[int, dict]:
+    '''
+    Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
+    with rectangles.
+
+    '''
+    aids: dict[int] = {}
+    for i in range(gaps.height):
+
+        row: pl.DataFrame = gaps[i]
+
+        # the gap's RIGHT-most bar's OPEN value
+        # at that time (sample) step.
+        iend: int = row['index'][0]
+        # dt: datetime = row['dt'][0]
+        # dt_prev: datetime = row['dt_prev'][0]
+        # dt_end_t: float = dt.timestamp()
+
+
+        # TODO: can we eventually remove this
+        # once we figure out why the epoch cols
+        # don't match?
+        # TODO: FIX HOW/WHY these aren't matching
+        # and are instead off by 4hours (EST
+        # vs. UTC?!?!)
+        # end_t: float = row['time']
+        # assert (
+        #     dt.timestamp()
+        #     ==
+        #     end_t
+        # )
+
+        # the gap's LEFT-most bar's CLOSE value
+        # at that time (sample) step.
+        prev_r: pl.DataFrame = wdts.filter(
+            pl.col('index') == iend - 1
+        )
+        # XXX: probably a gap in the (newly sorted or de-duplicated)
+        # dt-df, so we might need to re-index first..
+        dt: pl.Series = row['dt']
+        dt_prev: pl.Series = row['dt_prev']
+        if prev_r.is_empty():
+
+            # XXX, filter out any special ignore cases,
+            # - UNIX-epoch stamped datums
+            # - first row
+            if (
+                dt_prev.dt.epoch()[0] == 0
+                or
+                dt.dt.epoch()[0] == 0
+            ):
+                log.warning('Skipping row with UNIX epoch timestamp ??')
+                continue
+
+            if wdts[0]['index'][0] == iend:  # first row
+                log.warning('Skipping first-row (has no previous obvi) !!')
+                continue
+
+            # XXX, if the previous-row by shm-index is missing,
+            # meaning there is a missing sample (set), get the prior
+            # row by df index and attempt to use it?
+            i_wdts: pl.DataFrame = wdts.with_row_index(name='i')
+            i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
+            prev_row_by_i = wdts[i_row]
+            prev_r: pl.DataFrame = prev_row_by_i
+
+            # debug any missing pre-row
+            if tractor._state.is_debug_mode():
+                await tractor.pause()
+
+        istart: int = prev_r['index'][0]
+
+        # TODO: implement px-col width measure
+        # and ensure at least as many px-cols
+        # shown per rect as configured by user.
+        # gap_w: float = abs((iend - istart))
+        # if gap_w < 6:
+        #     margin: float = 6
+        #     iend += margin
+        #     istart -= margin
+
+        rect_gap: float = BGM*3/8
+        opn: float = row['open'][0]
+        ro: tuple[float, float] = (
+            # dt_end_t,
+            iend + rect_gap + 1,
+            opn,
+        )
+        cls: float = prev_r['close'][0]
+        lc: tuple[float, float] = (
+            # dt_start_t,
+            istart - rect_gap,  # + 1 ,
+            cls,
+        )
+
+        color: str = 'dad_blue'
+        diff: float = cls - opn
+        sgn: float = copysign(1, diff)
+        color: str = {
+            -1: 'buy_green',
+            1: 'sell_red',
+        }[sgn]
+
+        rect_kwargs: dict[str, Any] = dict(
+            fqme=fqme,
+            timeframe=timeframe,
+            start_pos=lc,
+            end_pos=ro,
+            color=color,
+        )
+
+        aid: int = await actl.add_rect(**rect_kwargs)
+        assert aid
+        aids[aid] = rect_kwargs
+
+    # tell chart to redraw all its
+    # graphics view layers Bo
+    await actl.redraw(
+        fqme=fqme,
+        timeframe=timeframe,
+    )
+    return aids
-- 
2.34.1
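Usage sketch following [PATCH 12/12]: with the `piker/tsp/__init__.py` re-export above, callers can reach the gap annotator through the `piker.tsp` namespace rather than the old `piker.storage.cli` local. The wrapper name below and the already-acquired `actl`/`wdts`/`gaps` inputs are illustrative assumptions only, not part of the series:

    import polars as pl
    from piker import tsp

    async def annotate_gaps(
        fqme: str,
        period_s: float,
        actl,                # an acquired `piker.ui._remote_ctl.AnnotCtl`
        wdts: pl.DataFrame,  # dt-fielded OHLC frame
        gaps: pl.DataFrame,  # rows marking detected time-gaps
    ) -> dict[int, dict]:
        # `markup_gaps()` is re-exported from `piker.tsp._annotate`
        # by the `__init__.py` hunk in the patch above; it draws one
        # rect per gap via the remote annot ctl and returns the
        # annotation-id -> rect-kwargs map.
        aids: dict[int, dict] = await tsp.markup_gaps(
            fqme,
            period_s,
            actl,
            wdts,
            gaps,
        )
        return aids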