Move `dedupe()` to `.data.tsp` (so it has pals)

Includes a rename of `.data._timeseries` -> `.data.tsp` for "time series
processing", making it a public sub-mod; it contains a highly useful set
of data-frame and `numpy.ndarray` ops routines used in various subsystems.

Branch: distribute_dis
Tyler Goodlet 2023-12-11 16:07:19 -05:00
parent 7311000846
commit b94582cb35
6 changed files with 67 additions and 53 deletions

View File

@@ -56,6 +56,7 @@ __all__: list[str] = [
'ShmArray',
'iterticks',
'maybe_open_shm_array',
'match_from_pairs',
'attach_shm_array',
'open_shm_array',
'get_shm_token',

View File

@@ -406,7 +406,7 @@ async def start_backfill(
# TODO: ideally these never exist but somehow it seems
# sometimes we're writing zero-ed segments on certain
# (teardown) cases?
from ._timeseries import detect_null_time_gap
from .tsp import detect_null_time_gap
gap_indices: tuple | None = detect_null_time_gap(shm)
while gap_indices:

View File

@ -23,11 +23,12 @@ Routines are generally implemented in either ``numpy`` or
'''
from __future__ import annotations
from typing import Literal
from functools import partial
from math import (
ceil,
floor,
)
from typing import Literal
import numpy as np
import polars as pl
@ -38,6 +39,18 @@ from ..toolz.profile import (
pg_profile_enabled,
ms_slower_then,
)
from ..log import (
get_logger,
get_console_log,
)
# for "time series processing"
subsys: str = 'piker.tsp'
log = get_logger(subsys)
get_console_log = partial(
get_console_log,
name=subsys,
)
def slice_from_time(
@ -349,3 +362,49 @@ def detect_price_gaps(
# (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'),
# ])
...
def dedupe(src_df: pl.DataFrame) -> tuple[
    pl.DataFrame,  # with dts
    pl.DataFrame,  # gaps
    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
    bool,  # whether any rows were actually dropped
]:
    '''
    Check for time series gaps and if found
    de-duplicate any datetime entries, check for
    a frame height diff and return the newly
    dt-deduplicated frame.

    Returns the dt-annotated input frame, the detected gaps frame,
    the deduplicated frame, and a flag indicating whether rows were
    removed.

    '''
    df: pl.DataFrame = with_dts(src_df)
    gaps: pl.DataFrame = detect_time_gaps(df)

    # default to the (dt-annotated) frame unchanged: without this,
    # `deduped` is unbound on the no-gaps path and the height-diff
    # below raises `UnboundLocalError`.
    deduped: pl.DataFrame = df

    if not gaps.is_empty():
        # remove duplicated datetime samples/sections
        deduped = dedup_dt(df)
        deduped_gaps = detect_time_gaps(deduped)
        log.warning(
            f'Gaps found:\n{gaps}\n'
            f'deduped Gaps found:\n{deduped_gaps}'
        )
        # TODO: rewrite this in polars and/or convert to
        # ndarray to detect and remove?
        # null_gaps = detect_null_time_gap()

    # any height shrinkage means duplicate rows were dropped
    diff: int = (
        df.height
        -
        deduped.height
    )
    was_deduped: bool = diff > 0

    return (
        df,
        gaps,
        deduped,
        was_deduped,
    )

View File

@ -40,6 +40,7 @@ from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.data.history import (
_default_hist_size,
@ -136,53 +137,6 @@ def delete(
trio.run(main, symbols)
def dedupe(src_df: pl.DataFrame) -> tuple[
    pl.DataFrame,  # with dts
    pl.DataFrame,  # gaps
    pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
    bool,  # whether any rows were actually dropped
]:
    '''
    Check for time series gaps and if found
    de-duplicate any datetime entries, check for
    a frame height diff and return the newly
    dt-deduplicated frame.

    Returns the dt-annotated input frame, the detected gaps frame,
    the deduplicated frame, and a flag indicating whether rows were
    removed.

    '''
    from piker.data import _timeseries as tsp
    df: pl.DataFrame = tsp.with_dts(src_df)
    gaps: pl.DataFrame = tsp.detect_time_gaps(df)

    # default to the (dt-annotated) frame unchanged: without this,
    # `deduped` is unbound on the no-gaps path and the height-diff
    # below raises `UnboundLocalError`.
    deduped: pl.DataFrame = df

    if not gaps.is_empty():
        # remove duplicated datetime samples/sections
        deduped = tsp.dedup_dt(df)
        deduped_gaps = tsp.detect_time_gaps(deduped)
        log.warning(
            f'Gaps found:\n{gaps}\n'
            f'deduped Gaps found:\n{deduped_gaps}'
        )
        # TODO: rewrite this in polars and/or convert to
        # ndarray to detect and remove?
        # null_gaps = tsp.detect_null_time_gap()

    # any height shrinkage means duplicate rows were dropped
    diff: int = (
        df.height
        -
        deduped.height
    )
    was_deduped: bool = diff > 0

    return (
        df,
        gaps,
        deduped,
        was_deduped,
    )
@store.command()
def anal(
fqme: str,
@ -236,7 +190,7 @@ def anal(
gaps,
deduped,
shortened,
) = dedupe(shm_df)
) = tsp.dedupe(shm_df)
if shortened:
await client.write_ohlcv(
@ -371,7 +325,7 @@ def ldshm(
gaps,
deduped,
was_dded,
) = dedupe(shm_df)
) = tsp.dedupe(shm_df)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?

View File

@ -49,7 +49,7 @@ from ..data._formatters import (
OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm)
)
from ..data._timeseries import (
from ..data.tsp import (
slice_from_time,
)
from ._ohlc import (

View File

@ -31,7 +31,7 @@ import pendulum
import pyqtgraph as pg
from piker.types import Struct
from ..data._timeseries import slice_from_time
from ..data.tsp import slice_from_time
from ..log import get_logger
from ..toolz import Profiler