Prototype a high level `Storage` api

Starts a wrapper around the `marketstore` client to do basic ohlcv query
and retrieval and prototypes out write methods for ohlc and tick.
Try to connect to `marketstore` automatically (which will fail if not
started currently) but we will eventually first do a service query.
mkts_backup
Tyler Goodlet 2022-03-01 12:29:49 -05:00
parent eb5a4f7eeb
commit 8af76322c9
2 changed files with 188 additions and 72 deletions

View File

@ -51,7 +51,6 @@ from ._sharedmem import (
from .ingest import get_ingestormod
from ._source import (
base_iohlc_dtype,
mk_symbol,
Symbol,
mk_fqsn,
)
@ -126,7 +125,7 @@ class _FeedsBus(BaseModel):
# def cancel_task(
# self,
# task: trio.lowlevel.Task
# task: trio.lowlevel.Task,
# ) -> bool:
# ...
@ -209,48 +208,68 @@ async def manage_history(
buffer.
'''
task_status.started()
# TODO: history retreival, see if we can pull from an existing
# ``marketstored`` daemon
opened = we_opened_shm
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
fqsn = mk_fqsn(mod.name, symbol)
if opened:
# ask broker backend for new history
from . import marketstore
log.info('Scanning for existing `marketstored`')
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
cs = await bus.nursery.start(mod.backfill_bars, symbol, shm)
async with marketstore.open_storage_client(
fqsn,
) as (storage, arrays):
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# yield back after client connect
task_status.started()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# TODO: history validation
# assert opened, f'Persistent shm for {symbol} was already open?!'
# if not opened:
# raise RuntimeError("Persistent shm for sym was already open?!")
# begin real-time updates of shm and tsb once the feed
# goes live.
await feed_is_live.wait()
if opened:
if arrays:
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
log.info(f'Loaded tsdb history {arrays}')
# push to shm
# set data ready
# some_data_ready.set()
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None:
cs = await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
# ask broker backend for new history
await trio.sleep_forever()
cs.cancel()
# start history backfill task ``backfill_bars()`` is
# a required backend func this must block until shm is
# filled with first set of ohlc bars
_ = await bus.nursery.start(mod.backfill_bars, symbol, shm)
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history
# data that can be used.
some_data_ready.set()
# detect sample step size for sampled historical data
times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
# begin real-time updates of shm and tsb once the feed
# goes live.
await feed_is_live.wait()
if opened:
sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
if sampler.incrementers.get(delay_s) is None:
await bus.start_task(
increment_ohlc_buffer,
delay_s,
)
await trio.sleep_forever()
# cs.cancel()
async def allocate_persistent_feed(
@ -310,9 +329,9 @@ async def allocate_persistent_feed(
# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(
await bus.nursery.start(
# await bus.nursery.start(
await bus.start_task(
manage_history,
mod,
shm,
@ -339,6 +358,10 @@ async def allocate_persistent_feed(
# can read directly from the memory which will be written by
# this task.
init_msg[symbol]['shm_token'] = shm.token
# symbol = Symbol.from_broker_info(
# fqsn,
# init_msg[symbol]['symbol_info']
# )
# TODO: pretty sure we don't need this? why not just leave 1s as
# the fastest "sample period" since we'll probably always want that
@ -697,15 +720,12 @@ async def open_feed(
for sym, data in init_msg.items():
si = data['symbol_info']
symbol = mk_symbol(
key=sym,
type_key=si.get('asset_type', 'forex'),
tick_size=si.get('price_tick_size', 0.01),
lot_tick_size=si.get('lot_tick_size', 0.0),
symbol = Symbol.from_broker_info(
brokername,
sym,
si,
)
symbol.broker_info[brokername] = si
# symbol.broker_info[brokername] = si
feed.symbols[sym] = symbol
# cast shm dtype to list... can't member why we need this

View File

@ -28,8 +28,9 @@ from pprint import pformat
from typing import (
Any,
Optional,
Union,
# Callable,
TYPE_CHECKING,
# TYPE_CHECKING,
)
import time
from math import isnan
@ -40,12 +41,19 @@ import numpy as np
import pandas as pd
import tractor
from trio_websocket import open_websocket_url
from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
from anyio_marketstore import (
open_marketstore_client,
MarketstoreClient,
Params,
)
import purerpc
from ..log import get_logger, get_console_log
from .feed import maybe_open_feed
from ._source import mk_fqsn, Symbol
from ._source import (
mk_fqsn,
# Symbol,
)
from ..log import get_logger, get_console_log
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
@ -210,42 +218,130 @@ tf_in_1s = bidict({
})
async def manage_history(
fqsn: str,
period: int = 1, # in seconds
) -> dict[str, np.ndarray]:
class Storage:
'''
Load a series by key and deliver in ``numpy`` struct array
format.
High level storage api for both real-time and historical ingest.
'''
def __init__(
self,
client: MarketstoreClient,
) -> None:
# TODO: eventually this should be an api/interface type that
# ensures we can support multiple tsdb backends.
self.client = client
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
async def write_ticks(self, ticks: list) -> None:
...
async def write_ohlcv(self, ohlcv: np.ndarray) -> None:
...
async def read_ohlcv(
self,
fqsn: str,
timeframe: Optional[Union[int, str]] = None,
) -> tuple[
MarketstoreClient,
Union[dict, np.ndarray]
]:
client = self.client
syms = await client.list_symbols()
if fqsn not in syms:
return {}
if timeframe is None:
log.info(f'starting {fqsn} tsdb granularity scan..')
# loop through and try to find highest granularity
for tfstr in tf_in_1s.values():
try:
log.info(f'querying for {tfstr}@{fqsn}')
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
break
except purerpc.grpclib.exceptions.UnknownError:
# XXX: this is already logged by the container and
# thus shows up through `marketstored` logs relay.
# log.warning(f'{tfstr}@{fqsn} not found')
continue
else:
return {}
else:
tfstr = tf_in_1s[timeframe]
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
arrays.setdefault(fqsn, {})[
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return (
client,
arrays[fqsn][timeframe] if timeframe else arrays,
)
@acm
async def open_storage_client(
fqsn: str,
period: Optional[Union[int, str]] = None, # in seconds
) -> tuple[Storage, dict[str, np.ndarray]]:
'''
Load a series by key and deliver in ``numpy`` struct array format.
'''
async with get_client() as client:
tfstr = tf_in_1s[period]
result = await client.query(
Params(fqsn, tf_in_1s, 'OHLCV',)
storage_client = Storage(client)
arrays = await storage_client.read_ohlcv(
fqsn,
period,
)
# Dig out `numpy` results map
arrays = {}
# for qr in [onem, fivem]:
for name, data_set in result.by_symbols().items():
arrays[(name, qr)] = data_set.array
await tractor.breakpoint()
# # TODO: backfiller loop
# array = arrays[(fqsn, qr)]
return arrays
yield storage_client, arrays
async def backfill_history_diff(
# symbol: Symbol
) -> list[str]:
# TODO:
# - compute time-smaple step
# - take ``Symbol`` as input
# - backtrack into history using backend helper endpoint
# TODO: real-time dedicated task for ensuring
# history consistency between the tsdb, shm and real-time feed..
# update sequence design notes:
# - load existing highest frequency data from mkts
# * how do we want to offer this to the UI?
# - lazy loading?
# - try to load it all and expect graphics caching/diffing
# to hide extra bits that aren't in view?
# - compute the diff between latest data from broker and shm
# * use sql api in mkts to determine where the backend should
# start querying for data?
# * append any diff with new shm length
# * determine missing (gapped) history by scanning
# * how far back do we look?
# - begin rt update ingest and aggregation
# * could start by always writing ticks to mkts instead of
# worrying about a shm queue for now.
# * we have a short list of shm queues worth groking:
# - https://github.com/pikers/piker/issues/107
# * the original data feed arch blurb:
# - https://github.com/pikers/piker/issues/98
#
broker = 'ib'
symbol = 'mnq.globex'