Get sync-to-marketstore-tsdb history retrieval workinnn

mkts_backup
Tyler Goodlet 2022-03-30 14:11:21 -04:00
parent 54466db554
commit 25a3a123ec
1 changed files with 29 additions and 23 deletions

View File

@ -38,6 +38,7 @@ from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from pydantic import BaseModel from pydantic import BaseModel
import numpy as np
from ..brokers import get_brokermod from ..brokers import get_brokermod
from .._cacheables import maybe_open_context from .._cacheables import maybe_open_context
@ -278,23 +279,13 @@ async def manage_history(
# TODO: this should be used verbatim for the pure # TODO: this should be used verbatim for the pure
# shm backfiller approach below. # shm backfiller approach below.
# start history anal and load missing new data via backend. def diff_history(
async with open_history_client(fqsn) as hist: array,
start_dt,
end_dt,
# get latest query's worth of history all the way ) -> np.ndarray:
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt='')
shm.push(array)
# let caller unblock and deliver latest history frame
task_status.started(shm)
some_data_ready.set()
# pull new history frames until we hit latest
# already in the tsdb
while start_dt > last_tsdb_dt:
array, start_dt, end_dt = await hist(end_dt=start_dt)
s_diff = (last_tsdb_dt - start_dt).seconds s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data # if we detect a partial frame's worth of data
@ -307,19 +298,34 @@ async def manage_history(
log.info( log.info(
f'Pushing partial frame {to_push.size} to shm' f'Pushing partial frame {to_push.size} to shm'
) )
shm.push(to_push, prepend=True) return to_push
break
else: else:
# write to shm return array
log.info(f'Pushing {array.size} datums to shm')
shm.push(array, prepend=True) # start history anal and load missing new data via backend.
async with open_history_client(fqsn) as hist:
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt='')
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push)
# let caller unblock and deliver latest history frame
task_status.started(shm)
some_data_ready.set()
# pull new history frames until we hit latest
# already in the tsdb
while start_dt > last_tsdb_dt:
array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push, prepend=True)
# TODO: see if there's faster multi-field reads: # TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field # re-index with a `time` and index field
# await tractor.breakpoint()
shm.push( shm.push(
fastest[-shm._first.value:], fastest[-shm._first.value:],