Enable tracing back-insert backfills

Namely, insertion writes which over-fill the shm buffer past the latest
tsdb sample via `.tsp._history.shm_push_in_between()`.

Deats,
- check the earliest `to_push` timestamp and enter a pause point if it's
  earlier than the tsdb's `backfill_until_dt` stamp; see the sketch
  after this list.
- requires actually passing the `backfill_until_dt: datetime` thru,
  * `get_null_segs()`
  * `maybe_fill_null_segments()`
  * `shm_push_in_between()` (obvi XD)
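
In essence the new guard boils down to the following (a minimal sketch,
not the verbatim patch; `_check_prepend_bounds` is a hypothetical helper
name, `from_timestamp` is pendulum's as already imported by the module):

    # illustrative only; the real check lives inline in
    # `.tsp._history.shm_push_in_between()` (see diff below).
    from datetime import datetime

    import numpy as np
    import tractor
    from pendulum import from_timestamp


    async def _check_prepend_bounds(
        to_push: np.ndarray,
        backfill_until_dt: datetime,
    ) -> None:
        # peek at the frame's earliest sample time..
        f_start: float = to_push['time'][0]
        f_start_dt = from_timestamp(f_start)

        # ..if it reaches earlier than the tsdb's backfilled-until
        # stamp we're about to over-fill the shm buffer past it,
        # so drop into the debugger to trace the bad insert.
        if f_start_dt < backfill_until_dt:
            await tractor.pause()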
hist_backfill_fixes
Gud Boi 2026-01-21 22:31:30 -05:00
parent a8e4e1b2c5
commit cd6bc105de
1 changed file with 42 additions and 16 deletions


@@ -75,7 +75,6 @@ from piker.brokers._util import (
 )
 from piker.storage import TimeseriesNotFound
 from ._anal import (
     dedupe,
     get_null_segs,
     iter_null_segs,
@@ -120,15 +119,16 @@ _rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
 def diff_history(
     array: np.ndarray,
-    append_until_dt: datetime | None = None,
-    prepend_until_dt: datetime | None = None,
+    append_until_dt: datetime|None = None,
+    prepend_until_dt: datetime|None = None,
 
 ) -> np.ndarray:
 
     # no diffing with tsdb dt index possible..
     if (
         prepend_until_dt is None
-        and append_until_dt is None
+        and
+        append_until_dt is None
     ):
         return array
@@ -140,15 +140,26 @@ def diff_history(
     return array[times >= prepend_until_dt.timestamp()]
 
 
+# TODO: can't we just make this a sync func now?
 async def shm_push_in_between(
     shm: ShmArray,
     to_push: np.ndarray,
     prepend_index: int,
+    backfill_until_dt: datetime,
 
     update_start_on_prepend: bool = False,
 
 ) -> int:
+    # XXX, try to catch bad inserts by peeking at the first/last
+    # times and ensure we don't violate order.
+    f_times: np.ndarray = to_push['time']
+    f_start: float = f_times[0]
+    f_start_dt = from_timestamp(f_start)
+    if (
+        f_start_dt < backfill_until_dt
+    ):
+        await tractor.pause()
 
     # XXX: extremely important, there can be no checkpoints
     # in the body of this func to avoid entering new ``frames``
     # values while we're pipelining the current ones to
@@ -181,6 +192,7 @@ async def maybe_fill_null_segments(
     get_hist: Callable,
     sampler_stream: tractor.MsgStream,
     mkt: MktPair,
+    backfill_until_dt: datetime,
 
     task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED,
@@ -191,7 +203,11 @@ async def maybe_fill_null_segments(
 
     frame: Frame = shm.array
 
-    null_segs: tuple | None = get_null_segs(
+    # TODO, put in parent task/daemon root!
+    import greenback
+    await greenback.ensure_portal()
+
+    null_segs: tuple|None = get_null_segs(
         frame,
         period=timeframe,
     )
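
Re the `greenback` lines above: `await greenback.ensure_portal()` is the
one-time, per-task setup that lets sync frames in that task re-enter
async land via `greenback.await_()`, which tractor's sync-context
pause/crash-handling relies on; hence the TODO to hoist it up instead of
calling it here. A tiny sketch of the pattern (hypothetical task, just
showing the greenback API):

    import greenback
    import trio

    async def task() -> None:
        # one-time setup for this task..
        await greenback.ensure_portal()

        def sync_fn() -> None:
            # ..after which sync code running in the task can call
            # back into async:
            greenback.await_(trio.sleep(0))

        sync_fn()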
@@ -237,6 +253,7 @@ async def maybe_fill_null_segments(
             shm,
             to_push,
             prepend_index=absi_end,
+            backfill_until_dt=backfill_until_dt,
             update_start_on_prepend=False,
         )
         # TODO: UI side needs IPC event to update..
@@ -352,15 +369,12 @@ async def start_backfill(
     mkt: MktPair,
     shm: ShmArray,
     timeframe: float,
     backfill_from_shm_index: int,
     backfill_from_dt: datetime,
     sampler_stream: tractor.MsgStream,
-    backfill_until_dt: datetime | None = None,
-    storage: StorageClient | None = None,
+    backfill_until_dt: datetime|None = None,
+    storage: StorageClient|None = None,
     write_tsdb: bool = True,
     task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED,
@@ -495,7 +509,14 @@ async def start_backfill(
             assert time[-1] == next_end_dt.timestamp()
 
-            expected_dur: Interval = last_start_dt - next_start_dt
+            expected_dur: Interval = (
+                last_start_dt.subtract(
+                    seconds=timeframe
+                    # ^XXX, always "up to" the bar *before*
+                )
+                -
+                next_start_dt
+            )
 
             # frame's worth of sample-period-steps, in seconds
             frame_size_s: float = len(array) * timeframe
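
To make the "up to the bar *before*" comment concrete, a quick worked
example (hypothetical values, 1m bars, using pendulum which backs the
`DateTime`/`Interval` types here): a frame whose first bar starts at
00:00 and whose last bar starts at 01:00 spans 59 full sample-periods
once the last bar's own period is excluded:

    import pendulum

    timeframe = 60  # 1m sample period, in secs
    next_start_dt = pendulum.datetime(2024, 1, 1, 0, 0)
    last_start_dt = pendulum.datetime(2024, 1, 1, 1, 0)

    # drop one sample-period so the expected duration only runs
    # "up to" the bar before `last_start_dt`..
    expected_dur = (
        last_start_dt.subtract(seconds=timeframe)
        -
        next_start_dt
    )
    assert expected_dur.in_seconds() == 3540  # 59 bars, not 60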
@@ -556,6 +577,7 @@ async def start_backfill(
                 shm,
                 to_push,
                 prepend_index=next_prepend_index,
+                backfill_until_dt=backfill_until_dt,
                 update_start_on_prepend=update_start_on_prepend,
             )
             await sampler_stream.send({
@@ -585,6 +607,7 @@ async def start_backfill(
                 shm,
                 to_push,
                 prepend_index=next_prepend_index,
+                backfill_until_dt=backfill_until_dt,
                 update_start_on_prepend=update_start_on_prepend,
             )
             await sampler_stream.send({
@@ -899,7 +922,7 @@ async def load_tsdb_hist(
         DateTime,
     ]
     try:
         tsdb_entry: tuple|None = await storage.load(
             fqme,
             timeframe=timeframe,
         )
@@ -1056,7 +1079,7 @@ async def tsdb_backfill(
         trio.open_nursery() as tn,
     ):
 
-        bf_done = await tn.start(
+        bf_done: trio.Event = await tn.start(
             partial(
                 start_backfill,
                 get_hist=get_hist,
@@ -1076,8 +1099,10 @@ async def tsdb_backfill(
                 write_tsdb=True,
             )
         )
-        nulls_detected: trio.Event | None = None
+        nulls_detected: trio.Event|None = None
+
         if last_tsdb_dt is not None:
 
             # calc the index from which the tsdb data should be
             # prepended, presuming there is a gap between the
             # latest frame (loaded/read above) and the latest
@@ -1148,7 +1173,7 @@ async def tsdb_backfill(
             # TODO: ideally these can never exist!
             # -[ ] somehow it seems sometimes we're writing zero-ed
             #  segments to tsdbs during teardown?
-            # -[ ] can we ensure that the backcfiller tasks do this
+            # -[ ] can we ensure that the backfiller tasks do this
             #    work PREVENTAVELY instead?
             # -[ ] fill in non-zero epoch time values ALWAYS!
             # await maybe_fill_null_segments(
@@ -1160,6 +1185,7 @@ async def tsdb_backfill(
                 get_hist=get_hist,
                 sampler_stream=sampler_stream,
                 mkt=mkt,
+                backfill_until_dt=last_tsdb_dt,
             ))
 
     # 2nd nursery END