diff --git a/piker/data/feed.py b/piker/data/feed.py index 19504204..35d006de 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons. """ from __future__ import annotations from dataclasses import dataclass, field +from datetime import datetime from contextlib import asynccontextmanager from functools import partial from types import ModuleType @@ -42,11 +43,13 @@ from .._cacheables import maybe_open_context from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, + check_for_service, ) from ._sharedmem import ( maybe_open_shm_array, attach_shm_array, ShmArray, + _secs_in_day, ) from .ingest import get_ingestormod from ._source import ( @@ -125,7 +128,7 @@ class _FeedsBus(BaseModel): # def cancel_task( # self, - # task: trio.lowlevel.Task + # task: trio.lowlevel.Task, # ) -> bool: # ... @@ -218,7 +221,61 @@ async def manage_history( readonly=False, ) - if opened: + log.info('Scanning for existing `marketstored`') + is_up = await check_for_service('marketstored') + if is_up and opened: + log.info('Found existing `marketstored`') + from . import marketstore + + async with marketstore.open_storage_client( + fqsn, + ) as (storage, tsdb_arrays): + + # TODO: history validation + # assert opened, f'Persistent shm for {symbol} was already open?!' + # if not opened: + # raise RuntimeError( + # "Persistent shm for sym was already open?!" + # ) + + if tsdb_arrays: + log.info(f'Loaded tsdb history {tsdb_arrays}') + fastest = list(tsdb_arrays[fqsn].values())[0] + last_s = fastest['Epoch'][-1] + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + + # re-index with a `time` and index field + shm.push( + fastest[-3 * _secs_in_day:], + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + start=shm._len - _secs_in_day, + field_map={ + 'Epoch': 'time', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', + }, + ) + + # start history anal and load missing new data via backend. + async with mod.open_history_client(fqsn) as hist: + + # get latest query's worth of history + array, next_dt = await hist(end_dt='') + + last_dt = datetime.fromtimestamp(last_s) + array, next_dt = await hist(end_dt=last_dt) + + some_data_ready.set() + + elif opened: log.info('No existing `marketstored` found..') # start history backfill task ``backfill_bars()`` is @@ -254,6 +311,7 @@ async def manage_history( ) await trio.sleep_forever() + # cs.cancel() async def allocate_persistent_feed( @@ -261,6 +319,7 @@ async def allocate_persistent_feed( brokername: str, symbol: str, loglevel: str, + start_stream: bool = True, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -302,10 +361,8 @@ async def allocate_persistent_feed( loglevel=loglevel, ) ) - # the broker-specific fully qualified symbol name, - # but ensure it is lower-cased for external use. - bfqsn = init_msg[symbol]['fqsn'].lower() - init_msg[symbol]['fqsn'] = bfqsn + # the broker-specific fully qualified symbol name + bfqsn = init_msg[symbol]['fqsn'] # HISTORY, run 2 tasks: # - a history loader / maintainer @@ -333,6 +390,7 @@ async def allocate_persistent_feed( # true fqsn fqsn = '.'.join((bfqsn, brokername)) + # add a fqsn entry that includes the ``.`` suffix init_msg[fqsn] = msg @@ -364,6 +422,9 @@ async def allocate_persistent_feed( # task_status.started((init_msg, generic_first_quotes)) task_status.started() + if not start_stream: + await trio.sleep_forever() + # backend will indicate when real-time quotes have begun. await feed_is_live.wait() @@ -429,13 +490,12 @@ async def open_feed_bus( bus=bus, brokername=brokername, - # here we pass through the selected symbol in native # "format" (i.e. upper vs. lowercase depending on # provider). symbol=symbol, - loglevel=loglevel, + start_stream=start_stream, ) ) # TODO: we can remove this? @@ -446,7 +506,7 @@ async def open_feed_bus( init_msg, first_quotes = bus.feeds[symbol] msg = init_msg[symbol] - bfqsn = msg['fqsn'].lower() + bfqsn = msg['fqsn'] # true fqsn fqsn = '.'.join([bfqsn, brokername]) @@ -765,10 +825,7 @@ async def maybe_open_feed( **kwargs, -) -> ( - Feed, - ReceiveChannel[dict[str, Any]], -): +) -> (Feed, ReceiveChannel[dict[str, Any]]): ''' Maybe open a data to a ``brokerd`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped @@ -789,7 +846,6 @@ async def maybe_open_feed( 'start_stream': kwargs.get('start_stream', True), }, key=fqsn, - ) as (cache_hit, feed): if cache_hit: diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index cdcaeb02..ff4a52eb 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -28,8 +28,9 @@ from pprint import pformat from typing import ( Any, Optional, + Union, # Callable, - TYPE_CHECKING, + # TYPE_CHECKING, ) import time from math import isnan @@ -40,12 +41,19 @@ import numpy as np import pandas as pd import tractor from trio_websocket import open_websocket_url -from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params +from anyio_marketstore import ( + open_marketstore_client, + MarketstoreClient, + Params, +) +import purerpc -from ..log import get_logger, get_console_log from .feed import maybe_open_feed -from ._source import mk_fqsn, Symbol - +from ._source import ( + mk_fqsn, + # Symbol, +) +from ..log import get_logger, get_console_log # if TYPE_CHECKING: # from ._sharedmem import ShmArray @@ -210,42 +218,130 @@ tf_in_1s = bidict({ }) -async def manage_history( - fqsn: str, - period: int = 1, # in seconds - -) -> dict[str, np.ndarray]: +class Storage: ''' - Load a series by key and deliver in ``numpy`` struct array - format. + High level storage api for both real-time and historical ingest. + + + ''' + def __init__( + self, + client: MarketstoreClient, + + ) -> None: + # TODO: eventually this should be an api/interface type that + # ensures we can support multiple tsdb backends. + self.client = client + + # series' cache from tsdb reads + self._arrays: dict[str, np.ndarray] = {} + + async def write_ticks(self, ticks: list) -> None: + ... + + async def write_ohlcv(self, ohlcv: np.ndarray) -> None: + ... + + async def read_ohlcv( + self, + fqsn: str, + timeframe: Optional[Union[int, str]] = None, + + ) -> tuple[ + MarketstoreClient, + Union[dict, np.ndarray] + ]: + client = self.client + syms = await client.list_symbols() + + if fqsn not in syms: + return {} + + if timeframe is None: + log.info(f'starting {fqsn} tsdb granularity scan..') + # loop through and try to find highest granularity + for tfstr in tf_in_1s.values(): + try: + log.info(f'querying for {tfstr}@{fqsn}') + result = await client.query(Params(fqsn, tfstr, 'OHLCV',)) + break + except purerpc.grpclib.exceptions.UnknownError: + # XXX: this is already logged by the container and + # thus shows up through `marketstored` logs relay. + # log.warning(f'{tfstr}@{fqsn} not found') + continue + else: + return {} + + else: + tfstr = tf_in_1s[timeframe] + result = await client.query(Params(fqsn, tfstr, 'OHLCV',)) + + # Fill out a `numpy` array-results map + arrays = {} + for fqsn, data_set in result.by_symbols().items(): + arrays.setdefault(fqsn, {})[ + tf_in_1s.inverse[data_set.timeframe] + ] = data_set.array + + return ( + client, + arrays[fqsn][timeframe] if timeframe else arrays, + ) + + +@acm +async def open_storage_client( + fqsn: str, + period: Optional[Union[int, str]] = None, # in seconds + +) -> tuple[Storage, dict[str, np.ndarray]]: + ''' + Load a series by key and deliver in ``numpy`` struct array format. ''' async with get_client() as client: - tfstr = tf_in_1s[period] - result = await client.query( - Params(fqsn, tf_in_1s, 'OHLCV',) + storage_client = Storage(client) + arrays = await storage_client.read_ohlcv( + fqsn, + period, ) - # Dig out `numpy` results map - arrays = {} - # for qr in [onem, fivem]: - for name, data_set in result.by_symbols().items(): - arrays[(name, qr)] = data_set.array - await tractor.breakpoint() - # # TODO: backfiller loop - # array = arrays[(fqsn, qr)] - return arrays + yield storage_client, arrays async def backfill_history_diff( # symbol: Symbol ) -> list[str]: - # TODO: - # - compute time-smaple step - # - take ``Symbol`` as input - # - backtrack into history using backend helper endpoint + + # TODO: real-time dedicated task for ensuring + # history consistency between the tsdb, shm and real-time feed.. + + # update sequence design notes: + + # - load existing highest frequency data from mkts + # * how do we want to offer this to the UI? + # - lazy loading? + # - try to load it all and expect graphics caching/diffing + # to hide extra bits that aren't in view? + + # - compute the diff between latest data from broker and shm + # * use sql api in mkts to determine where the backend should + # start querying for data? + # * append any diff with new shm length + # * determine missing (gapped) history by scanning + # * how far back do we look? + + # - begin rt update ingest and aggregation + # * could start by always writing ticks to mkts instead of + # worrying about a shm queue for now. + # * we have a short list of shm queues worth groking: + # - https://github.com/pikers/piker/issues/107 + # * the original data feed arch blurb: + # - https://github.com/pikers/piker/issues/98 + # broker = 'ib' symbol = 'mnq.globex'