diff --git a/piker/storage/cli.py b/piker/storage/cli.py index 8c6d67ea..bf3f9072 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -24,8 +24,13 @@ from __future__ import annotations # AsyncExitStack, # ) from pathlib import Path +from math import copysign import time from types import ModuleType +from typing import ( + Any, + TYPE_CHECKING, +) import polars as pl import numpy as np @@ -42,15 +47,17 @@ from piker.data import ( ShmArray, ) from piker import tsp -from . import ( - log, -) +from piker.data._formatters import BGM +from . import log from . import ( __tsdbs__, open_storage_client, StorageClient, ) +if TYPE_CHECKING: + from piker.ui._remote_ctl import AnnotCtl + store = typer.Typer() @@ -203,10 +210,12 @@ def anal( deduped: pl.DataFrame # deduplicated dts ( df, - gaps, deduped, diff, - ) = tsp.dedupe(shm_df) + ) = tsp.dedupe( + shm_df, + period=period, + ) write_edits: bool = True if ( @@ -217,7 +226,6 @@ def anal( ) ): await tractor.pause() - await client.write_ohlcv( fqme, ohlcv=deduped, @@ -229,10 +237,117 @@ def anal( # is there something more minimal but nearly as # functional as ipython? await tractor.pause() + assert not null_segs trio.run(main) +async def markup_gaps( + fqme: str, + timeframe: float, + actl: AnnotCtl, + wdts: pl.DataFrame, + gaps: pl.DataFrame, + +) -> dict[int, dict]: + ''' + Remote annotate time-gaps in a dt-fielded ts (normally OHLC) + with rectangles. + + ''' + aids: dict[int] = {} + for i in range(gaps.height): + + row: pl.DataFrame = gaps[i] + + # the gap's RIGHT-most bar's OPEN value + # at that time (sample) step. + iend: int = row['index'][0] + # dt: datetime = row['dt'][0] + # dt_prev: datetime = row['dt_prev'][0] + # dt_end_t: float = dt.timestamp() + + + # TODO: can we eventually remove this + # once we figure out why the epoch cols + # don't match? + # TODO: FIX HOW/WHY these aren't matching + # and are instead off by 4hours (EST + # vs. UTC?!?!) + # end_t: float = row['time'] + # assert ( + # dt.timestamp() + # == + # end_t + # ) + + # the gap's LEFT-most bar's CLOSE value + # at that time (sample) step. + prev_r: pl.DataFrame = wdts.filter( + pl.col('index') == iend - 1 + ) + istart: int = prev_r['index'][0] + # dt_start_t: float = dt_prev.timestamp() + + # start_t: float = prev_r['time'] + # assert ( + # dt_start_t + # == + # start_t + # ) + + # TODO: implement px-col width measure + # and ensure at least as many px-cols + # shown per rect as configured by user. + # gap_w: float = abs((iend - istart)) + # if gap_w < 6: + # margin: float = 6 + # iend += margin + # istart -= margin + + rect_gap: float = BGM*3/8 + opn: float = row['open'][0] + ro: tuple[float, float] = ( + # dt_end_t, + iend + rect_gap + 1, + opn, + ) + cls: float = prev_r['close'][0] + lc: tuple[float, float] = ( + # dt_start_t, + istart - rect_gap, # + 1 , + cls, + ) + + color: str = 'dad_blue' + diff: float = cls - opn + sgn: float = copysign(1, diff) + color: str = { + -1: 'buy_green', + 1: 'sell_red', + }[sgn] + + rect_kwargs: dict[str, Any] = dict( + fqme=fqme, + timeframe=timeframe, + start_pos=lc, + end_pos=ro, + color=color, + ) + + aid: int = await actl.add_rect(**rect_kwargs) + assert aid + aids[aid] = rect_kwargs + + # tell chart to redraw all its + # graphics view layers Bo + await actl.redraw( + fqme=fqme, + timeframe=timeframe, + ) + return aids + + @store.command() def ldshm( fqme: str, @@ -249,7 +364,6 @@ def ldshm( async def main(): from piker.ui._remote_ctl import ( open_annot_ctl, - AnnotCtl, ) actl: AnnotCtl mod: ModuleType @@ -274,111 +388,97 @@ def ldshm( shm_df, ) in tsp.iter_dfs_from_shms(fqme): - # compute ohlc properties for naming times: np.ndarray = shm.array['time'] - period_s: float = float(times[-1] - times[-2]) - if period_s < 1.: + d1: float = float(times[-1] - times[-2]) + d2: float = float(times[-2] - times[-3]) + med: float = np.median(np.diff(times)) + if ( + d1 < 1. + and d2 < 1. + and med < 1. + ): raise ValueError( f'Something is wrong with time period for {shm}:\n{times}' ) + period_s: float = float(max(d1, d2, med)) + # over-write back to shm? - df: pl.DataFrame # with dts + wdts: pl.DataFrame # with dts deduped: pl.DataFrame # deduplicated dts ( - df, - gaps, + wdts, deduped, diff, - ) = tsp.dedupe(shm_df) + ) = tsp.dedupe( + shm_df, + period=period_s, + ) null_segs: tuple = tsp.get_null_segs( frame=shm.array, period=period_s, ) - needs_correction: bool = ( - not gaps.is_empty() - or null_segs + # detect gaps from in expected (uniform OHLC) sample period + step_gaps: pl.DataFrame = tsp.detect_time_gaps( + wdts, + expect_period=period_s, ) - # TODO: maybe only optionally enter this depending - # on some CLI flags and/or gap detection? - if needs_correction: - for i in range(gaps.height): - row: pl.DataFrame = gaps[i] - # TODO: can we eventually remove this - # once we figure out why the epoch cols - # don't match? - iend: int = row['index'][0] - # dt: datetime = row['dt'][0] - # dt_prev: datetime = row['dt_prev'][0] + # TODO: by default we always want to mark these up + # with rects showing up/down gaps Bo + venue_gaps: pl.DataFrame = tsp.detect_time_gaps( + wdts, + expect_period=period_s, - # the gap's right-most bar's OPEN value - # at that time (sample) step. - # dt_end_t: float = dt.timestamp() + # TODO: actually pull the exact duration + # expected for each venue operational period? + gap_dt_unit='days', + gap_thresh=1, + ) - # TODO: FIX HOW/WHY these aren't matching - # and are instead off by 4hours (EST - # vs. UTC?!?!) - # end_t: float = row['time'] - # assert ( - # dt.timestamp() - # == - # end_t - # ) + # TODO: call null-seg fixer somehow? + if null_segs: + await tractor.pause() + # async with ( + # trio.open_nursery() as tn, + # mod.open_history_client( + # mkt, + # ) as (get_hist, config), + # ): + # nulls_detected: trio.Event = await tn.start(partial( + # tsp.maybe_fill_null_segments, - # the gap's left-most bar's CLOSE value - # at that time (sample) step. - prev_r: pl.DataFrame = df.filter( - pl.col('index') == iend - 1 + # shm=shm, + # timeframe=timeframe, + # get_hist=get_hist, + # sampler_stream=sampler_stream, + # mkt=mkt, + # )) + + # TODO: find the disjoint set of step gaps from + # venue (closure) set! + # -[ ] do a set diff by checking for the unique + # gap set only in the step_gaps? + if ( + not venue_gaps.is_empty() + # and not step_gaps.is_empty() + ): + do_markup_gaps: bool = True + if do_markup_gaps: + aids: dict = await markup_gaps( + fqme, + period_s, + actl, + wdts, + step_gaps, ) - istart: int = prev_r['index'][0] - # dt_start_t: float = dt_prev.timestamp() + assert aids - # start_t: float = prev_r['time'] - # assert ( - # dt_start_t - # == - # start_t - # ) - - # TODO: implement px-col width measure - # and ensure at least as many px-cols - # shown per rect as configured by user. - gap_w: float = abs((iend - istart)) - if gap_w < 6: - margin: float = 6 - iend += margin - istart -= margin - - ro: tuple[float, float] = ( - # dt_end_t, - iend, - row['open'][0], - ) - lc: tuple[float, float] = ( - # dt_start_t, - istart, - prev_r['close'][0], - ) - - # async with actl.open_rect( - # ) as aid: - aid: int = await actl.add_rect( - fqme=fqme, - timeframe=period_s, - start_pos=lc, - end_pos=ro, - ) - assert aid - - # write to parquet file? - if ( - write_parquet - ): - # write to fs - start = time.time() + # write repaired ts to parquet-file? + if write_parquet: + start: float = time.time() path: Path = await client.write_ohlcv( fqme, ohlcv=deduped, @@ -390,7 +490,7 @@ def ldshm( ) # read back from fs - start = time.time() + start: float = time.time() read_df: pl.DataFrame = pl.read_parquet(path) read_delay: float = round( time.time() - start, @@ -412,6 +512,8 @@ def ldshm( shm._array.setflags( write=int(1), ) + # last chance manual overwrites in REPL + await tractor.pause() shm.push( new, prepend=True, @@ -419,8 +521,6 @@ def ldshm( update_first=False, # don't update ._first ) - await tractor.pause() - assert diff else: # allow interaction even when no ts problems. @@ -428,8 +528,11 @@ def ldshm( assert not diff - if df is None: - log.error(f'No matching shm buffers for {fqme} ?') + if shm_df is None: + log.error( + f'No matching shm buffers for {fqme} ?' + + ) trio.run(main)