Make `marketstore` storage api timeframe aware

The `Storage.load()`, `.read_ohlcv()`, `.write_ohlcv()` and
`.delete_ts()` methods can now take a `timeframe` param (the sampling
period in seconds) which is used to look up the appropriate
table-key from `marketstore`.
ib_1m_hist
Tyler Goodlet 2022-09-15 13:52:07 -04:00
parent 2a866dde65
commit bf7d5e9a71
1 changed file with 37 additions and 17 deletions
piker/data

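For context: every lookup below goes through the module's `tf_in_1s` table, which maps a sampling period in seconds to a `marketstore` table-key string. A minimal sketch of that mapping, assuming only the 1s and 1m entries that actually appear in this diff (it must be a `bidict` since the query path below also does `.inverse` lookups):

from bidict import bidict

# sampling period (secs) -> marketstore timeframe key; the exact
# entries here are assumptions based on the keys used in this diff.
tf_in_1s = bidict({
    1: '1Sec',
    60: '1Min',
})
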
@@ -387,6 +387,7 @@ class Storage:
     async def load(
         self,
         fqsn: str,
+        timeframe: int,
 
     ) -> tuple[
         dict[int, np.ndarray],  # timeframe (in secs) to series
@@ -400,12 +401,16 @@ class Storage:
             # on first load we don't need to pull the max
             # history per request size worth.
             limit=3000,
+            timeframe=timeframe,
         )
         log.info(f'Loaded tsdb history {tsdb_arrays}')
 
-        if tsdb_arrays:
-            fastest = list(tsdb_arrays.values())[0]
-            times = fastest['Epoch']
+        if len(tsdb_arrays):
+            # fastest = list(tsdb_arrays.values())[0]
+            # slowest = list(tsdb_arrays.values())[-1]
+            hist = tsdb_arrays[timeframe]
+
+            times = hist['Epoch']
             first, last = times[0], times[-1]
             first_tsdb_dt, last_tsdb_dt = map(
                 pendulum.from_timestamp, [first, last]
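
A hypothetical call against the new `load()` signature (the `storage` handle and fqsn are illustrative), showing that the returned series map is keyed by timeframe-in-seconds:

tsdb_arrays, first_tsdb_dt, last_tsdb_dt = await storage.load(
    'mnq.globex.ib',  # illustrative fqsn
    timeframe=60,
)
hist = tsdb_arrays[60]  # the 1m series as a numpy array
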
@@ -420,9 +425,9 @@
         end: Optional[int] = None,
         limit: int = int(800e3),
 
-    ) -> tuple[
-        MarketstoreClient,
-        Union[dict, np.ndarray]
+    ) -> dict[
+        int,
+        Union[dict, np.ndarray],
     ]:
         client = self.client
         syms = await client.list_symbols()
@@ -430,7 +435,8 @@
         if fqsn not in syms:
             return {}
 
-        tfstr = tf_in_1s[1]
+        # use the provided timeframe or 1s by default
+        tfstr = tf_in_1s.get(timeframe, tf_in_1s[1])
 
         params = Params(
             symbols=fqsn,
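
Note the `.get()` fallback semantics: a timeframe with no table-key entry silently degrades to the 1s key rather than raising. With the sketch mapping above:

tf_in_1s.get(60, tf_in_1s[1])   # -> '1Min'
tf_in_1s.get(300, tf_in_1s[1])  # -> '1Sec', since no 5m key is defined
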
@@ -463,39 +469,52 @@
                 return {}
 
         else:
-            result = await client.query(params)
+            params.set('timeframe', tfstr)
+            try:
+                result = await client.query(params)
+            except purerpc.grpclib.exceptions.UnknownError:
+                # indicate there is no history for this timeframe
+                return {}
+
+        # Fill out a `numpy` array-results map keyed by timeframe
+        arrays = {}
 
         # TODO: it turns out column access on recarrays is actually slower:
         # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist
         # it might make sense to make these structured arrays?
-        # Fill out a `numpy` array-results map
-        arrays = {}
         for fqsn, data_set in result.by_symbols().items():
             arrays.setdefault(fqsn, {})[
                 tf_in_1s.inverse[data_set.timeframe]
             ] = data_set.array
 
-        return arrays[fqsn][timeframe] if timeframe else arrays[fqsn]
+        return arrays[fqsn]
 
     async def delete_ts(
         self,
         key: str,
         timeframe: Optional[Union[int, str]] = None,
+        fmt: str = 'OHLCV',
 
     ) -> bool:
         client = self.client
         syms = await client.list_symbols()
         print(syms)
-        # if key not in syms:
-        #     raise KeyError(f'`{fqsn}` table key not found?')
+        if key not in syms:
+            raise KeyError(f'`{key}` table key not found in\n{syms}?')
 
-        return await client.destroy(tbk=key)
+        tbk = mk_tbk((
+            key,
+            tf_in_1s.get(timeframe, tf_in_1s[60]),
+            fmt,
+        ))
+        return await client.destroy(tbk=tbk)
 
     async def write_ohlcv(
         self,
         fqsn: str,
         ohlcv: np.ndarray,
+        timeframe: int,
         append_and_duplicate: bool = True,
         limit: int = int(800e3),
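
`delete_ts()` now validates the key and builds a full table-key via the module's `mk_tbk()` helper, defaulting to the 1m table when no timeframe is passed. A sketch of what that helper presumably does, namely joining the `(symbol, period, record-format)` triple into a table-key path:

def mk_tbk(keys: tuple[str, str, str]) -> str:
    # e.g. ('mnq.globex.ib', '1Min', 'OHLCV') -> 'mnq.globex.ib/1Min/OHLCV'
    return '/'.join(keys)
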
@@ -525,7 +544,7 @@ class Storage:
         # write to db
         resp = await self.client.write(
             to_push,
-            tbk=f'{fqsn}/1Sec/OHLCV',
+            tbk=f'{fqsn}/{tf_in_1s[timeframe]}/OHLCV',
 
             # NOTE: will will append duplicates
             # for the same timestamp-index.
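
The write path derives its table-key the same way, so each sampling period lands in its own table instead of everything being forced into `1Sec`. With the sketch mapping above:

fqsn = 'mnq.globex.ib'  # illustrative
f'{fqsn}/{tf_in_1s[1]}/OHLCV'   # -> 'mnq.globex.ib/1Sec/OHLCV'
f'{fqsn}/{tf_in_1s[60]}/OHLCV'  # -> 'mnq.globex.ib/1Min/OHLCV'
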
@@ -577,6 +596,7 @@ class Storage:
     # def delete_range(self, start_dt, end_dt) -> None:
     #     ...
 
+
 @acm
 async def open_storage_client(
     fqsn: str,
@@ -642,7 +662,7 @@ async def tsdb_history_update(
     ):
         profiler(f'opened feed for {fqsn}')
 
-        to_append = feed.shm.array
+        to_append = feed.hist_shm.array
         to_prepend = None
 
         if fqsn:
@@ -651,7 +671,7 @@ async def tsdb_history_update(
             fqsn = symbol.front_fqsn()
 
         # diff db history with shm and only write the missing portions
-        ohlcv = feed.shm.array
+        ohlcv = feed.hist_shm.array
 
         # TODO: use pg profiler
         tsdb_arrays = await storage.read_ohlcv(fqsn)
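
Taken together, a hypothetical round trip through the now timeframe-aware api might look like the following (all names illustrative; params passed by keyword per the signatures in this diff):

await storage.write_ohlcv(fqsn, ohlcv, timeframe=60)   # write the 1m series
arrays = await storage.read_ohlcv(fqsn, timeframe=60)  # read it back
await storage.delete_ts(fqsn, timeframe=60)            # drop the 1m table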