From 33ec27715b5ef13503156eb7e879d29936537047 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 15 Jun 2023 12:59:50 -0400 Subject: [PATCH] Sync shm mod with dev version in `tractor`, drop buffer sizing vars, require `size: int` to all allocators --- piker/data/_sharedmem.py | 56 +++++++++++++++++++++++----------------- 1 file changed, 32 insertions(+), 24 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 3366621b..78f66f63 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -37,15 +37,6 @@ from ._source import def_iohlcv_fields from .types import Struct -# how much is probably dependent on lifestyle -_secs_in_day = int(60 * 60 * 24) -# we try for a buncha times, but only on a run-every-other-day kinda week. -_days_worth = 16 -_default_size = _days_worth * _secs_in_day -# where to start the new data append index -_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) - - def cuckoff_mantracker(): ''' Disable all ``multiprocessing``` "resource tracking" machinery since @@ -70,7 +61,6 @@ def cuckoff_mantracker(): mantracker._resource_tracker = ManTracker() mantracker.register = mantracker._resource_tracker.register mantracker.ensure_running = mantracker._resource_tracker.ensure_running - # ensure_running = mantracker._resource_tracker.ensure_running mantracker.unregister = mantracker._resource_tracker.unregister mantracker.getfd = mantracker._resource_tracker.getfd @@ -442,10 +432,10 @@ class ShmArray: def open_shm_array( - - key: Optional[str] = None, - size: int = _default_size, # see above - dtype: Optional[np.dtype] = None, + size: int, + key: str | None = None, + dtype: np.dtype | None = None, + append_start_index: int | None = None, readonly: bool = False, ) -> ShmArray: @@ -510,10 +500,13 @@ def open_shm_array( # ``ShmArray._start.value: int = 0`` and the yet-to-be written # real-time section will start at ``ShmArray.index: int``. - # this sets the index to 3/4 of the length of the buffer - # leaving a "days worth of second samples" for the real-time - # section. - last.value = first.value = _rt_buffer_start + # this sets the index to nearly 2/3rds into the the length of + # the buffer leaving at least a "days worth of second samples" + # for the real-time section. + if append_start_index is None: + append_start_index = round(size * 0.616) + + last.value = first.value = append_start_index shmarr = ShmArray( array, @@ -527,7 +520,6 @@ def open_shm_array( # "unlink" created shm on process teardown by # pushing teardown calls onto actor context stack - stack = tractor.current_actor().lifetime_stack stack.callback(shmarr.close) stack.callback(shmarr.destroy) @@ -622,7 +614,10 @@ def attach_shm_array( def maybe_open_shm_array( key: str, - dtype: Optional[np.dtype] = None, + size: int, + dtype: np.dtype | None = None, + append_start_index: int | None = None, + readonly: bool = False, **kwargs, ) -> tuple[ShmArray, bool]: @@ -643,11 +638,16 @@ def maybe_open_shm_array( use ``attach_shm_array``. ''' - size = kwargs.pop('size', _default_size) try: # see if we already know this key token = _known_tokens[key] - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_array( + token=token, + readonly=readonly, + ), + False, + ) except KeyError: log.debug(f"Could not find {key} in shms cache") if dtype: @@ -666,8 +666,16 @@ def maybe_open_shm_array( # Attempt to open a block and expect # to fail if a block has been allocated # on the OS by someone else. - return open_shm_array(key=key, dtype=dtype, **kwargs), True - + return ( + open_shm_array( + key=key, + size=size, + dtype=dtype, + append_start_index=append_start_index, + readonly=readonly, + ), + True, + ) def try_read( array: np.ndarray