Add diffing logic to `tsdb_history_update()`

Add some basic `numpy` epoch slice logic to generate append and prepend
arrays to write to the db.

Mooar cool things,
- add a `Storage.delete_ts()` method to wipe a column series from the db
  easily.
- don't attempt to read in any OHLC series by default on client load
- add some `pyqtgraph` profiling and drop manual latency measures
- if no db series for the fqsn exists, write the entire shm array
mkts_backup
Tyler Goodlet 2022-03-29 13:26:18 -04:00
parent 22c81eb5bf
commit d3adb6dff7
1 changed file with 106 additions and 103 deletions


@@ -29,14 +29,13 @@ from typing import (
     Any,
     Optional,
     Union,
-    # Callable,
-    # TYPE_CHECKING,
 )
 import time
 from math import isnan

 from bidict import bidict
 import msgpack
+import pyqtgraph as pg
 import numpy as np
 import pandas as pd
 import tractor
@ -49,15 +48,8 @@ from anyio_marketstore import (
import purerpc import purerpc
from .feed import maybe_open_feed from .feed import maybe_open_feed
from ._source import (
mk_fqsn,
# Symbol,
)
from ..log import get_logger, get_console_log from ..log import get_logger, get_console_log
# if TYPE_CHECKING:
# from ._sharedmem import ShmArray
log = get_logger(__name__) log = get_logger(__name__)
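The dropped `mk_fqsn` import reflects the new calling convention: `tsdb_history_update()` now receives a ready-made fqsn ("fully qualified symbol name") string instead of building one from a broker/symbol pair inside the task. A rough before/after sketch; the exact string format produced by `mk_fqsn` is an assumption:

```python
# old entrypoint style (removed in this commit): build the key internally
broker, symbol = 'ib', 'mnq.globex'
fqsn = mk_fqsn(broker, symbol)  # assumed to yield something like 'mnq.globex.ib'

# new style: the caller passes the fqsn straight in
# await tsdb_history_update(fqsn)
```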
@@ -235,6 +227,16 @@ class Storage:
         # series' cache from tsdb reads
         self._arrays: dict[str, np.ndarray] = {}

+    async def list_keys(self) -> list[str]:
+        return await self.client.list_symbols()
+
+    async def search_keys(self, pattern: str) -> list[str]:
+        '''
+        Search for time series key in the storage backend.
+
+        '''
+        ...
+
     async def write_ticks(self, ticks: list) -> None:
         ...
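The new `list_keys()` helper simply proxies the client's `list_symbols()` call, and `search_keys()` is left as a stub. A hypothetical usage sketch, assuming the `open_storage_client()` context manager reworked later in this diff:

```python
async def show_tsdb_keys(fqsn: str) -> None:
    # `open_storage_client()` is the wrapper acm reworked further down
    async with open_storage_client(fqsn) as storage:
        keys = await storage.list_keys()  # -> list of marketstore symbols
        print(f'{len(keys)} series in the tsdb:\n{keys}')
```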
@ -262,7 +264,9 @@ class Storage:
for tfstr in tf_in_1s.values(): for tfstr in tf_in_1s.values():
try: try:
log.info(f'querying for {tfstr}@{fqsn}') log.info(f'querying for {tfstr}@{fqsn}')
result = await client.query(Params(fqsn, tfstr, 'OHLCV',)) result = await client.query(
Params(fqsn, tfstr, 'OHLCV',)
)
break break
except purerpc.grpclib.exceptions.UnknownError: except purerpc.grpclib.exceptions.UnknownError:
# XXX: this is already logged by the container and # XXX: this is already logged by the container and
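The retry loop above walks every timeframe string in `tf_in_1s` until a query succeeds, and later code maps result timeframes back to seconds via `tf_in_1s.inverse`. A sketch of what such a `bidict` table looks like; the actual entries in the module may differ:

```python
from bidict import bidict

# assumed contents; the real module-level table may hold more timeframes
tf_in_1s = bidict({
    1: '1Sec',
    60: '1Min',
})

assert tf_in_1s[60] == '1Min'          # seconds -> marketstore tf string
assert tf_in_1s.inverse['1Sec'] == 1   # tf string -> seconds
```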
@ -276,6 +280,9 @@ class Storage:
tfstr = tf_in_1s[timeframe] tfstr = tf_in_1s[timeframe]
result = await client.query(Params(fqsn, tfstr, 'OHLCV',)) result = await client.query(Params(fqsn, tfstr, 'OHLCV',))
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map # Fill out a `numpy` array-results map
arrays = {} arrays = {}
for fqsn, data_set in result.by_symbols().items(): for fqsn, data_set in result.by_symbols().items():
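The new TODO points at the (linked) observation that attribute access on `numpy` recarrays is slower than plain field access on structured arrays. A small standalone timing sketch, not part of this diff, that demonstrates the comparison:

```python
import timeit

import numpy as np

# structured array with an OHLC-ish dtype
arr = np.zeros(
    1_000_000,
    dtype=[('Epoch', 'i8'), ('Close', 'f8')],
)
rec = arr.view(np.recarray)

# plain structured-array field access
t_struct = timeit.timeit(lambda: arr['Close'], number=1_000)
# recarray attribute access pays extra per-lookup overhead
t_rec = timeit.timeit(lambda: rec.Close, number=1_000)

print(f'structured: {t_struct:.4f}s vs recarray: {t_rec:.4f}s')
```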
@ -283,7 +290,22 @@ class Storage:
tf_in_1s.inverse[data_set.timeframe] tf_in_1s.inverse[data_set.timeframe]
] = data_set.array ] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
return await client.destroy(tbk=key)
@acm @acm
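`delete_ts()` wraps the client's `destroy()` on a full "tbk" key (the `print()` and commented-out existence check look like debug leftovers, and that check references `fqsn` even though the parameter is `key`). A hypothetical call, assuming the same `{fqsn}/1Sec/OHLCV` key convention the write path uses later in this diff:

```python
async def wipe_ohlcv(fqsn: str) -> bool:
    async with open_storage_client(fqsn) as storage:
        # tbk key format assumed: '<fqsn>/<timeframe>/<attribute-group>'
        return await storage.delete_ts(f'{fqsn}/1Sec/OHLCV')
```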
@ -296,19 +318,16 @@ async def open_storage_client(
Load a series by key and deliver in ``numpy`` struct array format. Load a series by key and deliver in ``numpy`` struct array format.
''' '''
async with get_client() as client: async with (
# eventually a storage backend endpoint
storage_client = Storage(client) get_client() as client,
arrays = await storage_client.read_ohlcv( ):
fqsn, # slap on our wrapper api
period, yield Storage(client)
)
yield storage_client, arrays
async def backfill_history_diff( async def tsdb_history_update(
# symbol: Symbol fqsn: str,
) -> list[str]: ) -> list[str]:
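`open_storage_client()` now yields only the `Storage` wrapper instead of eagerly reading OHLC arrays, matching the commit-message bullet about not loading series on client open. The new call pattern looks roughly like this:

```python
async def load_tsdb_history(fqsn: str):
    async with open_storage_client(fqsn) as storage:
        # callers now read explicitly; with no timeframe passed,
        # `read_ohlcv()` returns a timeframe-keyed dict of arrays
        tsdb_arrays = await storage.read_ohlcv(fqsn)
        return tsdb_arrays
```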
@@ -338,73 +357,64 @@ async def backfill_history_diff(
     # * the original data feed arch blurb:
     #  - https://github.com/pikers/piker/issues/98
     #
-    broker = 'ib'
-    symbol = 'mnq.globex'
-
-    # broker = 'binance'
-    # symbol = 'btcusdt'
-
-    fqsn = mk_fqsn(broker, symbol)
+    profiler = pg.debug.Profiler(
+        disabled=False,  # not pg_profile_enabled(),
+        delayed=False,
+    )

     async with (
-        get_client() as client,
+        open_storage_client(fqsn) as storage,

         maybe_open_feed(
-            broker,
-            [symbol],
-            loglevel='info',
-            # backpressure=False,
+            [fqsn],
             start_stream=False,

         ) as (feed, stream),
     ):
-        syms = await client.list_symbols()
-        log.info(f'Existing symbol set:\n{pformat(syms)}')
+        profiler(f'opened feed for {fqsn}')
+
+        symbol = feed.symbols.get(fqsn)
+        if symbol:
+            fqsn = symbol.front_fqsn()
+
+        syms = await storage.client.list_symbols()
+        log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
+        profiler(f'listed symbols {syms}')

         # diff db history with shm and only write the missing portions
         ohlcv = feed.shm.array

-        key = (fqsn, '1Sec', 'OHLCV')
-        tbk = mk_tbk(key)
-
-        # diff vs. existing array and append new history
-        # TODO:
-
-        # TODO: should be no error?
-        # assert not resp.responses
-
-        start = time.time()
-
-        qr = await client.query(
-            # Params(fqsn, '1Sec`', 'OHLCV',)
-            Params(*key),
-        )
-        # # Dig out `numpy` results map
-        arrays: dict[tuple[str, int], np.ndarray] = {}
-        for name, data_set in qr.by_symbols().items():
-            in_secs = tf_in_1s.inverse[data_set.timeframe]
-            arrays[(name, in_secs)] = data_set.array
-
-        s1 = arrays[(fqsn, 1)]
-        to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]]
-
-        end_diff = time.time()
-        diff_ms = round((end_diff - start) * 1e3, ndigits=2)
-
-        log.info(
-            f'Appending {to_append.size} datums to tsdb from shm\n'
-            f'Total diff time: {diff_ms} ms'
-        )
+        # TODO: use pg profiler
+        tsdb_arrays = await storage.read_ohlcv(fqsn)
+
+        to_append = feed.shm.array
+        to_prepend = None
+
+        # hist diffing
+        if tsdb_arrays:
+            onesec = tsdb_arrays[1]
+            to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]
+            to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]
+
+        profiler('Finished db arrays diffs')
+
+        for array in [to_append, to_prepend]:
+            if array is None:
+                continue
+
+            log.info(
+                f'Writing datums {array.size} -> to tsdb from shm\n'
+            )

-        # build mkts schema compat array for writing
-        mkts_dt = np.dtype(_ohlcv_dt)
-        mkts_array = np.zeros(
-            len(to_append),
-            dtype=mkts_dt,
-        )
-        # copy from shm array (yes it's this easy):
-        # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
-        mkts_array[:] = to_append[[
-            'time',
-            'open',
-            'high',
+            # build mkts schema compat array for writing
+            mkts_dt = np.dtype(_ohlcv_dt)
+            mkts_array = np.zeros(
+                len(array),
+                dtype=mkts_dt,
+            )
+            # copy from shm array (yes it's this easy):
+            # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays
+            mkts_array[:] = array[[
+                'time',
+                'open',
+                'high',
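The "epoch slice" diffing from the commit message boils down to two boolean masks against the first and last `Epoch` already stored in the db: everything newer than the db's tail gets appended, everything older than its head gets prepended. A self-contained toy version with made-up data:

```python
import numpy as np

# toy shm-style array: ten 1s bars with epoch-seconds in a 'time' field
ohlcv = np.zeros(10, dtype=[('time', 'i8'), ('close', 'f8')])
ohlcv['time'] = np.arange(100, 110)

# toy tsdb result that only covers epochs 103..106
onesec = np.zeros(4, dtype=[('Epoch', 'i8'), ('Close', 'f8')])
onesec['Epoch'] = np.arange(103, 107)

to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]]   # newer than db tail
to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]]   # older than db head

assert list(to_append['time']) == [107, 108, 109]
assert list(to_prepend['time']) == [100, 101, 102]
```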
@@ -414,33 +424,26 @@ async def backfill_history_diff(
             ]]

             # write to db
-            resp = await client.write(
+            resp = await storage.client.write(
                 mkts_array,
-                tbk=tbk,
+                tbk=f'{fqsn}/1Sec/OHLCV',

                 # NOTE: will will append duplicates
                 # for the same timestamp-index.
+                # TODO: pre deduplicate?
                 isvariablelength=True,
             )

-        end_write = time.time()
-        diff_ms = round((end_write - end_diff) * 1e3, ndigits=2)
-
             log.info(
                 f'Wrote {to_append.size} datums to tsdb\n'
-                f'Total write time: {diff_ms} ms'
             )
+            profiler('Finished db writes')

             for resp in resp.responses:
                 err = resp.error
                 if err:
                     raise MarketStoreError(err)

-        # TODO: backfiller loop
-        from piker.ui._compression import downsample
-        x, y = downsample(
-            s1['Epoch'],
-            s1['Close'],
-            bins=10,
-        )
-        await tractor.breakpoint()
-

 async def ingest_quote_stream(
     symbols: list[str],
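For reference, the "copy from shm array" trick in the write path relies on `numpy`'s by-position assignment between structured arrays: select the shm fields in schema order, then assign the whole slice at once. A standalone sketch with an assumed `_ohlcv_dt`-style schema; the real dtype may differ in field names and types:

```python
import numpy as np

# assumed marketstore-side schema; stand-in for `_ohlcv_dt`
mkts_dt = np.dtype([
    ('Epoch', 'i8'),
    ('Open', 'f4'), ('High', 'f4'),
    ('Low', 'f4'), ('Close', 'f4'),
    ('Volume', 'f4'),
])

# toy shm-side dtype with piker-style lowercase field names
shm_dt = np.dtype([
    ('time', 'i8'),
    ('open', 'f8'), ('high', 'f8'),
    ('low', 'f8'), ('close', 'f8'),
    ('volume', 'f8'),
])
array = np.zeros(3, dtype=shm_dt)
array['close'] = 107.25  # fill one field so the copy is visible

mkts_array = np.zeros(len(array), dtype=mkts_dt)
# multi-field selection + whole-slice assignment: fields are matched by
# position (not name), which is why they are listed in schema order
mkts_array[:] = array[[
    'time', 'open', 'high', 'low', 'close', 'volume',
]]

assert float(mkts_array['Close'][0]) == 107.25
```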