From 1657f51edc8be0d8aa3ff7c3bb94a3aea00692a4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 10 May 2022 17:21:11 -0400 Subject: [PATCH] Manually fetch missing out-of-order history frames It seems once in a while a frame can get missed or dropped (at least with binance?) so in those cases, when the request erlangs is already at max, we just manually request the missing frame and presume things will work out XD Further, discard out of order frames that are "from the future" that somehow end up in the async queue once in a while? Not sure why this happens but it seems thus far just discarding them is nbd. --- piker/data/feed.py | 40 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index e4a998d5..e77052bf 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -397,7 +397,7 @@ async def start_backfill( diff = end_dt - start_dt frame_time_diff_s = diff.seconds - expected_frame_size_s = frame_size_s # + step_size_s + expected_frame_size_s = frame_size_s + step_size_s if frame_time_diff_s > expected_frame_size_s: @@ -522,8 +522,18 @@ async def start_backfill( last_shm_prepend_dt = pendulum.from_timestamp(start) earliest_frame_queue_dt = pendulum.from_timestamp(epoch) - diff = epoch - start - if abs(diff) > step_size_s: + diff = start - epoch + + if diff < 0: + log.warning( + 'Discarding out of order frame:\n' + f'{earliest_frame_queue_dt}' + ) + frames.pop(epoch) + continue + # await tractor.breakpoint() + + if diff > step_size_s: if earliest_end_dt < earliest_frame_queue_dt: # XXX: an expected gap was encountered (see @@ -548,8 +558,32 @@ async def start_backfill( 'then erlangs?\n' 'There seems to be a gap between:\n' f'{earliest_frame_queue_dt} <- ' + f'{last_shm_prepend_dt}\n' + 'Conducting manual call for frame ending: ' f'{last_shm_prepend_dt}' ) + ( + to_push, + start_dt, + end_dt, + ) = await get_ohlc_frame( + input_end_dt=last_shm_prepend_dt, + iter_dts_gen=idts, + ) + last_epoch = to_push['time'][-1] + diff = start - last_epoch + + if diff > step_size_s: + await tractor.breakpoint() + raise DataUnavailable( + 'An awkward frame was found:\n' + f'{start_dt} -> {end_dt}:\n{to_push}' + ) + + else: + frames[last_epoch] = ( + to_push, start_dt, end_dt) + break expect_end = pendulum.from_timestamp(start) expect_start = expect_end.subtract(