A PoC tsdb prototype: `parqdb` using `polars`

Turns out just (over)writing `.parquet` files with >= 1M datums takes
less than a second, and we can likely speed up appends using
`fastparquet` (usage coming soon); a rough sketch of that idea follows.
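
A minimal, untested sketch of how those appends might work via
`fastparquet.write(..., append=True)`, which adds a new row-group
instead of rewriting the whole file. The `append_ohlcv_frame()` helper
name and the `.to_pandas()` round-trip (since `fastparquet` consumes
pandas frames) are assumptions, not code from this commit:

```python
from pathlib import Path

import polars as pl
from fastparquet import write


def append_ohlcv_frame(
    path: Path,
    new_rows: pl.DataFrame,
) -> None:
    '''
    Hypothetical helper: append `new_rows` to an existing `.parquet`
    file as a new row-group, or create the file if it doesn't exist.

    '''
    write(
        str(path),
        # `fastparquet` takes pandas frames, so round-trip from polars.
        new_rows.to_pandas(),
        append=path.exists(),
    )
```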

Includes:
- a new `clone` CLI subcmd to test this all out by ad-hoc copying an
  existing `/dev/shm/<ShmArray>` buffer (literally hardcoded to a
  specific daemon-actor's shm allocation for now) and pushing it to a
  `.parquet` file.
  - code to convert from our `ShmArray.array: np.ndarray` ->
    `polars.DataFrame` (thanks SO; a standalone sketch follows this list).
  - timing checks around the file IO and the np -> polars conversion.
- a `read` subcmd which I was using to test the sync `pymarketstore`
  client against our async one, to see if the issues from
  https://github.com/pikers/piker/issues/443 were resolved, but nope!
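
For reference, a minimal standalone version of that `np.ndarray` ->
`polars.DataFrame` conversion plus the timing check; the dummy OHLCV
dtype and 1M-row zeros buffer here are stand-ins for a real
`ShmArray.array`, and the full version lives in the new `clone` subcmd
in the diff below:

```python
import time

import numpy as np
import polars as pl

# stand-in for a real `ShmArray.array` OHLCV buffer
dtype = np.dtype([
    ('time', 'f8'),
    ('open', 'f8'),
    ('high', 'f8'),
    ('low', 'f8'),
    ('close', 'f8'),
    ('volume', 'f8'),
])
ohlcv: np.ndarray = np.zeros(1_000_000, dtype=dtype)

start = time.time()

# per https://stackoverflow.com/a/72054819: build the df column-wise
# from the structured array's fields.
df = pl.DataFrame({
    field_name: ohlcv[field_name]
    for field_name in ohlcv.dtype.fields
})
print(f'numpy -> polars conversion took {round(time.time() - start, 6)} secs')
```
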
Tyler Goodlet 2023-05-31 18:39:41 -04:00
parent 7d1cc47db9
commit 94733c4a0b
1 changed file with 189 additions and 4 deletions


@@ -19,13 +19,18 @@ Storage middle-ware CLIs.

 """
 from __future__ import annotations
+from pathlib import Path
 from typing import TYPE_CHECKING

+import trio
+import numpy as np
+import pendulum
 from rich.console import Console
-import trio
 # from rich.markdown import Markdown
 import typer

-from ..cli import cli
+from piker.service import open_piker_runtime
+from piker.cli import cli
 from . import (
     log,
 )
@@ -44,7 +49,7 @@ def ls(
         help='Storage backends to query, default is all.'
     ),
 ):
-    from piker.service import open_piker_runtime
+    # from piker.service import open_piker_runtime
     from . import (
         __tsdbs__,
         open_storage_client,
@@ -132,7 +137,6 @@ def delete(
     ``symbols``.

     '''
-    from piker.service import open_piker_runtime
     from . import open_storage_client

     async def main(symbols: list[str]):
@@ -157,5 +161,186 @@ def delete(
     trio.run(main, symbols)


+@store.command()
+def read(
+    fqme: str,
+    limit: int = int(800e3),
+    client_type: str = 'async',
+) -> np.ndarray:
+
+    end: int | None = None
+
+    if client_type == 'sync':
+        import pymarketstore as pymkts
+        cli = pymkts.Client()
+
+        while end != 0:
+            param = pymkts.Params(
+                fqme,
+                '1Min',
+                'OHLCV',
+                limit=limit,
+                # limit_from_start=True,
+                end=end,
+            )
+            if end is not None:
+                breakpoint()
+
+            reply = cli.query(param)
+            ds: pymkts.results.DataSet = reply.first()
+            array: np.ndarray = ds.array
+
+            print(f'loaded {len(array)}-len array:\n{array}')
+
+            times = array['Epoch']
+            end: float = float(times[0])
+            dt = pendulum.from_timestamp(end)
+            # end: str = dt.isoformat('T')
+
+            breakpoint()
+            print(
+                f'trying to load next {limit} datums frame starting @ {dt}'
+            )
+
+    else:
+        from anyio_marketstore import (  # noqa
+            open_marketstore_client,
+            MarketstoreClient,
+            Params,
+        )
+
+        async def main():
+            end: int | None = None
+
+            async with open_marketstore_client(
+                'localhost',
+                5995,
+            ) as client:
+                while end != 0:
+                    params = Params(
+                        symbols=fqme,
+                        # timeframe=tfstr,
+                        timeframe='1Min',
+                        attrgroup='OHLCV',
+                        end=end,
+                        # limit_from_start=True,
+
+                        # TODO: figure the max limit here given the
+                        # ``purepc`` msg size limit of purerpc: 33554432
+                        limit=limit,
+                    )
+                    if end is not None:
+                        breakpoint()
+
+                    result = await client.query(params)
+                    data_set = result.by_symbols()[fqme]
+                    array = data_set.array
+                    times = array['Epoch']
+                    end: float = float(times[0])
+                    dt = pendulum.from_timestamp(end)
+                    breakpoint()
+
+                    print(
+                        f'trying to load next {limit} datums frame starting @ {dt}'
+                    )
+
+        trio.run(main)
+
+
+@store.command()
+def clone(
+    fqme: str,
+) -> None:
+    import time
+    from piker.config import get_conf_dir
+    from piker.data import (
+        maybe_open_shm_array,
+        def_iohlcv_fields,
+    )
+    import polars as pl
+
+    # open existing shm buffer for kucoin backend
+    key: str = 'piker.brokerd[d07c9bb7-b720-41].tlosusdt.kucoin.hist'
+    shmpath: Path = Path('/dev/shm') / key
+    assert shmpath.is_file()
+
+    async def main():
+        async with (
+            open_piker_runtime(
+                'polars_boi',
+                enable_modules=['piker.data._sharedmem'],
+            ),
+        ):
+            # attach to any shm buffer, load array into polars df,
+            # write to local parquet file.
+            shm, opened = maybe_open_shm_array(
+                key=key,
+                dtype=def_iohlcv_fields,
+            )
+            assert not opened
+            ohlcv = shm.array
+
+            start = time.time()
+
+            # XXX: thanks to this SO answer for this conversion tip:
+            # https://stackoverflow.com/a/72054819
+            df = pl.DataFrame({
+                field_name: ohlcv[field_name]
+                for field_name in ohlcv.dtype.fields
+            })
+            delay: float = round(
+                time.time() - start,
+                ndigits=6,
+            )
+            print(
+                f'numpy -> polars conversion took {delay} secs\n'
+                f'polars df: {df}'
+            )
+
+            # compute ohlc properties for naming
+            times: np.ndarray = ohlcv['time']
+            secs: float = times[-1] - times[-2]
+            if secs < 1.:
+                breakpoint()
+                raise ValueError(
+                    f'Something is wrong with time period for {shm}:\n{ohlcv}'
+                )
+
+            timeframe: str = f'{secs}s'
+
+            # write to parquet file
+            datadir: Path = get_conf_dir() / 'parqdb'
+            if not datadir.is_dir():
+                datadir.mkdir()
+
+            path: Path = datadir / f'{fqme}.{timeframe}.parquet'
+
+            # write to fs
+            start = time.time()
+            df.write_parquet(path)
+            delay: float = round(
+                time.time() - start,
+                ndigits=6,
+            )
+            print(
+                f'parquet write took {delay} secs\n'
+                f'file path: {path}'
+            )
+
+            # read back from fs
+            start = time.time()
+            read_df: pl.DataFrame = pl.read_parquet(path)
+            delay: float = round(
+                time.time() - start,
+                ndigits=6,
+            )
+            print(
+                f'parquet read took {delay} secs\n'
+                f'polars df: {read_df}'
+            )
+
+    trio.run(main)

 typer_click_object = typer.main.get_command(store)
 cli.add_command(typer_click_object, 'store')
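
Not part of this commit, but the natural read path for such a `parqdb`
file would be a lazy scan + filter with `polars`. A hypothetical sketch,
where the `load_frame()` name, the `since` filter on the iohlcv `time`
field, and reuse of the `<confdir>/parqdb/<fqme>.<timeframe>.parquet`
naming from `clone` above are all assumptions:

```python
from pathlib import Path

import polars as pl

from piker.config import get_conf_dir


def load_frame(
    fqme: str,
    timeframe: str,
    since: float | None = None,
) -> pl.DataFrame:
    '''
    Hypothetical read-side helper: lazily scan a `parqdb` file written
    by the `clone` subcmd and only materialize rows newer than `since`.

    '''
    path: Path = get_conf_dir() / 'parqdb' / f'{fqme}.{timeframe}.parquet'
    lf: pl.LazyFrame = pl.scan_parquet(path)
    if since is not None:
        # assumes the iohlcv schema's epoch-seconds `time` field
        lf = lf.filter(pl.col('time') >= since)

    return lf.collect()
```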