diff --git a/piker/data/feed.py b/piker/data/feed.py index 9ab98600..6cbfbebc 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -36,6 +36,7 @@ from trio.abc import ReceiveChannel from trio_typing import TaskStatus import tractor from pydantic import BaseModel +import numpy as np from ..brokers import get_brokermod from .._cacheables import maybe_open_context @@ -276,13 +277,38 @@ async def manage_history( # TODO: this should be used verbatim for the pure # shm backfiller approach below. + def diff_history( + array, + start_dt, + end_dt, + + ) -> np.ndarray: + + s_diff = (last_tsdb_dt - start_dt).seconds + + # if we detect a partial frame's worth of data + # that is new, slice out only that history and + # write to shm. + if s_diff > 0: + assert last_tsdb_dt > start_dt + selected = array['time'] > last_tsdb_dt.timestamp() + to_push = array[selected] + log.info( + f'Pushing partial frame {to_push.size} to shm' + ) + return to_push + + else: + return array + # start history anal and load missing new data via backend. async with open_history_client(fqsn) as hist: # get latest query's worth of history all the way # back to what is recorded in the tsdb array, start_dt, end_dt = await hist(end_dt='') - shm.push(array) + to_push = diff_history(array, start_dt, end_dt) + shm.push(to_push) # let caller unblock and deliver latest history frame task_status.started(shm) @@ -291,33 +317,13 @@ async def manage_history( # pull new history frames until we hit latest # already in the tsdb while start_dt > last_tsdb_dt: - array, start_dt, end_dt = await hist(end_dt=start_dt) - s_diff = (last_tsdb_dt - start_dt).seconds - - # if we detect a partial frame's worth of data - # that is new, slice out only that history and - # write to shm. - if s_diff > 0: - assert last_tsdb_dt > start_dt - selected = array['time'] > last_tsdb_dt.timestamp() - to_push = array[selected] - log.info( - f'Pushing partial frame {to_push.size} to shm' - ) - shm.push(to_push, prepend=True) - break - - else: - # write to shm - log.info(f'Pushing {array.size} datums to shm') - shm.push(array, prepend=True) + to_push = diff_history(array, start_dt, end_dt) + shm.push(to_push, prepend=True) # TODO: see if there's faster multi-field reads: # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # re-index with a `time` and index field - # await tractor.breakpoint() - shm.push( fastest[-shm._first.value:],