diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 34de23ea..887df45e 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -29,14 +29,13 @@ from typing import ( Any, Optional, Union, - # Callable, - # TYPE_CHECKING, ) import time from math import isnan from bidict import bidict import msgpack +import pyqtgraph as pg import numpy as np import pandas as pd import tractor @@ -49,15 +48,8 @@ from anyio_marketstore import ( import purerpc from .feed import maybe_open_feed -from ._source import ( - mk_fqsn, - # Symbol, -) from ..log import get_logger, get_console_log -# if TYPE_CHECKING: -# from ._sharedmem import ShmArray - log = get_logger(__name__) @@ -235,6 +227,16 @@ class Storage: # series' cache from tsdb reads self._arrays: dict[str, np.ndarray] = {} + async def list_keys(self) -> list[str]: + return await self.client.list_symbols() + + async def search_keys(self, pattern: str) -> list[str]: + ''' + Search for time series key in the storage backend. + + ''' + ... + async def write_ticks(self, ticks: list) -> None: ... @@ -262,7 +264,9 @@ class Storage: for tfstr in tf_in_1s.values(): try: log.info(f'querying for {tfstr}@{fqsn}') - result = await client.query(Params(fqsn, tfstr, 'OHLCV',)) + result = await client.query( + Params(fqsn, tfstr, 'OHLCV',) + ) break except purerpc.grpclib.exceptions.UnknownError: # XXX: this is already logged by the container and @@ -276,6 +280,9 @@ class Storage: tfstr = tf_in_1s[timeframe] result = await client.query(Params(fqsn, tfstr, 'OHLCV',)) + # TODO: it turns out column access on recarrays is actually slower: + # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist + # it might make sense to make these structured arrays? # Fill out a `numpy` array-results map arrays = {} for fqsn, data_set in result.by_symbols().items(): @@ -283,7 +290,22 @@ class Storage: tf_in_1s.inverse[data_set.timeframe] ] = data_set.array - return arrays[fqsn][timeframe] if timeframe else arrays + return arrays[fqsn][timeframe] if timeframe else arrays[fqsn] + + async def delete_ts( + self, + key: str, + timeframe: Optional[Union[int, str]] = None, + + ) -> bool: + + client = self.client + syms = await client.list_symbols() + print(syms) + # if key not in syms: + # raise KeyError(f'`{fqsn}` table key not found?') + + return await client.destroy(tbk=key) @acm @@ -296,19 +318,16 @@ async def open_storage_client( Load a series by key and deliver in ``numpy`` struct array format. ''' - async with get_client() as client: - - storage_client = Storage(client) - arrays = await storage_client.read_ohlcv( - fqsn, - period, - ) - - yield storage_client, arrays + async with ( + # eventually a storage backend endpoint + get_client() as client, + ): + # slap on our wrapper api + yield Storage(client) -async def backfill_history_diff( - # symbol: Symbol +async def tsdb_history_update( + fqsn: str, ) -> list[str]: @@ -338,108 +357,92 @@ async def backfill_history_diff( # * the original data feed arch blurb: # - https://github.com/pikers/piker/issues/98 # - - broker = 'ib' - symbol = 'mnq.globex' - - # broker = 'binance' - # symbol = 'btcusdt' - - fqsn = mk_fqsn(broker, symbol) + profiler = pg.debug.Profiler( + disabled=False, # not pg_profile_enabled(), + delayed=False, + ) async with ( - get_client() as client, + open_storage_client(fqsn) as storage, + maybe_open_feed( - broker, - [symbol], - loglevel='info', - # backpressure=False, + [fqsn], start_stream=False, ) as (feed, stream), ): - syms = await client.list_symbols() - log.info(f'Existing symbol set:\n{pformat(syms)}') + profiler(f'opened feed for {fqsn}') + + symbol = feed.symbols.get(fqsn) + if symbol: + fqsn = symbol.front_fqsn() + + syms = await storage.client.list_symbols() + log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') + profiler(f'listed symbols {syms}') # diff db history with shm and only write the missing portions ohlcv = feed.shm.array - key = (fqsn, '1Sec', 'OHLCV') - tbk = mk_tbk(key) + # TODO: use pg profiler + tsdb_arrays = await storage.read_ohlcv(fqsn) - # diff vs. existing array and append new history - # TODO: + to_append = feed.shm.array + to_prepend = None - # TODO: should be no error? - # assert not resp.responses + # hist diffing + if tsdb_arrays: + onesec = tsdb_arrays[1] + to_append = ohlcv[ohlcv['time'] > onesec['Epoch'][-1]] + to_prepend = ohlcv[ohlcv['time'] < onesec['Epoch'][0]] - start = time.time() + profiler('Finished db arrays diffs') - qr = await client.query( - # Params(fqsn, '1Sec`', 'OHLCV',) - Params(*key), - ) - # # Dig out `numpy` results map - arrays: dict[tuple[str, int], np.ndarray] = {} - for name, data_set in qr.by_symbols().items(): - in_secs = tf_in_1s.inverse[data_set.timeframe] - arrays[(name, in_secs)] = data_set.array + for array in [to_append, to_prepend]: + if array is None: + continue - s1 = arrays[(fqsn, 1)] - to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]] + log.info( + f'Writing datums {array.size} -> to tsdb from shm\n' + ) - end_diff = time.time() - diff_ms = round((end_diff - start) * 1e3, ndigits=2) + # build mkts schema compat array for writing + mkts_dt = np.dtype(_ohlcv_dt) + mkts_array = np.zeros( + len(array), + dtype=mkts_dt, + ) + # copy from shm array (yes it's this easy): + # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays + mkts_array[:] = array[[ + 'time', + 'open', + 'high', + 'low', + 'close', + 'volume', + ]] - log.info( - f'Appending {to_append.size} datums to tsdb from shm\n' - f'Total diff time: {diff_ms} ms' - ) + # write to db + resp = await storage.client.write( + mkts_array, + tbk=f'{fqsn}/1Sec/OHLCV', - # build mkts schema compat array for writing - mkts_dt = np.dtype(_ohlcv_dt) - mkts_array = np.zeros( - len(to_append), - dtype=mkts_dt, - ) - # copy from shm array (yes it's this easy): - # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays - mkts_array[:] = to_append[[ - 'time', - 'open', - 'high', - 'low', - 'close', - 'volume', - ]] + # NOTE: will will append duplicates + # for the same timestamp-index. + # TODO: pre deduplicate? + isvariablelength=True, + ) - # write to db - resp = await client.write( - mkts_array, - tbk=tbk, - # NOTE: will will append duplicates - # for the same timestamp-index. - isvariablelength=True, - ) - end_write = time.time() - diff_ms = round((end_write - end_diff) * 1e3, ndigits=2) - log.info( - f'Wrote {to_append.size} datums to tsdb\n' - f'Total write time: {diff_ms} ms' - ) - for resp in resp.responses: - err = resp.error - if err: - raise MarketStoreError(err) + log.info( + f'Wrote {to_append.size} datums to tsdb\n' + ) + profiler('Finished db writes') - # TODO: backfiller loop - from piker.ui._compression import downsample - x, y = downsample( - s1['Epoch'], - s1['Close'], - bins=10, - ) - await tractor.breakpoint() + for resp in resp.responses: + err = resp.error + if err: + raise MarketStoreError(err) async def ingest_quote_stream(