Fix earliest frame-end not-yet-pushed check

Bleh/🤦, the ``end_dt`` in scope is not the "earliest" frame's
`end_dt` in the async response queue.. Parse the queue's latest epoch
and use **that** to compare to the last last pushed datetime index..

Add more detailed logging to help debug any (un)expected datetime index
gaps.
l1_precision_fix
Tyler Goodlet 2022-05-10 10:47:41 -04:00
parent 4b6ecbfc79
commit 26fddae3c0
1 changed files with 20 additions and 5 deletions

View File

@ -25,6 +25,7 @@ from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from functools import partial from functools import partial
from pprint import pformat
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Any, Any,
@ -322,7 +323,8 @@ async def start_backfill(
start, start,
last_tsdb_dt, last_tsdb_dt,
) )
dtrange = hist_period.range('seconds', frame_size_s) dtrange = list(hist_period.range('seconds', frame_size_s))
log.debug(f'New datetime index:\n{pformat(dtrange)}')
for end_dt in dtrange: for end_dt in dtrange:
log.warning(f'Yielding next frame start {end_dt}') log.warning(f'Yielding next frame start {end_dt}')
@ -395,7 +397,7 @@ async def start_backfill(
diff = end_dt - start_dt diff = end_dt - start_dt
frame_time_diff_s = diff.seconds frame_time_diff_s = diff.seconds
expected_frame_size_s = frame_size_s + step_size_s expected_frame_size_s = frame_size_s # + step_size_s
if frame_time_diff_s > expected_frame_size_s: if frame_time_diff_s > expected_frame_size_s:
@ -515,27 +517,40 @@ async def start_backfill(
epochs = list(reversed(sorted(frames))) epochs = list(reversed(sorted(frames)))
for epoch in epochs: for epoch in epochs:
start = shm.array['time'][0] start = shm.array['time'][0]
last_shm_prepend_dt = pendulum.from_timestamp(start)
earliest_frame_queue_dt = pendulum.from_timestamp(epoch)
diff = epoch - start diff = epoch - start
if abs(diff) > step_size_s: if abs(diff) > step_size_s:
if earliest_end_dt < end_dt: if earliest_end_dt < earliest_frame_queue_dt:
# XXX: an expected gap was encountered (see # XXX: an expected gap was encountered (see
# logic in ``get_ohlc_frame()``, so allow # logic in ``get_ohlc_frame()``, so allow
# this frame through to the storage layer. # this frame through to the storage layer.
log.warning( log.warning(
f'there is an expected history gap of {diff}s:' f'Expected history gap of {diff}s:\n'
f'{earliest_frame_queue_dt} <- '
f'{earliest_end_dt}'
) )
elif ( elif (
erlangs > 1 erlangs > 1
and len(epochs) < erlangs
): ):
# we don't yet have the next frame to push # we don't yet have the next frame to push
# so break back to the async request loop # so break back to the async request loop
# while we wait for more async frame-results # while we wait for more async frame-results
# to arrive. # to arrive.
if len(frames) >= erlangs:
log.warning(
'Frame count in async-queue is greater '
'then erlangs?\n'
'There seems to be a gap between:\n'
f'{earliest_frame_queue_dt} <- '
f'{last_shm_prepend_dt}'
)
expect_end = pendulum.from_timestamp(start) expect_end = pendulum.from_timestamp(start)
expect_start = expect_end.subtract( expect_start = expect_end.subtract(
seconds=frame_size_s) seconds=frame_size_s)