Make `marketstore` storage api timeframe aware

The `Store.load()`, `.read_ohlcv()` and `.write_ohlcv()` and
`.delete_ts()` now can take a `timeframe: Optional[float]` param which
is used to look up the appropriate sampling period table-key from
`marketstore`.
clears_table_events
Tyler Goodlet 2022-09-15 13:52:07 -04:00
parent 87f7a03dbe
commit 0c061d8957
1 changed files with 37 additions and 17 deletions

View File

@ -387,6 +387,7 @@ class Storage:
async def load(
self,
fqsn: str,
timeframe: int,
) -> tuple[
dict[int, np.ndarray], # timeframe (in secs) to series
@ -400,12 +401,16 @@ class Storage:
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
timeframe=timeframe,
)
log.info(f'Loaded tsdb history {tsdb_arrays}')
if tsdb_arrays:
fastest = list(tsdb_arrays.values())[0]
times = fastest['Epoch']
if len(tsdb_arrays):
# fastest = list(tsdb_arrays.values())[0]
# slowest = list(tsdb_arrays.values())[-1]
hist = tsdb_arrays[timeframe]
times = hist['Epoch']
first, last = times[0], times[-1]
first_tsdb_dt, last_tsdb_dt = map(
pendulum.from_timestamp, [first, last]
@ -420,9 +425,9 @@ class Storage:
end: Optional[int] = None,
limit: int = int(800e3),
) -> tuple[
MarketstoreClient,
Union[dict, np.ndarray]
) -> dict[
int,
Union[dict, np.ndarray],
]:
client = self.client
syms = await client.list_symbols()
@ -430,7 +435,8 @@ class Storage:
if fqsn not in syms:
return {}
tfstr = tf_in_1s[1]
# use the provided timeframe or 1s by default
tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
params = Params(
symbols=fqsn,
@ -463,39 +469,52 @@ class Storage:
return {}
else:
params.set('timeframe', tfstr)
try:
result = await client.query(params)
except purerpc.grpclib.exceptions.UnknownError:
# indicate there is no history for this timeframe
return {}
# Fill out a `numpy` array-results map keyed by timeframe
arrays = {}
# TODO: it turns out column access on recarrays is actually slower:
# https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
# it might make sense to make these structured arrays?
# Fill out a `numpy` array-results map
arrays = {}
for fqsn, data_set in result.by_symbols().items():
arrays.setdefault(fqsn, {})[
tf_in_1s.inverse[data_set.timeframe]
] = data_set.array
return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
return arrays[fqsn]
async def delete_ts(
self,
key: str,
timeframe: Optional[Union[int, str]] = None,
fmt: str = 'OHLCV',
) -> bool:
client = self.client
syms = await client.list_symbols()
print(syms)
# if key not in syms:
# raise KeyError(f'`{fqsn}` table key not found?')
if key not in syms:
raise KeyError(f'`{key}` table key not found in\n{syms}?')
return await client.destroy(tbk=key)
tbk = mk_tbk((
key,
tf_in_1s.get(timeframe, tf_in_1s[60]),
fmt,
))
return await client.destroy(tbk=tbk)
async def write_ohlcv(
self,
fqsn: str,
ohlcv: np.ndarray,
timeframe: int,
append_and_duplicate: bool = True,
limit: int = int(800e3),
@ -525,7 +544,7 @@ class Storage:
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqsn}/1Sec/OHLCV',
tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV',
# NOTE: will will append duplicates
# for the same timestamp-index.
@ -577,6 +596,7 @@ class Storage:
# def delete_range(self, start_dt, end_dt) -> None:
# ...
@acm
async def open_storage_client(
fqsn: str,
@ -642,7 +662,7 @@ async def tsdb_history_update(
):
profiler(f'opened feed for {fqsn}')
to_append = feed.shm.array
to_append = feed.hist_shm.array
to_prepend = None
if fqsn:
@ -651,7 +671,7 @@ async def tsdb_history_update(
fqsn = symbol.front_fqsn()
# diff db history with shm and only write the missing portions
ohlcv = feed.shm.array
ohlcv = feed.hist_shm.array
# TODO: use pg profiler
tsdb_arrays = await storage.read_ohlcv(fqsn)