From ef7ca49e9b90d14d32bf1ab8b599f05d6f4411b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Sat, 15 Oct 2022 16:35:32 -0400 Subject: [PATCH 01/12] Initial module import from `piker.data._sharemem` More or less a verbatim copy-paste minus some edgy variable naming and internal `piker` module imports. There is a bunch of OHLC related defaults that need to be dropped and we need to adjust to an optional dependence on `numpy` by supporting shared lists as per the mp docs. --- tractor/_shm.py | 706 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 706 insertions(+) create mode 100644 tractor/_shm.py diff --git a/tractor/_shm.py b/tractor/_shm.py new file mode 100644 index 00000000..dca9d5a5 --- /dev/null +++ b/tractor/_shm.py @@ -0,0 +1,706 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +""" +SC friendly shared memory management geared at real-time +processing. + +Support for ``numpy`` compatible array-buffers is provided but is +considered optional within the context of this runtime-library. + +""" +from __future__ import annotations +from sys import byteorder +import time +from typing import Optional +from multiprocessing.shared_memory import ( + SharedMemory, + _USE_POSIX, +) + +if _USE_POSIX: + from _posixshmem import shm_unlink + +from msgspec import Struct +import numpy as np +from numpy.lib import recfunctions as rfn +import tractor + +from .log import get_logger + + +log = get_logger(__name__) + + +# how much is probably dependent on lifestyle +_secs_in_day = int(60 * 60 * 24) +# we try for a buncha times, but only on a run-every-other-day kinda week. +_days_worth = 16 +_default_size = _days_worth * _secs_in_day +# where to start the new data append index +_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) + + +def disable_mantracker(): + ''' + Disable all ``multiprocessing``` "resource tracking" machinery since + it's an absolute multi-threaded mess of non-SC madness. + + ''' + from multiprocessing import resource_tracker as mantracker + + # Tell the "resource tracker" thing to fuck off. 
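+    # NOTE: the stdlib tracker unlinks every shm segment it knows
+    # about as soon as a tracked process exits; for segments shared
+    # across separately spawned actors that means premature
+    # destruction, so each hook is no-op'd below and lifetime
+    # management is left to explicit actor-teardown callbacks.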
+ class ManTracker(mantracker.ResourceTracker): + def register(self, name, rtype): + pass + + def unregister(self, name, rtype): + pass + + def ensure_running(self): + pass + + # "know your land and know your prey" + # https://www.dailymotion.com/video/x6ozzco + mantracker._resource_tracker = ManTracker() + mantracker.register = mantracker._resource_tracker.register + mantracker.ensure_running = mantracker._resource_tracker.ensure_running + # ensure_running = mantracker._resource_tracker.ensure_running + mantracker.unregister = mantracker._resource_tracker.unregister + mantracker.getfd = mantracker._resource_tracker.getfd + + +disable_mantracker() + + +class SharedInt: + """Wrapper around a single entry shared memory array which + holds an ``int`` value used as an index counter. + + """ + def __init__( + self, + shm: SharedMemory, + ) -> None: + self._shm = shm + + @property + def value(self) -> int: + return int.from_bytes(self._shm.buf, byteorder) + + @value.setter + def value(self, value) -> None: + self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder) + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + name = self._shm.name + try: + shm_unlink(name) + except FileNotFoundError: + # might be a teardown race here? + log.warning(f'Shm for {name} already unlinked?') + + +class _Token(Struct, frozen=True): + ''' + Internal represenation of a shared memory "token" + which can be used to key a system wide post shm entry. + + ''' + shm_name: str # this servers as a "key" value + shm_first_index_name: str + shm_last_index_name: str + dtype_descr: tuple + size: int # in struct-array index / row terms + + @property + def dtype(self) -> np.dtype: + return np.dtype(list(map(tuple, self.dtype_descr))).descr + + def as_msg(self): + return self.to_dict() + + @classmethod + def from_msg(cls, msg: dict) -> _Token: + if isinstance(msg, _Token): + return msg + + # TODO: native struct decoding + # return _token_dec.decode(msg) + + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) + return _Token(**msg) + + +# _token_dec = msgspec.msgpack.Decoder(_Token) + +# TODO: this api? +# _known_tokens = tractor.ActorVar('_shm_tokens', {}) +# _known_tokens = tractor.ContextStack('_known_tokens', ) +# _known_tokens = trio.RunVar('shms', {}) + +# process-local store of keys to tokens +_known_tokens = {} + + +def get_shm_token(key: str) -> _Token: + """Convenience func to check if a token + for the provided key is known by this process. + """ + return _known_tokens.get(key) + + +def _make_token( + key: str, + size: int, + dtype: np.dtype, + +) -> _Token: + ''' + Create a serializable token that can be used + to access a shared array. + + ''' + return _Token( + shm_name=key, + shm_first_index_name=key + "_first", + shm_last_index_name=key + "_last", + dtype_descr=tuple(np.dtype(dtype).descr), + size=size, + ) + + +class ShmArray: + ''' + A shared memory ``numpy`` (compatible) array API. + + An underlying shared memory buffer is allocated based on + a user specified ``numpy.ndarray``. This fixed size array + can be read and written to by pushing data both onto the "front" + or "back" of a set index range. The indexes for the "first" and + "last" index are themselves stored in shared memory (accessed via + ``SharedInt`` interfaces) values such that multiple processes can + interact with the same array using a synchronized-index. 
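+
+    A rough usage sketch (``open_shm_array()`` is defined below;
+    ``dtype`` must be a structured dtype whose first field is
+    ``index`` and ``new_rows``/``old_rows`` are placeholder record
+    arrays)::
+
+        shma = open_shm_array(key='series', dtype=dtype)
+        shma.push(new_rows)                 # append to the "back"
+        shma.push(old_rows, prepend=True)   # fill in the "front"
+        view = shma.array                   # only the written range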
+ + ''' + def __init__( + self, + shmarr: np.ndarray, + first: SharedInt, + last: SharedInt, + shm: SharedMemory, + # readonly: bool = True, + ) -> None: + self._array = shmarr + + # indexes for first and last indices corresponding + # to fille data + self._first = first + self._last = last + + self._len = len(shmarr) + self._shm = shm + self._post_init: bool = False + + # pushing data does not write the index (aka primary key) + dtype = shmarr.dtype + if dtype.fields: + self._write_fields = list(shmarr.dtype.fields.keys())[1:] + else: + self._write_fields = None + + # TODO: ringbuf api? + + @property + def _token(self) -> _Token: + return _Token( + shm_name=self._shm.name, + shm_first_index_name=self._first._shm.name, + shm_last_index_name=self._last._shm.name, + dtype_descr=tuple(self._array.dtype.descr), + size=self._len, + ) + + @property + def token(self) -> dict: + """Shared memory token that can be serialized and used by + another process to attach to this array. + """ + return self._token.as_msg() + + @property + def index(self) -> int: + return self._last.value % self._len + + @property + def array(self) -> np.ndarray: + ''' + Return an up-to-date ``np.ndarray`` view of the + so-far-written data to the underlying shm buffer. + + ''' + a = self._array[self._first.value:self._last.value] + + # first, last = self._first.value, self._last.value + # a = self._array[first:last] + + # TODO: eventually comment this once we've not seen it in the + # wild in a long time.. + # XXX: race where first/last indexes cause a reader + # to load an empty array.. + if len(a) == 0 and self._post_init: + raise RuntimeError('Empty array race condition hit!?') + # breakpoint() + + return a + + def ustruct( + self, + fields: Optional[list[str]] = None, + + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = np.float, + + ) -> np.ndarray: + + array = self._array + + if fields: + selection = array[fields] + # fcount = len(fields) + else: + selection = array + # fcount = len(array.dtype.fields) + + # XXX: manual ``.view()`` attempt that also doesn't work. + # uview = selection.view( + # dtype='<f16', + # ).reshape(-1, 4, order='A') + + # assert len(selection) == len(uview) + + u = rfn.structured_to_unstructured( + selection, + # dtype=float, + copy=True, + ) + + # unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf) + # array[:] = a[:] + return u + # return ShmArray( + # shmarr=u, + # first=self._first, + # last=self._last, + # shm=self._shm + # ) + + def last( + self, + length: int = 1, + + ) -> np.ndarray: + ''' + Return the last ``length``'s worth of ("row") entries from the + array. + + ''' + return self.array[-length:] + + def push( + self, + data: np.ndarray, + + field_map: Optional[dict[str, str]] = None, + prepend: bool = False, + update_first: bool = True, + start: Optional[int] = None, + + ) -> int: + ''' + Ring buffer like "push" to append data + into the buffer and return updated "last" index. + + NB: no actual ring logic yet to give a "loop around" on overflow + condition, lel. + + ''' + length = len(data) + + if prepend: + index = (start or self._first.value) - length + + if index < 0: + raise ValueError( + f'Array size of {self._len} was overrun during prepend.\n' + f'You have passed {abs(index)} too many datums.' 
+ ) + + else: + index = start if start is not None else self._last.value + + end = index + length + + if field_map: + src_names, dst_names = zip(*field_map.items()) + else: + dst_names = src_names = self._write_fields + + try: + self._array[ + list(dst_names) + ][index:end] = data[list(src_names)][:] + + # NOTE: there was a race here between updating + # the first and last indices and when the next reader + # tries to access ``.array`` (which due to the index + # overlap will be empty). Pretty sure we've fixed it now + # but leaving this here as a reminder. + if prepend and update_first and length: + assert index < self._first.value + + if ( + index < self._first.value + and update_first + ): + assert prepend, 'prepend=True not passed but index decreased?' + self._first.value = index + + elif not prepend: + self._last.value = end + + self._post_init = True + return end + + except ValueError as err: + if field_map: + raise + + # should raise if diff detected + self.diff_err_fields(data) + raise err + + def diff_err_fields( + self, + data: np.ndarray, + ) -> None: + # reraise with any field discrepancy + our_fields, their_fields = ( + set(self._array.dtype.fields), + set(data.dtype.fields), + ) + + only_in_ours = our_fields - their_fields + only_in_theirs = their_fields - our_fields + + if only_in_ours: + raise TypeError( + f"Input array is missing field(s): {only_in_ours}" + ) + elif only_in_theirs: + raise TypeError( + f"Input array has unknown field(s): {only_in_theirs}" + ) + + # TODO: support "silent" prepends that don't update ._first.value? + def prepend( + self, + data: np.ndarray, + ) -> int: + end = self.push(data, prepend=True) + assert end + + def close(self) -> None: + self._first._shm.close() + self._last._shm.close() + self._shm.close() + + def destroy(self) -> None: + if _USE_POSIX: + # We manually unlink to bypass all the "resource tracker" + # nonsense meant for non-SC systems. + shm_unlink(self._shm.name) + + self._first.destroy() + self._last.destroy() + + def flush(self) -> None: + # TODO: flush to storage backend like markestore? + ... + + +def open_shm_array( + + key: Optional[str] = None, + size: int = _default_size, # see above + dtype: Optional[np.dtype] = None, + readonly: bool = False, + +) -> ShmArray: + '''Open a memory shared ``numpy`` using the standard library. + + This call unlinks (aka permanently destroys) the buffer on teardown + and thus should be used from the parent-most accessor (process). + + ''' + # create new shared mem segment for which we + # have write permission + a = np.zeros(size, dtype=dtype) + a['index'] = np.arange(len(a)) + + shm = SharedMemory( + name=key, + create=True, + size=a.nbytes + ) + array = np.ndarray( + a.shape, + dtype=a.dtype, + buffer=shm.buf + ) + array[:] = a[:] + array.setflags(write=int(not readonly)) + + token = _make_token( + key=key, + size=size, + dtype=dtype, + ) + + # create single entry arrays for storing an first and last indices + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=True, + size=4, # std int + ) + ) + + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=True, + size=4, # std int + ) + ) + + # start the "real-time" updated section after 3-days worth of 1s + # sampled OHLC. 
this allows appending up to a days worth from + # tick/quote feeds before having to flush to a (tsdb) storage + # backend, and looks something like, + # ------------------------- + # | | i + # _________________________ + # <-------------> <-------> + # history real-time + # + # Once fully "prepended", the history section will leave the + # ``ShmArray._start.value: int = 0`` and the yet-to-be written + # real-time section will start at ``ShmArray.index: int``. + + # this sets the index to 3/4 of the length of the buffer + # leaving a "days worth of second samples" for the real-time + # section. + last.value = first.value = _rt_buffer_start + + shmarr = ShmArray( + array, + first, + last, + shm, + ) + + assert shmarr._token == token + _known_tokens[key] = shmarr.token + + # "unlink" created shm on process teardown by + # pushing teardown calls onto actor context stack + + stack = tractor.current_actor().lifetime_stack + stack.callback(shmarr.close) + stack.callback(shmarr.destroy) + + return shmarr + + +def attach_shm_array( + token: tuple[str, str, tuple[str, str]], + readonly: bool = True, + +) -> ShmArray: + ''' + Attach to an existing shared memory array previously + created by another process using ``open_shared_array``. + + No new shared mem is allocated but wrapper types for read/write + access are constructed. + + ''' + token = _Token.from_msg(token) + key = token.shm_name + + if key in _known_tokens: + assert _Token.from_msg(_known_tokens[key]) == token, "WTF" + + # XXX: ugh, looks like due to the ``shm_open()`` C api we can't + # actually place files in a subdir, see discussion here: + # https://stackoverflow.com/a/11103289 + + # attach to array buffer and view as per dtype + _err: Optional[Exception] = None + for _ in range(3): + try: + shm = SharedMemory( + name=key, + create=False, + ) + break + except OSError as oserr: + _err = oserr + time.sleep(0.1) + else: + if _err: + raise _err + + shmarr = np.ndarray( + (token.size,), + dtype=token.dtype, + buffer=shm.buf + ) + shmarr.setflags(write=int(not readonly)) + + first = SharedInt( + shm=SharedMemory( + name=token.shm_first_index_name, + create=False, + size=4, # std int + ), + ) + last = SharedInt( + shm=SharedMemory( + name=token.shm_last_index_name, + create=False, + size=4, # std int + ), + ) + + # make sure we can read + first.value + + sha = ShmArray( + shmarr, + first, + last, + shm, + ) + # read test + sha.array + + # Stash key -> token knowledge for future queries + # via `maybe_opepn_shm_array()` but only after we know + # we can attach. + if key not in _known_tokens: + _known_tokens[key] = token + + # "close" attached shm on actor teardown + tractor.current_actor().lifetime_stack.callback(sha.close) + + return sha + + +def maybe_open_shm_array( + key: str, + dtype: Optional[np.dtype] = None, + **kwargs, + +) -> tuple[ShmArray, bool]: + ''' + Attempt to attach to a shared memory block using a "key" lookup + to registered blocks in the users overall "system" registry + (presumes you don't have the block's explicit token). + + This function is meant to solve the problem of discovering whether + a shared array token has been allocated or discovered by the actor + running in **this** process. Systems where multiple actors may seek + to access a common block can use this function to attempt to acquire + a token as discovered by the actors who have previously stored + a "key" -> ``_Token`` map in an actor local (aka python global) + variable. 
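+
+    The returned ``bool`` reports whether the segment was newly
+    allocated by this call (``True``) or merely attached (``False``).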
+ + If you know the explicit ``_Token`` for your memory segment instead + use ``attach_shm_array``. + + ''' + size = kwargs.pop('size', _default_size) + try: + # see if we already know this key + token = _known_tokens[key] + return attach_shm_array(token=token, **kwargs), False + except KeyError: + log.warning(f"Could not find {key} in shms cache") + if dtype: + token = _make_token( + key, + size=size, + dtype=dtype, + ) + try: + return attach_shm_array(token=token, **kwargs), False + except FileNotFoundError: + log.warning(f"Could not attach to shm with token {token}") + + # This actor does not know about memory + # associated with the provided "key". + # Attempt to open a block and expect + # to fail if a block has been allocated + # on the OS by someone else. + return open_shm_array(key=key, dtype=dtype, **kwargs), True + + +def try_read( + array: np.ndarray + +) -> Optional[np.ndarray]: + ''' + Try to read the last row from a shared mem array or ``None`` + if the array read returns a zero-length array result. + + Can be used to check for backfilling race conditions where an array + is currently being (re-)written by a writer actor but the reader is + unaware and reads during the window where the first and last indexes + are being updated. + + ''' + try: + return array[-1] + except IndexError: + # XXX: race condition with backfilling shm. + # + # the underlying issue is that a backfill (aka prepend) and subsequent + # shm array first/last index update could result in an empty array + # read here since the indices may be updated in such a way that + # a read delivers an empty array (though it seems like we + # *should* be able to prevent that?). also, as and alt and + # something we need anyway, maybe there should be some kind of + # signal that a prepend is taking place and this consumer can + # respond (eg. redrawing graphics) accordingly. + + # the array read was emtpy + return None -- 2.34.1 From 7ae194baed4c454e2173e44f5ab7405464552d70 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Sun, 16 Oct 2022 18:06:07 -0400 Subject: [PATCH 02/12] Add `ShmList` wrapping the stdlib's `ShareableList` First attempt at getting `multiprocessing.shared_memory.ShareableList` working; we wrap the stdlib type with a readonly attr and a `.key` for cross-actor lookup. Also, rename all `numpy` specific routines to have a `ndarray` suffix in the func names. --- tractor/_shm.py | 206 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 141 insertions(+), 65 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index dca9d5a5..63d18411 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -28,6 +28,7 @@ import time from typing import Optional from multiprocessing.shared_memory import ( SharedMemory, + ShareableList, _USE_POSIX, ) @@ -87,10 +88,11 @@ disable_mantracker() class SharedInt: - """Wrapper around a single entry shared memory array which + ''' + Wrapper around a single entry shared memory array which holds an ``int`` value used as an index counter. - """ + ''' def __init__( self, shm: SharedMemory, @@ -117,10 +119,13 @@ class SharedInt: log.warning(f'Shm for {name} already unlinked?') -class _Token(Struct, frozen=True): +class _NpToken(Struct, frozen=True): ''' - Internal represenation of a shared memory "token" - which can be used to key a system wide post shm entry. + Internal represenation of a shared memory ``numpy`` array "token" + which can be used to key and load a system (OS) wide shm entry + and correctly read the array by type signature. 
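+
+    The ``dtype_descr`` is stored as nested tuples; ``from_msg()``
+    re-tuples any lists produced by wire (de)serialization before
+    the ``np.dtype`` is rebuilt.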
+ + This type is msg safe. ''' shm_name: str # this servers as a "key" value @@ -137,18 +142,18 @@ class _Token(Struct, frozen=True): return self.to_dict() @classmethod - def from_msg(cls, msg: dict) -> _Token: - if isinstance(msg, _Token): + def from_msg(cls, msg: dict) -> _NpToken: + if isinstance(msg, _NpToken): return msg # TODO: native struct decoding # return _token_dec.decode(msg) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) - return _Token(**msg) + return _NpToken(**msg) -# _token_dec = msgspec.msgpack.Decoder(_Token) +# _token_dec = msgspec.msgpack.Decoder(_NpToken) # TODO: this api? # _known_tokens = tractor.ActorVar('_shm_tokens', {}) @@ -159,10 +164,14 @@ class _Token(Struct, frozen=True): _known_tokens = {} -def get_shm_token(key: str) -> _Token: - """Convenience func to check if a token +def get_shm_token(key: str) -> _NpToken | str: + ''' + Convenience func to check if a token for the provided key is known by this process. - """ + + Returns either the ``numpy`` token or a string for a shared list. + + ''' return _known_tokens.get(key) @@ -171,13 +180,13 @@ def _make_token( size: int, dtype: np.dtype, -) -> _Token: +) -> _NpToken: ''' Create a serializable token that can be used to access a shared array. ''' - return _Token( + return _NpToken( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", @@ -188,7 +197,7 @@ def _make_token( class ShmArray: ''' - A shared memory ``numpy`` (compatible) array API. + A shared memory ``numpy.ndarray`` API. An underlying shared memory buffer is allocated based on a user specified ``numpy.ndarray``. This fixed size array @@ -228,8 +237,8 @@ class ShmArray: # TODO: ringbuf api? @property - def _token(self) -> _Token: - return _Token( + def _token(self) -> _NpToken: + return _NpToken( shm_name=self._shm.name, shm_first_index_name=self._first._shm.name, shm_last_index_name=self._last._shm.name, @@ -446,15 +455,17 @@ class ShmArray: ... -def open_shm_array( +def open_shm_ndarray( key: Optional[str] = None, - size: int = _default_size, # see above - dtype: Optional[np.dtype] = None, + size: int = int(2 ** 10), + dtype: np.dtype | None = None, + append_start_index: int = 0, readonly: bool = False, ) -> ShmArray: - '''Open a memory shared ``numpy`` using the standard library. + ''' + Open a memory shared ``numpy`` using the standard library. This call unlinks (aka permanently destroys) the buffer on teardown and thus should be used from the parent-most accessor (process). @@ -501,10 +512,10 @@ def open_shm_array( ) ) - # start the "real-time" updated section after 3-days worth of 1s - # sampled OHLC. this allows appending up to a days worth from - # tick/quote feeds before having to flush to a (tsdb) storage - # backend, and looks something like, + # Start the "real-time" append-updated (or "pushed-to") section + # after some start index: ``append_start_index``. This allows appending + # from a start point in the array which isn't the 0 index and looks + # something like, # ------------------------- # | | i # _________________________ @@ -518,7 +529,7 @@ def open_shm_array( # this sets the index to 3/4 of the length of the buffer # leaving a "days worth of second samples" for the real-time # section. 
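+    # e.g. with ``size=1000`` and ``append_start_index=750`` both
+    # counters start at 750: appends advance ``last`` toward 1000
+    # while prepends walk ``first`` back toward 0.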
- last.value = first.value = _rt_buffer_start + last.value = first.value = append_start_index shmarr = ShmArray( array, @@ -540,7 +551,7 @@ def open_shm_array( return shmarr -def attach_shm_array( +def attach_shm_ndarray( token: tuple[str, str, tuple[str, str]], readonly: bool = True, @@ -553,11 +564,11 @@ def attach_shm_array( access are constructed. ''' - token = _Token.from_msg(token) + token = _NpToken.from_msg(token) key = token.shm_name if key in _known_tokens: - assert _Token.from_msg(_known_tokens[key]) == token, "WTF" + assert _NpToken.from_msg(_known_tokens[key]) == token, "WTF" # XXX: ugh, looks like due to the ``shm_open()`` C api we can't # actually place files in a subdir, see discussion here: @@ -625,10 +636,14 @@ def attach_shm_array( return sha -def maybe_open_shm_array( - key: str, - dtype: Optional[np.dtype] = None, - **kwargs, +def maybe_open_shm_ndarray( + key: str, # unique identifier for segment + + # from ``open_shm_array()`` + size: int = int(2 ** 10), # array length in index terms + dtype: np.dtype | None = None, + append_start_index: int = 0, + readonly: bool = True, ) -> tuple[ShmArray, bool]: ''' @@ -641,18 +656,23 @@ def maybe_open_shm_array( running in **this** process. Systems where multiple actors may seek to access a common block can use this function to attempt to acquire a token as discovered by the actors who have previously stored - a "key" -> ``_Token`` map in an actor local (aka python global) + a "key" -> ``_NpToken`` map in an actor local (aka python global) variable. - If you know the explicit ``_Token`` for your memory segment instead + If you know the explicit ``_NpToken`` for your memory segment instead use ``attach_shm_array``. ''' - size = kwargs.pop('size', _default_size) try: # see if we already know this key token = _known_tokens[key] - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, # not newly opened + ) except KeyError: log.warning(f"Could not find {key} in shms cache") if dtype: @@ -661,8 +681,16 @@ def maybe_open_shm_array( size=size, dtype=dtype, ) + else: + try: - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_ndarray( + token=token, + readonly=readonly, + ), + False, + ) except FileNotFoundError: log.warning(f"Could not attach to shm with token {token}") @@ -671,36 +699,84 @@ def maybe_open_shm_array( # Attempt to open a block and expect # to fail if a block has been allocated # on the OS by someone else. - return open_shm_array(key=key, dtype=dtype, **kwargs), True + return ( + open_shm_ndarray( + key=key, + size=size, + dtype=dtype, + append_start_index=append_start_index, + readonly=readonly, + ), + True, + ) -def try_read( - array: np.ndarray - -) -> Optional[np.ndarray]: +class ShmList(ShareableList): ''' - Try to read the last row from a shared mem array or ``None`` - if the array read returns a zero-length array result. - - Can be used to check for backfilling race conditions where an array - is currently being (re-)written by a writer actor but the reader is - unaware and reads during the window where the first and last indexes - are being updated. + Carbon copy of ``.shared_memory.ShareableList`` but add a + readonly state instance var. ''' - try: - return array[-1] - except IndexError: - # XXX: race condition with backfilling shm. 
- # - # the underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the indices may be updated in such a way that - # a read delivers an empty array (though it seems like we - # *should* be able to prevent that?). also, as and alt and - # something we need anyway, maybe there should be some kind of - # signal that a prepend is taking place and this consumer can - # respond (eg. redrawing graphics) accordingly. + def __init__( + self, + sequence: list | None = None, + *, + name: str | None = None, + readonly: bool = True - # the array read was emtpy - return None + ) -> None: + self._readonly = readonly + self._key = name + return super().__init__( + sequence=sequence, + name=name, + ) + + @property + def key(self) -> str: + return self._key + + def __setitem__( + self, + position, + value, + + ) -> None: + + # mimick ``numpy`` error + if self._readonly: + raise ValueError('assignment destination is read-only') + + return super().__setitem__(position, value) + + +def open_shm_list( + key: str, + sequence: list | None = None, + size: int = int(2 ** 10), + dtype: np.dtype | None = None, + readonly: bool = True, + +) -> ShmList: + + if sequence is None: + sequence = list(map(float, range(size))) + + shml = ShmList( + sequence=sequence, + name=key, + readonly=readonly, + ) + + # "close" attached shm on actor teardown + tractor.current_actor().lifetime_stack.callback(shml.shm.close) + tractor.current_actor().lifetime_stack.callback(shml.shm.unlink) + + return shml + + +def attach_shm_list( + key: str, +) -> ShmList: + + return ShmList(name=key) -- 2.34.1 From 33482d8f41b72b5bac3f5ee3072f18a5e67a93ec Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Sun, 16 Oct 2022 18:16:58 -0400 Subject: [PATCH 03/12] Add initial readers-writer shm list tests --- tests/test_shm.py | 84 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100644 tests/test_shm.py diff --git a/tests/test_shm.py b/tests/test_shm.py new file mode 100644 index 00000000..83ce7e21 --- /dev/null +++ b/tests/test_shm.py @@ -0,0 +1,84 @@ +""" +Shared mem primitives and APIs. 
+ +""" + +# import numpy +import pytest +import trio +import tractor +from tractor._shm import ( + open_shm_list, + attach_shm_list, +) + + +@tractor.context +async def child_read_shm_list( + ctx: tractor.Context, + shm_key: str, + use_str: bool, +) -> None: + + shml = attach_shm_list(key=shm_key) + await ctx.started(shml.key) + + async with ctx.open_stream() as stream: + async for i in stream: + print(f'reading shm list index: {i}') + + if use_str: + expect = str(float(i)) + else: + expect = float(i) + + assert expect == shml[i] + + +@pytest.mark.parametrize( + 'use_str', [False, True], +) +def test_parent_writer_child_reader( + use_str: bool, +): + + async def main(): + async with tractor.open_nursery() as an: + + # allocate writeable list in parent + key = 'shm_list' + shml = open_shm_list( + key=key, + readonly=False, + ) + + portal = await an.start_actor( + 'shm_reader', + enable_modules=[__name__], + ) + + async with ( + portal.open_context( + child_read_shm_list, # taken from pytest parameterization + shm_key=key, + use_str=use_str, + ) as (ctx, sent), + + ctx.open_stream() as stream, + ): + + assert sent == key + + for i in range(2 ** 10): + + val = float(i) + if use_str: + val = str(val) + + print(f'writing {val}') + shml[i] = val + await stream.send(i) + + await portal.cancel_actor() + + trio.run(main) -- 2.34.1 From c932bb59114b618ef82d56a1fbc977f3eb78b517 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 17 Oct 2022 15:13:05 -0400 Subject: [PATCH 04/12] Add repetitive attach to existing segment test --- tests/test_shm.py | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/tests/test_shm.py b/tests/test_shm.py index 83ce7e21..850ccb3e 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -2,6 +2,7 @@ Shared mem primitives and APIs. """ +import uuid # import numpy import pytest @@ -13,6 +14,50 @@ from tractor._shm import ( ) +@tractor.context +async def child_attach_shml_alot( + ctx: tractor.Context, + shm_key: str, +) -> None: + + await ctx.started(shm_key) + + # now try to attach a boatload of times in a loop.. 
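+    # (each pass opens a fresh ``ShmList`` handle onto the same
+    # OS-level segment; with the stdlib resource tracker disabled
+    # the segment survives between attachments.)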
+ for _ in range(1000): + shml = attach_shm_list(key=shm_key) + assert shml.shm.name == shm_key + await trio.sleep(0.001) + + +def test_child_attaches_alot(): + async def main(): + async with tractor.open_nursery() as an: + + # allocate writeable list in parent + key = f'shml_{uuid.uuid4()}' + shml = open_shm_list( + key=key, + ) + + portal = await an.start_actor( + 'shm_attacher', + enable_modules=[__name__], + ) + + async with ( + portal.open_context( + child_attach_shml_alot, # taken from pytest parameterization + shm_key=key, + ) as (ctx, start_val), + ): + assert start_val == key + await ctx.result() + + await portal.cancel_actor() + + trio.run(main) + + @tractor.context async def child_read_shm_list( ctx: tractor.Context, -- 2.34.1 From e46033cbe7207be0ea6e941b238d0d151a5022e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 17 Oct 2022 15:13:58 -0400 Subject: [PATCH 05/12] Don't require runtime (for now), type annot fixing --- tractor/_shm.py | 44 +++++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 23 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 63d18411..80ca49d1 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -29,7 +29,7 @@ from typing import Optional from multiprocessing.shared_memory import ( SharedMemory, ShareableList, - _USE_POSIX, + _USE_POSIX, # type: ignore ) if _USE_POSIX: @@ -46,15 +46,6 @@ from .log import get_logger log = get_logger(__name__) -# how much is probably dependent on lifestyle -_secs_in_day = int(60 * 60 * 24) -# we try for a buncha times, but only on a run-every-other-day kinda week. -_days_worth = 16 -_default_size = _days_worth * _secs_in_day -# where to start the new data append index -_rt_buffer_start = int((_days_worth - 1) * _secs_in_day) - - def disable_mantracker(): ''' Disable all ``multiprocessing``` "resource tracking" machinery since @@ -79,7 +70,6 @@ def disable_mantracker(): mantracker._resource_tracker = ManTracker() mantracker.register = mantracker._resource_tracker.register mantracker.ensure_running = mantracker._resource_tracker.ensure_running - # ensure_running = mantracker._resource_tracker.ensure_running mantracker.unregister = mantracker._resource_tracker.unregister mantracker.getfd = mantracker._resource_tracker.getfd @@ -134,9 +124,14 @@ class _NpToken(Struct, frozen=True): dtype_descr: tuple size: int # in struct-array index / row terms + # TODO: use nptyping here on dtypes @property - def dtype(self) -> np.dtype: - return np.dtype(list(map(tuple, self.dtype_descr))).descr + def dtype(self) -> list[tuple[str, str, tuple[int, ...]]]: + return np.dtype( + list( + map(tuple, self.dtype_descr) + ) + ).descr def as_msg(self): return self.to_dict() @@ -161,10 +156,10 @@ class _NpToken(Struct, frozen=True): # _known_tokens = trio.RunVar('shms', {}) # process-local store of keys to tokens -_known_tokens = {} +_known_tokens: dict[str, _NpToken] = {} -def get_shm_token(key: str) -> _NpToken | str: +def get_shm_token(key: str) -> _NpToken | None: ''' Convenience func to check if a token for the provided key is known by this process. @@ -228,11 +223,10 @@ class ShmArray: self._post_init: bool = False # pushing data does not write the index (aka primary key) + self._write_fields: list[str] | None = None dtype = shmarr.dtype if dtype.fields: self._write_fields = list(shmarr.dtype.fields.keys())[1:] - else: - self._write_fields = None # TODO: ringbuf api? 
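
The ring-buffer TODO above never lands in this series: ``push()``
still raises on prepend-overrun and, per its own docstring, has "no
actual ring logic yet". A minimal sketch of wraparound-push
semantics, using a hypothetical ``ring_push()`` helper that is not
part of these patches:

    import numpy as np

    def ring_push(
        buf: np.ndarray,
        data: np.ndarray,
        last: int,
    ) -> int:
        # write ``data`` starting at logical index ``last``,
        # wrapping on overflow, and return the advanced index.
        n = len(buf)
        for i, row in enumerate(data):
            buf[(last + i) % n] = row
        return last + len(data)

    buf = np.zeros(8)
    assert ring_push(buf, np.arange(6.), last=5) == 11
    assert list(buf[:3]) == [3., 4., 5.]  # wrapped entries

A production version would also need the ``SharedInt`` counters to
wrap consistently so that concurrent readers compute the same
modulo window.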
@@ -283,9 +277,9 @@ class ShmArray: self, fields: Optional[list[str]] = None, - # type that all field values will be cast to - # in the returned view. - common_dtype: np.dtype = np.float, + # type that all field values will be cast to in the returned + # view. + common_dtype: np.dtype = np.float64, # type: ignore ) -> np.ndarray: @@ -543,7 +537,6 @@ def open_shm_ndarray( # "unlink" created shm on process teardown by # pushing teardown calls onto actor context stack - stack = tractor.current_actor().lifetime_stack stack.callback(shmarr.close) stack.callback(shmarr.destroy) @@ -769,14 +762,19 @@ def open_shm_list( ) # "close" attached shm on actor teardown - tractor.current_actor().lifetime_stack.callback(shml.shm.close) - tractor.current_actor().lifetime_stack.callback(shml.shm.unlink) + try: + actor = tractor.current_actor() + actor.lifetime_stack.callback(shml.shm.close) + actor.lifetime_stack.callback(shml.shm.unlink) + except RuntimeError: + log.warning('tractor runtime not active, skipping teardown steps') return shml def attach_shm_list( key: str, + ) -> ShmList: return ShmList(name=key) -- 2.34.1 From afbdb50a30bb71887b598b01abfca0295fb4debe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Mon, 17 Oct 2022 17:21:14 -0400 Subject: [PATCH 06/12] Rename token type to `NDToken` in the style of `nptyping` --- tractor/_shm.py | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 80ca49d1..3f415c52 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -109,7 +109,7 @@ class SharedInt: log.warning(f'Shm for {name} already unlinked?') -class _NpToken(Struct, frozen=True): +class NDToken(Struct, frozen=True): ''' Internal represenation of a shared memory ``numpy`` array "token" which can be used to key and load a system (OS) wide shm entry @@ -137,18 +137,18 @@ class _NpToken(Struct, frozen=True): return self.to_dict() @classmethod - def from_msg(cls, msg: dict) -> _NpToken: - if isinstance(msg, _NpToken): + def from_msg(cls, msg: dict) -> NDToken: + if isinstance(msg, NDToken): return msg # TODO: native struct decoding # return _token_dec.decode(msg) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) - return _NpToken(**msg) + return NDToken(**msg) -# _token_dec = msgspec.msgpack.Decoder(_NpToken) +# _token_dec = msgspec.msgpack.Decoder(NDToken) # TODO: this api? # _known_tokens = tractor.ActorVar('_shm_tokens', {}) @@ -156,10 +156,10 @@ class _NpToken(Struct, frozen=True): # _known_tokens = trio.RunVar('shms', {}) # process-local store of keys to tokens -_known_tokens: dict[str, _NpToken] = {} +_known_tokens: dict[str, NDToken] = {} -def get_shm_token(key: str) -> _NpToken | None: +def get_shm_token(key: str) -> NDToken | None: ''' Convenience func to check if a token for the provided key is known by this process. @@ -175,13 +175,13 @@ def _make_token( size: int, dtype: np.dtype, -) -> _NpToken: +) -> NDToken: ''' Create a serializable token that can be used to access a shared array. ''' - return _NpToken( + return NDToken( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", @@ -231,8 +231,8 @@ class ShmArray: # TODO: ringbuf api? @property - def _token(self) -> _NpToken: - return _NpToken( + def _token(self) -> NDToken: + return NDToken( shm_name=self._shm.name, shm_first_index_name=self._first._shm.name, shm_last_index_name=self._last._shm.name, @@ -557,11 +557,11 @@ def attach_shm_ndarray( access are constructed. 
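
For orientation, the intended cross-actor flow is roughly the
following sketch; it must run inside the ``tractor`` runtime (the
opener registers teardown on the actor's lifetime stack) and the
key/dtype values here are arbitrary:

    import numpy as np
    from tractor._shm import (
        open_shm_ndarray,
        attach_shm_ndarray,
    )

    # "parent" actor: allocate the segment and ship the msg-safe
    # token (a plain dict) to peers over IPC.
    dtype = np.dtype([('index', int), ('value', float)])
    shma = open_shm_ndarray(key='demo_key', size=1024, dtype=dtype)
    token_msg: dict = shma.token

    # "child" actor: reconstruct a readonly view onto the same
    # OS-level segment from the received token.
    view = attach_shm_ndarray(token=token_msg, readonly=True)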
''' - token = _NpToken.from_msg(token) + token = NDToken.from_msg(token) key = token.shm_name if key in _known_tokens: - assert _NpToken.from_msg(_known_tokens[key]) == token, "WTF" + assert NDToken.from_msg(_known_tokens[key]) == token, "WTF" # XXX: ugh, looks like due to the ``shm_open()`` C api we can't # actually place files in a subdir, see discussion here: @@ -649,10 +649,10 @@ def maybe_open_shm_ndarray( running in **this** process. Systems where multiple actors may seek to access a common block can use this function to attempt to acquire a token as discovered by the actors who have previously stored - a "key" -> ``_NpToken`` map in an actor local (aka python global) + a "key" -> ``NDToken`` map in an actor local (aka python global) variable. - If you know the explicit ``_NpToken`` for your memory segment instead + If you know the explicit ``NDToken`` for your memory segment instead use ``attach_shm_array``. ''' -- 2.34.1 From 1c441b0986bc5e8a152701f1897d731bdf24120f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 18 Oct 2022 11:01:02 -0400 Subject: [PATCH 07/12] Add `ShmList` slice support in `.__getitem__()` --- tractor/_shm.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 3f415c52..c26c9911 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -26,20 +26,26 @@ from __future__ import annotations from sys import byteorder import time from typing import Optional +from multiprocessing import shared_memory as shm from multiprocessing.shared_memory import ( SharedMemory, ShareableList, - _USE_POSIX, # type: ignore + # _USE_POSIX, # type: ignore ) -if _USE_POSIX: +if getattr(shm, '_USE_POSIX', False): from _posixshmem import shm_unlink from msgspec import Struct -import numpy as np -from numpy.lib import recfunctions as rfn import tractor +try: + import numpy as np + from numpy.lib import recfunctions as rfn + import nptyping +except ImportError: + pass + from .log import get_logger @@ -742,6 +748,15 @@ class ShmList(ShareableList): return super().__setitem__(position, value) + def __getitem__( + self, + indexish, + ) -> list: + if isinstance(indexish, slice): + return list(self)[indexish] + + return super().__getitem__(indexish) + def open_shm_list( key: str, @@ -774,7 +789,11 @@ def open_shm_list( def attach_shm_list( key: str, + readonly: bool = False, ) -> ShmList: - return ShmList(name=key) + return ShmList( + name=key, + readonly=readonly, + ) -- 2.34.1 From 9a0d529b180065830c8d1950df388d22e9408787 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 18 Oct 2022 11:01:30 -0400 Subject: [PATCH 08/12] Parametrize rw test with variable frame sizes Demonstrates fixed size frame-oriented reads by the child where the parent only transmits a "read" stream msg on "frame fill events" such that the child incrementally reads the shm list data (much like in a real-time-buffered streaming system). --- tests/test_shm.py | 64 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/tests/test_shm.py b/tests/test_shm.py index 850ccb3e..c183040c 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -24,7 +24,10 @@ async def child_attach_shml_alot( # now try to attach a boatload of times in a loop.. 
for _ in range(1000): - shml = attach_shm_list(key=shm_key) + shml = attach_shm_list( + key=shm_key, + readonly=False, + ) assert shml.shm.name == shm_key await trio.sleep(0.001) @@ -46,8 +49,8 @@ def test_child_attaches_alot(): async with ( portal.open_context( - child_attach_shml_alot, # taken from pytest parameterization - shm_key=key, + child_attach_shml_alot, + shm_key=shml.key, ) as (ctx, start_val), ): assert start_val == key @@ -63,50 +66,70 @@ async def child_read_shm_list( ctx: tractor.Context, shm_key: str, use_str: bool, + frame_size: int, ) -> None: + # attach in child shml = attach_shm_list(key=shm_key) await ctx.started(shml.key) async with ctx.open_stream() as stream: async for i in stream: - print(f'reading shm list index: {i}') + print(f'(child): reading shm list index: {i}') if use_str: expect = str(float(i)) else: expect = float(i) - assert expect == shml[i] + if frame_size == 1: + val = shml[i] + assert expect == val + print(f'(child): reading value: {val}') + else: + frame = shml[i - frame_size:i] + print(f'(child): reading frame: {frame}') @pytest.mark.parametrize( 'use_str', [False, True], ) +@pytest.mark.parametrize( + 'frame_size', + [1, 2**6, 2**10], + ids=lambda i: f'frame_size={i}', +) def test_parent_writer_child_reader( use_str: bool, + frame_size: int, ): async def main(): - async with tractor.open_nursery() as an: - - # allocate writeable list in parent - key = 'shm_list' - shml = open_shm_list( - key=key, - readonly=False, - ) + async with tractor.open_nursery( + debug_mode=True, + ) as an: portal = await an.start_actor( 'shm_reader', enable_modules=[__name__], + debug_mode=True, + ) + + # allocate writeable list in parent + key = 'shm_list' + seq_size = int(2 * 2 ** 10) + shml = open_shm_list( + key=key, + size=seq_size, + readonly=False, ) async with ( portal.open_context( - child_read_shm_list, # taken from pytest parameterization + child_read_shm_list, shm_key=key, use_str=use_str, + frame_size=frame_size, ) as (ctx, sent), ctx.open_stream() as stream, @@ -114,14 +137,23 @@ def test_parent_writer_child_reader( assert sent == key - for i in range(2 ** 10): + for i in range(seq_size): val = float(i) if use_str: val = str(val) - print(f'writing {val}') + print(f'(parent): writing {val}') shml[i] = val + + # only on frame fills do we + # signal to the child that a frame's + # worth is ready. 
+ if (i % frame_size) == 0: + print(f'(parent): signalling frame full on {val}') + await stream.send(i) + else: + print(f'(parent): signalling final frame on {val}') await stream.send(i) await portal.cancel_actor() -- 2.34.1 From 255209f881d68ddcdd1cf323b230c98da948e9f2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Tue, 18 Oct 2022 16:28:57 -0400 Subject: [PATCH 09/12] Mod define `_USE_POSIX`, add a of of todos --- tractor/_shm.py | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index c26c9911..79ac8969 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -30,15 +30,19 @@ from multiprocessing import shared_memory as shm from multiprocessing.shared_memory import ( SharedMemory, ShareableList, - # _USE_POSIX, # type: ignore ) -if getattr(shm, '_USE_POSIX', False): - from _posixshmem import shm_unlink - from msgspec import Struct import tractor +from .log import get_logger + + +_USE_POSIX = getattr(shm, '_USE_POSIX', False) +if _USE_POSIX: + from _posixshmem import shm_unlink + + try: import numpy as np from numpy.lib import recfunctions as rfn @@ -46,8 +50,6 @@ try: except ImportError: pass -from .log import get_logger - log = get_logger(__name__) @@ -161,6 +163,8 @@ class NDToken(Struct, frozen=True): # _known_tokens = tractor.ContextStack('_known_tokens', ) # _known_tokens = trio.RunVar('shms', {}) +# TODO: this should maybe be provided via +# a `.trionics.maybe_open_context()` wrapper factory? # process-local store of keys to tokens _known_tokens: dict[str, NDToken] = {} @@ -712,8 +716,12 @@ def maybe_open_shm_ndarray( class ShmList(ShareableList): ''' - Carbon copy of ``.shared_memory.ShareableList`` but add a - readonly state instance var. + Carbon copy of ``.shared_memory.ShareableList`` with a few + enhancements: + + - readonly mode via instance var flag + - ``.__getitem__()`` accepts ``slice`` inputs + - exposes the underlying buffer "name" as a ``.key: str`` ''' def __init__( @@ -752,11 +760,22 @@ class ShmList(ShareableList): self, indexish, ) -> list: + + # NOTE: this is a non-writeable view (copy?) of the buffer + # in a new list instance. if isinstance(indexish, slice): return list(self)[indexish] return super().__getitem__(indexish) + # TODO: should we offer a `.array` and `.push()` equivalent + # to the `ShmArray`? + # currently we have the following limitations: + # - can't write slices of input using traditional slice-assign + # syntax due to the ``ShareableList.__setitem__()`` implementation. + # - ``list(shmlist)`` returns a non-mutable copy instead of + # a writeable view which would be handier numpy-style ops. 
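
A quick illustration of the slice and readonly semantics noted
above; this uses only the stdlib-backed list machinery so no actor
runtime is required (the segment name is arbitrary):

    from tractor._shm import ShmList

    shml = ShmList(
        sequence=[0., 1., 2., 3.],
        name='slice_demo',
        readonly=False,
    )
    assert shml[1:3] == [1., 2.]  # slice -> plain *copied* list
    shml[0] = 99.                 # writes through to the segment

    # manual cleanup since no actor lifetime-stack is active here
    shml.shm.close()
    shml.shm.unlink()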
+ def open_shm_list( key: str, -- 2.34.1 From 2683a7f33ad747346af90bc9c80ee450f8aa1928 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Wed, 19 Oct 2022 14:20:50 -0400 Subject: [PATCH 10/12] Allocate size-specced "empty" sequence from default values by type --- tractor/_shm.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index 79ac8969..2ce148da 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -719,7 +719,7 @@ class ShmList(ShareableList): Carbon copy of ``.shared_memory.ShareableList`` with a few enhancements: - - readonly mode via instance var flag + - readonly mode via instance var flag `._readonly: bool` - ``.__getitem__()`` accepts ``slice`` inputs - exposes the underlying buffer "name" as a ``.key: str`` @@ -743,6 +743,10 @@ class ShmList(ShareableList): def key(self) -> str: return self._key + @property + def readonly(self) -> bool: + return self._readonly + def __setitem__( self, position, @@ -781,13 +785,21 @@ def open_shm_list( key: str, sequence: list | None = None, size: int = int(2 ** 10), - dtype: np.dtype | None = None, + dtype: float | int | bool | str | bytes | None = float, readonly: bool = True, ) -> ShmList: if sequence is None: - sequence = list(map(float, range(size))) + default = { + float: 0., + int: 0, + bool: True, + str: 'doggy', + None: None, + }[dtype] + sequence = [default] * size + # sequence = [0.] * size shml = ShmList( sequence=sequence, -- 2.34.1 From 8ebb1f09de99db523d6c7c65eace18a6054cdc72 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 20 Oct 2022 16:08:28 -0400 Subject: [PATCH 11/12] Pass `str` dtype for `use_str` case --- tests/test_shm.py | 14 ++++++++++---- tractor/_shm.py | 2 -- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/tests/test_shm.py b/tests/test_shm.py index c183040c..2b7a382f 100644 --- a/tests/test_shm.py +++ b/tests/test_shm.py @@ -70,7 +70,10 @@ async def child_read_shm_list( ) -> None: # attach in child - shml = attach_shm_list(key=shm_key) + shml = attach_shm_list( + key=shm_key, + # dtype=str if use_str else float, + ) await ctx.started(shml.key) async with ctx.open_stream() as stream: @@ -92,7 +95,9 @@ async def child_read_shm_list( @pytest.mark.parametrize( - 'use_str', [False, True], + 'use_str', + [False, True], + ids=lambda i: f'use_str_values={i}', ) @pytest.mark.parametrize( 'frame_size', @@ -106,7 +111,7 @@ def test_parent_writer_child_reader( async def main(): async with tractor.open_nursery( - debug_mode=True, + # debug_mode=True, ) as an: portal = await an.start_actor( @@ -121,6 +126,7 @@ def test_parent_writer_child_reader( shml = open_shm_list( key=key, size=seq_size, + dtype=str if use_str else float, readonly=False, ) @@ -143,7 +149,7 @@ def test_parent_writer_child_reader( if use_str: val = str(val) - print(f'(parent): writing {val}') + # print(f'(parent): writing {val}') shml[i] = val # only on frame fills do we diff --git a/tractor/_shm.py b/tractor/_shm.py index 2ce148da..c4c17335 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -460,7 +460,6 @@ class ShmArray: def open_shm_ndarray( - key: Optional[str] = None, size: int = int(2 ** 10), dtype: np.dtype | None = None, @@ -799,7 +798,6 @@ def open_shm_list( None: None, }[dtype] sequence = [default] * size - # sequence = [0.] 
* size shml = ShmList( sequence=sequence, -- 2.34.1 From 5cee222353640b63dc0fbd1dc82de8946842c607 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet <jgbt@protonmail.com> Date: Thu, 22 Jun 2023 17:16:17 -0400 Subject: [PATCH 12/12] Updates from latest `piker.data._sharedmem` changes --- tractor/_shm.py | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/tractor/_shm.py b/tractor/_shm.py index c4c17335..f8295105 100644 --- a/tractor/_shm.py +++ b/tractor/_shm.py @@ -287,9 +287,9 @@ class ShmArray: self, fields: Optional[list[str]] = None, - # type that all field values will be cast to in the returned - # view. - common_dtype: np.dtype = np.float64, # type: ignore + # type that all field values will be cast to + # in the returned view. + common_dtype: np.dtype = float, ) -> np.ndarray: @@ -344,7 +344,7 @@ class ShmArray: field_map: Optional[dict[str, str]] = None, prepend: bool = False, update_first: bool = True, - start: Optional[int] = None, + start: int | None = None, ) -> int: ''' @@ -386,7 +386,11 @@ class ShmArray: # tries to access ``.array`` (which due to the index # overlap will be empty). Pretty sure we've fixed it now # but leaving this here as a reminder. - if prepend and update_first and length: + if ( + prepend + and update_first + and length + ): assert index < self._first.value if ( @@ -460,10 +464,10 @@ class ShmArray: def open_shm_ndarray( - key: Optional[str] = None, - size: int = int(2 ** 10), + size: int, + key: str | None = None, dtype: np.dtype | None = None, - append_start_index: int = 0, + append_start_index: int | None = None, readonly: bool = False, ) -> ShmArray: @@ -529,9 +533,12 @@ def open_shm_ndarray( # ``ShmArray._start.value: int = 0`` and the yet-to-be written # real-time section will start at ``ShmArray.index: int``. - # this sets the index to 3/4 of the length of the buffer - # leaving a "days worth of second samples" for the real-time - # section. + # this sets the index to nearly 2/3rds into the the length of + # the buffer leaving at least a "days worth of second samples" + # for the real-time section. + if append_start_index is None: + append_start_index = round(size * 0.616) + last.value = first.value = append_start_index shmarr = ShmArray( @@ -640,9 +647,7 @@ def attach_shm_ndarray( def maybe_open_shm_ndarray( key: str, # unique identifier for segment - - # from ``open_shm_array()`` - size: int = int(2 ** 10), # array length in index terms + size: int, dtype: np.dtype | None = None, append_start_index: int = 0, readonly: bool = True, -- 2.34.1
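
As of this final patch ``size`` is a required argument and, when
``append_start_index`` is left as ``None``, both index counters now
start at ``round(size * 0.616)``, i.e. roughly 2/3rds into the
buffer. A small layout check of that default (again assuming a
running actor and a structured dtype with a leading ``index``
field):

    import numpy as np
    from tractor._shm import open_shm_ndarray

    dtype = np.dtype([('index', int), ('value', float)])
    shma = open_shm_ndarray(size=1000, key='layout_demo', dtype=dtype)

    assert shma._first.value == shma._last.value == 616
    # prepends may fill [0, 616); appends grow into [616, 1000)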