A bit hacky but, broadcast index streams on each history prepend

l1_precision_fix
Tyler Goodlet 2022-04-16 18:34:22 -04:00
parent 7d8cf3eaf8
commit bcf3be1fe4
1 changed file with 48 additions and 20 deletions


@@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from dataclasses import dataclass, field
+from datetime import datetime
 from contextlib import asynccontextmanager
 from functools import partial
 from types import ModuleType
@@ -31,6 +32,7 @@ from typing import (
     Awaitable,
 )
 
+import pendulum
 import trio
 from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
@@ -59,6 +61,7 @@ from ._source import (
 from ..ui import _search
 from ._sampling import (
     sampler,
+    broadcast,
     increment_ohlc_buffer,
     iter_ohlc_periods,
     sample_and_broadcast,
@@ -250,6 +253,8 @@ async def manage_history(
     open_history_client = getattr(mod, 'open_history_client', None)
+    bfqsn = fqsn.replace('.' + mod.name, '')
+
     if is_up and opened and open_history_client:
         log.info('Found existing `marketstored`')
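The `bfqsn` added above is just the fqsn with the backend module's name suffix stripped, so the bare symbol key can be handed straight to that backend; a quick illustration with invented example values (piker fqsn's follow a `symbol.broker` style):

# hypothetical example values, not from the diff:
fqsn = 'ethusdt.binance'      # fully qualified symbol name
mod_name = 'binance'          # backend module's name
bfqsn = fqsn.replace('.' + mod_name, '')
assert bfqsn == 'ethusdt'     # broker-local symbol key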
@@ -281,45 +286,69 @@ async def manage_history(
             array,
             start_dt,
             end_dt,
+            last_tsdb_dt: Optional[datetime] = None
 
         ) -> np.ndarray:
 
-            s_diff = (last_tsdb_dt - start_dt).seconds
+            if last_tsdb_dt:
+                s_diff = (last_tsdb_dt - start_dt).seconds
 
-            # if we detect a partial frame's worth of data
-            # that is new, slice out only that history and
-            # write to shm.
-            if s_diff > 0:
-                assert last_tsdb_dt > start_dt
-                selected = array['time'] > last_tsdb_dt.timestamp()
-                to_push = array[selected]
-                log.info(
-                    f'Pushing partial frame {to_push.size} to shm'
-                )
-                return to_push
+                # if we detect a partial frame's worth of data
+                # that is new, slice out only that history and
+                # write to shm.
+                if s_diff > 0:
+                    assert last_tsdb_dt > start_dt
+                    selected = array['time'] > last_tsdb_dt.timestamp()
+                    to_push = array[selected]
+                    log.info(
+                        f'Pushing partial frame {to_push.size} to shm'
+                    )
+                    return to_push
 
-            else:
-                return array
+            return array
 
         # start history anal and load missing new data via backend.
-        async with open_history_client(fqsn) as hist:
+        broker, symbol, expiry = unpack_fqsn(fqsn)
+        async with open_history_client(bfqsn) as hist:
 
             # get latest query's worth of history all the way
             # back to what is recorded in the tsdb
-            array, start_dt, end_dt = await hist(end_dt='')
-            to_push = diff_history(array, start_dt, end_dt)
+            array, start_dt, end_dt = await hist(end_dt=None)
+            to_push = diff_history(
+                array,
+                start_dt,
+                end_dt,
+                last_tsdb_dt=last_tsdb_dt,
+            )
             log.info(f'Pushing {to_push.size} to shm!')
             shm.push(to_push)
 
+            for delay_s in sampler.subscribers:
+                await broadcast(delay_s)
+
             # let caller unblock and deliver latest history frame
             task_status.started(shm)
             some_data_ready.set()
 
             # pull new history frames until we hit latest
             # already in the tsdb
-            while start_dt > last_tsdb_dt:
+            # while start_dt > last_tsdb_dt:
+            while True:
                 array, start_dt, end_dt = await hist(end_dt=start_dt)
-                to_push = diff_history(array, start_dt, end_dt)
+                to_push = diff_history(
+                    array,
+                    start_dt,
+                    end_dt,
+                    # last_tsdb_dt=last_tsdb_dt,
+
+                    # just run indefinitely
+                    last_tsdb_dt=None,
+                )
                 log.info(f'Pushing {to_push.size} to shm!')
                 shm.push(to_push, prepend=True)
 
+                for delay_s in sampler.subscribers:
+                    await broadcast(delay_s)
+
         # TODO: see if there's faster multi-field reads:
         # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
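Pulled out of the diff, the reworked `diff_history` reads as below; a minimal runnable sketch, assuming only that frames arrive as numpy structured arrays with an epoch-float 'time' field:

from datetime import datetime, timezone
from typing import Optional

import numpy as np


def diff_history(
    array: np.ndarray,
    start_dt: datetime,
    end_dt: datetime,
    last_tsdb_dt: Optional[datetime] = None,

) -> np.ndarray:
    # no tsdb checkpoint to diff against: the whole frame is new.
    if last_tsdb_dt:
        s_diff = (last_tsdb_dt - start_dt).seconds

        # a partial frame overlaps the tsdb's tail: keep only the
        # rows strictly newer than the last recorded timestamp.
        if s_diff > 0:
            assert last_tsdb_dt > start_dt
            return array[array['time'] > last_tsdb_dt.timestamp()]

    return array


# example: only the final row is newer than the tsdb checkpoint.
frame = np.array([(1.0,), (2.0,), (3.0,)], dtype=[('time', 'f8')])
start = datetime.fromtimestamp(1.0, tz=timezone.utc)
end = datetime.fromtimestamp(3.0, tz=timezone.utc)
cutoff = datetime.fromtimestamp(2.0, tz=timezone.utc)
assert diff_history(frame, start, end, last_tsdb_dt=cutoff).size == 1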
@@ -348,7 +377,6 @@ async def manage_history(
         # do a legacy incremental backfill from the provider.
         log.info('No existing `marketstored` found..')
 
-        bfqsn = fqsn.replace('.' + mod.name, '')
         # start history backfill task ``backfill_bars()`` is
         # a required backend func this must block until shm is
         # filled with first set of ohlc bars
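On the commit title itself: every `shm.push(to_push, prepend=True)` shifts the shared array's index space, so the diff follows each push with a `broadcast(delay_s)` to all of the sampler's subscribers so downstream consumers can re-sync. A minimal sketch of that pattern using trio, where `subscribers`, `notify_index_shift()`, and the list-based "shm" are hypothetical stand-ins for piker's sampler machinery:

import trio

# delay_s -> send-channels, one per subscribed consumer (a stand-in
# for piker's `sampler.subscribers` registry).
subscribers: dict[int, list[trio.MemorySendChannel]] = {}


async def notify_index_shift(delay_s: int) -> None:
    # tell every consumer on this sample period that the buffer's
    # index space just shifted (history was prepended).
    for chan in subscribers.get(delay_s, []):
        await chan.send({'index_shift': True})


async def backfill(frames: list[list[int]], shm: list[int]) -> None:
    for frame in frames:
        shm[:0] = frame  # "prepend": every existing index moves right
        for delay_s in subscribers:
            await notify_index_shift(delay_s)


async def consumer(recv: trio.MemoryReceiveChannel) -> None:
    async for msg in recv:
        # a real chart would re-read its view slice from shm here
        print('re-syncing view after', msg)


async def main() -> None:
    send, recv = trio.open_memory_channel(8)
    subscribers[1] = [send]
    shm: list[int] = [5, 6]
    async with trio.open_nursery() as nursery:
        nursery.start_soon(consumer, recv)
        await backfill([[3, 4], [1, 2]], shm)
        await send.aclose()
    assert shm == [1, 2, 3, 4, 5, 6]


trio.run(main)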