diff --git a/piker/data/feed.py b/piker/data/feed.py index e6f4990e..8cb89d81 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons. """ from __future__ import annotations from dataclasses import dataclass, field +from datetime import datetime from contextlib import asynccontextmanager from functools import partial from types import ModuleType @@ -31,6 +32,7 @@ from typing import ( Awaitable, ) +import pendulum import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus @@ -59,6 +61,7 @@ from ._source import ( from ..ui import _search from ._sampling import ( sampler, + broadcast, increment_ohlc_buffer, iter_ohlc_periods, sample_and_broadcast, @@ -250,6 +253,8 @@ async def manage_history( open_history_client = getattr(mod, 'open_history_client', None) + bfqsn = fqsn.replace('.' + mod.name, '') + if is_up and opened and open_history_client: log.info('Found existing `marketstored`') @@ -281,45 +286,69 @@ async def manage_history( array, start_dt, end_dt, + last_tsdb_dt: Optional[datetime] = None ) -> np.ndarray: - s_diff = (last_tsdb_dt - start_dt).seconds + if last_tsdb_dt: + s_diff = (last_tsdb_dt - start_dt).seconds - # if we detect a partial frame's worth of data - # that is new, slice out only that history and - # write to shm. - if s_diff > 0: - assert last_tsdb_dt > start_dt - selected = array['time'] > last_tsdb_dt.timestamp() - to_push = array[selected] - log.info( - f'Pushing partial frame {to_push.size} to shm' - ) - return to_push + # if we detect a partial frame's worth of data + # that is new, slice out only that history and + # write to shm. + if s_diff > 0: + assert last_tsdb_dt > start_dt + selected = array['time'] > last_tsdb_dt.timestamp() + to_push = array[selected] + log.info( + f'Pushing partial frame {to_push.size} to shm' + ) + return to_push - else: - return array + return array # start history anal and load missing new data via backend. - async with open_history_client(fqsn) as hist: + + broker, symbol, expiry = unpack_fqsn(fqsn) + + async with open_history_client(bfqsn) as hist: # get latest query's worth of history all the way # back to what is recorded in the tsdb - array, start_dt, end_dt = await hist(end_dt='') - to_push = diff_history(array, start_dt, end_dt) + array, start_dt, end_dt = await hist(end_dt=None) + to_push = diff_history( + array, + start_dt, + end_dt, + last_tsdb_dt=last_tsdb_dt, + ) + log.info(f'Pushing {to_push.size} to shm!') shm.push(to_push) + for delay_s in sampler.subscribers: + await broadcast(delay_s) + # let caller unblock and deliver latest history frame task_status.started(shm) some_data_ready.set() # pull new history frames until we hit latest # already in the tsdb - while start_dt > last_tsdb_dt: + # while start_dt > last_tsdb_dt: + while True: array, start_dt, end_dt = await hist(end_dt=start_dt) - to_push = diff_history(array, start_dt, end_dt) + to_push = diff_history( + array, + start_dt, + end_dt, + # last_tsdb_dt=last_tsdb_dt, + # just run indefinitely + last_tsdb_dt=None, + ) + log.info(f'Pushing {to_push.size} to shm!') shm.push(to_push, prepend=True) + for delay_s in sampler.subscribers: + await broadcast(delay_s) # TODO: see if there's faster multi-field reads: # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields @@ -348,7 +377,6 @@ async def manage_history( # do a legacy incremental backfill from the provider. log.info('No existing `marketstored` found..') - bfqsn = fqsn.replace('.' + mod.name, '') # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars