tsp_gaps: fixes for fault-less OHLCV time-series loads #35

Merged
goodboy merged 6 commits from tsp_gaps into gitea_feats 2025-02-21 20:46:37 +00:00
4 changed files with 172 additions and 77 deletions

View File

@ -386,6 +386,8 @@ def ldshm(
open_annot_ctl() as actl,
):
shm_df: pl.DataFrame | None = None
tf2aids: dict[float, dict] = {}
for (
shmfile,
shm,
@ -526,16 +528,17 @@ def ldshm(
new_df,
step_gaps,
)
# last chance manual overwrites in REPL
await tractor.pause()
# await tractor.pause()
assert aids
tf2aids[period_s] = aids
else:
# allow interaction even when no ts problems.
await tractor.pause()
# assert not diff
assert not diff
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')
if shm_df is None:
log.error(

View File

@ -161,7 +161,13 @@ class NativeStorageClient:
def index_files(self):
for path in self._datadir.iterdir():
if path.name in {'borked', 'expired',}:
if (
path.is_dir()
or
'.parquet' not in str(path)
# or
# path.name in {'borked', 'expired',}
):
continue
key: str = path.name.rstrip('.parquet')

View File

@ -44,8 +44,10 @@ import trio
from trio_typing import TaskStatus
import tractor
from pendulum import (
Interval,
DateTime,
Duration,
duration as mk_duration,
from_timestamp,
)
import numpy as np
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling?
if (
start_dt
and end_dt < start_dt
and
end_dt < start_dt
):
await tractor.pause()
break
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
raise
null_segs_detected.set()
# RECHECK for more null-gaps
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
async def start_backfill(
get_hist,
frame_types: dict[str, Duration] | None,
def_frame_duration: Duration,
mod: ModuleType,
mkt: MktPair,
shm: ShmArray,
@ -379,22 +383,23 @@ async def start_backfill(
update_start_on_prepend: bool = False
if backfill_until_dt is None:
# TODO: drop this right and just expose the backfill
# limits inside a [storage] section in conf.toml?
# when no tsdb "last datum" is provided, we just load
# some near-term history.
# periods = {
# 1: {'days': 1},
# 60: {'days': 14},
# }
# do a decently sized backfill and load it into storage.
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
# declaring the history duration limits instead of
# guessing and/or applying the same limits to all?
#
# -[ ] allow declaring (default) per-provider backfill
# limits inside a [storage] sub-section in conf.toml?
#
# NOTE, when no tsdb "last datum" is provided, we just
# load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
periods = {
1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
update_start_on_prepend = True
update_start_on_prepend: bool = True
# NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history
@ -416,7 +421,6 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n'
)
try:
(
array,
@ -426,71 +430,114 @@ async def start_backfill(
timeframe,
end_dt=last_start_dt,
)
except NoData as _daterr:
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if dur := frame_types.get(timeframe):
# decrement by a frame's worth of duration and
# retry a few times.
last_start_dt.subtract(
seconds=dur.total_seconds()
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
f'last_start_dt: {orig_last_start_dt}\n\n'
f'bf_until: {backfill_until_dt}\n'
)
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
# EMPTY FRAME signal with 3 (likely) causes:
#
# 1. range contains legit gap in venue history
# 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we
# can get older data before this possible
# "history gap".
last_start_dt: datetime = last_start_dt.subtract(
seconds=def_frame_duration.total_seconds()
)
gap_report += (
f'Decrementing `end_dt` and retrying with,\n'
f'def_frame_duration: {def_frame_duration}\n'
f'(new) last_start_dt: {last_start_dt}\n'
)
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
)
else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
# broker says there never was or is no more history to pull
except DataUnavailable as due:
message: str = due.args[0]
log.warning(
f'Provider {mod.name!r} halted backfill due to,\n\n'
f'{message}\n'
f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
)
# UGH: what's a better way?
# TODO: backends are responsible for being correct on
# this right!?
# -[ ] in the `ib` case we could maybe offer some way
# to halt the request loop until the condition is
# resolved or should the backend be entirely in
# charge of solving such faults? yes, right?
return
time: np.ndarray = array['time']
assert (
array['time'][0]
time[0]
==
next_start_dt.timestamp()
)
diff = last_start_dt - next_start_dt
frame_time_diff_s = diff.seconds
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe
if frame_time_diff_s > expected_frame_size_s:
recv_frame_dur: Duration = (
from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.)
# so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning(
'GAP DETECTED:\n'
f'last_start_dt: {last_start_dt}\n'
f'diff: {diff}\n'
f'frame_time_diff_s: {frame_time_diff_s}\n'
f'{timeframe}s-series {reason} detected!\n'
f'fqme: {mkt.fqme}\n'
f'last_start_dt: {last_start_dt}\n\n'
f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
)
# await tractor.pause()
to_push = diff_history(
array,
@ -565,22 +612,27 @@ async def start_backfill(
# long-term storage.
if (
storage is not None
and write_tsdb
and
write_tsdb
):
log.info(
f'Writing {ln} frame to storage:\n'
f'{next_start_dt} -> {last_start_dt}'
)
# always drop the src asset token for
# NOTE, always drop the src asset token for
# non-currency-pair like market types (for now)
#
# THAT IS, for now our table key schema is NOT
# including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY.
if mkt.dst.atype not in {
'crypto',
'crypto_currency',
'fiat', # a "forex pair"
'perpetual_future', # stupid "perps" from cex land
}:
# for now, our table key schema is not including
# the dst[/src] source asset token.
col_sym_key: str = mkt.get_fqme(
delim_char='',
without_src=True,
@ -685,7 +737,7 @@ async def back_load_from_tsdb(
last_tsdb_dt
and latest_start_dt
):
backfilled_size_s = (
backfilled_size_s: Duration = (
latest_start_dt - last_tsdb_dt
).seconds
# if the shm buffer len is not large enough to contain
@ -908,6 +960,8 @@ async def tsdb_backfill(
f'{pformat(config)}\n'
)
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn:
tn.start_soon(
@ -918,7 +972,6 @@ async def tsdb_backfill(
timeframe,
config,
)
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
@ -947,6 +1000,25 @@ async def tsdb_backfill(
mr_end_dt,
) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
@ -977,7 +1049,7 @@ async def tsdb_backfill(
partial(
start_backfill,
get_hist=get_hist,
frame_types=config.get('frame_types', None),
def_frame_duration=def_frame_size,
mod=mod,
mkt=mkt,
shm=shm,

View File

@ -616,6 +616,18 @@ def detect_price_gaps(
# ])
...
# TODO: probably just use the null_segs impl above?
def detect_vlm_gaps(
df: pl.DataFrame,
col: str = 'volume',
) -> pl.DataFrame:
vnull: pl.DataFrame = w_dts.filter(
pl.col(col) == 0
)
return vnull
def dedupe(
src_df: pl.DataFrame,
@ -626,7 +638,6 @@ def dedupe(
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped
]:
@ -639,19 +650,22 @@ def dedupe(
'''
wdts: pl.DataFrame = with_dts(src_df)
# maybe sort on any time field
if sort:
wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
deduped = wdts
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
subset=['dt'],
# subset=['dt'],
subset=['time'],
maintain_order=True,
)
# maybe sort on any time field
if sort:
deduped = deduped.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
diff: int = (
wdts.height
-