From 7b4f4bf804d2364d31be3527cfe95d6a587aca90 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 2 Jun 2023 12:02:49 -0400
Subject: [PATCH] First draft `.storage.nativedb.` using parquet files

After much frustration with a particular tsdb (cough) this instead
implements a new native-file (and apache tech based) backend which
stores time series in parquet files (for now) using the `polars` apis
(since we plan to use that lib as well for processing).

Note this code is currently **very** rough and in draft mode.

Details:
- add conversion routines for going from `polars.DataFrame` to
  `numpy.ndarray` and back.
- lay out a simple file-name-as-series-key symbology:
  `<fqme>.ohlcv<period>s.parquet`, though probably it will evolve.
- implement the entire `StorageClient` interface as it stands.
- adjust `storage.cli` cmds to instead expect to use this new backend,
  which means it's a complete mess XD

Main benefits/motivation:
- wayy faster load times with no "datums to load limit" required.
- smaller space footprint and we haven't even touched compression
  settings yet!
- wayyy more compatible with other systems which can lever the apache
  ecosystem.
- gives us finer grained control over the filesystem usage so we can
  choose to swap out stuff like the replication system or networking
  access.
---
 piker/storage/__init__.py             |   5 +-
 piker/storage/cli.py                  | 174 ++++++++-------
 piker/storage/marketstore/__init__.py |   2 +-
 piker/storage/nativedb.py             | 309 ++++++++++++++++++++++++++
 setup.py                              |   2 +
 5 files changed, 414 insertions(+), 78 deletions(-)
 create mode 100644 piker/storage/nativedb.py

diff --git a/piker/storage/__init__.py b/piker/storage/__init__.py
index 21e258a6..cca77c69 100644
--- a/piker/storage/__init__.py
+++ b/piker/storage/__init__.py
@@ -157,7 +157,7 @@ def get_storagemod(name: str) -> ModuleType:
 
 @acm
 async def open_storage_client(
-    name: str | None = None,
+    name: str = 'nativedb',
 
 ) -> tuple[ModuleType, StorageClient]:
     '''
@@ -168,6 +168,9 @@ async def open_storage_client(
 
     # load root config and any tsdb user defined settings
     conf, path = config.load('conf', touch_if_dne=True)
+
+    # TODO: maybe not under a "network" section.. since
+    # no more chitty mkts..
     net = conf.get('network')
     if net:
         tsdbconf = net.get('tsdb')
diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 3afb696a..11d2b490 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -93,7 +93,8 @@ async def del_ts_by_timeframe(
 
 ) -> None:
 
-    resp = await client.delete_ts(fqme, timeframe)
+    path: Path = await client.delete_ts(fqme, timeframe)
+    log.info(f'Deleted {path}')
 
     # TODO: encapsulate per backend errors..
     # - MEGA LOL, apparently the symbols don't
@@ -105,15 +106,15 @@ async def del_ts_by_timeframe(
     # for the delete errurz..llululu
     # if fqme not in syms:
     #     log.error(f'Pair {fqme} dne in DB')
-    msgish = resp.ListFields()[0][1]
-    if 'error' in str(msgish):
-        log.error(
-            f'Deletion error:\n'
-            f'backend: {client.name}\n'
-            f'fqme: {fqme}\n'
-            f'timeframe: {timeframe}s\n'
-            f'Error msg:\n\n{msgish}\n',
-        )
+    # msgish = resp.ListFields()[0][1]
+    # if 'error' in str(msgish):
+    #     log.error(
+    #         f'Deletion error:\n'
+    #         f'backend: {client.name}\n'
+    #         f'fqme: {fqme}\n'
+    #         f'timeframe: {timeframe}s\n'
+    #         f'Error msg:\n\n{msgish}\n',
+    #     )
 
 
 @store.command()
@@ -166,85 +167,106 @@ def read(
     fqme: str,
     limit: int = int(800e3),
-    client_type: str = 'async',
+    # client_type: str = 'async',
 
 ) -> np.ndarray:
 
-    end: int | None = None
+    # end: int | None = None
+    # import tractor
+    from .nativedb import get_client
 
-    if client_type == 'sync':
-        import pymarketstore as pymkts
-        cli = pymkts.Client()
+    async def main():
+        async with get_client() as client:
+            syms: list[str] = await client.list_keys()
-
-        while end != 0:
-            param = pymkts.Params(
+            (
+                history,
+                first_dt,
+                last_dt,
+            ) = await client.load(
                 fqme,
-                '1Min',
-                'OHLCV',
-                limit=limit,
-                # limit_from_start=True,
-                end=end,
+                60,
             )
-            if end is not None:
-                breakpoint()
-            reply = cli.query(param)
-            ds: pymkts.results.DataSet = reply.first()
-            array: np.ndarray = ds.array
-
-            print(f'loaded {len(array)}-len array:\n{array}')
-
-            times = array['Epoch']
-            end: float = float(times[0])
-            dt = pendulum.from_timestamp(end)
-            # end: str = dt.isoformat('T')
+            assert first_dt < last_dt
+            print(f'{fqme} SIZE -> {history.size}')
             breakpoint()
-            print(
-                f'trying to load next {limit} datums frame starting @ {dt}'
-            )
-    else:
-        from anyio_marketstore import (  # noqa
-            open_marketstore_client,
-            MarketstoreClient,
-            Params,
-        )
-        async def main():
+            # await tractor.breakpoint()
 
-            end: int | None = None
+    trio.run(main)
 
-            async with open_marketstore_client(
-                'localhost',
-                5995,
-            ) as client:
+    # if client_type == 'sync':
+    #     import pymarketstore as pymkts
+    #     cli = pymkts.Client()
 
-                while end != 0:
-                    params = Params(
-                        symbols=fqme,
-                        # timeframe=tfstr,
-                        timeframe='1Min',
-                        attrgroup='OHLCV',
-                        end=end,
-                        # limit_from_start=True,
-                        # TODO: figure the max limit here given the
-                        # ``purepc`` msg size limit of purerpc: 33554432
-                        limit=limit,
-                    )
+    #     while end != 0:
+    #         param = pymkts.Params(
+    #             fqme,
+    #             '1Min',
+    #             'OHLCV',
+    #             limit=limit,
+    #             # limit_from_start=True,
+    #             end=end,
+    #         )
+    #         if end is not None:
+    #             breakpoint()
+    #         reply = cli.query(param)
+    #         ds: pymkts.results.DataSet = reply.first()
+    #         array: np.ndarray = ds.array
-                    if end is not None:
-                        breakpoint()
-                    result = await client.query(params)
-                    data_set = result.by_symbols()[fqme]
-                    array = data_set.array
-                    times = array['Epoch']
-                    end: float = float(times[0])
-                    dt = pendulum.from_timestamp(end)
-                    breakpoint()
-                    print(
-                        f'trying to load next {limit} datums frame starting @ {dt}'
-                    )
+    #     print(f'loaded {len(array)}-len array:\n{array}')
-        trio.run(main)
+    #     times = array['Epoch']
+    #     end: float = float(times[0])
+    #     dt = pendulum.from_timestamp(end)
+    #     # end: str = dt.isoformat('T')
+    #     breakpoint()
+    #     print(
+    #         f'trying to load next {limit} datums frame starting @ {dt}'
+    #     )
+    # else:
+    #     from anyio_marketstore import (  # noqa
+    #         open_marketstore_client,
+    #         MarketstoreClient,
+    #         Params,
+    #     )
+    #     async def main():
+
+    #         end: int | None = None
+
+    #         async with open_marketstore_client(
+    #             'localhost',
+    #             5995,
+    #         ) as client:
+
+    #             while end != 0:
+    #                 params = Params(
+    #                     symbols=fqme,
+    #                     # timeframe=tfstr,
+    #                     timeframe='1Min',
+    #                     attrgroup='OHLCV',
+    #                     end=end,
+    #                     # limit_from_start=True,
+
+    #                     # TODO: figure the max limit here given the
+    #                     # ``purepc`` msg size limit of purerpc: 33554432
+    #                     limit=limit,
+    #                 )
+
+    #                 if end is not None:
+    #                     breakpoint()
+    #                 result = await client.query(params)
+    #                 data_set = result.by_symbols()[fqme]
+    #                 array = data_set.array
+    #                 times = array['Epoch']
+    #                 end: float = float(times[0])
+    #                 dt = pendulum.from_timestamp(end)
+    #                 breakpoint()
+    #                 print(
+    #                     f'trying to load next {limit} datums frame starting @ {dt}'
+    #                 )
+
+    #     trio.run(main)
 
 
 @store.command()
@@ -260,7 +282,7 @@ def clone(
     import polars as pl
 
     # open existing shm buffer for kucoin backend
-    key: str = 'piker.brokerd[d07c9bb7-b720-41].tlosusdt.kucoin.hist'
+    key: str = 'piker.brokerd[a9e7a4fe-39ae-44].btcusdt.binance.hist'
     shmpath: Path = Path('/dev/shm') / key
     assert shmpath.is_file()
 
diff --git a/piker/storage/marketstore/__init__.py b/piker/storage/marketstore/__init__.py
index d435fb66..416ef7eb 100644
--- a/piker/storage/marketstore/__init__.py
+++ b/piker/storage/marketstore/__init__.py
@@ -44,7 +44,7 @@ from anyio_marketstore import (
     Params,
 )
 import pendulum
-import purerpc
+# import purerpc
 
 from piker.service.marketstore import (
     MarketstoreClient,
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
new file mode 100644
index 00000000..8b151111
--- /dev/null
+++ b/piker/storage/nativedb.py
@@ -0,0 +1,309 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+'''
+`nativedb`: a lulzy Apache-parquet file manager (that some might call
+a poor man's tsdb).
+
+AKA a `piker`-native, file-system based "time series database" which
+needs no extra process (and has no standard TSDB features, YET!).
+
+'''
+# TODO: like there's soo much..
+# - better name like "parkdb" or "nativedb" (lel)? bundle this lib
+#   with others to make a full system:
+#   - tractor for failover and reliability?
+#   - borg for replication and sync?
+#
+# - use `fastparquet` for appends:
+#   https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
+#   (presuming it's actually faster than overwrites and
+#   makes sense in terms of impl?)
+#
+# - use `polars` support for lazy scanning, processing and schema
+#   validation?
+#   - https://pola-rs.github.io/polars-book/user-guide/io/parquet/#scan
+#   - https://pola-rs.github.io/polars-book/user-guide/concepts/lazy-vs-eager/
+#   - consider delta writes for appends?
+#     - https://github.com/pola-rs/polars/blob/main/py-polars/polars/dataframe/frame.py#L3232
+#   - consider multi-file appends with appropriate time-range naming?
+#     - https://pola-rs.github.io/polars-book/user-guide/io/multiple/
+#
+# - use `borg` for replication?
+#   - https://borgbackup.readthedocs.io/en/stable/quickstart.html#remote-repositories
+#   - https://github.com/borgbackup/borg
+#   - https://borgbackup.readthedocs.io/en/stable/faq.html#usage-limitations
+#   - https://github.com/borgbackup/community
+#   - https://github.com/spslater/borgapi
+#   - https://nixos.wiki/wiki/ZFS
+
+from __future__ import annotations
+from contextlib import asynccontextmanager as acm
+from datetime import datetime
+from pathlib import Path
+import time
+
+# from bidict import bidict
+# import tractor
+import numpy as np
+import polars as pl
+from pendulum import (
+    from_timestamp,
+)
+
+from piker import config
+from piker.data import def_iohlcv_fields
+# from piker.data import ShmArray
+from piker.log import get_logger
+# from .._profile import Profiler
+
+
+log = get_logger('storage.nativedb')
+
+
+# NOTE: thanks to this SO answer for the below conversion routines
+# to go from numpy struct-arrays to polars dataframes and back:
+# https://stackoverflow.com/a/72054819
+def np2pl(array: np.ndarray) -> pl.DataFrame:
+    return pl.DataFrame({
+        field_name: array[field_name]
+        for field_name in array.dtype.fields
+    })
+
+
+def pl2np(
+    df: pl.DataFrame,
+    dtype: np.dtype,
+
+) -> np.ndarray:
+
+    # Create a numpy struct-array of the correct size and dtype
+    # and loop through df columns to fill in the array fields.
+    array = np.empty(
+        df.height,
+        dtype,
+    )
+    for field, col in zip(
+        dtype.fields,
+        df.columns,
+    ):
+        array[field] = df.get_column(col).to_numpy()
+
+    return array
+
+
+def mk_ohlcv_shm_keyed_filepath(
+    fqme: str,
+    period: float,  # otherwise known as the "timeframe"
+    # shm: ShmArray,
+    datadir: Path,
+
+) -> Path:
+
+    # calc ohlc sample period for naming
+    # ohlcv: np.ndarray = shm.array
+    # times: np.ndarray = ohlcv['time']
+    # period: float = times[-1] - times[-2]
+    if period < 1.:
+        raise ValueError('Sample period should be >= 1.!?')
+
+    period_s: str = f'{period}s'
+    path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet'
+    return path
+
+
+ohlc_key_map = None
+
+
+class NativeStorageClient:
+    '''
+    High level storage api for OHLCV time series stored in
+    a (modern) filesystem as apache parquet files B)
+
+    Part of a grander scheme to use arrow and parquet as our main
+    lowlevel data framework: https://arrow.apache.org/faq/.
+
+    '''
+    name: str = 'nativedb'
+
+    def __init__(
+        self,
+        datadir: Path,
+
+    ) -> None:
+        self._datadir = datadir
+        self._index: dict[str, dict] = {}
+
+        # series' cache from tsdb reads
+        self._dfs: dict[str, pl.DataFrame] = {}
+
+    @property
+    def address(self) -> str:
+        return self._datadir.as_uri()
+
+    @property
+    def cardinality(self) -> int:
+        return len(self._index)
+
+    # @property
+    # def compression(self) -> str:
+    #     ...
+
+    async def list_keys(self) -> list[str]:
+        return list(self._index)
+
+    def index_files(self):
+        for path in self._datadir.iterdir():
+            if 'borked' in path.name:
+                continue
+
+            # drop the `.parquet` suffix to recover the series key
+            key: str = path.name.removesuffix('.parquet')
+            fqme, _, descr = key.rpartition('.')
+            prefix, _, suffix = descr.partition('ohlcv')
+            period: int = int(suffix.strip('s'))
+
+            # cache description data
+            self._index[fqme] = {
+                'path': path,
+                'period': period,
+            }
+
+        return self._index
+
+
+    # async def search_keys(self, pattern: str) -> list[str]:
+    #     '''
+    #     Search for time series key in the storage backend.
+
+    #     '''
+    #     ...
+
+    # async def write_ticks(self, ticks: list) -> None:
+    #     ...
+
+    async def load(
+        self,
+        fqme: str,
+        timeframe: int,
+
+    ) -> tuple[
+        np.ndarray,  # timeframe sampled array-series
+        datetime | None,  # first dt
+        datetime | None,  # last dt
+    ] | None:
+        try:
+            array: np.ndarray = await self.read_ohlcv(
+                fqme,
+                timeframe,
+            )
+        except FileNotFoundError:
+            return None
+
+        times = array['time']
+        return (
+            array,
+            from_timestamp(times[0]),
+            from_timestamp(times[-1]),
+        )
+
+    async def read_ohlcv(
+        self,
+        fqme: str,
+        timeframe: int | str,
+        end: float | None = None,  # epoch or none
+        # limit: int = int(200e3),
+
+    ) -> np.ndarray:
+        path: Path = mk_ohlcv_shm_keyed_filepath(
+            fqme=fqme,
+            period=timeframe,
+            datadir=self._datadir,
+        )
+        df: pl.DataFrame = pl.read_parquet(path)
+
+        # TODO: filter by end and limit inputs
+        # times: pl.Series = df['time']
+
+        return pl2np(
+            df,
+            dtype=np.dtype(def_iohlcv_fields),
+        )
+
+    async def write_ohlcv(
+        self,
+        fqme: str,
+        ohlcv: np.ndarray,
+        timeframe: int,
+        # limit: int = int(800e3),
+
+    ) -> Path:
+
+        path: Path = mk_ohlcv_shm_keyed_filepath(
+            fqme=fqme,
+            period=timeframe,
+            datadir=self._datadir,
+        )
+        df: pl.DataFrame = np2pl(ohlcv)
+
+        # TODO: use a proper profiler
+        start = time.time()
+        df.write_parquet(path)
+        delay: float = round(
+            time.time() - start,
+            ndigits=6,
+        )
+        print(
+            f'parquet write took {delay} secs\n'
+            f'file path: {path}'
+        )
+        return path
+
+    async def delete_ts(
+        self,
+        key: str,
+        timeframe: int | None = None,
+
+    ) -> Path:
+        path: Path = mk_ohlcv_shm_keyed_filepath(
+            fqme=key,
+            period=timeframe,
+            datadir=self._datadir,
+        )
+        path.unlink()
+        return path
+
+
+@acm
+async def get_client(
+
+    # TODO: eventually support something something apache arrow
+    # transport over ssh something..?
+    # host: str | None = None,
+
+    **kwargs,
+
+) -> NativeStorageClient:
+    '''
+    Load a ``NativeStorageClient`` which manages OHLCV time series
+    stored as parquet files under a local `nativedb` data directory.
+
+    '''
+    datadir: Path = config.get_conf_dir() / 'nativedb'
+    if not datadir.is_dir():
+        log.info(f'Creating `nativedb` directory: {datadir}')
+        datadir.mkdir()
+
+    client = NativeStorageClient(datadir)
+    client.index_files()
+    yield client
diff --git a/setup.py b/setup.py
index c63622b2..cb5d7df8 100755
--- a/setup.py
+++ b/setup.py
@@ -41,6 +41,7 @@ setup(
             'piker = piker.cli:cli',
             'pikerd = piker.cli:pikerd',
             'ledger = piker.accounting.cli:ledger',
+            # 'store = piker.storage.cli:store',
         ]
     },
     install_requires=[
@@ -78,6 +79,7 @@ setup(
         'cython',
         'numpy',
         'numba',
+        'polars',  # dataframes
 
         # UI
         'PyQt5',
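Usage sketch (not part of the diff above): a minimal `trio` script showing how the new backend is expected to round-trip an OHLCV struct-array through parquet via `get_client()`, `write_ohlcv()` and `load()` as added in this patch. The `fqme` string and the zero-filled sample rows are purely illustrative, and running it would create files under the user's `<conf_dir>/nativedb/` directory.

import numpy as np
import trio

from piker.data import def_iohlcv_fields
from piker.storage.nativedb import get_client


async def demo() -> None:
    # hypothetical fully qualified market endpoint name
    fqme: str = 'btcusdt.binance'

    # tiny struct-array matching the OHLCV dtype; all fields zeroed
    # except a 60s-spaced epoch 'time' column so load() can report
    # first/last datetimes.
    ohlcv = np.zeros(10, dtype=np.dtype(def_iohlcv_fields))
    ohlcv['time'] = 1_685_000_000 + np.arange(10) * 60

    async with get_client() as client:
        # writes <conf_dir>/nativedb/btcusdt.binance.ohlcv60s.parquet
        path = await client.write_ohlcv(fqme, ohlcv, timeframe=60)
        print(f'wrote {path}')

        # read it back as a numpy struct-array plus first/last datetimes
        array, first_dt, last_dt = await client.load(fqme, 60)
        assert array.size == ohlcv.size
        print(f'{fqme}: {first_dt} -> {last_dt}')


if __name__ == '__main__':
    trio.run(demo)

Note the `60` timeframe must match between the write and the read since it is baked into the file name by `mk_ohlcv_shm_keyed_filepath()`.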