Only accept 6 tries for the same duplicate hist frame

When we see multiple history frames whose start times duplicate entries
already in the request set, bail on re-trying after a set number of
attempts (6, chosen arbitrarily) and return early from the tsdb
backfill loop; presume that this many duplicates means we've hit the
beginning of history. Use a `collections.Counter` to track the
duplicate count per time stamp, and make sure to warn-log in such
cases.
epoch_index_backup
Tyler Goodlet 2022-12-21 17:30:08 -05:00
parent 7faca820bd
commit 9f37b33167
1 changed file with 27 additions and 10 deletions
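
In isolation, the pattern this commit introduces looks like the sketch
below: tally every frame-start timestamp in a `Counter` and bail once
any one repeats more than the cap. This is not the actual feed code:
`get_frame()` is a fake stand-in for the real backend request, and
unlike the diff below it tallies every sighting of a timestamp, not
just first sightings.

    from collections import Counter
    from datetime import datetime, timedelta

    MAX_DUPES = 6  # "6 just cuz": this many repeats presumably means start of history

    EPOCH = datetime(2020, 1, 1)

    def get_frame(end_dt: datetime) -> datetime:
        # fake backend: frame starts step back one day until history
        # runs out, then the earliest timestamp repeats forever.
        return max(EPOCH, end_dt - timedelta(days=1))

    def backfill(start_dt: datetime) -> list[datetime]:
        starts: Counter[datetime] = Counter()
        frames: list[datetime] = []

        while True:
            next_start_dt = get_frame(start_dt)
            starts[next_start_dt] += 1

            if starts[next_start_dt] > MAX_DUPES:
                # too many duplicates: presume the beginning of history
                print(f'NO-MORE-DATA before {next_start_dt}?')
                return frames

            if starts[next_start_dt] > 1:
                # duplicate frame: retry from the earliest start seen
                start_dt = min(starts)
                continue

            frames.append(next_start_dt)
            start_dt = next_start_dt

    assert len(backfill(datetime(2020, 1, 10))) == 9  # daily frames back to EPOCH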


@@ -21,7 +21,10 @@ This module is enabled for ``brokerd`` daemons.
 """

 from __future__ import annotations
-from collections import defaultdict
+from collections import (
+    defaultdict,
+    Counter,
+)
 from contextlib import asynccontextmanager as acm
 from datetime import datetime
 from functools import partial
@@ -374,8 +377,9 @@ async def start_backfill(
     # erlangs = config.get('erlangs', 1)

     # avoid duplicate history frames with a set of datetime frame
-    # starts.
-    starts: set[datetime] = set()
+    # starts and associated counts of how many duplicates we see
+    # per time stamp.
+    starts: Counter[datetime] = Counter()

     # inline sequential loop where we simply pass the
     # last retrieved start dt to the next request as
@@ -403,14 +407,23 @@ async def start_backfill(
                 # request loop until the condition is resolved?
                 return

-            if next_start_dt in starts:
+            if (
+                next_start_dt in starts
+                and starts[next_start_dt] <= 6
+            ):
                 start_dt = min(starts)
                 print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
                 continue

+            elif starts[next_start_dt] > 6:
+                log.warning(
+                    f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
+                )
+                return
+
             # only update new start point if not-yet-seen
             start_dt = next_start_dt
-            starts.add(start_dt)
+            starts[start_dt] += 1

             assert array['time'][0] == start_dt.timestamp()

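
Note that the `set` to `Counter` swap stays compatible with the
membership test and the `min()` call, since iterating a `Counter`
yields its keys; missing keys also read as `0` rather than raising
`KeyError`, which keeps the `elif` arm safe the first time a timestamp
shows up. A quick demonstration:

    from collections import Counter
    from datetime import datetime

    starts: Counter[datetime] = Counter()
    for dt in (
        datetime(2022, 12, 21),
        datetime(2022, 12, 20),
        datetime(2022, 12, 20),  # a duplicated frame start
    ):
        starts[dt] += 1

    assert datetime(2022, 12, 21) in starts       # membership, like a set
    assert min(starts) == datetime(2022, 12, 20)  # earliest key, like a set
    assert starts[datetime(2022, 12, 20)] == 2    # but repeats are tallied
    assert starts[datetime(1970, 1, 1)] == 0      # missing keys read as 0
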
@@ -656,10 +669,10 @@ async def tsdb_backfill(

     # Load TSDB history into shm buffer (for display) if there is
     # remaining buffer space.
     if (
-        len(tsdb_history)
+        len(tsdb_history)
     ):
         # load the first (smaller) bit of history originally loaded
         # above from ``Storage.load()``.
         to_push = tsdb_history[-prepend_start:]
@@ -682,14 +695,12 @@ async def tsdb_backfill(

         # load as much from storage into shm possible (depends on
         # user's shm size settings).
-        while (
-            shm._first.value > 0
-        ):
+        while shm._first.value > 0:
             tsdb_history = await storage.read_ohlcv(
                 fqsn,
-                end=tsdb_last_frame_start,
                 timeframe=timeframe,
+                end=tsdb_last_frame_start,
             )

             # empty query

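
The simplified `while` loop pages backwards through storage: each
`read_ohlcv()` call ends where the previous frame began, until either
the front of the shm buffer is reached or a query comes back empty.
Schematically, with a fake `read_frame()` and plain ints standing in
for the real storage API and shm indices:

    def read_frame(end: int, size: int = 10) -> list[int]:
        # fake storage: up to `size` contiguous points strictly before `end`
        return list(range(max(0, end - size), end))

    first = 100        # free slots remaining at the buffer's front
    end_cursor = 1000  # read backwards starting from here

    while first > 0:
        frame = read_frame(end_cursor)
        if not frame:
            break                        # empty query: storage exhausted
        end_cursor = frame[0]            # next read ends where this one began
        first -= min(len(frame), first)  # prepending shrinks the free region

    assert first == 0  # buffer front filled
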
@@ -930,6 +941,8 @@ async def allocate_persistent_feed(
     some_data_ready = trio.Event()
     feed_is_live = trio.Event()

+    symstr = symstr.lower()
+
     # establish broker backend quote stream by calling
     # ``stream_quotes()``, which is a required broker backend endpoint.
     init_msg, first_quote = await bus.nursery.start(
@@ -1129,6 +1142,10 @@ async def open_feed_bus(
     flumes: dict[str, Flume] = {}

     for symbol in symbols:
+
+        # we always use lower case keys internally
+        symbol = symbol.lower()
+
         # if no cached feed for this symbol has been created for this
         # brokerd yet, start persistent stream and shm writer task in
         # service nursery
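
Both lower-casing changes serve the same cache-coherency purpose: a
feed requested as 'XBTUSD' and one requested as 'xbtusd' must resolve
to the same cached flume entry. A toy version of the keyed cache (the
symbol spellings here are illustrative, not from the diff):

    flumes: dict[str, object] = {}

    for symbol in ('XBTUSD', 'xbtusd', 'XbtUsd'):
        # we always use lower case keys internally
        symbol = symbol.lower()
        if symbol not in flumes:
            flumes[symbol] = object()  # would start the stream + shm writer

    assert len(flumes) == 1  # every spelling hits the one cached feed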