diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index acb8070f..3c7857b0 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -1,151 +1,51 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# This program is free software: you can redistribute it
+# and/or modify it under the terms of the GNU Affero General
+# Public License as published by the Free Software
+# Foundation, either version 3 of the License, or (at your
+# option) any later version.
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
+# This program is distributed in the hope that it will be
+# useful, but WITHOUT ANY WARRANTY; without even the implied
+# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+# PURPOSE. See the GNU Affero General Public License for
+# more details.
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see .
+# You should have received a copy of the GNU Affero General
+# Public License along with this program. If not, see
+# <https://www.gnu.org/licenses/>.
-"""
-NumPy compatible shared memory buffers for real-time IPC streaming.
+'''
+NumPy shared memory buffers for real-time IPC streaming.
-"""
-from __future__ import annotations
-from sys import byteorder
-import time
+Thin shim over ``tractor.ipc._shm`` providing
+backward-compatible aliases for piker's historical API.
+
+'''
from typing import Optional
-from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
-if _USE_POSIX:
- from _posixshmem import shm_unlink
-
-# import msgspec
import numpy as np
-from numpy.lib import recfunctions as rfn
-import tractor
+
+from tractor.ipc._shm import (
+ SharedInt,
+ ShmArray,
+ ShmList,
+
+ NDToken as _Token,
+
+ open_shm_ndarray as open_shm_array,
+ attach_shm_ndarray as attach_shm_array,
+ open_shm_list,
+ attach_shm_list,
+
+ get_shm_token,
+ _known_tokens,
+ _make_token as _tractor_make_token,
+)
from ._util import log
-from ._source import def_iohlcv_fields
-from piker.types import Struct
-
-
-def cuckoff_mantracker():
- '''
- Disable all ``multiprocessing``` "resource tracking" machinery since
- it's an absolute multi-threaded mess of non-SC madness.
-
- '''
- from multiprocessing import resource_tracker as mantracker
-
- # Tell the "resource tracker" thing to fuck off.
- class ManTracker(mantracker.ResourceTracker):
- def register(self, name, rtype):
- pass
-
- def unregister(self, name, rtype):
- pass
-
- def ensure_running(self):
- pass
-
- # "know your land and know your prey"
- # https://www.dailymotion.com/video/x6ozzco
- mantracker._resource_tracker = ManTracker()
- mantracker.register = mantracker._resource_tracker.register
- mantracker.ensure_running = mantracker._resource_tracker.ensure_running
- mantracker.unregister = mantracker._resource_tracker.unregister
- mantracker.getfd = mantracker._resource_tracker.getfd
-
-
-cuckoff_mantracker()
-
-
-class SharedInt:
- """Wrapper around a single entry shared memory array which
- holds an ``int`` value used as an index counter.
-
- """
- def __init__(
- self,
- shm: SharedMemory,
- ) -> None:
- self._shm = shm
-
- @property
- def value(self) -> int:
- return int.from_bytes(self._shm.buf, byteorder)
-
- @value.setter
- def value(self, value) -> None:
- self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder)
-
- def destroy(self) -> None:
- if _USE_POSIX:
- # We manually unlink to bypass all the "resource tracker"
- # nonsense meant for non-SC systems.
- name = self._shm.name
- try:
- shm_unlink(name)
- except FileNotFoundError:
- # might be a teardown race here?
- log.warning(f'Shm for {name} already unlinked?')
-
-
-class _Token(Struct, frozen=True):
- '''
- Internal represenation of a shared memory "token"
- which can be used to key a system wide post shm entry.
-
- '''
- shm_name: str # this servers as a "key" value
- shm_first_index_name: str
- shm_last_index_name: str
- dtype_descr: tuple
- size: int # in struct-array index / row terms
-
- @property
- def dtype(self) -> np.dtype:
- return np.dtype(list(map(tuple, self.dtype_descr))).descr
-
- def as_msg(self):
- return self.to_dict()
-
- @classmethod
- def from_msg(cls, msg: dict) -> _Token:
- if isinstance(msg, _Token):
- return msg
-
- # TODO: native struct decoding
- # return _token_dec.decode(msg)
-
- msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
- return _Token(**msg)
-
-
-# _token_dec = msgspec.msgpack.Decoder(_Token)
-
-# TODO: this api?
-# _known_tokens = tractor.ActorVar('_shm_tokens', {})
-# _known_tokens = tractor.ContextStack('_known_tokens', )
-# _known_tokens = trio.RunVar('shms', {})
-
-# process-local store of keys to tokens
-_known_tokens = {}
-
-
-def get_shm_token(key: str) -> _Token:
- """Convenience func to check if a token
- for the provided key is known by this process.
- """
- return _known_tokens.get(key)
def _make_token(
@@ -154,494 +54,44 @@ def _make_token(
dtype: Optional[np.dtype] = None,
) -> _Token:
'''
- Create a serializable token that can be used
- to access a shared array.
+ Wrap tractor's ``_make_token()`` with piker's
+ default dtype fallback to ``def_iohlcv_fields``.
'''
- dtype = def_iohlcv_fields if dtype is None else dtype
- return _Token(
- shm_name=key,
- shm_first_index_name=key + "_first",
- shm_last_index_name=key + "_last",
- dtype_descr=tuple(np.dtype(dtype).descr),
- size=size,
+ from ._source import def_iohlcv_fields
+ dtype = (
+ def_iohlcv_fields
+ if dtype is None
+ else dtype
)
-
-
-class ShmArray:
- '''
- A shared memory ``numpy`` (compatible) array API.
-
- An underlying shared memory buffer is allocated based on
- a user specified ``numpy.ndarray``. This fixed size array
- can be read and written to by pushing data both onto the "front"
- or "back" of a set index range. The indexes for the "first" and
- "last" index are themselves stored in shared memory (accessed via
- ``SharedInt`` interfaces) values such that multiple processes can
- interact with the same array using a synchronized-index.
-
- '''
- def __init__(
- self,
- shmarr: np.ndarray,
- first: SharedInt,
- last: SharedInt,
- shm: SharedMemory,
- # readonly: bool = True,
- ) -> None:
- self._array = shmarr
-
- # indexes for first and last indices corresponding
- # to fille data
- self._first = first
- self._last = last
-
- self._len = len(shmarr)
- self._shm = shm
- self._post_init: bool = False
-
- # pushing data does not write the index (aka primary key)
- dtype = shmarr.dtype
- if dtype.fields:
- self._write_fields = list(shmarr.dtype.fields.keys())[1:]
- else:
- self._write_fields = None
-
- # TODO: ringbuf api?
-
- @property
- def _token(self) -> _Token:
- return _Token(
- shm_name=self._shm.name,
- shm_first_index_name=self._first._shm.name,
- shm_last_index_name=self._last._shm.name,
- dtype_descr=tuple(self._array.dtype.descr),
- size=self._len,
- )
-
- @property
- def token(self) -> dict:
- """Shared memory token that can be serialized and used by
- another process to attach to this array.
- """
- return self._token.as_msg()
-
- @property
- def index(self) -> int:
- return self._last.value % self._len
-
- @property
- def array(self) -> np.ndarray:
- '''
- Return an up-to-date ``np.ndarray`` view of the
- so-far-written data to the underlying shm buffer.
-
- '''
- a = self._array[self._first.value:self._last.value]
-
- # first, last = self._first.value, self._last.value
- # a = self._array[first:last]
-
- # TODO: eventually comment this once we've not seen it in the
- # wild in a long time..
- # XXX: race where first/last indexes cause a reader
- # to load an empty array..
- if len(a) == 0 and self._post_init:
- raise RuntimeError('Empty array race condition hit!?')
-
- return a
-
- def ustruct(
- self,
- fields: Optional[list[str]] = None,
-
- # type that all field values will be cast to
- # in the returned view.
- common_dtype: np.dtype = float,
-
- ) -> np.ndarray:
-
- array = self._array
-
- if fields:
- selection = array[fields]
- # fcount = len(fields)
- else:
- selection = array
- # fcount = len(array.dtype.fields)
-
- # XXX: manual ``.view()`` attempt that also doesn't work.
- # uview = selection.view(
- # dtype=' np.ndarray:
- '''
- Return the last ``length``'s worth of ("row") entries from the
- array.
-
- '''
- return self.array[-length:]
-
- def push(
- self,
- data: np.ndarray,
-
- field_map: Optional[dict[str, str]] = None,
- prepend: bool = False,
- update_first: bool = True,
- start: int | None = None,
-
- ) -> int:
- '''
- Ring buffer like "push" to append data
- into the buffer and return updated "last" index.
-
- NB: no actual ring logic yet to give a "loop around" on overflow
- condition, lel.
-
- '''
- length = len(data)
-
- if prepend:
- index = (start or self._first.value) - length
-
- if index < 0:
- raise ValueError(
- f'Array size of {self._len} was overrun during prepend.\n'
- f'You have passed {abs(index)} too many datums.'
- )
-
- else:
- index = start if start is not None else self._last.value
-
- end = index + length
-
- if field_map:
- src_names, dst_names = zip(*field_map.items())
- else:
- dst_names = src_names = self._write_fields
-
- try:
- self._array[
- list(dst_names)
- ][index:end] = data[list(src_names)][:]
-
- # NOTE: there was a race here between updating
- # the first and last indices and when the next reader
- # tries to access ``.array`` (which due to the index
- # overlap will be empty). Pretty sure we've fixed it now
- # but leaving this here as a reminder.
- if (
- prepend
- and update_first
- and length
- ):
- assert index < self._first.value
-
- if (
- index < self._first.value
- and update_first
- ):
- assert prepend, 'prepend=True not passed but index decreased?'
- self._first.value = index
-
- elif not prepend:
- self._last.value = end
-
- self._post_init = True
- return end
-
- except ValueError as err:
- if field_map:
- raise
-
- # should raise if diff detected
- self.diff_err_fields(data)
- raise err
-
- def diff_err_fields(
- self,
- data: np.ndarray,
- ) -> None:
- # reraise with any field discrepancy
- our_fields, their_fields = (
- set(self._array.dtype.fields),
- set(data.dtype.fields),
- )
-
- only_in_ours = our_fields - their_fields
- only_in_theirs = their_fields - our_fields
-
- if only_in_ours:
- raise TypeError(
- f"Input array is missing field(s): {only_in_ours}"
- )
- elif only_in_theirs:
- raise TypeError(
- f"Input array has unknown field(s): {only_in_theirs}"
- )
-
- # TODO: support "silent" prepends that don't update ._first.value?
- def prepend(
- self,
- data: np.ndarray,
- ) -> int:
- end = self.push(data, prepend=True)
- assert end
-
- def close(self) -> None:
- self._first._shm.close()
- self._last._shm.close()
- self._shm.close()
-
- def destroy(self) -> None:
- if _USE_POSIX:
- # We manually unlink to bypass all the "resource tracker"
- # nonsense meant for non-SC systems.
- shm_unlink(self._shm.name)
-
- self._first.destroy()
- self._last.destroy()
-
- def flush(self) -> None:
- # TODO: flush to storage backend like markestore?
- ...
-
-
-def open_shm_array(
- size: int,
- key: str | None = None,
- dtype: np.dtype | None = None,
- append_start_index: int | None = None,
- readonly: bool = False,
-
-) -> ShmArray:
- '''Open a memory shared ``numpy`` using the standard library.
-
- This call unlinks (aka permanently destroys) the buffer on teardown
- and thus should be used from the parent-most accessor (process).
-
- '''
- # create new shared mem segment for which we
- # have write permission
- a = np.zeros(size, dtype=dtype)
- a['index'] = np.arange(len(a))
-
- shm = SharedMemory(
- name=key,
- create=True,
- size=a.nbytes
- )
- array = np.ndarray(
- a.shape,
- dtype=a.dtype,
- buffer=shm.buf
- )
- array[:] = a[:]
- array.setflags(write=int(not readonly))
-
- token = _make_token(
+ return _tractor_make_token(
key=key,
size=size,
dtype=dtype,
)
- # create single entry arrays for storing an first and last indices
- first = SharedInt(
- shm=SharedMemory(
- name=token.shm_first_index_name,
- create=True,
- size=4, # std int
- )
- )
-
- last = SharedInt(
- shm=SharedMemory(
- name=token.shm_last_index_name,
- create=True,
- size=4, # std int
- )
- )
-
- # start the "real-time" updated section after 3-days worth of 1s
- # sampled OHLC. this allows appending up to a days worth from
- # tick/quote feeds before having to flush to a (tsdb) storage
- # backend, and looks something like,
- # -------------------------
- # | | i
- # _________________________
- # <-------------> <------->
- # history real-time
- #
- # Once fully "prepended", the history section will leave the
- # ``ShmArray._start.value: int = 0`` and the yet-to-be written
- # real-time section will start at ``ShmArray.index: int``.
-
- # this sets the index to nearly 2/3rds into the the length of
- # the buffer leaving at least a "days worth of second samples"
- # for the real-time section.
- if append_start_index is None:
- append_start_index = round(size * 0.616)
-
- last.value = first.value = append_start_index
-
- shmarr = ShmArray(
- array,
- first,
- last,
- shm,
- )
-
- assert shmarr._token == token
- _known_tokens[key] = shmarr.token
-
- # "unlink" created shm on process teardown by
- # pushing teardown calls onto actor context stack
- stack = tractor.current_actor(
- err_on_no_runtime=False,
- ).lifetime_stack
- if stack:
- stack.callback(shmarr.close)
- stack.callback(shmarr.destroy)
-
- return shmarr
-
-
-def attach_shm_array(
- token: tuple[str, str, tuple[str, str]],
- readonly: bool = True,
-
-) -> ShmArray:
- '''
- Attach to an existing shared memory array previously
- created by another process using ``open_shared_array``.
-
- No new shared mem is allocated but wrapper types for read/write
- access are constructed.
-
- '''
- token = _Token.from_msg(token)
- key = token.shm_name
-
- if key in _known_tokens:
- assert _Token.from_msg(_known_tokens[key]) == token, "WTF"
-
- # XXX: ugh, looks like due to the ``shm_open()`` C api we can't
- # actually place files in a subdir, see discussion here:
- # https://stackoverflow.com/a/11103289
-
- # attach to array buffer and view as per dtype
- _err: Optional[Exception] = None
- for _ in range(3):
- try:
- shm = SharedMemory(
- name=key,
- create=False,
- )
- break
- except OSError as oserr:
- _err = oserr
- time.sleep(0.1)
- else:
- if _err:
- raise _err
-
- shmarr = np.ndarray(
- (token.size,),
- dtype=token.dtype,
- buffer=shm.buf
- )
- shmarr.setflags(write=int(not readonly))
-
- first = SharedInt(
- shm=SharedMemory(
- name=token.shm_first_index_name,
- create=False,
- size=4, # std int
- ),
- )
- last = SharedInt(
- shm=SharedMemory(
- name=token.shm_last_index_name,
- create=False,
- size=4, # std int
- ),
- )
-
- # make sure we can read
- first.value
-
- sha = ShmArray(
- shmarr,
- first,
- last,
- shm,
- )
- # read test
- sha.array
-
- # Stash key -> token knowledge for future queries
- # via `maybe_opepn_shm_array()` but only after we know
- # we can attach.
- if key not in _known_tokens:
- _known_tokens[key] = token
-
- # "close" attached shm on actor teardown
- if (actor := tractor.current_actor(
- err_on_no_runtime=False,
- )):
- actor.lifetime_stack.callback(sha.close)
-
- return sha
-
def maybe_open_shm_array(
key: str,
size: int,
- dtype: np.dtype | None = None,
- append_start_index: int | None = None,
+ dtype: np.dtype|None = None,
+ append_start_index: int|None = None,
readonly: bool = False,
**kwargs,
-
) -> tuple[ShmArray, bool]:
'''
- Attempt to attach to a shared memory block using a "key" lookup
- to registered blocks in the users overall "system" registry
- (presumes you don't have the block's explicit token).
+ Attempt to attach to a shared memory block
+ using a "key" lookup to registered blocks in
+ the user's overall "system" registry (presumes
+ you don't have the block's explicit token).
- This function is meant to solve the problem of discovering whether
- a shared array token has been allocated or discovered by the actor
- running in **this** process. Systems where multiple actors may seek
- to access a common block can use this function to attempt to acquire
- a token as discovered by the actors who have previously stored
- a "key" -> ``_Token`` map in an actor local (aka python global)
- variable.
+    This wrapper preserves piker's historical
+    defaults (``readonly=False``,
+    ``append_start_index=None``) on top of the
+    shared ``tractor.ipc._shm`` machinery.
- If you know the explicit ``_Token`` for your memory segment instead
- use ``attach_shm_array``.
+ If you know the explicit ``_Token`` for your
+ memory segment instead use ``attach_shm_array``.
'''
try:
@@ -655,7 +105,9 @@ def maybe_open_shm_array(
False,
)
except KeyError:
- log.debug(f"Could not find {key} in shms cache")
+ log.debug(
+ f'Could not find {key} in shms cache'
+ )
if dtype:
token = _make_token(
key,
@@ -663,9 +115,18 @@ def maybe_open_shm_array(
dtype=dtype,
)
try:
- return attach_shm_array(token=token, **kwargs), False
+ return (
+ attach_shm_array(
+ token=token,
+ **kwargs,
+ ),
+ False,
+ )
except FileNotFoundError:
- log.debug(f"Could not attach to shm with token {token}")
+ log.debug(
+ f'Could not attach to shm'
+ f' with token {token}'
+ )
# This actor does not know about memory
# associated with the provided "key".
@@ -683,18 +144,20 @@ def maybe_open_shm_array(
True,
)
-def try_read(
- array: np.ndarray
+def try_read(
+ array: np.ndarray,
) -> Optional[np.ndarray]:
'''
- Try to read the last row from a shared mem array or ``None``
- if the array read returns a zero-length array result.
+ Try to read the last row from a shared mem
+ array or ``None`` if the array read returns
+ a zero-length array result.
- Can be used to check for backfilling race conditions where an array
- is currently being (re-)written by a writer actor but the reader is
- unaware and reads during the window where the first and last indexes
- are being updated.
+ Can be used to check for backfilling race
+ conditions where an array is currently being
+ (re-)written by a writer actor but the reader
+ is unaware and reads during the window where
+ the first and last indexes are being updated.
'''
try:
@@ -702,14 +165,13 @@ def try_read(
except IndexError:
# XXX: race condition with backfilling shm.
#
- # the underlying issue is that a backfill (aka prepend) and subsequent
- # shm array first/last index update could result in an empty array
- # read here since the indices may be updated in such a way that
- # a read delivers an empty array (though it seems like we
- # *should* be able to prevent that?). also, as and alt and
- # something we need anyway, maybe there should be some kind of
- # signal that a prepend is taking place and this consumer can
- # respond (eg. redrawing graphics) accordingly.
+ # the underlying issue is that a backfill
+ # (aka prepend) and subsequent shm array
+ # first/last index update could result in an
+ # empty array read here since the indices may
+ # be updated in such a way that a read delivers
+ # an empty array (though it seems like we
+ # *should* be able to prevent that?).
- # the array read was emtpy
+ # the array read was empty
return None