Get sync-to-marketstore-tsdb history retrieval workinnn

l1_precision_fix
Tyler Goodlet 2022-03-30 14:11:21 -04:00
parent 53ad5e6f65
commit ce3229df7d
1 changed files with 29 additions and 23 deletions

View File

@ -36,6 +36,7 @@ from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
from pydantic import BaseModel
import numpy as np
from ..brokers import get_brokermod
from .._cacheables import maybe_open_context
@ -276,13 +277,38 @@ async def manage_history(
# TODO: this should be used verbatim for the pure
# shm backfiller approach below.
def diff_history(
array,
start_dt,
end_dt,
) -> np.ndarray:
s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if s_diff > 0:
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
return to_push
else:
return array
# start history anal and load missing new data via backend.
async with open_history_client(fqsn) as hist:
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt='')
shm.push(array)
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push)
# let caller unblock and deliver latest history frame
task_status.started(shm)
@ -291,33 +317,13 @@ async def manage_history(
# pull new history frames until we hit latest
# already in the tsdb
while start_dt > last_tsdb_dt:
array, start_dt, end_dt = await hist(end_dt=start_dt)
s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data
# that is new, slice out only that history and
# write to shm.
if s_diff > 0:
assert last_tsdb_dt > start_dt
selected = array['time'] > last_tsdb_dt.timestamp()
to_push = array[selected]
log.info(
f'Pushing partial frame {to_push.size} to shm'
)
shm.push(to_push, prepend=True)
break
else:
# write to shm
log.info(f'Pushing {array.size} datums to shm')
shm.push(array, prepend=True)
to_push = diff_history(array, start_dt, end_dt)
shm.push(to_push, prepend=True)
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
# await tractor.breakpoint()
shm.push(
fastest[-shm._first.value:],