Create `piker.tsp` "time series processing" subpkg

Move `.data.history` -> `.tsp.__init__.py` for now as main pkg-mod
and `.data.tsp` -> `.tsp._anal` (for analysis).

Obviously follow-up commits will change the surrounding codebase
(imports) to match..
distribute_dis
Tyler Goodlet 2023-12-18 11:48:33 -05:00
parent d5d68f75ea
commit 4568c55f17
2 changed files with 125 additions and 55 deletions
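
For orientation, a minimal before/after sketch of what the move means for
downstream imports (module paths per the commit message above; the specific
names imported are illustrative, not an exhaustive API listing):

    # before: TSP helpers lived under the data pkg
    #   from piker.data.tsp import dedupe
    #   from piker.data.history import start_backfill
    #
    # after: a dedicated subpkg re-exports the analysis API
    #   from piker.tsp import dedupe          # via tsp/__init__.py
    #   from piker.tsp._anal import np2pl     # analysis submod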

piker/data/history.py → piker/tsp/__init__.py

@@ -32,6 +32,7 @@ from __future__ import annotations
 from datetime import datetime
 from functools import partial
 from pathlib import Path
+from pprint import pformat
 from types import ModuleType
 from typing import (
     Callable,
@@ -53,25 +54,64 @@ import polars as pl
 from ..accounting import (
     MktPair,
 )
-from ._util import (
+from ..data._util import (
     log,
 )
-from ._sharedmem import (
+from ..data._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
 )
-from ._source import def_iohlcv_fields
-from ._sampling import (
+from ..data._source import def_iohlcv_fields
+from ..data._sampling import (
     open_sample_stream,
 )
-from .tsp import (
-    dedupe,
+from ._anal import (
     get_null_segs,
     iter_null_segs,
-    sort_diff,
     Frame,
-    # Seq,
+    Seq,
+
+    # codec-ish
+    np2pl,
+    pl2np,
+
+    # `numpy` only
+    slice_from_time,
+
+    # `polars` specific
+    dedupe,
+    with_dts,
+    detect_time_gaps,
+    sort_diff,
+
+    # TODO:
+    detect_price_gaps
 )
+
+__all__: list[str] = [
+    'dedupe',
+    'get_null_segs',
+    'iter_null_segs',
+    'sort_diff',
+    'slice_from_time',
+    'Frame',
+    'Seq',
+    'np2pl',
+    'pl2np',
+    'slice_from_time',
+    'with_dts',
+    'detect_time_gaps',
+    'sort_diff',
+
+    # TODO:
+    'detect_price_gaps'
+]
+
+# TODO: break up all this shite into submods!
 from ..brokers._util import (
     DataUnavailable,
 )
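
The explicit `__all__` pins the subpkg's public star-import surface to the
re-exported names; a tiny standalone illustration of the gating pattern (toy
module, not piker code):

    # mod.py
    def helper() -> None:
        '''Internal: left out of `__all__`, so star-imports skip it.'''

    def dedupe(seq: list) -> list:
        '''Order-preserving duplicate drop.'''
        return list(dict.fromkeys(seq))

    __all__ = ['dedupe']  # `from mod import *` only binds `dedupe`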
@@ -252,21 +292,54 @@ async def maybe_fill_null_segments(
         and
         len(null_segs[-1])
     ):
-        await tractor.pause()
-        array = shm.array
-        zeros = array[array['low'] == 0]
+        (
+            iabs_slices,
+            iabs_zero_rows,
+            zero_t,
+        ) = null_segs
+        log.warning(
+            f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
+            f'{pformat(iabs_slices)}'
+        )

-        # always backfill gaps with the earliest (price) datum's
+        # TODO: always backfill gaps with the earliest (price) datum's
         # value to avoid the y-ranger including zeros and completely
         # stretching the y-axis..
-        if 0 < zeros.size:
-            zeros[[
-                'open',
-                'high',
-                'low',
-                'close',
-            ]] = shm._array[zeros['index'][0] - 1]['close']
+        # array: np.ndarray = shm.array
+        # zeros = array[array['low'] == 0]
+        ohlc_fields: list[str] = [
+            'open',
+            'high',
+            'low',
+            'close',
+        ]
+
+        for istart, istop in iabs_slices:
+
+            # get view into buffer for null-segment
+            gap: np.ndarray = shm._array[istart:istop]
+
+            # copy the oldest OHLC samples forward
+            gap[ohlc_fields] = shm._array[istart]['close']
+
+            start_t: float = shm._array[istart]['time']
+            t_diff: float = (istop - istart)*timeframe
+            gap['time'] = np.arange(
+                start=start_t,
+                stop=start_t + t_diff,
+                step=timeframe,
+            )
+
+            await sampler_stream.send({
+                'broadcast_all': {
+                    # XXX NOTE XXX: see the
+                    # `.ui._display.increment_history_view()` if block
+                    # that looks for this info to FORCE a hard viz
+                    # redraw!
+                    'backfilling': (mkt.fqme, timeframe),
+                },
+            })

     # TODO: iteratively step through any remaining
     # time-gaps/null-segments and spawn piecewise backfiller
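
The new loop flat-fills each detected null segment from its last valid sample
and rebuilds the segment's time axis from the sample period. A runnable
standalone sketch of that technique on a toy structured array (field layout
mirrors the OHLC schema; the data and segment bounds are made up, with
`istart` sitting on the last valid row just before the zeros, matching how
the segment slices are anchored at `absi_zeros[0] - 1`):

    import numpy as np

    timeframe: float = 60.0  # sample period in seconds

    dtype = [
        ('time', 'f8'), ('open', 'f8'), ('high', 'f8'),
        ('low', 'f8'), ('close', 'f8'),
    ]
    buf = np.array(
        [(0.,   10.0, 11.0,  9.0, 10.5),
         (60.,  10.5, 12.0, 10.0, 11.0),
         (0.,    0.0,  0.0,  0.0,  0.0),   # null row
         (0.,    0.0,  0.0,  0.0,  0.0),   # null row
         (240., 11.0, 13.0, 11.0, 12.0)],
        dtype=dtype,
    )

    istart, istop = 1, 4
    gap = buf[istart:istop]  # a view: writes land in `buf`

    # copy the oldest sample's close over all price fields
    for field in ('open', 'high', 'low', 'close'):
        gap[field] = buf[istart]['close']

    # regenerate timestamps stepping by the sample period
    start_t: float = buf[istart]['time']
    gap['time'] = np.arange(
        start=start_t,
        stop=start_t + (istop - istart) * timeframe,
        step=timeframe,
    )

Writing through the slice view is what lets the fill land directly in the
shared-memory buffer in the real loop.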
@@ -277,10 +350,7 @@ async def maybe_fill_null_segments(
     # parallel possible no matter the backend?
     # -[ ] fill algo: do queries in alternating "latest, then
     #    earliest, then latest.. etc?"
-    # if (
-    #     next_end_dt not in frame[
-    # ):
-    #     pass
+    # await tractor.pause()


 async def start_backfill(
@@ -1252,8 +1322,8 @@ def iter_dfs_from_shms(
         assert not opened
         ohlcv = shm.array

-        from ..data import tsp
-        df: pl.DataFrame = tsp.np2pl(ohlcv)
+        from ._anal import np2pl
+        df: pl.DataFrame = np2pl(ohlcv)

         yield (
             shmfile,
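
`np2pl()` bridges the shm's numpy structured array into polars for the
frame-level ops; its body isn't shown in this diff, but a plausible minimal
equivalent looks like the following (an assumption, not the actual helper):

    import numpy as np
    import polars as pl

    def np2pl(arr: np.ndarray) -> pl.DataFrame:
        # polars can't ingest numpy structured arrays directly,
        # so map each record field to its own column
        return pl.DataFrame({
            name: arr[name] for name in arr.dtype.names
        })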

piker/data/tsp.py → piker/tsp/_anal.py

@@ -319,9 +319,8 @@ def get_null_segs(
     if num_gaps < 1:
         if absi_zeros.size > 1:
             absi_zsegs = [[
-                # see `get_hist()` in backend, should ALWAYS be
-                # able to handle a `start_dt=None`!
-                # None,
+                # TODO: maybe mk these max()/min() limits func
+                # consts instead of called more than once?
                 max(
                     absi_zeros[0] - 1,
                     0,
@@ -359,7 +358,10 @@ def get_null_segs(
         # corresponding to the first zero-segment's row, we add it
         # manually here.
         absi_zsegs.append([
-            absi_zeros[0] - 1,
+            max(
+                absi_zeros[0] - 1,
+                0,
+            ),
             None,
         ])
@@ -400,14 +402,18 @@ def get_null_segs(
     else:
         if 0 < num_gaps < 2:
-            absi_zsegs[-1][1] = absi_zeros[-1] + 1
+            absi_zsegs[-1][1] = min(
+                absi_zeros[-1] + 1,
+                frame['index'][-1],
+            )

     iabs_first: int = frame['index'][0]
     for start, end in absi_zsegs:

         ts_start: float = times[start - iabs_first]
         ts_end: float = times[end - iabs_first]
         if (
-            ts_start == 0
+            (ts_start == 0 and not start == 0)
             or
             ts_end == 0
         ):
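
Both bound fixes in this file clamp segment endpoints into the frame's valid
absolute-index range: a start never drops below 0 and an end never runs past
the last row. The same logic in isolation (names mirror the patch, values are
made up):

    # toy absolute indices of null rows at the very start of a frame
    absi_zeros = [0, 1, 2]
    last_index = 5000  # stand-in for frame['index'][-1]

    # start: one row before the first zero, clamped at 0
    seg_start = max(absi_zeros[0] - 1, 0)          # -> 0, not -1

    # end: one row past the last zero, clamped at the frame tail
    seg_end = min(absi_zeros[-1] + 1, last_index)  # -> 3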
@@ -451,11 +457,13 @@ def iter_null_segs(
     ],
     None,
 ]:
-    if null_segs is None:
-        null_segs: tuple = get_null_segs(
+    if not (
+        null_segs := get_null_segs(
             frame,
             period=timeframe,
         )
+    ):
+        return

     absi_pairs_zsegs: list[list[float, float]]
     izeros: Seq
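
The rewritten guard binds and tests the scan result in one walrus expression,
so a clean frame (falsy scan result) now short-circuits: a bare `return`
inside a generator simply ends iteration. The same shape in a self-contained
toy (illustrative only, not piker's types):

    from typing import Iterator

    def find_zero_runs(data: list[int]) -> list[tuple[int, int]] | None:
        '''Stand-in for `get_null_segs()`: falsy when the data is clean.'''
        runs = [(i, i + 1) for i, x in enumerate(data) if x == 0]
        return runs or None

    def iter_runs(data: list[int]) -> Iterator[tuple[int, int]]:
        # bind-and-test in one step (PEP 572); bail out early
        # when the scan comes back falsy
        if not (runs := find_zero_runs(data)):
            return
        yield from runs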
@@ -502,6 +510,7 @@ def iter_null_segs(
         )


+# TODO: move to ._pl_anal
 def with_dts(
     df: pl.DataFrame,
     time_col: str = 'time',
@@ -525,19 +534,6 @@ def with_dts(
     # )


-def dedup_dt(
-    df: pl.DataFrame,
-) -> pl.DataFrame:
-    '''
-    Drop duplicate date-time rows (normally from an OHLC frame).
-
-    '''
-    return df.unique(
-        subset=['dt'],
-        maintain_order=True,
-    )
-
-
 t_unit: Literal = Literal[
     'days',
     'hours',
@@ -651,7 +647,11 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
     )

     # remove duplicated datetime samples/sections
-    deduped: pl.DataFrame = dedup_dt(df)
+    deduped: pl.DataFrame = df.unique(
+        subset=['dt'],
+        maintain_order=True,
+    )
+
     deduped_gaps = detect_time_gaps(deduped)

     diff: int = (
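
With the one-off `dedup_dt()` wrapper removed, the dedup step is a direct
`pl.DataFrame.unique()` call; a runnable demo of exactly that invocation on
toy data:

    import polars as pl

    df = pl.DataFrame({
        'dt': [
            '2023-12-18T09:00',
            '2023-12-18T09:00',  # duplicate timestamp row
            '2023-12-18T09:01',
        ],
        'close': [10.5, 10.5, 11.0],
    })

    # drop rows sharing a `dt`, preserving the frame's row order
    deduped: pl.DataFrame = df.unique(
        subset=['dt'],
        maintain_order=True,
    )
    assert deduped.height == 2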