diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index c84b5a78..fccd13b4 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -421,7 +421,7 @@ class Storage: async def read_ohlcv( self, fqsn: str, - timeframe: Optional[Union[int, str]] = None, + timeframe: int | str, end: Optional[int] = None, limit: int = int(800e3), @@ -429,11 +429,13 @@ class Storage: int, Union[dict, np.ndarray], ]: + client = self.client syms = await client.list_symbols() if fqsn not in syms: - return {} + raise KeyError('No entry for {fqsn}') + # return {} # use the provided timeframe or 1s by default tfstr = tf_in_1s.get(timeframe, tf_in_1s[1]) @@ -462,17 +464,19 @@ class Storage: data_set = result.by_symbols()[fqsn] array = data_set.array - # XXX: market store BUG! - # for wtv cucked reason seems like despite the params - # timeframe being set to a "lower" value the higher - # valued data set will still be pulled?.. + # XXX: ensure sample rate is as expected time = data_set.array['Epoch'] if len(time) > 1: - time_step = time[1] - time[0] + time_step = time[-1] - time[-2] ts = tf_in_1s.inverse[data_set.timeframe] - if time_step > ts: - log.warning(f'MKTS BUG: wrong timeframe loaded: {time_step}') - return {} + + assert time_step == ts + + # if time_step != ts: + # log.warning(f'MKTS BUG: wrong timeframe loaded: {time_step}') + # if timeframe == 1: + # await tractor.breakpoint() + # return {} return array @@ -525,17 +529,18 @@ class Storage: m, r = divmod(len(mkts_array), limit) + tfkey = tf_in_1s[timeframe] for i in range(m, 1): to_push = mkts_array[i-1:i*limit] # write to db resp = await self.client.write( to_push, - tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV', + tbk=f'{fqsn}/{tfkey}/OHLCV', # NOTE: will will append duplicates # for the same timestamp-index. - # TODO: pre deduplicate? + # TODO: pre-deduplicate? isvariablelength=append_and_duplicate, ) @@ -554,7 +559,7 @@ class Storage: # write to db resp = await self.client.write( to_push, - tbk=f'{fqsn}/1Sec/OHLCV', + tbk=f'{fqsn}/{tfkey}/OHLCV', # NOTE: will will append duplicates # for the same timestamp-index.