Factor gap annotating into new `markup_gaps()`

Since we definitely want to markup gaps that are both data-errors and
just plain old venue closures (time gaps), generalize the `gaps:
pl.DataFrame` loop in a func and call it from the `ldshm` cmd for now.

Some other tweaks to `store ldshm`:
- add `np.median()` failover when detecting (ohlc) ts sample rate.
- use new `tsp.dedupe()` signature.
- differentiate between sample-period size gaps and venue closure gaps
  by calling `tsp.detect_time_gaps()` with diff size thresholds.
- add todo for backfilling null-segs when detected.
distribute_dis
Tyler Goodlet 2024-01-04 11:01:21 -05:00
parent 05f874001a
commit 6959429af8
1 changed files with 197 additions and 94 deletions

View File

@ -24,8 +24,13 @@ from __future__ import annotations
# AsyncExitStack,
# )
from pathlib import Path
from math import copysign
import time
from types import ModuleType
from typing import (
Any,
TYPE_CHECKING,
)
import polars as pl
import numpy as np
@ -42,15 +47,17 @@ from piker.data import (
ShmArray,
)
from piker import tsp
from . import (
log,
)
from piker.data._formatters import BGM
from . import log
from . import (
__tsdbs__,
open_storage_client,
StorageClient,
)
if TYPE_CHECKING:
from piker.ui._remote_ctl import AnnotCtl
store = typer.Typer()
@ -203,10 +210,12 @@ def anal(
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
deduped,
diff,
) = tsp.dedupe(shm_df)
) = tsp.dedupe(
shm_df,
period=period,
)
write_edits: bool = True
if (
@ -217,7 +226,6 @@ def anal(
)
):
await tractor.pause()
await client.write_ohlcv(
fqme,
ohlcv=deduped,
@ -229,10 +237,117 @@ def anal(
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
assert not null_segs
trio.run(main)
async def markup_gaps(
fqme: str,
timeframe: float,
actl: AnnotCtl,
wdts: pl.DataFrame,
gaps: pl.DataFrame,
) -> dict[int, dict]:
'''
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
with rectangles.
'''
aids: dict[int] = {}
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# the gap's RIGHT-most bar's OPEN value
# at that time (sample) step.
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# dt_end_t: float = dt.timestamp()
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's LEFT-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = wdts.filter(
pl.col('index') == iend - 1
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
# gap_w: float = abs((iend - istart))
# if gap_w < 6:
# margin: float = 6
# iend += margin
# istart -= margin
rect_gap: float = BGM*3/8
opn: float = row['open'][0]
ro: tuple[float, float] = (
# dt_end_t,
iend + rect_gap + 1,
opn,
)
cls: float = prev_r['close'][0]
lc: tuple[float, float] = (
# dt_start_t,
istart - rect_gap, # + 1 ,
cls,
)
color: str = 'dad_blue'
diff: float = cls - opn
sgn: float = copysign(1, diff)
color: str = {
-1: 'buy_green',
1: 'sell_red',
}[sgn]
rect_kwargs: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
start_pos=lc,
end_pos=ro,
color=color,
)
aid: int = await actl.add_rect(**rect_kwargs)
assert aid
aids[aid] = rect_kwargs
# tell chart to redraw all its
# graphics view layers Bo
await actl.redraw(
fqme=fqme,
timeframe=timeframe,
)
return aids
@store.command()
def ldshm(
fqme: str,
@ -249,7 +364,6 @@ def ldshm(
async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
actl: AnnotCtl
mod: ModuleType
@ -274,111 +388,97 @@ def ldshm(
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
period_s: float = float(times[-1] - times[-2])
if period_s < 1.:
d1: float = float(times[-1] - times[-2])
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
d1 < 1.
and d2 < 1.
and med < 1.
):
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
period_s: float = float(max(d1, d2, med))
# over-write back to shm?
df: pl.DataFrame # with dts
wdts: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
(
df,
gaps,
wdts,
deduped,
diff,
) = tsp.dedupe(shm_df)
) = tsp.dedupe(
shm_df,
period=period_s,
)
null_segs: tuple = tsp.get_null_segs(
frame=shm.array,
period=period_s,
)
needs_correction: bool = (
not gaps.is_empty()
or null_segs
# detect gaps from in expected (uniform OHLC) sample period
step_gaps: pl.DataFrame = tsp.detect_time_gaps(
wdts,
expect_period=period_s,
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if needs_correction:
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# TODO: by default we always want to mark these up
# with rects showing up/down gaps Bo
venue_gaps: pl.DataFrame = tsp.detect_time_gaps(
wdts,
expect_period=period_s,
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# TODO: actually pull the exact duration
# expected for each venue operational period?
gap_dt_unit='days',
gap_thresh=1,
)
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# TODO: call null-seg fixer somehow?
if null_segs:
await tractor.pause()
# async with (
# trio.open_nursery() as tn,
# mod.open_history_client(
# mkt,
# ) as (get_hist, config),
# ):
# nulls_detected: trio.Event = await tn.start(partial(
# tsp.maybe_fill_null_segments,
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = df.filter(
pl.col('index') == iend - 1
# shm=shm,
# timeframe=timeframe,
# get_hist=get_hist,
# sampler_stream=sampler_stream,
# mkt=mkt,
# ))
# TODO: find the disjoint set of step gaps from
# venue (closure) set!
# -[ ] do a set diff by checking for the unique
# gap set only in the step_gaps?
if (
not venue_gaps.is_empty()
# and not step_gaps.is_empty()
):
do_markup_gaps: bool = True
if do_markup_gaps:
aids: dict = await markup_gaps(
fqme,
period_s,
actl,
wdts,
step_gaps,
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
assert aids
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
# async with actl.open_rect(
# ) as aid:
aid: int = await actl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
# write to parquet file?
if (
write_parquet
):
# write to fs
start = time.time()
# write repaired ts to parquet-file?
if write_parquet:
start: float = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
@ -390,7 +490,7 @@ def ldshm(
)
# read back from fs
start = time.time()
start: float = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
@ -412,6 +512,8 @@ def ldshm(
shm._array.setflags(
write=int(1),
)
# last chance manual overwrites in REPL
await tractor.pause()
shm.push(
new,
prepend=True,
@ -419,8 +521,6 @@ def ldshm(
update_first=False, # don't update ._first
)
await tractor.pause()
assert diff
else:
# allow interaction even when no ts problems.
@ -428,8 +528,11 @@ def ldshm(
assert not diff
if df is None:
log.error(f'No matching shm buffers for {fqme} ?')
if shm_df is None:
log.error(
f'No matching shm buffers for {fqme} ?'
)
trio.run(main)