From ef7ca49e9b90d14d32bf1ab8b599f05d6f4411b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Sat, 15 Oct 2022 16:35:32 -0400 Subject: [PATCH 01/12] Initial module import from `piker.data._sharemem` More or less a verbatim copy-paste minus some edgy variable naming and internal `piker` module imports. There is a bunch of OHLC related defaults that need to be dropped and we need to adjust to an optional dependence on `numpy` by supporting shared lists as per the mp docs. --- tractor/_shm.py | 706 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 706 insertions(+) create mode 100644 tractor/_shm.py diff --git a/tractor/_shm.py b/tractor/_shm.py new file mode 100644 index 00000000..dca9d5a5 --- /dev/null +++ b/tractor/_shm.py @@ -0,0 +1,706 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +""" +SC friendly shared memory management geared at real-time +processing. + +Support for ``numpy`` compatible array-buffers is provided but is +considered optional within the context of this runtime-library. + +""" +from __future__ import annotations +from sys import byteorder +import time +from typing import Optional +from multiprocessing.shared_memory import ( + SharedMemory, + _USE_POSIX, +) + +if _USE_POSIX: + from _posixshmem import shm_unlink + +from msgspec import Struct +import numpy as np +from numpy.lib import recfunctions as rfn +import tractor + +from .log import get_logger + + +log = get_logger(__name__) + + +# how much is probably dependent on lifestyle +_secs_in_day = int(60 * 60 * 24) +# we try for a buncha times, but only on a run-every-other-day kinda week. +_days_worth = 16 +_default_size = _days_worth * _secs_in_day +# where to start the new data append index +_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) + + +def disable_mantracker(): + ''' + Disable all ``multiprocessing``` "resource tracking" machinery since + it's an absolute multi-threaded mess of non-SC madness. + + ''' + from multiprocessing import resource_tracker as mantracker + + # Tell the "resource tracker" thing to fuck off. 
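+    # NOTE: the stdlib tracker unlinks every shm segment it knows
+    # about as soon as a tracked process exits; for segments shared
+    # across separately spawned actors that means premature
+    # destruction, so each hook is no-op'd below and lifetime
+    # management is left to explicit actor-teardown callbacks.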
+ class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + # ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd + + +disable_mantracker() + + +class SharedInt: + """Wrapper around a single entry shared memory array which + holds an ``int`` value used as an index counter. + + """ + def __init__( + self, + shm: SharedMemory, + ) -> None: + self._shm = shm + + @property + def value(self) -> int: + return int.from_bytes(self._shm.buf, byteorder) + + @value.setter + def value(self, value) -> None: + self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder) + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + name = self._shm.name + try: + shm_unlink(name) + except FileNotFoundError: + # might be a teardown race here? + log.warning(f'Shm for {name} already unlinked?') + + +class _Token(Struct, frozen=True): + ''' + Internal represenation of a shared memory "token" + which can be used to key a system wide post shm entry. + + ''' + shm_name: str # this servers as a "key" value + shm_first_index_name: str + shm_last_index_name: str + dtype_descr: tuple + size: int # in struct-array index / row terms + + @property + def dtype(self) -> np.dtype: + return np.dtype(list(map(tuple, self.dtype_descr))).descr + + def as_msg(self): + return self.to_dict() + + @classmethod + def from_msg(cls, msg: dict) -> _Token: + if isinstance(msg, _Token): + return msg + + # TODO: native struct decoding + # return _token_dec.decode(msg) + + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) + return _Token(**msg) + + +# _token_dec = msgspec.msgpack.Decoder(_Token) + +# TODO: this api? +# _known_tokens = tractor.ActorVar('_shm_tokens', {}) +# _known_tokens = tractor.ContextStack('_known_tokens', ) +# _known_tokens = trio.RunVar('shms', {}) + +# process-local store of keys to tokens +_known_tokens = {} + + +def get_shm_token(key: str) -> _Token: + """Convenience func to check if a token + for the provided key is known by this process. + """ + return _known_tokens.get(key) + + +def _make_token( + key: str, + size: int, + dtype: np.dtype, + +) -> _Token: + ''' + Create a serializable token that can be used + to access a shared array. + + ''' + return _Token( + shm_name=key, + shm_first_index_name=key + "_first", + shm_last_index_name=key + "_last", + dtype_descr=tuple(np.dtype(dtype).descr), + size=size, + ) + + +class ShmArray: + ''' + A shared memory ``numpy`` (compatible) array API. + + An underlying shared memory buffer is allocated based on + a user specified ``numpy.ndarray``. This fixed size array + can be read and written to by pushing data both onto the "front" + or "back" of a set index range. The indexes for the "first" and + "last" index are themselves stored in shared memory (accessed via + ``SharedInt`` interfaces) values such that multiple processes can + interact with the same array using a synchronized-index. 
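+
+    A rough usage sketch (``open_shm_array()`` is defined below;
+    ``dtype`` must be a structured dtype whose first field is
+    ``index`` and ``new_rows``/``old_rows`` are placeholder record
+    arrays)::
+
+        shma = open_shm_array(key='series', dtype=dtype)
+        shma.push(new_rows)                 # append to the "back"
+        shma.push(old_rows, prepend=True)   # fill in the "front"
+        view = shma.array                   # only the written range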
+ + ''' + def __init__( + self, + shmarr: np.ndarray, + first: SharedInt, + last: SharedInt, + shm: SharedMemory, + # readonly: bool = True, + ) -> None: + self._array = shmarr + + # indexes for first and last indices corresponding + # to fille data + self._first = first + self._last = last + + self._len = len(shmarr) + self._shm = shm + self._post_init: bool = False + + # pushing data does not write the index (aka primary key) + dtype = shmarr.dtype + if dtype.fields: + self._write_fields = list(shmarr.dtype.fields.keys())[1:] + else: + self._write_fields = None + + # TODO: ringbuf api? + + @property + def _token(self) -> _Token: + return _Token( + shm_name=self._shm.name, + shm_first_index_name=self._first._shm.name, + shm_last_index_name=self._last._shm.name, + dtype_descr=tuple(self._array.dtype.descr), + size=self._len, + ) + + @property + def token(self) -> dict: + """Shared memory token that can be serialized and used by + another process to attach to this array. + """ + return self._token.as_msg() + + @property + def index(self) -> int: + return self._last.value % self._len + + @property + def array(self) -> np.ndarray: + ''' + Return an up-to-date ``np.ndarray`` view of the + so-far-written data to the underlying shm buffer. + + ''' + a = self._array[self._first.value:self._last.value] + + # first, last = self._first.value, self._last.value + # a = self._array[first:last] + + # TODO: eventually comment this once we've not seen it in the + # wild in a long time.. + # XXX: race where first/last indexes cause a reader + # to load an empty array.. + if len(a) == 0 and self._post_init: + raise RuntimeError('Empty array race condition hit!?') + # breakpoint() + + return a + + def ustruct( + self, + fields: Optional[list[str]] = None, + + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = np.float, + + ) -> np.ndarray: + + array = self._array + + if fields: + selection = array[fields] + # fcount = len(fields) + else: + selection = array + # fcount = len(array.dtype.fields) + + # XXX: manual ``.view()`` attempt that also doesn't work. + # uview = selection.view( + # dtype='<f16', + # ).reshape(-1, 4, order='A') + + # assert len(selection) == len(uview) + + u = rfn.structured_to_unstructured( + selection, + # dtype=float, + copy=True, + ) + + # unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf) + # array[:] = a[:] + return u + # return ShmArray( + # shmarr=u, + # first=self._first, + # last=self._last, + # shm=self._shm + # ) + + def last( + self, + length: int = 1, + + ) -> np.ndarray: + ''' + Return the last ``length``'s worth of ("row") entries from the + array. + + ''' + return self.array[-length:] + + def push( + self, + data: np.ndarray, + + field_map: Optional[dict[str, str]] = None, + prepend: bool = False, + update_first: bool = True, + start: Optional[int] = None, + + ) -> int: + ''' + Ring buffer like "push" to append data + into the buffer and return updated "last" index. + + NB: no actual ring logic yet to give a "loop around" on overflow + condition, lel. + + ''' + length = len(data) + + if prepend: + index = (start or self._first.value) - length + + if index < 0: + raise ValueError( + f'Array size of {self._len} was overrun during prepend.\n' + f'You have passed {abs(index)} too many datums.' 
+ ) + + else: + index = start if start is not None else self._last.value + + end = index + length + + if field_map: + src_names, dst_names = zip(*field_map.items()) + else: + dst_names = src_names = self._write_fields + + try: + self._array[ + list(dst_names) + ][index:end] = data[list(src_names)][:] + + # NOTE: there was a race here between updating + # the first and last indices and when the next reader + # tries to access ``.array`` (which due to the index + # overlap will be empty). Pretty sure we've fixed it now + # but leaving this here as a reminder. + if prepend and update_first and length: + assert index < self._first.value + + if ( + index < self._first.value + and update_first + ): + assert prepend, 'prepend=True not passed but index decreased?' + self._first.value = index + + elif not prepend: + self._last.value = end + + self._post_init = True + return end + + except ValueError as err: + if field_map: + raise + + # should raise if diff detected + self.diff_err_fields(data) + raise err + + def diff_err_fields( + self, + data: np.ndarray, + ) -> None: + # reraise with any field discrepancy + our_fields, their_fields = ( + set(self._array.dtype.fields), + set(data.dtype.fields), + ) + + only_in_ours = our_fields - their_fields + only_in_theirs = their_fields - our_fields + + if only_in_ours: + raise TypeError( + f"Input array is missing field(s): {only_in_ours}" + ) + elif only_in_theirs: + raise TypeError( + f"Input array has unknown field(s): {only_in_theirs}" + ) + + # TODO: support "silent" prepends that don't update ._first.value? + def prepend( + self, + data: np.ndarray, + ) -> int: + end = self.push(data, prepend=True) + assert end + + def close(self) -> None: + self._first._shm.close() + self._last._shm.close() + self._shm.close() + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + shm_unlink(self._shm.name) + + self._first.destroy() + self._last.destroy() + + def flush(self) -> None: + # TODO: flush to storage backend like markestore? + ... + + +def open_shm_array( + + key: Optional[str] = None, + size: int = _default_size, # see above + dtype: Optional[np.dtype] = None, + readonly: bool = False, + +) -> ShmArray: + '''Open a memory shared ``numpy`` using the standard library. + + This call unlinks (aka permanently destroys) the buffer on teardown + and thus should be used from the parent-most accessor (process). + + ''' + # create new shared mem segment for which we + # have write permission + a = np.zeros(size, dtype=dtype) + a['index'] = np.arange(len(a)) + + shm = SharedMemory( + name=key, + create=True, + size=a.nbytes + ) + array = np.ndarray( + a.shape, + dtype=a.dtype, + buffer=shm.buf + ) + array[:] = a[:] + array.setflags(write=int(not readonly)) + + token = _make_token( + key=key, + size=size, + dtype=dtype, + ) + + # create single entry arrays for storing an first and last indices + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=True, + size=4, # std int + ) + ) + + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=True, + size=4, # std int + ) + ) + + # start the "real-time" updated section after 3-days worth of 1s + # sampled OHLC. 
this allows appending up to a days worth from + # tick/quote feeds before having to flush to a (tsdb) storage + # backend, and looks something like, + # ------------------------- + # | | i + # _________________________ + # <-------------> <-------> + # history real-time + # + # Once fully "prepended", the history section will leave the + # ``ShmArray._start.value: int = 0`` and the yet-to-be written + # real-time section will start at ``ShmArray.index: int``. + + # this sets the index to 3/4 of the length of the buffer + # leaving a "days worth of second samples" for the real-time + # section. + last.value = first.value = _rt_buffer_start + + shmarr = ShmArray( + array, + first, + last, + shm, + ) + + assert shmarr._token == token + _known_tokens[key] = shmarr.token + + # "unlink" created shm on process teardown by + # pushing teardown calls onto actor context stack + + stack = tractor.current_actor().lifetime_stack + stack.callback(shmarr.close) + stack.callback(shmarr.destroy) + + return shmarr + + +def attach_shm_array( + token: tuple[str, str, tuple[str, str]], + readonly: bool = True, + +) -> ShmArray: + ''' + Attach to an existing shared memory array previously + created by another process using ``open_shared_array``. + + No new shared mem is allocated but wrapper types for read/write + access are constructed. + + ''' + token = _Token.from_msg(token) + key = token.shm_name + + if key in _known_tokens: + assert _Token.from_msg(_known_tokens[key]) == token, "WTF" + + # XXX: ugh, looks like due to the ``shm_open()`` C api we can't + # actually place files in a subdir, see discussion here: + # https://stackoverflow.com/a/11103289 + + # attach to array buffer and view as per dtype + _err: Optional[Exception] = None + for _ in range(3): + try: + shm = SharedMemory( + name=key, + create=False, + ) + break + except OSError as oserr: + _err = oserr + time.sleep(0.1) + else: + if _err: + raise _err + + shmarr = np.ndarray( + (token.size,), + dtype=token.dtype, + buffer=shm.buf + ) + shmarr.setflags(write=int(not readonly)) + + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=False, + size=4, # std int + ), + ) + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=False, + size=4, # std int + ), + ) + + # make sure we can read + first.value + + sha = ShmArray( + shmarr, + first, + last, + shm, + ) + # read test + sha.array + + # Stash key -> token knowledge for future queries + # via `maybe_opepn_shm_array()` but only after we know + # we can attach. + if key not in _known_tokens: + _known_tokens[key] = token + + # "close" attached shm on actor teardown + tractor.current_actor().lifetime_stack.callback(sha.close) + + return sha + + +def maybe_open_shm_array( + key: str, + dtype: Optional[np.dtype] = None, + **kwargs, + +) -> tuple[ShmArray, bool]: + ''' + Attempt to attach to a shared memory block using a "key" lookup + to registered blocks in the users overall "system" registry + (presumes you don't have the block's explicit token). + + This function is meant to solve the problem of discovering whether + a shared array token has been allocated or discovered by the actor + running in **this** process. Systems where multiple actors may seek + to access a common block can use this function to attempt to acquire + a token as discovered by the actors who have previously stored + a "key" -> ``_Token`` map in an actor local (aka python global) + variable. 
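+
+    The returned ``bool`` reports whether the segment was newly
+    allocated by this call (``True``) or merely attached (``False``).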
+ + If you know the explicit ``_Token`` for your memory segment instead + use ``attach_shm_array``. + + ''' + size = kwargs.pop('size', _default_size) + try: + # see if we already know this key + token = _known_tokens[key] + return attach_shm_array(token=token, **kwargs), False + except KeyError: + log.warning(f"Could not find {key} in shms cache") + if dtype: + token = _make_token( + key, + size=size, + dtype=dtype, + ) + try: + return attach_shm_array(token=token, **kwargs), False + except FileNotFoundError: + log.warning(f"Could not attach to shm with token {token}") + + # This actor does not know about memory + # associated with the provided "key". + # Attempt to open a block and expect + # to fail if a block has been allocated + # on the OS by someone else. + return open_shm_array(key=key, dtype=dtype, **kwargs), True + + +def try_read( + array: np.ndarray + +) -> Optional[np.ndarray]: + ''' + Try to read the last row from a shared mem array or ``None`` + if the array read returns a zero-length array result. + + Can be used to check for backfilling race conditions where an array + is currently being (re-)written by a writer actor but the reader is + unaware and reads during the window where the first and last indexes + are being updated. + + ''' + try: + return array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + + # the array read was emtpy + return None -- 2.34.1 From 7ae194baed4c454e2173e44f5ab7405464552d70 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Sun, 16 Oct 2022 18:06:07 -0400 Subject: [PATCH 02/12] Add `ShmList` wrapping the stdlib's `ShareableList` First attempt at getting `multiprocessing.shared_memory.ShareableList` working; we wrap the stdlib type with a readonly attr and a `.key` for cross-actor lookup. Also, rename all `numpy` specific routines to have a `ndarray` suffix in the func names. --- tractor/_shm.py | 206 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 141 insertions(+), 65 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index dca9d5a5..63d18411 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -28,6 +28,7 @@ import time from typing import Optional from multiprocessing.shared_memory import ( SharedMemory, + ShareableList, _USE_POSIX, ) @@ -87,10 +88,11 @@ disable_mantracker() class SharedInt: - """Wrapper around a single entry shared memory array which + ''' + Wrapper around a single entry shared memory array which holds an ``int`` value used as an index counter. - """ + ''' def __init__( self, shm: SharedMemory, @@ -117,10 +119,13 @@ class SharedInt: log.warning(f'Shm for {name} already unlinked?') -class _Token(Struct, frozen=True): +class _NpToken(Struct, frozen=True): ''' - Internal represenation of a shared memory "token" - which can be used to key a system wide post shm entry. + Internal represenation of a shared memory ``numpy`` array "token" + which can be used to key and load a system (OS) wide shm entry + and correctly read the array by type signature. 
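+
+    The ``dtype_descr`` is stored as nested tuples; ``from_msg()``
+    re-tuples any lists produced by wire (de)serialization before
+    the ``np.dtype`` is rebuilt.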
+ + This type is msg safe. ''' shm_name: str # this servers as a "key" value @@ -137,18 +142,18 @@ class _Token(Struct, frozen=True): return self.to_dict() @classmethod - def from_msg(cls, msg: dict) -> _Token: - if isinstance(msg, _Token): + def from_msg(cls, msg: dict) -> _NpToken: + if isinstance(msg, _NpToken): return msg # TODO: native struct decoding # return _token_dec.decode(msg) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) - return _Token(**msg) + return _NpToken(**msg) -# _token_dec = msgspec.msgpack.Decoder(_Token) +# _token_dec = msgspec.msgpack.Decoder(_NpToken) # TODO: this api? # _known_tokens = tractor.ActorVar('_shm_tokens', {}) @@ -159,10 +164,14 @@ class _Token(Struct, frozen=True): _known_tokens = {} -def get_shm_token(key: str) -> _Token: - """Convenience func to check if a token +def get_shm_token(key: str) -> _NpToken | str: + ''' + Convenience func to check if a token for the provided key is known by this process. - """ + + Returns either the ``numpy`` token or a string for a shared list. + + ''' return _known_tokens.get(key) @@ -171,13 +180,13 @@ def _make_token( size: int, dtype: np.dtype, -) -> _Token: +) -> _NpToken: ''' Create a serializable token that can be used to access a shared array. ''' - return _Token( + return _NpToken( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", @@ -188,7 +197,7 @@ def _make_token( class ShmArray: ''' - A shared memory ``numpy`` (compatible) array API. + A shared memory ``numpy.ndarray`` API. An underlying shared memory buffer is allocated based on a user specified ``numpy.ndarray``. This fixed size array @@ -228,8 +237,8 @@ class ShmArray: # TODO: ringbuf api? @property - def _token(self) -> _Token: - return _Token( + def _token(self) -> _NpToken: + return _NpToken( shm_name=self._shm.name, shm_first_index_name=self._first._shm.name, shm_last_index_name=self._last._shm.name, @@ -446,15 +455,17 @@ class ShmArray: ... -def open_shm_array( +def open_shm_ndarray( key: Optional[str] = None, - size: int = _default_size, # see above - dtype: Optional[np.dtype] = None, + size: int = int(2 ** 10), + dtype: np.dtype | None = None, + append_start_index: int = 0, readonly: bool = False, ) -> ShmArray: - '''Open a memory shared ``numpy`` using the standard library. + ''' + Open a memory shared ``numpy`` using the standard library. This call unlinks (aka permanently destroys) the buffer on teardown and thus should be used from the parent-most accessor (process). @@ -501,10 +512,10 @@ def open_shm_array( ) ) - # start the "real-time" updated section after 3-days worth of 1s - # sampled OHLC. this allows appending up to a days worth from - # tick/quote feeds before having to flush to a (tsdb) storage - # backend, and looks something like, + # Start the "real-time" append-updated (or "pushed-to") section + # after some start index: ``append_start_index``. This allows appending + # from a start point in the array which isn't the 0 index and looks + # something like, # ------------------------- # | | i # _________________________ @@ -518,7 +529,7 @@ def open_shm_array( # this sets the index to 3/4 of the length of the buffer # leaving a "days worth of second samples" for the real-time # section. 
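+    # e.g. with ``size=1000`` and ``append_start_index=750`` both
+    # counters start at 750: appends advance ``last`` toward 1000
+    # while prepends walk ``first`` back toward 0.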
- last.value = first.value = _rt_buffer_start + last.value = first.value = append_start_index shmarr = ShmArray( array, @@ -540,7 +551,7 @@ def open_shm_array( return shmarr -def attach_shm_array( +def attach_shm_ndarray( token: tuple[str, str, tuple[str, str]], readonly: bool = True, @@ -553,11 +564,11 @@ def attach_shm_array( access are constructed. ''' - token = _Token.from_msg(token) + token = _NpToken.from_msg(token) key = token.shm_name if key in _known_tokens: - assert _Token.from_msg(_known_tokens[key]) == token, "WTF" + assert _NpToken.from_msg(_known_tokens[key]) == token, "WTF" # XXX: ugh, looks like due to the ``shm_open()`` C api we can't # actually place files in a subdir, see discussion here: @@ -625,10 +636,14 @@ def attach_shm_array( return sha -def maybe_open_shm_array( - key: str, - dtype: Optional[np.dtype] = None, - **kwargs, +def maybe_open_shm_ndarray( + key: str, # unique identifier for segment + + # from ``open_shm_array()`` + size: int = int(2 ** 10), # array length in index terms + dtype: np.dtype | None = None, + append_start_index: int = 0, + readonly: bool = True, ) -> tuple[ShmArray, bool]: ''' @@ -641,18 +656,23 @@ def maybe_open_shm_array( running in **this** process. Systems where multiple actors may seek to access a common block can use this function to attempt to acquire a token as discovered by the actors who have previously stored - a "key" -> ``_Token`` map in an actor local (aka python global) + a "key" -> ``_NpToken`` map in an actor local (aka python global) variable. - If you know the explicit ``_Token`` for your memory segment instead + If you know the explicit ``_NpToken`` for your memory segment instead use ``attach_shm_array``. ''' - size = kwargs.pop('size', _default_size) try: # see if we already know this key token = _known_tokens[key] - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, # not newly opened + ) except KeyError: log.warning(f"Could not find {key} in shms cache") if dtype: @@ -661,8 +681,16 @@ def maybe_open_shm_array( size=size, dtype=dtype, ) + else: + try: - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, + ) except FileNotFoundError: log.warning(f"Could not attach to shm with token {token}") @@ -671,36 +699,84 @@ def maybe_open_shm_array( # Attempt to open a block and expect # to fail if a block has been allocated # on the OS by someone else. - return open_shm_array(key=key, dtype=dtype, **kwargs), True + return ( + open_shm_ndarray( + key=key, + size=size, + dtype=dtype, + append_start_index=append_start_index, + readonly=readonly, + ), + True, + ) -def try_read( - array: np.ndarray - -) -> Optional[np.ndarray]: +class ShmList(ShareableList): ''' - Try to read the last row from a shared mem array or ``None`` - if the array read returns a zero-length array result. - - Can be used to check for backfilling race conditions where an array - is currently being (re-)written by a writer actor but the reader is - unaware and reads during the window where the first and last indexes - are being updated. + Carbon copy of ``.shared_memory.ShareableList`` but add a + readonly state instance var. ''' - try: - return array[-1] - except IndexError: - # XXX: race condition with backfilling shm. 
- # - # the underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the indices may be updated in such a way that - # a read delivers an empty array (though it seems like we - # *should* be able to prevent that?). also, as and alt and - # something we need anyway, maybe there should be some kind of - # signal that a prepend is taking place and this consumer can - # respond (eg. redrawing graphics) accordingly. + def __init__( + self, + sequence: list | None = None, + *, + name: str | None = None, + readonly: bool = True - # the array read was emtpy - return None + ) -> None: + self._readonly = readonly + self._key = name + return super().__init__( + sequence=sequence, + name=name, + ) + + @property + def key(self) -> str: + return self._key + + def __setitem__( + self, + position, + value, + + ) -> None: + + # mimick ``numpy`` error + if self._readonly: + raise ValueError('assignment destination is read-only') + + return super().__setitem__(position, value) + + +def open_shm_list( + key: str, + sequence: list | None = None, + size: int = int(2 ** 10), + dtype: np.dtype | None = None, + readonly: bool = True, + +) -> ShmList: + + if sequence is None: + sequence = list(map(float, range(size))) + + shml = ShmList( + sequence=sequence, + name=key, + readonly=readonly, + ) + + # "close" attached shm on actor teardown + tractor.current_actor().lifetime_stack.callback(shml.shm.close) + tractor.current_actor().lifetime_stack.callback(shml.shm.unlink) + + return shml + + +def attach_shm_list( + key: str, +) -> ShmList: + + return ShmList(name=key) -- 2.34.1 From 33482d8f41b72b5bac3f5ee3072f18a5e67a93ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Sun, 16 Oct 2022 18:16:58 -0400 Subject: [PATCH 03/12] Add initial readers-writer shm list tests --- tests/test_shm.py | 84 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 tests/test_shm.py diff --git a/tests/test_shm.py b/tests/test_shm.py new file mode 100644 index 00000000..83ce7e21 --- /dev/null +++ b/tests/test_shm.py @@ -0,0 +1,84 @@ +""" +Shared mem primitives and APIs. 
+ +""" + +# import numpy +import pytest +import trio +import tractor +from tractor._shm import ( + open_shm_list, + attach_shm_list, +) + + +@tractor.context +async def child_read_shm_list( + ctx: tractor.Context, + shm_key: str, + use_str: bool, +) -> None: + + shml = attach_shm_list(key=shm_key) + await ctx.started(shml.key) + + async with ctx.open_stream() as stream: + async for i in stream: + print(f'reading shm list index: {i}') + + if use_str: + expect = str(float(i)) + else: + expect = float(i) + + assert expect == shml[i] + + +@pytest.mark.parametrize( + 'use_str', [False, True], +) +def test_parent_writer_child_reader( + use_str: bool, +): + + async def main(): + async with tractor.open_nursery() as an: + + # allocate writeable list in parent + key = 'shm_list' + shml = open_shm_list( + key=key, + readonly=False, + ) + + portal = await an.start_actor( + 'shm_reader', + enable_modules=[__name__], + ) + + async with ( + portal.open_context( + child_read_shm_list, # taken from pytest parameterization + shm_key=key, + use_str=use_str, + ) as (ctx, sent), + + ctx.open_stream() as stream, + ): + + assert sent == key + + for i in range(2 ** 10): + + val = float(i) + if use_str: + val = str(val) + + print(f'writing {val}') + shml[i] = val + await stream.send(i) + + await portal.cancel_actor() + + trio.run(main) -- 2.34.1 From c932bb59114b618ef82d56a1fbc977f3eb78b517 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 17 Oct 2022 15:13:05 -0400 Subject: [PATCH 04/12] Add repetitive attach to existing segment test --- tests/test_shm.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_shm.py b/tests/test_shm.py index 83ce7e21..850ccb3e 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -2,6 +2,7 @@ Shared mem primitives and APIs. """ +import uuid # import numpy import pytest @@ -13,6 +14,50 @@ from tractor._shm import ( ) +@tractor.context +async def child_attach_shml_alot( + ctx: tractor.Context, + shm_key: str, +) -> None: + + await ctx.started(shm_key) + + # now try to attach a boatload of times in a loop.. 
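+    # (each pass opens a fresh ``ShmList`` handle onto the same
+    # OS-level segment; with the stdlib resource tracker disabled
+    # the segment survives between attachments.)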
+ for _ in range(1000): + shml = attach_shm_list(key=shm_key) + assert shml.shm.name == shm_key + await trio.sleep(0.001) + + +def test_child_attaches_alot(): + async def main(): + async with tractor.open_nursery() as an: + + # allocate writeable list in parent + key = f'shml_{uuid.uuid4()}' + shml = open_shm_list( + key=key, + ) + + portal = await an.start_actor( + 'shm_attacher', + enable_modules=[__name__], + ) + + async with ( + portal.open_context( + child_attach_shml_alot, # taken from pytest parameterization + shm_key=key, + ) as (ctx, start_val), + ): + assert start_val == key + await ctx.result() + + await portal.cancel_actor() + + trio.run(main) + + @tractor.context async def child_read_shm_list( ctx: tractor.Context, -- 2.34.1 From e46033cbe7207be0ea6e941b238d0d151a5022e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 17 Oct 2022 15:13:58 -0400 Subject: [PATCH 05/12] Don't require runtime (for now), type annot fixing --- tractor/_shm.py | 44 +++++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 63d18411..80ca49d1 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -29,7 +29,7 @@ from typing import Optional from multiprocessing.shared_memory import ( SharedMemory, ShareableList, - _USE_POSIX, + _USE_POSIX, # type: ignore ) if _USE_POSIX: @@ -46,15 +46,6 @@ from .log import get_logger log = get_logger(__name__) -# how much is probably dependent on lifestyle -_secs_in_day = int(60 * 60 * 24) -# we try for a buncha times, but only on a run-every-other-day kinda week. -_days_worth = 16 -_default_size = _days_worth * _secs_in_day -# where to start the new data append index -_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) - - def disable_mantracker(): ''' Disable all ``multiprocessing``` "resource tracking" machinery since @@ -79,7 +70,6 @@ def disable_mantracker(): mantracker._resource_tracker = ManTracker() mantracker.register = mantracker._resource_tracker.register mantracker.ensure_running = mantracker._resource_tracker.ensure_running - # ensure_running = mantracker._resource_tracker.ensure_running mantracker.unregister = mantracker._resource_tracker.unregister mantracker.getfd = mantracker._resource_tracker.getfd @@ -134,9 +124,14 @@ class _NpToken(Struct, frozen=True): dtype_descr: tuple size: int # in struct-array index / row terms + # TODO: use nptyping here on dtypes @property - def dtype(self) -> np.dtype: - return np.dtype(list(map(tuple, self.dtype_descr))).descr + def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]: + return np.dtype( + list( + map(tuple, self.dtype_descr) + ) + ).descr def as_msg(self): return self.to_dict() @@ -161,10 +156,10 @@ class _NpToken(Struct, frozen=True): # _known_tokens = trio.RunVar('shms', {}) # process-local store of keys to tokens -_known_tokens = {} +_known_tokens: dict[str, _NpToken] = {} -def get_shm_token(key: str) -> _NpToken | str: +def get_shm_token(key: str) -> _NpToken | None: ''' Convenience func to check if a token for the provided key is known by this process. @@ -228,11 +223,10 @@ class ShmArray: self._post_init: bool = False # pushing data does not write the index (aka primary key) + self._write_fields: list[str] | None = None dtype = shmarr.dtype if dtype.fields: self._write_fields = list(shmarr.dtype.fields.keys())[1:] - else: - self._write_fields = None # TODO: ringbuf api? 
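
The ring-buffer TODO above never lands in this series: ``push()``
still raises on prepend-overrun and, per its own docstring, has "no
actual ring logic yet". A minimal sketch of wraparound-push
semantics, using a hypothetical ``ring_push()`` helper that is not
part of these patches:

    import numpy as np

    def ring_push(
        buf: np.ndarray,
        data: np.ndarray,
        last: int,
    ) -> int:
        # write ``data`` starting at logical index ``last``,
        # wrapping on overflow, and return the advanced index.
        n = len(buf)
        for i, row in enumerate(data):
            buf[(last + i) % n] = row
        return last + len(data)

    buf = np.zeros(8)
    assert ring_push(buf, np.arange(6.), last=5) == 11
    assert list(buf[:3]) == [3., 4., 5.]  # wrapped entries

A production version would also need the ``SharedInt`` counters to
wrap consistently so that concurrent readers compute the same
modulo window.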
@@ -283,9 +277,9 @@ class ShmArray: self, fields: Optional[list[str]] = None, - # type that all field values will be cast to - # in the returned view. - common_dtype: np.dtype = np.float, + # type that all field values will be cast to in the returned + # view. + common_dtype: np.dtype = np.float64, # type: ignore ) -> np.ndarray: @@ -543,7 +537,6 @@ def open_shm_ndarray( # "unlink" created shm on process teardown by # pushing teardown calls onto actor context stack - stack = tractor.current_actor().lifetime_stack stack.callback(shmarr.close) stack.callback(shmarr.destroy) @@ -769,14 +762,19 @@ def open_shm_list( ) # "close" attached shm on actor teardown - tractor.current_actor().lifetime_stack.callback(shml.shm.close) - tractor.current_actor().lifetime_stack.callback(shml.shm.unlink) + try: + actor = tractor.current_actor() + actor.lifetime_stack.callback(shml.shm.close) + actor.lifetime_stack.callback(shml.shm.unlink) + except RuntimeError: + log.warning('tractor runtime not active, skipping teardown steps') return shml def attach_shm_list( key: str, + ) -> ShmList: return ShmList(name=key) -- 2.34.1 From afbdb50a30bb71887b598b01abfca0295fb4debe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 17 Oct 2022 17:21:14 -0400 Subject: [PATCH 06/12] Rename token type to `NDToken` in the style of `nptyping` --- tractor/_shm.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 80ca49d1..3f415c52 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -109,7 +109,7 @@ class SharedInt: log.warning(f'Shm for {name} already unlinked?') -class _NpToken(Struct, frozen=True): +class NDToken(Struct, frozen=True): ''' Internal represenation of a shared memory ``numpy`` array "token" which can be used to key and load a system (OS) wide shm entry @@ -137,18 +137,18 @@ class _NpToken(Struct, frozen=True): return self.to_dict() @classmethod - def from_msg(cls, msg: dict) -> _NpToken: - if isinstance(msg, _NpToken): + def from_msg(cls, msg: dict) -> NDToken: + if isinstance(msg, NDToken): return msg # TODO: native struct decoding # return _token_dec.decode(msg) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) - return _NpToken(**msg) + return NDToken(**msg) -# _token_dec = msgspec.msgpack.Decoder(_NpToken) +# _token_dec = msgspec.msgpack.Decoder(NDToken) # TODO: this api? # _known_tokens = tractor.ActorVar('_shm_tokens', {}) @@ -156,10 +156,10 @@ class _NpToken(Struct, frozen=True): # _known_tokens = trio.RunVar('shms', {}) # process-local store of keys to tokens -_known_tokens: dict[str, _NpToken] = {} +_known_tokens: dict[str, NDToken] = {} -def get_shm_token(key: str) -> _NpToken | None: +def get_shm_token(key: str) -> NDToken | None: ''' Convenience func to check if a token for the provided key is known by this process. @@ -175,13 +175,13 @@ def _make_token( size: int, dtype: np.dtype, -) -> _NpToken: +) -> NDToken: ''' Create a serializable token that can be used to access a shared array. ''' - return _NpToken( + return NDToken( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", @@ -231,8 +231,8 @@ class ShmArray: # TODO: ringbuf api? @property - def _token(self) -> _NpToken: - return _NpToken( + def _token(self) -> NDToken: + return NDToken( shm_name=self._shm.name, shm_first_index_name=self._first._shm.name, shm_last_index_name=self._last._shm.name, @@ -557,11 +557,11 @@ def attach_shm_ndarray( access are constructed. 
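
For orientation, the intended cross-actor flow is roughly the
following sketch; it must run inside the ``tractor`` runtime (the
opener registers teardown on the actor's lifetime stack) and the
key/dtype values here are arbitrary:

    import numpy as np
    from tractor._shm import (
        open_shm_ndarray,
        attach_shm_ndarray,
    )

    # "parent" actor: allocate the segment and ship the msg-safe
    # token (a plain dict) to peers over IPC.
    dtype = np.dtype([('index', int), ('value', float)])
    shma = open_shm_ndarray(key='demo_key', size=1024, dtype=dtype)
    token_msg: dict = shma.token

    # "child" actor: reconstruct a readonly view onto the same
    # OS-level segment from the received token.
    view = attach_shm_ndarray(token=token_msg, readonly=True)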
''' - token = _NpToken.from_msg(token) + token = NDToken.from_msg(token) key = token.shm_name if key in _known_tokens: - assert _NpToken.from_msg(_known_tokens[key]) == token, "WTF" + assert NDToken.from_msg(_known_tokens[key]) == token, "WTF" # XXX: ugh, looks like due to the ``shm_open()`` C api we can't # actually place files in a subdir, see discussion here: @@ -649,10 +649,10 @@ def maybe_open_shm_ndarray( running in **this** process. Systems where multiple actors may seek to access a common block can use this function to attempt to acquire a token as discovered by the actors who have previously stored - a "key" -> ``_NpToken`` map in an actor local (aka python global) + a "key" -> ``NDToken`` map in an actor local (aka python global) variable. - If you know the explicit ``_NpToken`` for your memory segment instead + If you know the explicit ``NDToken`` for your memory segment instead use ``attach_shm_array``. ''' -- 2.34.1 From 1c441b0986bc5e8a152701f1897d731bdf24120f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 18 Oct 2022 11:01:02 -0400 Subject: [PATCH 07/12] Add `ShmList` slice support in `.__getitem__()` --- tractor/_shm.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 3f415c52..c26c9911 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -26,20 +26,26 @@ from __future__ import annotations from sys import byteorder import time from typing import Optional +from multiprocessing import shared_memory as shm from multiprocessing.shared_memory import ( SharedMemory, ShareableList, - _USE_POSIX, # type: ignore + # _USE_POSIX, # type: ignore ) -if _USE_POSIX: +if getattr(shm, '_USE_POSIX', False): from _posixshmem import shm_unlink from msgspec import Struct -import numpy as np -from numpy.lib import recfunctions as rfn import tractor +try: + import numpy as np + from numpy.lib import recfunctions as rfn + import nptyping +except ImportError: + pass + from .log import get_logger @@ -742,6 +748,15 @@ class ShmList(ShareableList): return super().__setitem__(position, value) + def __getitem__( + self, + indexish, + ) -> list: + if isinstance(indexish, slice): + return list(self)[indexish] + + return super().__getitem__(indexish) + def open_shm_list( key: str, @@ -774,7 +789,11 @@ def open_shm_list( def attach_shm_list( key: str, + readonly: bool = False, ) -> ShmList: - return ShmList(name=key) + return ShmList( + name=key, + readonly=readonly, + ) -- 2.34.1 From 9a0d529b180065830c8d1950df388d22e9408787 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 18 Oct 2022 11:01:30 -0400 Subject: [PATCH 08/12] Parametrize rw test with variable frame sizes Demonstrates fixed size frame-oriented reads by the child where the parent only transmits a "read" stream msg on "frame fill events" such that the child incrementally reads the shm list data (much like in a real-time-buffered streaming system). --- tests/test_shm.py | 64 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/tests/test_shm.py b/tests/test_shm.py index 850ccb3e..c183040c 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -24,7 +24,10 @@ async def child_attach_shml_alot( # now try to attach a boatload of times in a loop.. 
for _ in range(1000): - shml = attach_shm_list(key=shm_key) + shml = attach_shm_list( + key=shm_key, + readonly=False, + ) assert shml.shm.name == shm_key await trio.sleep(0.001) @@ -46,8 +49,8 @@ def test_child_attaches_alot(): async with ( portal.open_context( - child_attach_shml_alot, # taken from pytest parameterization - shm_key=key, + child_attach_shml_alot, + shm_key=shml.key, ) as (ctx, start_val), ): assert start_val == key @@ -63,50 +66,70 @@ async def child_read_shm_list( ctx: tractor.Context, shm_key: str, use_str: bool, + frame_size: int, ) -> None: + # attach in child shml = attach_shm_list(key=shm_key) await ctx.started(shml.key) async with ctx.open_stream() as stream: async for i in stream: - print(f'reading shm list index: {i}') + print(f'(child): reading shm list index: {i}') if use_str: expect = str(float(i)) else: expect = float(i) - assert expect == shml[i] + if frame_size == 1: + val = shml[i] + assert expect == val + print(f'(child): reading value: {val}') + else: + frame = shml[i - frame_size:i] + print(f'(child): reading frame: {frame}') @pytest.mark.parametrize( 'use_str', [False, True], ) +@pytest.mark.parametrize( + 'frame_size', + [1, 2**6, 2**10], + ids=lambda i: f'frame_size={i}', +) def test_parent_writer_child_reader( use_str: bool, + frame_size: int, ): async def main(): - async with tractor.open_nursery() as an: - - # allocate writeable list in parent - key = 'shm_list' - shml = open_shm_list( - key=key, - readonly=False, - ) + async with tractor.open_nursery( + debug_mode=True, + ) as an: portal = await an.start_actor( 'shm_reader', enable_modules=[__name__], + debug_mode=True, + ) + + # allocate writeable list in parent + key = 'shm_list' + seq_size = int(2 * 2 ** 10) + shml = open_shm_list( + key=key, + size=seq_size, + readonly=False, ) async with ( portal.open_context( - child_read_shm_list, # taken from pytest parameterization + child_read_shm_list, shm_key=key, use_str=use_str, + frame_size=frame_size, ) as (ctx, sent), ctx.open_stream() as stream, @@ -114,14 +137,23 @@ def test_parent_writer_child_reader( assert sent == key - for i in range(2 ** 10): + for i in range(seq_size): val = float(i) if use_str: val = str(val) - print(f'writing {val}') + print(f'(parent): writing {val}') shml[i] = val + + # only on frame fills do we + # signal to the child that a frame's + # worth is ready. 
+ if (i % frame_size) == 0: + print(f'(parent): signalling frame full on {val}') + await stream.send(i) + else: + print(f'(parent): signalling final frame on {val}') await stream.send(i) await portal.cancel_actor() -- 2.34.1 From 255209f881d68ddcdd1cf323b230c98da948e9f2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 18 Oct 2022 16:28:57 -0400 Subject: [PATCH 09/12] Mod define `_USE_POSIX`, add a of of todos --- tractor/_shm.py | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index c26c9911..79ac8969 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -30,15 +30,19 @@ from multiprocessing import shared_memory as shm from multiprocessing.shared_memory import ( SharedMemory, ShareableList, - # _USE_POSIX, # type: ignore ) -if getattr(shm, '_USE_POSIX', False): - from _posixshmem import shm_unlink - from msgspec import Struct import tractor +from .log import get_logger + + +_USE_POSIX = getattr(shm, '_USE_POSIX', False) +if _USE_POSIX: + from _posixshmem import shm_unlink + + try: import numpy as np from numpy.lib import recfunctions as rfn @@ -46,8 +50,6 @@ try: except ImportError: pass -from .log import get_logger - log = get_logger(__name__) @@ -161,6 +163,8 @@ class NDToken(Struct, frozen=True): # _known_tokens = tractor.ContextStack('_known_tokens', ) # _known_tokens = trio.RunVar('shms', {}) +# TODO: this should maybe be provided via +# a `.trionics.maybe_open_context()` wrapper factory? # process-local store of keys to tokens _known_tokens: dict[str, NDToken] = {} @@ -712,8 +716,12 @@ def maybe_open_shm_ndarray( class ShmList(ShareableList): ''' - Carbon copy of ``.shared_memory.ShareableList`` but add a - readonly state instance var. + Carbon copy of ``.shared_memory.ShareableList`` with a few + enhancements: + + - readonly mode via instance var flag + - ``.__getitem__()`` accepts ``slice`` inputs + - exposes the underlying buffer "name" as a ``.key: str`` ''' def __init__( @@ -752,11 +760,22 @@ class ShmList(ShareableList): self, indexish, ) -> list: + + # NOTE: this is a non-writeable view (copy?) of the buffer + # in a new list instance. if isinstance(indexish, slice): return list(self)[indexish] return super().__getitem__(indexish) + # TODO: should we offer a `.array` and `.push()` equivalent + # to the `ShmArray`? + # currently we have the following limitations: + # - can't write slices of input using traditional slice-assign + # syntax due to the ``ShareableList.__setitem__()`` implementation. + # - ``list(shmlist)`` returns a non-mutable copy instead of + # a writeable view which would be handier numpy-style ops. 
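
A quick illustration of the slice and readonly semantics noted
above; this uses only the stdlib-backed list machinery so no actor
runtime is required (the segment name is arbitrary):

    from tractor._shm import ShmList

    shml = ShmList(
        sequence=[0., 1., 2., 3.],
        name='slice_demo',
        readonly=False,
    )
    assert shml[1:3] == [1., 2.]  # slice -> plain *copied* list
    shml[0] = 99.                 # writes through to the segment

    # manual cleanup since no actor lifetime-stack is active here
    shml.shm.close()
    shml.shm.unlink()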
+ def open_shm_list( key: str, -- 2.34.1 From 2683a7f33ad747346af90bc9c80ee450f8aa1928 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 19 Oct 2022 14:20:50 -0400 Subject: [PATCH 10/12] Allocate size-specced "empty" sequence from default values by type --- tractor/_shm.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 79ac8969..2ce148da 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -719,7 +719,7 @@ class ShmList(ShareableList): Carbon copy of ``.shared_memory.ShareableList`` with a few enhancements: - - readonly mode via instance var flag + - readonly mode via instance var flag `._readonly: bool` - ``.__getitem__()`` accepts ``slice`` inputs - exposes the underlying buffer "name" as a ``.key: str`` @@ -743,6 +743,10 @@ class ShmList(ShareableList): def key(self) -> str: return self._key + @property + def readonly(self) -> bool: + return self._readonly + def __setitem__( self, position, @@ -781,13 +785,21 @@ def open_shm_list( key: str, sequence: list | None = None, size: int = int(2 ** 10), - dtype: np.dtype | None = None, + dtype: float | int | bool | str | bytes | None = float, readonly: bool = True, ) -> ShmList: if sequence is None: - sequence = list(map(float, range(size))) + default = { + float: 0., + int: 0, + bool: True, + str: 'doggy', + None: None, + }[dtype] + sequence = [default] * size + # sequence = [0.] * size shml = ShmList( sequence=sequence, -- 2.34.1 From 8ebb1f09de99db523d6c7c65eace18a6054cdc72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 20 Oct 2022 16:08:28 -0400 Subject: [PATCH 11/12] Pass `str` dtype for `use_str` case --- tests/test_shm.py | 14 ++++++++++---- tractor/_shm.py | 2 -- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/test_shm.py b/tests/test_shm.py index c183040c..2b7a382f 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -70,7 +70,10 @@ async def child_read_shm_list( ) -> None: # attach in child - shml = attach_shm_list(key=shm_key) + shml = attach_shm_list( + key=shm_key, + # dtype=str if use_str else float, + ) await ctx.started(shml.key) async with ctx.open_stream() as stream: @@ -92,7 +95,9 @@ async def child_read_shm_list( @pytest.mark.parametrize( - 'use_str', [False, True], + 'use_str', + [False, True], + ids=lambda i: f'use_str_values={i}', ) @pytest.mark.parametrize( 'frame_size', @@ -106,7 +111,7 @@ def test_parent_writer_child_reader( async def main(): async with tractor.open_nursery( - debug_mode=True, + # debug_mode=True, ) as an: portal = await an.start_actor( @@ -121,6 +126,7 @@ def test_parent_writer_child_reader( shml = open_shm_list( key=key, size=seq_size, + dtype=str if use_str else float, readonly=False, ) @@ -143,7 +149,7 @@ def test_parent_writer_child_reader( if use_str: val = str(val) - print(f'(parent): writing {val}') + # print(f'(parent): writing {val}') shml[i] = val # only on frame fills do we diff --git a/tractor/_shm.py b/tractor/_shm.py index 2ce148da..c4c17335 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -460,7 +460,6 @@ class ShmArray: def open_shm_ndarray( - key: Optional[str] = None, size: int = int(2 ** 10), dtype: np.dtype | None = None, @@ -799,7 +798,6 @@ def open_shm_list( None: None, }[dtype] sequence = [default] * size - # sequence = [0.] 
* size shml = ShmList( sequence=sequence, -- 2.34.1 From 5cee222353640b63dc0fbd1dc82de8946842c607 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 22 Jun 2023 17:16:17 -0400 Subject: [PATCH 12/12] Updates from latest `piker.data._sharedmem` changes --- tractor/_shm.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index c4c17335..f8295105 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -287,9 +287,9 @@ class ShmArray: self, fields: Optional[list[str]] = None, - # type that all field values will be cast to in the returned - # view. - common_dtype: np.dtype = np.float64, # type: ignore + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = float, ) -> np.ndarray: @@ -344,7 +344,7 @@ class ShmArray: field_map: Optional[dict[str, str]] = None, prepend: bool = False, update_first: bool = True, - start: Optional[int] = None, + start: int | None = None, ) -> int: ''' @@ -386,7 +386,11 @@ class ShmArray: # tries to access ``.array`` (which due to the index # overlap will be empty). Pretty sure we've fixed it now # but leaving this here as a reminder. - if prepend and update_first and length: + if ( + prepend + and update_first + and length + ): assert index < self._first.value if ( @@ -460,10 +464,10 @@ class ShmArray: def open_shm_ndarray( - key: Optional[str] = None, - size: int = int(2 ** 10), + size: int, + key: str | None = None, dtype: np.dtype | None = None, - append_start_index: int = 0, + append_start_index: int | None = None, readonly: bool = False, ) -> ShmArray: @@ -529,9 +533,12 @@ def open_shm_ndarray( # ``ShmArray._start.value: int = 0`` and the yet-to-be written # real-time section will start at ``ShmArray.index: int``. - # this sets the index to 3/4 of the length of the buffer - # leaving a "days worth of second samples" for the real-time - # section. + # this sets the index to nearly 2/3rds into the the length of + # the buffer leaving at least a "days worth of second samples" + # for the real-time section. + if append_start_index is None: + append_start_index = round(size * 0.616) + last.value = first.value = append_start_index shmarr = ShmArray( @@ -640,9 +647,7 @@ def attach_shm_ndarray( def maybe_open_shm_ndarray( key: str, # unique identifier for segment - - # from ``open_shm_array()`` - size: int = int(2 ** 10), # array length in index terms + size: int, dtype: np.dtype | None = None, append_start_index: int = 0, readonly: bool = True, -- 2.34.1
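
As of this final patch ``size`` is a required argument and, when
``append_start_index`` is left as ``None``, both index counters now
start at ``round(size * 0.616)``, i.e. roughly 2/3rds into the
buffer. A small layout check of that default (again assuming a
running actor and a structured dtype with a leading ``index``
field):

    import numpy as np
    from tractor._shm import open_shm_ndarray

    dtype = np.dtype([('index', int), ('value', float)])
    shma = open_shm_ndarray(size=1000, key='layout_demo', dtype=dtype)

    assert shma._first.value == shma._last.value == 616
    # prepends may fill [0, 616); appends grow into [616, 1000)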