Add diffing logic to `tsdb_history_update()`

Add some basic `numpy` epoch slice logic to generate append and prepend
arrays to write to the db.

Mooar cool things,
- add a `Storage.delete_ts()` method to wipe a column series from the db
  easily.
- don't attempt to read in any OHLC series by default on client load
- add some `pyqtgraph` profiling and drop manual latency measures
- if no db series for the fqsn exists write the entire shm array
marketstore
Tyler Goodlet 2022-03-29 13:26:18 -04:00
parent 94cba54beb
commit 17f469d619
1 changed files with 106 additions and 103 deletions

View File

@ -29,14 +29,13 @@ from typing import (
Any,
Optional,
Union,
# Callable,
# TYPE_CHECKING,
)
import time
from math import isnan
from bidict import bidict
import msgpack
import pyqtgraph as pg
import numpy as np
import pandas as pd
import tractor
@ -49,15 +48,8 @@ from anyio_marketstore import (
import purerpc
from .feed import maybe_open_feed
from ._source import (
mk_fqsn,
# Symbol,
)
from ..log import get_logger, get_console_log
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
log = get_logger(__name__)
@ -235,6 +227,16 @@ class Storage:
# series' cache from tsdb reads
self._arrays: dict[str, np.ndarray] = {}
async def list_keys(self) -> list[str]:
return await self.client.list_symbols()
async def search_keys(self, pattern: str) -> list[str]:
'''
Search for time series key in the storage backend.
'''
...
async def write_ticks(self, ticks: list) -> None:
...
@ -262,7 +264,9 @@ class Storage:
for tfstr in tf_in_1s.values():
try:
log.info(f'querying for {tfstr}@{fqsn}')
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
result = await client.query(
Params(fqsn, tfstr, 'OHLCV',)
)
break
except purerpc.grpclib.exceptions.UnknownError:
# XXX: this is already logged by the container and
@ -276,6 +280,9 @@ class Storage:
tfstr = tf_in_1s[timeframe]
result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
@ -283,7 +290,22 @@ class Storage:
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
return await client.destroy(tbk=key)
@acm
@ -296,19 +318,16 @@ async def open_storage_client(
Load a series by key and deliver in ``numpy`` struct array format.
'''
async with get_client() as client:
storage_client = Storage(client)
arrays = await storage_client.read_ohlcv(
fqsn,
period,
)
yield storage_client, arrays
async with (
# eventually a storage backend endpoint
get_client() as client,
):
# slap on our wrapper api
yield Storage(client)
async def backfill_history_diff(
# symbol: Symbol
async def tsdb_history_update(
fqsn: str,
) -> list[str]:
@ -338,108 +357,92 @@ async def backfill_history_diff(
# * the original data feed arch blurb:
# - https://github.com/pikers/piker/issues/98
#
broker = 'ib'
symbol = 'mnq.globex'
# broker = 'binance'
# symbol = 'btcusdt'
fqsn = mk_fqsn(broker, symbol)
profiler = pg.debug.Profiler(
disabled=False, # not pg_profile_enabled(),
delayed=False,
)
async with (
get_client() as client,
open_storage_client(fqsn) as storage,
maybe_open_feed(
broker,
[symbol],
loglevel='info',
# backpressure=False,
[fqsn],
start_stream=False,
) as (feed, stream),
):
syms = await client.list_symbols()
log.info(f'Existing symbol set:\n{pformat(syms)}')
profiler(f'opened feed for {fqsn}')
symbol = feed.symbols.get(fqsn)
if symbol:
fqsn = symbol.front_fqsn()
syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
profiler(f'listed symbols {syms}')
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
key = (fqsn, '1Sec', 'OHLCV')
tbk = mk_tbk(key)
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)
# diff vs. existing array and append new history
# TODO:
to_append = feed.shm.array
to_prepend = None
# TODO: should be no error?
# assert not resp.responses
# hist diffing
if tsdb_arrays:
onesec = tsdb_arrays[1]
to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
start = time.time()
profiler('Finished db arrays diffs')
qr = await client.query(
# Params(fqsn, '1Sec`', 'OHLCV',)
Params(*key),
)
# # Dig out `numpy` results map
arrays: dict[tuple[str, int], np.ndarray] = {}
for name, data_set in qr.by_symbols().items():
in_secs = tf_in_1s.inverse[data_set.timeframe]
arrays[(name, in_secs)] = data_set.array
for array in [to_append, to_prepend]:
if array is None:
continue
s1 = arrays[(fqsn, 1)]
to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
log.info(
f'Writing datums {array.size} -> to tsdb from shm\n'
)
end_diff = time.time()
diff_ms = round((end_diff - start) * 1e3, ndigits=2)
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(array),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = array[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
log.info(
f'Appending {to_append.size} datums to tsdb from shm\n'
f'Total diff time: {diff_ms} ms'
)
# write to db
resp = await storage.client.write(
mkts_array,
tbk=f'{fqsn}/1Sec/OHLCV',
# build mkts schema compat array for writing
mkts_dt = np.dtype(_ohlcv_dt)
mkts_array = np.zeros(
len(to_append),
dtype=mkts_dt,
)
# copy from shm array (yes it's this easy):
# https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
mkts_array[:] = to_append[[
'time',
'open',
'high',
'low',
'close',
'volume',
]]
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=True,
)
# write to db
resp = await client.write(
mkts_array,
tbk=tbk,
# NOTE: will will append duplicates
# for the same timestamp-index.
isvariablelength=True,
)
end_write = time.time()
diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
f'Total write time: {diff_ms} ms'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
log.info(
f'Wrote {to_append.size} datums to tsdb\n'
)
profiler('Finished db writes')
# TODO: backfiller loop
from piker.ui._compression import downsample
x, y = downsample(
s1['Epoch'],
s1['Close'],
bins=10,
)
await tractor.breakpoint()
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
async def ingest_quote_stream(