Move `iter_dfs_from_shms` into `.data.history`

Thinking about just moving all of that module (after a content breakup)
to a new `piker.tsp` subpkg which will mostly depend on the `.data` and
`.storage` sub-pkgs; the idea is to move the biz logic for tsdb IO/mgmt
and its orchestration with real-time (shm) buffers and the graphics
layer into a common spot, both for manual analysis/research work and
for better separation of low-level data structure primitives from their
higher-level usage.
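
As a comment-only sketch, the layering this points at might look like
the following; only the `.data` and `.storage` sub-pkgs exist today,
`piker.tsp` is the planned (speculative) new home:

    # speculative target layout (not yet real):
    #
    # piker/
    #   data/      # low-level primitives: shm buffers, OHLCV dtypes
    #   storage/   # tsdb backend clients and offline (parquet) IO
    #   tsp/       # time-series processing: backfill task-funcs,
    #              # gap repair, tsdb <-> shm/graphics orchestration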

Also add a better `data.history` mod docstring in prep for this move
and clean out a bunch of legacy commented cruft from the `trimeter`
and `marketstore` days.

TO CHERRY #486 (if we can)
distribute_dis
Tyler Goodlet 2023-12-15 15:53:02 -05:00
parent 3639f360c3
commit 8989c73a93
3 changed files with 131 additions and 112 deletions


@@ -16,19 +16,26 @@
 # <https://www.gnu.org/licenses/>.
 '''
-Historical data business logic for load, backfill and tsdb storage.
+Historical TSP (time-series processing) low-level mgmt machinery and biz logic for,
+- hi-level biz logic using the `.storage` subpkg APIs for (I/O)
+  orchestration and mgmt of tsdb data sets.
+- core data-provider history backfilling middleware (as task-funcs) via
+  (what will eventually be `datad`, but rn are the) `.brokers` backend
+  APIs.
+- various data set cleaning, repairing and issue-detection/analysis
+  routines to ensure consistent series whether in shm or when
+  stored offline (in a tsdb).
+
 '''
 from __future__ import annotations
-# from collections import (
-#     Counter,
-# )
 from datetime import datetime
 from functools import partial
-# import time
+from pathlib import Path
 from types import ModuleType
 from typing import (
     Callable,
+    Generator,
     TYPE_CHECKING,
 )
@@ -118,6 +125,7 @@ def diff_history(
     return array[times >= prepend_until_dt.timestamp()]
 
 
+# TODO: can't we just make this a sync func now?
 async def shm_push_in_between(
     shm: ShmArray,
     to_push: np.ndarray,
@@ -126,6 +134,10 @@ async def shm_push_in_between(
     update_start_on_prepend: bool = False,
 
 ) -> int:
+    # XXX: extremely important, there can be no checkpoints
+    # in the body of this func to avoid entering new ``frames``
+    # values while we're pipelining the current ones to
+    # memory...
     shm.push(
         to_push,
         prepend=True,
@@ -146,24 +158,6 @@ async def shm_push_in_between(
             else None
         ),
     )
-    # XXX: extremely important, there can be no checkpoints
-    # in the block above to avoid entering new ``frames``
-    # values while we're pipelining the current ones to
-    # memory...
-    array = shm.array
-    zeros = array[array['low'] == 0]
-
-    # always backfill gaps with the earliest (price) datum's
-    # value to avoid the y-ranger including zeros and completely
-    # stretching the y-axis..
-    if 0 < zeros.size:
-        zeros[[
-            'open',
-            'high',
-            'low',
-            'close',
-        ]] = shm._array[zeros['index'][0] - 1]['close']
-
-    # await tractor.pause()
 
 
 async def maybe_fill_null_segments(
@@ -260,6 +254,20 @@ async def maybe_fill_null_segments(
     ):
         await tractor.pause()
 
+    array = shm.array
+    zeros = array[array['low'] == 0]
+
+    # always backfill gaps with the earliest (price) datum's
+    # value to avoid the y-ranger including zeros and completely
+    # stretching the y-axis..
+    if 0 < zeros.size:
+        zeros[[
+            'open',
+            'high',
+            'low',
+            'close',
+        ]] = shm._array[zeros['index'][0] - 1]['close']
+
     # TODO: iteratively step through any remaining
     # time-gaps/null-segments and spawn piecewise backfiller
     # tasks in a nursery?
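
The flat-fill above can be exercised standalone; here is a minimal
numpy sketch with made-up values (the inline dtype only mirrors the
iOHLCV field layout, it is not piker's actual `def_iohlcv_fields`):

    import numpy as np

    dtype = np.dtype([
        ('index', 'i8'), ('time', 'f8'),
        ('open', 'f8'), ('high', 'f8'), ('low', 'f8'), ('close', 'f8'),
    ])
    arr = np.array([
        (0, 0.,   10.0, 11.0,  9.0, 10.5),
        (1, 60.,   0.0,  0.0,  0.0,  0.0),   # a zeroed "null" gap row
        (2, 120., 10.6, 10.8, 10.4, 10.7),
    ], dtype=dtype)

    mask = arr['low'] == 0
    if mask.any():
        # flat-fill all price fields from the datum just before the
        # first gap row, mirroring the `zeros[...] = ...['close']`
        # assignment in the diff above.
        prev_close: float = arr[np.flatnonzero(mask)[0] - 1]['close']
        for fld in ('open', 'high', 'low', 'close'):
            arr[fld][mask] = prev_close

    assert arr[1]['close'] == 10.5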
@@ -331,17 +339,6 @@ async def start_backfill(
         backfill_until_dt = backfill_from_dt.subtract(**period_duration)
 
-        # TODO: can we drop this? without conc i don't think this
-        # is necessary any more?
-        # configure async query throttling
-        # rate = config.get('rate', 1)
-        # XXX: legacy from ``trimeter`` code but unsupported now.
-        # erlangs = config.get('erlangs', 1)
-
-        # avoid duplicate history frames with a set of datetime frame
-        # starts and associated counts of how many duplicates we see
-        # per time stamp.
-        # starts: Counter[datetime] = Counter()
-
         # STAGE NOTE: "backward history gap filling":
         # - we push to the shm buffer until we have history back
         #   until the latest entry loaded from the tsdb's table B)
@@ -1198,3 +1195,70 @@ async def manage_history(
         # and thus a small RPC-prot for remotely controlling
         # what data is loaded for viewing.
         await trio.sleep_forever()
+
+
+def iter_dfs_from_shms(
+    fqme: str
+) -> Generator[
+    tuple[Path, ShmArray, pl.DataFrame],
+    None,
+    None,
+]:
+    # shm buffer size table based on known sample rates
+    sizes: dict[str, int] = {
+        'hist': _default_hist_size,
+        'rt': _default_rt_size,
+    }
+
+    # load all detected shm buffer files which have the
+    # passed FQME pattern in the file name.
+    shmfiles: list[Path] = []
+    shmdir = Path('/dev/shm/')
+
+    for shmfile in shmdir.glob(f'*{fqme}*'):
+        filename: str = shmfile.name
+
+        # skip index files
+        if (
+            '_first' in filename
+            or '_last' in filename
+        ):
+            continue
+
+        assert shmfile.is_file()
+        log.debug(f'Found matching shm buffer file: {filename}')
+        shmfiles.append(shmfile)
+
+    for shmfile in shmfiles:
+
+        # lookup array buffer size based on file suffix
+        # being either .rt or .hist
+        key: str = shmfile.name.rsplit('.')[-1]
+
+        # skip FSP buffers for now..
+        if key not in sizes:
+            continue
+
+        size: int = sizes[key]
+
+        # attach to any shm buffer, load array into polars df,
+        # write to local parquet file.
+        shm, opened = maybe_open_shm_array(
+            key=shmfile.name,
+            size=size,
+            dtype=def_iohlcv_fields,
+            readonly=True,
+        )
+        assert not opened
+        ohlcv = shm.array
+
+        from ..data import tsp
+        df: pl.DataFrame = tsp.np2pl(ohlcv)
+
+        yield (
+            shmfile,
+            shm,
+            df,
+        )
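
For reference, a short usage sketch of the relocated generator; the
import path matches the updated `piker.data.history` mod (per the cli
import change below), while the fqme value and the presence of live
matching shm buffer files under /dev/shm are assumptions:

    from piker.data.history import iter_dfs_from_shms

    # assumes a prior piker runtime already allocated matching
    # '*btcusdt*' buffers in /dev/shm (the fqme here is made up).
    for shmfile, shm, df in iter_dfs_from_shms('btcusdt.binance'):
        print(f'{shmfile.name}: {df.height} rows')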


@@ -322,10 +322,16 @@ def get_null_segs(
                 # see `get_hist()` in backend, should ALWAYS be
                 # able to handle a `start_dt=None`!
                 # None,
-                absi_zeros[0] - 1,
+                max(
+                    absi_zeros[0] - 1,
+                    0,
+                ),
 
                 # NOTE: need the + 1 to guarantee we index "up to"
                 # the next non-null row-datum.
-                absi_zeros[-1] + 1,
+                min(
+                    absi_zeros[-1] + 1,
+                    frame['index'][-1],
+                ),
             ]]
 
         else:
@@ -484,6 +490,10 @@ def iter_null_segs(
         start_t: float = start_row['time']
         start_dt: DateTime = from_timestamp(start_t)
 
+        if absi_start < 0:
+            import pdbp
+            pdbp.set_trace()
+
         yield (
             absi_start, absi_end,  # abs indices
             fi_start, fi_end,  # relative "frame" indices
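
A tiny self-contained sketch (made-up values) of why both new guards
matter: a null segment touching either frame edge would otherwise
yield a negative or past-the-end absolute index, which is exactly the
condition the new `pdbp` trap above checks for:

    import numpy as np

    frame_last_index: int = 42        # stand-in for frame['index'][-1]
    absi_zeros = np.array([0, 1, 2])  # null rows found at the frame start

    absi_start = max(absi_zeros[0] - 1, 0)                # clamp below
    absi_end = min(absi_zeros[-1] + 1, frame_last_index)  # clamp above
    assert (absi_start, absi_end) == (0, 3)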


@@ -21,8 +21,6 @@ Storage middle-ware CLIs.
 from __future__ import annotations
 from pathlib import Path
 import time
-from typing import Generator
-# from typing import TYPE_CHECKING
 
 import polars as pl
 import numpy as np
@@ -37,14 +35,11 @@ from piker.service import open_piker_runtime
 from piker.cli import cli
 from piker.config import get_conf_dir
 from piker.data import (
-    maybe_open_shm_array,
-    def_iohlcv_fields,
     ShmArray,
     tsp,
 )
 from piker.data.history import (
-    _default_hist_size,
-    _default_rt_size,
+    iter_dfs_from_shms,
 )
 from . import (
     log,
@@ -190,6 +185,13 @@ def anal(
         )
         assert first_dt < last_dt
 
+        null_segs: tuple = tsp.get_null_segs(
+            frame=history,
+            period=period,
+        )
+        if null_segs:
+            await tractor.pause()
+
         shm_df: pl.DataFrame = await client.as_df(
             fqme,
             period,
@@ -204,6 +206,7 @@
             diff,
         ) = tsp.dedupe(shm_df)
+
         if diff:
             await client.write_ohlcv(
                 fqme,
@@ -219,69 +222,6 @@ def anal(
     trio.run(main)
 
-def iter_dfs_from_shms(fqme: str) -> Generator[
-    tuple[Path, ShmArray, pl.DataFrame],
-    None,
-    None,
-]:
-    # shm buffer size table based on known sample rates
-    sizes: dict[str, int] = {
-        'hist': _default_hist_size,
-        'rt': _default_rt_size,
-    }
-
-    # load all detected shm buffer files which have the
-    # passed FQME pattern in the file name.
-    shmfiles: list[Path] = []
-    shmdir = Path('/dev/shm/')
-
-    for shmfile in shmdir.glob(f'*{fqme}*'):
-        filename: str = shmfile.name
-
-        # skip index files
-        if (
-            '_first' in filename
-            or '_last' in filename
-        ):
-            continue
-
-        assert shmfile.is_file()
-        log.debug(f'Found matching shm buffer file: {filename}')
-        shmfiles.append(shmfile)
-
-    for shmfile in shmfiles:
-
-        # lookup array buffer size based on file suffix
-        # being either .rt or .hist
-        key: str = shmfile.name.rsplit('.')[-1]
-
-        # skip FSP buffers for now..
-        if key not in sizes:
-            continue
-
-        size: int = sizes[key]
-
-        # attach to any shm buffer, load array into polars df,
-        # write to local parquet file.
-        shm, opened = maybe_open_shm_array(
-            key=shmfile.name,
-            size=size,
-            dtype=def_iohlcv_fields,
-            readonly=True,
-        )
-        assert not opened
-        ohlcv = shm.array
-
-        from ..data import tsp
-        df: pl.DataFrame = tsp.np2pl(ohlcv)
-
-        yield (
-            shmfile,
-            shm,
-            df,
-        )
-
-
 @store.command()
 def ldshm(
     fqme: str,
@@ -307,8 +247,8 @@ def ldshm(
             # compute ohlc properties for naming
             times: np.ndarray = shm.array['time']
-            secs: float = times[-1] - times[-2]
-            if secs < 1.:
+            period_s: float = times[-1] - times[-2]
+            if period_s < 1.:
                 raise ValueError(
                     f'Something is wrong with time period for {shm}:\n{times}'
                 )
@@ -323,17 +263,22 @@
                 diff,
             ) = tsp.dedupe(shm_df)
 
+            null_segs: tuple = tsp.get_null_segs(
+                frame=shm.array,
+                period=period_s,
+            )
+
             # TODO: maybe only optionally enter this depending
             # on some CLI flags and/or gap detection?
-            if (
-                not gaps.is_empty()
-                or secs > 2
-            ):
+            if not gaps.is_empty():
+                await tractor.pause()
+
+            if null_segs:
                 await tractor.pause()
 
             # write to parquet file?
             if write_parquet:
-                timeframe: str = f'{secs}s'
+                timeframe: str = f'{period_s}s'
                 datadir: Path = get_conf_dir() / 'nativedb'
                 if not datadir.is_dir():
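
Lastly, the `secs` -> `period_s` rename reads better since the value
is the bar period in seconds inferred from adjacent timestamps; a
minimal sketch with made-up values:

    import numpy as np

    times = np.array([1702650000., 1702650060., 1702650120.])  # 1m bars
    period_s: float = times[-1] - times[-2]
    assert period_s == 60.
    timeframe: str = f'{period_s}s'  # -> '60.0s', used for parquet naming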