diff --git a/tractor/_shm.py b/tractor/_shm.py index dca9d5a..63d1841 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -28,6 +28,7 @@ import time from typing import Optional from multiprocessing.shared_memory import ( SharedMemory, + ShareableList, _USE_POSIX, ) @@ -87,10 +88,11 @@ disable_mantracker() class SharedInt: - """Wrapper around a single entry shared memory array which + ''' + Wrapper around a single entry shared memory array which holds an ``int`` value used as an index counter. - """ + ''' def __init__( self, shm: SharedMemory, @@ -117,10 +119,13 @@ class SharedInt: log.warning(f'Shm for {name} already unlinked?') -class _Token(Struct, frozen=True): +class _NpToken(Struct, frozen=True): ''' - Internal represenation of a shared memory "token" - which can be used to key a system wide post shm entry. + Internal represenation of a shared memory ``numpy`` array "token" + which can be used to key and load a system (OS) wide shm entry + and correctly read the array by type signature. + + This type is msg safe. ''' shm_name: str # this servers as a "key" value @@ -137,18 +142,18 @@ class _Token(Struct, frozen=True): return self.to_dict() @classmethod - def from_msg(cls, msg: dict) -> _Token: - if isinstance(msg, _Token): + def from_msg(cls, msg: dict) -> _NpToken: + if isinstance(msg, _NpToken): return msg # TODO: native struct decoding # return _token_dec.decode(msg) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) - return _Token(**msg) + return _NpToken(**msg) -# _token_dec = msgspec.msgpack.Decoder(_Token) +# _token_dec = msgspec.msgpack.Decoder(_NpToken) # TODO: this api? # _known_tokens = tractor.ActorVar('_shm_tokens', {}) @@ -159,10 +164,14 @@ class _Token(Struct, frozen=True): _known_tokens = {} -def get_shm_token(key: str) -> _Token: - """Convenience func to check if a token +def get_shm_token(key: str) -> _NpToken | str: + ''' + Convenience func to check if a token for the provided key is known by this process. - """ + + Returns either the ``numpy`` token or a string for a shared list. + + ''' return _known_tokens.get(key) @@ -171,13 +180,13 @@ def _make_token( size: int, dtype: np.dtype, -) -> _Token: +) -> _NpToken: ''' Create a serializable token that can be used to access a shared array. ''' - return _Token( + return _NpToken( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", @@ -188,7 +197,7 @@ def _make_token( class ShmArray: ''' - A shared memory ``numpy`` (compatible) array API. + A shared memory ``numpy.ndarray`` API. An underlying shared memory buffer is allocated based on a user specified ``numpy.ndarray``. This fixed size array @@ -228,8 +237,8 @@ class ShmArray: # TODO: ringbuf api? @property - def _token(self) -> _Token: - return _Token( + def _token(self) -> _NpToken: + return _NpToken( shm_name=self._shm.name, shm_first_index_name=self._first._shm.name, shm_last_index_name=self._last._shm.name, @@ -446,15 +455,17 @@ class ShmArray: ... -def open_shm_array( +def open_shm_ndarray( key: Optional[str] = None, - size: int = _default_size, # see above - dtype: Optional[np.dtype] = None, + size: int = int(2 ** 10), + dtype: np.dtype | None = None, + append_start_index: int = 0, readonly: bool = False, ) -> ShmArray: - '''Open a memory shared ``numpy`` using the standard library. + ''' + Open a memory shared ``numpy`` using the standard library. This call unlinks (aka permanently destroys) the buffer on teardown and thus should be used from the parent-most accessor (process). @@ -501,10 +512,10 @@ def open_shm_array( ) ) - # start the "real-time" updated section after 3-days worth of 1s - # sampled OHLC. this allows appending up to a days worth from - # tick/quote feeds before having to flush to a (tsdb) storage - # backend, and looks something like, + # Start the "real-time" append-updated (or "pushed-to") section + # after some start index: ``append_start_index``. This allows appending + # from a start point in the array which isn't the 0 index and looks + # something like, # ------------------------- # | | i # _________________________ @@ -518,7 +529,7 @@ def open_shm_array( # this sets the index to 3/4 of the length of the buffer # leaving a "days worth of second samples" for the real-time # section. - last.value = first.value = _rt_buffer_start + last.value = first.value = append_start_index shmarr = ShmArray( array, @@ -540,7 +551,7 @@ def open_shm_array( return shmarr -def attach_shm_array( +def attach_shm_ndarray( token: tuple[str, str, tuple[str, str]], readonly: bool = True, @@ -553,11 +564,11 @@ def attach_shm_array( access are constructed. ''' - token = _Token.from_msg(token) + token = _NpToken.from_msg(token) key = token.shm_name if key in _known_tokens: - assert _Token.from_msg(_known_tokens[key]) == token, "WTF" + assert _NpToken.from_msg(_known_tokens[key]) == token, "WTF" # XXX: ugh, looks like due to the ``shm_open()`` C api we can't # actually place files in a subdir, see discussion here: @@ -625,10 +636,14 @@ def attach_shm_array( return sha -def maybe_open_shm_array( - key: str, - dtype: Optional[np.dtype] = None, - **kwargs, +def maybe_open_shm_ndarray( + key: str, # unique identifier for segment + + # from ``open_shm_array()`` + size: int = int(2 ** 10), # array length in index terms + dtype: np.dtype | None = None, + append_start_index: int = 0, + readonly: bool = True, ) -> tuple[ShmArray, bool]: ''' @@ -641,18 +656,23 @@ def maybe_open_shm_array( running in **this** process. Systems where multiple actors may seek to access a common block can use this function to attempt to acquire a token as discovered by the actors who have previously stored - a "key" -> ``_Token`` map in an actor local (aka python global) + a "key" -> ``_NpToken`` map in an actor local (aka python global) variable. - If you know the explicit ``_Token`` for your memory segment instead + If you know the explicit ``_NpToken`` for your memory segment instead use ``attach_shm_array``. ''' - size = kwargs.pop('size', _default_size) try: # see if we already know this key token = _known_tokens[key] - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, # not newly opened + ) except KeyError: log.warning(f"Could not find {key} in shms cache") if dtype: @@ -661,8 +681,16 @@ def maybe_open_shm_array( size=size, dtype=dtype, ) + else: + try: - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, + ) except FileNotFoundError: log.warning(f"Could not attach to shm with token {token}") @@ -671,36 +699,84 @@ def maybe_open_shm_array( # Attempt to open a block and expect # to fail if a block has been allocated # on the OS by someone else. - return open_shm_array(key=key, dtype=dtype, **kwargs), True + return ( + open_shm_ndarray( + key=key, + size=size, + dtype=dtype, + append_start_index=append_start_index, + readonly=readonly, + ), + True, + ) -def try_read( - array: np.ndarray - -) -> Optional[np.ndarray]: +class ShmList(ShareableList): ''' - Try to read the last row from a shared mem array or ``None`` - if the array read returns a zero-length array result. - - Can be used to check for backfilling race conditions where an array - is currently being (re-)written by a writer actor but the reader is - unaware and reads during the window where the first and last indexes - are being updated. + Carbon copy of ``.shared_memory.ShareableList`` but add a + readonly state instance var. ''' - try: - return array[-1] - except IndexError: - # XXX: race condition with backfilling shm. - # - # the underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the indices may be updated in such a way that - # a read delivers an empty array (though it seems like we - # *should* be able to prevent that?). also, as and alt and - # something we need anyway, maybe there should be some kind of - # signal that a prepend is taking place and this consumer can - # respond (eg. redrawing graphics) accordingly. + def __init__( + self, + sequence: list | None = None, + *, + name: str | None = None, + readonly: bool = True - # the array read was emtpy - return None + ) -> None: + self._readonly = readonly + self._key = name + return super().__init__( + sequence=sequence, + name=name, + ) + + @property + def key(self) -> str: + return self._key + + def __setitem__( + self, + position, + value, + + ) -> None: + + # mimick ``numpy`` error + if self._readonly: + raise ValueError('assignment destination is read-only') + + return super().__setitem__(position, value) + + +def open_shm_list( + key: str, + sequence: list | None = None, + size: int = int(2 ** 10), + dtype: np.dtype | None = None, + readonly: bool = True, + +) -> ShmList: + + if sequence is None: + sequence = list(map(float, range(size))) + + shml = ShmList( + sequence=sequence, + name=key, + readonly=readonly, + ) + + # "close" attached shm on actor teardown + tractor.current_actor().lifetime_stack.callback(shml.shm.close) + tractor.current_actor().lifetime_stack.callback(shml.shm.unlink) + + return shml + + +def attach_shm_list( + key: str, +) -> ShmList: + + return ShmList(name=key)