Define shm buffer sizing in `.data.history`

Also adjust sizing such that the history buffer will backfill the last
six years by default (in 1m OHLC) and the hft buffer will do only 3 days
worth. Also ensure the fsp layer passes the src shm's buffer size when
allocating since the size is now required by allocators in the shm apis.
basic_buy_bot
Tyler Goodlet 2023-06-15 13:04:21 -04:00
parent 33ec27715b
commit 9eeea51165
3 changed files with 36 additions and 5 deletions

View File

@ -49,7 +49,6 @@ from ._util import (
from ._sharedmem import ( from ._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
ShmArray, ShmArray,
_secs_in_day,
) )
from ._source import def_iohlcv_fields from ._source import def_iohlcv_fields
from ._sampling import ( from ._sampling import (
@ -65,6 +64,26 @@ if TYPE_CHECKING:
from .feed import _FeedsBus from .feed import _FeedsBus
# `ShmArray` buffer sizing configuration:
_mins_in_day = int(60 * 24)
# how much is probably dependent on lifestyle
# but we reco a buncha times (but only on a
# run-every-other-day kinda week).
_secs_in_day = int(60 * _mins_in_day)
_days_in_week: int = 7
_days_worth: int = 3
_default_hist_size: int = 6 * 365 * _mins_in_day
_hist_buffer_start = int(
_default_hist_size - round(7 * _mins_in_day)
)
_default_rt_size: int = _days_worth * _secs_in_day
# NOTE: start the append index in rt buffer such that 1 day's worth
# can be appenened before overrun.
_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
def diff_history( def diff_history(
array: np.ndarray, array: np.ndarray,
append_until_dt: datetime | None = None, append_until_dt: datetime | None = None,
@ -812,6 +831,9 @@ async def manage_history(
# (maybe) allocate shm array for this broker/symbol which will # (maybe) allocate shm array for this broker/symbol which will
# be used for fast near-term history capture and processing. # be used for fast near-term history capture and processing.
hist_shm, opened = maybe_open_shm_array( hist_shm, opened = maybe_open_shm_array(
size=_default_hist_size,
append_start_index=_hist_buffer_start,
key=f'piker.{service}[{uuid[:16]}].{fqme}.hist', key=f'piker.{service}[{uuid[:16]}].{fqme}.hist',
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
@ -829,6 +851,8 @@ async def manage_history(
) )
rt_shm, opened = maybe_open_shm_array( rt_shm, opened = maybe_open_shm_array(
size=_default_rt_size,
append_start_index=_rt_buffer_start,
key=f'piker.{service}[{uuid[:16]}].{fqme}.rt', key=f'piker.{service}[{uuid[:16]}].{fqme}.rt',
# use any broker defined ohlc dtype: # use any broker defined ohlc dtype:
@ -836,7 +860,6 @@ async def manage_history(
# we expect the sub-actor to write # we expect the sub-actor to write
readonly=False, readonly=False,
size=3*_secs_in_day,
) )
# (for now) set the rt (hft) shm array with space to prepend # (for now) set the rt (hft) shm array with space to prepend

View File

@ -177,6 +177,7 @@ def fsp(
def maybe_mk_fsp_shm( def maybe_mk_fsp_shm(
sym: str, sym: str,
target: Fsp, target: Fsp,
size: int,
readonly: bool = True, readonly: bool = True,
) -> (str, ShmArray, bool): ) -> (str, ShmArray, bool):
@ -185,7 +186,8 @@ def maybe_mk_fsp_shm(
exists, otherwise load the shm already existing for that token. exists, otherwise load the shm already existing for that token.
''' '''
assert isinstance(sym, str), '`sym` should be file-name-friendly `str`' if not isinstance(sym, str):
raise ValueError('`sym: str` should be file-name-friendly')
# TODO: load output types from `Fsp` # TODO: load output types from `Fsp`
# - should `index` be a required internal field? # - should `index` be a required internal field?
@ -204,7 +206,7 @@ def maybe_mk_fsp_shm(
shm, opened = maybe_open_shm_array( shm, opened = maybe_open_shm_array(
key, key,
# TODO: create entry for each time frame size=size,
dtype=fsp_dtype, dtype=fsp_dtype,
readonly=True, readonly=True,
) )

View File

@ -377,7 +377,7 @@ class FspAdmin:
# TODO: make this a `.src_flume` and add # TODO: make this a `.src_flume` and add
# a `dst_flume`? # a `dst_flume`?
# (=> but then wouldn't this be the most basic `Viz`?) # (=> but then wouldn't this be the most basic `Viz`?)
self.flume = flume self.flume: Flume = flume
def rr_next_portal(self) -> tractor.Portal: def rr_next_portal(self) -> tractor.Portal:
name, portal = next(self._rr_next_actor) name, portal = next(self._rr_next_actor)
@ -479,9 +479,15 @@ class FspAdmin:
fqme: str = src_mkt.get_fqme(delim_char='') fqme: str = src_mkt.get_fqme(delim_char='')
# allocate an output shm array # allocate an output shm array
# NOTE: rn we assume the HFT 1s period chart
# is always used!
src_shm: ShmArray = self.flume._rt_shm
key, dst_shm, opened = maybe_mk_fsp_shm( key, dst_shm, opened = maybe_mk_fsp_shm(
fqme, fqme,
target=target, target=target,
size=src_shm._token.size,
readonly=True, readonly=True,
) )