Prototype out writing `1Sec` OHLCV data

marketstore
Tyler Goodlet 2022-02-22 15:21:41 -05:00
parent 96b7d55018
commit 1623d4ed83
1 changed files with 83 additions and 30 deletions

View File

@ -23,7 +23,8 @@
- todo: tick sequence stream-cloning for testing
'''
from contextlib import asynccontextmanager
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
Optional,
@ -33,12 +34,13 @@ from typing import (
import time
from math import isnan
from bidict import bidict
import msgpack
import numpy as np
import pandas as pd
import tractor
from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient
from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
from ..log import get_logger, get_console_log
from .feed import maybe_open_feed
@ -164,12 +166,17 @@ def timestamp(date, **kwargs) -> int:
return int(pd.Timestamp(date, **kwargs).value)
@asynccontextmanager
@acm
async def get_client(
host: str = 'localhost',
port: int = 5995
) -> MarketstoreClient:
'''
Load a ``anyio_marketstore`` grpc client connected
to an existing ``marketstore`` server.
'''
async with open_marketstore_client(
host,
port
@ -192,6 +199,41 @@ async def get_client(
# raise MarketStoreError(err)
tf_in_1s = bidict({
1: '1Sec',
60: '1Min',
60*5: '5Min',
60*15: '15Min',
60*30: '30Min',
60*60: '1H',
60*60*24: '1D',
})
# @acm
async def load_history(
symbol: Symbol,
period: int = 1, # in seconds
) -> np.ndarray:
async with get_client() as client:
tfstr = tf_in_1s[period]
result = await client.query(
Params(fqsn, tf_in_1s, 'OHLCV',)
)
# Dig out `numpy` results map
arrays = {}
await tractor.breakpoint()
# for qr in [onem, fivem]:
# for name, data_set in qr.by_symbols().items():
# arrays[(name, qr)] = data_set.array
# # TODO: backfiller loop
# array = arrays[(fqsn, qr)]
async def backfill_history(
# symbol: Symbol
@ -201,28 +243,32 @@ async def backfill_history(
# - take ``Symbol`` as input
# - backtrack into history using backend helper endpoint
# broker = 'ib'
# symbol = 'mnq.globex'
broker = 'ib'
symbol = 'mnq.globex'
broker = 'binance'
symbol = 'btcusdt'
# broker = 'binance'
# symbol = 'btcusdt'
fqsn = mk_fqsn(broker, symbol)
print('yo')
async with (
get_client() as msclient,
get_client() as client,
maybe_open_feed(
broker,
[symbol],
loglevel='info',
# backpressure=False,
start_stream=False,
) as (feed, stream),
):
print('yo')
ohlcv = feed.shm.array
mkts_dt = np.dtype(_ohlcv_dt)
syms = await msclient.list_symbols()
print('yo')
syms = await client.list_symbols()
log.info(f'Existing symbol set:\n{pformat(syms)}')
# build mkts schema compat array
@ -240,14 +286,39 @@ async def backfill_history(
'volume',
]]
key = (fqsn, '1Sec', 'OHLCV')
tbk = mk_tbk(key)
# diff vs. existing array and append new history
# TODO:
# write to db
resp = await msclient.write(
resp = await client.write(
mkts_array,
tbk=f'{fqsn}/1Min/OHLCV',
isvariablelength=True,
tbk=tbk,
# NOTE: will will append duplicates
# for the same timestamp-index.
# isvariablelength=True,
)
# TODO: should be no error?
# assert not resp.responses
# # Dig out `numpy` results map
qr = await client.query(
Params(fqsn, '1Min`', 'OHLCV',)
)
qr = await client.query(
# Params(fqsn, '1Sec`', 'OHLCV',)
Params(*key),
)
arrays = {}
# for qr in [onem, fivem]:
for name, data_set in qr.by_symbols().items():
arrays[(name, qr)] = data_set.array
# TODO: backfiller loop
array = arrays[(fqsn, qr)]
await tractor.breakpoint()
async def ingest_quote_stream(
@ -315,24 +386,6 @@ async def ingest_quote_stream(
# await ms_client.write(symbol, a)
# async def stream_quotes(
# symbols: list[str],
# timeframe: str = '1Min',
# attr_group: str = 'TICK',
# host: str = 'localhost',
# port: int = 5993,
# loglevel: str = None
# ) -> None:
# '''
# Open a symbol stream from a running instance of marketstore and
# log to console.
# '''
# tbks: dict[str, str] = {
# sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
async def stream_quotes(
symbols: list[str],
host: str = 'localhost',