From bf7d5e9a71679050761f611e549832ae0c64d516 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 15 Sep 2022 13:52:07 -0400
Subject: [PATCH] Make `marketstore` storage api timeframe aware

`Storage.load()`, `.read_ohlcv()`, `.write_ohlcv()` and `.delete_ts()`
can now take a `timeframe` param (the sampling period in seconds) which
is used to look up the appropriate period table-key in `marketstore`.
---
 piker/data/marketstore.py | 54 +++++++++++++++++++++++++++------------
 1 file changed, 37 insertions(+), 17 deletions(-)

diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py
index 7f39ad88..ae890011 100644
--- a/piker/data/marketstore.py
+++ b/piker/data/marketstore.py
@@ -387,6 +387,7 @@ class Storage:
     async def load(
         self,
         fqsn: str,
+        timeframe: int,
 
     ) -> tuple[
         dict[int, np.ndarray],  # timeframe (in secs) to series
@@ -400,12 +401,16 @@
             # on first load we don't need to pull the max
             # history per request size worth.
             limit=3000,
+            timeframe=timeframe,
         )
         log.info(f'Loaded tsdb history {tsdb_arrays}')
 
-        if tsdb_arrays:
-            fastest = list(tsdb_arrays.values())[0]
-            times = fastest['Epoch']
+        if len(tsdb_arrays):
+            # fastest = list(tsdb_arrays.values())[0]
+            # slowest = list(tsdb_arrays.values())[-1]
+            hist = tsdb_arrays[timeframe]
+
+            times = hist['Epoch']
             first, last = times[0], times[-1]
             first_tsdb_dt, last_tsdb_dt = map(
                 pendulum.from_timestamp, [first, last]
@@ -420,9 +425,9 @@
         end: Optional[int] = None,
         limit: int = int(800e3),
 
-    ) -> tuple[
-        MarketstoreClient,
-        Union[dict, np.ndarray]
+    ) -> dict[
+        int,
+        Union[dict, np.ndarray],
     ]:
         client = self.client
         syms = await client.list_symbols()
@@ -430,7 +435,8 @@
         if fqsn not in syms:
             return {}
 
-        tfstr = tf_in_1s[1]
+        # use the provided timeframe or 1s by default
+        tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
 
         params = Params(
             symbols=fqsn,
@@ -463,39 +469,52 @@
                 return {}
 
         else:
-            result = await client.query(params)
+            params.set('timeframe', tfstr)
+            try:
+                result = await client.query(params)
+            except purerpc.grpclib.exceptions.UnknownError:
+                # indicate there is no history for this timeframe
+                return {}
+
+        # Fill out a `numpy` array-results map keyed by timeframe
+        arrays = {}
 
         # TODO: it turns out column access on recarrays is actually slower:
         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
         # it might make sense to make these structured arrays?
-        # Fill out a `numpy` array-results map
-        arrays = {}
         for fqsn, data_set in result.by_symbols().items():
             arrays.setdefault(fqsn, {})[
                 tf_in_1s.inverse[data_set.timeframe]
             ] = data_set.array
 
-        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
+        return arrays[fqsn]
 
     async def delete_ts(
         self,
         key: str,
         timeframe: Optional[Union[int, str]] = None,
+        fmt: str = 'OHLCV',
 
     ) -> bool:
 
         client = self.client
         syms = await client.list_symbols()
         print(syms)
-        # if key not in syms:
-        #     raise KeyError(f'`{fqsn}` table key not found?')
+        if key not in syms:
+            raise KeyError(f'`{key}` table key not found in\n{syms}?')
 
-        return await client.destroy(tbk=key)
+        tbk = mk_tbk((
+            key,
+            tf_in_1s.get(timeframe, tf_in_1s[60]),
+            fmt,
+        ))
+        return await client.destroy(tbk=tbk)
 
     async def write_ohlcv(
         self,
         fqsn: str,
         ohlcv: np.ndarray,
+        timeframe: int,
         append_and_duplicate: bool = True,
         limit: int = int(800e3),
 
@@ -525,7 +544,7 @@
         # write to db
         resp = await self.client.write(
             to_push,
-            tbk=f'{fqsn}/1Sec/OHLCV',
+            tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV',
 
             # NOTE: we will append duplicates
             # for the same timestamp-index.
@@ -577,6 +596,7 @@
     # def delete_range(self, start_dt, end_dt) -> None:
     #     ...
 
+
 @acm
 async def open_storage_client(
     fqsn: str,
@@ -642,7 +662,7 @@ async def tsdb_history_update(
     ):
         profiler(f'opened feed for {fqsn}')
 
-        to_append = feed.shm.array
+        to_append = feed.hist_shm.array
         to_prepend = None
 
         if fqsn:
@@ -651,7 +671,7 @@
             fqsn = symbol.front_fqsn()
 
         # diff db history with shm and only write the missing portions
-        ohlcv = feed.shm.array
+        ohlcv = feed.hist_shm.array
 
         # TODO: use pg profiler
         tsdb_arrays = await storage.read_ohlcv(fqsn)
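
Reviewer note (not part of the patch): a rough usage sketch of the
timeframe-aware api introduced above, for 60s (1Min) bars. The
`sync_minute_bars()` wrapper, the `storage` handle and the example fqsn
are hypothetical illustration only; the `Storage` method names and
their new `timeframe` params are the ones added in the diff.

    import numpy as np

    async def sync_minute_bars(
        storage,           # a connected `Storage` instance (assumed)
        fqsn: str,         # eg. 'btcusdt.binance' (example only)
        bars: np.ndarray,  # OHLCV records, eg. `feed.hist_shm.array`
    ) -> None:
        # read any existing 60s history; per this patch the result is
        # a dict keyed by timeframe, and an empty dict comes back when
        # marketstore has no table for the requested period.
        tsdb_arrays = await storage.read_ohlcv(fqsn, timeframe=60)

        if tsdb_arrays.get(60) is None:
            # no 1Min table yet: write the full series. the table-key
            # becomes f'{fqsn}/1Min/OHLCV' via the `tf_in_1s` lookup.
            await storage.write_ohlcv(fqsn, bars, timeframe=60)

        # dropping a series also takes the period now, since the tbk
        # (eg. '1Min') is derived via `tf_in_1s.get(timeframe, ...)`:
        # await storage.delete_ts(fqsn, timeframe=60)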