Lul, fix timeframe key when writing history

There never was any underlying db bug, it was a hardcoded timeframe in
the column series write key.. Now we always assert a matching timeframe
in results.
ib_1m_hist
Tyler Goodlet 2022-10-26 14:20:15 -04:00
parent 286228c290
commit 2b231ba631
1 changed files with 18 additions and 13 deletions

View File

@ -421,7 +421,7 @@ class Storage:
async def read_ohlcv( async def read_ohlcv(
self, self,
fqsn: str, fqsn: str,
timeframe: Optional[Union[int, str]] = None, timeframe: int | str,
end: Optional[int] = None, end: Optional[int] = None,
limit: int = int(800e3), limit: int = int(800e3),
@ -429,11 +429,13 @@ class Storage:
int, int,
Union[dict, np.ndarray], Union[dict, np.ndarray],
]: ]:
client = self.client client = self.client
syms = await client.list_symbols() syms = await client.list_symbols()
if fqsn not in syms: if fqsn not in syms:
return {} raise KeyError('No entry for {fqsn}')
# return {}
# use the provided timeframe or 1s by default # use the provided timeframe or 1s by default
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1]) tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
@ -462,17 +464,19 @@ class Storage:
data_set = result.by_symbols()[fqsn] data_set = result.by_symbols()[fqsn]
array = data_set.array array = data_set.array
# XXX: market store BUG! # XXX: ensure sample rate is as expected
# for wtv cucked reason seems like despite the params
# timeframe being set to a "lower" value the higher
# valued data set will still be pulled?..
time = data_set.array['Epoch'] time = data_set.array['Epoch']
if len(time) > 1: if len(time) > 1:
time_step = time[1] - time[0] time_step = time[-1] - time[-2]
ts = tf_in_1s.inverse[data_set.timeframe] ts = tf_in_1s.inverse[data_set.timeframe]
if time_step > ts:
log.warning(f'MKTS BUG: wrong timeframe loaded: {time_step}') assert time_step == ts
return {}
# if time_step != ts:
# log.warning(f'MKTS BUG: wrong timeframe loaded: {time_step}')
# if timeframe == 1:
# await tractor.breakpoint()
# return {}
return array return array
@ -525,17 +529,18 @@ class Storage:
m, r = divmod(len(mkts_array), limit) m, r = divmod(len(mkts_array), limit)
tfkey = tf_in_1s[timeframe]
for i in range(m, 1): for i in range(m, 1):
to_push = mkts_array[i-1:i*limit] to_push = mkts_array[i-1:i*limit]
# write to db # write to db
resp = await self.client.write( resp = await self.client.write(
to_push, to_push,
tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV', tbk=f'{fqsn}/{tfkey}/OHLCV',
# NOTE: will will append duplicates # NOTE: will will append duplicates
# for the same timestamp-index. # for the same timestamp-index.
# TODO: pre deduplicate? # TODO: pre-deduplicate?
isvariablelength=append_and_duplicate, isvariablelength=append_and_duplicate,
) )
@ -554,7 +559,7 @@ class Storage:
# write to db # write to db
resp = await self.client.write( resp = await self.client.write(
to_push, to_push,
tbk=f'{fqsn}/1Sec/OHLCV', tbk=f'{fqsn}/{tfkey}/OHLCV',
# NOTE: will will append duplicates # NOTE: will will append duplicates
# for the same timestamp-index. # for the same timestamp-index.