From 143e86a80ce7a97f13f588ddde788032103b1cac Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 25 Oct 2022 23:24:15 -0400 Subject: [PATCH] Handle super annoying mkts query bug.. Turns out querying for a high freq timeframe (like 1sec) will still return a lower freq timeframe (like 1Min) SMH, and no idea if it's the server or the client's fault, so we have to explicitly check the sample step size and discard lower freq series-results. Do this inside `Storage.read_ohlcv()` and return an empty `dict` when the wrong time step is detected from the query result. Further enforcements, - both `.load()` and `read_ohlcv()` now require an explicit `timeframe: int` input to guarantee the time step of the output array. - drop all calls `.load()` with non-timeframe specific input. --- piker/data/marketstore.py | 74 +++++++++++++++++++++++---------------- 1 file changed, 44 insertions(+), 30 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index ae890011..0d0c30d6 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -390,33 +390,33 @@ class Storage: timeframe: int, ) -> tuple[ - dict[int, np.ndarray], # timeframe (in secs) to series + np.ndarray, # timeframe sampled array-series Optional[datetime], # first dt Optional[datetime], # last dt ]: first_tsdb_dt, last_tsdb_dt = None, None - tsdb_arrays = await self.read_ohlcv( + hist = await self.read_ohlcv( fqsn, # on first load we don't need to pull the max # history per request size worth. limit=3000, timeframe=timeframe, ) - log.info(f'Loaded tsdb history {tsdb_arrays}') - - if len(tsdb_arrays): - # fastest = list(tsdb_arrays.values())[0] - # slowest = list(tsdb_arrays.values())[-1] - hist = tsdb_arrays[timeframe] + log.info(f'Loaded tsdb history {hist}') + if len(hist): times = hist['Epoch'] first, last = times[0], times[-1] first_tsdb_dt, last_tsdb_dt = map( pendulum.from_timestamp, [first, last] ) - return tsdb_arrays, first_tsdb_dt, last_tsdb_dt + return ( + hist, # array-data + first_tsdb_dt, # start of query-frame + last_tsdb_dt, # most recent + ) async def read_ohlcv( self, @@ -458,6 +458,7 @@ class Storage: log.info(f'querying for {tfstr}@{fqsn}') params.set('timeframe', tfstr) result = await client.query(params) + timeframe = tf_in_1s.inverse[tfstr] break except purerpc.grpclib.exceptions.UnknownError: @@ -476,18 +477,31 @@ class Storage: # indicate there is no history for this timeframe return {} - # Fill out a `numpy` array-results map keyed by timeframe - arrays = {} + # # Fill out a `numpy` array-results map keyed by timeframe + # arrays = {} # TODO: it turns out column access on recarrays is actually slower: # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # it might make sense to make these structured arrays? - for fqsn, data_set in result.by_symbols().items(): - arrays.setdefault(fqsn, {})[ - tf_in_1s.inverse[data_set.timeframe] - ] = data_set.array + data_set = result.by_symbols()[fqsn] + array = data_set.array - return arrays[fqsn] + # XXX: market store BUG! + # for wtv cucked reason seems like despite the params + # timeframe being set to a "lower" value the higher + # valued data set will still be pulled?.. + time = data_set.array['Epoch'] + if len(time) > 1: + time_step = time[1] - time[0] + ts = tf_in_1s.inverse[data_set.timeframe] + if time_step > ts: + log.warning(f'MKTS BUG: wrong timeframe loaded: {time_step}') + print("WTF MKTS") + return {} + else: + ts = timeframe + + return array async def delete_ts( self, @@ -662,8 +676,8 @@ async def tsdb_history_update( ): profiler(f'opened feed for {fqsn}') - to_append = feed.hist_shm.array - to_prepend = None + # to_append = feed.hist_shm.array + # to_prepend = None if fqsn: symbol = feed.symbols.get(fqsn) @@ -671,21 +685,21 @@ async def tsdb_history_update( fqsn = symbol.front_fqsn() # diff db history with shm and only write the missing portions - ohlcv = feed.hist_shm.array + # ohlcv = feed.hist_shm.array # TODO: use pg profiler - tsdb_arrays = await storage.read_ohlcv(fqsn) - # hist diffing - if tsdb_arrays: - for secs in (1, 60): - ts = tsdb_arrays.get(secs) - if ts is not None and len(ts): - # these aren't currently used but can be referenced from - # within the embedded ipython shell below. - to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]] - to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]] + # for secs in (1, 60): + # tsdb_array = await storage.read_ohlcv( + # fqsn, + # timeframe=timeframe, + # ) + # # hist diffing: + # # these aren't currently used but can be referenced from + # # within the embedded ipython shell below. + # to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]] + # to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]] - profiler('Finished db arrays diffs') + # profiler('Finished db arrays diffs') syms = await storage.client.list_symbols() log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')