From 4555a1f279085a81f71d228b1274ff3f1691f015 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 15:21:41 -0500 Subject: [PATCH] Prototype out writing `1Sec` OHLCV data --- piker/data/marketstore.py | 113 ++++++++++++++++++++++++++++---------- 1 file changed, 83 insertions(+), 30 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 05ef549a..457f37d7 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -23,7 +23,8 @@ - todo: tick sequence stream-cloning for testing ''' -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm +from pprint import pformat from typing import ( Any, Optional, @@ -33,12 +34,13 @@ from typing import ( import time from math import isnan +from bidict import bidict import msgpack import numpy as np import pandas as pd import tractor from trio_websocket import open_websocket_url -from anyio_marketstore import open_marketstore_client, MarketstoreClient +from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params from ..log import get_logger, get_console_log from .feed import maybe_open_feed @@ -164,12 +166,17 @@ def timestamp(date, **kwargs) -> int: return int(pd.Timestamp(date, **kwargs).value) -@asynccontextmanager +@acm async def get_client( host: str = 'localhost', port: int = 5995 ) -> MarketstoreClient: + ''' + Load a ``anyio_marketstore`` grpc client connected + to an existing ``marketstore`` server. + + ''' async with open_marketstore_client( host, port @@ -192,6 +199,41 @@ async def get_client( # raise MarketStoreError(err) +tf_in_1s = bidict({ + 1: '1Sec', + 60: '1Min', + 60*5: '5Min', + 60*15: '15Min', + 60*30: '30Min', + 60*60: '1H', + 60*60*24: '1D', +}) + + +# @acm +async def load_history( + symbol: Symbol, + period: int = 1, # in seconds + +) -> np.ndarray: + + async with get_client() as client: + + tfstr = tf_in_1s[period] + result = await client.query( + Params(fqsn, tf_in_1s, 'OHLCV',) + ) + # Dig out `numpy` results map + arrays = {} + await tractor.breakpoint() + # for qr in [onem, fivem]: + # for name, data_set in qr.by_symbols().items(): + # arrays[(name, qr)] = data_set.array + + # # TODO: backfiller loop + # array = arrays[(fqsn, qr)] + + async def backfill_history( # symbol: Symbol @@ -201,28 +243,32 @@ async def backfill_history( # - take ``Symbol`` as input # - backtrack into history using backend helper endpoint - # broker = 'ib' - # symbol = 'mnq.globex' + broker = 'ib' + symbol = 'mnq.globex' - broker = 'binance' - symbol = 'btcusdt' + # broker = 'binance' + # symbol = 'btcusdt' fqsn = mk_fqsn(broker, symbol) + print('yo') async with ( - get_client() as msclient, + get_client() as client, maybe_open_feed( broker, [symbol], loglevel='info', # backpressure=False, start_stream=False, + ) as (feed, stream), ): + print('yo') ohlcv = feed.shm.array mkts_dt = np.dtype(_ohlcv_dt) - syms = await msclient.list_symbols() + print('yo') + syms = await client.list_symbols() log.info(f'Existing symbol set:\n{pformat(syms)}') # build mkts schema compat array @@ -240,14 +286,39 @@ async def backfill_history( 'volume', ]] + key = (fqsn, '1Sec', 'OHLCV') + tbk = mk_tbk(key) + + # diff vs. existing array and append new history + # TODO: + # write to db - resp = await msclient.write( + resp = await client.write( mkts_array, - tbk=f'{fqsn}/1Min/OHLCV', - isvariablelength=True, + tbk=tbk, + # NOTE: will will append duplicates + # for the same timestamp-index. + # isvariablelength=True, ) + # TODO: should be no error? + # assert not resp.responses + + # # Dig out `numpy` results map + qr = await client.query( + Params(fqsn, '1Min`', 'OHLCV',) + ) + qr = await client.query( + # Params(fqsn, '1Sec`', 'OHLCV',) + Params(*key), + ) + arrays = {} + # for qr in [onem, fivem]: + for name, data_set in qr.by_symbols().items(): + arrays[(name, qr)] = data_set.array # TODO: backfiller loop + array = arrays[(fqsn, qr)] + await tractor.breakpoint() async def ingest_quote_stream( @@ -315,24 +386,6 @@ async def ingest_quote_stream( # await ms_client.write(symbol, a) -# async def stream_quotes( -# symbols: list[str], -# timeframe: str = '1Min', -# attr_group: str = 'TICK', -# host: str = 'localhost', -# port: int = 5993, -# loglevel: str = None - -# ) -> None: -# ''' -# Open a symbol stream from a running instance of marketstore and -# log to console. - -# ''' -# tbks: dict[str, str] = { -# sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols} - - async def stream_quotes( symbols: list[str], host: str = 'localhost',