From 8af76322c9967987bef7bfe0617e0e71a5918ab6 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 1 Mar 2022 12:29:49 -0500
Subject: [PATCH] Prototype a high level `Storage` api

Starts a wrapper around the `marketstore` client for basic ohlcv
querying and retrieval, and prototypes out write methods for ohlc and
tick data. Tries to connect to `marketstore` automatically (which will
currently fail if the daemon isn't running); eventually we should do a
service query first.
---
 piker/data/feed.py        | 108 ++++++++++++++++-----------
 piker/data/marketstore.py | 152 ++++++++++++++++++++++++++++++-------
 2 files changed, 188 insertions(+), 72 deletions(-)

diff --git a/piker/data/feed.py b/piker/data/feed.py
index d24aa434..c732a8d1 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -51,7 +51,6 @@ from ._sharedmem import (
 from .ingest import get_ingestormod
 from ._source import (
     base_iohlc_dtype,
-    mk_symbol,
     Symbol,
     mk_fqsn,
 )
@@ -126,7 +125,7 @@ class _FeedsBus(BaseModel):

     # def cancel_task(
     #     self,
-    #     task: trio.lowlevel.Task
+    #     task: trio.lowlevel.Task,

     # ) -> bool:
     #     ...
@@ -209,48 +208,68 @@ async def manage_history(
     buffer.

     '''
-    task_status.started()
+    # TODO: history retrieval, see if we can pull from an existing
+    # ``marketstored`` daemon
     opened = we_opened_shm

-    # TODO: history validation
-    # assert opened, f'Persistent shm for {symbol} was already open?!'
-    # if not opened:
-    #     raise RuntimeError("Persistent shm for sym was already open?!")
+    fqsn = mk_fqsn(mod.name, symbol)

-    if opened:
-        # ask broker backend for new history
+    from . import marketstore
+    log.info('Scanning for existing `marketstored`')

-        # start history backfill task ``backfill_bars()`` is
-        # a required backend func this must block until shm is
-        # filled with first set of ohlc bars
-        cs = await bus.nursery.start(mod.backfill_bars, symbol, shm)
+    async with marketstore.open_storage_client(
+        fqsn,
+    ) as (storage, arrays):

-        # indicate to caller that feed can be delivered to
-        # remote requesting client since we've loaded history
-        # data that can be used.
-        some_data_ready.set()
+        # yield back after client connect
+        task_status.started()

-        # detect sample step size for sampled historical data
-        times = shm.array['time']
-        delay_s = times[-1] - times[times != times[-1]][-1]
+        # TODO: history validation
+        # assert opened, f'Persistent shm for {symbol} was already open?!'
+        # if not opened:
+        #     raise RuntimeError("Persistent shm for sym was already open?!")

-    # begin real-time updates of shm and tsb once the feed
-    # goes live.
-    await feed_is_live.wait()
+        if opened:
+            if arrays:

-    if opened:
-        sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
+                log.info(f'Loaded tsdb history {arrays}')
+                # push to shm
+                # set data ready
+                # some_data_ready.set()

-        # start shm incrementing for OHLC sampling at the current
-        # detected sampling period if one dne.
-        if sampler.incrementers.get(delay_s) is None:
-            cs = await bus.start_task(
-                increment_ohlc_buffer,
-                delay_s,
-            )
+            # ask broker backend for new history

-    await trio.sleep_forever()
-    cs.cancel()
+            # start the history backfill task; ``backfill_bars()`` is
+            # a required backend func which must block until shm is
+            # filled with the first set of ohlc bars
+            _ = await bus.nursery.start(mod.backfill_bars, symbol, shm)
+
+            # indicate to the caller that the feed can be delivered to
+            # remote requesting clients since we've loaded history
+            # data that can be used.
+            some_data_ready.set()
+
+        # detect sample step size for sampled historical data
+        times = shm.array['time']
+        delay_s = times[-1] - times[times != times[-1]][-1]
+
+        # begin real-time updates of shm and tsb once the feed
+        # goes live.
+        await feed_is_live.wait()
+
+        if opened:
+            sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
+
+            # start shm incrementing for OHLC sampling at the current
+            # detected sampling period if one does not yet exist.
+            if sampler.incrementers.get(delay_s) is None:
+                await bus.start_task(
+                    increment_ohlc_buffer,
+                    delay_s,
+                )
+
+        await trio.sleep_forever()
+        # cs.cancel()


 async def allocate_persistent_feed(
@@ -310,9 +329,9 @@ async def allocate_persistent_feed(
     # XXX: neither of these will raise but will cause an inf hang due to:
     # https://github.com/python-trio/trio/issues/2258
     # bus.nursery.start_soon(
-    # await bus.start_task(
-    await bus.nursery.start(
+    # await bus.nursery.start(
+    await bus.start_task(
         manage_history,
         mod,
         shm,
@@ -339,6 +358,10 @@ async def allocate_persistent_feed(
     # can read directly from the memory which will be written by
     # this task.
     init_msg[symbol]['shm_token'] = shm.token
+    # symbol = Symbol.from_broker_info(
+    #     fqsn,
+    #     init_msg[symbol]['symbol_info']
+    # )

     # TODO: pretty sure we don't need this? why not just leave 1s as
     # the fastest "sample period" since we'll probably always want that
@@ -697,15 +720,12 @@ async def open_feed(

     for sym, data in init_msg.items():
         si = data['symbol_info']
-
-        symbol = mk_symbol(
-            key=sym,
-            type_key=si.get('asset_type', 'forex'),
-            tick_size=si.get('price_tick_size', 0.01),
-            lot_tick_size=si.get('lot_tick_size', 0.0),
+        symbol = Symbol.from_broker_info(
+            brokername,
+            sym,
+            si,
         )
-        symbol.broker_info[brokername] = si
-
+        # symbol.broker_info[brokername] = si
         feed.symbols[sym] = symbol

         # cast shm dtype to list... can't member why we need this
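A minimal sketch of the tsdb-first history flow that `manage_history()` now
follows, assuming a reachable `marketstored` instance and the module layout
above; the `load_history()` driver and the fqsn literal are illustrative only,
not part of this changeset:

    import trio
    from piker.data import marketstore

    async def load_history(fqsn: str) -> None:
        # connect a `Storage` wrapper and check for existing tsdb history
        async with marketstore.open_storage_client(fqsn) as (storage, arrays):
            if arrays:
                # tsdb hit: this history can (eventually) be pushed to shm
                print(f'tsdb history found for {fqsn}: {list(arrays)}')
            else:
                # tsdb miss: fall back to the broker's ``backfill_bars()``
                print(f'no tsdb history for {fqsn}, backfilling from broker..')

    trio.run(load_history, 'mnq.globex.ib')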
diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py
index cdcaeb02..ff4a52eb 100644
--- a/piker/data/marketstore.py
+++ b/piker/data/marketstore.py
@@ -28,8 +28,9 @@ from pprint import pformat
 from typing import (
     Any,
     Optional,
+    Union,
     # Callable,
-    TYPE_CHECKING,
+    # TYPE_CHECKING,
 )
 import time
 from math import isnan
@@ -40,12 +41,19 @@ import numpy as np
 import pandas as pd
 import tractor
 from trio_websocket import open_websocket_url
-from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
+from anyio_marketstore import (
+    open_marketstore_client,
+    MarketstoreClient,
+    Params,
+)
+import purerpc

-from ..log import get_logger, get_console_log
 from .feed import maybe_open_feed
-from ._source import mk_fqsn, Symbol
-
+from ._source import (
+    mk_fqsn,
+    # Symbol,
+)
+from ..log import get_logger, get_console_log

 # if TYPE_CHECKING:
 #     from ._sharedmem import ShmArray
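The hunk below replaces the old `manage_history()` prototype with the new
`Storage` api. As a usage sketch, the read side can also be driven directly
with `get_client()` and `Storage` from this module; the `dump_ohlcv()` driver
and fqsn literal are illustrative only and assume a populated, reachable
`marketstored`:

    import trio
    from piker.data.marketstore import get_client, Storage

    async def dump_ohlcv(fqsn: str) -> None:
        async with get_client() as client:
            storage = Storage(client)
            # no timeframe passed -> scan down from the highest granularity
            arrays = await storage.read_ohlcv(fqsn)
            for tf_s, array in arrays.get(fqsn, {}).items():
                print(f'{fqsn} @ {tf_s}s: {len(array)} ohlcv rows')

    trio.run(dump_ohlcv, 'mnq.globex.ib')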
@@ -210,42 +218,130 @@ tf_in_1s = bidict({
 })


-async def manage_history(
-    fqsn: str,
-    period: int = 1,  # in seconds
-
-) -> dict[str, np.ndarray]:
+class Storage:
     '''
-    Load a series by key and deliver in ``numpy`` struct array
-    format.
+    High level storage api for both real-time and historical ingest.
+
+    '''
+    def __init__(
+        self,
+        client: MarketstoreClient,
+
+    ) -> None:
+        # TODO: eventually this should be an api/interface type that
+        # ensures we can support multiple tsdb backends.
+        self.client = client
+
+        # series' cache from tsdb reads
+        self._arrays: dict[str, np.ndarray] = {}
+
+    async def write_ticks(self, ticks: list) -> None:
+        ...
+
+    async def write_ohlcv(self, ohlcv: np.ndarray) -> None:
+        ...
+
+    async def read_ohlcv(
+        self,
+        fqsn: str,
+        timeframe: Optional[Union[int, str]] = None,
+
+    ) -> Union[dict, np.ndarray]:
+        client = self.client
+        syms = await client.list_symbols()
+
+        if fqsn not in syms:
+            return {}
+
+        if timeframe is None:
+            log.info(f'starting {fqsn} tsdb granularity scan..')
+            # loop through and try to find highest granularity
+            for tfstr in tf_in_1s.values():
+                try:
+                    log.info(f'querying for {tfstr}@{fqsn}')
+                    result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+                    break
+                except purerpc.grpclib.exceptions.UnknownError:
+                    # XXX: this is already logged by the container and
+                    # thus shows up through `marketstored` logs relay.
+                    # log.warning(f'{tfstr}@{fqsn} not found')
+                    continue
+            else:
+                return {}
+
+        else:
+            tfstr = tf_in_1s[timeframe]
+            result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+
+        # fill out a `numpy` array-results map keyed by fqsn and timeframe
+        arrays = {}
+        for fqsn, data_set in result.by_symbols().items():
+            arrays.setdefault(fqsn, {})[
+                tf_in_1s.inverse[data_set.timeframe]
+            ] = data_set.array
+
+        return arrays[fqsn][timeframe] if timeframe else arrays
+
+
+@acm
+async def open_storage_client(
+    fqsn: str,
+    period: Optional[Union[int, str]] = None,  # in seconds
+
+) -> tuple[Storage, dict[str, np.ndarray]]:
+    '''
+    Load a series by key and deliver in ``numpy`` struct array format.

     '''
     async with get_client() as client:
-        tfstr = tf_in_1s[period]
-        result = await client.query(
-            Params(fqsn, tf_in_1s, 'OHLCV',)
+        storage_client = Storage(client)
+        arrays = await storage_client.read_ohlcv(
+            fqsn,
+            period,
         )
-        # Dig out `numpy` results map
-        arrays = {}
-        # for qr in [onem, fivem]:
-        for name, data_set in result.by_symbols().items():
-            arrays[(name, qr)] = data_set.array

-        await tractor.breakpoint()
-        # # TODO: backfiller loop
-        # array = arrays[(fqsn, qr)]
-        return arrays
+        yield storage_client, arrays


 async def backfill_history_diff(
     # symbol: Symbol

 ) -> list[str]:
-    # TODO:
-    # - compute time-smaple step
-    # - take ``Symbol`` as input
-    # - backtrack into history using backend helper endpoint
+
+    # TODO: real-time dedicated task for ensuring
+    # history consistency between the tsdb, shm and real-time feed..
+
+    # update sequence design notes:
+
+    # - load existing highest frequency data from mkts
+    #   * how do we want to offer this to the UI?
+    #     - lazy loading?
+    #     - try to load it all and expect graphics caching/diffing
+    #       to hide extra bits that aren't in view?
+
+    # - compute the diff between latest data from broker and shm
+    #   * use sql api in mkts to determine where the backend should
+    #     start querying for data?
+    #   * append any diff with new shm length
+    #   * determine missing (gapped) history by scanning
+    #   * how far back do we look?
+
+    # - begin rt update ingest and aggregation
+    #   * could start by always writing ticks to mkts instead of
+    #     worrying about a shm queue for now.
+    #   * we have a short list of shm queues worth grokking:
+    #     - https://github.com/pikers/piker/issues/107
+    #   * the original data feed arch blurb:
+    #     - https://github.com/pikers/piker/issues/98
+
     broker = 'ib'
     symbol = 'mnq.globex'
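As a sketch of the 'determine missing (gapped) history' step from the notes
above: given ohlcv struct arrays carrying an epoch `time` field (as the shm
buffers above do), the span the backfiller still needs to request is just the
range between the tsdb's last timestamp and the live buffer's first. The
helper name and inputs are illustrative only:

    from typing import Optional
    import numpy as np

    def missing_range(
        tsdb_ohlcv: np.ndarray,  # history already persisted in `marketstored`
        shm_ohlcv: np.ndarray,   # real-time buffer filled by the broker feed
    ) -> Optional[tuple[float, float]]:
        last_tsdb_t = tsdb_ohlcv['time'][-1]
        first_shm_t = shm_ohlcv['time'][0]
        if first_shm_t > last_tsdb_t:
            # epoch range the broker backend should be queried for in
            # order to fill the gap before appending the shm contents
            return (last_tsdb_t, first_shm_t)
        # fully contiguous; nothing to backfill
        return None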