From 893ac7a98655f7df61d8d1d77e652ad7ca649317 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Feb 2022 12:17:12 -0500 Subject: [PATCH] Add WIP backfiller from data feed helper --- piker/data/marketstore.py | 39 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 36 insertions(+), 3 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index c1484706..9419a6b5 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -36,7 +36,7 @@ from trio_websocket import open_websocket_url from anyio_marketstore import open_marketstore_client, MarketstoreClient from ..log import get_logger, get_console_log -from ..data import open_feed +from ..data.feed import maybe_open_feed log = get_logger(__name__) @@ -71,6 +71,7 @@ _quote_dt = [ # ('brokerd_ts', 'i64'), # ('VWAP', 'f4') ] + _quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) _tick_map = { 'Up': 1, @@ -79,6 +80,19 @@ _tick_map = { None: np.nan, } +_ohlcv_dt = [ + # these two are required for as a "primary key" + ('Epoch', 'i8'), + ('Nanoseconds', 'i4'), + + # ohlcv sampling + ('Open', 'f4'), + ('High', 'f4'), + ('Low', 'i8'), + ('Close', 'i8'), + ('Volume', 'f4'), +] + def mk_tbk(keys: tuple[str, str, str]) -> str: ''' @@ -143,13 +157,32 @@ def timestamp(date, **kwargs) -> int: @asynccontextmanager async def get_client( host: str = 'localhost', - port: int = 5995 + port: int = 5993 ) -> MarketstoreClient: async with open_marketstore_client(host, port) as client: yield client +async def backfill_history(): + + async with ( + get_client() as msclient, + maybe_open_feed( + 'ib', + ['mnq.globex'], + loglevel='info', + # backpressure=False, + start_stream=False, + ) as (feed, stream), + ): + await tractor.breakpoint() + await msclient.write( + feed.shm.array, + tbk='mnq.globex.ib/1Sec/OHLCV', + ) + + async def ingest_quote_stream( symbols: list[str], brokername: str, @@ -162,7 +195,7 @@ async def ingest_quote_stream( ''' async with ( - open_feed(brokername, symbols, loglevel=loglevel) as feed, + maybe_open_feed(brokername, symbols, loglevel=loglevel) as feed, get_client() as ms_client, ): async for quotes in feed.stream: