From f25248c87188ac37673d748c52f27353c6473cdb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 8 Jun 2023 11:11:13 -0400 Subject: [PATCH] Add `.data._timeseries` utility mod Org all the new (time) gap detection routines here and also move in the `slice_from_time()` epoch -> index converter routine from `._pathops` B) --- piker/data/_pathops.py | 157 +------------------ piker/data/_timeseries.py | 309 ++++++++++++++++++++++++++++++++++++++ piker/storage/nativedb.py | 104 ------------- piker/ui/_dataviz.py | 2 +- piker/ui/view_mode.py | 2 +- 5 files changed, 312 insertions(+), 262 deletions(-) create mode 100644 piker/data/_timeseries.py diff --git a/piker/data/_pathops.py b/piker/data/_pathops.py index 48a11f40..a17f289a 100644 --- a/piker/data/_pathops.py +++ b/piker/data/_pathops.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -289,158 +289,3 @@ def ohlc_flatten( num=len(flat), ) return x, flat - - -def slice_from_time( - arr: np.ndarray, - start_t: float, - stop_t: float, - step: float, # sampler period step-diff - -) -> slice: - ''' - Calculate array indices mapped from a time range and return them in - a slice. - - Given an input array with an epoch `'time'` series entry, calculate - the indices which span the time range and return in a slice. Presume - each `'time'` step increment is uniform and when the time stamp - series contains gaps (the uniform presumption is untrue) use - ``np.searchsorted()`` binary search to look up the appropriate - index. - - ''' - profiler = Profiler( - msg='slice_from_time()', - disabled=not pg_profile_enabled(), - ms_threshold=ms_slower_then, - ) - - times = arr['time'] - t_first = floor(times[0]) - t_last = ceil(times[-1]) - - # the greatest index we can return which slices to the - # end of the input array. - read_i_max = arr.shape[0] - - # compute (presumed) uniform-time-step index offsets - i_start_t = floor(start_t) - read_i_start = floor(((i_start_t - t_first) // step)) - 1 - - i_stop_t = ceil(stop_t) - - # XXX: edge case -> always set stop index to last in array whenever - # the input stop time is detected to be greater then the equiv time - # stamp at that last entry. - if i_stop_t >= t_last: - read_i_stop = read_i_max - else: - read_i_stop = ceil((i_stop_t - t_first) // step) + 1 - - # always clip outputs to array support - # for read start: - # - never allow a start < the 0 index - # - never allow an end index > the read array len - read_i_start = min( - max(0, read_i_start), - read_i_max - 1, - ) - read_i_stop = max( - 0, - min(read_i_stop, read_i_max), - ) - - # check for larger-then-latest calculated index for given start - # time, in which case we do a binary search for the correct index. - # NOTE: this is usually the result of a time series with time gaps - # where it is expected that each index step maps to a uniform step - # in the time stamp series. - t_iv_start = times[read_i_start] - if ( - t_iv_start > i_start_t - ): - # do a binary search for the best index mapping to ``start_t`` - # given we measured an overshoot using the uniform-time-step - # calculation from above. - - # TODO: once we start caching these per source-array, - # we can just overwrite ``read_i_start`` directly. 
-        new_read_i_start = np.searchsorted(
-            times,
-            i_start_t,
-            side='left',
-        )
-
-        # TODO: minimize binary search work as much as possible:
-        # - cache these remap values which compensate for gaps in the
-        #   uniform time step basis where we calc a later start
-        #   index for the given input ``start_t``.
-        # - can we shorten the input search sequence by heuristic?
-        #   up_to_arith_start = index[:read_i_start]
-
-        if (
-            new_read_i_start <= read_i_start
-        ):
-            # t_diff = t_iv_start - start_t
-            # print(
-            #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
-            #     f'start_t:{start_t} -> 0index start_t:{t_iv_start}\n'
-            #     f'diff: {t_diff}\n'
-            #     f'REMAPPED START i: {read_i_start} -> {new_read_i_start}\n'
-            # )
-            read_i_start = new_read_i_start
-
-    t_iv_stop = times[read_i_stop - 1]
-    if (
-        t_iv_stop > i_stop_t
-    ):
-        # t_diff = stop_t - t_iv_stop
-        # print(
-        #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
-        #     f'calced iv stop:{t_iv_stop} -> stop_t:{stop_t}\n'
-        #     f'diff: {t_diff}\n'
-        #     # f'SHOULD REMAP STOP: {read_i_start} -> {new_read_i_start}\n'
-        # )
-        new_read_i_stop = np.searchsorted(
-            times[read_i_start:],
-            # times,
-            i_stop_t,
-            side='right',
-        )
-
-        if (
-            new_read_i_stop <= read_i_stop
-        ):
-            read_i_stop = read_i_start + new_read_i_stop + 1
-
-    # sanity checks for range size
-    # samples = (i_stop_t - i_start_t) // step
-    # index_diff = read_i_stop - read_i_start + 1
-    # if index_diff > (samples + 3):
-    #     breakpoint()
-
-    # read-relative indexes: gives a slice where `shm.array[read_slc]`
-    # will be the data spanning the input time range `start_t` ->
-    # `stop_t`
-    read_slc = slice(
-        int(read_i_start),
-        int(read_i_stop),
-    )
-
-    profiler(
-        'slicing complete'
-        # f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
-        # f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
-    )
-
-    # NOTE: if caller needs absolute buffer indices they can
-    # slice the buffer abs index like so:
-    # index = arr['index']
-    # abs_indx = index[read_slc]
-    # abs_slc = slice(
-    #     int(abs_indx[0]),
-    #     int(abs_indx[-1]),
-    # )
-
-    return read_slc
diff --git a/piker/data/_timeseries.py b/piker/data/_timeseries.py
new file mode 100644
index 00000000..81d380c7
--- /dev/null
+++ b/piker/data/_timeseries.py
@@ -0,0 +1,309 @@
+# piker: trading gear for hackers
+# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+'''
+Financial time series processing utilities usually
+pertaining to OHLCV style sampled data.
+
+Routines are generally implemented in either ``numpy`` or ``polars`` B)
+
+'''
+from __future__ import annotations
+from typing import Literal
+from math import (
+    ceil,
+    floor,
+)
+
+import numpy as np
+import polars as pl
+
+from ._sharedmem import ShmArray
+from .._profile import (
+    Profiler,
+    pg_profile_enabled,
+    ms_slower_then,
+)
+
+
+def slice_from_time(
+    arr: np.ndarray,
+    start_t: float,
+    stop_t: float,
+    step: float,  # sampler period step-diff
+
+) -> slice:
+    '''
+    Calculate array indices mapped from a time range and return them in
+    a slice.
+
+    Given an input array with an epoch `'time'` series entry, calculate
+    the indices which span the time range and return them in a slice.
+    Presume each `'time'` step increment is uniform; when the time
+    stamp series contains gaps (and thus the uniform presumption is
+    untrue) use ``np.searchsorted()`` binary search to look up the
+    appropriate index.
+
+    '''
+    profiler = Profiler(
+        msg='slice_from_time()',
+        disabled=not pg_profile_enabled(),
+        ms_threshold=ms_slower_then,
+    )
+
+    times = arr['time']
+    t_first = floor(times[0])
+    t_last = ceil(times[-1])
+
+    # the greatest index we can return which slices to the
+    # end of the input array.
+    read_i_max = arr.shape[0]
+
+    # compute (presumed) uniform-time-step index offsets
+    i_start_t = floor(start_t)
+    read_i_start = floor(((i_start_t - t_first) // step)) - 1
+
+    i_stop_t = ceil(stop_t)
+
+    # XXX: edge case -> always set stop index to last in array whenever
+    # the input stop time is detected to be greater than the equiv time
+    # stamp at that last entry.
+    if i_stop_t >= t_last:
+        read_i_stop = read_i_max
+    else:
+        read_i_stop = ceil((i_stop_t - t_first) // step) + 1
+
+    # always clip outputs to array support
+    # for read start:
+    # - never allow a start < the 0 index
+    # - never allow an end index > the read array len
+    read_i_start = min(
+        max(0, read_i_start),
+        read_i_max - 1,
+    )
+    read_i_stop = max(
+        0,
+        min(read_i_stop, read_i_max),
+    )
+
+    # check for a larger-than-latest calculated index for the given
+    # start time, in which case we do a binary search for the correct
+    # index.
+    # NOTE: this is usually the result of a time series with time gaps
+    # where it is expected that each index step maps to a uniform step
+    # in the time stamp series.
+    t_iv_start = times[read_i_start]
+    if (
+        t_iv_start > i_start_t
+    ):
+        # do a binary search for the best index mapping to ``start_t``
+        # given we measured an overshoot using the uniform-time-step
+        # calculation from above.
+
+        # TODO: once we start caching these per source-array,
+        # we can just overwrite ``read_i_start`` directly.
+        new_read_i_start = np.searchsorted(
+            times,
+            i_start_t,
+            side='left',
+        )
+
+        # TODO: minimize binary search work as much as possible:
+        # - cache these remap values which compensate for gaps in the
+        #   uniform time step basis where we calc a later start
+        #   index for the given input ``start_t``.
+        # - can we shorten the input search sequence by heuristic?
+        #   up_to_arith_start = index[:read_i_start]
+
+        if (
+            new_read_i_start <= read_i_start
+        ):
+            # t_diff = t_iv_start - start_t
+            # print(
+            #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
+            #     f'start_t:{start_t} -> 0index start_t:{t_iv_start}\n'
+            #     f'diff: {t_diff}\n'
+            #     f'REMAPPED START i: {read_i_start} -> {new_read_i_start}\n'
+            # )
+            read_i_start = new_read_i_start
+
+    t_iv_stop = times[read_i_stop - 1]
+    if (
+        t_iv_stop > i_stop_t
+    ):
+        # t_diff = stop_t - t_iv_stop
+        # print(
+        #     f"WE'RE CUTTING OUT TIME - STEP:{step}\n"
+        #     f'calced iv stop:{t_iv_stop} -> stop_t:{stop_t}\n'
+        #     f'diff: {t_diff}\n'
+        #     # f'SHOULD REMAP STOP: {read_i_start} -> {new_read_i_start}\n'
+        # )
+        new_read_i_stop = np.searchsorted(
+            times[read_i_start:],
+            # times,
+            i_stop_t,
+            side='right',
+        )
+
+        if (
+            new_read_i_stop <= read_i_stop
+        ):
+            read_i_stop = read_i_start + new_read_i_stop + 1
+
+    # sanity checks for range size
+    # samples = (i_stop_t - i_start_t) // step
+    # index_diff = read_i_stop - read_i_start + 1
+    # if index_diff > (samples + 3):
+    #     breakpoint()
+
+    # read-relative indexes: gives a slice where `shm.array[read_slc]`
+    # will be the data spanning the input time range `start_t` ->
+    # `stop_t`
+    read_slc = slice(
+        int(read_i_start),
+        int(read_i_stop),
+    )
+
+    profiler(
+        'slicing complete'
+        # f'{start_t} -> {abs_slc.start} | {read_slc.start}\n'
+        # f'{stop_t} -> {abs_slc.stop} | {read_slc.stop}\n'
+    )
+
+    # NOTE: if caller needs absolute buffer indices they can
+    # slice the buffer abs index like so:
+    # index = arr['index']
+    # abs_indx = index[read_slc]
+    # abs_slc = slice(
+    #     int(abs_indx[0]),
+    #     int(abs_indx[-1]),
+    # )
+
+    return read_slc
+
+
+def detect_null_time_gap(
+    shm: ShmArray,
+) -> tuple[int, float, float, int] | None:
+    '''
+    Detect any zero-epoch stamped (null) rows and, if present,
+    return the (2-row padded) absolute index and epoch-time bounds
+    around the null segment: ``(istart, t_start, t_end, iend)``.
+
+    '''
+    # predicate selecting all zero-epoch stamped rows
+    zero_pred: np.ndarray = shm.array['time'] == 0
+    zero_t: np.ndarray = shm.array[zero_pred]
+    if zero_t.size:
+        istart, iend = zero_t['index'][[0, -1]]
+        # read the epoch stamps 2 rows out from the segment edges
+        start, end = shm._array['time'][
+            [istart - 2, iend + 2]
+        ]
+        return istart - 2, start, end, iend + 2
+
+    return None
+
+
+# duration (diff) units supported as ``polars.Expr.dt`` methods
+t_unit = Literal[
+    'days',
+    'hours',
+    'minutes',
+    'seconds',
+    'milliseconds',
+    'microseconds',
+    'nanoseconds',
+]
+
+
+def with_dts(
+    df: pl.DataFrame,
+    time_col: str = 'time',
+) -> pl.DataFrame:
+    '''
+    Insert datetime (cast) columns into a (presumably) OHLC sampled
+    time series with an epoch-time column keyed by ``time_col``.
+
+    '''
+    return df.with_columns([
+        pl.col(time_col).shift(1).suffix('_prev'),
+        pl.col(time_col).diff().alias('s_diff'),
+        pl.from_epoch(pl.col(time_col)).alias('dt'),
+    ]).with_columns([
+        pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'),
+        pl.col('dt').diff().alias('dt_diff'),
+    ])  # .with_columns(
+    #     pl.col('dt').diff().dt.days().alias('days_dt_diff'),
+    # )
+
+
+def detect_time_gaps(
+    df: pl.DataFrame,
+
+    time_col: str = 'time',
+    # epoch sampling step diff
+    expect_period: float = 60,
+
+    # datetime diff unit and gap value
+    # crypto mkts
+    # gap_dt_unit: t_unit = 'minutes',
+    # gap_thresh: int = 1,
+
+    # legacy stock mkts
+    gap_dt_unit: t_unit = 'days',
+    gap_thresh: int = 2,
+
+) -> pl.DataFrame:
+    '''
+    Filter to OHLC datums which contain sample step gaps.
+
+    E.g. legacy markets which have venue close gaps and/or
+    actual missing data segments.
+
+    '''
+    dt_gap_col: str = f'{gap_dt_unit}_diff'
+    return with_dts(
+        df
+    ).filter(
+        pl.col('s_diff').abs() > expect_period
+    ).with_columns(
+        getattr(
+            pl.col('dt_diff').dt,
+            gap_dt_unit,  # NOTE: must be a valid ``Expr.dt`` method name
+        )().alias(dt_gap_col)
+    ).filter(
+        pl.col(dt_gap_col).abs() > gap_thresh
+    )
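+
+
+# NOTE: a minimal usage sketch for `detect_time_gaps()` above; the
+# parquet path, symbol and param picks are illustrative only:
+#
+# df: pl.DataFrame = pl.read_parquet('btcusdt.ohlcv.parquet')
+# gaps: pl.DataFrame = detect_time_gaps(
+#     df,
+#     expect_period=60,  # 1m sampled bars
+#     gap_dt_unit='days',  # legacy venue closure gaps
+#     gap_thresh=2,
+# )
+# if not gaps.is_empty():
+#     print(gaps.select(['dt_prev', 'dt', 'days_diff']))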
+
+
+def detect_price_gaps(
+    df: pl.DataFrame,
+    gt_multiplier: float = 2.,
+    price_fields: list[str] = ['high', 'low'],
+
+) -> pl.DataFrame:
+    '''
+    Detect gaps in clearing price over an OHLC series.
+
+    Two types of gaps generally exist: up gaps and down gaps.
+
+    - UP gap: when any next sample's lo price is strictly greater
+      than the current sample's hi price.
+
+    - DOWN gap: when any next sample's hi price is strictly
+      less than the current sample's lo price.
+
+    '''
+    # return df.filter(
+    #     pl.col('high') - ) > expect_period,
+    # ).select([
+    #     pl.dt.datetime(pl.col(time_col).shift(1)).suffix('_previous'),
+    #     pl.all(),
+    # ]).select([
+    #     pl.all(),
+    #     (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'),
+    # ])
+    ...
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index 4a6ecf0e..e96856d1 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -54,9 +54,6 @@ from contextlib import asynccontextmanager as acm
 from datetime import datetime
 from pathlib import Path
 import time
-from typing import (
-    Literal,
-)
 
 # from bidict import bidict
 # import tractor
@@ -374,104 +371,3 @@ async def get_client(
     client = NativeStorageClient(datadir)
     client.index_files()
     yield client
-
-
-def with_dts(
-    df: pl.DataFrame,
-    time_col: str = 'time',
-) -> pl.DataFrame:
-    '''
-    Insert datetime (casted) columns to a (presumably) OHLC sampled
-    time series with an epoch-time column keyed by ``time_col``.
-
-    '''
-    return df.with_columns([
-        pl.col(time_col).shift(1).suffix('_prev'),
-        pl.col(time_col).diff().alias('s_diff'),
-        pl.from_epoch(pl.col(time_col)).alias('dt'),
-    ]).with_columns([
-        pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'),
-        pl.col('dt').diff().alias('dt_diff'),
-    ]) #.with_columns(
-    #     pl.col('dt').diff().dt.days().alias('days_dt_diff'),
-    # )
-
-
-t_unit: Literal[
-    'days',
-    'hours',
-    'minutes',
-    'seconds',
-    'miliseconds',
-    'microseconds',
-    'nanoseconds',
-]
-
-
-def detect_time_gaps(
-    df: pl.DataFrame,
-
-    time_col: str = 'time',
-    # epoch sampling step diff
-    expect_period: float = 60,
-
-    # datetime diff unit and gap value
-    # crypto mkts
-    # gap_dt_unit: t_unit = 'minutes',
-    # gap_thresh: int = 1,
-
-    # legacy stock mkts
-    gap_dt_unit: t_unit = 'days',
-    gap_thresh: int = 2,
-
-) -> pl.DataFrame:
-    '''
-    Filter to OHLC datums which contain sample step gaps.
-
-    For eg. legacy markets which have venue close gaps and/or
-    actual missing data segments.
-
-    '''
-    dt_gap_col: str = f'{gap_dt_unit}_diff'
-    return with_dts(
-        df
-    ).filter(
-        pl.col('s_diff').abs() > expect_period
-    ).with_columns(
-        getattr(
-            pl.col('dt_diff').dt,
-            gap_dt_unit, # NOTE: must be valid ``Expr.dt.``
-        )().alias(dt_gap_col)
-    ).filter(
-        pl.col(dt_gap_col).abs() > gap_thresh
-    )
-
-
-def detect_price_gaps(
-    df: pl.DataFrame,
-    gt_multiplier: float = 2.,
-    price_fields: list[str] = ['high', 'low'],
-
-) -> pl.DataFrame:
-    '''
-    Detect gaps in clearing price over an OHLC series.
-
-    2 types of gaps generally exist; up gaps and down gaps:
-
-    - UP gap: when any next sample's lo price is strictly greater
-      then the current sample's hi price.
- - - DOWN gap: when any next sample's hi price is strictly - less then the current samples lo price. - - ''' - # return df.filter( - # pl.col('high') - ) > expect_period, - # ).select([ - # pl.dt.datetime(pl.col(time_col).shift(1)).suffix('_previous'), - # pl.all(), - # ]).select([ - # pl.all(), - # (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'), - # ]) - ... diff --git a/piker/ui/_dataviz.py b/piker/ui/_dataviz.py index 57a477d1..242386fa 100644 --- a/piker/ui/_dataviz.py +++ b/piker/ui/_dataviz.py @@ -49,7 +49,7 @@ from ..data._formatters import ( OHLCBarsAsCurveFmtr, # OHLC converted to line StepCurveFmtr, # "step" curve (like for vlm) ) -from ..data._pathops import ( +from ..data._timeseries import ( slice_from_time, ) from ._ohlc import ( diff --git a/piker/ui/view_mode.py b/piker/ui/view_mode.py index ecb62557..78e58f7a 100644 --- a/piker/ui/view_mode.py +++ b/piker/ui/view_mode.py @@ -30,7 +30,7 @@ import pendulum import pyqtgraph as pg from ..data.types import Struct -from ..data._pathops import slice_from_time +from ..data._timeseries import slice_from_time from ..log import get_logger from .._profile import Profiler