From c709a4ad7228de0773f30f9455a78f79f25cced2 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet <jgbt@protonmail.com>
Date: Sun, 16 Oct 2022 18:06:07 -0400
Subject: [PATCH] Add `ShmList` wrapping the stdlib's `ShareableList`

First attempt at getting `multiprocessing.shared_memory.ShareableList`
working; we wrap the stdlib type with a readonly attr and a `.key` for
cross-actor lookup. Also, rename all `numpy` specific routines to have
a `ndarray` suffix in the func names.
---
 tractor/_shm.py | 206 +++++++++++++++++++++++++++++++++---------------
 1 file changed, 141 insertions(+), 65 deletions(-)

diff --git a/tractor/_shm.py b/tractor/_shm.py
index dca9d5a5..63d18411 100644
--- a/tractor/_shm.py
+++ b/tractor/_shm.py
@@ -28,6 +28,7 @@ import time
 from typing import Optional
 from multiprocessing.shared_memory import (
     SharedMemory,
+    ShareableList,
     _USE_POSIX,
 )
 
@@ -87,10 +88,11 @@ disable_mantracker()
 
 
 class SharedInt:
-    """Wrapper around a single entry shared memory array which
+    '''
+    Wrapper around a single entry shared memory array which
     holds an ``int`` value used as an index counter.
 
-    """
+    '''
     def __init__(
         self,
         shm: SharedMemory,
@@ -117,10 +119,13 @@ class SharedInt:
                 log.warning(f'Shm for {name} already unlinked?')
 
 
-class _Token(Struct, frozen=True):
+class _NpToken(Struct, frozen=True):
     '''
-    Internal represenation of a shared memory "token"
-    which can be used to key a system wide post shm entry.
+    Internal represenation of a shared memory ``numpy`` array "token"
+    which can be used to key and load a system (OS) wide shm entry
+    and correctly read the array by type signature.
+
+    This type is msg safe.
 
     '''
     shm_name: str  # this servers as a "key" value
@@ -137,18 +142,18 @@ class _Token(Struct, frozen=True):
         return self.to_dict()
 
     @classmethod
-    def from_msg(cls, msg: dict) -> _Token:
-        if isinstance(msg, _Token):
+    def from_msg(cls, msg: dict) -> _NpToken:
+        if isinstance(msg, _NpToken):
             return msg
 
         # TODO: native struct decoding
         # return _token_dec.decode(msg)
 
         msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
-        return _Token(**msg)
+        return _NpToken(**msg)
 
 
-# _token_dec = msgspec.msgpack.Decoder(_Token)
+# _token_dec = msgspec.msgpack.Decoder(_NpToken)
 
 # TODO: this api?
 # _known_tokens = tractor.ActorVar('_shm_tokens', {})
@@ -159,10 +164,14 @@ class _Token(Struct, frozen=True):
 _known_tokens = {}
 
 
-def get_shm_token(key: str) -> _Token:
-    """Convenience func to check if a token
+def get_shm_token(key: str) -> _NpToken | str:
+    '''
+    Convenience func to check if a token
     for the provided key is known by this process.
-    """
+
+    Returns either the ``numpy`` token or a string for a shared list.
+
+    '''
     return _known_tokens.get(key)
 
 
@@ -171,13 +180,13 @@ def _make_token(
     size: int,
     dtype: np.dtype,
 
-) -> _Token:
+) -> _NpToken:
     '''
     Create a serializable token that can be used
     to access a shared array.
 
     '''
-    return _Token(
+    return _NpToken(
         shm_name=key,
         shm_first_index_name=key + "_first",
         shm_last_index_name=key + "_last",
@@ -188,7 +197,7 @@ def _make_token(
 
 class ShmArray:
     '''
-    A shared memory ``numpy`` (compatible) array API.
+    A shared memory ``numpy.ndarray`` API.
 
     An underlying shared memory buffer is allocated based on
     a user specified ``numpy.ndarray``. This fixed size array
@@ -228,8 +237,8 @@ class ShmArray:
     # TODO: ringbuf api?
 
     @property
-    def _token(self) -> _Token:
-        return _Token(
+    def _token(self) -> _NpToken:
+        return _NpToken(
             shm_name=self._shm.name,
             shm_first_index_name=self._first._shm.name,
             shm_last_index_name=self._last._shm.name,
@@ -446,15 +455,17 @@ class ShmArray:
         ...
 
 
-def open_shm_array(
+def open_shm_ndarray(
 
     key: Optional[str] = None,
-    size: int = _default_size,  # see above
-    dtype: Optional[np.dtype] = None,
+    size: int = int(2 ** 10),
+    dtype: np.dtype | None = None,
+    append_start_index: int = 0,
     readonly: bool = False,
 
 ) -> ShmArray:
-    '''Open a memory shared ``numpy`` using the standard library.
+    '''
+    Open a memory shared ``numpy`` using the standard library.
 
     This call unlinks (aka permanently destroys) the buffer on teardown
     and thus should be used from the parent-most accessor (process).
@@ -501,10 +512,10 @@ def open_shm_array(
         )
     )
 
-    # start the "real-time" updated section after 3-days worth of 1s
-    # sampled OHLC. this allows appending up to a days worth from
-    # tick/quote feeds before having to flush to a (tsdb) storage
-    # backend, and looks something like,
+    # Start the "real-time" append-updated (or "pushed-to") section
+    # after some start index: ``append_start_index``. This allows appending
+    # from a start point in the array which isn't the 0 index and looks
+    # something like,
     # -------------------------
     # |              |        i
     # _________________________
@@ -518,7 +529,7 @@ def open_shm_array(
     # this sets the index to 3/4 of the length of the buffer
     # leaving a "days worth of second samples" for the real-time
     # section.
-    last.value = first.value = _rt_buffer_start
+    last.value = first.value = append_start_index
 
     shmarr = ShmArray(
         array,
@@ -540,7 +551,7 @@ def open_shm_array(
     return shmarr
 
 
-def attach_shm_array(
+def attach_shm_ndarray(
     token: tuple[str, str, tuple[str, str]],
     readonly: bool = True,
 
@@ -553,11 +564,11 @@ def attach_shm_array(
     access are constructed.
 
     '''
-    token = _Token.from_msg(token)
+    token = _NpToken.from_msg(token)
     key = token.shm_name
 
     if key in _known_tokens:
-        assert _Token.from_msg(_known_tokens[key]) == token, "WTF"
+        assert _NpToken.from_msg(_known_tokens[key]) == token, "WTF"
 
     # XXX: ugh, looks like due to the ``shm_open()`` C api we can't
     # actually place files in a subdir, see discussion here:
@@ -625,10 +636,14 @@ def attach_shm_array(
     return sha
 
 
-def maybe_open_shm_array(
-    key: str,
-    dtype: Optional[np.dtype] = None,
-    **kwargs,
+def maybe_open_shm_ndarray(
+    key: str,  # unique identifier for segment
+
+    # from ``open_shm_array()``
+    size: int = int(2 ** 10),  # array length in index terms
+    dtype: np.dtype | None = None,
+    append_start_index: int = 0,
+    readonly: bool = True,
 
 ) -> tuple[ShmArray, bool]:
     '''
@@ -641,18 +656,23 @@ def maybe_open_shm_array(
     running in **this** process. Systems where multiple actors may seek
     to access a common block can use this function to attempt to acquire
     a token as discovered by the actors who have previously stored
-    a "key" -> ``_Token`` map in an actor local (aka python global)
+    a "key" -> ``_NpToken`` map in an actor local (aka python global)
     variable.
 
-    If you know the explicit ``_Token`` for your memory segment instead
+    If you know the explicit ``_NpToken`` for your memory segment instead
     use ``attach_shm_array``.
 
     '''
-    size = kwargs.pop('size', _default_size)
     try:
         # see if we already know this key
         token = _known_tokens[key]
-        return attach_shm_array(token=token, **kwargs), False
+        return (
+            attach_shm_ndarray(
+                token=token,
+                readonly=readonly,
+            ),
+            False,  # not newly opened
+        )
     except KeyError:
         log.warning(f"Could not find {key} in shms cache")
         if dtype:
@@ -661,8 +681,16 @@ def maybe_open_shm_array(
                 size=size,
                 dtype=dtype,
             )
+        else:
+
             try:
-                return attach_shm_array(token=token, **kwargs), False
+                return (
+                    attach_shm_ndarray(
+                        token=token,
+                        readonly=readonly,
+                    ),
+                    False,
+                )
             except FileNotFoundError:
                 log.warning(f"Could not attach to shm with token {token}")
 
@@ -671,36 +699,84 @@ def maybe_open_shm_array(
         # Attempt to open a block and expect
         # to fail if a block has been allocated
         # on the OS by someone else.
-        return open_shm_array(key=key, dtype=dtype, **kwargs), True
+        return (
+            open_shm_ndarray(
+                key=key,
+                size=size,
+                dtype=dtype,
+                append_start_index=append_start_index,
+                readonly=readonly,
+            ),
+            True,
+        )
 
 
-def try_read(
-    array: np.ndarray
-
-) -> Optional[np.ndarray]:
+class ShmList(ShareableList):
     '''
-    Try to read the last row from a shared mem array or ``None``
-    if the array read returns a zero-length array result.
-
-    Can be used to check for backfilling race conditions where an array
-    is currently being (re-)written by a writer actor but the reader is
-    unaware and reads during the window where the first and last indexes
-    are being updated.
+    Carbon copy of ``.shared_memory.ShareableList`` but add a
+    readonly state instance var.
 
     '''
-    try:
-        return array[-1]
-    except IndexError:
-        # XXX: race condition with backfilling shm.
-        #
-        # the underlying issue is that a backfill (aka prepend) and subsequent
-        # shm array first/last index update could result in an empty array
-        # read here since the indices may be updated in such a way that
-        # a read delivers an empty array (though it seems like we
-        # *should* be able to prevent that?). also, as and alt and
-        # something we need anyway, maybe there should be some kind of
-        # signal that a prepend is taking place and this consumer can
-        # respond (eg. redrawing graphics) accordingly.
+    def __init__(
+        self,
+        sequence: list | None = None,
+        *,
+        name: str | None = None,
+        readonly: bool = True
 
-        # the array read was emtpy
-        return None
+    ) -> None:
+        self._readonly = readonly
+        self._key = name
+        return super().__init__(
+            sequence=sequence,
+            name=name,
+        )
+
+    @property
+    def key(self) -> str:
+        return self._key
+
+    def __setitem__(
+        self,
+        position,
+        value,
+
+    ) -> None:
+
+        # mimick ``numpy`` error
+        if self._readonly:
+            raise ValueError('assignment destination is read-only')
+
+        return super().__setitem__(position, value)
+
+
+def open_shm_list(
+    key: str,
+    sequence: list | None = None,
+    size: int = int(2 ** 10),
+    dtype: np.dtype | None = None,
+    readonly: bool = True,
+
+) -> ShmList:
+
+    if sequence is None:
+        sequence = list(map(float, range(size)))
+
+    shml = ShmList(
+        sequence=sequence,
+        name=key,
+        readonly=readonly,
+    )
+
+    # "close" attached shm on actor teardown
+    tractor.current_actor().lifetime_stack.callback(shml.shm.close)
+    tractor.current_actor().lifetime_stack.callback(shml.shm.unlink)
+
+    return shml
+
+
+def attach_shm_list(
+    key: str,
+) -> ShmList:
+
+    return ShmList(name=key)