diff --git a/piker/data/history.py b/piker/data/history.py
index eea6e83f..fce81063 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -19,9 +19,9 @@ Historical data business logic for load, backfill and tsdb storage.
 
 '''
 from __future__ import annotations
-from collections import (
-    Counter,
-)
+# from collections import (
+#     Counter,
+# )
 from datetime import datetime
 from functools import partial
 # import time
@@ -86,6 +86,7 @@ def diff_history(
     else:
         return array[times >= prepend_until_dt.timestamp()]
 
+
 async def shm_push_in_between(
     shm: ShmArray,
     to_push: np.ndarray,
@@ -191,7 +192,7 @@ async def start_backfill(
    # avoid duplicate history frames with a set of datetime frame
    # starts and associated counts of how many duplicates we see
    # per time stamp.
-    starts: Counter[datetime] = Counter()
+    # starts: Counter[datetime] = Counter()
 
    # conduct "backward history gap filling" where we push to
    # the shm buffer until we have history back until the
@@ -201,11 +202,6 @@ async def start_backfill(
 
     while last_start_dt > backfill_until_dt:
 
-        # if timeframe == 60:
-        #     await tractor.breakpoint()
-        # else:
-        #     return
-
         log.debug(
             f'Requesting {timeframe}s frame ending in {last_start_dt}'
         )
@@ -242,6 +238,7 @@ async def start_backfill(
         #         f"{mkt.fqme}: skipping duplicate frame @ {next_start_dt}"
         #     )
         #     starts[start_dt] += 1
+        #     await tractor.breakpoint()
         #     continue
 
         # elif starts[next_start_dt] > 6:
@@ -250,13 +247,12 @@ async def start_backfill(
         #     )
         #     return
 
-        # only update new start point if not-yet-seen
-        start_dt: datetime = next_start_dt
-        starts[start_dt] += 1
+        # # only update new start point if not-yet-seen
+        # starts[next_start_dt] += 1
 
-        assert array['time'][0] == start_dt.timestamp()
+        assert array['time'][0] == next_start_dt.timestamp()
 
-        diff = last_start_dt - start_dt
+        diff = last_start_dt - next_start_dt
         frame_time_diff_s = diff.seconds
 
         # frame's worth of sample-period-steps, in seconds
@@ -279,11 +275,12 @@ async def start_backfill(
         )
         ln = len(to_push)
         if ln:
-            log.info(f'{ln} bars for {start_dt} -> {last_start_dt}')
+            log.info(f'{ln} bars for {next_start_dt} -> {last_start_dt}')
 
         else:
             log.warning(
-                f'{ln} BARS TO PUSH after diff?!: {start_dt} -> {last_start_dt}'
+                '0 BARS TO PUSH after diff!?\n'
+                f'{next_start_dt} -> {last_start_dt}'
             )
 
         # bail gracefully on shm allocation overrun/full
@@ -308,7 +305,7 @@ async def start_backfill(
         except ValueError as ve:
             _ve = ve
             log.error(
-                f'Shm buffer prepend OVERRUN on: {start_dt} -> {last_start_dt}?'
+                f'Shm prepend OVERRUN on: {next_start_dt} -> {last_start_dt}?'
             )
 
             if next_prepend_index < ln:
@@ -336,7 +333,7 @@ async def start_backfill(
 
         log.info(
             f'Shm pushed {ln} frame:\n'
-            f'{start_dt} -> {last_start_dt}'
+            f'{next_start_dt} -> {last_start_dt}'
         )
 
         # FINALLY, maybe write immediately to the tsdb backend for
@@ -347,7 +344,7 @@ async def start_backfill(
         ):
             log.info(
                 f'Writing {ln} frame to storage:\n'
-                f'{start_dt} -> {last_start_dt}'
+                f'{next_start_dt} -> {last_start_dt}'
             )
 
             if mkt.dst.atype not in {'crypto', 'crypto_currency'}:
@@ -372,50 +369,52 @@ async def start_backfill(
                 f'Finished filling gap to tsdb start @ {backfill_until_dt}!'
             )
 
         # conduct tsdb timestamp gap detection and backfill any
-        # seemingly missing portions!
-
+        # seemingly missing sequence segments..
+        # TODO: ideally these never exist but somehow it seems
+        # sometimes we're writing zero-ed segments on certain
+        # (teardown) cases?
         from ._timeseries import detect_null_time_gap
-        indices: tuple | None = detect_null_time_gap(shm)
-        if indices:
+        gap_indices: tuple | None = detect_null_time_gap(shm)
+        while gap_indices:
             (
                 istart,
                 start,
                 end,
                 iend,
-            ) = indices
+            ) = gap_indices
+
+            start_dt = from_timestamp(start)
+            end_dt = from_timestamp(end)
             (
                 array,
                 next_start_dt,
                 next_end_dt,
             ) = await get_hist(
                 timeframe,
-                start_dt=from_timestamp(start),
-                end_dt=from_timestamp(end),
+                start_dt=start_dt,
+                end_dt=end_dt,
             )
+
             await shm_push_in_between(
                 shm,
                 array,
                 prepend_index=iend,
                 update_start_on_prepend=False,
             )
+
+            # TODO: UI side needs IPC event to update..
+            # - make sure the UI actually always handles
+            #   this update!
+            # - remember that on the display side, only refresh this
+            #   if the respective history is actually "in view".
+            # loop
             await sampler_stream.send({
                 'broadcast_all': {
                     'backfilling': True
                 },
            })
-            indices: tuple | None = detect_null_time_gap(shm)
-            if indices:
-                (
-                    istart,
-                    start,
-                    end,
-                    iend,
-                ) = indices
-                await tractor.breakpoint()
-
-        # TODO: can we only trigger this if the respective
-        # history in "in view"?!?
+            gap_indices: tuple | None = detect_null_time_gap(shm)
 
         # XXX: extremely important, there can be no checkpoints
         # in the block above to avoid entering new ``frames``
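
For context only (not part of the patch): a minimal, self-contained sketch of the "detect a null segment, refill it, re-detect until clean" loop this change switches to, replacing the previous single-shot `if indices:` check. Plain numpy and simple interpolation stand in for piker's detect_null_time_gap / get_hist / shm machinery; all names and behavior below are illustrative assumptions, not piker's actual API.

# sketch: loop gap detection + refill until no null segments remain
from __future__ import annotations
import numpy as np

def detect_null_time_gap(times: np.ndarray) -> tuple | None:
    # return (istart, start, end, iend) bracketing the first zero'd run,
    # mirroring the index-tuple shape used in the patch; assumes the run
    # is interior to the array. None means no gap detected.
    nulls = np.flatnonzero(times == 0)
    if not nulls.size:
        return None
    istart, iend = nulls[0] - 1, nulls[-1] + 1
    return istart, times[istart], times[iend], iend

def fill_gaps(times: np.ndarray, step: float = 60.) -> np.ndarray:
    # stand-in for the `while gap_indices:` loop: each pass patches one
    # detected segment (here by interpolating timestamps instead of
    # re-requesting broker history), then re-runs detection.
    gap = detect_null_time_gap(times)
    while gap:
        istart, start, end, iend = gap
        times[istart:iend + 1] = np.arange(start, end + step, step)[:iend + 1 - istart]
        gap = detect_null_time_gap(times)
    return times

if __name__ == '__main__':
    ts = np.arange(60, 660, 60, dtype=float)
    ts[3:6] = 0  # simulate a zero'd (null) segment
    assert not np.any(fill_gaps(ts) == 0)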