Don't require runtime (for now), type annot fixing

shm_apis
Tyler Goodlet 2022-10-17 15:13:58 -04:00
parent 339d787cf8
commit edb82fdd78
1 changed files with 21 additions and 23 deletions

View File

@ -29,7 +29,7 @@ from typing import Optional
from multiprocessing.shared_memory import (
SharedMemory,
ShareableList,
_USE_POSIX,
_USE_POSIX, # type: ignore
)
if _USE_POSIX:
@ -46,15 +46,6 @@ from .log import get_logger
log = get_logger(__name__)
# how much is probably dependent on lifestyle
_secs_in_day = int(60 * 60 * 24)
# we try for a buncha times, but only on a run-every-other-day kinda week.
_days_worth = 16
_default_size = _days_worth * _secs_in_day
# where to start the new data append index
_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
def disable_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
@ -79,7 +70,6 @@ def disable_mantracker():
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
# ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
@ -134,9 +124,14 @@ class _NpToken(Struct, frozen=True):
dtype_descr: tuple
size: int # in struct-array index / row terms
# TODO: use nptyping here on dtypes
@property
def dtype(self) -> np.dtype:
return np.dtype(list(map(tuple, self.dtype_descr))).descr
def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]:
return np.dtype(
list(
map(tuple, self.dtype_descr)
)
).descr
def as_msg(self):
return self.to_dict()
@ -161,10 +156,10 @@ class _NpToken(Struct, frozen=True):
# _known_tokens = trio.RunVar('shms', {})
# process-local store of keys to tokens
_known_tokens = {}
_known_tokens: dict[str, _NpToken] = {}
def get_shm_token(key: str) -> _NpToken | str:
def get_shm_token(key: str) -> _NpToken | None:
'''
Convenience func to check if a token
for the provided key is known by this process.
@ -228,11 +223,10 @@ class ShmArray:
self._post_init: bool = False
# pushing data does not write the index (aka primary key)
self._write_fields: list[str] | None = None
dtype = shmarr.dtype
if dtype.fields:
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
else:
self._write_fields = None
# TODO: ringbuf api?
@ -283,9 +277,9 @@ class ShmArray:
self,
fields: Optional[list[str]] = None,
# type that all field values will be cast to
# in the returned view.
common_dtype: np.dtype = np.float,
# type that all field values will be cast to in the returned
# view.
common_dtype: np.dtype = np.float64, # type: ignore
) -> np.ndarray:
@ -543,7 +537,6 @@ def open_shm_ndarray(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
stack = tractor.current_actor().lifetime_stack
stack.callback(shmarr.close)
stack.callback(shmarr.destroy)
@ -769,14 +762,19 @@ def open_shm_list(
)
# "close" attached shm on actor teardown
tractor.current_actor().lifetime_stack.callback(shml.shm.close)
tractor.current_actor().lifetime_stack.callback(shml.shm.unlink)
try:
actor = tractor.current_actor()
actor.lifetime_stack.callback(shml.shm.close)
actor.lifetime_stack.callback(shml.shm.unlink)
except RuntimeError:
log.warning('tractor runtime not active, skipping teardown steps')
return shml
def attach_shm_list(
key: str,
) -> ShmList:
return ShmList(name=key)