diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 559da92c..adbe484e 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -51,10 +51,11 @@ from pendulum import ( import numpy as np import polars as pl -from ..accounting import ( +from piker.brokers import NoData +from piker.accounting import ( MktPair, ) -from ..data._util import ( +from piker.data._util import ( log, ) from ..data._sharedmem import ( @@ -302,16 +303,28 @@ async def maybe_fill_null_segments( gap: np.ndarray = shm._array[istart:istop] # copy the oldest OHLC samples forward - gap[ohlc_fields] = shm._array[istart]['close'] + cls: float = shm._array[istart]['close'] + + # TODO: how can we mark this range as being a gap tho? + # -[ ] maybe pg finally supports nulls in ndarray to + # show empty space somehow? + # -[ ] we could put a special value in the vlm or + # another col/field to denote? + gap[ohlc_fields] = cls start_t: float = shm._array[istart]['time'] t_diff: float = (istop - istart)*timeframe + gap['time'] = np.arange( start=start_t, stop=start_t + t_diff, step=timeframe, ) + # TODO: reimpl using the new `.ui._remote_ctl` ctx + # ideally using some kinda decent + # tractory-reverse-lookup-connnection from some other + # `Context` type thingy? await sampler_stream.send({ 'broadcast_all': { @@ -332,11 +345,11 @@ async def maybe_fill_null_segments( # parallel possible no matter the backend? # -[ ] fill algo: do queries in alternating "latest, then # earliest, then latest.. etc?" - # await tractor.pause() async def start_backfill( get_hist, + frame_types: dict[str, Duration] | None, mod: ModuleType, mkt: MktPair, shm: ShmArray, @@ -381,7 +394,6 @@ async def start_backfill( 60: {'years': 6}, } period_duration: int = periods[timeframe] - update_start_on_prepend = True # NOTE: manually set the "latest" datetime which we intend to @@ -415,6 +427,28 @@ async def start_backfill( end_dt=last_start_dt, ) + except NoData as _daterr: + # 3 cases: + # - frame in the middle of a legit venue gap + # - history actually began at the `last_start_dt` + # - some other unknown error (ib blocking the + # history bc they don't want you seeing how they + # cucked all the tinas..) + if dur := frame_types.get(timeframe): + # decrement by a frame's worth of duration and + # retry a few times. + last_start_dt.subtract( + seconds=dur.total_seconds() + ) + log.warning( + f'{mod.name} -> EMPTY FRAME for end_dt?\n' + f'tf@fqme: {timeframe}@{mkt.fqme}\n' + 'bf_until <- last_start_dt:\n' + f'{backfill_until_dt} <- {last_start_dt}\n' + f'Decrementing `end_dt` by {dur} and retry..\n' + ) + continue + # broker says there never was or is no more history to pull except DataUnavailable: log.warning( @@ -871,7 +905,7 @@ async def tsdb_backfill( ): log.info( f'`{mod}` history client returned backfill config:\n' - f'{config}\n' + f'{pformat(config)}\n' ) dt_eps: list[DateTime, DateTime] = [] @@ -943,6 +977,7 @@ async def tsdb_backfill( partial( start_backfill, get_hist=get_hist, + frame_types=config.get('frame_types', None), mod=mod, mkt=mkt, shm=shm,