diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py index 324f8229..417b5f30 100644 --- a/piker/tsp/__init__.py +++ b/piker/tsp/__init__.py @@ -120,7 +120,7 @@ from ..storage import TimeseriesNotFound if TYPE_CHECKING: from bidict import bidict from ..service.marketstore import StorageClient - from .feed import _FeedsBus + # from .feed import _FeedsBus # `ShmArray` buffer sizing configuration: @@ -1352,9 +1352,7 @@ def iter_dfs_from_shms( readonly=True, ) assert not opened - ohlcv = shm.array - - from ._anal import np2pl + ohlcv: np.ndarray = shm.array df: pl.DataFrame = np2pl(ohlcv) yield ( diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py index c8044bef..d99607c7 100644 --- a/piker/tsp/_anal.py +++ b/piker/tsp/_anal.py @@ -620,7 +620,11 @@ def detect_price_gaps( ... -def dedupe(src_df: pl.DataFrame) -> tuple[ +def dedupe( + src_df: pl.DataFrame, + sort: bool = True, + +) -> tuple[ pl.DataFrame, # with dts pl.DataFrame, # gaps pl.DataFrame, # with deduplicated dts (aka gap/repeat removal) @@ -634,6 +638,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[ ''' df: pl.DataFrame = with_dts(src_df) + + # TODO: enable passing existing `with_dts` df for speedup? gaps: pl.DataFrame = detect_time_gaps(df) # if no gaps detected just return carbon copies @@ -651,8 +657,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[ subset=['dt'], maintain_order=True, ) + if sort: + deduped = deduped.sort(by='time') - deduped_gaps = detect_time_gaps(deduped) + deduped_gaps: pl.DataFrame = detect_time_gaps(deduped) diff: int = ( df.height @@ -660,7 +668,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[ deduped.height ) log.warning( - f'Gaps found:\n{gaps}\n' + f'TIME GAPs FOUND:\n' + # f'{gaps}\n' f'deduped Gaps found:\n{deduped_gaps}' ) return (