diff --git a/piker/data/__init__.py b/piker/data/__init__.py
index 25c2912a..207eeaa1 100644
--- a/piker/data/__init__.py
+++ b/piker/data/__init__.py
@@ -56,6 +56,7 @@ __all__: list[str] = [
     'ShmArray',
     'iterticks',
     'maybe_open_shm_array',
+    'match_from_pairs',
     'attach_shm_array',
     'open_shm_array',
     'get_shm_token',
diff --git a/piker/data/history.py b/piker/data/history.py
index 0598e08c..048769fa 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -406,7 +406,7 @@ async def start_backfill(
     # TODO: ideally these never exist but somehow it seems
     # sometimes we're writing zero-ed segments on certain
     # (teardown) cases?
-    from ._timeseries import detect_null_time_gap
+    from .tsp import detect_null_time_gap
 
     gap_indices: tuple | None = detect_null_time_gap(shm)
     while gap_indices:
diff --git a/piker/data/_timeseries.py b/piker/data/tsp.py
similarity index 87%
rename from piker/data/_timeseries.py
rename to piker/data/tsp.py
index 6da534b4..3f293d83 100644
--- a/piker/data/_timeseries.py
+++ b/piker/data/tsp.py
@@ -23,11 +23,12 @@ Routines are generally implemented in either ``numpy`` or
 
 '''
 from __future__ import annotations
-from typing import Literal
+from functools import partial
 from math import (
     ceil,
     floor,
 )
+from typing import Literal
 
 import numpy as np
 import polars as pl
@@ -38,6 +39,18 @@ from ..toolz.profile import (
     pg_profile_enabled,
     ms_slower_then,
 )
+from ..log import (
+    get_logger,
+    get_console_log,
+)
+# for "time series processing"
+subsys: str = 'piker.tsp'
+
+log = get_logger(subsys)
+get_console_log = partial(
+    get_console_log,
+    name=subsys,
+)
 
 
 def slice_from_time(
@@ -349,3 +362,49 @@ def detect_price_gaps(
     # (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'),
     # ])
     ...
+
+
+def dedupe(src_df: pl.DataFrame) -> tuple[
+    pl.DataFrame,  # with dts
+    pl.DataFrame,  # gaps
+    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
+    bool,
+]:
+    '''
+    Check for time series gaps and if found
+    de-duplicate any datetime entries, check for
+    a frame height diff and return the newly
+    dt-deduplicated frame.
+
+    '''
+    df: pl.DataFrame = with_dts(src_df)
+    gaps: pl.DataFrame = detect_time_gaps(df)
+    if not gaps.is_empty():
+
+        # remove duplicated datetime samples/sections
+        deduped: pl.DataFrame = dedup_dt(df)
+        deduped_gaps = detect_time_gaps(deduped)
+
+        log.warning(
+            f'Gaps found:\n{gaps}\n'
+            f'deduped Gaps found:\n{deduped_gaps}'
+        )
+        # TODO: rewrite this in polars and/or convert to
+        # ndarray to detect and remove?
+        # null_gaps = detect_null_time_gap()
+
+    diff: int = (
+        df.height
+        -
+        deduped.height
+    )
+    was_deduped: bool = False
+    if diff:
+        was_deduped: bool = True
+
+    return (
+        df,
+        gaps,
+        deduped,
+        was_deduped,
+    )
diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index c9f5bb74..34d46046 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -40,6 +40,7 @@ from piker.data import (
     maybe_open_shm_array,
     def_iohlcv_fields,
     ShmArray,
+    tsp,
 )
 from piker.data.history import (
     _default_hist_size,
@@ -136,53 +137,6 @@ def delete(
     trio.run(main, symbols)
 
 
-def dedupe(src_df: pl.DataFrame) -> tuple[
-    pl.DataFrame,  # with dts
-    pl.DataFrame,  # gaps
-    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
-    bool,
-]:
-    '''
-    Check for time series gaps and if found
-    de-duplicate any datetime entries, check for
-    a frame height diff and return the newly
-    dt-deduplicated frame.
-
-    '''
-    from piker.data import _timeseries as tsp
-    df: pl.DataFrame = tsp.with_dts(src_df)
-    gaps: pl.DataFrame = tsp.detect_time_gaps(df)
-    if not gaps.is_empty():
-
-        # remove duplicated datetime samples/sections
-        deduped: pl.DataFrame = tsp.dedup_dt(df)
-        deduped_gaps = tsp.detect_time_gaps(deduped)
-
-        log.warning(
-            f'Gaps found:\n{gaps}\n'
-            f'deduped Gaps found:\n{deduped_gaps}'
-        )
-        # TODO: rewrite this in polars and/or convert to
-        # ndarray to detect and remove?
-        # null_gaps = tsp.detect_null_time_gap()
-
-    diff: int = (
-        df.height
-        -
-        deduped.height
-    )
-    was_deduped: bool = False
-    if diff:
-        was_deduped: bool = True
-
-    return (
-        df,
-        gaps,
-        deduped,
-        was_deduped,
-    )
-
-
 @store.command()
 def anal(
     fqme: str,
@@ -236,7 +190,7 @@ def anal(
             gaps,
             deduped,
             shortened,
-        ) = dedupe(shm_df)
+        ) = tsp.dedupe(shm_df)
 
         if shortened:
             await client.write_ohlcv(
@@ -371,7 +325,7 @@ def ldshm(
             gaps,
             deduped,
             was_dded,
-        ) = dedupe(shm_df)
+        ) = tsp.dedupe(shm_df)
 
         # TODO: maybe only optionally enter this depending
         # on some CLI flags and/or gap detection?
diff --git a/piker/ui/_dataviz.py b/piker/ui/_dataviz.py
index c011bff0..d5a7195e 100644
--- a/piker/ui/_dataviz.py
+++ b/piker/ui/_dataviz.py
@@ -49,7 +49,7 @@ from ..data._formatters import (
     OHLCBarsAsCurveFmtr,  # OHLC converted to line
     StepCurveFmtr,  # "step" curve (like for vlm)
 )
-from ..data._timeseries import (
+from ..data.tsp import (
     slice_from_time,
 )
 from ._ohlc import (
diff --git a/piker/ui/view_mode.py b/piker/ui/view_mode.py
index a9a093d0..31a06645 100644
--- a/piker/ui/view_mode.py
+++ b/piker/ui/view_mode.py
@@ -31,7 +31,7 @@ import pendulum
 import pyqtgraph as pg
 
 from piker.types import Struct
-from ..data._timeseries import slice_from_time
+from ..data.tsp import slice_from_time
 from ..log import get_logger
 from ..toolz import Profiler
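Usage sketch (not part of the diff above): with this change `dedupe()` is imported from the relocated `piker.data.tsp` module rather than living as a private helper in `piker/storage/cli.py`, and callers unpack the same 4-tuple shown in the updated `anal`/`ldshm` hunks. The sample frame below and its epoch-seconds `time` column are illustrative assumptions; real frames are built from shm OHLCV buffers by the storage CLI.

    import polars as pl
    from piker.data import tsp

    # hypothetical stand-in for a frame loaded from a shm OHLCV
    # buffer; the repeated `time` sample is meant to exercise the
    # dedup path. Note that `dedupe()` as diffed only binds
    # `deduped` when `detect_time_gaps()` reports something, so a
    # frame with no detectable gaps would error out here.
    ohlcv = pl.DataFrame({
        'time':  [1_700_000_000, 1_700_000_060, 1_700_000_060, 1_700_000_120],
        'close': [100.0, 101.0, 101.0, 102.0],
    })

    (
        df,           # input frame with datetime columns added
        gaps,         # detected time gaps (possibly empty)
        deduped,      # frame with duplicated dt samples removed
        was_deduped,  # True iff the frame height shrank
    ) = tsp.dedupe(ohlcv)

    if was_deduped:
        print(f'dropped {df.height - deduped.height} duplicated rows')

The call pattern mirrors the `tsp.dedupe(shm_df)` call sites in `piker/storage/cli.py`; only the input data here is made up.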