Prototype a high level `Storage` api

Start a wrapper around the `marketstore` client that does basic ohlcv
queries and retrieval and prototypes out write methods for ohlc and
tick. Try to connect to `marketstore` automatically (which will fail
if it isn't currently running); eventually we will do a service query
first.

Further:

- get `pikerd` working both with and without the `--tsdb` flag.
- support spawning `brokerd` with no real-time quotes.
- bring back "fqsn" support, which was not originally in this history
  before commit factoring.
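
As a rough usage sketch (hypothetical caller code; assumes a running
`marketstored`, the `piker.data.marketstore` module path, and an
illustrative fqsn):

    import trio
    from piker.data.marketstore import open_storage_client

    async def main():
        # with no `period` given, `read_ohlcv()` scans timeframes
        # until one resolves (see `Storage.read_ohlcv()` below)
        async with open_storage_client(
            'mnq.globex.ib',  # illustrative fqsn
        ) as (storage, arrays):
            print(arrays)

    trio.run(main)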
Tyler Goodlet 2022-03-01 12:29:49 -05:00
parent 15d3f99410
commit 5c2b9a01e9
2 changed files with 194 additions and 42 deletions

piker/data/feed.py

@@ -22,6 +22,7 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from dataclasses import dataclass, field
+from datetime import datetime
 from contextlib import asynccontextmanager
 from functools import partial
 from types import ModuleType
@@ -42,11 +43,13 @@ from .._cacheables import maybe_open_context
 from ..log import get_logger, get_console_log
 from .._daemon import (
     maybe_spawn_brokerd,
+    check_for_service,
 )
 from ._sharedmem import (
     maybe_open_shm_array,
     attach_shm_array,
     ShmArray,
+    _secs_in_day,
 )
 from .ingest import get_ingestormod
 from ._source import (
@@ -125,7 +128,7 @@ class _FeedsBus(BaseModel):
     # def cancel_task(
     #     self,
-    #     task: trio.lowlevel.Task
+    #     task: trio.lowlevel.Task,
     # ) -> bool:
     #     ...
@@ -218,7 +221,61 @@ async def manage_history(
         readonly=False,
     )
-    if opened:
+    log.info('Scanning for existing `marketstored`')
+    is_up = await check_for_service('marketstored')
+    if is_up and opened:
+        log.info('Found existing `marketstored`')
+
+        from . import marketstore
+        async with marketstore.open_storage_client(
+            fqsn,
+        ) as (storage, tsdb_arrays):
+
+            # TODO: history validation
+            # assert opened, f'Persistent shm for {symbol} was already open?!'
+            # if not opened:
+            #     raise RuntimeError(
+            #         "Persistent shm for sym was already open?!"
+            #     )
+
+            if tsdb_arrays:
+                log.info(f'Loaded tsdb history {tsdb_arrays}')
+                fastest = list(tsdb_arrays[fqsn].values())[0]
+                last_s = fastest['Epoch'][-1]
+
+                # TODO: see if there's faster multi-field reads:
+                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+
+                # re-index with a `time` and index field
+                shm.push(
+                    fastest[-3 * _secs_in_day:],
+
+                    # insert the history pre a "days worth" of samples
+                    # to leave some real-time buffer space at the end.
+                    prepend=True,
+                    start=shm._len - _secs_in_day,
+                    field_map={
+                        'Epoch': 'time',
+                        'Open': 'open',
+                        'High': 'high',
+                        'Low': 'low',
+                        'Close': 'close',
+                        'Volume': 'volume',
+                    },
+                )
+
+            # start history anal and load missing new data via backend.
+            async with mod.open_history_client(fqsn) as hist:
+
+                # get latest query's worth of history
+                array, next_dt = await hist(end_dt='')
+
+                last_dt = datetime.fromtimestamp(last_s)
+                array, next_dt = await hist(end_dt=last_dt)
+
+            some_data_ready.set()
+
+    elif opened:
         log.info('No existing `marketstored` found..')

         # start history backfill task ``backfill_bars()`` is
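
For context on the `field_map` re-index in the hunk above, a minimal
standalone sketch of the same idea (the record layout mirrors
marketstore's OHLCV schema; all names here are illustrative):

    import numpy as np

    # stand-in for a marketstore OHLCV record array
    mkts_dtype = [
        ('Epoch', 'i8'), ('Open', 'f8'), ('High', 'f8'),
        ('Low', 'f8'), ('Close', 'f8'), ('Volume', 'f8'),
    ]
    src = np.zeros(3, dtype=mkts_dtype)

    # tsdb-side names keyed to shm-side names, mirroring the
    # `field_map` passed to `shm.push()`
    field_map = {
        'Epoch': 'time', 'Open': 'open', 'High': 'high',
        'Low': 'low', 'Close': 'close', 'Volume': 'volume',
    }
    dst_dtype = [(field_map[name], fmt) for name, fmt in mkts_dtype]
    dst = np.zeros(3, dtype=dst_dtype)
    for src_name, dst_name in field_map.items():
        dst[dst_name] = src[src_name]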
@@ -254,6 +311,7 @@ async def manage_history(
     )
     await trio.sleep_forever()
+    # cs.cancel()


 async def allocate_persistent_feed(
@@ -261,6 +319,7 @@ async def allocate_persistent_feed(
     brokername: str,
     symbol: str,
     loglevel: str,
+    start_stream: bool = True,

     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
@@ -302,10 +361,8 @@ async def allocate_persistent_feed(
             loglevel=loglevel,
         )
     )
-    # the broker-specific fully qualified symbol name,
-    # but ensure it is lower-cased for external use.
-    bfqsn = init_msg[symbol]['fqsn'].lower()
-    init_msg[symbol]['fqsn'] = bfqsn
+    # the broker-specific fully qualified symbol name
+    bfqsn = init_msg[symbol]['fqsn']

     # HISTORY, run 2 tasks:
     # - a history loader / maintainer
@@ -333,6 +390,7 @@ async def allocate_persistent_feed(
     # true fqsn
     fqsn = '.'.join((bfqsn, brokername))
+
     # add a fqsn entry that includes the ``.<broker>`` suffix
     init_msg[fqsn] = msg
@@ -364,6 +422,9 @@ async def allocate_persistent_feed(
     # task_status.started((init_msg, generic_first_quotes))
     task_status.started()

+    if not start_stream:
+        await trio.sleep_forever()
+
     # backend will indicate when real-time quotes have begun.
     await feed_is_live.wait()
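
The new `start_stream` flag means a consumer can load history without
ever waiting on live quotes; a hedged sketch, assuming the
`(brokername, symbols)` call signature and the `piker.data.feed`
module path (both illustrative here):

    from piker.data.feed import maybe_open_feed

    async def load_history_only():
        # `start_stream` is forwarded through `**kwargs` (see the
        # `maybe_open_feed` hunk below); the feed task then parks in
        # `trio.sleep_forever()` instead of streaming quotes.
        async with maybe_open_feed(
            'ib',              # illustrative broker
            ['mnq.globex'],    # illustrative symbol
            start_stream=False,
        ) as (feed, stream):
            ...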
@@ -429,13 +490,12 @@ async def open_feed_bus(
                 bus=bus,
                 brokername=brokername,

                 # here we pass through the selected symbol in native
                 # "format" (i.e. upper vs. lowercase depending on
                 # provider).
                 symbol=symbol,
                 loglevel=loglevel,
+                start_stream=start_stream,
             )
         )

     # TODO: we can remove this?
@@ -446,7 +506,7 @@ async def open_feed_bus(
     init_msg, first_quotes = bus.feeds[symbol]

     msg = init_msg[symbol]
-    bfqsn = msg['fqsn'].lower()
+    bfqsn = msg['fqsn']

     # true fqsn
     fqsn = '.'.join([bfqsn, brokername])
@@ -765,10 +825,7 @@ async def maybe_open_feed(
     **kwargs,

-) -> (
-    Feed,
-    ReceiveChannel[dict[str, Any]],
-):
+) -> (Feed, ReceiveChannel[dict[str, Any]]):
     '''
     Maybe open a data to a ``brokerd`` daemon only if there is no
     local one for the broker-symbol pair, if one is cached use it wrapped

@@ -789,7 +846,6 @@ async def maybe_open_feed(
             'start_stream': kwargs.get('start_stream', True),
         },
         key=fqsn,
-
     ) as (cache_hit, feed):
         if cache_hit:

piker/data/marketstore.py

@@ -28,8 +28,9 @@ from pprint import pformat
 from typing import (
     Any,
     Optional,
+    Union,
     # Callable,
-    TYPE_CHECKING,
+    # TYPE_CHECKING,
 )
 import time
 from math import isnan
@@ -40,12 +41,19 @@ import numpy as np
 import pandas as pd
 import tractor
 from trio_websocket import open_websocket_url
-from anyio_marketstore import open_marketstore_client, MarketstoreClient, Params
+from anyio_marketstore import (
+    open_marketstore_client,
+    MarketstoreClient,
+    Params,
+)
+import purerpc

-from ..log import get_logger, get_console_log
 from .feed import maybe_open_feed
-from ._source import mk_fqsn, Symbol
+from ._source import (
+    mk_fqsn,
+    # Symbol,
+)
+from ..log import get_logger, get_console_log

 # if TYPE_CHECKING:
 #     from ._sharedmem import ShmArray
@@ -210,42 +218,130 @@ tf_in_1s = bidict({
 })


-async def manage_history(
-    fqsn: str,
-    period: int = 1,  # in seconds
-) -> dict[str, np.ndarray]:
-    '''
-    Load a series by key and deliver in ``numpy`` struct array
-    format.
-    '''
-    async with get_client() as client:
-        tfstr = tf_in_1s[period]
-        result = await client.query(
-            Params(fqsn, tf_in_1s, 'OHLCV',)
-        )
-        # Dig out `numpy` results map
-        arrays = {}
-        # for qr in [onem, fivem]:
-        for name, data_set in result.by_symbols().items():
-            arrays[(name, qr)] = data_set.array
-        await tractor.breakpoint()
-        # # TODO: backfiller loop
-        # array = arrays[(fqsn, qr)]
-        return arrays
+class Storage:
+    '''
+    High level storage api for both real-time and historical ingest.
+
+    '''
+    def __init__(
+        self,
+        client: MarketstoreClient,
+    ) -> None:
+        # TODO: eventually this should be an api/interface type that
+        # ensures we can support multiple tsdb backends.
+        self.client = client
+
+        # series' cache from tsdb reads
+        self._arrays: dict[str, np.ndarray] = {}
+
+    async def write_ticks(self, ticks: list) -> None:
+        ...
+
+    async def write_ohlcv(self, ohlcv: np.ndarray) -> None:
+        ...
+
+    async def read_ohlcv(
+        self,
+        fqsn: str,
+        timeframe: Optional[Union[int, str]] = None,
+
+    ) -> tuple[
+        MarketstoreClient,
+        Union[dict, np.ndarray]
+    ]:
+        client = self.client
+        syms = await client.list_symbols()
+
+        if fqsn not in syms:
+            return {}
+
+        if timeframe is None:
+            log.info(f'starting {fqsn} tsdb granularity scan..')
+
+            # loop through and try to find highest granularity
+            for tfstr in tf_in_1s.values():
+                try:
+                    log.info(f'querying for {tfstr}@{fqsn}')
+                    result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+                    break
+                except purerpc.grpclib.exceptions.UnknownError:
+                    # XXX: this is already logged by the container and
+                    # thus shows up through `marketstored` logs relay.
+                    # log.warning(f'{tfstr}@{fqsn} not found')
+                    continue
+            else:
+                return {}
+
+        else:
+            tfstr = tf_in_1s[timeframe]
+            result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
+
+        # Fill out a `numpy` array-results map
+        arrays = {}
+        for fqsn, data_set in result.by_symbols().items():
+            arrays.setdefault(fqsn, {})[
+                tf_in_1s.inverse[data_set.timeframe]
+            ] = data_set.array
+
+        return (
+            client,
+            arrays[fqsn][timeframe] if timeframe else arrays,
+        )
+
+
+@acm
+async def open_storage_client(
+    fqsn: str,
+    period: Optional[Union[int, str]] = None,  # in seconds
+
+) -> tuple[Storage, dict[str, np.ndarray]]:
+    '''
+    Load a series by key and deliver in ``numpy`` struct array format.
+
+    '''
+    async with get_client() as client:
+        storage_client = Storage(client)
+        arrays = await storage_client.read_ohlcv(
+            fqsn,
+            period,
+        )
+
+        yield storage_client, arrays
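
The granularity scan in `read_ohlcv()` leans on the `tf_in_1s` bidict
(seconds-per-bar <-> timeframe string); a minimal sketch of that
two-way mapping using a plausible subset of the module's table (exact
entries may differ):

    from bidict import bidict

    tf_in_1s = bidict({
        1: '1Sec',
        60: '1Min',
        60 * 60: '1H',
        60 * 60 * 24: '1D',
    })

    # forward lookup, as in `tf_in_1s[timeframe]`
    assert tf_in_1s[60] == '1Min'

    # inverse lookup, as in `tf_in_1s.inverse[data_set.timeframe]`
    assert tf_in_1s.inverse['1Min'] == 60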
 async def backfill_history_diff(
     # symbol: Symbol

 ) -> list[str]:

-    # TODO:
-    # - compute time-smaple step
-    # - take ``Symbol`` as input
-    # - backtrack into history using backend helper endpoint
+    # TODO: real-time dedicated task for ensuring
+    # history consistency between the tsdb, shm and real-time feed..
+
+    # update sequence design notes:
+
+    # - load existing highest frequency data from mkts
+    #   * how do we want to offer this to the UI?
+    #     - lazy loading?
+    #     - try to load it all and expect graphics caching/diffing
+    #       to hide extra bits that aren't in view?
+
+    # - compute the diff between latest data from broker and shm
+    #   * use sql api in mkts to determine where the backend should
+    #     start querying for data?
+    #   * append any diff with new shm length
+    #   * determine missing (gapped) history by scanning
+    #   * how far back do we look?
+
+    # - begin rt update ingest and aggregation
+    #   * could start by always writing ticks to mkts instead of
+    #     worrying about a shm queue for now.
+    #   * we have a short list of shm queues worth groking:
+    #     - https://github.com/pikers/piker/issues/107
+    #   * the original data feed arch blurb:
+    #     - https://github.com/pikers/piker/issues/98
+    #
+
     broker = 'ib'
     symbol = 'mnq.globex'
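
As a starting point for the "compute the diff" step in the notes
above, a hedged sketch of measuring the gapped history between the
tsdb's last stored bar and the live shm buffer (function and variable
names are illustrative, not part of this commit):

    from datetime import datetime, timezone

    def missing_bar_range(
        last_tsdb_epoch: float,  # last 'Epoch' stored in marketstore
        first_shm_time: float,   # earliest 'time' in the shm buffer
        step_s: int = 1,         # time-sample step in seconds
    ) -> tuple[datetime, datetime, int]:
        # how many bars the backend must be queried for in order to
        # fill the gap between tsdb history and the real-time buffer
        gap_s = max(0.0, first_shm_time - last_tsdb_epoch)
        n_bars = int(gap_s // step_s)
        start = datetime.fromtimestamp(last_tsdb_epoch, tz=timezone.utc)
        end = datetime.fromtimestamp(first_shm_time, tz=timezone.utc)
        return start, end, n_bars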