Move `numpy` <-> `polars` converters into `.data.tsp`

Yet again, these are (going to be) generally useful in the data-proc
layer as well as, going forward, (possibly) for moving the history and
shm rt-processing layer to Apache (Arrow or other) shared-ds
equivalents.
branch: distribute_dis
Tyler Goodlet 2023-12-11 17:53:31 -05:00
parent b94582cb35
commit 49c458710e
3 changed files with 58 additions and 52 deletions


@@ -28,6 +28,7 @@ from math import (
     ceil,
     floor,
 )
+import time
 from typing import Literal

 import numpy as np
@@ -408,3 +409,51 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
         deduped,
         was_deduped,
     )
+
+
+# NOTE: thanks to this SO answer for the below conversion routines
+# to go from numpy struct-arrays to polars dataframes and back:
+# https://stackoverflow.com/a/72054819
+def np2pl(array: np.ndarray) -> pl.DataFrame:
+    start = time.time()
+    # XXX: thanks to this SO answer for this conversion tip:
+    # https://stackoverflow.com/a/72054819
+    df = pl.DataFrame({
+        field_name: array[field_name]
+        for field_name in array.dtype.fields
+    })
+    delay: float = round(
+        time.time() - start,
+        ndigits=6,
+    )
+    log.info(
+        f'numpy -> polars conversion took {delay} secs\n'
+        f'polars df: {df}'
+    )
+    return df
+    # return pl.DataFrame({
+    #     field_name: array[field_name]
+    #     for field_name in array.dtype.fields
+    # })
+
+
+def pl2np(
+    df: pl.DataFrame,
+    dtype: np.dtype,
+) -> np.ndarray:
+    # Create numpy struct array of the correct size and dtype
+    # and loop through df columns to fill in array fields.
+    array = np.empty(
+        df.height,
+        dtype,
+    )
+    for field, col in zip(
+        dtype.fields,
+        df.columns,
+    ):
+        array[field] = df.get_column(col).to_numpy()
+    return array
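
For reference (an editorial sketch, not part of the diff): a minimal round-trip
through the two helpers, assuming they now live under `piker.data.tsp` as this
commit intends. The `quote_dtype` below is a made-up struct schema for
illustration only, not piker's actual `def_iohlcv_fields` layout.

```python
# Round-trip sketch (editorial example, not commit content).
# Assumes the converters are importable as `piker.data.tsp`;
# `quote_dtype` is a hypothetical schema for illustration.
import numpy as np
import polars as pl

from piker.data import tsp

quote_dtype = np.dtype([
    ('time', '<i8'),
    ('open', '<f8'),
    ('close', '<f8'),
])

array = np.zeros(3, dtype=quote_dtype)
array['time'] = [1, 2, 3]
array['close'] = [100.0, 101.5, 99.75]

# numpy struct-array -> polars df (one column per struct field)
df: pl.DataFrame = tsp.np2pl(array)

# polars df -> numpy struct-array of the requested dtype
back: np.ndarray = tsp.pl2np(df, dtype=quote_dtype)

assert back.dtype == quote_dtype
assert (back['close'] == array['close']).all()
```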


@@ -260,22 +260,8 @@ def iter_dfs_from_shms(fqme: str) -> Generator[
         assert not opened
         ohlcv = shm.array

-        start = time.time()
-        # XXX: thanks to this SO answer for this conversion tip:
-        # https://stackoverflow.com/a/72054819
-        df = pl.DataFrame({
-            field_name: ohlcv[field_name]
-            for field_name in ohlcv.dtype.fields
-        })
-        delay: float = round(
-            time.time() - start,
-            ndigits=6,
-        )
-        log.info(
-            f'numpy -> polars conversion took {delay} secs\n'
-            f'polars df: {df}'
-        )
+        from .nativedb import np2pl
+        df: pl.DataFrame = np2pl(ohlcv)

         yield (
             shmfile,
@@ -316,7 +302,6 @@ def ldshm(
                 f'Something is wrong with time period for {shm}:\n{times}'
             )

         # over-write back to shm?
         df: pl.DataFrame  # with dts
         deduped: pl.DataFrame  # deduplicated dts


@@ -65,8 +65,11 @@ from pendulum import (
 )

 from piker import config
-from piker.data import def_iohlcv_fields
-from piker.data import ShmArray
+from piker.data import (
+    def_iohlcv_fields,
+    ShmArray,
+    tsp,
+)
 from piker.log import get_logger
 from . import TimeseriesNotFound
@@ -74,37 +77,6 @@ from . import TimeseriesNotFound
 log = get_logger('storage.nativedb')


-# NOTE: thanks to this SO answer for the below conversion routines
-# to go from numpy struct-arrays to polars dataframes and back:
-# https://stackoverflow.com/a/72054819
-def np2pl(array: np.ndarray) -> pl.DataFrame:
-    return pl.DataFrame({
-        field_name: array[field_name]
-        for field_name in array.dtype.fields
-    })
-
-
-def pl2np(
-    df: pl.DataFrame,
-    dtype: np.dtype,
-) -> np.ndarray:
-    # Create numpy struct array of the correct size and dtype
-    # and loop through df columns to fill in array fields.
-    array = np.empty(
-        df.height,
-        dtype,
-    )
-    for field, col in zip(
-        dtype.fields,
-        df.columns,
-    ):
-        array[field] = df.get_column(col).to_numpy()
-    return array
-
-
 def detect_period(shm: ShmArray) -> float:
     '''
     Attempt to detect the series time step sampling period
@@ -290,7 +262,7 @@ class NativeStorageClient:
         # TODO: filter by end and limit inputs
         # times: pl.Series = df['time']
-        array: np.ndarray = pl2np(
+        array: np.ndarray = tsp.pl2np(
             df,
             dtype=np.dtype(def_iohlcv_fields),
         )
@@ -326,7 +298,7 @@
             datadir=self._datadir,
         )
         if isinstance(ohlcv, np.ndarray):
-            df: pl.DataFrame = np2pl(ohlcv)
+            df: pl.DataFrame = tsp.np2pl(ohlcv)
         else:
             df = ohlcv
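
One editorial aside on the consolidated call sites above: `pl2np()` pairs
`dtype.fields` with `df.columns` purely by position, so frames whose columns
have been reordered or extended (e.g. the extra "dts" columns referenced in
the `ldshm` hunk) should be re-selected into field order first. A hypothetical
guard, assuming the post-commit `piker.data.tsp` location, might look like:

```python
# Hypothetical wrapper (not in this commit): force the df's column order
# to match the target dtype before handing off to tsp.pl2np().
import numpy as np
import polars as pl

from piker.data import tsp


def pl2np_ordered(
    df: pl.DataFrame,
    dtype: np.dtype,
) -> np.ndarray:
    # select only the dtype's fields, in dtype-declaration order, so the
    # positional zip inside pl2np() can't silently mis-assign columns.
    ordered: pl.DataFrame = df.select(list(dtype.fields))
    return tsp.pl2np(ordered, dtype=dtype)
```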