A bit hacky but, broadcast index streams on each history prepend

m4_corrections
Tyler Goodlet 2022-04-16 18:34:22 -04:00
parent 25be7f8d08
commit 72083eae17
1 changed files with 48 additions and 20 deletions

View File

@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
""" """
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from functools import partial from functools import partial
from types import ModuleType from types import ModuleType
@ -31,6 +32,7 @@ from typing import (
Awaitable, Awaitable,
) )
import pendulum
import trio import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
@ -59,6 +61,7 @@ from ._source import (
from ..ui import _search from ..ui import _search
from ._sampling import ( from ._sampling import (
sampler, sampler,
broadcast,
increment_ohlc_buffer, increment_ohlc_buffer,
iter_ohlc_periods, iter_ohlc_periods,
sample_and_broadcast, sample_and_broadcast,
@ -250,6 +253,8 @@ async def manage_history(
open_history_client = getattr(mod, 'open_history_client', None) open_history_client = getattr(mod, 'open_history_client', None)
bfqsn = fqsn.replace('.' + mod.name, '')
if is_up and opened and open_history_client: if is_up and opened and open_history_client:
log.info('Found existing `marketstored`') log.info('Found existing `marketstored`')
@ -281,9 +286,11 @@ async def manage_history(
array, array,
start_dt, start_dt,
end_dt, end_dt,
last_tsdb_dt: Optional[datetime] = None
) -> np.ndarray: ) -> np.ndarray:
if last_tsdb_dt:
s_diff = (last_tsdb_dt - start_dt).seconds s_diff = (last_tsdb_dt - start_dt).seconds
# if we detect a partial frame's worth of data # if we detect a partial frame's worth of data
@ -298,28 +305,50 @@ async def manage_history(
) )
return to_push return to_push
else:
return array return array
# start history anal and load missing new data via backend. # start history anal and load missing new data via backend.
async with open_history_client(fqsn) as hist:
broker, symbol, expiry = unpack_fqsn(fqsn)
async with open_history_client(bfqsn) as hist:
# get latest query's worth of history all the way # get latest query's worth of history all the way
# back to what is recorded in the tsdb # back to what is recorded in the tsdb
array, start_dt, end_dt = await hist(end_dt='') array, start_dt, end_dt = await hist(end_dt=None)
to_push = diff_history(array, start_dt, end_dt) to_push = diff_history(
array,
start_dt,
end_dt,
last_tsdb_dt=last_tsdb_dt,
)
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push) shm.push(to_push)
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# let caller unblock and deliver latest history frame # let caller unblock and deliver latest history frame
task_status.started(shm) task_status.started(shm)
some_data_ready.set() some_data_ready.set()
# pull new history frames until we hit latest # pull new history frames until we hit latest
# already in the tsdb # already in the tsdb
while start_dt > last_tsdb_dt: # while start_dt > last_tsdb_dt:
while True:
array, start_dt, end_dt = await hist(end_dt=start_dt) array, start_dt, end_dt = await hist(end_dt=start_dt)
to_push = diff_history(array, start_dt, end_dt) to_push = diff_history(
array,
start_dt,
end_dt,
# last_tsdb_dt=last_tsdb_dt,
# just run indefinitely
last_tsdb_dt=None,
)
log.info(f'Pushing {to_push.size} to shm!')
shm.push(to_push, prepend=True) shm.push(to_push, prepend=True)
for delay_s in sampler.subscribers:
await broadcast(delay_s)
# TODO: see if there's faster multi-field reads: # TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
@ -348,7 +377,6 @@ async def manage_history(
# do a legacy incremental backfill from the provider. # do a legacy incremental backfill from the provider.
log.info('No existing `marketstored` found..') log.info('No existing `marketstored` found..')
bfqsn = fqsn.replace('.' + mod.name, '')
# start history backfill task ``backfill_bars()`` is # start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is # a required backend func this must block until shm is
# filled with first set of ohlc bars # filled with first set of ohlc bars