Fix `.parquet` file-naming..

Apparently `.storage.nativedb.mk_ohlcv_shm_keyed_filepath()` was always
kinda broken whenever you passed a `period: float` with an actual
non-`int` value straight into the format string? Fixed it to strictly
cast to `int()` before str-ifying so that you don't get weird
`60.0s.parquet` suffixes in there..
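
For the record, the failure mode is just vanilla f-string formatting of
a `float`; a minimal standalone repro (the fqme value is made up for
illustration):

```python
# minimal repro of the file-naming bug: a float `period` leaks its
# decimal point into the parquet filename unless cast to `int` first.
period: float = 60.0
fqme: str = 'btcusdt.usdtm.perp.binance'  # hypothetical fqme

broken: str = f'{fqme}.ohlcv{period}s.parquet'
fixed: str = f'{fqme}.ohlcv{int(period)}s.parquet'

assert broken.endswith('60.0s.parquet')  # the "weird" name
assert fixed.endswith('60s.parquet')     # what we actually want
```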

Further, this rejigs the `store ldshm` gap correction-annotation loop to,
- use `StorageClient.write_ohlcv()` instead of hackily re-implementing
  it.. now that the problem from above is fixed!
- use a `needs_correction: bool` var to determine if gap markup and
  de-duplicated data should be pushed to the shm buffer (see the toy
  sketch after this list),
- go back to using `AnnotCtl.add_rect()` for all detected gaps such that
  they all persist (and thus are shown together) until the client
  disconnects.
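
As a toy illustration of the new gating var, standalone and not the
actual loop; the shape of the `gaps` frame here is an assumption:

```python
import polars as pl

# stand-ins for the real detection outputs: a frame of gap rows and
# an optional null-segments result.
gaps = pl.DataFrame({'index': [101, 202]})
null_segs = None

# same predicate as in the diff below: correct iff we detected any
# gaps and/or null segments.
needs_correction: bool = (
    not gaps.is_empty()
    or bool(null_segs)
)
assert needs_correction  # 2 gap rows -> annotate + re-push to shm
```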
branch: distribute_dis
Tyler Goodlet 2023-12-26 17:14:26 -05:00
parent 1d7e97a295
commit a86573b5a2
2 changed files with 151 additions and 110 deletions

View File

@@ -20,8 +20,12 @@ Storage middle-ware CLIs.
 """
 from __future__ import annotations
 # from datetime import datetime
+# from contextlib import (
+#     AsyncExitStack,
+# )
 from pathlib import Path
 import time
+from types import ModuleType

 import polars as pl
 import numpy as np
@@ -34,7 +38,6 @@ import typer

 from piker.service import open_piker_runtime
 from piker.cli import cli
-from piker.config import get_conf_dir
 from piker.data import (
     ShmArray,
 )
@@ -45,6 +48,7 @@ from . import (
 from . import (
     __tsdbs__,
     open_storage_client,
+    StorageClient,
 )
@@ -232,7 +236,8 @@ def anal(
 @store.command()
 def ldshm(
     fqme: str,
-    write_parquet: bool = False,
+    write_parquet: bool = True,
+    reload_parquet_to_shm: bool = True,

 ) -> None:
     '''
@@ -242,15 +247,32 @@ def ldshm(
     '''
     async def main():
+        from piker.ui._remote_ctl import (
+            open_annot_ctl,
+            AnnotCtl,
+        )
+        actl: AnnotCtl
+        mod: ModuleType
+        client: StorageClient
         async with (
             open_piker_runtime(
                 'polars_boi',
                 enable_modules=['piker.data._sharedmem'],
                 debug_mode=True,
             ),
+            open_storage_client() as (
+                mod,
+                client,
+            ),
+            open_annot_ctl() as actl,
         ):
-            df: pl.DataFrame | None = None
-            for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
+            shm_df: pl.DataFrame | None = None
+            for (
+                shmfile,
+                shm,
+                # parquet_path,
+                shm_df,
+            ) in tsp.iter_dfs_from_shms(fqme):

                 # compute ohlc properties for naming
                 times: np.ndarray = shm.array['time']
@@ -275,20 +297,14 @@ def ldshm(
                     period=period_s,
                 )

-                # TODO: maybe only optionally enter this depending
-                # on some CLI flags and/or gap detection?
-                if (
+                needs_correction: bool = (
                     not gaps.is_empty()
                     or null_segs
-                ):
-                    from piker.ui._remote_ctl import (
-                        open_annot_ctl,
-                        AnnotCtl,
-                    )
-                    annot_ctl: AnnotCtl
-                    async with open_annot_ctl() as annot_ctl:
+                )
+                # TODO: maybe only optionally enter this depending
+                # on some CLI flags and/or gap detection?
+                if needs_correction:
                     for i in range(gaps.height):
                         row: pl.DataFrame = gaps[i]

                         # TODO: can we eventually remove this
@@ -314,9 +330,8 @@ def ldshm(
                         # the gap's left-most bar's CLOSE value
                         # at that time (sample) step.
                         prev_r: pl.DataFrame = df.filter(
-                            pl.col('index') == gaps[0]['index'] - 1
+                            pl.col('index') == iend - 1
                         )
                         istart: int = prev_r['index'][0]
                         # dt_start_t: float = dt_prev.timestamp()
@@ -332,7 +347,6 @@ def ldshm(
                         # and ensure at least as many px-cols
                         # shown per rect as configured by user.
                         gap_w: float = abs((iend - istart))
-                        # await tractor.pause()
                         if gap_w < 6:
                             margin: float = 6
                             iend += margin
@@ -349,49 +363,71 @@ def ldshm(
                             prev_r['close'][0],
                         )

-                        aid: int = await annot_ctl.add_rect(
+                        # async with actl.open_rect(
+                        # ) as aid:
+                        aid: int = await actl.add_rect(
                             fqme=fqme,
                             timeframe=period_s,
                             start_pos=lc,
                             end_pos=ro,
                         )
                         assert aid
-                        await tractor.pause()

                     # write to parquet file?
-                    if write_parquet:
-                        timeframe: str = f'{period_s}s'
-
-                        datadir: Path = get_conf_dir() / 'nativedb'
-                        if not datadir.is_dir():
-                            datadir.mkdir()
-
-                        path: Path = datadir / f'{fqme}.{timeframe}.parquet'
+                    if (
+                        write_parquet
+                    ):
                         # write to fs
                         start = time.time()
-                        df.write_parquet(path)
-                        delay: float = round(
+                        path: Path = await client.write_ohlcv(
+                            fqme,
+                            ohlcv=deduped,
+                            timeframe=period_s,
+                        )
+                        write_delay: float = round(
                             time.time() - start,
                             ndigits=6,
                         )
-                        log.info(
-                            f'parquet write took {delay} secs\n'
-                            f'file path: {path}'
-                        )

                         # read back from fs
                         start = time.time()
                         read_df: pl.DataFrame = pl.read_parquet(path)
-                        delay: float = round(
+                        read_delay: float = round(
                             time.time() - start,
                             ndigits=6,
                         )
-                        print(
-                            f'parquet read took {delay} secs\n'
+                        log.info(
+                            f'parquet write took {write_delay} secs\n'
+                            f'file path: {path}'
+                            f'parquet read took {read_delay} secs\n'
                             f'polars df: {read_df}'
                         )

+                        if reload_parquet_to_shm:
+                            new = tsp.pl2np(
+                                deduped,
+                                dtype=shm.array.dtype,
+                            )
+                            # since normally readonly
+                            shm._array.setflags(
+                                write=int(1),
+                            )
+                            shm.push(
+                                new,
+                                prepend=True,
+                                start=new['index'][-1],
+                                update_first=False,  # don't update ._first
+                            )
+
+                    await tractor.pause()
+                    assert diff
+
+                else:
+                    # allow interaction even when no ts problems.
+                    await tractor.pause()
+                    assert not diff

             if df is None:
                 log.error(f'No matching shm buffers for {fqme} ?')
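
Worth noting on the `reload_parquet_to_shm` branch above: the shm array
is normally mapped read-only, hence the `setflags()` flip right before
`shm.push()`. The underlying `numpy` behavior in isolation (a minimal
sketch, not piker code):

```python
import numpy as np

a = np.arange(4)
a.setflags(write=False)  # emulate the normally read-only shm mapping

try:
    a[0] = 99
except ValueError:
    pass  # in-place writes are rejected while the write flag is unset

a.setflags(write=True)   # same flip the diff does via `shm._array`
a[0] = 99                # now the in-place overwrite succeeds
assert a[0] == 99
```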

View File

@@ -95,16 +95,19 @@ def detect_period(shm: ShmArray) -> float:

 def mk_ohlcv_shm_keyed_filepath(
     fqme: str,
-    period: float,  # ow known as the "timeframe"
+    period: float | int,  # ow known as the "timeframe"
     datadir: Path,

-) -> str:
+) -> Path:

     if period < 1.:
         raise ValueError('Sample period should be >= 1.!?')

-    period_s: str = f'{period}s'
-    path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet'
+    path: Path = (
+        datadir
+        /
+        f'{fqme}.ohlcv{int(period)}s.parquet'
+    )
     return path
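
With the `int()` cast in place the helper yields a clean period suffix
even for `float` input; roughly like this re-statement of the fixed
helper for illustration (the fqme and conf-dir values are made up):

```python
from pathlib import Path

def mk_ohlcv_shm_keyed_filepath(
    fqme: str,
    period: float | int,  # aka the "timeframe"
    datadir: Path,
) -> Path:
    if period < 1.:
        raise ValueError('Sample period should be >= 1.!?')
    # strict `int()` cast -> no more `60.0s.parquet`
    return datadir / f'{fqme}.ohlcv{int(period)}s.parquet'

path: Path = mk_ohlcv_shm_keyed_filepath(
    fqme='btcusdt.usdtm.perp.binance',  # hypothetical fqme
    period=60.0,                        # float input, previously broken
    datadir=Path('~/.config/piker/nativedb').expanduser(),  # assumed layout
)
assert path.name == 'btcusdt.usdtm.perp.binance.ohlcv60s.parquet'
```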
@@ -227,6 +230,7 @@ class NativeStorageClient:
         self,
         fqme: str,
         period: float,
+
     ) -> Path:
         return mk_ohlcv_shm_keyed_filepath(
             fqme=fqme,
@@ -239,6 +243,7 @@ class NativeStorageClient:
         fqme: str,
         df: pl.DataFrame,
         timeframe: float,
+
     ) -> None:
         # cache df for later usage since we (currently) need to
         # convert to np.ndarrays to push to our `ShmArray` rt