diff --git a/piker/data/history.py b/piker/data/history.py index 2a4c7a81..65514a28 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -16,19 +16,26 @@ # . ''' -Historical data business logic for load, backfill and tsdb storage. +Historical TSP (time-series processing) lowlevel mgmt machinery and biz logic for, + +- hi-level biz logics using the `.storage` subpkg APIs for (I/O) + orchestration and mgmt of tsdb data sets. +- core data-provider history backfilling middleware (as task-funcs) via + (what will eventually be `datad`, but are rn is the) `.brokers` backend + APIs. +- various data set cleaning, repairing and issue-detection/analysis + routines to ensure consistent series whether in shm or when + stored offline (in a tsdb). ''' from __future__ import annotations -# from collections import ( -# Counter, -# ) from datetime import datetime from functools import partial -# import time +from pathlib import Path from types import ModuleType from typing import ( Callable, + Generator, TYPE_CHECKING, ) @@ -118,6 +125,7 @@ def diff_history( return array[times >= prepend_until_dt.timestamp()] +# TODO: can't we just make this a sync func now? async def shm_push_in_between( shm: ShmArray, to_push: np.ndarray, @@ -126,6 +134,10 @@ async def shm_push_in_between( update_start_on_prepend: bool = False, ) -> int: + # XXX: extremely important, there can be no checkpoints + # in the body of this func to avoid entering new ``frames`` + # values while we're pipelining the current ones to + # memory... shm.push( to_push, prepend=True, @@ -146,24 +158,6 @@ async def shm_push_in_between( else None ), ) - # XXX: extremely important, there can be no checkpoints - # in the block above to avoid entering new ``frames`` - # values while we're pipelining the current ones to - # memory... - array = shm.array - zeros = array[array['low'] == 0] - - # always backfill gaps with the earliest (price) datum's - # value to avoid the y-ranger including zeros and completely - # stretching the y-axis.. - if 0 < zeros.size: - zeros[[ - 'open', - 'high', - 'low', - 'close', - ]] = shm._array[zeros['index'][0] - 1]['close'] - # await tractor.pause() async def maybe_fill_null_segments( @@ -260,6 +254,20 @@ async def maybe_fill_null_segments( ): await tractor.pause() + array = shm.array + zeros = array[array['low'] == 0] + + # always backfill gaps with the earliest (price) datum's + # value to avoid the y-ranger including zeros and completely + # stretching the y-axis.. + if 0 < zeros.size: + zeros[[ + 'open', + 'high', + 'low', + 'close', + ]] = shm._array[zeros['index'][0] - 1]['close'] + # TODO: interatively step through any remaining # time-gaps/null-segments and spawn piecewise backfiller # tasks in a nursery? @@ -331,17 +339,6 @@ async def start_backfill( backfill_until_dt = backfill_from_dt.subtract(**period_duration) - # TODO: can we drop this? without conc i don't think this - # is necessary any more? - # configure async query throttling - # rate = config.get('rate', 1) - # XXX: legacy from ``trimeter`` code but unsupported now. - # erlangs = config.get('erlangs', 1) - # avoid duplicate history frames with a set of datetime frame - # starts and associated counts of how many duplicates we see - # per time stamp. - # starts: Counter[datetime] = Counter() - # STAGE NOTE: "backward history gap filling": # - we push to the shm buffer until we have history back # until the latest entry loaded from the tsdb's table B) @@ -1198,3 +1195,70 @@ async def manage_history( # and thus a small RPC-prot for remotely controllinlg # what data is loaded for viewing. await trio.sleep_forever() + + +def iter_dfs_from_shms( + fqme: str +) -> Generator[ + tuple[Path, ShmArray, pl.DataFrame], + None, + None, +]: + # shm buffer size table based on known sample rates + sizes: dict[str, int] = { + 'hist': _default_hist_size, + 'rt': _default_rt_size, + } + + # load all detected shm buffer files which have the + # passed FQME pattern in the file name. + shmfiles: list[Path] = [] + shmdir = Path('/dev/shm/') + + for shmfile in shmdir.glob(f'*{fqme}*'): + filename: str = shmfile.name + + # skip index files + if ( + '_first' in filename + or '_last' in filename + ): + continue + + assert shmfile.is_file() + log.debug(f'Found matching shm buffer file: {filename}') + shmfiles.append(shmfile) + + for shmfile in shmfiles: + + # lookup array buffer size based on file suffix + # being either .rt or .hist + key: str = shmfile.name.rsplit('.')[-1] + + # skip FSP buffers for now.. + if key not in sizes: + continue + + size: int = sizes[key] + + # attach to any shm buffer, load array into polars df, + # write to local parquet file. + shm, opened = maybe_open_shm_array( + key=shmfile.name, + size=size, + dtype=def_iohlcv_fields, + readonly=True, + ) + assert not opened + ohlcv = shm.array + + from ..data import tsp + df: pl.DataFrame = tsp.np2pl(ohlcv) + + yield ( + shmfile, + shm, + df, + ) + + diff --git a/piker/data/tsp.py b/piker/data/tsp.py index a33fb474..0776d3d8 100644 --- a/piker/data/tsp.py +++ b/piker/data/tsp.py @@ -322,10 +322,16 @@ def get_null_segs( # see `get_hist()` in backend, should ALWAYS be # able to handle a `start_dt=None`! # None, - absi_zeros[0] - 1, + max( + absi_zeros[0] - 1, + 0, + ), # NOTE: need the + 1 to guarantee we index "up to" # the next non-null row-datum. - absi_zeros[-1] + 1, + min( + absi_zeros[-1] + 1, + frame['index'][-1], + ), ]] else: # XXX EDGE CASE: only one null-datum found so @@ -484,6 +490,10 @@ def iter_null_segs( start_t: float = start_row['time'] start_dt: DateTime = from_timestamp(start_t) + if absi_start < 0: + import pdbp + pdbp.set_trace() + yield ( absi_start, absi_end, # abs indices fi_start, fi_end, # relative "frame" indices diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 3feb0512..28ff23ab 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -21,8 +21,6 @@ Storage middle-ware CLIs. from __future__ import annotations from pathlib import Path import time -from typing import Generator -# from typing import TYPE_CHECKING import polars as pl import numpy as np @@ -37,14 +35,11 @@ from piker.service import open_piker_runtime from piker.cli import cli from piker.config import get_conf_dir from piker.data import ( - maybe_open_shm_array, - def_iohlcv_fields, ShmArray, tsp, ) from piker.data.history import ( - _default_hist_size, - _default_rt_size, + iter_dfs_from_shms, ) from . import ( log, @@ -190,6 +185,13 @@ def anal( ) assert first_dt < last_dt + null_segs: tuple = tsp.get_null_segs( + frame=history, + period=period, + ) + if null_segs: + await tractor.pause() + shm_df: pl.DataFrame = await client.as_df( fqme, period, @@ -204,6 +206,7 @@ def anal( diff, ) = tsp.dedupe(shm_df) + if diff: await client.write_ohlcv( fqme, @@ -219,69 +222,6 @@ def anal( trio.run(main) -def iter_dfs_from_shms(fqme: str) -> Generator[ - tuple[Path, ShmArray, pl.DataFrame], - None, - None, -]: - # shm buffer size table based on known sample rates - sizes: dict[str, int] = { - 'hist': _default_hist_size, - 'rt': _default_rt_size, - } - - # load all detected shm buffer files which have the - # passed FQME pattern in the file name. - shmfiles: list[Path] = [] - shmdir = Path('/dev/shm/') - - for shmfile in shmdir.glob(f'*{fqme}*'): - filename: str = shmfile.name - - # skip index files - if ( - '_first' in filename - or '_last' in filename - ): - continue - - assert shmfile.is_file() - log.debug(f'Found matching shm buffer file: {filename}') - shmfiles.append(shmfile) - - for shmfile in shmfiles: - - # lookup array buffer size based on file suffix - # being either .rt or .hist - key: str = shmfile.name.rsplit('.')[-1] - - # skip FSP buffers for now.. - if key not in sizes: - continue - - size: int = sizes[key] - - # attach to any shm buffer, load array into polars df, - # write to local parquet file. - shm, opened = maybe_open_shm_array( - key=shmfile.name, - size=size, - dtype=def_iohlcv_fields, - readonly=True, - ) - assert not opened - ohlcv = shm.array - - from ..data import tsp - df: pl.DataFrame = tsp.np2pl(ohlcv) - - yield ( - shmfile, - shm, - df, - ) - - @store.command() def ldshm( fqme: str, @@ -307,8 +247,8 @@ def ldshm( # compute ohlc properties for naming times: np.ndarray = shm.array['time'] - secs: float = times[-1] - times[-2] - if secs < 1.: + period_s: float = times[-1] - times[-2] + if period_s < 1.: raise ValueError( f'Something is wrong with time period for {shm}:\n{times}' ) @@ -323,17 +263,22 @@ def ldshm( diff, ) = tsp.dedupe(shm_df) + null_segs: tuple = tsp.get_null_segs( + frame=shm.array, + period=period_s, + ) + # TODO: maybe only optionally enter this depending # on some CLI flags and/or gap detection? - if ( - not gaps.is_empty() - or secs > 2 - ): + if not gaps.is_empty(): + await tractor.pause() + + if null_segs: await tractor.pause() # write to parquet file? if write_parquet: - timeframe: str = f'{secs}s' + timeframe: str = f'{period_s}s' datadir: Path = get_conf_dir() / 'nativedb' if not datadir.is_dir():