diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py
index ae890011..0d0c30d6 100644
--- a/piker/data/marketstore.py
+++ b/piker/data/marketstore.py
@@ -390,32 +390,32 @@ class Storage:
         timeframe: int,
 
     ) -> tuple[
-        dict[int, np.ndarray],  # timeframe (in secs) to series
+        np.ndarray,  # timeframe sampled array-series
         Optional[datetime],  # first dt
         Optional[datetime],  # last dt
     ]:
 
        first_tsdb_dt, last_tsdb_dt = None, None
-        tsdb_arrays = await self.read_ohlcv(
+        hist = await self.read_ohlcv(
             fqsn,
             # on first load we don't need to pull the max
             # history per request size worth.
             limit=3000,
             timeframe=timeframe,
         )
-        log.info(f'Loaded tsdb history {tsdb_arrays}')
-
-        if len(tsdb_arrays):
-            # fastest = list(tsdb_arrays.values())[0]
-            # slowest = list(tsdb_arrays.values())[-1]
-            hist = tsdb_arrays[timeframe]
+        log.info(f'Loaded tsdb history {hist}')
+        if len(hist):
             times = hist['Epoch']
             first, last = times[0], times[-1]
             first_tsdb_dt, last_tsdb_dt = map(
                 pendulum.from_timestamp, [first, last]
             )
 
-        return tsdb_arrays, first_tsdb_dt, last_tsdb_dt
+        return (
+            hist,  # array-data
+            first_tsdb_dt,  # start of query-frame
+            last_tsdb_dt,  # most recent
+        )
 
     async def read_ohlcv(
         self,
@@ -458,6 +458,7 @@ class Storage:
                 log.info(f'querying for {tfstr}@{fqsn}')
                 params.set('timeframe', tfstr)
                 result = await client.query(params)
+                timeframe = tf_in_1s.inverse[tfstr]
                 break
 
             except purerpc.grpclib.exceptions.UnknownError:
@@ -476,18 +477,30 @@ class Storage:
             # indicate there is no history for this timeframe
             return {}
 
-        # Fill out a `numpy` array-results map keyed by timeframe
-        arrays = {}
+        # # Fill out a `numpy` array-results map keyed by timeframe
+        # arrays = {}
 
         # TODO: it turns out column access on recarrays is actually slower:
         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
         # it might make sense to make these structured arrays?
-        for fqsn, data_set in result.by_symbols().items():
-            arrays.setdefault(fqsn, {})[
-                tf_in_1s.inverse[data_set.timeframe]
-            ] = data_set.array
+        data_set = result.by_symbols()[fqsn]
+        array = data_set.array
 
-        return arrays[fqsn]
+        # XXX: marketstore BUG!
+        # despite the query params timeframe being set to a "lower"
+        # value, a higher-timeframe data set can still be pulled,
+        # so verify the actual sample step of the returned series.
+        time = data_set.array['Epoch']
+        if len(time) > 1:
+            time_step = time[1] - time[0]
+            ts = tf_in_1s.inverse[data_set.timeframe]
+            if time_step > ts:
+                log.warning(f'MKTS BUG: wrong timeframe loaded: {time_step}')
+                return {}
+        else:
+            ts = timeframe
+
+        return array
 
     async def delete_ts(
         self,
@@ -662,8 +675,8 @@ async def tsdb_history_update(
     ):
         profiler(f'opened feed for {fqsn}')
 
-        to_append = feed.hist_shm.array
-        to_prepend = None
+        # to_append = feed.hist_shm.array
+        # to_prepend = None
 
         if fqsn:
             symbol = feed.symbols.get(fqsn)
@@ -671,21 +684,21 @@
                 fqsn = symbol.front_fqsn()
 
         # diff db history with shm and only write the missing portions
-        ohlcv = feed.hist_shm.array
+        # ohlcv = feed.hist_shm.array
 
         # TODO: use pg profiler
-        tsdb_arrays = await storage.read_ohlcv(fqsn)
-        # hist diffing
-        if tsdb_arrays:
-            for secs in (1, 60):
-                ts = tsdb_arrays.get(secs)
-                if ts is not None and len(ts):
-                    # these aren't currently used but can be referenced from
-                    # within the embedded ipython shell below.
-                    to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]]
-                    to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]]
+        # for secs in (1, 60):
+        #     tsdb_array = await storage.read_ohlcv(
+        #         fqsn,
+        #         timeframe=timeframe,
+        #     )
+        #     # hist diffing:
+        #     # these aren't currently used but can be referenced from
+        #     # within the embedded ipython shell below.
+        #     to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]]
+        #     to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]]
 
-        profiler('Finished db arrays diffs')
+        # profiler('Finished db arrays diffs')
 
         syms = await storage.client.list_symbols()
         log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')
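For review context: `Storage.load()` changes from returning a timeframe-keyed `dict` of arrays to a single array plus the first/last datetimes of the loaded frame. A minimal consumption sketch, assuming a connected `Storage` instance and a valid `fqsn`; the surrounding task scaffolding here is illustrative, not part of the patch:

```python
import pendulum

async def show_tsdb_coverage(storage, fqsn: str) -> None:
    # new-style call: one array for the requested timeframe,
    # instead of indexing into a dict keyed by timeframe-in-secs
    hist, first_tsdb_dt, last_tsdb_dt = await storage.load(
        fqsn,
        timeframe=60,  # 1m bars
    )
    if not len(hist):
        print(f'no tsdb history for {fqsn}')
        return

    # the dt pair brackets the loaded frame, handy for gap checks
    gap = pendulum.now('UTC') - last_tsdb_dt
    print(f'{fqsn}: {len(hist)} rows, last bar {gap.in_words()} ago')
```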
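The `read_ohlcv()` workaround for the marketstore timeframe bug reduces to a sample-step check on the `Epoch` column. A standalone sketch of that predicate, useful for unit testing; the helper name and wrapper are assumptions, only the step comparison mirrors the diff:

```python
import numpy as np

def sample_rate_ok(
    epochs: np.ndarray,  # the 'Epoch' column of a returned frame
    requested_ts: int,   # queried timeframe in seconds
) -> bool:
    '''
    Return ``False`` when the step between the first two samples
    exceeds the requested timeframe, i.e. marketstore handed back
    a higher-timeframe series than was queried for.

    '''
    if len(epochs) < 2:
        # a single sample has no measurable step; trust the query
        return True
    return (epochs[1] - epochs[0]) <= requested_ts

# a 1s query answered with 60s bars should be flagged:
assert not sample_rate_ok(np.array([0, 60, 120]), requested_ts=1)
assert sample_rate_ok(np.array([0, 1, 2]), requested_ts=1)
```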
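The hist-diffing block disabled in `tsdb_history_update()` amounts to two boolean-mask slices over the shm array. For future reference, a standalone form of what it computed; the field names follow the diff (`time` in shm arrays, `Epoch` in tsdb frames) but the function itself is hypothetical:

```python
import numpy as np

def diff_history(
    ohlcv: np.ndarray,  # shm array with a 'time' field
    tsdb: np.ndarray,   # tsdb frame with an 'Epoch' field
) -> tuple[np.ndarray, np.ndarray]:
    # rows older than the db's first bar -> prepend,
    # rows newer than the db's last bar -> append
    to_prepend = ohlcv[ohlcv['time'] < tsdb['Epoch'][0]]
    to_append = ohlcv[ohlcv['time'] > tsdb['Epoch'][-1]]
    return to_prepend, to_append
```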