diff --git a/piker/data/feed.py b/piker/data/feed.py index ff8a543a..5be7e3b7 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -22,7 +22,6 @@ This module is enabled for ``brokerd`` daemons. """ from __future__ import annotations from dataclasses import dataclass, field -from datetime import datetime from contextlib import asynccontextmanager from functools import partial from types import ModuleType @@ -32,6 +31,8 @@ from typing import ( Awaitable, ) + +import pendulum import trio from trio.abc import ReceiveChannel from trio_typing import TaskStatus @@ -49,7 +50,6 @@ from ._sharedmem import ( maybe_open_shm_array, attach_shm_array, ShmArray, - _secs_in_day, ) from .ingest import get_ingestormod from ._source import ( @@ -236,119 +236,137 @@ async def manage_history( # we expect the sub-actor to write readonly=False, ) + # TODO: history validation + if not opened: + raise RuntimeError( + "Persistent shm for sym was already open?!" + ) log.info('Scanning for existing `marketstored`') is_up = await check_for_service('marketstored') # for now only do backfilling if no tsdb can be found - do_backfill = not is_up and opened + do_legacy_backfill = not is_up and opened + + open_history_client = getattr(mod, 'open_history_client', None) + + if is_up and opened and open_history_client: - if is_up and opened: log.info('Found existing `marketstored`') from . import marketstore async with marketstore.open_storage_client( fqsn, - ) as (storage, tsdb_arrays): + ) as storage: - # TODO: get this shit workin - from tractor.trionics import ipython_embed - await ipython_embed() - # await ipython_embed(ns=locals()) + tsdb_arrays = await storage.read_ohlcv(fqsn) - # TODO: history validation - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError( - # "Persistent shm for sym was already open?!" - # ) + if not tsdb_arrays: + do_legacy_backfill = True - if tsdb_arrays: + else: log.info(f'Loaded tsdb history {tsdb_arrays}') - fastest = list(tsdb_arrays[fqsn].values())[0] - last_s = fastest['Epoch'][-1] - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - - # re-index with a `time` and index field - shm.push( - fastest[-3 * _secs_in_day:], - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - start=shm._len - _secs_in_day, - field_map={ - 'Epoch': 'time', - 'Open': 'open', - 'High': 'high', - 'Low': 'low', - 'Close': 'close', - 'Volume': 'volume', - }, + fastest = list(tsdb_arrays.values())[0] + times = fastest['Epoch'] + first, last = times[0], times[-1] + first_tsdb_dt, last_tsdb_dt = map( + pendulum.from_timestamp, [first, last] ) + # TODO: this should be used verbatim for the pure + # shm backfiller approach below. + # start history anal and load missing new data via backend. - async with mod.open_history_client(fqsn) as hist: + async with open_history_client(fqsn) as hist: - # get latest query's worth of history - array, next_dt = await hist(end_dt='') + # get latest query's worth of history all the way + # back to what is recorded in the tsdb + array, start_dt, end_dt = await hist(end_dt='') + shm.push(array) - last_dt = datetime.fromtimestamp(last_s) - array, next_dt = await hist(end_dt=last_dt) - else: - do_backfill = True + # let caller unblock and deliver latest history frame + task_status.started(shm) + some_data_ready.set() - # await tractor.breakpoint() + # pull new history frames until we hit latest + # already in the tsdb + while start_dt > last_tsdb_dt: - some_data_ready.set() + array, start_dt, end_dt = await hist(end_dt=start_dt) + s_diff = (last_tsdb_dt - start_dt).seconds - if do_backfill: + # if we detect a partial frame's worth of data + # that is new, slice out only that history and + # write to shm. + if s_diff > 0: + assert last_tsdb_dt > start_dt + selected = array['time'] > last_tsdb_dt.timestamp() + to_push = array[selected] + log.info( + f'Pushing partial frame {to_push.size} to shm' + ) + shm.push(to_push, prepend=True) + break + + else: + # write to shm + log.info(f'Pushing {array.size} datums to shm') + shm.push(array, prepend=True) + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + # await tractor.breakpoint() + + shm.push( + fastest[-shm._first.value:], + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # start=shm._len - _secs_in_day, + field_map={ + 'Epoch': 'time', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', + }, + ) + + # TODO: write new data to tsdb to be ready to for next + # read. + + if do_legacy_backfill: + # do a legacy incremental backfill from the provider. log.info('No existing `marketstored` found..') + bfqsn = fqsn.replace('.' + mod.name, '') # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars await bus.nursery.start( start_backfill, mod, - fqsn, + bfqsn, shm, ) - # _ = await bus.nursery.start(mod.backfill_bars, fqsn, shm) + # yield back after client connect with filled shm + task_status.started(shm) - # yield back after client connect with filled shm - task_status.started(shm) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. - some_data_ready.set() - - # detect sample step size for sampled historical data - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - - # begin real-time updates of shm and tsb once the feed - # goes live. - await feed_is_live.wait() - - if opened: - sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) - - # start shm incrementing for OHLC sampling at the current - # detected sampling period if one dne. - if sampler.incrementers.get(delay_s) is None: - await bus.start_task( - increment_ohlc_buffer, - delay_s, - ) + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. + some_data_ready.set() + # history retreival loop depending on user interaction and thus + # a small RPC-prot for remotely controllinlg what data is loaded + # for viewing. await trio.sleep_forever() - # cs.cancel() async def allocate_persistent_feed( @@ -416,7 +434,7 @@ async def allocate_persistent_feed( manage_history, mod, bus, - bfqsn, + '.'.join((bfqsn, brokername)), some_data_ready, feed_is_live, ) @@ -429,7 +447,6 @@ async def allocate_persistent_feed( # true fqsn fqsn = '.'.join((bfqsn, brokername)) - # add a fqsn entry that includes the ``.`` suffix init_msg[fqsn] = msg @@ -464,9 +481,22 @@ async def allocate_persistent_feed( if not start_stream: await trio.sleep_forever() - # backend will indicate when real-time quotes have begun. + # begin real-time updates of shm and tsb once the feed goes live and + # the backend will indicate when real-time quotes have begun. await feed_is_live.wait() + # start shm incrementer task for OHLC style sampling + # at the current detected step period. + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) + if sampler.incrementers.get(delay_s) is None: + await bus.start_task( + increment_ohlc_buffer, + delay_s, + ) + sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) @@ -545,7 +575,7 @@ async def open_feed_bus( init_msg, first_quotes = bus.feeds[symbol] msg = init_msg[symbol] - bfqsn = msg['fqsn'] + bfqsn = msg['fqsn'].lower() # true fqsn fqsn = '.'.join([bfqsn, brokername]) @@ -864,7 +894,10 @@ async def maybe_open_feed( **kwargs, -) -> (Feed, ReceiveChannel[dict[str, Any]]): +) -> ( + Feed, + ReceiveChannel[dict[str, Any]], +): ''' Maybe open a data to a ``brokerd`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped @@ -885,6 +918,7 @@ async def maybe_open_feed( 'start_stream': kwargs.get('start_stream', True), }, key=fqsn, + ) as (cache_hit, feed): if cache_hit: