diff --git a/piker/data/history.py b/piker/data/history.py index d6f5fb5f..99fd425d 100644 --- a/piker/data/history.py +++ b/piker/data/history.py @@ -49,7 +49,6 @@ from ._util import ( from ._sharedmem import ( maybe_open_shm_array, ShmArray, - _secs_in_day, ) from ._source import def_iohlcv_fields from ._sampling import ( @@ -65,6 +64,26 @@ if TYPE_CHECKING: from .feed import _FeedsBus +# `ShmArray` buffer sizing configuration: +_mins_in_day = int(60 * 24) +# how much is probably dependent on lifestyle +# but we reco a buncha times (but only on a +# run-every-other-day kinda week). +_secs_in_day = int(60 * _mins_in_day) +_days_in_week: int = 7 + +_days_worth: int = 3 +_default_hist_size: int = 6 * 365 * _mins_in_day +_hist_buffer_start = int( + _default_hist_size - round(7 * _mins_in_day) +) + +_default_rt_size: int = _days_worth * _secs_in_day +# NOTE: start the append index in rt buffer such that 1 day's worth +# can be appenened before overrun. +_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) + + def diff_history( array: np.ndarray, append_until_dt: datetime | None = None, @@ -812,6 +831,9 @@ async def manage_history( # (maybe) allocate shm array for this broker/symbol which will # be used for fast near-term history capture and processing. hist_shm, opened = maybe_open_shm_array( + size=_default_hist_size, + append_start_index=_hist_buffer_start, + key=f'piker.{service}[{uuid[:16]}].{fqme}.hist', # use any broker defined ohlc dtype: @@ -829,6 +851,8 @@ async def manage_history( ) rt_shm, opened = maybe_open_shm_array( + size=_default_rt_size, + append_start_index=_rt_buffer_start, key=f'piker.{service}[{uuid[:16]}].{fqme}.rt', # use any broker defined ohlc dtype: @@ -836,7 +860,6 @@ async def manage_history( # we expect the sub-actor to write readonly=False, - size=3*_secs_in_day, ) # (for now) set the rt (hft) shm array with space to prepend diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index 11d1e7dc..92f8f271 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -177,6 +177,7 @@ def fsp( def maybe_mk_fsp_shm( sym: str, target: Fsp, + size: int, readonly: bool = True, ) -> (str, ShmArray, bool): @@ -185,7 +186,8 @@ def maybe_mk_fsp_shm( exists, otherwise load the shm already existing for that token. ''' - assert isinstance(sym, str), '`sym` should be file-name-friendly `str`' + if not isinstance(sym, str): + raise ValueError('`sym: str` should be file-name-friendly') # TODO: load output types from `Fsp` # - should `index` be a required internal field? @@ -204,7 +206,7 @@ def maybe_mk_fsp_shm( shm, opened = maybe_open_shm_array( key, - # TODO: create entry for each time frame + size=size, dtype=fsp_dtype, readonly=True, ) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index b4aa2b10..f00b1e3d 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -377,7 +377,7 @@ class FspAdmin: # TODO: make this a `.src_flume` and add # a `dst_flume`? # (=> but then wouldn't this be the most basic `Viz`?) - self.flume = flume + self.flume: Flume = flume def rr_next_portal(self) -> tractor.Portal: name, portal = next(self._rr_next_actor) @@ -479,9 +479,15 @@ class FspAdmin: fqme: str = src_mkt.get_fqme(delim_char='') # allocate an output shm array + + # NOTE: rn we assume the HFT 1s period chart + # is always used! + src_shm: ShmArray = self.flume._rt_shm + key, dst_shm, opened = maybe_mk_fsp_shm( fqme, target=target, + size=src_shm._token.size, readonly=True, )