From af64152640fcd4f69879c6469e83d8924c380646 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 31 May 2023 17:56:32 -0400 Subject: [PATCH] .data.history: update to new naming -> `._source.def_iohlcv_fields` -> `.storage.StorageClient` --- piker/data/history.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/piker/data/history.py b/piker/data/history.py index a29d2ab9..4a0ab29b 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -48,7 +48,7 @@ from ._sharedmem import ( ShmArray, _secs_in_day, ) -from ._source import base_iohlc_dtype +from ._source import def_iohlcv_fields from ._sampling import ( open_sample_stream, ) @@ -57,7 +57,7 @@ from ..brokers._util import ( ) if TYPE_CHECKING: - from ..service.marketstore import Storage + from ..service.marketstore import StorageClient from .feed import _FeedsBus @@ -87,7 +87,7 @@ async def start_backfill( feed_is_live: trio.Event, last_tsdb_dt: datetime | None = None, - storage: Storage | None = None, + storage: StorageClient | None = None, write_tsdb: bool = True, tsdb_is_up: bool = False, @@ -177,6 +177,7 @@ async def start_backfill( # based on the sample step size, maybe load a certain amount history if last_tsdb_dt is None: + if step_size_s not in (1, 60): raise ValueError( '`piker` only needs to support 1m and 1s sampling ' @@ -388,7 +389,7 @@ async def tsdb_backfill( mod: ModuleType, storemod: ModuleType, bus: _FeedsBus, - storage: Storage, + storage: StorageClient, mkt: MktPair, shms: dict[int, ShmArray], sampler_stream: tractor.MsgStream, @@ -406,6 +407,7 @@ async def tsdb_backfill( fqme: str = mkt.fqme # start history anal and load missing new data via backend. + timeframe: int for timeframe, shm in shms.items(): # loads a (large) frame of data from the tsdb depending # on the db's query size limit. @@ -527,7 +529,7 @@ async def tsdb_backfill( len(tsdb_history) ): # load the first (smaller) bit of history originally loaded - # above from ``Storage.load()``. + # above from ``StorageClient.load()``. to_push = tsdb_history[-prepend_start:] shm.push( to_push, @@ -645,7 +647,7 @@ async def manage_history( key=f'piker.{service}[{uuid[:16]}].{fqme}.hist', # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), # we expect the sub-actor to write readonly=False, @@ -662,7 +664,7 @@ async def manage_history( key=f'piker.{service}[{uuid[:16]}].{fqme}.rt', # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), + dtype=getattr(mod, '_ohlc_dtype', def_iohlcv_fields), # we expect the sub-actor to write readonly=False,