Support large ohlcv writes via slicing, add struct array keymap

l1_precision_fix
Tyler Goodlet 2022-05-03 11:22:47 -04:00
parent fcb85873de
commit 7e951f17ca
1 changed files with 56 additions and 16 deletions

View File

@ -108,6 +108,16 @@ _ohlcv_dt = [
] ]
ohlc_key_map = bidict({
'Epoch': 'time',
'Open': 'open',
'High': 'high',
'Low': 'low',
'Close': 'close',
'Volume': 'volume',
})
def mk_tbk(keys: tuple[str, str, str]) -> str: def mk_tbk(keys: tuple[str, str, str]) -> str:
''' '''
Generate a marketstore table key from a tuple. Generate a marketstore table key from a tuple.
@ -201,6 +211,7 @@ class MarketStoreError(Exception):
# raise MarketStoreError(err) # raise MarketStoreError(err)
# map of seconds ints to "time frame" accepted keys
tf_in_1s = bidict({ tf_in_1s = bidict({
1: '1Sec', 1: '1Sec',
60: '1Min', 60: '1Min',
@ -349,6 +360,7 @@ class Storage:
fqsn: str, fqsn: str,
ohlcv: np.ndarray, ohlcv: np.ndarray,
append_and_duplicate: bool = True, append_and_duplicate: bool = True,
limit: int = int(800e3),
) -> None: ) -> None:
# build mkts schema compat array for writing # build mkts schema compat array for writing
@ -368,25 +380,53 @@ class Storage:
'volume', 'volume',
]] ]]
# write to db m, r = divmod(len(mkts_array), limit)
resp = await self.client.write(
mkts_array,
tbk=f'{fqsn}/1Sec/OHLCV',
# NOTE: will will append duplicates for i in range(m, 1):
# for the same timestamp-index. to_push = mkts_array[i-1:i*limit]
# TODO: pre deduplicate?
isvariablelength=append_and_duplicate,
)
log.info( # write to db
f'Wrote {mkts_array.size} datums to tsdb\n' resp = await self.client.write(
) to_push,
tbk=f'{fqsn}/1Sec/OHLCV',
for resp in resp.responses: # NOTE: will will append duplicates
err = resp.error # for the same timestamp-index.
if err: # TODO: pre deduplicate?
raise MarketStoreError(err) isvariablelength=append_and_duplicate,
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
if r:
to_push = mkts_array[m*limit:]
# write to db
resp = await self.client.write(
to_push,
tbk=f'{fqsn}/1Sec/OHLCV',
# NOTE: will will append duplicates
# for the same timestamp-index.
# TODO: pre deduplicate?
isvariablelength=append_and_duplicate,
)
log.info(
f'Wrote {mkts_array.size} datums to tsdb\n'
)
for resp in resp.responses:
err = resp.error
if err:
raise MarketStoreError(err)
@acm @acm