From cd6bc105de892165d0efd57065c53ff2289fffd9 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 21 Jan 2026 22:31:30 -0500 Subject: [PATCH] Enable tracing back insert backfills Namely insertion writes which over-fill the shm buffer past the latest tsdb sample via `.tsp._history.shm_push_in_between()`. Deats, - check earliest `to_push` timestamp and enter pause point if it's earlier than the tsdb's `backfill_until_dt` stamp. - requires actually passing the `backfill_until_dt: datetime` thru, * `get_null_segs()` * `maybe_fill_null_segments()` * `shm_push_in_between()` (obvi XD) --- piker/tsp/_history.py | 58 +++++++++++++++++++++++++++++++------------ 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py index b6b15e72..361b0e23 100644 --- a/piker/tsp/_history.py +++ b/piker/tsp/_history.py @@ -75,7 +75,6 @@ from piker.brokers._util import ( ) from piker.storage import TimeseriesNotFound from ._anal import ( - dedupe, get_null_segs, iter_null_segs, @@ -120,15 +119,16 @@ _rt_buffer_start = int((_days_worth - 1) * _secs_in_day) def diff_history( array: np.ndarray, - append_until_dt: datetime | None = None, - prepend_until_dt: datetime | None = None, + append_until_dt: datetime|None = None, + prepend_until_dt: datetime|None = None, ) -> np.ndarray: # no diffing with tsdb dt index possible.. if ( prepend_until_dt is None - and append_until_dt is None + and + append_until_dt is None ): return array @@ -140,15 +140,26 @@ def diff_history( return array[times >= prepend_until_dt.timestamp()] -# TODO: can't we just make this a sync func now? async def shm_push_in_between( shm: ShmArray, to_push: np.ndarray, prepend_index: int, + backfill_until_dt: datetime, update_start_on_prepend: bool = False, ) -> int: + + # XXX, try to catch bad inserts by peeking at the first/last + # times and ensure we don't violate order. 
+ f_times: np.ndarray = to_push['time'] + f_start: float = f_times[0] + f_start_dt = from_timestamp(f_start) + if ( + f_start_dt < backfill_until_dt + ): + await tractor.pause() + # XXX: extremely important, there can be no checkpoints # in the body of this func to avoid entering new ``frames`` # values while we're pipelining the current ones to @@ -181,6 +192,7 @@ async def maybe_fill_null_segments( get_hist: Callable, sampler_stream: tractor.MsgStream, mkt: MktPair, + backfill_until_dt: datetime, task_status: TaskStatus[trio.Event] = trio.TASK_STATUS_IGNORED, @@ -191,7 +203,11 @@ async def maybe_fill_null_segments( frame: Frame = shm.array - null_segs: tuple | None = get_null_segs( + # TODO, put in parent task/daemon root! + import greenback + await greenback.ensure_portal() + + null_segs: tuple|None = get_null_segs( frame, period=timeframe, ) @@ -237,6 +253,7 @@ async def maybe_fill_null_segments( shm, to_push, prepend_index=absi_end, + backfill_until_dt=backfill_until_dt, update_start_on_prepend=False, ) # TODO: UI side needs IPC event to update.. 
@@ -352,15 +369,12 @@ async def start_backfill( mkt: MktPair, shm: ShmArray, timeframe: float, - backfill_from_shm_index: int, backfill_from_dt: datetime, - sampler_stream: tractor.MsgStream, - backfill_until_dt: datetime | None = None, - storage: StorageClient | None = None, - + backfill_until_dt: datetime|None = None, + storage: StorageClient|None = None, write_tsdb: bool = True, task_status: TaskStatus[tuple] = trio.TASK_STATUS_IGNORED, @@ -495,7 +509,14 @@ async def start_backfill( assert time[-1] == next_end_dt.timestamp() - expected_dur: Interval = last_start_dt - next_start_dt + expected_dur: Interval = ( + last_start_dt.subtract( + seconds=timeframe + # ^XXX, always "up to" the bar *before* + ) + - + next_start_dt + ) # frame's worth of sample-period-steps, in seconds frame_size_s: float = len(array) * timeframe @@ -556,6 +577,7 @@ async def start_backfill( shm, to_push, prepend_index=next_prepend_index, + backfill_until_dt=backfill_until_dt, update_start_on_prepend=update_start_on_prepend, ) await sampler_stream.send({ @@ -585,6 +607,7 @@ async def start_backfill( shm, to_push, prepend_index=next_prepend_index, + backfill_until_dt=backfill_until_dt, update_start_on_prepend=update_start_on_prepend, ) await sampler_stream.send({ @@ -899,7 +922,7 @@ async def load_tsdb_hist( DateTime, ] try: - tsdb_entry: tuple|None = await storage.load( + tsdb_entry: tuple|None = await storage.load( fqme, timeframe=timeframe, ) @@ -1056,7 +1079,7 @@ async def tsdb_backfill( trio.open_nursery() as tn, ): - bf_done = await tn.start( + bf_done: trio.Event = await tn.start( partial( start_backfill, get_hist=get_hist, @@ -1076,8 +1099,10 @@ async def tsdb_backfill( write_tsdb=True, ) ) - nulls_detected: trio.Event | None = None + nulls_detected: trio.Event|None = None + if last_tsdb_dt is not None: + # calc the index from which the tsdb data should be # prepended, presuming there is a gap between the # latest frame (loaded/read above) and the latest @@ -1148,7 +1173,7 @@ async 
def tsdb_backfill( # TODO: ideally these can never exist! # -[ ] somehow it seems sometimes we're writing zero-ed # segments to tsdbs during teardown? - # -[ ] can we ensure that the backcfiller tasks do this + # -[ ] can we ensure that the backfiller tasks do this # work PREVENTAVELY instead? # -[ ] fill in non-zero epoch time values ALWAYS! # await maybe_fill_null_segments( @@ -1160,6 +1185,7 @@ async def tsdb_backfill( get_hist=get_hist, sampler_stream=sampler_stream, mkt=mkt, + backfill_until_dt=last_tsdb_dt, )) # 2nd nursery END