diff --git a/piker/data/cli.py b/piker/data/cli.py index 5b9c854d..6ea2503d 100644 --- a/piker/data/cli.py +++ b/piker/data/cli.py @@ -119,14 +119,14 @@ def ms_shell(config, tl, host, port): Start an IPython shell ready to query the local marketstore db. ''' - from piker.data.marketstore import backfill_history + from piker.data.marketstore import backfill_history_diff from piker._daemon import open_piker_runtime async def main(): async with open_piker_runtime( 'ms_shell', enable_modules=['piker.data._ahab'], ): - await backfill_history() + await backfill_history_diff() # TODO: write magics to query marketstore # from IPython import embed # embed() diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 457f37d7..cdcaeb02 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -113,7 +113,7 @@ def mk_tbk(keys: tuple[str, str, str]) -> str: ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``` ''' - return '{}/' + '/'.join(keys) + return '/'.join(keys) def quote_to_marketstore_structarray( @@ -184,8 +184,8 @@ async def get_client( yield client -# class MarketStoreError(Exception): -# "Generic marketstore client error" +class MarketStoreError(Exception): + "Generic marketstore client error" # def err_on_resp(response: dict) -> None: @@ -210,13 +210,16 @@ tf_in_1s = bidict({ }) -# @acm -async def load_history( - symbol: Symbol, +async def manage_history( + fqsn: str, period: int = 1, # in seconds -) -> np.ndarray: +) -> dict[str, np.ndarray]: + ''' + Load a series by key and deliver in ``numpy`` struct array + format. + ''' async with get_client() as client: tfstr = tf_in_1s[period] @@ -225,16 +228,17 @@ async def load_history( ) # Dig out `numpy` results map arrays = {} - await tractor.breakpoint() # for qr in [onem, fivem]: - # for name, data_set in qr.by_symbols().items(): - # arrays[(name, qr)] = data_set.array + for name, data_set in result.by_symbols().items(): + arrays[(name, qr)] = data_set.array + await tractor.breakpoint() # # TODO: backfiller loop # array = arrays[(fqsn, qr)] + return arrays -async def backfill_history( +async def backfill_history_diff( # symbol: Symbol ) -> list[str]: @@ -251,7 +255,6 @@ async def backfill_history( fqsn = mk_fqsn(broker, symbol) - print('yo') async with ( get_client() as client, maybe_open_feed( @@ -263,21 +266,52 @@ async def backfill_history( ) as (feed, stream), ): - print('yo') - ohlcv = feed.shm.array - mkts_dt = np.dtype(_ohlcv_dt) - - print('yo') syms = await client.list_symbols() log.info(f'Existing symbol set:\n{pformat(syms)}') - # build mkts schema compat array + # diff db history with shm and only write the missing portions + ohlcv = feed.shm.array + + key = (fqsn, '1Sec', 'OHLCV') + tbk = mk_tbk(key) + + # diff vs. existing array and append new history + # TODO: + + # TODO: should be no error? + # assert not resp.responses + + start = time.time() + + qr = await client.query( + # Params(fqsn, '1Sec`', 'OHLCV',) + Params(*key), + ) + # # Dig out `numpy` results map + arrays: dict[tuple[str, int], np.ndarray] = {} + for name, data_set in qr.by_symbols().items(): + in_secs = tf_in_1s.inverse[data_set.timeframe] + arrays[(name, in_secs)] = data_set.array + + s1 = arrays[(fqsn, 1)] + to_append = ohlcv[ohlcv['time'] > s1['Epoch'][-1]] + + end_diff = time.time() + diff_ms = round((end_diff - start) * 1e3, ndigits=2) + + log.info( + f'Appending {to_append.size} datums to tsdb from shm\n' + f'Total diff time: {diff_ms} ms' + ) + + # build mkts schema compat array for writing + mkts_dt = np.dtype(_ohlcv_dt) mkts_array = np.zeros( - len(ohlcv), + len(to_append), dtype=mkts_dt, ) # copy from shm array - mkts_array[:] = ohlcv[[ + mkts_array[:] = to_append[[ 'time', 'open', 'high', @@ -286,39 +320,27 @@ async def backfill_history( 'volume', ]] - key = (fqsn, '1Sec', 'OHLCV') - tbk = mk_tbk(key) - - # diff vs. existing array and append new history - # TODO: - # write to db resp = await client.write( mkts_array, tbk=tbk, # NOTE: will will append duplicates # for the same timestamp-index. - # isvariablelength=True, + isvariablelength=True, ) - # TODO: should be no error? - # assert not resp.responses - - # # Dig out `numpy` results map - qr = await client.query( - Params(fqsn, '1Min`', 'OHLCV',) + end_write = time.time() + diff_ms = round((end_write - end_diff) * 1e3, ndigits=2) + log.info( + f'Wrote {to_append.size} datums to tsdb\n' + f'Total write time: {diff_ms} ms' ) - qr = await client.query( - # Params(fqsn, '1Sec`', 'OHLCV',) - Params(*key), - ) - arrays = {} - # for qr in [onem, fivem]: - for name, data_set in qr.by_symbols().items(): - arrays[(name, qr)] = data_set.array + for resp in resp.responses: + err = resp.error + if err: + raise MarketStoreError(err) # TODO: backfiller loop - array = arrays[(fqsn, qr)] - await tractor.breakpoint() + # await tractor.breakpoint() async def ingest_quote_stream(