Support large ohlcv writes via slicing, add struct array keymap
parent 2e6b7da4bc
commit 303a5cc66c
@@ -108,6 +108,16 @@ _ohlcv_dt = [
 ]


+ohlc_key_map = bidict({
+    'Epoch': 'time',
+    'Open': 'open',
+    'High': 'high',
+    'Low': 'low',
+    'Close': 'close',
+    'Volume': 'volume',
+})
+
+
 def mk_tbk(keys: tuple[str, str, str]) -> str:
     '''
     Generate a marketstore table key from a tuple.
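The new `ohlc_key_map` is a `bidict` (third-party `bidict` package), so the marketstore column names ('Epoch', 'Open', ...) and the internal field names ('time', 'open', ...) can be looked up in either direction. A minimal sketch of one way such a map can be applied is below; the `rename_fields` helper from `numpy.lib.recfunctions` and the `'f8'` dtype are illustrative choices, not something this hunk shows.

import numpy as np
from numpy.lib import recfunctions as rfn
from bidict import bidict

# the mapping added above: marketstore column -> internal field name
ohlc_key_map = bidict({
    'Epoch': 'time',
    'Open': 'open',
    'High': 'high',
    'Low': 'low',
    'Close': 'close',
    'Volume': 'volume',
})

# forward and inverse lookups
assert ohlc_key_map['Epoch'] == 'time'
assert ohlc_key_map.inverse['time'] == 'Epoch'

# illustrative only: rename struct-array fields pulled from marketstore
# into the internal naming scheme
arr = np.zeros(3, dtype=[(k, 'f8') for k in ohlc_key_map])
internal = rfn.rename_fields(arr, dict(ohlc_key_map))
assert internal.dtype.names == (
    'time', 'open', 'high', 'low', 'close', 'volume',
)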
@@ -201,6 +211,7 @@ class MarketStoreError(Exception):
 # raise MarketStoreError(err)


+# map of seconds ints to "time frame" accepted keys
 tf_in_1s = bidict({
     1: '1Sec',
     60: '1Min',
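`tf_in_1s` is likewise a `bidict`, so one table serves both directions: bar period in seconds to marketstore timeframe key and back. A quick sketch, truncated to the two entries visible in this hunk:

from bidict import bidict

tf_in_1s = bidict({
    1: '1Sec',
    60: '1Min',
})

assert tf_in_1s[60] == '1Min'          # seconds -> timeframe key
assert tf_in_1s.inverse['1Sec'] == 1   # timeframe key -> seconds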
@@ -349,6 +360,7 @@ class Storage:
         fqsn: str,
         ohlcv: np.ndarray,
         append_and_duplicate: bool = True,
+        limit: int = int(800e3),

     ) -> None:
         # build mkts schema compat array for writing
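The new `limit` keyword caps a single write at `int(800e3)` (800,000) rows; `divmod` against it (used in the next hunk) yields the number of full slices plus the leftover row count. For a hypothetical 2.5M-row array:

limit = int(800e3)      # 800_000 rows per write call
n_rows = 2_500_000      # hypothetical input length

m, r = divmod(n_rows, limit)
assert (m, r) == (3, 100_000)   # 3 full slices + a 100k-row remainder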
@@ -368,25 +380,53 @@ class Storage:
             'volume',
         ]]

-        # write to db
-        resp = await self.client.write(
-            mkts_array,
-            tbk=f'{fqsn}/1Sec/OHLCV',
-
-            # NOTE: will will append duplicates
-            # for the same timestamp-index.
-            # TODO: pre deduplicate?
-            isvariablelength=append_and_duplicate,
-        )
-
-        log.info(
-            f'Wrote {mkts_array.size} datums to tsdb\n'
-        )
-
-        for resp in resp.responses:
-            err = resp.error
-            if err:
-                raise MarketStoreError(err)
+        m, r = divmod(len(mkts_array), limit)
+
+        for i in range(m, 1):
+            to_push = mkts_array[i-1:i*limit]
+
+            # write to db
+            resp = await self.client.write(
+                to_push,
+                tbk=f'{fqsn}/1Sec/OHLCV',
+
+                # NOTE: will will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre deduplicate?
+                isvariablelength=append_and_duplicate,
+            )
+
+            log.info(
+                f'Wrote {mkts_array.size} datums to tsdb\n'
+            )
+
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)
+
+        if r:
+            to_push = mkts_array[m*limit:]
+
+            # write to db
+            resp = await self.client.write(
+                to_push,
+                tbk=f'{fqsn}/1Sec/OHLCV',
+
+                # NOTE: will will append duplicates
+                # for the same timestamp-index.
+                # TODO: pre deduplicate?
+                isvariablelength=append_and_duplicate,
+            )
+
+            log.info(
+                f'Wrote {mkts_array.size} datums to tsdb\n'
+            )
+
+            for resp in resp.responses:
+                err = resp.error
+                if err:
+                    raise MarketStoreError(err)


 @acm
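Pulled out of the class, the intent of the slicing logic reads roughly as below. This is a standalone sketch, not the module's code: `write_chunked`, the `client` parameter, and the in-sketch `MarketStoreError` stand in for the real method, `self.client`, and the module's exception, and the full-chunk slice bounds are spelled out as `i * limit:(i + 1) * limit`.

import numpy as np


class MarketStoreError(Exception):
    # stand-in for the exception class defined earlier in the module
    ...


async def write_chunked(
    client,                    # stand-in for the marketstore client used above
    fqsn: str,
    mkts_array: np.ndarray,
    limit: int = int(800e3),
    append_and_duplicate: bool = True,

) -> None:
    # how many full `limit`-sized slices, plus how many leftover rows
    m, r = divmod(len(mkts_array), limit)

    slices = [
        mkts_array[i * limit:(i + 1) * limit]
        for i in range(m)
    ]
    if r:
        # trailing partial chunk
        slices.append(mkts_array[m * limit:])

    for to_push in slices:
        resp = await client.write(
            to_push,
            tbk=f'{fqsn}/1Sec/OHLCV',
            # duplicates for the same timestamp-index are appended
            isvariablelength=append_and_duplicate,
        )
        for sub in resp.responses:
            if sub.error:
                raise MarketStoreError(sub.error)

Building the slice list first keeps the per-chunk write and error check in a single loop rather than repeating them in the remainder branch as the diff does.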