Add `.sort()` support to `dedupe()`

distribute_dis
Tyler Goodlet 2023-12-26 17:35:38 -05:00
parent a86573b5a2
commit d9c574e291
2 changed files with 14 additions and 7 deletions

View File

@ -120,7 +120,7 @@ from ..storage import TimeseriesNotFound
if TYPE_CHECKING: if TYPE_CHECKING:
from bidict import bidict from bidict import bidict
from ..service.marketstore import StorageClient from ..service.marketstore import StorageClient
from .feed import _FeedsBus # from .feed import _FeedsBus
# `ShmArray` buffer sizing configuration: # `ShmArray` buffer sizing configuration:
@ -1352,9 +1352,7 @@ def iter_dfs_from_shms(
readonly=True, readonly=True,
) )
assert not opened assert not opened
ohlcv = shm.array ohlcv: np.ndarray = shm.array
from ._anal import np2pl
df: pl.DataFrame = np2pl(ohlcv) df: pl.DataFrame = np2pl(ohlcv)
yield ( yield (

View File

@ -620,7 +620,11 @@ def detect_price_gaps(
... ...
def dedupe(src_df: pl.DataFrame) -> tuple[ def dedupe(
src_df: pl.DataFrame,
sort: bool = True,
) -> tuple[
pl.DataFrame, # with dts pl.DataFrame, # with dts
pl.DataFrame, # gaps pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal) pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
@ -634,6 +638,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
''' '''
df: pl.DataFrame = with_dts(src_df) df: pl.DataFrame = with_dts(src_df)
# TODO: enable passing existing `with_dts` df for speedup?
gaps: pl.DataFrame = detect_time_gaps(df) gaps: pl.DataFrame = detect_time_gaps(df)
# if no gaps detected just return carbon copies # if no gaps detected just return carbon copies
@ -651,8 +657,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
subset=['dt'], subset=['dt'],
maintain_order=True, maintain_order=True,
) )
if sort:
deduped = deduped.sort(by='time')
deduped_gaps = detect_time_gaps(deduped) deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
diff: int = ( diff: int = (
df.height df.height
@ -660,7 +668,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
deduped.height deduped.height
) )
log.warning( log.warning(
f'Gaps found:\n{gaps}\n' f'TIME GAPs FOUND:\n'
# f'{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}' f'deduped Gaps found:\n{deduped_gaps}'
) )
return ( return (