Handle a super annoying mkts query bug.

Turns out that querying for a high freq timeframe (like 1sec) can still
return a lower freq timeframe (like 1Min) SMH, and it's unclear whether
the server or the client is at fault, so we have to explicitly check the
sample step size and discard lower freq series-results. Do this inside
`Storage.read_ohlcv()` and return an empty `dict` when the wrong time
step is detected from the query result.

Further enforcements:
- both `.load()` and `read_ohlcv()` now require an explicit `timeframe:
  int` input to guarantee the time step of the output array.
- drop all calls to `.load()` with non-timeframe-specific input.
ib_1m_hist
Tyler Goodlet 2022-10-25 23:24:15 -04:00
parent 956c7d3435
commit 143e86a80c
1 changed files with 44 additions and 30 deletions

View File

@ -390,33 +390,33 @@ class Storage:
timeframe: int, timeframe: int,
) -> tuple[ ) -> tuple[
dict[int, np.ndarray], # timeframe (in secs) to series np.ndarray, # timeframe sampled array-series
Optional[datetime], # first dt Optional[datetime], # first dt
Optional[datetime], # last dt Optional[datetime], # last dt
]: ]:
first_tsdb_dt, last_tsdb_dt = None, None first_tsdb_dt, last_tsdb_dt = None, None
tsdb_arrays = await self.read_ohlcv( hist = await self.read_ohlcv(
fqsn, fqsn,
# on first load we don't need to pull the max # on first load we don't need to pull the max
# history per request size worth. # history per request size worth.
limit=3000, limit=3000,
timeframe=timeframe, timeframe=timeframe,
) )
log.info(f'Loaded tsdb history {tsdb_arrays}') log.info(f'Loaded tsdb history {hist}')
if len(tsdb_arrays):
# fastest = list(tsdb_arrays.values())[0]
# slowest = list(tsdb_arrays.values())[-1]
hist = tsdb_arrays[timeframe]
if len(hist):
times = hist['Epoch'] times = hist['Epoch']
first, last = times[0], times[-1] first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map( first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last] pendulum.from_timestamp, [first, last]
) )
return tsdb_arrays, first_tsdb_dt, last_tsdb_dt return (
hist, # array-data
first_tsdb_dt, # start of query-frame
last_tsdb_dt, # most recent
)
async def read_ohlcv( async def read_ohlcv(
self, self,
@ -458,6 +458,7 @@ class Storage:
log.info(f'querying for {tfstr}@{fqsn}') log.info(f'querying for {tfstr}@{fqsn}')
params.set('timeframe', tfstr) params.set('timeframe', tfstr)
result = await client.query(params) result = await client.query(params)
timeframe = tf_in_1s.inverse[tfstr]
break break
except purerpc.grpclib.exceptions.UnknownError: except purerpc.grpclib.exceptions.UnknownError:
@ -476,18 +477,31 @@ class Storage:
# indicate there is no history for this timeframe # indicate there is no history for this timeframe
return {} return {}
# Fill out a `numpy` array-results map keyed by timeframe # # Fill out a `numpy` array-results map keyed by timeframe
arrays = {} # arrays = {}
# TODO: it turns out column access on recarrays is actually slower: # TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays? # it might make sense to make these structured arrays?
for fqsn, data_set in result.by_symbols().items(): data_set = result.by_symbols()[fqsn]
arrays.setdefault(fqsn, {})[ array = data_set.array
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn] # XXX: market store BUG!
# for wtv cucked reason seems like despite the params
# timeframe being set to a "lower" value the higher
# valued data set will still be pulled?..
time = data_set.array['Epoch']
if len(time) > 1:
time_step = time[1] - time[0]
ts = tf_in_1s.inverse[data_set.timeframe]
if time_step > ts:
log.warning(f'MKTS BUG: wrong timeframe loaded: {time_step}')
print("WTF MKTS")
return {}
else:
ts = timeframe
return array
async def delete_ts( async def delete_ts(
self, self,
@ -662,8 +676,8 @@ async def tsdb_history_update(
): ):
profiler(f'opened feed for {fqsn}') profiler(f'opened feed for {fqsn}')
to_append = feed.hist_shm.array # to_append = feed.hist_shm.array
to_prepend = None # to_prepend = None
if fqsn: if fqsn:
symbol = feed.symbols.get(fqsn) symbol = feed.symbols.get(fqsn)
@ -671,21 +685,21 @@ async def tsdb_history_update(
fqsn = symbol.front_fqsn() fqsn = symbol.front_fqsn()
# diff db history with shm and only write the missing portions # diff db history with shm and only write the missing portions
ohlcv = feed.hist_shm.array # ohlcv = feed.hist_shm.array
# TODO: use pg profiler # TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn) # for secs in (1, 60):
# hist diffing # tsdb_array = await storage.read_ohlcv(
if tsdb_arrays: # fqsn,
for secs in (1, 60): # timeframe=timeframe,
ts = tsdb_arrays.get(secs) # )
if ts is not None and len(ts): # # hist diffing:
# these aren't currently used but can be referenced from # # these aren't currently used but can be referenced from
# within the embedded ipython shell below. # # within the embedded ipython shell below.
to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]] # to_append = ohlcv[ohlcv['time'] > ts['Epoch'][-1]]
to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]] # to_prepend = ohlcv[ohlcv['time'] < ts['Epoch'][0]]
profiler('Finished db arrays diffs') # profiler('Finished db arrays diffs')
syms = await storage.client.list_symbols() syms = await storage.client.list_symbols()
log.info(f'Existing tsdb symbol set:\n{pformat(syms)}') log.info(f'Existing tsdb symbol set:\n{pformat(syms)}')