diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index f82770ab..50804031 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -24,6 +24,7 @@ ''' from contextlib import asynccontextmanager as acm +from datetime import datetime from pprint import pformat from typing import ( Any, @@ -45,6 +46,7 @@ from anyio_marketstore import ( MarketstoreClient, Params, ) +import pendulum import purerpc from .feed import maybe_open_feed @@ -240,8 +242,29 @@ class Storage: async def write_ticks(self, ticks: list) -> None: ... - async def write_ohlcv(self, ohlcv: np.ndarray) -> None: - ... + async def load( + self, + fqsn: str, + + ) -> tuple[ + dict[int, np.ndarray], # timeframe (in secs) to series + Optional[datetime], # first dt + Optional[datetime], # last dt + ]: + + first_tsdb_dt, last_tsdb_dt = None, None + tsdb_arrays = await self.read_ohlcv(fqsn) + log.info(f'Loaded tsdb history {tsdb_arrays}') + + if tsdb_arrays: + fastest = list(tsdb_arrays.values())[0] + times = fastest['Epoch'] + first, last = times[0], times[-1] + first_tsdb_dt, last_tsdb_dt = map( + pendulum.from_timestamp, [first, last] + ) + + return tsdb_arrays, first_tsdb_dt, last_tsdb_dt async def read_ohlcv( self, @@ -319,6 +342,49 @@ class Storage: return await client.destroy(tbk=key) + async def write_ohlcv( + self, + fqsn: str, + ohlcv: np.ndarray, + + ) -> None: + # build mkts schema compat array for writing + mkts_dt = np.dtype(_ohlcv_dt) + mkts_array = np.zeros( + len(ohlcv), + dtype=mkts_dt, + ) + # copy from shm array (yes it's this easy): + # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays + mkts_array[:] = ohlcv[[ + 'time', + 'open', + 'high', + 'low', + 'close', + 'volume', + ]] + + # write to db + resp = await self.client.write( + mkts_array, + tbk=f'{fqsn}/1Sec/OHLCV', + + # NOTE: will will append duplicates + # for the same timestamp-index. + # TODO: pre deduplicate? + isvariablelength=True, + ) + + log.info( + f'Wrote {mkts_array.size} datums to tsdb\n' + ) + + for resp in resp.responses: + err = resp.error + if err: + raise MarketStoreError(err) + @acm async def open_storage_client( @@ -402,6 +468,9 @@ async def tsdb_history_update( to_append = feed.shm.array to_prepend = None + from tractor.trionics import ipython_embed + await ipython_embed() + # hist diffing if tsdb_arrays: onesec = tsdb_arrays[1] @@ -417,47 +486,9 @@ async def tsdb_history_update( log.info( f'Writing datums {array.size} -> to tsdb from shm\n' ) + await storage.write_ohlcv(fqsn, array) - # build mkts schema compat array for writing - mkts_dt = np.dtype(_ohlcv_dt) - mkts_array = np.zeros( - len(array), - dtype=mkts_dt, - ) - # copy from shm array (yes it's this easy): - # https://numpy.org/doc/stable/user/basics.rec.html#assignment-from-other-structured-arrays - mkts_array[:] = array[[ - 'time', - 'open', - 'high', - 'low', - 'close', - 'volume', - ]] - - # write to db - resp = await storage.client.write( - mkts_array, - tbk=f'{fqsn}/1Sec/OHLCV', - - # NOTE: will will append duplicates - # for the same timestamp-index. - # TODO: pre deduplicate? - isvariablelength=True, - ) - - log.info( - f'Wrote {to_append.size} datums to tsdb\n' - ) - profiler('Finished db writes') - - for resp in resp.responses: - err = resp.error - if err: - raise MarketStoreError(err) - - from tractor.trionics import ipython_embed - await ipython_embed() + profiler('Finished db writes') async def ingest_quote_stream(