forked from goodboy/tractor
Don't require runtime (for now), type annot fixing
parent
971ac50756
commit
ecea1e1658
|
@ -29,7 +29,7 @@ from typing import Optional
|
||||||
from multiprocessing.shared_memory import (
|
from multiprocessing.shared_memory import (
|
||||||
SharedMemory,
|
SharedMemory,
|
||||||
ShareableList,
|
ShareableList,
|
||||||
_USE_POSIX,
|
_USE_POSIX, # type: ignore
|
||||||
)
|
)
|
||||||
|
|
||||||
if _USE_POSIX:
|
if _USE_POSIX:
|
||||||
|
@ -46,15 +46,6 @@ from .log import get_logger
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
# how much is probably dependent on lifestyle
|
|
||||||
_secs_in_day = int(60 * 60 * 24)
|
|
||||||
# we try for a buncha times, but only on a run-every-other-day kinda week.
|
|
||||||
_days_worth = 16
|
|
||||||
_default_size = _days_worth * _secs_in_day
|
|
||||||
# where to start the new data append index
|
|
||||||
_rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
|
|
||||||
|
|
||||||
|
|
||||||
def disable_mantracker():
|
def disable_mantracker():
|
||||||
'''
|
'''
|
||||||
Disable all ``multiprocessing``` "resource tracking" machinery since
|
Disable all ``multiprocessing``` "resource tracking" machinery since
|
||||||
|
@ -79,7 +70,6 @@ def disable_mantracker():
|
||||||
mantracker._resource_tracker = ManTracker()
|
mantracker._resource_tracker = ManTracker()
|
||||||
mantracker.register = mantracker._resource_tracker.register
|
mantracker.register = mantracker._resource_tracker.register
|
||||||
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
|
||||||
# ensure_running = mantracker._resource_tracker.ensure_running
|
|
||||||
mantracker.unregister = mantracker._resource_tracker.unregister
|
mantracker.unregister = mantracker._resource_tracker.unregister
|
||||||
mantracker.getfd = mantracker._resource_tracker.getfd
|
mantracker.getfd = mantracker._resource_tracker.getfd
|
||||||
|
|
||||||
|
@ -134,9 +124,14 @@ class _NpToken(Struct, frozen=True):
|
||||||
dtype_descr: tuple
|
dtype_descr: tuple
|
||||||
size: int # in struct-array index / row terms
|
size: int # in struct-array index / row terms
|
||||||
|
|
||||||
|
# TODO: use nptyping here on dtypes
|
||||||
@property
|
@property
|
||||||
def dtype(self) -> np.dtype:
|
def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]:
|
||||||
return np.dtype(list(map(tuple, self.dtype_descr))).descr
|
return np.dtype(
|
||||||
|
list(
|
||||||
|
map(tuple, self.dtype_descr)
|
||||||
|
)
|
||||||
|
).descr
|
||||||
|
|
||||||
def as_msg(self):
|
def as_msg(self):
|
||||||
return self.to_dict()
|
return self.to_dict()
|
||||||
|
@ -161,10 +156,10 @@ class _NpToken(Struct, frozen=True):
|
||||||
# _known_tokens = trio.RunVar('shms', {})
|
# _known_tokens = trio.RunVar('shms', {})
|
||||||
|
|
||||||
# process-local store of keys to tokens
|
# process-local store of keys to tokens
|
||||||
_known_tokens = {}
|
_known_tokens: dict[str, _NpToken] = {}
|
||||||
|
|
||||||
|
|
||||||
def get_shm_token(key: str) -> _NpToken | str:
|
def get_shm_token(key: str) -> _NpToken | None:
|
||||||
'''
|
'''
|
||||||
Convenience func to check if a token
|
Convenience func to check if a token
|
||||||
for the provided key is known by this process.
|
for the provided key is known by this process.
|
||||||
|
@ -228,11 +223,10 @@ class ShmArray:
|
||||||
self._post_init: bool = False
|
self._post_init: bool = False
|
||||||
|
|
||||||
# pushing data does not write the index (aka primary key)
|
# pushing data does not write the index (aka primary key)
|
||||||
|
self._write_fields: list[str] | None = None
|
||||||
dtype = shmarr.dtype
|
dtype = shmarr.dtype
|
||||||
if dtype.fields:
|
if dtype.fields:
|
||||||
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
|
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
|
||||||
else:
|
|
||||||
self._write_fields = None
|
|
||||||
|
|
||||||
# TODO: ringbuf api?
|
# TODO: ringbuf api?
|
||||||
|
|
||||||
|
@ -283,9 +277,9 @@ class ShmArray:
|
||||||
self,
|
self,
|
||||||
fields: Optional[list[str]] = None,
|
fields: Optional[list[str]] = None,
|
||||||
|
|
||||||
# type that all field values will be cast to
|
# type that all field values will be cast to in the returned
|
||||||
# in the returned view.
|
# view.
|
||||||
common_dtype: np.dtype = np.float,
|
common_dtype: np.dtype = np.float64, # type: ignore
|
||||||
|
|
||||||
) -> np.ndarray:
|
) -> np.ndarray:
|
||||||
|
|
||||||
|
@ -543,7 +537,6 @@ def open_shm_ndarray(
|
||||||
|
|
||||||
# "unlink" created shm on process teardown by
|
# "unlink" created shm on process teardown by
|
||||||
# pushing teardown calls onto actor context stack
|
# pushing teardown calls onto actor context stack
|
||||||
|
|
||||||
stack = tractor.current_actor().lifetime_stack
|
stack = tractor.current_actor().lifetime_stack
|
||||||
stack.callback(shmarr.close)
|
stack.callback(shmarr.close)
|
||||||
stack.callback(shmarr.destroy)
|
stack.callback(shmarr.destroy)
|
||||||
|
@ -769,14 +762,19 @@ def open_shm_list(
|
||||||
)
|
)
|
||||||
|
|
||||||
# "close" attached shm on actor teardown
|
# "close" attached shm on actor teardown
|
||||||
tractor.current_actor().lifetime_stack.callback(shml.shm.close)
|
try:
|
||||||
tractor.current_actor().lifetime_stack.callback(shml.shm.unlink)
|
actor = tractor.current_actor()
|
||||||
|
actor.lifetime_stack.callback(shml.shm.close)
|
||||||
|
actor.lifetime_stack.callback(shml.shm.unlink)
|
||||||
|
except RuntimeError:
|
||||||
|
log.warning('tractor runtime not active, skipping teardown steps')
|
||||||
|
|
||||||
return shml
|
return shml
|
||||||
|
|
||||||
|
|
||||||
def attach_shm_list(
|
def attach_shm_list(
|
||||||
key: str,
|
key: str,
|
||||||
|
|
||||||
) -> ShmList:
|
) -> ShmList:
|
||||||
|
|
||||||
return ShmList(name=key)
|
return ShmList(name=key)
|
||||||
|
|
Loading…
Reference in New Issue