Manually fetch missing out-of-order history frames

It seems once in a while a frame can get missed or dropped (at least
with binance?) so in those cases, when the request erlangs is already at
max, we just manually request the missing frame and presume things will
work out XD

Further, discard out of order frames that are "from the future" that
somehow end up in the async queue once in a while? Not sure why this
happens but it seems thus far just discarding them is nbd.
l1_precision_fix
Tyler Goodlet 2022-05-10 17:21:11 -04:00
parent b1246446c2
commit 1657f51edc
1 changed files with 37 additions and 3 deletions

View File

@ -397,7 +397,7 @@ async def start_backfill(
diff = end_dt - start_dt diff = end_dt - start_dt
frame_time_diff_s = diff.seconds frame_time_diff_s = diff.seconds
expected_frame_size_s = frame_size_s # + step_size_s expected_frame_size_s = frame_size_s + step_size_s
if frame_time_diff_s > expected_frame_size_s: if frame_time_diff_s > expected_frame_size_s:
@ -522,8 +522,18 @@ async def start_backfill(
last_shm_prepend_dt = pendulum.from_timestamp(start) last_shm_prepend_dt = pendulum.from_timestamp(start)
earliest_frame_queue_dt = pendulum.from_timestamp(epoch) earliest_frame_queue_dt = pendulum.from_timestamp(epoch)
diff = epoch - start diff = start - epoch
if abs(diff) > step_size_s:
if diff < 0:
log.warning(
'Discarding out of order frame:\n'
f'{earliest_frame_queue_dt}'
)
frames.pop(epoch)
continue
# await tractor.breakpoint()
if diff > step_size_s:
if earliest_end_dt < earliest_frame_queue_dt: if earliest_end_dt < earliest_frame_queue_dt:
# XXX: an expected gap was encountered (see # XXX: an expected gap was encountered (see
@ -548,8 +558,32 @@ async def start_backfill(
'then erlangs?\n' 'then erlangs?\n'
'There seems to be a gap between:\n' 'There seems to be a gap between:\n'
f'{earliest_frame_queue_dt} <- ' f'{earliest_frame_queue_dt} <- '
f'{last_shm_prepend_dt}\n'
'Conducting manual call for frame ending: '
f'{last_shm_prepend_dt}' f'{last_shm_prepend_dt}'
) )
(
to_push,
start_dt,
end_dt,
) = await get_ohlc_frame(
input_end_dt=last_shm_prepend_dt,
iter_dts_gen=idts,
)
last_epoch = to_push['time'][-1]
diff = start - last_epoch
if diff > step_size_s:
await tractor.breakpoint()
raise DataUnavailable(
'An awkward frame was found:\n'
f'{start_dt} -> {end_dt}:\n{to_push}'
)
else:
frames[last_epoch] = (
to_push, start_dt, end_dt)
break
expect_end = pendulum.from_timestamp(start) expect_end = pendulum.from_timestamp(start)
expect_start = expect_end.subtract( expect_start = expect_end.subtract(