Compare commits
4 Commits
b9af6176c5
...
cb941a5554
Author | SHA1 | Date |
---|---|---|
Tyler Goodlet | cb941a5554 | |
Tyler Goodlet | 2d72a052aa | |
Tyler Goodlet | 2eeef2a123 | |
Tyler Goodlet | b6d2550f33 |
|
@ -263,6 +263,19 @@ def with_dts(
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
|
||||||
|
def dedup_dt(
|
||||||
|
df: pl.DataFrame,
|
||||||
|
) -> pl.DataFrame:
|
||||||
|
'''
|
||||||
|
Drop duplicate date-time rows (normally from an OHLC frame).
|
||||||
|
|
||||||
|
'''
|
||||||
|
return df.unique(
|
||||||
|
subset=['dt'],
|
||||||
|
maintain_order=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def detect_time_gaps(
|
def detect_time_gaps(
|
||||||
df: pl.DataFrame,
|
df: pl.DataFrame,
|
||||||
|
|
||||||
|
@ -294,10 +307,12 @@ def detect_time_gaps(
|
||||||
'''
|
'''
|
||||||
return (
|
return (
|
||||||
with_dts(df)
|
with_dts(df)
|
||||||
|
# First by a seconds unit step size
|
||||||
.filter(
|
.filter(
|
||||||
pl.col('s_diff').abs() > expect_period
|
pl.col('s_diff').abs() > expect_period
|
||||||
)
|
)
|
||||||
.filter(
|
.filter(
|
||||||
|
# Second by an arbitrary dt-unit step size
|
||||||
getattr(
|
getattr(
|
||||||
pl.col('dt_diff').dt,
|
pl.col('dt_diff').dt,
|
||||||
gap_dt_unit,
|
gap_dt_unit,
|
||||||
|
|
|
@ -716,9 +716,14 @@ async def tsdb_backfill(
|
||||||
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
||||||
offset_s: float = backfill_diff.in_seconds()
|
offset_s: float = backfill_diff.in_seconds()
|
||||||
|
|
||||||
# XXX EDGE CASE: when the venue was closed (say over
|
# XXX EDGE CASEs: the most recent frame overlaps with
|
||||||
# the weeknd) causing a timeseries gap, AND the query
|
# prior tsdb history!!
|
||||||
# frames size (eg. for 1s we rx 2k datums ~= 33.33m) IS
|
# - so the latest frame's start time is earlier then
|
||||||
|
# the tsdb's latest sample.
|
||||||
|
# - alternatively this may also more generally occur
|
||||||
|
# when the venue was closed (say over the weeknd)
|
||||||
|
# causing a timeseries gap, AND the query frames size
|
||||||
|
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
|
||||||
# GREATER THAN the current venue-market's operating
|
# GREATER THAN the current venue-market's operating
|
||||||
# session (time) we will receive datums from BEFORE THE
|
# session (time) we will receive datums from BEFORE THE
|
||||||
# CLOSURE GAP and thus the `offset_s` value will be
|
# CLOSURE GAP and thus the `offset_s` value will be
|
||||||
|
@ -727,20 +732,35 @@ async def tsdb_backfill(
|
||||||
# tsdb. In this case we instead only retreive and push
|
# tsdb. In this case we instead only retreive and push
|
||||||
# the series portion missing from the db's data set.
|
# the series portion missing from the db's data set.
|
||||||
if offset_s < 0:
|
if offset_s < 0:
|
||||||
backfill_diff: Duration = mr_end_dt - last_tsdb_dt
|
non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||||
offset_s: float = backfill_diff.in_seconds()
|
non_overlap_offset_s: float = backfill_diff.in_seconds()
|
||||||
|
|
||||||
offset_samples: int = round(offset_s / timeframe)
|
offset_samples: int = round(offset_s / timeframe)
|
||||||
|
|
||||||
# TODO: see if there's faster multi-field reads:
|
# TODO: see if there's faster multi-field reads:
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||||
# re-index with a `time` and index field
|
# re-index with a `time` and index field
|
||||||
|
if offset_s > 0:
|
||||||
|
# NOTE XXX: ONLY when there is an actual gap
|
||||||
|
# between the earliest sample in the latest history
|
||||||
|
# frame do we want to NOT stick the latest tsdb
|
||||||
|
# history adjacent to that latest frame!
|
||||||
prepend_start = shm._first.value - offset_samples + 1
|
prepend_start = shm._first.value - offset_samples + 1
|
||||||
|
to_push = tsdb_history[-prepend_start:]
|
||||||
|
else:
|
||||||
|
# when there is overlap we want to remove the
|
||||||
|
# overlapping samples from the tsdb portion (taking
|
||||||
|
# instead the latest frame's values since THEY
|
||||||
|
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
|
||||||
|
# to the latest frame!
|
||||||
|
# TODO: assert the overlap segment array contains
|
||||||
|
# the same values!?!
|
||||||
|
prepend_start = shm._first.value
|
||||||
|
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
|
||||||
|
|
||||||
# tsdb history is so far in the past we can't fit it in
|
# tsdb history is so far in the past we can't fit it in
|
||||||
# shm buffer space so simply don't load it!
|
# shm buffer space so simply don't load it!
|
||||||
if prepend_start > 0:
|
if prepend_start > 0:
|
||||||
to_push = tsdb_history[-prepend_start:]
|
|
||||||
shm.push(
|
shm.push(
|
||||||
to_push,
|
to_push,
|
||||||
|
|
||||||
|
|
|
@ -282,16 +282,21 @@ async def maybe_open_pikerd(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) as (actor, addrs),
|
) as (actor, addrs),
|
||||||
|
):
|
||||||
|
if _root_dname in actor.uid:
|
||||||
|
yield None
|
||||||
|
return
|
||||||
|
|
||||||
# try to attach to any existing (host-local) `pikerd`
|
# NOTE: IFF running in disti mode, try to attach to any
|
||||||
tractor.find_actor(
|
# existing (host-local) `pikerd`.
|
||||||
|
else:
|
||||||
|
async with tractor.find_actor(
|
||||||
_root_dname,
|
_root_dname,
|
||||||
registry_addrs=registry_addrs,
|
registry_addrs=registry_addrs,
|
||||||
only_first=True,
|
only_first=True,
|
||||||
# raise_on_none=True,
|
# raise_on_none=True,
|
||||||
) as pikerd_portal,
|
) as pikerd_portal:
|
||||||
|
|
||||||
):
|
|
||||||
# connect to any existing remote daemon presuming its
|
# connect to any existing remote daemon presuming its
|
||||||
# registry socket was selected.
|
# registry socket was selected.
|
||||||
if pikerd_portal is not None:
|
if pikerd_portal is not None:
|
||||||
|
|
|
@ -136,6 +136,53 @@ def delete(
|
||||||
trio.run(main, symbols)
|
trio.run(main, symbols)
|
||||||
|
|
||||||
|
|
||||||
|
def dedupe(src_df: pl.DataFrame) -> tuple[
|
||||||
|
pl.DataFrame, # with dts
|
||||||
|
pl.DataFrame, # gaps
|
||||||
|
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
||||||
|
bool,
|
||||||
|
]:
|
||||||
|
'''
|
||||||
|
Check for time series gaps and if found
|
||||||
|
de-duplicate any datetime entries, check for
|
||||||
|
a frame height diff and return the newly
|
||||||
|
dt-deduplicated frame.
|
||||||
|
|
||||||
|
'''
|
||||||
|
from piker.data import _timeseries as tsp
|
||||||
|
df: pl.DataFrame = tsp.with_dts(src_df)
|
||||||
|
gaps: pl.DataFrame = tsp.detect_time_gaps(df)
|
||||||
|
if not gaps.is_empty():
|
||||||
|
|
||||||
|
# remove duplicated datetime samples/sections
|
||||||
|
deduped: pl.DataFrame = tsp.dedup_dt(df)
|
||||||
|
deduped_gaps = tsp.detect_time_gaps(deduped)
|
||||||
|
|
||||||
|
log.warning(
|
||||||
|
f'Gaps found:\n{gaps}\n'
|
||||||
|
f'deduped Gaps found:\n{deduped_gaps}'
|
||||||
|
)
|
||||||
|
# TODO: rewrite this in polars and/or convert to
|
||||||
|
# ndarray to detect and remove?
|
||||||
|
# null_gaps = tsp.detect_null_time_gap()
|
||||||
|
|
||||||
|
diff: int = (
|
||||||
|
df.height
|
||||||
|
-
|
||||||
|
deduped.height
|
||||||
|
)
|
||||||
|
was_deduped: bool = False
|
||||||
|
if diff:
|
||||||
|
deduped: bool = True
|
||||||
|
|
||||||
|
return (
|
||||||
|
df,
|
||||||
|
gaps,
|
||||||
|
deduped,
|
||||||
|
was_deduped,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@store.command()
|
@store.command()
|
||||||
def anal(
|
def anal(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
@ -144,7 +191,11 @@ def anal(
|
||||||
|
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
'''
|
'''
|
||||||
Anal-ysis is when you take the data do stuff to it, i think.
|
Anal-ysis is when you take the data do stuff to it.
|
||||||
|
|
||||||
|
NOTE: This ONLY loads the offline timeseries data (by default
|
||||||
|
from a parquet file) NOT the in-shm version you might be seeing
|
||||||
|
in a chart.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -162,6 +213,7 @@ def anal(
|
||||||
syms: list[str] = await client.list_keys()
|
syms: list[str] = await client.list_keys()
|
||||||
log.info(f'{len(syms)} FOUND for {mod.name}')
|
log.info(f'{len(syms)} FOUND for {mod.name}')
|
||||||
|
|
||||||
|
history: ShmArray # np buffer format
|
||||||
(
|
(
|
||||||
history,
|
history,
|
||||||
first_dt,
|
first_dt,
|
||||||
|
@ -172,13 +224,26 @@ def anal(
|
||||||
)
|
)
|
||||||
assert first_dt < last_dt
|
assert first_dt < last_dt
|
||||||
|
|
||||||
src_df = await client.as_df(fqme, period)
|
shm_df: pl.DataFrame = await client.as_df(
|
||||||
from piker.data import _timeseries as tsmod
|
fqme,
|
||||||
df: pl.DataFrame = tsmod.with_dts(src_df)
|
period,
|
||||||
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
|
)
|
||||||
|
|
||||||
if not gaps.is_empty():
|
df: pl.DataFrame # with dts
|
||||||
print(f'Gaps found:\n{gaps}')
|
deduped: pl.DataFrame # deduplicated dts
|
||||||
|
(
|
||||||
|
df,
|
||||||
|
gaps,
|
||||||
|
deduped,
|
||||||
|
shortened,
|
||||||
|
) = dedupe(shm_df)
|
||||||
|
|
||||||
|
if shortened:
|
||||||
|
await client.write_ohlcv(
|
||||||
|
fqme,
|
||||||
|
ohlcv=deduped,
|
||||||
|
timeframe=period,
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: something better with tab completion..
|
# TODO: something better with tab completion..
|
||||||
# is there something more minimal but nearly as
|
# is there something more minimal but nearly as
|
||||||
|
@ -275,7 +340,7 @@ def ldshm(
|
||||||
'''
|
'''
|
||||||
Linux ONLY: load any fqme file name matching shm buffer from
|
Linux ONLY: load any fqme file name matching shm buffer from
|
||||||
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
|
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
|
||||||
optionally write to .parquet file.
|
optionally write to offline storage via `.parquet` file.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -287,7 +352,7 @@ def ldshm(
|
||||||
),
|
),
|
||||||
):
|
):
|
||||||
df: pl.DataFrame | None = None
|
df: pl.DataFrame | None = None
|
||||||
for shmfile, shm, src_df in iter_dfs_from_shms(fqme):
|
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
|
||||||
|
|
||||||
# compute ohlc properties for naming
|
# compute ohlc properties for naming
|
||||||
times: np.ndarray = shm.array['time']
|
times: np.ndarray = shm.array['time']
|
||||||
|
@ -297,9 +362,16 @@ def ldshm(
|
||||||
f'Something is wrong with time period for {shm}:\n{times}'
|
f'Something is wrong with time period for {shm}:\n{times}'
|
||||||
)
|
)
|
||||||
|
|
||||||
from piker.data import _timeseries as tsmod
|
|
||||||
df: pl.DataFrame = tsmod.with_dts(src_df)
|
# over-write back to shm?
|
||||||
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
|
df: pl.DataFrame # with dts
|
||||||
|
deduped: pl.DataFrame # deduplicated dts
|
||||||
|
(
|
||||||
|
df,
|
||||||
|
gaps,
|
||||||
|
deduped,
|
||||||
|
was_dded,
|
||||||
|
) = dedupe(shm_df)
|
||||||
|
|
||||||
# TODO: maybe only optionally enter this depending
|
# TODO: maybe only optionally enter this depending
|
||||||
# on some CLI flags and/or gap detection?
|
# on some CLI flags and/or gap detection?
|
||||||
|
|
|
@ -272,9 +272,21 @@ class NativeStorageClient:
|
||||||
# limit: int = int(200e3),
|
# limit: int = int(200e3),
|
||||||
|
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
path: Path = self.mk_path(fqme, period=int(timeframe))
|
path: Path = self.mk_path(
|
||||||
|
fqme,
|
||||||
|
period=int(timeframe),
|
||||||
|
)
|
||||||
df: pl.DataFrame = pl.read_parquet(path)
|
df: pl.DataFrame = pl.read_parquet(path)
|
||||||
self._dfs.setdefault(timeframe, {})[fqme] = df
|
|
||||||
|
# cache df for later usage since we (currently) need to
|
||||||
|
# convert to np.ndarrays to push to our `ShmArray` rt
|
||||||
|
# buffers subsys but later we may operate entirely on
|
||||||
|
# pyarrow arrays/buffers so keeping the dfs around for
|
||||||
|
# a variety of purposes is handy.
|
||||||
|
self._dfs.setdefault(
|
||||||
|
timeframe,
|
||||||
|
{},
|
||||||
|
)[fqme] = df
|
||||||
|
|
||||||
# TODO: filter by end and limit inputs
|
# TODO: filter by end and limit inputs
|
||||||
# times: pl.Series = df['time']
|
# times: pl.Series = df['time']
|
||||||
|
@ -329,7 +341,7 @@ class NativeStorageClient:
|
||||||
time.time() - start,
|
time.time() - start,
|
||||||
ndigits=6,
|
ndigits=6,
|
||||||
)
|
)
|
||||||
print(
|
log.info(
|
||||||
f'parquet write took {delay} secs\n'
|
f'parquet write took {delay} secs\n'
|
||||||
f'file path: {path}'
|
f'file path: {path}'
|
||||||
)
|
)
|
||||||
|
@ -339,7 +351,7 @@ class NativeStorageClient:
|
||||||
async def write_ohlcv(
|
async def write_ohlcv(
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
ohlcv: np.ndarray,
|
ohlcv: np.ndarray | pl.DataFrame,
|
||||||
timeframe: int,
|
timeframe: int,
|
||||||
|
|
||||||
) -> Path:
|
) -> Path:
|
||||||
|
|
Loading…
Reference in New Issue