`tsp`: on backfill, do a smart retry on a `NoData`

Presuming the data provider gives us a config with a `frame_types: dict`
(indicating frame sizes per query/request) we try to be clever and
decrement our submitted `end_dt: DateTime` based on it, hoping for the
best on the next attempt.
distribute_dis
Tyler Goodlet 2024-01-03 19:49:41 -05:00
parent b23d44e21a
commit 7ae7cc829f
1 changed files with 41 additions and 6 deletions

View File

@ -51,10 +51,11 @@ from pendulum import (
import numpy as np import numpy as np
import polars as pl import polars as pl
from ..accounting import ( from piker.brokers import NoData
from piker.accounting import (
MktPair, MktPair,
) )
from ..data._util import ( from piker.data._util import (
log, log,
) )
from ..data._sharedmem import ( from ..data._sharedmem import (
@ -302,16 +303,28 @@ async def maybe_fill_null_segments(
gap: np.ndarray = shm._array[istart:istop] gap: np.ndarray = shm._array[istart:istop]
# copy the oldest OHLC samples forward # copy the oldest OHLC samples forward
gap[ohlc_fields] = shm._array[istart]['close'] cls: float = shm._array[istart]['close']
# TODO: how can we mark this range as being a gap tho?
# -[ ] maybe pg finally supports nulls in ndarray to
# show empty space somehow?
# -[ ] we could put a special value in the vlm or
# another col/field to denote?
gap[ohlc_fields] = cls
start_t: float = shm._array[istart]['time'] start_t: float = shm._array[istart]['time']
t_diff: float = (istop - istart)*timeframe t_diff: float = (istop - istart)*timeframe
gap['time'] = np.arange( gap['time'] = np.arange(
start=start_t, start=start_t,
stop=start_t + t_diff, stop=start_t + t_diff,
step=timeframe, step=timeframe,
) )
# TODO: reimpl using the new `.ui._remote_ctl` ctx
# ideally using some kinda decent
# tractor-reverse-lookup-connection from some other
# `Context` type thingy?
await sampler_stream.send({ await sampler_stream.send({
'broadcast_all': { 'broadcast_all': {
@ -332,11 +345,11 @@ async def maybe_fill_null_segments(
# parallel possible no matter the backend? # parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then # -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?" # earliest, then latest.. etc?"
# await tractor.pause()
async def start_backfill( async def start_backfill(
get_hist, get_hist,
frame_types: dict[str, Duration] | None,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
shm: ShmArray, shm: ShmArray,
@ -381,7 +394,6 @@ async def start_backfill(
60: {'years': 6}, 60: {'years': 6},
} }
period_duration: int = periods[timeframe] period_duration: int = periods[timeframe]
update_start_on_prepend = True update_start_on_prepend = True
# NOTE: manually set the "latest" datetime which we intend to # NOTE: manually set the "latest" datetime which we intend to
@ -415,6 +427,28 @@ async def start_backfill(
end_dt=last_start_dt, end_dt=last_start_dt,
) )
except NoData as _daterr:
# 3 cases:
# - frame in the middle of a legit venue gap
# - history actually began at the `last_start_dt`
# - some other unknown error (ib blocking the
# history bc they don't want you seeing how they
# cucked all the tinas..)
if frame_types and (dur := frame_types.get(timeframe)):
# decrement by a frame's worth of duration and
# retry a few times.
# NOTE: pendulum datetimes are immutable; `.subtract()`
# returns a NEW instance, so rebind or the decrement is lost
# and the retry loop resubmits the same `end_dt` forever.
last_start_dt = last_start_dt.subtract(
    seconds=dur.total_seconds()
)
log.warning(
f'{mod.name} -> EMPTY FRAME for end_dt?\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'Decrementing `end_dt` by {dur} and retry..\n'
)
continue
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable: except DataUnavailable:
log.warning( log.warning(
@ -871,7 +905,7 @@ async def tsdb_backfill(
): ):
log.info( log.info(
f'`{mod}` history client returned backfill config:\n' f'`{mod}` history client returned backfill config:\n'
f'{config}\n' f'{pformat(config)}\n'
) )
dt_eps: list[DateTime, DateTime] = [] dt_eps: list[DateTime, DateTime] = []
@ -943,6 +977,7 @@ async def tsdb_backfill(
partial( partial(
start_backfill, start_backfill,
get_hist=get_hist, get_hist=get_hist,
frame_types=config.get('frame_types', None),
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
shm=shm, shm=shm,