Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet f274c3db3b Import `np2pl()` from `.data.tsp`
Also toss in a todo for a timeseries search CLI cmd which can be handy
when doing offline store mgmt.
2023-12-13 09:25:44 -05:00
Tyler Goodlet b95932ea09 `.data.history`: run `.tsp.dedupe()` in backloader
In an effort to catch out-of-order and/or partial-frame-duplicated
segments, add some `.tsp` calls throughout the backloader tasks
including a call to the new `.sort_diff()` to catch the out-of-order
history cases.
2023-12-12 19:57:46 -05:00
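A minimal standalone sketch of the kind of dedupe-plus-out-of-order check the backloader now runs; it uses `polars` directly on a toy frame rather than piker's actual `tsp.dedupe()`/`tsp.sort_diff()` helpers, so all names and columns here are illustrative.

```python
import polars as pl

# toy 1s series with one duplicated and one out-of-order sample
df = pl.DataFrame({
    'time': [100, 101, 101, 103, 102],
    'close': [1.0, 1.1, 1.1, 1.3, 1.2],
})

# drop repeated datetime samples (gap/repeat removal)
deduped = df.unique(subset='time', maintain_order=True)
diff: int = df.height - deduped.height  # doubles as a "was deduped" predicate

# catch out-of-order segments: compare per-row time steps against the
# steps of the sorted series (the `sort_diff()` idea)
ser = deduped['time']
i_out_of_order = (ser.diff() != ser.sort().diff()).arg_true()

print(f'rows removed: {diff}, out-of-order idxs: {i_out_of_order.to_list()}')
```
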
Tyler Goodlet e8bf4c6e04 Return the `.len()` diff from `dedupe()` instead
Since the `diff: int` serves as a predicate anyway (when `0`, no
duplicates were detected) we might as well just return it directly since
it's likely also useful to the caller when doing deeper analysis.

Also, handle the zero-diff case by just returning early with a copy of
the input frame and a `diff=0`.

CHERRY INTO #486
2023-12-12 16:48:56 -05:00
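A hedged sketch of the return-signature change described above, simplified to drop the gaps frame: the row-count `diff` doubles as the "was deduped" predicate and the zero-diff case returns early with a copy of the input. Standalone `polars` only, not the actual `.data.tsp` code.

```python
import polars as pl

def dedupe_sketch(
    src_df: pl.DataFrame,
) -> tuple[
    pl.DataFrame,  # input frame
    pl.DataFrame,  # deduplicated frame
    int,  # row-count diff between input and deduped
]:
    deduped: pl.DataFrame = src_df.unique(subset='time', maintain_order=True)
    diff: int = src_df.height - deduped.height
    if diff == 0:
        # nothing duplicated: return early with a copy and diff=0
        return src_df, src_df.clone(), 0

    return src_df, deduped, diff

df = pl.DataFrame({'time': [1, 2, 2, 3], 'close': [1.0, 2.0, 2.0, 3.0]})
_, deduped, diff = dedupe_sketch(df)
if diff:  # the caller uses the diff directly as a predicate
    print(f'removed {diff} duplicated sample(s)')
```
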
Tyler Goodlet 8e4d1a48ed Bleh, fix ib's `Client.bars()` recursion..
Turns out this was the main source of all sorts of gaps and overlaps
in history frame backfilling. The original idea was that when a gap
causes not enough (1m) bars to be delivered (like over a weekend or
holiday) we just implicitly do another frame query to try and at
least fill out the default duration (normally 1-2 days). Doing the
recursion sloppily was causing all sorts of stupid problems..

It's kinda obvious now what was wrong in hindsight (a corrected-pattern sketch follows this entry):
- always pass the sampling period (timeframe) when recursing
- adjust the logic to not be mutex with the no-data case (since it
  already is mutex..)
- pack to the `numpy` array BEFORE the recursive call to ensure the
  `end_dt: DateTime` is selected and passed correctly!

Toss in some other helpfuls:
- more explicit `pendulum` typing imports
- some masked out sorted-diffing checks (that can be enabled when
  debugging out-of-order frame issues)
- always error log about less-than time step mismatches since we should never
  have time-diff steps **smaller** than the specified
  `sample_period_s`!
2023-12-12 16:19:21 -05:00
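A minimal sketch of the corrected recursion pattern using a fake `fetch_bars()` stand-in (not `ib_insync`): the frame is packed to a `numpy` struct-array before recursing so the next end timestamp comes from the frame's first sample, and the sampling period is always threaded through the recursive call.

```python
from datetime import datetime, timezone

import numpy as np

def fetch_bars(end_ts: float, n: int, step: float) -> np.ndarray:
    # stand-in for the broker query: `n` bars of width `step` ending at `end_ts`
    times = end_ts - step * np.arange(n)[::-1]
    return np.array(
        [(t, 1.0) for t in times],
        dtype=[('time', 'f8'), ('close', 'f8')],
    )

def bars_spanning(
    end_ts: float,
    duration_s: float,
    sample_period_s: float = 60,
    frame_size: int = 10,
) -> np.ndarray:
    # pack to the struct-array FIRST so the recursion's end timestamp
    # is taken from the frame's first sample
    nparr: np.ndarray = fetch_bars(end_ts, frame_size, sample_period_s)
    first: float = nparr['time'][0]
    covered: float = nparr['time'][-1] - first
    if covered < duration_s:
        # frame too short: recurse for older bars ending just before
        # `first`, always passing the sampling period through
        older = bars_spanning(
            first - sample_period_s,
            duration_s - covered - sample_period_s,
            sample_period_s=sample_period_s,
            frame_size=frame_size,
        )
        nparr = np.concatenate([older, nparr])

    return nparr

bars = bars_spanning(
    datetime.now(timezone.utc).timestamp(),
    duration_s=3600,
)
# time steps should never be smaller than the sampling period
assert (np.diff(bars['time']) >= 60).all()
```
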
Tyler Goodlet b03eceebef data.tsp: drop masked `return` one liner 2023-12-11 20:11:42 -05:00
Tyler Goodlet f7a8d79b7b Add `NativeStorageClient._cache_df()` use it in `.write_ohlcv()` for caching on writes as well 2023-12-11 20:10:53 -05:00
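A hedged sketch of the caching pattern this adds: a shared `_cache_df()`-style helper keyed `timeframe -> fqme` that both the read and write paths populate, so a later `as_df()` lookup can be served purely from memory. Illustrative class, not the real `NativeStorageClient`.

```python
import polars as pl

class DfCacheSketch:
    def __init__(self) -> None:
        self._dfs: dict[float, dict[str, pl.DataFrame]] = {}

    def _cache_df(self, fqme: str, df: pl.DataFrame, timeframe: float) -> None:
        # keep the frame around after reads *and* writes
        self._dfs.setdefault(timeframe, {})[fqme] = df

    def write_ohlcv(self, fqme: str, df: pl.DataFrame, timeframe: float) -> None:
        self._cache_df(fqme=fqme, df=df, timeframe=timeframe)
        # ... the real client writes parquet here ...

    def as_df(
        self,
        fqme: str,
        timeframe: float,
        load_from_offline: bool = True,
    ) -> pl.DataFrame:
        try:
            return self._dfs[timeframe][fqme]
        except KeyError:
            if not load_from_offline:
                raise
            # the real client would fall back to reading parquet here
            raise NotImplementedError('offline read not sketched')

cache = DfCacheSketch()
cache.write_ohlcv('btcusdt.binance', pl.DataFrame({'time': [1, 2]}), 60)
print(cache.as_df('btcusdt.binance', 60, load_from_offline=False))
```
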
Tyler Goodlet 49c458710e Move `numpy` <-> `polars` converters into `.data.tsp`
Yet again these are (going to be) generally useful in the data-proc
layer, as well as going forward if/when we (possibly) move the history
and shm rt-processing layers to apache (arrow or other) shared-ds
equivalents.
2023-12-11 17:53:31 -05:00
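A round-trip sketch of the `numpy` <-> `polars` converter pattern being moved (per the SO answer referenced in the code: https://stackoverflow.com/a/72054819); the struct-array field names here are illustrative.

```python
import numpy as np
import polars as pl

iohlcv = np.dtype([('time', 'f8'), ('open', 'f8'), ('close', 'f8')])
arr = np.array(
    [(1.0, 10.0, 11.0), (2.0, 11.0, 12.0)],
    dtype=iohlcv,
)

# struct-array -> dataframe: one column per dtype field
df = pl.DataFrame({name: arr[name] for name in arr.dtype.fields})

# dataframe -> struct-array: empty array of the target dtype,
# filled column by column
out = np.empty(df.height, iohlcv)
for field, col in zip(iohlcv.fields, df.columns):
    out[field] = df.get_column(col).to_numpy()

assert (out == arr).all()
```
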
Tyler Goodlet b94582cb35 Move `dedupe()` to `.data.tsp` (so it has pals)
Includes a rename of `.data._timeseries` -> `.data.tsp` for "time series
processing", making it a public sub-mod; it contains a highly useful set
of data-frame and `numpy.ndarray` ops routines used across various
subsystems Bo
2023-12-11 16:24:27 -05:00
Tyler Goodlet 7311000846 Facepalm, set `was_deduped` as bool not the deduped frame.. 2023-12-11 13:18:10 -05:00
Tyler Goodlet e719733f97 Comment out overlap case block for now too? 2023-12-08 19:08:10 -05:00
10 changed files with 366 additions and 200 deletions

View File

@ -49,7 +49,12 @@ from bidict import bidict
import trio
import tractor
from tractor import to_asyncio
import pendulum
from pendulum import (
from_timestamp,
DateTime,
Duration,
duration as mk_duration,
)
from eventkit import Event
from ib_insync import (
client as ib_client,
@ -221,16 +226,20 @@ def bars_to_np(bars: list) -> np.ndarray:
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
_samplings: dict[int, tuple[str, str]] = {
1: (
# ib strs
'1 secs',
f'{int(2e3)} S',
pendulum.duration(seconds=2e3),
mk_duration(seconds=2e3),
),
# TODO: benchmark >1 D duration on query to see if
# throughput can be made faster during backfilling.
60: (
# ib strs
'1 min',
'2 D',
pendulum.duration(days=2),
mk_duration(days=2),
),
}
@ -315,7 +324,7 @@ class Client:
**kwargs,
) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
) -> tuple[BarDataList, np.ndarray, Duration]:
'''
Retreive OHLCV bars for a fqme over a range to the present.
@ -324,11 +333,20 @@ class Client:
# https://interactivebrokers.github.io/tws-api/historical_data.html
bars_kwargs = {'whatToShow': 'TRADES'}
bars_kwargs.update(kwargs)
bar_size, duration, dt_duration = _samplings[sample_period_s]
(
bar_size,
ib_duration_str,
default_dt_duration,
) = _samplings[sample_period_s]
dt_duration: DateTime = (
duration
or default_dt_duration
)
global _enters
log.info(
f"REQUESTING {duration}'s worth {bar_size} BARS\n"
f"REQUESTING {ib_duration_str}'s worth {bar_size} BARS\n"
f'{_enters} @ end={end_dt}"'
)
@ -353,7 +371,7 @@ class Client:
# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
durationStr=duration,
durationStr=ib_duration_str,
# always use extended hours
useRTH=False,
@ -383,29 +401,55 @@ class Client:
# => we recursively call this method until we get at least
# as many bars such that they sum in aggregate to the the
# desired total time (duration) at most.
elif (
end_dt
and (
(len(bars) * sample_period_s) < dt_duration.in_seconds()
)
):
log.warning(
f'Recursing to get more bars from {end_dt} for {dt_duration}'
)
end_dt -= dt_duration
(
r_bars,
r_arr,
r_duration,
) = await self.bars(
fqme,
start_dt=start_dt,
end_dt=end_dt,
)
r_bars.extend(bars)
bars = r_bars
if end_dt:
nparr: np.ndarray = bars_to_np(bars)
times: np.ndarray = nparr['time']
first: float = times[0]
tdiff: float = times[-1] - first
if (
# len(bars) * sample_period_s) < dt_duration.in_seconds()
tdiff < dt_duration.in_seconds()
):
end_dt: DateTime = from_timestamp(first)
log.warning(
f'Frame result was shorter then {dt_duration}!?\n'
'Recursing for more bars:\n'
f'end_dt: {end_dt}\n'
f'dt_duration: {dt_duration}\n'
)
(
r_bars,
r_arr,
r_duration,
) = await self.bars(
fqme,
start_dt=start_dt,
end_dt=end_dt,
sample_period_s=sample_period_s,
# TODO: make a table for Duration to
# the ib str values in order to use this?
# duration=duration,
)
r_bars.extend(bars)
bars = r_bars
nparr = bars_to_np(bars)
# timestep should always be at least as large as the
# period step.
tdiff: np.ndarray = np.diff(nparr['time'])
to_short: np.ndarray = tdiff < sample_period_s
if (to_short).any():
# raise ValueError(
log.error(
f'OHLC frame for {sample_period_s} has {to_short.size} '
'time steps which are shorter then expected?!"'
)
# OOF: this will break teardown?
# breakpoint()
return bars, nparr, dt_duration
async def con_deats(

View File

@ -20,7 +20,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
"""
from __future__ import annotations
from contextlib import ExitStack
from collections import ChainMap
# from collections import ChainMap
from functools import partial
from pprint import pformat
import time

View File

@ -196,10 +196,8 @@ async def open_history_client(
f'mean: {mean}'
)
if (
out is None
):
# could be trying to retreive bars over weekend
# could be trying to retreive bars over weekend
if out is None:
log.error(f"Can't grab bars starting at {end_dt}!?!?")
raise NoData(
f'{end_dt}',
@ -213,7 +211,24 @@ async def open_history_client(
):
raise DataUnavailable(f'First timestamp is {head_dt}')
bars, bars_array, first_dt, last_dt = out
# also see return type for `get_bars()`
bars: ibis.objects.BarDataList
bars_array: np.ndarray
first_dt: datetime
last_dt: datetime
(
bars,
bars_array,
first_dt,
last_dt,
) = out
# TODO: audit the sampling period here as well?
# timestep should always be at least as large as the
# period step.
# tdiff: np.ndarray = np.diff(bars_array['time'])
# if (tdiff < timeframe).any():
# await tractor.pause()
# volume cleaning since there's -ve entries,
# wood luv to know what crookery that is..

View File

@ -56,6 +56,7 @@ __all__: list[str] = [
'ShmArray',
'iterticks',
'maybe_open_shm_array',
'match_from_pairs',
'attach_shm_array',
'open_shm_array',
'get_shm_token',

View File

@ -1,18 +1,19 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is free software: you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public
# License as published by the Free Software Foundation, either
# version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# You should have received a copy of the GNU Affero General Public
# License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
'''
Historical data business logic for load, backfill and tsdb storage.
@ -39,6 +40,7 @@ from pendulum import (
from_timestamp,
)
import numpy as np
import polars as pl
from ..accounting import (
MktPair,
@ -54,6 +56,7 @@ from ._source import def_iohlcv_fields
from ._sampling import (
open_sample_stream,
)
from . import tsp
from ..brokers._util import (
DataUnavailable,
)
@ -197,7 +200,7 @@ async def start_backfill(
# do a decently sized backfill and load it into storage.
periods = {
1: {'days': 6},
1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
@ -246,13 +249,16 @@ async def start_backfill(
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
f'NO-MORE-DATA: backend {mod.name} halted history!?'
f'NO-MORE-DATA: backend {mod.name} halted history:\n'
f'{timeframe}@{mkt.fqme}'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
if timeframe > 1:
await tractor.pause()
return
# TODO: drop this? see todo above..
@ -300,9 +306,11 @@ async def start_backfill(
array,
prepend_until_dt=backfill_until_dt,
)
ln = len(to_push)
ln: int = len(to_push)
if ln:
log.info(f'{ln} bars for {next_start_dt} -> {last_start_dt}')
log.info(
f'{ln} bars for {next_start_dt} -> {last_start_dt}'
)
else:
log.warning(
@ -388,14 +396,29 @@ async def start_backfill(
without_src=True,
)
else:
col_sym_key: str = mkt.get_fqme(delim_char='')
col_sym_key: str = mkt.get_fqme(
delim_char='',
)
# TODO: implement parquet append!?
await storage.write_ohlcv(
col_sym_key,
shm.array,
timeframe,
)
df: pl.DataFrame = await storage.as_df(
fqme=mkt.fqme,
period=timeframe,
load_from_offline=False,
)
(
df,
gaps,
deduped,
diff,
) = tsp.dedupe(df)
if diff:
tsp.sort_diff(df)
else:
# finally filled gap
log.info(
@ -406,7 +429,7 @@ async def start_backfill(
# TODO: ideally these never exist but somehow it seems
# sometimes we're writing zero-ed segments on certain
# (teardown) cases?
from ._timeseries import detect_null_time_gap
from .tsp import detect_null_time_gap
gap_indices: tuple | None = detect_null_time_gap(shm)
while gap_indices:
@ -634,12 +657,19 @@ async def tsdb_backfill(
async with mod.open_history_client(
mkt,
) as (get_hist, config):
log.info(f'{mod} history client returned backfill config: {config}')
log.info(
f'`{mod}` history client returned backfill config:\n'
f'{config}\n'
)
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
array, mr_start_dt, mr_end_dt = await get_hist(
(
array,
mr_start_dt,
mr_end_dt,
) = await get_hist(
timeframe,
end_dt=None,
)
@ -649,6 +679,7 @@ async def tsdb_backfill(
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
await tractor.pause()
return
# TODO: fill in non-zero epoch time values ALWAYS!
@ -699,9 +730,8 @@ async def tsdb_backfill(
)
except TimeseriesNotFound:
log.warning(
f'No timeseries yet for {fqme}'
f'No timeseries yet for {timeframe}@{fqme}'
)
else:
(
tsdb_history,
@ -731,9 +761,9 @@ async def tsdb_backfill(
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retreive and push
# the series portion missing from the db's data set.
if offset_s < 0:
non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
non_overlap_offset_s: float = backfill_diff.in_seconds()
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
offset_samples: int = round(offset_s / timeframe)
@ -784,25 +814,24 @@ async def tsdb_backfill(
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
get_hist,
mod,
mkt,
shm,
timeframe,
get_hist=get_hist,
mod=mod,
mkt=mkt,
shm=shm,
timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
@ -824,8 +853,11 @@ async def tsdb_backfill(
finally:
return
# IF we need to continue backloading incrementally from the
# tsdb client..
# XXX NOTE: this is legacy from when we were using
# marketstore and we needed to continue backloading
# incrementally from the tsdb client.. (bc it couldn't
# handle a single large query with gRPC for some
# reason.. classic goolag pos)
tn.start_soon(
back_load_from_tsdb,
@ -994,19 +1026,18 @@ async def manage_history(
log.info(f'Connected to sampler stream: {sample_stream}')
for timeframe in [60, 1]:
await tn.start(
await tn.start(partial(
tsdb_backfill,
mod,
storemod,
tn,
mod=mod,
storemod=storemod,
tn=tn,
# bus,
client,
mkt,
tf2mem[timeframe],
timeframe,
sample_stream,
)
storage=client,
mkt=mkt,
shm=tf2mem[timeframe],
timeframe=timeframe,
sampler_stream=sample_stream,
))
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history

View File

@ -23,11 +23,13 @@ Routines are generally implemented in either ``numpy`` or
'''
from __future__ import annotations
from typing import Literal
from functools import partial
from math import (
ceil,
floor,
)
import time
from typing import Literal
import numpy as np
import polars as pl
@ -38,6 +40,18 @@ from ..toolz.profile import (
pg_profile_enabled,
ms_slower_then,
)
from ..log import (
get_logger,
get_console_log,
)
# for "time series processing"
subsys: str = 'piker.tsp'
log = get_logger(subsys)
get_console_log = partial(
get_console_log,
name=subsys,
)
def slice_from_time(
@ -248,7 +262,7 @@ def with_dts(
) -> pl.DataFrame:
'''
Insert datetime (casted) columns to a (presumably) OHLC sampled
time series with an epoch-time column keyed by ``time_col``.
time series with an epoch-time column keyed by `time_col: str`.
'''
return df.with_columns([
@ -256,7 +270,9 @@ def with_dts(
pl.col(time_col).diff().alias('s_diff'),
pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([
pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'),
pl.from_epoch(
pl.col(f'{time_col}_prev')
).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'),
]) #.with_columns(
# pl.col('dt').diff().dt.days().alias('days_dt_diff'),
@ -349,3 +365,117 @@ def detect_price_gaps(
# (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'),
# ])
...
def dedupe(src_df: pl.DataFrame) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
int, # len diff between input and deduped
]:
'''
Check for time series gaps and if found
de-duplicate any datetime entries, check for
a frame height diff and return the newly
dt-deduplicated frame.
'''
df: pl.DataFrame = with_dts(src_df)
gaps: pl.DataFrame = detect_time_gaps(df)
# if no gaps detected just return carbon copies
# and no len diff.
if gaps.is_empty():
return (
df,
gaps,
df,
0,
)
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = dedup_dt(df)
deduped_gaps = detect_time_gaps(deduped)
diff: int = (
df.height
-
deduped.height
)
log.warning(
f'Gaps found:\n{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
# TODO: rewrite this in polars and/or convert to
# ndarray to detect and remove?
# null_gaps = detect_null_time_gap()
return (
df,
gaps,
deduped,
diff,
)
def sort_diff(
src_df: pl.DataFrame,
col: str = 'time',
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # sorted
list[int], # indices of segments that are out-of-order
]:
ser: pl.Series = src_df[col]
diff: pl.Series = ser.diff()
sortd: pl.DataFrame = ser.sort()
sortd_diff: pl.Series = sortd.diff()
i_step_diff = (diff != sortd_diff).arg_true()
if i_step_diff.len():
import pdbp
pdbp.set_trace()
# NOTE: thanks to this SO answer for the below conversion routines
# to go from numpy struct-arrays to polars dataframes and back:
# https://stackoverflow.com/a/72054819
def np2pl(array: np.ndarray) -> pl.DataFrame:
start = time.time()
# XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819
df = pl.DataFrame({
field_name: array[field_name]
for field_name in array.dtype.fields
})
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'numpy -> polars conversion took {delay} secs\n'
f'polars df: {df}'
)
return df
def pl2np(
df: pl.DataFrame,
dtype: np.dtype,
) -> np.ndarray:
# Create numpy struct array of the correct size and dtype
# and loop through df columns to fill in array fields.
array = np.empty(
df.height,
dtype,
)
for field, col in zip(
dtype.fields,
df.columns,
):
array[field] = df.get_column(col).to_numpy()
return array

View File

@ -40,6 +40,7 @@ from piker.data import (
maybe_open_shm_array,
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.data.history import (
_default_hist_size,
@ -98,6 +99,18 @@ def ls(
trio.run(query_all)
# TODO: like ls but takes in a pattern and matches
# @store.command()
# def search(
# patt: str,
# backends: list[str] = typer.Argument(
# default=None,
# help='Storage backends to query, default is all.'
# ),
# ):
# ...
@store.command()
def delete(
symbols: list[str],
@ -136,53 +149,6 @@ def delete(
trio.run(main, symbols)
def dedupe(src_df: pl.DataFrame) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
bool,
]:
'''
Check for time series gaps and if found
de-duplicate any datetime entries, check for
a frame height diff and return the newly
dt-deduplicated frame.
'''
from piker.data import _timeseries as tsp
df: pl.DataFrame = tsp.with_dts(src_df)
gaps: pl.DataFrame = tsp.detect_time_gaps(df)
if not gaps.is_empty():
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = tsp.dedup_dt(df)
deduped_gaps = tsp.detect_time_gaps(deduped)
log.warning(
f'Gaps found:\n{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
# TODO: rewrite this in polars and/or convert to
# ndarray to detect and remove?
# null_gaps = tsp.detect_null_time_gap()
diff: int = (
df.height
-
deduped.height
)
was_deduped: bool = False
if diff:
deduped: bool = True
return (
df,
gaps,
deduped,
was_deduped,
)
@store.command()
def anal(
fqme: str,
@ -235,10 +201,10 @@ def anal(
df,
gaps,
deduped,
shortened,
) = dedupe(shm_df)
diff,
) = tsp.dedupe(shm_df)
if shortened:
if diff:
await client.write_ohlcv(
fqme,
ohlcv=deduped,
@ -306,22 +272,8 @@ def iter_dfs_from_shms(fqme: str) -> Generator[
assert not opened
ohlcv = shm.array
start = time.time()
# XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819
df = pl.DataFrame({
field_name: ohlcv[field_name]
for field_name in ohlcv.dtype.fields
})
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'numpy -> polars conversion took {delay} secs\n'
f'polars df: {df}'
)
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
@ -333,7 +285,6 @@ def iter_dfs_from_shms(fqme: str) -> Generator[
@store.command()
def ldshm(
fqme: str,
write_parquet: bool = False,
) -> None:
@ -362,7 +313,6 @@ def ldshm(
f'Something is wrong with time period for {shm}:\n{times}'
)
# over-write back to shm?
df: pl.DataFrame # with dts
deduped: pl.DataFrame # deduplicated dts
@ -370,8 +320,8 @@ def ldshm(
df,
gaps,
deduped,
was_dded,
) = dedupe(shm_df)
diff,
) = tsp.dedupe(shm_df)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?

View File

@ -65,8 +65,11 @@ from pendulum import (
)
from piker import config
from piker.data import def_iohlcv_fields
from piker.data import ShmArray
from piker.data import (
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.log import get_logger
from . import TimeseriesNotFound
@ -74,37 +77,6 @@ from . import TimeseriesNotFound
log = get_logger('storage.nativedb')
# NOTE: thanks to this SO answer for the below conversion routines
# to go from numpy struct-arrays to polars dataframes and back:
# https://stackoverflow.com/a/72054819
def np2pl(array: np.ndarray) -> pl.DataFrame:
return pl.DataFrame({
field_name: array[field_name]
for field_name in array.dtype.fields
})
def pl2np(
df: pl.DataFrame,
dtype: np.dtype,
) -> np.ndarray:
# Create numpy struct array of the correct size and dtype
# and loop through df columns to fill in array fields.
array = np.empty(
df.height,
dtype,
)
for field, col in zip(
dtype.fields,
df.columns,
):
array[field] = df.get_column(col).to_numpy()
return array
def detect_period(shm: ShmArray) -> float:
'''
Attempt to detect the series time step sampling period
@ -264,6 +236,22 @@ class NativeStorageClient:
datadir=self._datadir,
)
def _cache_df(
self,
fqme: str,
df: pl.DataFrame,
timeframe: float,
) -> None:
# cache df for later usage since we (currently) need to
# convert to np.ndarrays to push to our `ShmArray` rt
# buffers subsys but later we may operate entirely on
# pyarrow arrays/buffers so keeping the dfs around for
# a variety of purposes is handy.
self._dfs.setdefault(
timeframe,
{},
)[fqme] = df
async def read_ohlcv(
self,
fqme: str,
@ -278,19 +266,14 @@ class NativeStorageClient:
)
df: pl.DataFrame = pl.read_parquet(path)
# cache df for later usage since we (currently) need to
# convert to np.ndarrays to push to our `ShmArray` rt
# buffers subsys but later we may operate entirely on
# pyarrow arrays/buffers so keeping the dfs around for
# a variety of purposes is handy.
self._dfs.setdefault(
timeframe,
{},
)[fqme] = df
self._cache_df(
fqme=fqme,
df=df,
timeframe=timeframe,
)
# TODO: filter by end and limit inputs
# times: pl.Series = df['time']
array: np.ndarray = pl2np(
array: np.ndarray = tsp.pl2np(
df,
dtype=np.dtype(def_iohlcv_fields),
)
@ -300,11 +283,15 @@ class NativeStorageClient:
self,
fqme: str,
period: int = 60,
load_from_offline: bool = True,
) -> pl.DataFrame:
try:
return self._dfs[period][fqme]
except KeyError:
if not load_from_offline:
raise
await self.read_ohlcv(fqme, period)
return self._dfs[period][fqme]
@ -326,15 +313,23 @@ class NativeStorageClient:
datadir=self._datadir,
)
if isinstance(ohlcv, np.ndarray):
df: pl.DataFrame = np2pl(ohlcv)
df: pl.DataFrame = tsp.np2pl(ohlcv)
else:
df = ohlcv
self._cache_df(
fqme=fqme,
df=df,
timeframe=timeframe,
)
# TODO: in terms of managing the ultra long term data
# - use a proper profiler to measure all this IO and
# -[ ] use a proper profiler to measure all this IO and
# roundtripping!
# - try out ``fastparquet``'s append writing:
# https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
# -[ ] implement parquet append!? see issue:
# https://github.com/pikers/piker/issues/536
# -[ ] try out ``fastparquet``'s append writing:
# https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
start = time.time()
df.write_parquet(path)
delay: float = round(

View File

@ -49,7 +49,7 @@ from ..data._formatters import (
OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm)
)
from ..data._timeseries import (
from ..data.tsp import (
slice_from_time,
)
from ._ohlc import (

View File

@ -31,7 +31,7 @@ import pendulum
import pyqtgraph as pg
from piker.types import Struct
from ..data._timeseries import slice_from_time
from ..data.tsp import slice_from_time
from ..log import get_logger
from ..toolz import Profiler