Merge pull request #394 from pikers/size_in_shm_token

Store shm array size in token schema, use for loading
kraken_nameerr_fix
Guillermo Rodriguez 2022-08-29 15:15:49 -03:00 committed by GitHub
commit 739a231afc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 21 additions and 7 deletions

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0) # Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -50,7 +50,11 @@ _rt_buffer_start = int((_days_worth - 1) * _secs_in_day)
def cuckoff_mantracker(): def cuckoff_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing import resource_tracker as mantracker from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off. # Tell the "resource tracker" thing to fuck off.
@ -118,6 +122,7 @@ class _Token(Struct, frozen=True):
shm_first_index_name: str shm_first_index_name: str
shm_last_index_name: str shm_last_index_name: str
dtype_descr: tuple dtype_descr: tuple
size: int # in struct-array index / row terms
@property @property
def dtype(self) -> np.dtype: def dtype(self) -> np.dtype:
@ -158,6 +163,7 @@ def get_shm_token(key: str) -> _Token:
def _make_token( def _make_token(
key: str, key: str,
size: int,
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
) -> _Token: ) -> _Token:
''' '''
@ -170,7 +176,8 @@ def _make_token(
shm_name=key, shm_name=key,
shm_first_index_name=key + "_first", shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last", shm_last_index_name=key + "_last",
dtype_descr=tuple(np.dtype(dtype).descr) dtype_descr=tuple(np.dtype(dtype).descr),
size=size,
) )
@ -222,6 +229,7 @@ class ShmArray:
shm_first_index_name=self._first._shm.name, shm_first_index_name=self._first._shm.name,
shm_last_index_name=self._last._shm.name, shm_last_index_name=self._last._shm.name,
dtype_descr=tuple(self._array.dtype.descr), dtype_descr=tuple(self._array.dtype.descr),
size=self._len,
) )
@property @property
@ -436,7 +444,7 @@ class ShmArray:
def open_shm_array( def open_shm_array(
key: Optional[str] = None, key: Optional[str] = None,
size: int = _default_size, size: int = _default_size, # see above
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
readonly: bool = False, readonly: bool = False,
@ -467,7 +475,8 @@ def open_shm_array(
token = _make_token( token = _make_token(
key=key, key=key,
dtype=dtype size=size,
dtype=dtype,
) )
# create single entry arrays for storing an first and last indices # create single entry arrays for storing an first and last indices
@ -519,6 +528,7 @@ def open_shm_array(
# "unlink" created shm on process teardown by # "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack # pushing teardown calls onto actor context stack
# TODO: make this a public API in ``tractor``..
tractor._actor._lifetime_stack.callback(shmarr.close) tractor._actor._lifetime_stack.callback(shmarr.close)
tractor._actor._lifetime_stack.callback(shmarr.destroy) tractor._actor._lifetime_stack.callback(shmarr.destroy)
@ -527,7 +537,6 @@ def open_shm_array(
def attach_shm_array( def attach_shm_array(
token: tuple[str, str, tuple[str, str]], token: tuple[str, str, tuple[str, str]],
size: int = _default_size,
readonly: bool = True, readonly: bool = True,
) -> ShmArray: ) -> ShmArray:
@ -566,7 +575,7 @@ def attach_shm_array(
raise _err raise _err
shmarr = np.ndarray( shmarr = np.ndarray(
(size,), (token.size,),
dtype=token.dtype, dtype=token.dtype,
buffer=shm.buf buffer=shm.buf
) )
@ -634,6 +643,7 @@ def maybe_open_shm_array(
use ``attach_shm_array``. use ``attach_shm_array``.
''' '''
size = kwargs.pop('size', _default_size)
try: try:
# see if we already know this key # see if we already know this key
token = _known_tokens[key] token = _known_tokens[key]
@ -641,7 +651,11 @@ def maybe_open_shm_array(
except KeyError: except KeyError:
log.warning(f"Could not find {key} in shms cache") log.warning(f"Could not find {key} in shms cache")
if dtype: if dtype:
token = _make_token(key, dtype) token = _make_token(
key,
size=size,
dtype=dtype,
)
try: try:
return attach_shm_array(token=token, **kwargs), False return attach_shm_array(token=token, **kwargs), False
except FileNotFoundError: except FileNotFoundError: