Refine history gap/termination signalling

Namely, handle backends which do not provide a default "frame
size-duration" in their init-config by making the backfiller guess the
value based on the first frame received.

Details:
- adjust `start_backfill()` to take a more explicit
  `def_frame_duration: Duration`, expected to be unpacked from any
  backend hist init-config by the `tsdb_backfill()` caller, which now
  also computes a value from the first received frame when the config
  section isn't provided.
- in `start_backfill()` we now always expect the `def_frame_duration`
  input and always decrement the query range by this value whenever
  a `NoData` is raised by the provider-backend, paired with an explicit
  `log.warning()` about the handling.
- also relay any `DataUnavailable.args[0]` message from the provider
  in the handler.
- repair "gap reporting", which checks the expected frame duration vs.
  that received, with much better humanized logging on the missing
  segment using `pendulum.Interval/Duration.in_words()` output.
fix_deribit_hist_queries
Tyler Goodlet 2024-11-19 16:47:29 -05:00
parent f96bd51442
commit 9232d09440
1 changed files with 115 additions and 60 deletions

View File

@ -44,8 +44,10 @@ import trio
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pendulum import ( from pendulum import (
Interval,
DateTime, DateTime,
Duration, Duration,
duration as mk_duration,
from_timestamp, from_timestamp,
) )
import numpy as np import numpy as np
@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
# pair, immediately stop backfilling? # pair, immediately stop backfilling?
if ( if (
start_dt start_dt
and end_dt < start_dt and
end_dt < start_dt
): ):
await tractor.pause() await tractor.pause()
break break
@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
except tractor.ContextCancelled: except tractor.ContextCancelled:
# log.exception # log.exception
await tractor.pause() await tractor.pause()
raise
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
async def start_backfill( async def start_backfill(
get_hist, get_hist,
frame_types: dict[str, Duration] | None, def_frame_duration: Duration,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -379,22 +383,23 @@ async def start_backfill(
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if backfill_until_dt is None:
# TODO: drop this right and just expose the backfill # TODO: per-provider default history-durations?
# limits inside a [storage] section in conf.toml? # -[ ] inside the `open_history_client()` config allow
# when no tsdb "last datum" is provided, we just load # declaring the history duration limits instead of
# some near-term history. # guessing and/or applying the same limits to all?
# periods = { #
# 1: {'days': 1}, # -[ ] allow declaring (default) per-provider backfill
# 60: {'days': 14}, # limits inside a [storage] sub-section in conf.toml?
# } #
# NOTE, when no tsdb "last datum" is provided, we just
# do a decently sized backfill and load it into storage. # load some near-term history by presuming a "decently
# large" 60s duration limit and a much shorter 1s range.
periods = { periods = {
1: {'days': 2}, 1: {'days': 2},
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend = True update_start_on_prepend: bool = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
# backfill history "until" so as to adhere to the history # backfill history "until" so as to adhere to the history
@ -416,7 +421,6 @@ async def start_backfill(
f'backfill_until_dt: {backfill_until_dt}\n' f'backfill_until_dt: {backfill_until_dt}\n'
f'last_start_dt: {last_start_dt}\n' f'last_start_dt: {last_start_dt}\n'
) )
try: try:
( (
array, array,
@ -426,48 +430,58 @@ async def start_backfill(
timeframe, timeframe,
end_dt=last_start_dt, end_dt=last_start_dt,
) )
except NoData as _daterr: except NoData as _daterr:
# 3 cases: orig_last_start_dt: datetime = last_start_dt
# - frame in the middle of a legit venue gap gap_report: str = (
# - history actually began at the `last_start_dt` f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
# - some other unknown error (ib blocking the f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
# history bc they don't want you seeing how they f'last_start_dt: {orig_last_start_dt}\n\n'
# cucked all the tinas..) f'bf_until: {backfill_until_dt}\n'
if ( )
frame_types # EMPTY FRAME signal with 3 (likely) causes:
and #
(dur := frame_types.get(timeframe)) # 1. range contains legit gap in venue history
): # 2. history actually (edge case) **began** at the
# value `last_start_dt`
# 3. some other unknown error (ib blocking the
# history-query bc they don't want you seeing how
# they cucked all the tinas.. like with options
# hist)
#
if def_frame_duration:
# decrement by a duration's (frame) worth of time # decrement by a duration's (frame) worth of time
# as maybe indicated by the backend to see if we # as maybe indicated by the backend to see if we
# can get older data before this possible # can get older data before this possible
# "history gap". # "history gap".
orig_last_start_dt = last_start_dt last_start_dt: datetime = last_start_dt.subtract(
last_start_dt = last_start_dt.subtract( seconds=def_frame_duration.total_seconds()
seconds=dur.total_seconds()
) )
log.warning( gap_report += (
f'{mod.name} -> EMPTY FRAME for end_dt?\n' f'Decrementing `end_dt` and retrying with,\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n' f'def_frame_duration: {def_frame_duration}\n'
f'Decrementing `end_dt` by {dur} and retry..\n\n' f'(new) last_start_dt: {last_start_dt}\n'
f'orig_last_start_dt: {orig_last_start_dt}\n'
f'dur subtracted last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
) )
log.warning(gap_report)
# skip writing to shm/tsdb and try the next
# duration's worth of prior history.
continue continue
raise else:
# await tractor.pause()
raise DataUnavailable(gap_report)
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable: except DataUnavailable as due:
message: str = due.args[0]
log.warning( log.warning(
f'NO-MORE-DATA in range?\n' f'Provider {mod.name!r} halted backfill due to,\n\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n' f'{message}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n' f'fqme: {mkt.fqme}\n'
f'timeframe: {timeframe}\n'
f'last_start_dt: {last_start_dt}\n'
f'bf_until: {backfill_until_dt}\n'
) )
# UGH: what's a better way? # UGH: what's a better way?
# TODO: backends are responsible for being correct on # TODO: backends are responsible for being correct on
@ -476,34 +490,54 @@ async def start_backfill(
# to halt the request loop until the condition is # to halt the request loop until the condition is
# resolved or should the backend be entirely in # resolved or should the backend be entirely in
# charge of solving such faults? yes, right? # charge of solving such faults? yes, right?
# if timeframe > 1:
# await tractor.pause()
return return
time: np.ndarray = array['time']
assert ( assert (
array['time'][0] time[0]
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
diff = last_start_dt - next_start_dt assert time[-1] == next_end_dt.timestamp()
frame_time_diff_s = diff.seconds
expected_dur: Interval = last_start_dt - next_start_dt
# frame's worth of sample-period-steps, in seconds # frame's worth of sample-period-steps, in seconds
frame_size_s: float = len(array) * timeframe frame_size_s: float = len(array) * timeframe
expected_frame_size_s: float = frame_size_s + timeframe recv_frame_dur: Duration = (
if frame_time_diff_s > expected_frame_size_s: from_timestamp(array[-1]['time'])
-
from_timestamp(array[0]['time'])
)
if (
(lt_frame := (recv_frame_dur < expected_dur))
or
(null_frame := (frame_size_s == 0))
# ^XXX, should NEVER hit now!
):
# XXX: query result includes a start point prior to our # XXX: query result includes a start point prior to our
# expected "frame size" and thus is likely some kind of # expected "frame size" and thus is likely some kind of
# history gap (eg. market closed period, outage, etc.) # history gap (eg. market closed period, outage, etc.)
# so just report it to console for now. # so just report it to console for now.
if lt_frame:
reason = 'Possible GAP (or first-datum)'
else:
assert null_frame
reason = 'NULL-FRAME'
missing_dur: Interval = expected_dur.end - recv_frame_dur.end
log.warning( log.warning(
'GAP DETECTED:\n' f'{timeframe}s-series {reason} detected!\n'
f'last_start_dt: {last_start_dt}\n' f'fqme: {mkt.fqme}\n'
f'diff: {diff}\n' f'last_start_dt: {last_start_dt}\n\n'
f'frame_time_diff_s: {frame_time_diff_s}\n' f'recv interval: {recv_frame_dur}\n'
f'expected interval: {expected_dur}\n\n'
f'Missing duration of history of {missing_dur.in_words()!r}\n'
f'{missing_dur}\n'
) )
# await tractor.pause()
to_push = diff_history( to_push = diff_history(
array, array,
@ -578,7 +612,8 @@ async def start_backfill(
# long-term storage. # long-term storage.
if ( if (
storage is not None storage is not None
and write_tsdb and
write_tsdb
): ):
log.info( log.info(
f'Writing {ln} frame to storage:\n' f'Writing {ln} frame to storage:\n'
@ -699,7 +734,7 @@ async def back_load_from_tsdb(
last_tsdb_dt last_tsdb_dt
and latest_start_dt and latest_start_dt
): ):
backfilled_size_s = ( backfilled_size_s: Duration = (
latest_start_dt - last_tsdb_dt latest_start_dt - last_tsdb_dt
).seconds ).seconds
# if the shm buffer len is not large enough to contain # if the shm buffer len is not large enough to contain
@ -922,6 +957,8 @@ async def tsdb_backfill(
f'{pformat(config)}\n' f'{pformat(config)}\n'
) )
# concurrently load the provider's most-recent-frame AND any
# pre-existing tsdb history already saved in `piker` storage.
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
async with trio.open_nursery() as tn: async with trio.open_nursery() as tn:
tn.start_soon( tn.start_soon(
@ -932,7 +969,6 @@ async def tsdb_backfill(
timeframe, timeframe,
config, config,
) )
tsdb_entry: tuple = await load_tsdb_hist( tsdb_entry: tuple = await load_tsdb_hist(
storage, storage,
mkt, mkt,
@ -961,6 +997,25 @@ async def tsdb_backfill(
mr_end_dt, mr_end_dt,
) = dt_eps ) = dt_eps
first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
calced_frame_size: Duration = mk_duration(
seconds=first_frame_dur_s,
)
# NOTE, attempt to use the backend declared default frame
# sizing (as allowed by their time-series query APIs) and
# if not provided try to construct a default from the
# first frame received above.
def_frame_durs: dict[
int,
Duration,
]|None = config.get('frame_types', None)
if def_frame_durs:
def_frame_size: Duration = def_frame_durs[timeframe]
assert def_frame_size == calced_frame_size
else:
# use what we calced from first frame above.
def_frame_size = calced_frame_size
# NOTE: when there's no offline data, there's 2 cases: # NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample # - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and # period (in which case `dt_eps` should be `None` and
@ -991,7 +1046,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
get_hist=get_hist, get_hist=get_hist,
frame_types=config.get('frame_types', None), def_frame_duration=def_frame_size,
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
shm=shm, shm=shm,