From 7e951f17cadbc4c308d38c91b8ef871d33be04cc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 3 May 2022 11:22:47 -0400 Subject: [PATCH] Support large ohlcv writes via slicing, add struct array keymap --- piker/data/marketstore.py | 72 ++++++++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 16 deletions(-) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py index 95fd80ee..5d930c3f 100644 --- a/piker/data/marketstore.py +++ b/piker/data/marketstore.py @@ -108,6 +108,16 @@ _ohlcv_dt = [ ] +ohlc_key_map = bidict({ + 'Epoch': 'time', + 'Open': 'open', + 'High': 'high', + 'Low': 'low', + 'Close': 'close', + 'Volume': 'volume', +}) + + def mk_tbk(keys: tuple[str, str, str]) -> str: ''' Generate a marketstore table key from a tuple. @@ -201,6 +211,7 @@ class MarketStoreError(Exception): # raise MarketStoreError(err) +# map of seconds ints to "time frame" accepted keys tf_in_1s = bidict({ 1: '1Sec', 60: '1Min', @@ -349,6 +360,7 @@ class Storage: fqsn: str, ohlcv: np.ndarray, append_and_duplicate: bool = True, + limit: int = int(800e3), ) -> None: # build mkts schema compat array for writing @@ -368,25 +380,53 @@ class Storage: 'volume', ]] - # write to db - resp = await self.client.write( - mkts_array, - tbk=f'{fqsn}/1Sec/OHLCV', + m, r = divmod(len(mkts_array), limit) - # NOTE: will will append duplicates - # for the same timestamp-index. - # TODO: pre deduplicate? - isvariablelength=append_and_duplicate, - ) + for i in range(m, 1): + to_push = mkts_array[i-1:i*limit] - log.info( - f'Wrote {mkts_array.size} datums to tsdb\n' - ) + # write to db + resp = await self.client.write( + to_push, + tbk=f'{fqsn}/1Sec/OHLCV', - for resp in resp.responses: - err = resp.error - if err: - raise MarketStoreError(err) + # NOTE: will will append duplicates + # for the same timestamp-index. + # TODO: pre deduplicate? + isvariablelength=append_and_duplicate, + ) + + log.info( + f'Wrote {mkts_array.size} datums to tsdb\n' + ) + + for resp in resp.responses: + err = resp.error + if err: + raise MarketStoreError(err) + + if r: + to_push = mkts_array[m*limit:] + + # write to db + resp = await self.client.write( + to_push, + tbk=f'{fqsn}/1Sec/OHLCV', + + # NOTE: will will append duplicates + # for the same timestamp-index. + # TODO: pre deduplicate? + isvariablelength=append_and_duplicate, + ) + + log.info( + f'Wrote {mkts_array.size} datums to tsdb\n' + ) + + for resp in resp.responses: + err = resp.error + if err: + raise MarketStoreError(err) @acm