Get basic OHLCV writes working with `anyio` client

l1_precision_fix
Tyler Goodlet 2022-02-19 16:36:02 -05:00
parent ba82a18890
commit e1bbcff8e0
1 changed files with 80 additions and 13 deletions

View File

@ -21,10 +21,15 @@
- ticK data ingest routines - ticK data ingest routines
- websocket client for subscribing to write triggers - websocket client for subscribing to write triggers
- todo: tick sequence stream-cloning for testing - todo: tick sequence stream-cloning for testing
- todo: docker container management automation
''' '''
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from typing import Any, Optional from typing import (
Any,
Optional,
# Callable,
TYPE_CHECKING,
)
import time import time
from math import isnan from math import isnan
@ -36,7 +41,12 @@ from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient from anyio_marketstore import open_marketstore_client, MarketstoreClient
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
from ..data.feed import maybe_open_feed from .feed import maybe_open_feed
from ._source import mk_fqsn, Symbol
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
log = get_logger(__name__) log = get_logger(__name__)
@ -83,7 +93,7 @@ _tick_map = {
_ohlcv_dt = [ _ohlcv_dt = [
# these two are required for as a "primary key" # these two are required for as a "primary key"
('Epoch', 'i8'), ('Epoch', 'i8'),
('Nanoseconds', 'i4'), # ('Nanoseconds', 'i4'),
# ohlcv sampling # ohlcv sampling
('Open', 'f4'), ('Open', 'f4'),
@ -157,30 +167,87 @@ def timestamp(date, **kwargs) -> int:
@asynccontextmanager @asynccontextmanager
async def get_client( async def get_client(
host: str = 'localhost', host: str = 'localhost',
port: int = 5993 port: int = 5995
) -> MarketstoreClient: ) -> MarketstoreClient:
async with open_marketstore_client(host, port) as client: async with open_marketstore_client(
host,
port
) as client:
yield client yield client
async def backfill_history(): # class MarketStoreError(Exception):
# "Generic marketstore client error"
# def err_on_resp(response: dict) -> None:
# """Raise any errors found in responses from client request.
# """
# responses = response['responses']
# if responses is not None:
# for r in responses:
# err = r['error']
# if err:
# raise MarketStoreError(err)
async def backfill_history(
# symbol: Symbol
) -> list[str]:
# TODO:
# - compute time-smaple step
# - take ``Symbol`` as input
# - backtrack into history using backend helper endpoint
# broker = 'ib'
# symbol = 'mnq.globex'
broker = 'binance'
symbol = 'btcusdt'
fqsn = mk_fqsn(broker, symbol)
async with ( async with (
get_client() as msclient, get_client() as msclient,
maybe_open_feed( maybe_open_feed(
'ib', broker,
['mnq.globex'], [symbol],
loglevel='info', loglevel='info',
# backpressure=False, # backpressure=False,
start_stream=False, start_stream=False,
) as (feed, stream), ) as (feed, stream),
): ):
await tractor.breakpoint() ohlcv = feed.shm.array
await msclient.write( mkts_dt = np.dtype(_ohlcv_dt)
feed.shm.array,
tbk='mnq.globex.ib/1Sec/OHLCV', syms = await msclient.list_symbols()
log.info(f'Existing symbol set:\n{pformat(syms)}')
# build mkts schema compat array
mkts_array = np.zeros(
len(ohlcv),
dtype=mkts_dt,
) )
# copy from shm array
mkts_array[:] = ohlcv[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
# write to db
resp = await msclient.write(
mkts_array,
tbk=f'{fqsn}/1Min/OHLCV',
isvariablelength=True,
)
# TODO: backfiller loop
async def ingest_quote_stream( async def ingest_quote_stream(