diff --git a/piker/data/feed.py b/piker/data/feed.py index aa2a6bad..744d301f 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -21,7 +21,10 @@ This module is enabled for ``brokerd`` daemons. """ from __future__ import annotations -from collections import defaultdict +from collections import ( + defaultdict, + Counter, +) from contextlib import asynccontextmanager as acm from datetime import datetime from functools import partial @@ -374,8 +377,9 @@ async def start_backfill( # erlangs = config.get('erlangs', 1) # avoid duplicate history frames with a set of datetime frame - # starts. - starts: set[datetime] = set() + # starts and associated counts of how many duplicates we see + # per time stamp. + starts: Counter[datetime] = Counter() # inline sequential loop where we simply pass the # last retrieved start dt to the next request as @@ -403,14 +407,24 @@ async def start_backfill( # request loop until the condition is resolved? return - if next_start_dt in starts: + if ( + next_start_dt in starts + and starts[next_start_dt] <= 6 + ): start_dt = min(starts) print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}") + starts[start_dt] += 1 continue + elif starts[next_start_dt] > 6: + log.warning( + f'NO-MORE-DATA: backend {mod.name} before {next_start_dt}?' + ) + return + # only update new start point if not-yet-seen start_dt = next_start_dt - starts.add(start_dt) + starts[start_dt] += 1 assert array['time'][0] == start_dt.timestamp() @@ -656,10 +670,10 @@ async def tsdb_backfill( # Load TSDB history into shm buffer (for display) if there is # remaining buffer space. + if ( len(tsdb_history) ): - # load the first (smaller) bit of history originally loaded # above from ``Storage.load()``. to_push = tsdb_history[-prepend_start:] @@ -682,14 +696,12 @@ async def tsdb_backfill( # load as much from storage into shm possible (depends on # user's shm size settings). - while ( - shm._first.value > 0 - ): + while shm._first.value > 0: tsdb_history = await storage.read_ohlcv( fqsn, - end=tsdb_last_frame_start, timeframe=timeframe, + end=tsdb_last_frame_start, ) # empty query @@ -930,6 +942,8 @@ async def allocate_persistent_feed( some_data_ready = trio.Event() feed_is_live = trio.Event() + symstr = symstr.lower() + # establish broker backend quote stream by calling # ``stream_quotes()``, which is a required broker backend endpoint. init_msg, first_quote = await bus.nursery.start( @@ -1130,6 +1144,10 @@ async def open_feed_bus( flumes: dict[str, Flume] = {} for symbol in symbols: + + # we always use lower case keys internally + symbol = symbol.lower() + # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery