Add `ShmList` wrapping the stdlib's `ShareableList`

First attempt at getting `multiprocessing.shared_memory.ShareableList`
working; we wrap the stdlib type with a readonly attr and a `.key` for
cross-actor lookup. Also, rename all `numpy` specific routines to have
a `ndarray` suffix in the func names.
macos_in_ci
Tyler Goodlet 2022-10-16 18:06:07 -04:00
parent 901353c213
commit 0e4b37d122
1 changed files with 141 additions and 65 deletions

View File

@ -28,6 +28,7 @@ import time
from typing import Optional from typing import Optional
from multiprocessing.shared_memory import ( from multiprocessing.shared_memory import (
SharedMemory, SharedMemory,
ShareableList,
_USE_POSIX, _USE_POSIX,
) )
@ -87,10 +88,11 @@ disable_mantracker()
class SharedInt: class SharedInt:
"""Wrapper around a single entry shared memory array which '''
Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter. holds an ``int`` value used as an index counter.
""" '''
def __init__( def __init__(
self, self,
shm: SharedMemory, shm: SharedMemory,
@ -117,10 +119,13 @@ class SharedInt:
log.warning(f'Shm for {name} already unlinked?') log.warning(f'Shm for {name} already unlinked?')
class _Token(Struct, frozen=True): class _NpToken(Struct, frozen=True):
''' '''
Internal represenation of a shared memory "token" Internal represenation of a shared memory ``numpy`` array "token"
which can be used to key a system wide post shm entry. which can be used to key and load a system (OS) wide shm entry
and correctly read the array by type signature.
This type is msg safe.
''' '''
shm_name: str # this servers as a "key" value shm_name: str # this servers as a "key" value
@ -137,18 +142,18 @@ class _Token(Struct, frozen=True):
return self.to_dict() return self.to_dict()
@classmethod @classmethod
def from_msg(cls, msg: dict) -> _Token: def from_msg(cls, msg: dict) -> _NpToken:
if isinstance(msg, _Token): if isinstance(msg, _NpToken):
return msg return msg
# TODO: native struct decoding # TODO: native struct decoding
# return _token_dec.decode(msg) # return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg) return _NpToken(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token) # _token_dec = msgspec.msgpack.Decoder(_NpToken)
# TODO: this api? # TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {}) # _known_tokens = tractor.ActorVar('_shm_tokens', {})
@ -159,10 +164,14 @@ class _Token(Struct, frozen=True):
_known_tokens = {} _known_tokens = {}
def get_shm_token(key: str) -> _Token: def get_shm_token(key: str) -> _NpToken | str:
"""Convenience func to check if a token '''
Convenience func to check if a token
for the provided key is known by this process. for the provided key is known by this process.
"""
Returns either the ``numpy`` token or a string for a shared list.
'''
return _known_tokens.get(key) return _known_tokens.get(key)
@ -171,13 +180,13 @@ def _make_token(
size: int, size: int,
dtype: np.dtype, dtype: np.dtype,
) -> _Token: ) -> _NpToken:
''' '''
Create a serializable token that can be used Create a serializable token that can be used
to access a shared array. to access a shared array.
''' '''
return _Token( return _NpToken(
shm_name=key, shm_name=key,
shm_first_index_name=key + "_first", shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last", shm_last_index_name=key + "_last",
@ -188,7 +197,7 @@ def _make_token(
class ShmArray: class ShmArray:
''' '''
A shared memory ``numpy`` (compatible) array API. A shared memory ``numpy.ndarray`` API.
An underlying shared memory buffer is allocated based on An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array a user specified ``numpy.ndarray``. This fixed size array
@ -228,8 +237,8 @@ class ShmArray:
# TODO: ringbuf api? # TODO: ringbuf api?
@property @property
def _token(self) -> _Token: def _token(self) -> _NpToken:
return _Token( return _NpToken(
shm_name=self._shm.name, shm_name=self._shm.name,
shm_first_index_name=self._first._shm.name, shm_first_index_name=self._first._shm.name,
shm_last_index_name=self._last._shm.name, shm_last_index_name=self._last._shm.name,
@ -446,15 +455,17 @@ class ShmArray:
... ...
def open_shm_array( def open_shm_ndarray(
key: Optional[str] = None, key: Optional[str] = None,
size: int = _default_size, # see above size: int = int(2 ** 10),
dtype: Optional[np.dtype] = None, dtype: np.dtype | None = None,
append_start_index: int = 0,
readonly: bool = False, readonly: bool = False,
) -> ShmArray: ) -> ShmArray:
'''Open a memory shared ``numpy`` using the standard library. '''
Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown This call unlinks (aka permanently destroys) the buffer on teardown
and thus should be used from the parent-most accessor (process). and thus should be used from the parent-most accessor (process).
@ -501,10 +512,10 @@ def open_shm_array(
) )
) )
# start the "real-time" updated section after 3-days worth of 1s # Start the "real-time" append-updated (or "pushed-to") section
# sampled OHLC. this allows appending up to a days worth from # after some start index: ``append_start_index``. This allows appending
# tick/quote feeds before having to flush to a (tsdb) storage # from a start point in the array which isn't the 0 index and looks
# backend, and looks something like, # something like,
# ------------------------- # -------------------------
# | | i # | | i
# _________________________ # _________________________
@ -518,7 +529,7 @@ def open_shm_array(
# this sets the index to 3/4 of the length of the buffer # this sets the index to 3/4 of the length of the buffer
# leaving a "days worth of second samples" for the real-time # leaving a "days worth of second samples" for the real-time
# section. # section.
last.value = first.value = _rt_buffer_start last.value = first.value = append_start_index
shmarr = ShmArray( shmarr = ShmArray(
array, array,
@ -540,7 +551,7 @@ def open_shm_array(
return shmarr return shmarr
def attach_shm_array( def attach_shm_ndarray(
token: tuple[str, str, tuple[str, str]], token: tuple[str, str, tuple[str, str]],
readonly: bool = True, readonly: bool = True,
@ -553,11 +564,11 @@ def attach_shm_array(
access are constructed. access are constructed.
''' '''
token = _Token.from_msg(token) token = _NpToken.from_msg(token)
key = token.shm_name key = token.shm_name
if key in _known_tokens: if key in _known_tokens:
assert _Token.from_msg(_known_tokens[key]) == token, "WTF" assert _NpToken.from_msg(_known_tokens[key]) == token, "WTF"
# XXX: ugh, looks like due to the ``shm_open()`` C api we can't # XXX: ugh, looks like due to the ``shm_open()`` C api we can't
# actually place files in a subdir, see discussion here: # actually place files in a subdir, see discussion here:
@ -625,10 +636,14 @@ def attach_shm_array(
return sha return sha
def maybe_open_shm_array( def maybe_open_shm_ndarray(
key: str, key: str, # unique identifier for segment
dtype: Optional[np.dtype] = None,
**kwargs, # from ``open_shm_array()``
size: int = int(2 ** 10), # array length in index terms
dtype: np.dtype | None = None,
append_start_index: int = 0,
readonly: bool = True,
) -> tuple[ShmArray, bool]: ) -> tuple[ShmArray, bool]:
''' '''
@ -641,18 +656,23 @@ def maybe_open_shm_array(
running in **this** process. Systems where multiple actors may seek running in **this** process. Systems where multiple actors may seek
to access a common block can use this function to attempt to acquire to access a common block can use this function to attempt to acquire
a token as discovered by the actors who have previously stored a token as discovered by the actors who have previously stored
a "key" -> ``_Token`` map in an actor local (aka python global) a "key" -> ``_NpToken`` map in an actor local (aka python global)
variable. variable.
If you know the explicit ``_Token`` for your memory segment instead If you know the explicit ``_NpToken`` for your memory segment instead
use ``attach_shm_array``. use ``attach_shm_array``.
''' '''
size = kwargs.pop('size', _default_size)
try: try:
# see if we already know this key # see if we already know this key
token = _known_tokens[key] token = _known_tokens[key]
return attach_shm_array(token=token, **kwargs), False return (
attach_shm_ndarray(
token=token,
readonly=readonly,
),
False, # not newly opened
)
except KeyError: except KeyError:
log.warning(f"Could not find {key} in shms cache") log.warning(f"Could not find {key} in shms cache")
if dtype: if dtype:
@ -661,8 +681,16 @@ def maybe_open_shm_array(
size=size, size=size,
dtype=dtype, dtype=dtype,
) )
else:
try: try:
return attach_shm_array(token=token, **kwargs), False return (
attach_shm_ndarray(
token=token,
readonly=readonly,
),
False,
)
except FileNotFoundError: except FileNotFoundError:
log.warning(f"Could not attach to shm with token {token}") log.warning(f"Could not attach to shm with token {token}")
@ -671,36 +699,84 @@ def maybe_open_shm_array(
# Attempt to open a block and expect # Attempt to open a block and expect
# to fail if a block has been allocated # to fail if a block has been allocated
# on the OS by someone else. # on the OS by someone else.
return open_shm_array(key=key, dtype=dtype, **kwargs), True return (
open_shm_ndarray(
key=key,
size=size,
dtype=dtype,
append_start_index=append_start_index,
readonly=readonly,
),
True,
)
def try_read( class ShmList(ShareableList):
array: np.ndarray
) -> Optional[np.ndarray]:
''' '''
Try to read the last row from a shared mem array or ``None`` Carbon copy of ``.shared_memory.ShareableList`` but add a
if the array read returns a zero-length array result. readonly state instance var.
Can be used to check for backfilling race conditions where an array
is currently being (re-)written by a writer actor but the reader is
unaware and reads during the window where the first and last indexes
are being updated.
''' '''
try: def __init__(
return array[-1] self,
except IndexError: sequence: list | None = None,
# XXX: race condition with backfilling shm. *,
# name: str | None = None,
# the underlying issue is that a backfill (aka prepend) and subsequent readonly: bool = True
# shm array first/last index update could result in an empty array
# read here since the indices may be updated in such a way that
# a read delivers an empty array (though it seems like we
# *should* be able to prevent that?). also, as and alt and
# something we need anyway, maybe there should be some kind of
# signal that a prepend is taking place and this consumer can
# respond (eg. redrawing graphics) accordingly.
# the array read was emtpy ) -> None:
return None self._readonly = readonly
self._key = name
return super().__init__(
sequence=sequence,
name=name,
)
@property
def key(self) -> str:
return self._key
def __setitem__(
self,
position,
value,
) -> None:
# mimick ``numpy`` error
if self._readonly:
raise ValueError('assignment destination is read-only')
return super().__setitem__(position, value)
def open_shm_list(
key: str,
sequence: list | None = None,
size: int = int(2 ** 10),
dtype: np.dtype | None = None,
readonly: bool = True,
) -> ShmList:
if sequence is None:
sequence = list(map(float, range(size)))
shml = ShmList(
sequence=sequence,
name=key,
readonly=readonly,
)
# "close" attached shm on actor teardown
tractor.current_actor().lifetime_stack.callback(shml.shm.close)
tractor.current_actor().lifetime_stack.callback(shml.shm.unlink)
return shml
def attach_shm_list(
key: str,
) -> ShmList:
return ShmList(name=key)