Prototype out writing `1Sec` OHLCV data
parent
6dfe59cce6
commit
7d628c4059
|
@ -23,7 +23,8 @@
|
|||
- todo: tick sequence stream-cloning for testing
|
||||
|
||||
'''
|
||||
from contextlib import asynccontextmanager
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
|
@ -33,12 +34,13 @@ from typing import (
|
|||
import time
|
||||
from math import isnan
|
||||
|
||||
from bidict import bidict
|
||||
import msgpack
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import tractor
|
||||
from trio_websocket import open_websocket_url
|
||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
|
||||
|
||||
from ..log import get_logger, get_console_log
|
||||
from .feed import maybe_open_feed
|
||||
|
@ -164,12 +166,17 @@ def timestamp(date, **kwargs) -> int:
|
|||
return int(pd.Timestamp(date, **kwargs).value)
|
||||
|
||||
|
||||
@asynccontextmanager
|
||||
@acm
|
||||
async def get_client(
|
||||
host: str = 'localhost',
|
||||
port: int = 5995
|
||||
|
||||
) -> MarketstoreClient:
|
||||
'''
|
||||
Load a ``anyio_marketstore`` grpc client connected
|
||||
to an existing ``marketstore`` server.
|
||||
|
||||
'''
|
||||
async with open_marketstore_client(
|
||||
host,
|
||||
port
|
||||
|
@ -192,6 +199,41 @@ async def get_client(
|
|||
# raise MarketStoreError(err)
|
||||
|
||||
|
||||
tf_in_1s = bidict({
|
||||
1: '1Sec',
|
||||
60: '1Min',
|
||||
60*5: '5Min',
|
||||
60*15: '15Min',
|
||||
60*30: '30Min',
|
||||
60*60: '1H',
|
||||
60*60*24: '1D',
|
||||
})
|
||||
|
||||
|
||||
# @acm
|
||||
async def load_history(
|
||||
symbol: Symbol,
|
||||
period: int = 1, # in seconds
|
||||
|
||||
) -> np.ndarray:
|
||||
|
||||
async with get_client() as client:
|
||||
|
||||
tfstr = tf_in_1s[period]
|
||||
result = await client.query(
|
||||
Params(fqsn, tf_in_1s, 'OHLCV',)
|
||||
)
|
||||
# Dig out `numpy` results map
|
||||
arrays = {}
|
||||
await tractor.breakpoint()
|
||||
# for qr in [onem, fivem]:
|
||||
# for name, data_set in qr.by_symbols().items():
|
||||
# arrays[(name, qr)] = data_set.array
|
||||
|
||||
# # TODO: backfiller loop
|
||||
# array = arrays[(fqsn, qr)]
|
||||
|
||||
|
||||
async def backfill_history(
|
||||
# symbol: Symbol
|
||||
|
||||
|
@ -201,28 +243,32 @@ async def backfill_history(
|
|||
# - take ``Symbol`` as input
|
||||
# - backtrack into history using backend helper endpoint
|
||||
|
||||
# broker = 'ib'
|
||||
# symbol = 'mnq.globex'
|
||||
broker = 'ib'
|
||||
symbol = 'mnq.globex'
|
||||
|
||||
broker = 'binance'
|
||||
symbol = 'btcusdt'
|
||||
# broker = 'binance'
|
||||
# symbol = 'btcusdt'
|
||||
|
||||
fqsn = mk_fqsn(broker, symbol)
|
||||
|
||||
print('yo')
|
||||
async with (
|
||||
get_client() as msclient,
|
||||
get_client() as client,
|
||||
maybe_open_feed(
|
||||
broker,
|
||||
[symbol],
|
||||
loglevel='info',
|
||||
# backpressure=False,
|
||||
start_stream=False,
|
||||
|
||||
) as (feed, stream),
|
||||
):
|
||||
print('yo')
|
||||
ohlcv = feed.shm.array
|
||||
mkts_dt = np.dtype(_ohlcv_dt)
|
||||
|
||||
syms = await msclient.list_symbols()
|
||||
print('yo')
|
||||
syms = await client.list_symbols()
|
||||
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
||||
|
||||
# build mkts schema compat array
|
||||
|
@ -240,14 +286,39 @@ async def backfill_history(
|
|||
'volume',
|
||||
]]
|
||||
|
||||
key = (fqsn, '1Sec', 'OHLCV')
|
||||
tbk = mk_tbk(key)
|
||||
|
||||
# diff vs. existing array and append new history
|
||||
# TODO:
|
||||
|
||||
# write to db
|
||||
resp = await msclient.write(
|
||||
resp = await client.write(
|
||||
mkts_array,
|
||||
tbk=f'{fqsn}/1Min/OHLCV',
|
||||
isvariablelength=True,
|
||||
tbk=tbk,
|
||||
# NOTE: will will append duplicates
|
||||
# for the same timestamp-index.
|
||||
# isvariablelength=True,
|
||||
)
|
||||
# TODO: should be no error?
|
||||
# assert not resp.responses
|
||||
|
||||
# # Dig out `numpy` results map
|
||||
qr = await client.query(
|
||||
Params(fqsn, '1Min`', 'OHLCV',)
|
||||
)
|
||||
qr = await client.query(
|
||||
# Params(fqsn, '1Sec`', 'OHLCV',)
|
||||
Params(*key),
|
||||
)
|
||||
arrays = {}
|
||||
# for qr in [onem, fivem]:
|
||||
for name, data_set in qr.by_symbols().items():
|
||||
arrays[(name, qr)] = data_set.array
|
||||
|
||||
# TODO: backfiller loop
|
||||
array = arrays[(fqsn, qr)]
|
||||
await tractor.breakpoint()
|
||||
|
||||
|
||||
async def ingest_quote_stream(
|
||||
|
@ -315,24 +386,6 @@ async def ingest_quote_stream(
|
|||
# await ms_client.write(symbol, a)
|
||||
|
||||
|
||||
# async def stream_quotes(
|
||||
# symbols: list[str],
|
||||
# timeframe: str = '1Min',
|
||||
# attr_group: str = 'TICK',
|
||||
# host: str = 'localhost',
|
||||
# port: int = 5993,
|
||||
# loglevel: str = None
|
||||
|
||||
# ) -> None:
|
||||
# '''
|
||||
# Open a symbol stream from a running instance of marketstore and
|
||||
# log to console.
|
||||
|
||||
# '''
|
||||
# tbks: dict[str, str] = {
|
||||
# sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
|
||||
|
||||
|
||||
async def stream_quotes(
|
||||
symbols: list[str],
|
||||
host: str = 'localhost',
|
||||
|
|
Loading…
Reference in New Issue