Compare commits
No commits in common. "f274c3db3b5cc00abee8cec84ef98bd564e437d2" and "cb941a55549b1da48c6ae837865a48dbc1a63026" have entirely different histories.
f274c3db3b
...
cb941a5554
|
@ -49,12 +49,7 @@ from bidict import bidict
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor import to_asyncio
|
from tractor import to_asyncio
|
||||||
from pendulum import (
|
import pendulum
|
||||||
from_timestamp,
|
|
||||||
DateTime,
|
|
||||||
Duration,
|
|
||||||
duration as mk_duration,
|
|
||||||
)
|
|
||||||
from eventkit import Event
|
from eventkit import Event
|
||||||
from ib_insync import (
|
from ib_insync import (
|
||||||
client as ib_client,
|
client as ib_client,
|
||||||
|
@ -226,20 +221,16 @@ def bars_to_np(bars: list) -> np.ndarray:
|
||||||
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
|
# https://interactivebrokers.github.io/tws-api/historical_limitations.html#non-available_hd
|
||||||
_samplings: dict[int, tuple[str, str]] = {
|
_samplings: dict[int, tuple[str, str]] = {
|
||||||
1: (
|
1: (
|
||||||
# ib strs
|
|
||||||
'1 secs',
|
'1 secs',
|
||||||
f'{int(2e3)} S',
|
f'{int(2e3)} S',
|
||||||
|
pendulum.duration(seconds=2e3),
|
||||||
mk_duration(seconds=2e3),
|
|
||||||
),
|
),
|
||||||
# TODO: benchmark >1 D duration on query to see if
|
# TODO: benchmark >1 D duration on query to see if
|
||||||
# throughput can be made faster during backfilling.
|
# throughput can be made faster during backfilling.
|
||||||
60: (
|
60: (
|
||||||
# ib strs
|
|
||||||
'1 min',
|
'1 min',
|
||||||
'2 D',
|
'2 D',
|
||||||
|
pendulum.duration(days=2),
|
||||||
mk_duration(days=2),
|
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,7 +315,7 @@ class Client:
|
||||||
|
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
|
||||||
) -> tuple[BarDataList, np.ndarray, Duration]:
|
) -> tuple[BarDataList, np.ndarray, pendulum.Duration]:
|
||||||
'''
|
'''
|
||||||
Retreive OHLCV bars for a fqme over a range to the present.
|
Retreive OHLCV bars for a fqme over a range to the present.
|
||||||
|
|
||||||
|
@ -333,20 +324,11 @@ class Client:
|
||||||
# https://interactivebrokers.github.io/tws-api/historical_data.html
|
# https://interactivebrokers.github.io/tws-api/historical_data.html
|
||||||
bars_kwargs = {'whatToShow': 'TRADES'}
|
bars_kwargs = {'whatToShow': 'TRADES'}
|
||||||
bars_kwargs.update(kwargs)
|
bars_kwargs.update(kwargs)
|
||||||
(
|
bar_size, duration, dt_duration = _samplings[sample_period_s]
|
||||||
bar_size,
|
|
||||||
ib_duration_str,
|
|
||||||
default_dt_duration,
|
|
||||||
) = _samplings[sample_period_s]
|
|
||||||
|
|
||||||
dt_duration: DateTime = (
|
|
||||||
duration
|
|
||||||
or default_dt_duration
|
|
||||||
)
|
|
||||||
|
|
||||||
global _enters
|
global _enters
|
||||||
log.info(
|
log.info(
|
||||||
f"REQUESTING {ib_duration_str}'s worth {bar_size} BARS\n"
|
f"REQUESTING {duration}'s worth {bar_size} BARS\n"
|
||||||
f'{_enters} @ end={end_dt}"'
|
f'{_enters} @ end={end_dt}"'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -371,7 +353,7 @@ class Client:
|
||||||
|
|
||||||
# time history length values format:
|
# time history length values format:
|
||||||
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
|
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
|
||||||
durationStr=ib_duration_str,
|
durationStr=duration,
|
||||||
|
|
||||||
# always use extended hours
|
# always use extended hours
|
||||||
useRTH=False,
|
useRTH=False,
|
||||||
|
@ -401,23 +383,16 @@ class Client:
|
||||||
# => we recursively call this method until we get at least
|
# => we recursively call this method until we get at least
|
||||||
# as many bars such that they sum in aggregate to the the
|
# as many bars such that they sum in aggregate to the the
|
||||||
# desired total time (duration) at most.
|
# desired total time (duration) at most.
|
||||||
if end_dt:
|
elif (
|
||||||
nparr: np.ndarray = bars_to_np(bars)
|
end_dt
|
||||||
times: np.ndarray = nparr['time']
|
and (
|
||||||
first: float = times[0]
|
(len(bars) * sample_period_s) < dt_duration.in_seconds()
|
||||||
tdiff: float = times[-1] - first
|
|
||||||
|
|
||||||
if (
|
|
||||||
# len(bars) * sample_period_s) < dt_duration.in_seconds()
|
|
||||||
tdiff < dt_duration.in_seconds()
|
|
||||||
):
|
|
||||||
end_dt: DateTime = from_timestamp(first)
|
|
||||||
log.warning(
|
|
||||||
f'Frame result was shorter then {dt_duration}!?\n'
|
|
||||||
'Recursing for more bars:\n'
|
|
||||||
f'end_dt: {end_dt}\n'
|
|
||||||
f'dt_duration: {dt_duration}\n'
|
|
||||||
)
|
)
|
||||||
|
):
|
||||||
|
log.warning(
|
||||||
|
f'Recursing to get more bars from {end_dt} for {dt_duration}'
|
||||||
|
)
|
||||||
|
end_dt -= dt_duration
|
||||||
(
|
(
|
||||||
r_bars,
|
r_bars,
|
||||||
r_arr,
|
r_arr,
|
||||||
|
@ -426,30 +401,11 @@ class Client:
|
||||||
fqme,
|
fqme,
|
||||||
start_dt=start_dt,
|
start_dt=start_dt,
|
||||||
end_dt=end_dt,
|
end_dt=end_dt,
|
||||||
sample_period_s=sample_period_s,
|
|
||||||
|
|
||||||
# TODO: make a table for Duration to
|
|
||||||
# the ib str values in order to use this?
|
|
||||||
# duration=duration,
|
|
||||||
)
|
)
|
||||||
r_bars.extend(bars)
|
r_bars.extend(bars)
|
||||||
bars = r_bars
|
bars = r_bars
|
||||||
|
|
||||||
nparr = bars_to_np(bars)
|
nparr = bars_to_np(bars)
|
||||||
|
|
||||||
# timestep should always be at least as large as the
|
|
||||||
# period step.
|
|
||||||
tdiff: np.ndarray = np.diff(nparr['time'])
|
|
||||||
to_short: np.ndarray = tdiff < sample_period_s
|
|
||||||
if (to_short).any():
|
|
||||||
# raise ValueError(
|
|
||||||
log.error(
|
|
||||||
f'OHLC frame for {sample_period_s} has {to_short.size} '
|
|
||||||
'time steps which are shorter then expected?!"'
|
|
||||||
)
|
|
||||||
# OOF: this will break teardown?
|
|
||||||
# breakpoint()
|
|
||||||
|
|
||||||
return bars, nparr, dt_duration
|
return bars, nparr, dt_duration
|
||||||
|
|
||||||
async def con_deats(
|
async def con_deats(
|
||||||
|
|
|
@ -20,7 +20,7 @@ Order and trades endpoints for use with ``piker``'s EMS.
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import ExitStack
|
from contextlib import ExitStack
|
||||||
# from collections import ChainMap
|
from collections import ChainMap
|
||||||
from functools import partial
|
from functools import partial
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import time
|
import time
|
||||||
|
|
|
@ -196,8 +196,10 @@ async def open_history_client(
|
||||||
f'mean: {mean}'
|
f'mean: {mean}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if (
|
||||||
|
out is None
|
||||||
|
):
|
||||||
# could be trying to retreive bars over weekend
|
# could be trying to retreive bars over weekend
|
||||||
if out is None:
|
|
||||||
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
log.error(f"Can't grab bars starting at {end_dt}!?!?")
|
||||||
raise NoData(
|
raise NoData(
|
||||||
f'{end_dt}',
|
f'{end_dt}',
|
||||||
|
@ -211,24 +213,7 @@ async def open_history_client(
|
||||||
):
|
):
|
||||||
raise DataUnavailable(f'First timestamp is {head_dt}')
|
raise DataUnavailable(f'First timestamp is {head_dt}')
|
||||||
|
|
||||||
# also see return type for `get_bars()`
|
bars, bars_array, first_dt, last_dt = out
|
||||||
bars: ibis.objects.BarDataList
|
|
||||||
bars_array: np.ndarray
|
|
||||||
first_dt: datetime
|
|
||||||
last_dt: datetime
|
|
||||||
(
|
|
||||||
bars,
|
|
||||||
bars_array,
|
|
||||||
first_dt,
|
|
||||||
last_dt,
|
|
||||||
) = out
|
|
||||||
|
|
||||||
# TODO: audit the sampling period here as well?
|
|
||||||
# timestep should always be at least as large as the
|
|
||||||
# period step.
|
|
||||||
# tdiff: np.ndarray = np.diff(bars_array['time'])
|
|
||||||
# if (tdiff < timeframe).any():
|
|
||||||
# await tractor.pause()
|
|
||||||
|
|
||||||
# volume cleaning since there's -ve entries,
|
# volume cleaning since there's -ve entries,
|
||||||
# wood luv to know what crookery that is..
|
# wood luv to know what crookery that is..
|
||||||
|
|
|
@ -56,7 +56,6 @@ __all__: list[str] = [
|
||||||
'ShmArray',
|
'ShmArray',
|
||||||
'iterticks',
|
'iterticks',
|
||||||
'maybe_open_shm_array',
|
'maybe_open_shm_array',
|
||||||
'match_from_pairs',
|
|
||||||
'attach_shm_array',
|
'attach_shm_array',
|
||||||
'open_shm_array',
|
'open_shm_array',
|
||||||
'get_shm_token',
|
'get_shm_token',
|
||||||
|
|
|
@ -23,13 +23,11 @@ Routines are generally implemented in either ``numpy`` or
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from functools import partial
|
from typing import Literal
|
||||||
from math import (
|
from math import (
|
||||||
ceil,
|
ceil,
|
||||||
floor,
|
floor,
|
||||||
)
|
)
|
||||||
import time
|
|
||||||
from typing import Literal
|
|
||||||
|
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import polars as pl
|
import polars as pl
|
||||||
|
@ -40,18 +38,6 @@ from ..toolz.profile import (
|
||||||
pg_profile_enabled,
|
pg_profile_enabled,
|
||||||
ms_slower_then,
|
ms_slower_then,
|
||||||
)
|
)
|
||||||
from ..log import (
|
|
||||||
get_logger,
|
|
||||||
get_console_log,
|
|
||||||
)
|
|
||||||
# for "time series processing"
|
|
||||||
subsys: str = 'piker.tsp'
|
|
||||||
|
|
||||||
log = get_logger(subsys)
|
|
||||||
get_console_log = partial(
|
|
||||||
get_console_log,
|
|
||||||
name=subsys,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def slice_from_time(
|
def slice_from_time(
|
||||||
|
@ -262,7 +248,7 @@ def with_dts(
|
||||||
) -> pl.DataFrame:
|
) -> pl.DataFrame:
|
||||||
'''
|
'''
|
||||||
Insert datetime (casted) columns to a (presumably) OHLC sampled
|
Insert datetime (casted) columns to a (presumably) OHLC sampled
|
||||||
time series with an epoch-time column keyed by `time_col: str`.
|
time series with an epoch-time column keyed by ``time_col``.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
return df.with_columns([
|
return df.with_columns([
|
||||||
|
@ -270,9 +256,7 @@ def with_dts(
|
||||||
pl.col(time_col).diff().alias('s_diff'),
|
pl.col(time_col).diff().alias('s_diff'),
|
||||||
pl.from_epoch(pl.col(time_col)).alias('dt'),
|
pl.from_epoch(pl.col(time_col)).alias('dt'),
|
||||||
]).with_columns([
|
]).with_columns([
|
||||||
pl.from_epoch(
|
pl.from_epoch(pl.col(f'{time_col}_prev')).alias('dt_prev'),
|
||||||
pl.col(f'{time_col}_prev')
|
|
||||||
).alias('dt_prev'),
|
|
||||||
pl.col('dt').diff().alias('dt_diff'),
|
pl.col('dt').diff().alias('dt_diff'),
|
||||||
]) #.with_columns(
|
]) #.with_columns(
|
||||||
# pl.col('dt').diff().dt.days().alias('days_dt_diff'),
|
# pl.col('dt').diff().dt.days().alias('days_dt_diff'),
|
||||||
|
@ -365,117 +349,3 @@ def detect_price_gaps(
|
||||||
# (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'),
|
# (pl.col(time_col) - pl.col(f'{time_col}_previous')).alias('diff'),
|
||||||
# ])
|
# ])
|
||||||
...
|
...
|
||||||
|
|
||||||
|
|
||||||
def dedupe(src_df: pl.DataFrame) -> tuple[
|
|
||||||
pl.DataFrame, # with dts
|
|
||||||
pl.DataFrame, # gaps
|
|
||||||
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
|
||||||
int, # len diff between input and deduped
|
|
||||||
]:
|
|
||||||
'''
|
|
||||||
Check for time series gaps and if found
|
|
||||||
de-duplicate any datetime entries, check for
|
|
||||||
a frame height diff and return the newly
|
|
||||||
dt-deduplicated frame.
|
|
||||||
|
|
||||||
'''
|
|
||||||
df: pl.DataFrame = with_dts(src_df)
|
|
||||||
gaps: pl.DataFrame = detect_time_gaps(df)
|
|
||||||
|
|
||||||
# if no gaps detected just return carbon copies
|
|
||||||
# and no len diff.
|
|
||||||
if gaps.is_empty():
|
|
||||||
return (
|
|
||||||
df,
|
|
||||||
gaps,
|
|
||||||
df,
|
|
||||||
0,
|
|
||||||
)
|
|
||||||
|
|
||||||
# remove duplicated datetime samples/sections
|
|
||||||
deduped: pl.DataFrame = dedup_dt(df)
|
|
||||||
deduped_gaps = detect_time_gaps(deduped)
|
|
||||||
|
|
||||||
diff: int = (
|
|
||||||
df.height
|
|
||||||
-
|
|
||||||
deduped.height
|
|
||||||
)
|
|
||||||
log.warning(
|
|
||||||
f'Gaps found:\n{gaps}\n'
|
|
||||||
f'deduped Gaps found:\n{deduped_gaps}'
|
|
||||||
)
|
|
||||||
# TODO: rewrite this in polars and/or convert to
|
|
||||||
# ndarray to detect and remove?
|
|
||||||
# null_gaps = detect_null_time_gap()
|
|
||||||
|
|
||||||
return (
|
|
||||||
df,
|
|
||||||
gaps,
|
|
||||||
deduped,
|
|
||||||
diff,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def sort_diff(
|
|
||||||
src_df: pl.DataFrame,
|
|
||||||
col: str = 'time',
|
|
||||||
|
|
||||||
) -> tuple[
|
|
||||||
pl.DataFrame, # with dts
|
|
||||||
pl.DataFrame, # sorted
|
|
||||||
list[int], # indices of segments that are out-of-order
|
|
||||||
]:
|
|
||||||
ser: pl.Series = src_df[col]
|
|
||||||
|
|
||||||
diff: pl.Series = ser.diff()
|
|
||||||
sortd: pl.DataFrame = ser.sort()
|
|
||||||
sortd_diff: pl.Series = sortd.diff()
|
|
||||||
i_step_diff = (diff != sortd_diff).arg_true()
|
|
||||||
if i_step_diff.len():
|
|
||||||
import pdbp
|
|
||||||
pdbp.set_trace()
|
|
||||||
|
|
||||||
# NOTE: thanks to this SO answer for the below conversion routines
|
|
||||||
# to go from numpy struct-arrays to polars dataframes and back:
|
|
||||||
# https://stackoverflow.com/a/72054819
|
|
||||||
def np2pl(array: np.ndarray) -> pl.DataFrame:
|
|
||||||
start = time.time()
|
|
||||||
|
|
||||||
# XXX: thanks to this SO answer for this conversion tip:
|
|
||||||
# https://stackoverflow.com/a/72054819
|
|
||||||
df = pl.DataFrame({
|
|
||||||
field_name: array[field_name]
|
|
||||||
for field_name in array.dtype.fields
|
|
||||||
})
|
|
||||||
delay: float = round(
|
|
||||||
time.time() - start,
|
|
||||||
ndigits=6,
|
|
||||||
)
|
|
||||||
log.info(
|
|
||||||
f'numpy -> polars conversion took {delay} secs\n'
|
|
||||||
f'polars df: {df}'
|
|
||||||
)
|
|
||||||
return df
|
|
||||||
|
|
||||||
|
|
||||||
def pl2np(
|
|
||||||
df: pl.DataFrame,
|
|
||||||
dtype: np.dtype,
|
|
||||||
|
|
||||||
) -> np.ndarray:
|
|
||||||
|
|
||||||
# Create numpy struct array of the correct size and dtype
|
|
||||||
# and loop through df columns to fill in array fields.
|
|
||||||
array = np.empty(
|
|
||||||
df.height,
|
|
||||||
dtype,
|
|
||||||
)
|
|
||||||
for field, col in zip(
|
|
||||||
dtype.fields,
|
|
||||||
df.columns,
|
|
||||||
):
|
|
||||||
array[field] = df.get_column(col).to_numpy()
|
|
||||||
|
|
||||||
return array
|
|
|
@ -1,19 +1,18 @@
|
||||||
# piker: trading gear for hackers
|
# piker: trading gear for hackers
|
||||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||||
|
|
||||||
# This program is free software: you can redistribute it and/or
|
# This program is free software: you can redistribute it and/or modify
|
||||||
# modify it under the terms of the GNU Affero General Public
|
# it under the terms of the GNU Affero General Public License as published by
|
||||||
# License as published by the Free Software Foundation, either
|
# the Free Software Foundation, either version 3 of the License, or
|
||||||
# version 3 of the License, or (at your option) any later version.
|
# (at your option) any later version.
|
||||||
|
|
||||||
# This program is distributed in the hope that it will be useful,
|
# This program is distributed in the hope that it will be useful,
|
||||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
# Affero General Public License for more details.
|
# GNU Affero General Public License for more details.
|
||||||
|
|
||||||
# You should have received a copy of the GNU Affero General Public
|
# You should have received a copy of the GNU Affero General Public License
|
||||||
# License along with this program. If not, see
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
# <https://www.gnu.org/licenses/>.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
Historical data business logic for load, backfill and tsdb storage.
|
Historical data business logic for load, backfill and tsdb storage.
|
||||||
|
@ -40,7 +39,6 @@ from pendulum import (
|
||||||
from_timestamp,
|
from_timestamp,
|
||||||
)
|
)
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import polars as pl
|
|
||||||
|
|
||||||
from ..accounting import (
|
from ..accounting import (
|
||||||
MktPair,
|
MktPair,
|
||||||
|
@ -56,7 +54,6 @@ from ._source import def_iohlcv_fields
|
||||||
from ._sampling import (
|
from ._sampling import (
|
||||||
open_sample_stream,
|
open_sample_stream,
|
||||||
)
|
)
|
||||||
from . import tsp
|
|
||||||
from ..brokers._util import (
|
from ..brokers._util import (
|
||||||
DataUnavailable,
|
DataUnavailable,
|
||||||
)
|
)
|
||||||
|
@ -200,7 +197,7 @@ async def start_backfill(
|
||||||
|
|
||||||
# do a decently sized backfill and load it into storage.
|
# do a decently sized backfill and load it into storage.
|
||||||
periods = {
|
periods = {
|
||||||
1: {'days': 2},
|
1: {'days': 6},
|
||||||
60: {'years': 6},
|
60: {'years': 6},
|
||||||
}
|
}
|
||||||
period_duration: int = periods[timeframe]
|
period_duration: int = periods[timeframe]
|
||||||
|
@ -249,16 +246,13 @@ async def start_backfill(
|
||||||
# broker says there never was or is no more history to pull
|
# broker says there never was or is no more history to pull
|
||||||
except DataUnavailable:
|
except DataUnavailable:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'NO-MORE-DATA: backend {mod.name} halted history:\n'
|
f'NO-MORE-DATA: backend {mod.name} halted history!?'
|
||||||
f'{timeframe}@{mkt.fqme}'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# ugh, what's a better way?
|
# ugh, what's a better way?
|
||||||
# TODO: fwiw, we probably want a way to signal a throttle
|
# TODO: fwiw, we probably want a way to signal a throttle
|
||||||
# condition (eg. with ib) so that we can halt the
|
# condition (eg. with ib) so that we can halt the
|
||||||
# request loop until the condition is resolved?
|
# request loop until the condition is resolved?
|
||||||
if timeframe > 1:
|
|
||||||
await tractor.pause()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# TODO: drop this? see todo above..
|
# TODO: drop this? see todo above..
|
||||||
|
@ -306,11 +300,9 @@ async def start_backfill(
|
||||||
array,
|
array,
|
||||||
prepend_until_dt=backfill_until_dt,
|
prepend_until_dt=backfill_until_dt,
|
||||||
)
|
)
|
||||||
ln: int = len(to_push)
|
ln = len(to_push)
|
||||||
if ln:
|
if ln:
|
||||||
log.info(
|
log.info(f'{ln} bars for {next_start_dt} -> {last_start_dt}')
|
||||||
f'{ln} bars for {next_start_dt} -> {last_start_dt}'
|
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.warning(
|
log.warning(
|
||||||
|
@ -396,29 +388,14 @@ async def start_backfill(
|
||||||
without_src=True,
|
without_src=True,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
col_sym_key: str = mkt.get_fqme(
|
col_sym_key: str = mkt.get_fqme(delim_char='')
|
||||||
delim_char='',
|
|
||||||
)
|
|
||||||
|
|
||||||
|
# TODO: implement parquet append!?
|
||||||
await storage.write_ohlcv(
|
await storage.write_ohlcv(
|
||||||
col_sym_key,
|
col_sym_key,
|
||||||
shm.array,
|
shm.array,
|
||||||
timeframe,
|
timeframe,
|
||||||
)
|
)
|
||||||
df: pl.DataFrame = await storage.as_df(
|
|
||||||
fqme=mkt.fqme,
|
|
||||||
period=timeframe,
|
|
||||||
load_from_offline=False,
|
|
||||||
)
|
|
||||||
(
|
|
||||||
df,
|
|
||||||
gaps,
|
|
||||||
deduped,
|
|
||||||
diff,
|
|
||||||
) = tsp.dedupe(df)
|
|
||||||
if diff:
|
|
||||||
tsp.sort_diff(df)
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# finally filled gap
|
# finally filled gap
|
||||||
log.info(
|
log.info(
|
||||||
|
@ -429,7 +406,7 @@ async def start_backfill(
|
||||||
# TODO: ideally these never exist but somehow it seems
|
# TODO: ideally these never exist but somehow it seems
|
||||||
# sometimes we're writing zero-ed segments on certain
|
# sometimes we're writing zero-ed segments on certain
|
||||||
# (teardown) cases?
|
# (teardown) cases?
|
||||||
from .tsp import detect_null_time_gap
|
from ._timeseries import detect_null_time_gap
|
||||||
|
|
||||||
gap_indices: tuple | None = detect_null_time_gap(shm)
|
gap_indices: tuple | None = detect_null_time_gap(shm)
|
||||||
while gap_indices:
|
while gap_indices:
|
||||||
|
@ -657,19 +634,12 @@ async def tsdb_backfill(
|
||||||
async with mod.open_history_client(
|
async with mod.open_history_client(
|
||||||
mkt,
|
mkt,
|
||||||
) as (get_hist, config):
|
) as (get_hist, config):
|
||||||
log.info(
|
log.info(f'{mod} history client returned backfill config: {config}')
|
||||||
f'`{mod}` history client returned backfill config:\n'
|
|
||||||
f'{config}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# get latest query's worth of history all the way
|
# get latest query's worth of history all the way
|
||||||
# back to what is recorded in the tsdb
|
# back to what is recorded in the tsdb
|
||||||
try:
|
try:
|
||||||
(
|
array, mr_start_dt, mr_end_dt = await get_hist(
|
||||||
array,
|
|
||||||
mr_start_dt,
|
|
||||||
mr_end_dt,
|
|
||||||
) = await get_hist(
|
|
||||||
timeframe,
|
timeframe,
|
||||||
end_dt=None,
|
end_dt=None,
|
||||||
)
|
)
|
||||||
|
@ -679,7 +649,6 @@ async def tsdb_backfill(
|
||||||
# there's no backfilling possible.
|
# there's no backfilling possible.
|
||||||
except DataUnavailable:
|
except DataUnavailable:
|
||||||
task_status.started()
|
task_status.started()
|
||||||
await tractor.pause()
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# TODO: fill in non-zero epoch time values ALWAYS!
|
# TODO: fill in non-zero epoch time values ALWAYS!
|
||||||
|
@ -730,8 +699,9 @@ async def tsdb_backfill(
|
||||||
)
|
)
|
||||||
except TimeseriesNotFound:
|
except TimeseriesNotFound:
|
||||||
log.warning(
|
log.warning(
|
||||||
f'No timeseries yet for {timeframe}@{fqme}'
|
f'No timeseries yet for {fqme}'
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
(
|
(
|
||||||
tsdb_history,
|
tsdb_history,
|
||||||
|
@ -761,9 +731,9 @@ async def tsdb_backfill(
|
||||||
# to push datums that have already been recorded in the
|
# to push datums that have already been recorded in the
|
||||||
# tsdb. In this case we instead only retreive and push
|
# tsdb. In this case we instead only retreive and push
|
||||||
# the series portion missing from the db's data set.
|
# the series portion missing from the db's data set.
|
||||||
# if offset_s < 0:
|
if offset_s < 0:
|
||||||
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||||
# non_overlap_offset_s: float = backfill_diff.in_seconds()
|
non_overlap_offset_s: float = backfill_diff.in_seconds()
|
||||||
|
|
||||||
offset_samples: int = round(offset_s / timeframe)
|
offset_samples: int = round(offset_s / timeframe)
|
||||||
|
|
||||||
|
@ -814,24 +784,25 @@ async def tsdb_backfill(
|
||||||
f'timeframe of {timeframe} seconds..\n'
|
f'timeframe of {timeframe} seconds..\n'
|
||||||
'So yuh.. dun do dat brudder.'
|
'So yuh.. dun do dat brudder.'
|
||||||
)
|
)
|
||||||
|
|
||||||
# if there is a gap to backfill from the first
|
# if there is a gap to backfill from the first
|
||||||
# history frame until the last datum loaded from the tsdb
|
# history frame until the last datum loaded from the tsdb
|
||||||
# continue that now in the background
|
# continue that now in the background
|
||||||
bf_done = await tn.start(
|
bf_done = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
start_backfill,
|
start_backfill,
|
||||||
get_hist=get_hist,
|
get_hist,
|
||||||
mod=mod,
|
mod,
|
||||||
mkt=mkt,
|
mkt,
|
||||||
shm=shm,
|
shm,
|
||||||
timeframe=timeframe,
|
timeframe,
|
||||||
|
|
||||||
backfill_from_shm_index=backfill_gap_from_shm_index,
|
backfill_from_shm_index=backfill_gap_from_shm_index,
|
||||||
backfill_from_dt=mr_start_dt,
|
backfill_from_dt=mr_start_dt,
|
||||||
|
|
||||||
sampler_stream=sampler_stream,
|
sampler_stream=sampler_stream,
|
||||||
|
|
||||||
backfill_until_dt=last_tsdb_dt,
|
backfill_until_dt=last_tsdb_dt,
|
||||||
storage=storage,
|
storage=storage,
|
||||||
write_tsdb=True,
|
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -853,11 +824,8 @@ async def tsdb_backfill(
|
||||||
finally:
|
finally:
|
||||||
return
|
return
|
||||||
|
|
||||||
# XXX NOTE: this is legacy from when we were using
|
# IF we need to continue backloading incrementally from the
|
||||||
# marketstore and we needed to continue backloading
|
# tsdb client..
|
||||||
# incrementally from the tsdb client.. (bc it couldn't
|
|
||||||
# handle a single large query with gRPC for some
|
|
||||||
# reason.. classic goolag pos)
|
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
back_load_from_tsdb,
|
back_load_from_tsdb,
|
||||||
|
|
||||||
|
@ -1026,18 +994,19 @@ async def manage_history(
|
||||||
log.info(f'Connected to sampler stream: {sample_stream}')
|
log.info(f'Connected to sampler stream: {sample_stream}')
|
||||||
|
|
||||||
for timeframe in [60, 1]:
|
for timeframe in [60, 1]:
|
||||||
await tn.start(partial(
|
await tn.start(
|
||||||
tsdb_backfill,
|
tsdb_backfill,
|
||||||
mod=mod,
|
mod,
|
||||||
storemod=storemod,
|
storemod,
|
||||||
tn=tn,
|
tn,
|
||||||
# bus,
|
# bus,
|
||||||
storage=client,
|
client,
|
||||||
mkt=mkt,
|
mkt,
|
||||||
shm=tf2mem[timeframe],
|
tf2mem[timeframe],
|
||||||
timeframe=timeframe,
|
timeframe,
|
||||||
sampler_stream=sample_stream,
|
|
||||||
))
|
sample_stream,
|
||||||
|
)
|
||||||
|
|
||||||
# indicate to caller that feed can be delivered to
|
# indicate to caller that feed can be delivered to
|
||||||
# remote requesting client since we've loaded history
|
# remote requesting client since we've loaded history
|
||||||
|
|
|
@ -40,7 +40,6 @@ from piker.data import (
|
||||||
maybe_open_shm_array,
|
maybe_open_shm_array,
|
||||||
def_iohlcv_fields,
|
def_iohlcv_fields,
|
||||||
ShmArray,
|
ShmArray,
|
||||||
tsp,
|
|
||||||
)
|
)
|
||||||
from piker.data.history import (
|
from piker.data.history import (
|
||||||
_default_hist_size,
|
_default_hist_size,
|
||||||
|
@ -99,18 +98,6 @@ def ls(
|
||||||
trio.run(query_all)
|
trio.run(query_all)
|
||||||
|
|
||||||
|
|
||||||
# TODO: like ls but takes in a pattern and matches
|
|
||||||
# @store.command()
|
|
||||||
# def search(
|
|
||||||
# patt: str,
|
|
||||||
# backends: list[str] = typer.Argument(
|
|
||||||
# default=None,
|
|
||||||
# help='Storage backends to query, default is all.'
|
|
||||||
# ),
|
|
||||||
# ):
|
|
||||||
# ...
|
|
||||||
|
|
||||||
|
|
||||||
@store.command()
|
@store.command()
|
||||||
def delete(
|
def delete(
|
||||||
symbols: list[str],
|
symbols: list[str],
|
||||||
|
@ -149,6 +136,53 @@ def delete(
|
||||||
trio.run(main, symbols)
|
trio.run(main, symbols)
|
||||||
|
|
||||||
|
|
||||||
|
def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||||
|
pl.DataFrame, # with dts
|
||||||
|
pl.DataFrame, # gaps
|
||||||
|
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
||||||
|
bool,
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Check for time series gaps and if found
|
||||||
|
de-duplicate any datetime entries, check for
|
||||||
|
a frame height diff and return the newly
|
||||||
|
dt-deduplicated frame.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from piker.data import _timeseries as tsp
|
||||||
|
df: pl.DataFrame = tsp.with_dts(src_df)
|
||||||
|
gaps: pl.DataFrame = tsp.detect_time_gaps(df)
|
||||||
|
if not gaps.is_empty():
|
||||||
|
|
||||||
|
# remove duplicated datetime samples/sections
|
||||||
|
deduped: pl.DataFrame = tsp.dedup_dt(df)
|
||||||
|
deduped_gaps = tsp.detect_time_gaps(deduped)
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
f'Gaps found:\n{gaps}\n'
|
||||||
|
f'deduped Gaps found:\n{deduped_gaps}'
|
||||||
|
)
|
||||||
|
# TODO: rewrite this in polars and/or convert to
|
||||||
|
# ndarray to detect and remove?
|
||||||
|
# null_gaps = tsp.detect_null_time_gap()
|
||||||
|
|
||||||
|
diff: int = (
|
||||||
|
df.height
|
||||||
|
-
|
||||||
|
deduped.height
|
||||||
|
)
|
||||||
|
was_deduped: bool = False
|
||||||
|
if diff:
|
||||||
|
deduped: bool = True
|
||||||
|
|
||||||
|
return (
|
||||||
|
df,
|
||||||
|
gaps,
|
||||||
|
deduped,
|
||||||
|
was_deduped,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@store.command()
|
@store.command()
|
||||||
def anal(
|
def anal(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
@ -201,10 +235,10 @@ def anal(
|
||||||
df,
|
df,
|
||||||
gaps,
|
gaps,
|
||||||
deduped,
|
deduped,
|
||||||
diff,
|
shortened,
|
||||||
) = tsp.dedupe(shm_df)
|
) = dedupe(shm_df)
|
||||||
|
|
||||||
if diff:
|
if shortened:
|
||||||
await client.write_ohlcv(
|
await client.write_ohlcv(
|
||||||
fqme,
|
fqme,
|
||||||
ohlcv=deduped,
|
ohlcv=deduped,
|
||||||
|
@ -272,8 +306,22 @@ def iter_dfs_from_shms(fqme: str) -> Generator[
|
||||||
assert not opened
|
assert not opened
|
||||||
ohlcv = shm.array
|
ohlcv = shm.array
|
||||||
|
|
||||||
from ..data import tsp
|
start = time.time()
|
||||||
df: pl.DataFrame = tsp.np2pl(ohlcv)
|
|
||||||
|
# XXX: thanks to this SO answer for this conversion tip:
|
||||||
|
# https://stackoverflow.com/a/72054819
|
||||||
|
df = pl.DataFrame({
|
||||||
|
field_name: ohlcv[field_name]
|
||||||
|
for field_name in ohlcv.dtype.fields
|
||||||
|
})
|
||||||
|
delay: float = round(
|
||||||
|
time.time() - start,
|
||||||
|
ndigits=6,
|
||||||
|
)
|
||||||
|
log.info(
|
||||||
|
f'numpy -> polars conversion took {delay} secs\n'
|
||||||
|
f'polars df: {df}'
|
||||||
|
)
|
||||||
|
|
||||||
yield (
|
yield (
|
||||||
shmfile,
|
shmfile,
|
||||||
|
@ -285,6 +333,7 @@ def iter_dfs_from_shms(fqme: str) -> Generator[
|
||||||
@store.command()
|
@store.command()
|
||||||
def ldshm(
|
def ldshm(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
||||||
write_parquet: bool = False,
|
write_parquet: bool = False,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -313,6 +362,7 @@ def ldshm(
|
||||||
f'Something is wrong with time period for {shm}:\n{times}'
|
f'Something is wrong with time period for {shm}:\n{times}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
# over-write back to shm?
|
# over-write back to shm?
|
||||||
df: pl.DataFrame # with dts
|
df: pl.DataFrame # with dts
|
||||||
deduped: pl.DataFrame # deduplicated dts
|
deduped: pl.DataFrame # deduplicated dts
|
||||||
|
@ -320,8 +370,8 @@ def ldshm(
|
||||||
df,
|
df,
|
||||||
gaps,
|
gaps,
|
||||||
deduped,
|
deduped,
|
||||||
diff,
|
was_dded,
|
||||||
) = tsp.dedupe(shm_df)
|
) = dedupe(shm_df)
|
||||||
|
|
||||||
# TODO: maybe only optionally enter this depending
|
# TODO: maybe only optionally enter this depending
|
||||||
# on some CLI flags and/or gap detection?
|
# on some CLI flags and/or gap detection?
|
||||||
|
|
|
@ -65,11 +65,8 @@ from pendulum import (
|
||||||
)
|
)
|
||||||
|
|
||||||
from piker import config
|
from piker import config
|
||||||
from piker.data import (
|
from piker.data import def_iohlcv_fields
|
||||||
def_iohlcv_fields,
|
from piker.data import ShmArray
|
||||||
ShmArray,
|
|
||||||
tsp,
|
|
||||||
)
|
|
||||||
from piker.log import get_logger
|
from piker.log import get_logger
|
||||||
from . import TimeseriesNotFound
|
from . import TimeseriesNotFound
|
||||||
|
|
||||||
|
@ -77,6 +74,37 @@ from . import TimeseriesNotFound
|
||||||
log = get_logger('storage.nativedb')
|
log = get_logger('storage.nativedb')
|
||||||
|
|
||||||
|
|
||||||
|
# NOTE: thanks to this SO answer for the below conversion routines
|
||||||
|
# to go from numpy struct-arrays to polars dataframes and back:
|
||||||
|
# https://stackoverflow.com/a/72054819
|
||||||
|
def np2pl(array: np.ndarray) -> pl.DataFrame:
|
||||||
|
return pl.DataFrame({
|
||||||
|
field_name: array[field_name]
|
||||||
|
for field_name in array.dtype.fields
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
|
def pl2np(
|
||||||
|
df: pl.DataFrame,
|
||||||
|
dtype: np.dtype,
|
||||||
|
|
||||||
|
) -> np.ndarray:
|
||||||
|
|
||||||
|
# Create numpy struct array of the correct size and dtype
|
||||||
|
# and loop through df columns to fill in array fields.
|
||||||
|
array = np.empty(
|
||||||
|
df.height,
|
||||||
|
dtype,
|
||||||
|
)
|
||||||
|
for field, col in zip(
|
||||||
|
dtype.fields,
|
||||||
|
df.columns,
|
||||||
|
):
|
||||||
|
array[field] = df.get_column(col).to_numpy()
|
||||||
|
|
||||||
|
return array
|
||||||
|
|
||||||
|
|
||||||
def detect_period(shm: ShmArray) -> float:
|
def detect_period(shm: ShmArray) -> float:
|
||||||
'''
|
'''
|
||||||
Attempt to detect the series time step sampling period
|
Attempt to detect the series time step sampling period
|
||||||
|
@ -236,22 +264,6 @@ class NativeStorageClient:
|
||||||
datadir=self._datadir,
|
datadir=self._datadir,
|
||||||
)
|
)
|
||||||
|
|
||||||
def _cache_df(
|
|
||||||
self,
|
|
||||||
fqme: str,
|
|
||||||
df: pl.DataFrame,
|
|
||||||
timeframe: float,
|
|
||||||
) -> None:
|
|
||||||
# cache df for later usage since we (currently) need to
|
|
||||||
# convert to np.ndarrays to push to our `ShmArray` rt
|
|
||||||
# buffers subsys but later we may operate entirely on
|
|
||||||
# pyarrow arrays/buffers so keeping the dfs around for
|
|
||||||
# a variety of purposes is handy.
|
|
||||||
self._dfs.setdefault(
|
|
||||||
timeframe,
|
|
||||||
{},
|
|
||||||
)[fqme] = df
|
|
||||||
|
|
||||||
async def read_ohlcv(
|
async def read_ohlcv(
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
@ -266,14 +278,19 @@ class NativeStorageClient:
|
||||||
)
|
)
|
||||||
df: pl.DataFrame = pl.read_parquet(path)
|
df: pl.DataFrame = pl.read_parquet(path)
|
||||||
|
|
||||||
self._cache_df(
|
# cache df for later usage since we (currently) need to
|
||||||
fqme=fqme,
|
# convert to np.ndarrays to push to our `ShmArray` rt
|
||||||
df=df,
|
# buffers subsys but later we may operate entirely on
|
||||||
timeframe=timeframe,
|
# pyarrow arrays/buffers so keeping the dfs around for
|
||||||
)
|
# a variety of purposes is handy.
|
||||||
|
self._dfs.setdefault(
|
||||||
|
timeframe,
|
||||||
|
{},
|
||||||
|
)[fqme] = df
|
||||||
|
|
||||||
# TODO: filter by end and limit inputs
|
# TODO: filter by end and limit inputs
|
||||||
# times: pl.Series = df['time']
|
# times: pl.Series = df['time']
|
||||||
array: np.ndarray = tsp.pl2np(
|
array: np.ndarray = pl2np(
|
||||||
df,
|
df,
|
||||||
dtype=np.dtype(def_iohlcv_fields),
|
dtype=np.dtype(def_iohlcv_fields),
|
||||||
)
|
)
|
||||||
|
@ -283,15 +300,11 @@ class NativeStorageClient:
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
period: int = 60,
|
period: int = 60,
|
||||||
load_from_offline: bool = True,
|
|
||||||
|
|
||||||
) -> pl.DataFrame:
|
) -> pl.DataFrame:
|
||||||
try:
|
try:
|
||||||
return self._dfs[period][fqme]
|
return self._dfs[period][fqme]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
if not load_from_offline:
|
|
||||||
raise
|
|
||||||
|
|
||||||
await self.read_ohlcv(fqme, period)
|
await self.read_ohlcv(fqme, period)
|
||||||
return self._dfs[period][fqme]
|
return self._dfs[period][fqme]
|
||||||
|
|
||||||
|
@ -313,22 +326,14 @@ class NativeStorageClient:
|
||||||
datadir=self._datadir,
|
datadir=self._datadir,
|
||||||
)
|
)
|
||||||
if isinstance(ohlcv, np.ndarray):
|
if isinstance(ohlcv, np.ndarray):
|
||||||
df: pl.DataFrame = tsp.np2pl(ohlcv)
|
df: pl.DataFrame = np2pl(ohlcv)
|
||||||
else:
|
else:
|
||||||
df = ohlcv
|
df = ohlcv
|
||||||
|
|
||||||
self._cache_df(
|
|
||||||
fqme=fqme,
|
|
||||||
df=df,
|
|
||||||
timeframe=timeframe,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: in terms of managing the ultra long term data
|
# TODO: in terms of managing the ultra long term data
|
||||||
# -[ ] use a proper profiler to measure all this IO and
|
# - use a proper profiler to measure all this IO and
|
||||||
# roundtripping!
|
# roundtripping!
|
||||||
# -[ ] implement parquet append!? see issue:
|
# - try out ``fastparquet``'s append writing:
|
||||||
# https://github.com/pikers/piker/issues/536
|
|
||||||
# -[ ] try out ``fastparquet``'s append writing:
|
|
||||||
# https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
|
# https://fastparquet.readthedocs.io/en/latest/api.html#fastparquet.write
|
||||||
start = time.time()
|
start = time.time()
|
||||||
df.write_parquet(path)
|
df.write_parquet(path)
|
||||||
|
|
|
@ -49,7 +49,7 @@ from ..data._formatters import (
|
||||||
OHLCBarsAsCurveFmtr, # OHLC converted to line
|
OHLCBarsAsCurveFmtr, # OHLC converted to line
|
||||||
StepCurveFmtr, # "step" curve (like for vlm)
|
StepCurveFmtr, # "step" curve (like for vlm)
|
||||||
)
|
)
|
||||||
from ..data.tsp import (
|
from ..data._timeseries import (
|
||||||
slice_from_time,
|
slice_from_time,
|
||||||
)
|
)
|
||||||
from ._ohlc import (
|
from ._ohlc import (
|
||||||
|
|
|
@ -31,7 +31,7 @@ import pendulum
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
|
|
||||||
from piker.types import Struct
|
from piker.types import Struct
|
||||||
from ..data.tsp import slice_from_time
|
from ..data._timeseries import slice_from_time
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ..toolz import Profiler
|
from ..toolz import Profiler
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue