Get basic OHLCV writes working with `anyio` client

incr_update_backup
Tyler Goodlet 2022-02-19 16:36:02 -05:00
parent aea42ccbd9
commit 807685d27e
1 changed files with 80 additions and 13 deletions

View File

@ -21,10 +21,15 @@
- ticK data ingest routines
- websocket client for subscribing to write triggers
- todo: tick sequence stream-cloning for testing
- todo: docker container management automation
'''
from contextlib import asynccontextmanager
from typing import Any, Optional
from typing import (
Any,
Optional,
# Callable,
TYPE_CHECKING,
)
import time
from math import isnan
@ -36,7 +41,12 @@ from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient
from ..log import get_logger, get_console_log
from ..data.feed import maybe_open_feed
from .feed import maybe_open_feed
from ._source import mk_fqsn, Symbol
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
log = get_logger(__name__)
@ -83,7 +93,7 @@ _tick_map = {
_ohlcv_dt = [
# these two are required for as a "primary key"
('Epoch', 'i8'),
('Nanoseconds', 'i4'),
# ('Nanoseconds', 'i4'),
# ohlcv sampling
('Open', 'f4'),
@ -157,30 +167,87 @@ def timestamp(date, **kwargs) -> int:
@asynccontextmanager
async def get_client(
host: str = 'localhost',
port: int = 5993
port: int = 5995
) -> MarketstoreClient:
async with open_marketstore_client(host, port) as client:
async with open_marketstore_client(
host,
port
) as client:
yield client
async def backfill_history():
# class MarketStoreError(Exception):
# "Generic marketstore client error"
# def err_on_resp(response: dict) -> None:
# """Raise any errors found in responses from client request.
# """
# responses = response['responses']
# if responses is not None:
# for r in responses:
# err = r['error']
# if err:
# raise MarketStoreError(err)
async def backfill_history(
# symbol: Symbol
) -> list[str]:
# TODO:
# - compute time-smaple step
# - take ``Symbol`` as input
# - backtrack into history using backend helper endpoint
# broker = 'ib'
# symbol = 'mnq.globex'
broker = 'binance'
symbol = 'btcusdt'
fqsn = mk_fqsn(broker, symbol)
async with (
get_client() as msclient,
maybe_open_feed(
'ib',
['mnq.globex'],
broker,
[symbol],
loglevel='info',
# backpressure=False,
start_stream=False,
) as (feed, stream),
):
await tractor.breakpoint()
await msclient.write(
feed.shm.array,
tbk='mnq.globex.ib/1Sec/OHLCV',
ohlcv = feed.shm.array
mkts_dt = np.dtype(_ohlcv_dt)
syms = await msclient.list_symbols()
log.info(f'Existing symbol set:\n{pformat(syms)}')
# build mkts schema compat array
mkts_array = np.zeros(
len(ohlcv),
dtype=mkts_dt,
)
# copy from shm array
mkts_array[:] = ohlcv[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
# write to db
resp = await msclient.write(
mkts_array,
tbk=f'{fqsn}/1Min/OHLCV',
isvariablelength=True,
)
# TODO: backfiller loop
async def ingest_quote_stream(