Compare commits
No commits in common. "cb941a55549b1da48c6ae837865a48dbc1a63026" and "b9af6176c5e0929595fba4112e4eb605505ac1d3" have entirely different histories.
cb941a5554
...
b9af6176c5
|
@ -263,19 +263,6 @@ def with_dts(
|
||||||
# )
|
# )
|
||||||
|
|
||||||
|
|
||||||
def dedup_dt(
|
|
||||||
df: pl.DataFrame,
|
|
||||||
) -> pl.DataFrame:
|
|
||||||
'''
|
|
||||||
Drop duplicate date-time rows (normally from an OHLC frame).
|
|
||||||
|
|
||||||
'''
|
|
||||||
return df.unique(
|
|
||||||
subset=['dt'],
|
|
||||||
maintain_order=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def detect_time_gaps(
|
def detect_time_gaps(
|
||||||
df: pl.DataFrame,
|
df: pl.DataFrame,
|
||||||
|
|
||||||
|
@ -307,12 +294,10 @@ def detect_time_gaps(
|
||||||
'''
|
'''
|
||||||
return (
|
return (
|
||||||
with_dts(df)
|
with_dts(df)
|
||||||
# First by a seconds unit step size
|
|
||||||
.filter(
|
.filter(
|
||||||
pl.col('s_diff').abs() > expect_period
|
pl.col('s_diff').abs() > expect_period
|
||||||
)
|
)
|
||||||
.filter(
|
.filter(
|
||||||
# Second by an arbitrary dt-unit step size
|
|
||||||
getattr(
|
getattr(
|
||||||
pl.col('dt_diff').dt,
|
pl.col('dt_diff').dt,
|
||||||
gap_dt_unit,
|
gap_dt_unit,
|
||||||
|
|
|
@ -716,14 +716,9 @@ async def tsdb_backfill(
|
||||||
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
||||||
offset_s: float = backfill_diff.in_seconds()
|
offset_s: float = backfill_diff.in_seconds()
|
||||||
|
|
||||||
# XXX EDGE CASEs: the most recent frame overlaps with
|
# XXX EDGE CASE: when the venue was closed (say over
|
||||||
# prior tsdb history!!
|
# the weeknd) causing a timeseries gap, AND the query
|
||||||
# - so the latest frame's start time is earlier then
|
# frames size (eg. for 1s we rx 2k datums ~= 33.33m) IS
|
||||||
# the tsdb's latest sample.
|
|
||||||
# - alternatively this may also more generally occur
|
|
||||||
# when the venue was closed (say over the weeknd)
|
|
||||||
# causing a timeseries gap, AND the query frames size
|
|
||||||
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
|
|
||||||
# GREATER THAN the current venue-market's operating
|
# GREATER THAN the current venue-market's operating
|
||||||
# session (time) we will receive datums from BEFORE THE
|
# session (time) we will receive datums from BEFORE THE
|
||||||
# CLOSURE GAP and thus the `offset_s` value will be
|
# CLOSURE GAP and thus the `offset_s` value will be
|
||||||
|
@ -732,35 +727,20 @@ async def tsdb_backfill(
|
||||||
# tsdb. In this case we instead only retreive and push
|
# tsdb. In this case we instead only retreive and push
|
||||||
# the series portion missing from the db's data set.
|
# the series portion missing from the db's data set.
|
||||||
if offset_s < 0:
|
if offset_s < 0:
|
||||||
non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
backfill_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||||
non_overlap_offset_s: float = backfill_diff.in_seconds()
|
offset_s: float = backfill_diff.in_seconds()
|
||||||
|
|
||||||
offset_samples: int = round(offset_s / timeframe)
|
offset_samples: int = round(offset_s / timeframe)
|
||||||
|
|
||||||
# TODO: see if there's faster multi-field reads:
|
# TODO: see if there's faster multi-field reads:
|
||||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||||
# re-index with a `time` and index field
|
# re-index with a `time` and index field
|
||||||
if offset_s > 0:
|
prepend_start = shm._first.value - offset_samples + 1
|
||||||
# NOTE XXX: ONLY when there is an actual gap
|
|
||||||
# between the earliest sample in the latest history
|
|
||||||
# frame do we want to NOT stick the latest tsdb
|
|
||||||
# history adjacent to that latest frame!
|
|
||||||
prepend_start = shm._first.value - offset_samples + 1
|
|
||||||
to_push = tsdb_history[-prepend_start:]
|
|
||||||
else:
|
|
||||||
# when there is overlap we want to remove the
|
|
||||||
# overlapping samples from the tsdb portion (taking
|
|
||||||
# instead the latest frame's values since THEY
|
|
||||||
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
|
|
||||||
# to the latest frame!
|
|
||||||
# TODO: assert the overlap segment array contains
|
|
||||||
# the same values!?!
|
|
||||||
prepend_start = shm._first.value
|
|
||||||
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
|
|
||||||
|
|
||||||
# tsdb history is so far in the past we can't fit it in
|
# tsdb history is so far in the past we can't fit it in
|
||||||
# shm buffer space so simply don't load it!
|
# shm buffer space so simply don't load it!
|
||||||
if prepend_start > 0:
|
if prepend_start > 0:
|
||||||
|
to_push = tsdb_history[-prepend_start:]
|
||||||
shm.push(
|
shm.push(
|
||||||
to_push,
|
to_push,
|
||||||
|
|
||||||
|
|
|
@ -282,33 +282,28 @@ async def maybe_open_pikerd(
|
||||||
loglevel=loglevel,
|
loglevel=loglevel,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) as (actor, addrs),
|
) as (actor, addrs),
|
||||||
|
|
||||||
|
# try to attach to any existing (host-local) `pikerd`
|
||||||
|
tractor.find_actor(
|
||||||
|
_root_dname,
|
||||||
|
registry_addrs=registry_addrs,
|
||||||
|
only_first=True,
|
||||||
|
# raise_on_none=True,
|
||||||
|
) as pikerd_portal,
|
||||||
|
|
||||||
):
|
):
|
||||||
if _root_dname in actor.uid:
|
# connect to any existing remote daemon presuming its
|
||||||
yield None
|
# registry socket was selected.
|
||||||
|
if pikerd_portal is not None:
|
||||||
|
|
||||||
|
# sanity check that we are actually connecting to
|
||||||
|
# a remote process and not ourselves.
|
||||||
|
assert actor.uid != pikerd_portal.channel.uid
|
||||||
|
assert registry_addrs
|
||||||
|
|
||||||
|
yield pikerd_portal
|
||||||
return
|
return
|
||||||
|
|
||||||
# NOTE: IFF running in disti mode, try to attach to any
|
|
||||||
# existing (host-local) `pikerd`.
|
|
||||||
else:
|
|
||||||
async with tractor.find_actor(
|
|
||||||
_root_dname,
|
|
||||||
registry_addrs=registry_addrs,
|
|
||||||
only_first=True,
|
|
||||||
# raise_on_none=True,
|
|
||||||
) as pikerd_portal:
|
|
||||||
|
|
||||||
# connect to any existing remote daemon presuming its
|
|
||||||
# registry socket was selected.
|
|
||||||
if pikerd_portal is not None:
|
|
||||||
|
|
||||||
# sanity check that we are actually connecting to
|
|
||||||
# a remote process and not ourselves.
|
|
||||||
assert actor.uid != pikerd_portal.channel.uid
|
|
||||||
assert registry_addrs
|
|
||||||
|
|
||||||
yield pikerd_portal
|
|
||||||
return
|
|
||||||
|
|
||||||
# presume pikerd role since no daemon could be found at
|
# presume pikerd role since no daemon could be found at
|
||||||
# configured address
|
# configured address
|
||||||
async with open_pikerd(
|
async with open_pikerd(
|
||||||
|
|
|
@ -136,53 +136,6 @@ def delete(
|
||||||
trio.run(main, symbols)
|
trio.run(main, symbols)
|
||||||
|
|
||||||
|
|
||||||
def dedupe(src_df: pl.DataFrame) -> tuple[
|
|
||||||
pl.DataFrame, # with dts
|
|
||||||
pl.DataFrame, # gaps
|
|
||||||
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
|
|
||||||
bool,
|
|
||||||
]:
|
|
||||||
'''
|
|
||||||
Check for time series gaps and if found
|
|
||||||
de-duplicate any datetime entries, check for
|
|
||||||
a frame height diff and return the newly
|
|
||||||
dt-deduplicated frame.
|
|
||||||
|
|
||||||
'''
|
|
||||||
from piker.data import _timeseries as tsp
|
|
||||||
df: pl.DataFrame = tsp.with_dts(src_df)
|
|
||||||
gaps: pl.DataFrame = tsp.detect_time_gaps(df)
|
|
||||||
if not gaps.is_empty():
|
|
||||||
|
|
||||||
# remove duplicated datetime samples/sections
|
|
||||||
deduped: pl.DataFrame = tsp.dedup_dt(df)
|
|
||||||
deduped_gaps = tsp.detect_time_gaps(deduped)
|
|
||||||
|
|
||||||
log.warning(
|
|
||||||
f'Gaps found:\n{gaps}\n'
|
|
||||||
f'deduped Gaps found:\n{deduped_gaps}'
|
|
||||||
)
|
|
||||||
# TODO: rewrite this in polars and/or convert to
|
|
||||||
# ndarray to detect and remove?
|
|
||||||
# null_gaps = tsp.detect_null_time_gap()
|
|
||||||
|
|
||||||
diff: int = (
|
|
||||||
df.height
|
|
||||||
-
|
|
||||||
deduped.height
|
|
||||||
)
|
|
||||||
was_deduped: bool = False
|
|
||||||
if diff:
|
|
||||||
deduped: bool = True
|
|
||||||
|
|
||||||
return (
|
|
||||||
df,
|
|
||||||
gaps,
|
|
||||||
deduped,
|
|
||||||
was_deduped,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@store.command()
|
@store.command()
|
||||||
def anal(
|
def anal(
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
@ -191,11 +144,7 @@ def anal(
|
||||||
|
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
'''
|
'''
|
||||||
Anal-ysis is when you take the data do stuff to it.
|
Anal-ysis is when you take the data do stuff to it, i think.
|
||||||
|
|
||||||
NOTE: This ONLY loads the offline timeseries data (by default
|
|
||||||
from a parquet file) NOT the in-shm version you might be seeing
|
|
||||||
in a chart.
|
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -213,7 +162,6 @@ def anal(
|
||||||
syms: list[str] = await client.list_keys()
|
syms: list[str] = await client.list_keys()
|
||||||
log.info(f'{len(syms)} FOUND for {mod.name}')
|
log.info(f'{len(syms)} FOUND for {mod.name}')
|
||||||
|
|
||||||
history: ShmArray # np buffer format
|
|
||||||
(
|
(
|
||||||
history,
|
history,
|
||||||
first_dt,
|
first_dt,
|
||||||
|
@ -224,26 +172,13 @@ def anal(
|
||||||
)
|
)
|
||||||
assert first_dt < last_dt
|
assert first_dt < last_dt
|
||||||
|
|
||||||
shm_df: pl.DataFrame = await client.as_df(
|
src_df = await client.as_df(fqme, period)
|
||||||
fqme,
|
from piker.data import _timeseries as tsmod
|
||||||
period,
|
df: pl.DataFrame = tsmod.with_dts(src_df)
|
||||||
)
|
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
|
||||||
|
|
||||||
df: pl.DataFrame # with dts
|
if not gaps.is_empty():
|
||||||
deduped: pl.DataFrame # deduplicated dts
|
print(f'Gaps found:\n{gaps}')
|
||||||
(
|
|
||||||
df,
|
|
||||||
gaps,
|
|
||||||
deduped,
|
|
||||||
shortened,
|
|
||||||
) = dedupe(shm_df)
|
|
||||||
|
|
||||||
if shortened:
|
|
||||||
await client.write_ohlcv(
|
|
||||||
fqme,
|
|
||||||
ohlcv=deduped,
|
|
||||||
timeframe=period,
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: something better with tab completion..
|
# TODO: something better with tab completion..
|
||||||
# is there something more minimal but nearly as
|
# is there something more minimal but nearly as
|
||||||
|
@ -340,7 +275,7 @@ def ldshm(
|
||||||
'''
|
'''
|
||||||
Linux ONLY: load any fqme file name matching shm buffer from
|
Linux ONLY: load any fqme file name matching shm buffer from
|
||||||
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
|
/dev/shm/ into an OHLCV numpy array and polars DataFrame,
|
||||||
optionally write to offline storage via `.parquet` file.
|
optionally write to .parquet file.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
|
@ -352,7 +287,7 @@ def ldshm(
|
||||||
),
|
),
|
||||||
):
|
):
|
||||||
df: pl.DataFrame | None = None
|
df: pl.DataFrame | None = None
|
||||||
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
|
for shmfile, shm, src_df in iter_dfs_from_shms(fqme):
|
||||||
|
|
||||||
# compute ohlc properties for naming
|
# compute ohlc properties for naming
|
||||||
times: np.ndarray = shm.array['time']
|
times: np.ndarray = shm.array['time']
|
||||||
|
@ -362,16 +297,9 @@ def ldshm(
|
||||||
f'Something is wrong with time period for {shm}:\n{times}'
|
f'Something is wrong with time period for {shm}:\n{times}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from piker.data import _timeseries as tsmod
|
||||||
# over-write back to shm?
|
df: pl.DataFrame = tsmod.with_dts(src_df)
|
||||||
df: pl.DataFrame # with dts
|
gaps: pl.DataFrame = tsmod.detect_time_gaps(df)
|
||||||
deduped: pl.DataFrame # deduplicated dts
|
|
||||||
(
|
|
||||||
df,
|
|
||||||
gaps,
|
|
||||||
deduped,
|
|
||||||
was_dded,
|
|
||||||
) = dedupe(shm_df)
|
|
||||||
|
|
||||||
# TODO: maybe only optionally enter this depending
|
# TODO: maybe only optionally enter this depending
|
||||||
# on some CLI flags and/or gap detection?
|
# on some CLI flags and/or gap detection?
|
||||||
|
|
|
@ -272,21 +272,9 @@ class NativeStorageClient:
|
||||||
# limit: int = int(200e3),
|
# limit: int = int(200e3),
|
||||||
|
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
path: Path = self.mk_path(
|
path: Path = self.mk_path(fqme, period=int(timeframe))
|
||||||
fqme,
|
|
||||||
period=int(timeframe),
|
|
||||||
)
|
|
||||||
df: pl.DataFrame = pl.read_parquet(path)
|
df: pl.DataFrame = pl.read_parquet(path)
|
||||||
|
self._dfs.setdefault(timeframe, {})[fqme] = df
|
||||||
# cache df for later usage since we (currently) need to
|
|
||||||
# convert to np.ndarrays to push to our `ShmArray` rt
|
|
||||||
# buffers subsys but later we may operate entirely on
|
|
||||||
# pyarrow arrays/buffers so keeping the dfs around for
|
|
||||||
# a variety of purposes is handy.
|
|
||||||
self._dfs.setdefault(
|
|
||||||
timeframe,
|
|
||||||
{},
|
|
||||||
)[fqme] = df
|
|
||||||
|
|
||||||
# TODO: filter by end and limit inputs
|
# TODO: filter by end and limit inputs
|
||||||
# times: pl.Series = df['time']
|
# times: pl.Series = df['time']
|
||||||
|
@ -341,7 +329,7 @@ class NativeStorageClient:
|
||||||
time.time() - start,
|
time.time() - start,
|
||||||
ndigits=6,
|
ndigits=6,
|
||||||
)
|
)
|
||||||
log.info(
|
print(
|
||||||
f'parquet write took {delay} secs\n'
|
f'parquet write took {delay} secs\n'
|
||||||
f'file path: {path}'
|
f'file path: {path}'
|
||||||
)
|
)
|
||||||
|
@ -351,7 +339,7 @@ class NativeStorageClient:
|
||||||
async def write_ohlcv(
|
async def write_ohlcv(
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
ohlcv: np.ndarray | pl.DataFrame,
|
ohlcv: np.ndarray,
|
||||||
timeframe: int,
|
timeframe: int,
|
||||||
|
|
||||||
) -> Path:
|
) -> Path:
|
||||||
|
|
Loading…
Reference in New Issue