diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index 04c4935b..dc2ee7a1 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -236,6 +236,22 @@ class NativeStorageClient:
             datadir=self._datadir,
         )
 
+    def _cache_df(
+        self,
+        fqme: str,
+        df: pl.DataFrame,
+        timeframe: float,
+    ) -> None:
+        # cache df for later usage since we (currently) need to
+        # convert to np.ndarrays to push to our `ShmArray` rt
+        # buffers subsys but later we may operate entirely on
+        # pyarrow arrays/buffers so keeping the dfs around for
+        # a variety of purposes is handy.
+        self._dfs.setdefault(
+            timeframe,
+            {},
+        )[fqme] = df
+
     async def read_ohlcv(
         self,
         fqme: str,
@@ -250,16 +266,11 @@ class NativeStorageClient:
         )
         df: pl.DataFrame = pl.read_parquet(path)
 
-        # cache df for later usage since we (currently) need to
-        # convert to np.ndarrays to push to our `ShmArray` rt
-        # buffers subsys but later we may operate entirely on
-        # pyarrow arrays/buffers so keeping the dfs around for
-        # a variety of purposes is handy.
-        self._dfs.setdefault(
-            timeframe,
-            {},
-        )[fqme] = df
-
+        self._cache_df(
+            fqme=fqme,
+            df=df,
+            timeframe=timeframe,
+        )
         # TODO: filter by end and limit inputs
         # times: pl.Series = df['time']
         array: np.ndarray = tsp.pl2np(
@@ -272,11 +283,15 @@ class NativeStorageClient:
         self,
         fqme: str,
         period: int = 60,
+        load_from_offline: bool = True,
 
     ) -> pl.DataFrame:
         try:
             return self._dfs[period][fqme]
         except KeyError:
+            if not load_from_offline:
+                raise
+
             await self.read_ohlcv(fqme, period)
             return self._dfs[period][fqme]
 
@@ -302,11 +317,19 @@ class NativeStorageClient:
         else:
             df = ohlcv
 
+        self._cache_df(
+            fqme=fqme,
+            df=df,
+            timeframe=timeframe,
+        )
+
         # TODO: in terms of managing the ultra long term data
-        # - use a proper profiler to measure all this IO and
+        # -[ ] use a proper profiler to measure all this IO and
         #   roundtripping!
-        # - try out ``fastparquet``'s append writing:
-        #   https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
+        # -[ ] implement parquet append!? see issue:
+        #   https://github.com/pikers/piker/issues/536
+        # -[ ] try out ``fastparquet``'s append writing:
+        #   https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
         start = time.time()
         df.write_parquet(path)
         delay: float = round(
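
For readers skimming the hunks, here is a minimal standalone sketch of the behavior being factored out and extended: `_cache_df()` maintains a `timeframe -> fqme -> pl.DataFrame` mapping via `dict.setdefault()`, and the new `load_from_offline` flag turns the accessor in the third hunk into a pure cache probe when `False`, letting the `KeyError` propagate instead of falling back to a parquet read. Everything below is an illustrative stand-in, not the real client class; the accessor's actual name sits outside the diff context shown, so a placeholder (`lookup_df`) is used here.

```python
# Illustrative stand-ins (not piker API): `_dfs` mirrors the
# `NativeStorageClient._dfs` layout; `lookup_df` is a placeholder
# name for the accessor patched in the third hunk above.
import asyncio
import polars as pl

_dfs: dict[float, dict[str, pl.DataFrame]] = {}


def _cache_df(
    fqme: str,
    df: pl.DataFrame,
    timeframe: float,
) -> None:
    # create the per-timeframe sub-dict on first use, then
    # (over)write this fqme's entry.
    _dfs.setdefault(timeframe, {})[fqme] = df


async def read_ohlcv(fqme: str, period: int) -> None:
    # stand-in for the real parquet loader, which (after this
    # patch) fills the cache through `_cache_df()`.
    _cache_df(fqme, pl.DataFrame({'time': [1], 'close': [10.0]}), period)


async def lookup_df(
    fqme: str,
    period: int = 60,
    load_from_offline: bool = True,
) -> pl.DataFrame:
    try:
        return _dfs[period][fqme]
    except KeyError:
        if not load_from_offline:
            # pure cache probe: surface the miss to the caller
            raise
        await read_ohlcv(fqme, period)
        return _dfs[period][fqme]


# demo driver only; piker itself runs under `trio`/`tractor`.
df = asyncio.run(lookup_df('btcusdt.spot.binance'))
assert _dfs[60]['btcusdt.spot.binance'] is df
```

Since the parquet-flush path in the last hunk also calls `_cache_df()`, a freshly written frame is immediately visible to later lookups without another disk round trip.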
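
On the expanded TODO list, a hedged sketch of the ``fastparquet`` append idea, using that library's documented `append=True` flag (linked in the comment). This is exploratory only: the patched code still rewrites the whole file with `pl.DataFrame.write_parquet()`, `fastparquet` consumes `pandas` frames rather than `polars` ones, and the path and data below are made up.

```python
# Hypothetical sketch of append-style parquet writing via fastparquet;
# NOT what piker currently does.
import pandas as pd
from fastparquet import write

ohlcv = pd.DataFrame({'time': [1, 2], 'close': [10.0, 10.5]})
write('ohlcv.parquet', ohlcv)  # initial write creates the file

# later flushes could extend the same file with new row-group(s)
# instead of round-tripping the entire history each time.
new_rows = pd.DataFrame({'time': [3], 'close': [10.7]})
write('ohlcv.parquet', new_rows, append=True)
```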