Add `NativeStorageClient._cache_df()` and use it in `.write_ohlcv()` for caching on writes as well

distribute_dis
Tyler Goodlet 2023-12-11 20:10:53 -05:00
parent 49c458710e
commit f7a8d79b7b
1 changed file with 36 additions and 13 deletions

@@ -236,6 +236,22 @@ class NativeStorageClient:
             datadir=self._datadir,
         )
 
+    def _cache_df(
+        self,
+        fqme: str,
+        df: pl.DataFrame,
+        timeframe: float,
+    ) -> None:
+        # cache df for later usage since we (currently) need to
+        # convert to np.ndarrays to push to our `ShmArray` rt
+        # buffers subsys but later we may operate entirely on
+        # pyarrow arrays/buffers so keeping the dfs around for
+        # a variety of purposes is handy.
+        self._dfs.setdefault(
+            timeframe,
+            {},
+        )[fqme] = df
+
     async def read_ohlcv(
         self,
         fqme: str,
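
The new `_cache_df()` is a nested-dict insert keyed first by timeframe, then by fqme. A minimal standalone sketch of that pattern follows; the module-level `_dfs` dict, the `cache_df` name and the example fqme string are illustrative stand-ins for the client's `self._dfs` and the method above:

import polars as pl

# stand-in for `NativeStorageClient._dfs`: timeframe -> fqme -> frame
_dfs: dict[float, dict[str, pl.DataFrame]] = {}

def cache_df(
    fqme: str,
    df: pl.DataFrame,
    timeframe: float,
) -> None:
    # `setdefault` lazily creates the per-timeframe sub-dict, then the
    # frame is stored under its fully qualified market endpoint (fqme)
    _dfs.setdefault(timeframe, {})[fqme] = df

df = pl.DataFrame({'time': [1.0, 2.0], 'close': [100.0, 101.5]})
cache_df('btcusdt.binance', df, timeframe=60)
assert _dfs[60]['btcusdt.binance'] is df
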
@@ -250,16 +266,11 @@
         )
         df: pl.DataFrame = pl.read_parquet(path)
 
-        # cache df for later usage since we (currently) need to
-        # convert to np.ndarrays to push to our `ShmArray` rt
-        # buffers subsys but later we may operate entirely on
-        # pyarrow arrays/buffers so keeping the dfs around for
-        # a variety of purposes is handy.
-        self._dfs.setdefault(
-            timeframe,
-            {},
-        )[fqme] = df
+        self._cache_df(
+            fqme=fqme,
+            df=df,
+            timeframe=timeframe,
+        )
 
         # TODO: filter by end and limit inputs
         # times: pl.Series = df['time']
         array: np.ndarray = tsp.pl2np(
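
The read path still converts the polars frame to a numpy struct-array (via `tsp.pl2np()`) before pushing into the `ShmArray` rt buffers. Below is a rough illustration of that kind of conversion, not the actual `pl2np` implementation, which may handle fields and dtypes differently:

import numpy as np
import polars as pl

def pl_to_struct_array(df: pl.DataFrame, dtype: np.dtype) -> np.ndarray:
    # allocate a structured array and copy each named field's column over
    arr = np.zeros(len(df), dtype=dtype)
    for field in dtype.names:
        arr[field] = df[field].to_numpy()
    return arr

ohlc_dtype = np.dtype([('time', 'f8'), ('close', 'f8')])
df = pl.DataFrame({'time': [1.0, 2.0], 'close': [100.0, 101.5]})
arr = pl_to_struct_array(df, ohlc_dtype)
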
@@ -272,11 +283,15 @@
         self,
         fqme: str,
         period: int = 60,
+        load_from_offline: bool = True,
     ) -> pl.DataFrame:
         try:
             return self._dfs[period][fqme]
         except KeyError:
+            if not load_from_offline:
+                raise
+
             await self.read_ohlcv(fqme, period)
             return self._dfs[period][fqme]
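
The new `load_from_offline` flag turns this lookup into a pure in-memory probe when set to `False`: a cache miss re-raises `KeyError` instead of falling back to a parquet read. The enclosing method's name isn't visible in this hunk, so the sketch below uses stand-in names (`get_cached_df`, a module-level `_dfs`, a dummy `read_ohlcv`) purely to show the control flow:

import polars as pl

_dfs: dict[int, dict[str, pl.DataFrame]] = {}

async def read_ohlcv(fqme: str, period: int) -> None:
    # stand-in for the parquet-backed loader which populates the cache
    _dfs.setdefault(period, {})[fqme] = pl.DataFrame({'time': [], 'close': []})

async def get_cached_df(
    fqme: str,
    period: int = 60,
    load_from_offline: bool = True,
) -> pl.DataFrame:
    try:
        # fast path: frame already cached in memory
        return _dfs[period][fqme]
    except KeyError:
        if not load_from_offline:
            # caller only wants what's already resident, so re-raise
            raise
        await read_ohlcv(fqme, period)
        return _dfs[period][fqme]
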
@@ -302,11 +317,19 @@
         else:
             df = ohlcv
 
+        self._cache_df(
+            fqme=fqme,
+            df=df,
+            timeframe=timeframe,
+        )
+
         # TODO: in terms of managing the ultra long term data
-        # - use a proper profiler to measure all this IO and
-        #   roundtripping!
-        # - try out ``fastparquet``'s append writing:
-        #   https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
+        # -[ ] use a proper profiler to measure all this IO and
+        #   roundtripping!
+        # -[ ] implement parquet append!? see issue:
+        #   https://github.com/pikers/piker/issues/536
+        # -[ ] try out ``fastparquet``'s append writing:
+        #   https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
         start = time.time()
         df.write_parquet(path)
         delay: float = round(
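
On the append-writing TODO above: ``fastparquet`` exposes an `append=` flag on its `write()` call, though it operates on pandas frames, so a polars frame would need converting first. A rough sketch of that idea, untested against this storage layer and only one of the options the TODO lists:

import os

import fastparquet
import polars as pl

def append_ohlcv(path: str, df: pl.DataFrame) -> None:
    # fastparquet writes pandas DataFrames, so convert the polars frame
    pdf = df.to_pandas()
    fastparquet.write(
        path,
        pdf,
        # append to an existing file, otherwise create it fresh
        append=os.path.exists(path),
    )
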