Prototype out writing `1Sec` OHLCV data
parent
4402b2dc73
commit
f60d9dd79c
|
@ -23,7 +23,8 @@
|
||||||
- todo: tick sequence stream-cloning for testing
|
- todo: tick sequence stream-cloning for testing
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager as acm
|
||||||
|
from pprint import pformat
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
Optional,
|
Optional,
|
||||||
|
@ -33,12 +34,13 @@ from typing import (
|
||||||
import time
|
import time
|
||||||
from math import isnan
|
from math import isnan
|
||||||
|
|
||||||
|
from bidict import bidict
|
||||||
import msgpack
|
import msgpack
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import pandas as pd
|
import pandas as pd
|
||||||
import tractor
|
import tractor
|
||||||
from trio_websocket import open_websocket_url
|
from trio_websocket import open_websocket_url
|
||||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient
|
from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
|
||||||
|
|
||||||
from ..log import get_logger, get_console_log
|
from ..log import get_logger, get_console_log
|
||||||
from .feed import maybe_open_feed
|
from .feed import maybe_open_feed
|
||||||
|
@ -164,12 +166,17 @@ def timestamp(date, **kwargs) -> int:
|
||||||
return int(pd.Timestamp(date, **kwargs).value)
|
return int(pd.Timestamp(date, **kwargs).value)
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@acm
|
||||||
async def get_client(
|
async def get_client(
|
||||||
host: str = 'localhost',
|
host: str = 'localhost',
|
||||||
port: int = 5995
|
port: int = 5995
|
||||||
|
|
||||||
) -> MarketstoreClient:
|
) -> MarketstoreClient:
|
||||||
|
'''
|
||||||
|
Load a ``anyio_marketstore`` grpc client connected
|
||||||
|
to an existing ``marketstore`` server.
|
||||||
|
|
||||||
|
'''
|
||||||
async with open_marketstore_client(
|
async with open_marketstore_client(
|
||||||
host,
|
host,
|
||||||
port
|
port
|
||||||
|
@ -192,6 +199,41 @@ async def get_client(
|
||||||
# raise MarketStoreError(err)
|
# raise MarketStoreError(err)
|
||||||
|
|
||||||
|
|
||||||
|
tf_in_1s = bidict({
|
||||||
|
1: '1Sec',
|
||||||
|
60: '1Min',
|
||||||
|
60*5: '5Min',
|
||||||
|
60*15: '15Min',
|
||||||
|
60*30: '30Min',
|
||||||
|
60*60: '1H',
|
||||||
|
60*60*24: '1D',
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
# @acm
|
||||||
|
async def load_history(
|
||||||
|
symbol: Symbol,
|
||||||
|
period: int = 1, # in seconds
|
||||||
|
|
||||||
|
) -> np.ndarray:
|
||||||
|
|
||||||
|
async with get_client() as client:
|
||||||
|
|
||||||
|
tfstr = tf_in_1s[period]
|
||||||
|
result = await client.query(
|
||||||
|
Params(fqsn, tf_in_1s, 'OHLCV',)
|
||||||
|
)
|
||||||
|
# Dig out `numpy` results map
|
||||||
|
arrays = {}
|
||||||
|
await tractor.breakpoint()
|
||||||
|
# for qr in [onem, fivem]:
|
||||||
|
# for name, data_set in qr.by_symbols().items():
|
||||||
|
# arrays[(name, qr)] = data_set.array
|
||||||
|
|
||||||
|
# # TODO: backfiller loop
|
||||||
|
# array = arrays[(fqsn, qr)]
|
||||||
|
|
||||||
|
|
||||||
async def backfill_history(
|
async def backfill_history(
|
||||||
# symbol: Symbol
|
# symbol: Symbol
|
||||||
|
|
||||||
|
@ -201,28 +243,32 @@ async def backfill_history(
|
||||||
# - take ``Symbol`` as input
|
# - take ``Symbol`` as input
|
||||||
# - backtrack into history using backend helper endpoint
|
# - backtrack into history using backend helper endpoint
|
||||||
|
|
||||||
# broker = 'ib'
|
broker = 'ib'
|
||||||
# symbol = 'mnq.globex'
|
symbol = 'mnq.globex'
|
||||||
|
|
||||||
broker = 'binance'
|
# broker = 'binance'
|
||||||
symbol = 'btcusdt'
|
# symbol = 'btcusdt'
|
||||||
|
|
||||||
fqsn = mk_fqsn(broker, symbol)
|
fqsn = mk_fqsn(broker, symbol)
|
||||||
|
|
||||||
|
print('yo')
|
||||||
async with (
|
async with (
|
||||||
get_client() as msclient,
|
get_client() as client,
|
||||||
maybe_open_feed(
|
maybe_open_feed(
|
||||||
broker,
|
broker,
|
||||||
[symbol],
|
[symbol],
|
||||||
loglevel='info',
|
loglevel='info',
|
||||||
# backpressure=False,
|
# backpressure=False,
|
||||||
start_stream=False,
|
start_stream=False,
|
||||||
|
|
||||||
) as (feed, stream),
|
) as (feed, stream),
|
||||||
):
|
):
|
||||||
|
print('yo')
|
||||||
ohlcv = feed.shm.array
|
ohlcv = feed.shm.array
|
||||||
mkts_dt = np.dtype(_ohlcv_dt)
|
mkts_dt = np.dtype(_ohlcv_dt)
|
||||||
|
|
||||||
syms = await msclient.list_symbols()
|
print('yo')
|
||||||
|
syms = await client.list_symbols()
|
||||||
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
log.info(f'Existing symbol set:\n{pformat(syms)}')
|
||||||
|
|
||||||
# build mkts schema compat array
|
# build mkts schema compat array
|
||||||
|
@ -240,14 +286,39 @@ async def backfill_history(
|
||||||
'volume',
|
'volume',
|
||||||
]]
|
]]
|
||||||
|
|
||||||
|
key = (fqsn, '1Sec', 'OHLCV')
|
||||||
|
tbk = mk_tbk(key)
|
||||||
|
|
||||||
|
# diff vs. existing array and append new history
|
||||||
|
# TODO:
|
||||||
|
|
||||||
# write to db
|
# write to db
|
||||||
resp = await msclient.write(
|
resp = await client.write(
|
||||||
mkts_array,
|
mkts_array,
|
||||||
tbk=f'{fqsn}/1Min/OHLCV',
|
tbk=tbk,
|
||||||
isvariablelength=True,
|
# NOTE: will will append duplicates
|
||||||
|
# for the same timestamp-index.
|
||||||
|
# isvariablelength=True,
|
||||||
)
|
)
|
||||||
|
# TODO: should be no error?
|
||||||
|
# assert not resp.responses
|
||||||
|
|
||||||
|
# # Dig out `numpy` results map
|
||||||
|
qr = await client.query(
|
||||||
|
Params(fqsn, '1Min`', 'OHLCV',)
|
||||||
|
)
|
||||||
|
qr = await client.query(
|
||||||
|
# Params(fqsn, '1Sec`', 'OHLCV',)
|
||||||
|
Params(*key),
|
||||||
|
)
|
||||||
|
arrays = {}
|
||||||
|
# for qr in [onem, fivem]:
|
||||||
|
for name, data_set in qr.by_symbols().items():
|
||||||
|
arrays[(name, qr)] = data_set.array
|
||||||
|
|
||||||
# TODO: backfiller loop
|
# TODO: backfiller loop
|
||||||
|
array = arrays[(fqsn, qr)]
|
||||||
|
await tractor.breakpoint()
|
||||||
|
|
||||||
|
|
||||||
async def ingest_quote_stream(
|
async def ingest_quote_stream(
|
||||||
|
@ -315,24 +386,6 @@ async def ingest_quote_stream(
|
||||||
# await ms_client.write(symbol, a)
|
# await ms_client.write(symbol, a)
|
||||||
|
|
||||||
|
|
||||||
# async def stream_quotes(
|
|
||||||
# symbols: list[str],
|
|
||||||
# timeframe: str = '1Min',
|
|
||||||
# attr_group: str = 'TICK',
|
|
||||||
# host: str = 'localhost',
|
|
||||||
# port: int = 5993,
|
|
||||||
# loglevel: str = None
|
|
||||||
|
|
||||||
# ) -> None:
|
|
||||||
# '''
|
|
||||||
# Open a symbol stream from a running instance of marketstore and
|
|
||||||
# log to console.
|
|
||||||
|
|
||||||
# '''
|
|
||||||
# tbks: dict[str, str] = {
|
|
||||||
# sym: f'{sym}/{timeframe}/{attr_group}' for sym in symbols}
|
|
||||||
|
|
||||||
|
|
||||||
async def stream_quotes(
|
async def stream_quotes(
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
host: str = 'localhost',
|
host: str = 'localhost',
|
||||||
|
|
Loading…
Reference in New Issue