Only accept 6 tries for the same duplicate hist frame
When we see multiple history frames that are duplicates of entries in the request set, bail on re-trying after a fixed number of attempts (6, somewhat arbitrarily) and return early from the tsdb backfill loop; presume that this many duplicates means we've hit the beginning of history. Use a `collections.Counter` for the duplicate counts, and make sure to warn-log in such cases.
parent b0a6dd46e4
commit daf7b3f4a5
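Distilled from the diff below, the retry-capping scheme is roughly the following sketch. The standalone loop and the `get_next_frame()` callable are hypothetical stand-ins for the backend request machinery inside `start_backfill()`; the real change lives in the hunks that follow.

from collections import Counter
from datetime import datetime

# cap on retries for an already-seen frame start ("6 just cuz")
MAX_DUPES: int = 6


def backfill_loop(get_next_frame) -> None:
    '''
    Sketch of the duplicate-frame bail-out: ``get_next_frame()`` is a
    hypothetical callable returning ``(frame, next_start_dt)`` for the
    history frame ending at the passed start time.

    '''
    # per-timestamp counts of how many duplicate frames we've seen
    starts: Counter[datetime] = Counter()
    start_dt: datetime | None = None

    while True:
        # frame processing elided; only the start-time bookkeeping
        # matters for this sketch.
        frame, next_start_dt = get_next_frame(start_dt)

        if (
            next_start_dt in starts
            and starts[next_start_dt] <= MAX_DUPES
        ):
            # duplicate frame: step back to the earliest seen start
            # and bump its duplicate count before retrying.
            start_dt = min(starts)
            starts[start_dt] += 1
            continue

        elif starts[next_start_dt] > MAX_DUPES:
            # this many duplicates presumably means the backend has
            # no more data: we've hit the beginning of history (the
            # real change warn-logs via ``log.warning()`` here).
            print(f'NO-MORE-DATA before {next_start_dt}?')
            return

        # not-yet-seen start point: record it and keep stepping
        # further back in time.
        start_dt = next_start_dt
        starts[start_dt] += 1

Since `Counter` returns 0 for missing keys (without inserting them), the `elif` branch is safe to evaluate even when `next_start_dt` has never been seen.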
@@ -21,7 +21,10 @@ This module is enabled for ``brokerd`` daemons.
 
 """
 from __future__ import annotations
-from collections import defaultdict
+from collections import (
+    defaultdict,
+    Counter,
+)
 from contextlib import asynccontextmanager as acm
 from datetime import datetime
 from functools import partial
@@ -374,8 +377,9 @@ async def start_backfill(
     # erlangs = config.get('erlangs', 1)
 
     # avoid duplicate history frames with a set of datetime frame
-    # starts.
-    starts: set[datetime] = set()
+    # starts and associated counts of how many duplicates we see
+    # per time stamp.
+    starts: Counter[datetime] = Counter()
 
     # inline sequential loop where we simply pass the
     # last retrieved start dt to the next request as
@@ -403,14 +407,24 @@ async def start_backfill(
             # request loop until the condition is resolved?
             return
 
-        if next_start_dt in starts:
+        if (
+            next_start_dt in starts
+            and starts[next_start_dt] <= 6
+        ):
             start_dt = min(starts)
             print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
+            starts[start_dt] += 1
             continue
 
+        elif starts[next_start_dt] > 6:
+            log.warning(
+                f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?'
+            )
+            return
+
         # only update new start point if not-yet-seen
         start_dt = next_start_dt
-        starts.add(start_dt)
+        starts[start_dt] += 1
 
         assert array['time'][0] == start_dt.timestamp()
 
@@ -656,10 +670,10 @@ async def tsdb_backfill(
     # Load TSDB history into shm buffer (for display) if there is
     # remaining buffer space.
 
     if (
         len(tsdb_history)
     ):
 
         # load the first (smaller) bit of history originally loaded
         # above from ``Storage.load()``.
         to_push = tsdb_history[-prepend_start:]
@@ -682,14 +696,12 @@ async def tsdb_backfill(
 
         # load as much from storage into shm possible (depends on
         # user's shm size settings).
-        while (
-            shm._first.value > 0
-        ):
+        while shm._first.value > 0:
 
             tsdb_history = await storage.read_ohlcv(
                 fqsn,
-                end=tsdb_last_frame_start,
                 timeframe=timeframe,
+                end=tsdb_last_frame_start,
             )
 
             # empty query
@@ -930,6 +942,8 @@ async def allocate_persistent_feed(
     some_data_ready = trio.Event()
     feed_is_live = trio.Event()
 
+    symstr = symstr.lower()
+
     # establish broker backend quote stream by calling
     # ``stream_quotes()``, which is a required broker backend endpoint.
     init_msg, first_quote = await bus.nursery.start(
@@ -1130,6 +1144,10 @@ async def open_feed_bus(
     flumes: dict[str, Flume] = {}
 
     for symbol in symbols:
+
+        # we always use lower case keys internally
+        symbol = symbol.lower()
+
         # if no cached feed for this symbol has been created for this
         # brokerd yet, start persistent stream and shm writer task in
         # service nursery