From e1bbcff8e037032de7f46a6fc3ebb052114d57e6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Feb 2022 16:36:02 -0500 Subject: [PATCH] Get basic OHLCV writes working with `anyio` client --- piker/data/marketstore.py | 93 +++++++++++++++++++++++++++++++++------ 1 file changed, 80 insertions(+), 13 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 9419a6b5..05ef549a 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -21,10 +21,15 @@ - ticK data ingest routines - websocket client for subscribing to write triggers - todo: tick sequence stream-cloning for testing -- todo: docker container management automation + ''' from contextlib import asynccontextmanager -from typing import Any, Optional +from typing import ( + Any, + Optional, + # Callable, + TYPE_CHECKING, +) import time from math import isnan @@ -36,7 +41,12 @@ from trio_websocket import open_websocket_url from anyio_marketstore import open_marketstore_client, MarketstoreClient from ..log import get_logger, get_console_log -from ..data.feed import maybe_open_feed +from .feed import maybe_open_feed +from ._source import mk_fqsn, Symbol + + +# if TYPE_CHECKING: +# from ._sharedmem import ShmArray log = get_logger(__name__) @@ -83,7 +93,7 @@ _tick_map = { _ohlcv_dt = [ # these two are required for as a "primary key" ('Epoch', 'i8'), - ('Nanoseconds', 'i4'), + # ('Nanoseconds', 'i4'), # ohlcv sampling ('Open', 'f4'), @@ -157,30 +167,87 @@ def timestamp(date, **kwargs) -> int: @asynccontextmanager async def get_client( host: str = 'localhost', - port: int = 5993 + port: int = 5995 ) -> MarketstoreClient: - async with open_marketstore_client(host, port) as client: + async with open_marketstore_client( + host, + port + ) as client: yield client -async def backfill_history(): +# class MarketStoreError(Exception): +# "Generic marketstore client error" + + +# def err_on_resp(response: dict) -> None: +# """Raise any errors found in responses from client request. +# """ +# responses = response['responses'] +# if responses is not None: +# for r in responses: +# err = r['error'] +# if err: +# raise MarketStoreError(err) + + +async def backfill_history( + # symbol: Symbol + +) -> list[str]: + # TODO: + # - compute time-smaple step + # - take ``Symbol`` as input + # - backtrack into history using backend helper endpoint + + # broker = 'ib' + # symbol = 'mnq.globex' + + broker = 'binance' + symbol = 'btcusdt' + + fqsn = mk_fqsn(broker, symbol) async with ( get_client() as msclient, maybe_open_feed( - 'ib', - ['mnq.globex'], + broker, + [symbol], loglevel='info', # backpressure=False, start_stream=False, ) as (feed, stream), ): - await tractor.breakpoint() - await msclient.write( - feed.shm.array, - tbk='mnq.globex.ib/1Sec/OHLCV', + ohlcv = feed.shm.array + mkts_dt = np.dtype(_ohlcv_dt) + + syms = await msclient.list_symbols() + log.info(f'Existing symbol set:\n{pformat(syms)}') + + # build mkts schema compat array + mkts_array = np.zeros( + len(ohlcv), + dtype=mkts_dt, ) + # copy from shm array + mkts_array[:] = ohlcv[[ + 'time', + 'open', + 'high', + 'low', + 'close', + 'volume', + ]] + + # write to db + resp = await msclient.write( + mkts_array, + tbk=f'{fqsn}/1Min/OHLCV', + isvariablelength=True, + ) + + # TODO: backfiller loop async def ingest_quote_stream(