diff --git a/piker/data/feed.py b/piker/data/feed.py index 09dce25e..82b7b59b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -220,40 +220,8 @@ def diff_history( if last_tsdb_dt is None: return array - time_attr = { - 1: 'seconds', - 60: 'minutes', - }[timeframe] - i_diff = getattr((end_dt - last_tsdb_dt), time_attr) - - # if we detect a partial frame's worth of data - # that is new, slice out only that history and - # write to shm. - if i_diff < 0: - # empty case since tsdb already has this history - return array[:0] - - ln = len(array) - to_push = array - - if i_diff < ln: - # slice out missing history from end of frame - to_push = array[ln - i_diff:ln] - - # XXX: OLD GAP HANDLING..if there's a gap in a partial frame - # worth. we don't need this any more with the timeframe aware - # diffing above right? - # else: - # # pass back only the portion of the array that is - # # greater then the last time stamp in the tsdb. - # time = array['time'] - # to_push = array[time >= last_tsdb_dt.timestamp()] - - log.debug( - f'Pushing partial frame {to_push.size} to shm' - ) - - return to_push + time = array['time'] + return array[time > last_tsdb_dt.timestamp()] async def start_backfill( @@ -375,8 +343,6 @@ async def start_backfill( timeframe, end_dt=start_dt, ) - # if timeframe == 1: - # await tractor.breakpoint() # broker says there never was or is no more history to pull except DataUnavailable: @@ -441,7 +407,6 @@ async def start_backfill( ) # can't push the entire frame? so # push only the amount that can fit.. - await tractor.breakpoint() break log.info( @@ -614,7 +579,11 @@ async def tsdb_backfill( # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # re-index with a `time` and index field prepend_start = shm._first.value - shm_last_dt = pendulum.from_timestamp(shm.array[0]['time']) + array = shm.array + if len(array): + shm_last_dt = pendulum.from_timestamp(shm.array[0]['time']) + else: + shm_last_dt = None if last_tsdb_dt: assert shm_last_dt >= last_tsdb_dt @@ -629,27 +598,24 @@ async def tsdb_backfill( backfilled_size_s = ( latest_start_dt - last_tsdb_dt ).seconds - else: - backfilled_size_s = ( - latest_start_dt - shm_last_dt - ).seconds + # if the shm buffer len is not large enough to contain + # all missing data between the most recent backend-queried frame + # and the most recent dt-index in the db we warn that we only + # want to load a portion of the next tsdb query to fill that + # space. + log.info( + f'{backfilled_size_s} seconds worth of {timeframe}s loaded' + ) # Load TSDB history into shm buffer (for display) if there is # remaining buffer space. - - # if the shm buffer len is not large enough to contain - # all missing data between the most recent backend-queried frame - # and the most recent dt-index in the db we warn that we only - # want to load a portion of the next tsdb query to fill that - # space. - log.info( - f'{backfilled_size_s} seconds worth of {timeframe}s data loaded' - ) - if ( len(tsdb_history) ): - to_push = tsdb_history[:prepend_start] + + # load the first (smaller) bit of history originally loaded + # above from ``Storage.load()``. + to_push = tsdb_history[-prepend_start:] shm.push( to_push, @@ -660,37 +626,30 @@ async def tsdb_backfill( # start=prepend_start, field_map=marketstore.ohlc_key_map, ) - prepend_start = shm._first.value - - # load as much from storage into shm as space will - # allow according to user's shm size settings. - last_frame_start = tsdb_history['Epoch'][0] while ( shm._first.value > 0 ): + # load as much from storage into shm as space will + # allow according to user's shm size settings. + tsdb_last_frame_start = tsdb_history['Epoch'][0] + tsdb_history = await storage.read_ohlcv( fqsn, - end=last_frame_start, + end=tsdb_last_frame_start, timeframe=timeframe, ) if ( - not len(tsdb_history) + not len(tsdb_history) # empty query + + # no earlier data detected + or tsdb_history['Epoch'][0] >= tsdb_last_frame_start + ): - # on empty db history - break - - time = tsdb_history['Epoch'] - frame_start = time[0] - frame_end = time[0] - print(f"LOADING MKTS HISTORY: {frame_start} - {frame_end}") - - if frame_start >= last_frame_start: - # no new data loaded was from tsdb, so we can exit. break prepend_start = shm._first.value - to_push = tsdb_history[:prepend_start] + to_push = tsdb_history[-prepend_start:] # insert the history pre a "days worth" of samples # to leave some real-time buffer space at the end. @@ -699,8 +658,6 @@ async def tsdb_backfill( prepend=True, field_map=marketstore.ohlc_key_map, ) - last_frame_start = frame_start - log.info(f'Loaded {to_push.shape} datums from storage') # manually trigger step update to update charts/fsps @@ -1248,6 +1205,11 @@ async def open_feed_bus( # ensure we are who we think we are servicename = tractor.current_actor().name assert 'brokerd' in servicename + + # XXX: figure this not crashing into debug! + # await tractor.breakpoint() + # assert 0 + assert brokername in servicename bus = get_feed_bus(brokername) @@ -1557,6 +1519,7 @@ async def open_feed( bus_ctxs.append( portal.open_context( open_feed_bus, + # brokername=brokermod.name, brokername=brokername, symbols=bfqsns, loglevel=loglevel,