diff --git a/piker/data/feed.py b/piker/data/feed.py index 1843b302..e4f4844a 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -25,6 +25,7 @@ from dataclasses import dataclass, field from datetime import datetime from contextlib import asynccontextmanager from functools import partial +from pprint import pformat from types import ModuleType from typing import ( Any, @@ -322,7 +323,8 @@ async def start_backfill( start, last_tsdb_dt, ) - dtrange = hist_period.range('seconds', frame_size_s) + dtrange = list(hist_period.range('seconds', frame_size_s)) + log.debug(f'New datetime index:\n{pformat(dtrange)}') for end_dt in dtrange: log.warning(f'Yielding next frame start {end_dt}') @@ -395,7 +397,7 @@ async def start_backfill( diff = end_dt - start_dt frame_time_diff_s = diff.seconds - expected_frame_size_s = frame_size_s + step_size_s + expected_frame_size_s = frame_size_s # + step_size_s if frame_time_diff_s > expected_frame_size_s: @@ -515,27 +517,40 @@ async def start_backfill( epochs = list(reversed(sorted(frames))) for epoch in epochs: + start = shm.array['time'][0] + last_shm_prepend_dt = pendulum.from_timestamp(start) + earliest_frame_queue_dt = pendulum.from_timestamp(epoch) diff = epoch - start if abs(diff) > step_size_s: - if earliest_end_dt < end_dt: + if earliest_end_dt < earliest_frame_queue_dt: # XXX: an expected gap was encountered (see # logic in ``get_ohlc_frame()``, so allow # this frame through to the storage layer. log.warning( - f'there is an expected history gap of {diff}s:' + f'Expected history gap of {diff}s:\n' + f'{earliest_frame_queue_dt} <- ' + f'{earliest_end_dt}' ) elif ( erlangs > 1 - and len(epochs) < erlangs ): # we don't yet have the next frame to push # so break back to the async request loop # while we wait for more async frame-results # to arrive. + if len(frames) >= erlangs: + log.warning( + 'Frame count in async-queue is greater ' + 'then erlangs?\n' + 'There seems to be a gap between:\n' + f'{earliest_frame_queue_dt} <- ' + f'{last_shm_prepend_dt}' + ) + expect_end = pendulum.from_timestamp(start) expect_start = expect_end.subtract( seconds=frame_size_s)