From ecea1e16588e451586c1092b8338ea45d207f99b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 17 Oct 2022 15:13:58 -0400 Subject: [PATCH] Don't require runtime (for now), type annot fixing --- tractor/_shm.py | 44 +++++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 63d1841..80ca49d 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -29,7 +29,7 @@ from typing import Optional from multiprocessing.shared_memory import ( SharedMemory, ShareableList, - _USE_POSIX, + _USE_POSIX, # type: ignore ) if _USE_POSIX: @@ -46,15 +46,6 @@ from .log import get_logger log = get_logger(__name__) -# how much is probably dependent on lifestyle -_secs_in_day = int(60 * 60 * 24) -# we try for a buncha times, but only on a run-every-other-day kinda week. -_days_worth = 16 -_default_size = _days_worth * _secs_in_day -# where to start the new data append index -_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) - - def disable_mantracker(): ''' Disable all ``multiprocessing``` "resource tracking" machinery since @@ -79,7 +70,6 @@ def disable_mantracker(): mantracker._resource_tracker = ManTracker() mantracker.register = mantracker._resource_tracker.register mantracker.ensure_running = mantracker._resource_tracker.ensure_running - # ensure_running = mantracker._resource_tracker.ensure_running mantracker.unregister = mantracker._resource_tracker.unregister mantracker.getfd = mantracker._resource_tracker.getfd @@ -134,9 +124,14 @@ class _NpToken(Struct, frozen=True): dtype_descr: tuple size: int # in struct-array index / row terms + # TODO: use nptyping here on dtypes @property - def dtype(self) -> np.dtype: - return np.dtype(list(map(tuple, self.dtype_descr))).descr + def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]: + return np.dtype( + list( + map(tuple, self.dtype_descr) + ) + ).descr def as_msg(self): return self.to_dict() @@ -161,10 +156,10 @@ class _NpToken(Struct, frozen=True): # _known_tokens = trio.RunVar('shms', {}) # process-local store of keys to tokens -_known_tokens = {} +_known_tokens: dict[str, _NpToken] = {} -def get_shm_token(key: str) -> _NpToken | str: +def get_shm_token(key: str) -> _NpToken | None: ''' Convenience func to check if a token for the provided key is known by this process. @@ -228,11 +223,10 @@ class ShmArray: self._post_init: bool = False # pushing data does not write the index (aka primary key) + self._write_fields: list[str] | None = None dtype = shmarr.dtype if dtype.fields: self._write_fields = list(shmarr.dtype.fields.keys())[1:] - else: - self._write_fields = None # TODO: ringbuf api? @@ -283,9 +277,9 @@ class ShmArray: self, fields: Optional[list[str]] = None, - # type that all field values will be cast to - # in the returned view. - common_dtype: np.dtype = np.float, + # type that all field values will be cast to in the returned + # view. + common_dtype: np.dtype = np.float64, # type: ignore ) -> np.ndarray: @@ -543,7 +537,6 @@ def open_shm_ndarray( # "unlink" created shm on process teardown by # pushing teardown calls onto actor context stack - stack = tractor.current_actor().lifetime_stack stack.callback(shmarr.close) stack.callback(shmarr.destroy) @@ -769,14 +762,19 @@ def open_shm_list( ) # "close" attached shm on actor teardown - tractor.current_actor().lifetime_stack.callback(shml.shm.close) - tractor.current_actor().lifetime_stack.callback(shml.shm.unlink) + try: + actor = tractor.current_actor() + actor.lifetime_stack.callback(shml.shm.close) + actor.lifetime_stack.callback(shml.shm.unlink) + except RuntimeError: + log.warning('tractor runtime not active, skipping teardown steps') return shml def attach_shm_list( key: str, + ) -> ShmList: return ShmList(name=key)