Prototype a high level `Storage` api
Starts a wrapper around the `marketstore` client to do basic OHLCV query and retrieval, and prototypes write methods for OHLC and tick data. Tries to connect to `marketstore` automatically (which will fail if it is not currently running), though eventually we will first do a service query. Further: - get `pikerd` working with and without the `--tsdb` flag. - support spawning `brokerd` with no real-time quotes. - bring back the "fqsn" support that was originally missing from this history before the commits were factored.
marketstore_backup
parent
387cb51ca2
commit
85658a165c
|
@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
|
|||
"""
|
||||
from __future__ import annotations
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from contextlib import asynccontextmanager
|
||||
from functools import partial
|
||||
from types import ModuleType
|
||||
|
@ -42,11 +43,13 @@ from .._cacheables import maybe_open_context
|
|||
from ..log import get_logger, get_console_log
|
||||
from .._daemon import (
|
||||
maybe_spawn_brokerd,
|
||||
check_for_service,
|
||||
)
|
||||
from ._sharedmem import (
|
||||
maybe_open_shm_array,
|
||||
attach_shm_array,
|
||||
ShmArray,
|
||||
_secs_in_day,
|
||||
)
|
||||
from .ingest import get_ingestormod
|
||||
from ._source import (
|
||||
|
@ -125,7 +128,7 @@ class _FeedsBus(BaseModel):
|
|||
|
||||
# def cancel_task(
|
||||
# self,
|
||||
# task: trio.lowlevel.Task
|
||||
# task: trio.lowlevel.Task,
|
||||
# ) -> bool:
|
||||
# ...
|
||||
|
||||
|
@ -218,7 +221,61 @@ async def manage_history(
|
|||
readonly=False,
|
||||
)
|
||||
|
||||
if opened:
|
||||
log.info('Scanning for existing `marketstored`')
|
||||
is_up = await check_for_service('marketstored')
|
||||
if is_up and opened:
|
||||
log.info('Found existing `marketstored`')
|
||||
from . import marketstore
|
||||
|
||||
async with marketstore.open_storage_client(
|
||||
fqsn,
|
||||
) as (storage, tsdb_arrays):
|
||||
|
||||
# TODO: history validation
|
||||
# assert opened, f'Persistent shm for {symbol} was already open?!'
|
||||
# if not opened:
|
||||
# raise RuntimeError(
|
||||
# "Persistent shm for sym was already open?!"
|
||||
# )
|
||||
|
||||
if tsdb_arrays:
|
||||
log.info(f'Loaded tsdb history {tsdb_arrays}')
|
||||
fastest = list(tsdb_arrays[fqsn].values())[0]
|
||||
last_s = fastest['Epoch'][-1]
|
||||
|
||||
# TODO: see if there's faster multi-field reads:
|
||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||
|
||||
# re-index with a `time` and index field
|
||||
shm.push(
|
||||
fastest[-3 * _secs_in_day:],
|
||||
|
||||
# insert the history pre a "days worth" of samples
|
||||
# to leave some real-time buffer space at the end.
|
||||
prepend=True,
|
||||
start=shm._len - _secs_in_day,
|
||||
field_map={
|
||||
'Epoch': 'time',
|
||||
'Open': 'open',
|
||||
'High': 'high',
|
||||
'Low': 'low',
|
||||
'Close': 'close',
|
||||
'Volume': 'volume',
|
||||
},
|
||||
)
|
||||
|
||||
# start history analysis and load missing new data via backend.
|
||||
async with mod.open_history_client(fqsn) as hist:
|
||||
|
||||
# get latest query's worth of history
|
||||
array, next_dt = await hist(end_dt='')
|
||||
|
||||
last_dt = datetime.fromtimestamp(last_s)
|
||||
array, next_dt = await hist(end_dt=last_dt)
|
||||
|
||||
some_data_ready.set()
|
||||
|
||||
elif opened:
|
||||
log.info('No existing `marketstored` found..')
|
||||
|
||||
# start history backfill task ``backfill_bars()`` is
|
||||
|
@ -254,6 +311,7 @@ async def manage_history(
|
|||
)
|
||||
|
||||
await trio.sleep_forever()
|
||||
# cs.cancel()
|
||||
|
||||
|
||||
async def allocate_persistent_feed(
|
||||
|
@ -261,6 +319,7 @@ async def allocate_persistent_feed(
|
|||
brokername: str,
|
||||
symbol: str,
|
||||
loglevel: str,
|
||||
start_stream: bool = True,
|
||||
|
||||
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
|
@ -302,10 +361,8 @@ async def allocate_persistent_feed(
|
|||
loglevel=loglevel,
|
||||
)
|
||||
)
|
||||
# the broker-specific fully qualified symbol name,
|
||||
# but ensure it is lower-cased for external use.
|
||||
bfqsn = init_msg[symbol]['fqsn'].lower()
|
||||
init_msg[symbol]['fqsn'] = bfqsn
|
||||
# the broker-specific fully qualified symbol name
|
||||
bfqsn = init_msg[symbol]['fqsn']
|
||||
|
||||
# HISTORY, run 2 tasks:
|
||||
# - a history loader / maintainer
|
||||
|
@ -333,6 +390,7 @@ async def allocate_persistent_feed(
|
|||
|
||||
# true fqsn
|
||||
fqsn = '.'.join((bfqsn, brokername))
|
||||
|
||||
# add a fqsn entry that includes the ``.<broker>`` suffix
|
||||
init_msg[fqsn] = msg
|
||||
|
||||
|
@ -364,6 +422,9 @@ async def allocate_persistent_feed(
|
|||
# task_status.started((init_msg, generic_first_quotes))
|
||||
task_status.started()
|
||||
|
||||
if not start_stream:
|
||||
await trio.sleep_forever()
|
||||
|
||||
# backend will indicate when real-time quotes have begun.
|
||||
await feed_is_live.wait()
|
||||
|
||||
|
@ -429,13 +490,12 @@ async def open_feed_bus(
|
|||
|
||||
bus=bus,
|
||||
brokername=brokername,
|
||||
|
||||
# here we pass through the selected symbol in native
|
||||
# "format" (i.e. upper vs. lowercase depending on
|
||||
# provider).
|
||||
symbol=symbol,
|
||||
|
||||
loglevel=loglevel,
|
||||
start_stream=start_stream,
|
||||
)
|
||||
)
|
||||
# TODO: we can remove this?
|
||||
|
@ -446,7 +506,7 @@ async def open_feed_bus(
|
|||
init_msg, first_quotes = bus.feeds[symbol]
|
||||
|
||||
msg = init_msg[symbol]
|
||||
bfqsn = msg['fqsn'].lower()
|
||||
bfqsn = msg['fqsn']
|
||||
|
||||
# true fqsn
|
||||
fqsn = '.'.join([bfqsn, brokername])
|
||||
|
@ -765,10 +825,7 @@ async def maybe_open_feed(
|
|||
|
||||
**kwargs,
|
||||
|
||||
) -> (
|
||||
Feed,
|
||||
ReceiveChannel[dict[str, Any]],
|
||||
):
|
||||
) -> (Feed, ReceiveChannel[dict[str, Any]]):
|
||||
'''
|
||||
Maybe open a data feed to a ``brokerd`` daemon only if there is no
|
||||
local one for the broker-symbol pair, if one is cached use it wrapped
|
||||
|
@ -789,7 +846,6 @@ async def maybe_open_feed(
|
|||
'start_stream': kwargs.get('start_stream', True),
|
||||
},
|
||||
key=fqsn,
|
||||
|
||||
) as (cache_hit, feed):
|
||||
|
||||
if cache_hit:
|
||||
|
|
|
@ -28,8 +28,9 @@ from pprint import pformat
|
|||
from typing import (
|
||||
Any,
|
||||
Optional,
|
||||
Union,
|
||||
# Callable,
|
||||
TYPE_CHECKING,
|
||||
# TYPE_CHECKING,
|
||||
)
|
||||
import time
|
||||
from math import isnan
|
||||
|
@ -40,12 +41,19 @@ import numpy as np
|
|||
import pandas as pd
|
||||
import tractor
|
||||
from trio_websocket import open_websocket_url
|
||||
from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
|
||||
from anyio_marketstore import (
|
||||
open_marketstore_client,
|
||||
MarketstoreClient,
|
||||
Params,
|
||||
)
|
||||
import purerpc
|
||||
|
||||
from ..log import get_logger, get_console_log
|
||||
from .feed import maybe_open_feed
|
||||
from ._source import mk_fqsn, Symbol
|
||||
|
||||
from ._source import (
|
||||
mk_fqsn,
|
||||
# Symbol,
|
||||
)
|
||||
from ..log import get_logger, get_console_log
|
||||
|
||||
# if TYPE_CHECKING:
|
||||
# from ._sharedmem import ShmArray
|
||||
|
@ -210,42 +218,130 @@ tf_in_1s = bidict({
|
|||
})
|
||||
|
||||
|
||||
async def manage_history(
|
||||
fqsn: str,
|
||||
period: int = 1, # in seconds
|
||||
|
||||
) -> dict[str, np.ndarray]:
|
||||
class Storage:
|
||||
'''
|
||||
Load a series by key and deliver in ``numpy`` struct array
|
||||
format.
|
||||
High level storage api for both real-time and historical ingest.
|
||||
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
client: MarketstoreClient,
|
||||
|
||||
) -> None:
|
||||
# TODO: eventually this should be an api/interface type that
|
||||
# ensures we can support multiple tsdb backends.
|
||||
self.client = client
|
||||
|
||||
# series' cache from tsdb reads
|
||||
self._arrays: dict[str, np.ndarray] = {}
|
||||
|
||||
async def write_ticks(self, ticks: list) -> None:
|
||||
...
|
||||
|
||||
async def write_ohlcv(self, ohlcv: np.ndarray) -> None:
|
||||
...
|
||||
|
||||
async def read_ohlcv(
|
||||
self,
|
||||
fqsn: str,
|
||||
timeframe: Optional[Union[int, str]] = None,
|
||||
|
||||
) -> tuple[
|
||||
MarketstoreClient,
|
||||
Union[dict, np.ndarray]
|
||||
]:
|
||||
client = self.client
|
||||
syms = await client.list_symbols()
|
||||
|
||||
if fqsn not in syms:
|
||||
return {}
|
||||
|
||||
if timeframe is None:
|
||||
log.info(f'starting {fqsn} tsdb granularity scan..')
|
||||
# loop through and try to find highest granularity
|
||||
for tfstr in tf_in_1s.values():
|
||||
try:
|
||||
log.info(f'querying for {tfstr}@{fqsn}')
|
||||
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
|
||||
break
|
||||
except purerpc.grpclib.exceptions.UnknownError:
|
||||
# XXX: this is already logged by the container and
|
||||
# thus shows up through `marketstored` logs relay.
|
||||
# log.warning(f'{tfstr}@{fqsn} not found')
|
||||
continue
|
||||
else:
|
||||
return {}
|
||||
|
||||
else:
|
||||
tfstr = tf_in_1s[timeframe]
|
||||
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
|
||||
|
||||
# Fill out a `numpy` array-results map
|
||||
arrays = {}
|
||||
for fqsn, data_set in result.by_symbols().items():
|
||||
arrays.setdefault(fqsn, {})[
|
||||
tf_in_1s.inverse[data_set.timeframe]
|
||||
] = data_set.array
|
||||
|
||||
return (
|
||||
client,
|
||||
arrays[fqsn][timeframe] if timeframe else arrays,
|
||||
)
|
||||
|
||||
|
||||
@acm
|
||||
async def open_storage_client(
|
||||
fqsn: str,
|
||||
period: Optional[Union[int, str]] = None, # in seconds
|
||||
|
||||
) -> tuple[Storage, dict[str, np.ndarray]]:
|
||||
'''
|
||||
Load a series by key and deliver in ``numpy`` struct array format.
|
||||
|
||||
'''
|
||||
async with get_client() as client:
|
||||
|
||||
tfstr = tf_in_1s[period]
|
||||
result = await client.query(
|
||||
Params(fqsn, tf_in_1s, 'OHLCV',)
|
||||
storage_client = Storage(client)
|
||||
arrays = await storage_client.read_ohlcv(
|
||||
fqsn,
|
||||
period,
|
||||
)
|
||||
# Dig out `numpy` results map
|
||||
arrays = {}
|
||||
# for qr in [onem, fivem]:
|
||||
for name, data_set in result.by_symbols().items():
|
||||
arrays[(name, qr)] = data_set.array
|
||||
|
||||
await tractor.breakpoint()
|
||||
# # TODO: backfiller loop
|
||||
# array = arrays[(fqsn, qr)]
|
||||
return arrays
|
||||
yield storage_client, arrays
|
||||
|
||||
|
||||
async def backfill_history_diff(
|
||||
# symbol: Symbol
|
||||
|
||||
) -> list[str]:
|
||||
# TODO:
|
||||
# - compute time-sample step
|
||||
# - take ``Symbol`` as input
|
||||
# - backtrack into history using backend helper endpoint
|
||||
|
||||
# TODO: real-time dedicated task for ensuring
|
||||
# history consistency between the tsdb, shm and real-time feed..
|
||||
|
||||
# update sequence design notes:
|
||||
|
||||
# - load existing highest frequency data from mkts
|
||||
# * how do we want to offer this to the UI?
|
||||
# - lazy loading?
|
||||
# - try to load it all and expect graphics caching/diffing
|
||||
# to hide extra bits that aren't in view?
|
||||
|
||||
# - compute the diff between latest data from broker and shm
|
||||
# * use sql api in mkts to determine where the backend should
|
||||
# start querying for data?
|
||||
# * append any diff with new shm length
|
||||
# * determine missing (gapped) history by scanning
|
||||
# * how far back do we look?
|
||||
|
||||
# - begin rt update ingest and aggregation
|
||||
# * could start by always writing ticks to mkts instead of
|
||||
# worrying about a shm queue for now.
|
||||
# * we have a short list of shm queues worth grokking:
|
||||
# - https://github.com/pikers/piker/issues/107
|
||||
# * the original data feed arch blurb:
|
||||
# - https://github.com/pikers/piker/issues/98
|
||||
#
|
||||
|
||||
broker = 'ib'
|
||||
symbol = 'mnq.globex'
|
||||
|
|
Loading…
Reference in New Issue