diff --git a/piker/data/feed.py b/piker/data/feed.py index e4a998d5..e77052bf 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -397,7 +397,7 @@ async def start_backfill( diff = end_dt - start_dt frame_time_diff_s = diff.seconds - expected_frame_size_s = frame_size_s # + step_size_s + expected_frame_size_s = frame_size_s + step_size_s if frame_time_diff_s > expected_frame_size_s: @@ -522,8 +522,18 @@ async def start_backfill( last_shm_prepend_dt = pendulum.from_timestamp(start) earliest_frame_queue_dt = pendulum.from_timestamp(epoch) - diff = epoch - start - if abs(diff) > step_size_s: + diff = start - epoch + + if diff < 0: + log.warning( + 'Discarding out of order frame:\n' + f'{earliest_frame_queue_dt}' + ) + frames.pop(epoch) + continue + # await tractor.breakpoint() + + if diff > step_size_s: if earliest_end_dt < earliest_frame_queue_dt: # XXX: an expected gap was encountered (see @@ -548,8 +558,32 @@ async def start_backfill( 'then erlangs?\n' 'There seems to be a gap between:\n' f'{earliest_frame_queue_dt} <- ' + f'{last_shm_prepend_dt}\n' + 'Conducting manual call for frame ending: ' f'{last_shm_prepend_dt}' ) + ( + to_push, + start_dt, + end_dt, + ) = await get_ohlc_frame( + input_end_dt=last_shm_prepend_dt, + iter_dts_gen=idts, + ) + last_epoch = to_push['time'][-1] + diff = start - last_epoch + + if diff > step_size_s: + await tractor.breakpoint() + raise DataUnavailable( + 'An awkward frame was found:\n' + f'{start_dt} -> {end_dt}:\n{to_push}' + ) + + else: + frames[last_epoch] = ( + to_push, start_dt, end_dt) + break expect_end = pendulum.from_timestamp(start) expect_start = expect_end.subtract(