From fa4130aeae219706e36dc923ba7aacca96f9e501 Mon Sep 17 00:00:00 2001 From: goodboy Date: Sat, 14 Mar 2026 17:11:42 -0400 Subject: [PATCH] Use `tractor.ipc._shm` types directly across codebase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port all 16 internal import sites from re-exporting via `piker.data._sharedmem` shim to importing core shm types directly from `tractor.ipc._shm`. Deats, - `ShmArray` now imported from tractor in 10 files. - `_Token` renamed to `NDToken` everywhere (5 files). - `attach_shm_array` → `attach_shm_ndarray` at all call sites. - `data/__init__.py` sources `ShmArray`, `get_shm_token` from tractor; keeps `open/attach_shm_array` as public API aliases. - Trim shim to only piker-specific wrappers: `_make_token()`, `maybe_open_shm_array()`, `try_read()`. - Drop `Optional` usage in shim, use `|None`. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/brokers/deribit/feed.py | 2 +- piker/data/__init__.py | 10 ++++----- piker/data/_formatters.py | 4 +--- piker/data/_sampling.py | 14 +++++------- piker/data/_sharedmem.py | 41 +++++++++++++++-------------------- piker/data/flows.py | 14 ++++++------ piker/fsp/_api.py | 14 ++++++------ piker/fsp/_engine.py | 10 ++++----- piker/fsp/_momo.py | 2 +- piker/fsp/_volume.py | 2 +- piker/storage/cli.py | 4 +--- piker/storage/nativedb.py | 6 ++--- piker/tsp/_history.py | 8 +++---- piker/ui/_chart.py | 2 +- piker/ui/_dataviz.py | 4 +--- piker/ui/_fsp.py | 12 +++++----- 16 files changed, 65 insertions(+), 84 deletions(-) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 821aab87..94fb3f89 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -32,7 +32,7 @@ import tractor from piker.brokers import open_cached_client from piker.log import get_logger, get_console_log -from piker.data import ShmArray +from tractor.ipc._shm import ShmArray from piker.brokers._util import ( BrokerError, DataUnavailable, diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 207eeaa1..d92c33ca 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -23,13 +23,13 @@ sharing live streams over a network. """ from .ticktools import iterticks -from ._sharedmem import ( - maybe_open_shm_array, - attach_shm_array, - open_shm_array, - get_shm_token, +from tractor.ipc._shm import ( ShmArray, + get_shm_token, + open_shm_ndarray as open_shm_array, + attach_shm_ndarray as attach_shm_array, ) +from ._sharedmem import maybe_open_shm_array from ._source import ( def_iohlcv_fields, def_ohlcv_fields, diff --git a/piker/data/_formatters.py b/piker/data/_formatters.py index 7c3058bb..7cc793e9 100644 --- a/piker/data/_formatters.py +++ b/piker/data/_formatters.py @@ -28,9 +28,7 @@ from msgspec import field import numpy as np from numpy.lib import recfunctions as rfn -from ._sharedmem import ( - ShmArray, -) +from tractor.ipc._shm import ShmArray from ._pathops import ( path_arrays_from_ohlc, ) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 74ecf114..02420bb0 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -55,9 +55,7 @@ from ._util import ( from ..service import maybe_spawn_daemon if TYPE_CHECKING: - from ._sharedmem import ( - ShmArray, - ) + from tractor.ipc._shm import ShmArray from .feed import ( _FeedsBus, Sub, @@ -378,16 +376,16 @@ async def register_with_sampler( # feed_is_live.is_set() # ^TODO? pass it in instead? ): - from ._sharedmem import ( - attach_shm_array, - _Token, + from tractor.ipc._shm import ( + attach_shm_ndarray, + NDToken, ) for period in shms_by_period: # load and register shm handles shm_token_msg = shms_by_period[period] - shm = attach_shm_array( - _Token.from_msg(shm_token_msg), + shm = attach_shm_ndarray( + NDToken.from_msg(shm_token_msg), readonly=False, ) shms_by_period[period] = shm diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 3c7857b0..d894d392 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -18,31 +18,23 @@ # . ''' -NumPy shared memory buffers for real-time IPC streaming. +Piker-specific shared memory helpers. -Thin shim over ``tractor.ipc._shm`` providing -backward-compatible aliases for piker's historical API. +Thin shim providing piker-only wrappers around +``tractor.ipc._shm``; all core types and functions +are now imported directly from tractor throughout +the codebase. ''' -from typing import Optional - import numpy as np from tractor.ipc._shm import ( - SharedInt, + NDToken, ShmArray, - ShmList, - - NDToken as _Token, - - open_shm_ndarray as open_shm_array, - attach_shm_ndarray as attach_shm_array, - open_shm_list, - attach_shm_list, - - get_shm_token, _known_tokens, _make_token as _tractor_make_token, + open_shm_ndarray, + attach_shm_ndarray, ) from ._util import log @@ -51,8 +43,8 @@ from ._util import log def _make_token( key: str, size: int, - dtype: Optional[np.dtype] = None, -) -> _Token: + dtype: np.dtype|None = None, +) -> NDToken: ''' Wrap tractor's ``_make_token()`` with piker's default dtype fallback to ``def_iohlcv_fields``. @@ -90,15 +82,16 @@ def maybe_open_shm_array( historical defaults (``readonly=False``, ``append_start_index=None``). - If you know the explicit ``_Token`` for your - memory segment instead use ``attach_shm_array``. + If you know the explicit ``NDToken`` for your + memory segment instead use + ``tractor.ipc._shm.attach_shm_ndarray()``. ''' try: # see if we already know this key token = _known_tokens[key] return ( - attach_shm_array( + attach_shm_ndarray( token=token, readonly=readonly, ), @@ -116,7 +109,7 @@ def maybe_open_shm_array( ) try: return ( - attach_shm_array( + attach_shm_ndarray( token=token, **kwargs, ), @@ -134,7 +127,7 @@ def maybe_open_shm_array( # to fail if a block has been allocated # on the OS by someone else. return ( - open_shm_array( + open_shm_ndarray( key=key, size=size, dtype=dtype, @@ -147,7 +140,7 @@ def maybe_open_shm_array( def try_read( array: np.ndarray, -) -> Optional[np.ndarray]: +) -> np.ndarray|None: ''' Try to read the last row from a shared mem array or ``None`` if the array read returns diff --git a/piker/data/flows.py b/piker/data/flows.py index 573180b9..83b7460c 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -31,10 +31,10 @@ import pendulum import numpy as np from piker.types import Struct -from ._sharedmem import ( - attach_shm_array, +from tractor.ipc._shm import ( ShmArray, - _Token, + NDToken, + attach_shm_ndarray, ) from piker.accounting import MktPair @@ -64,11 +64,11 @@ class Flume(Struct): ''' mkt: MktPair first_quote: dict - _rt_shm_token: _Token + _rt_shm_token: NDToken # optional since some data flows won't have a "downsampled" history # buffer/stream (eg. FSPs). - _hist_shm_token: _Token | None = None + _hist_shm_token: NDToken|None = None # private shm refs loaded dynamically from tokens _hist_shm: ShmArray | None = None @@ -88,7 +88,7 @@ class Flume(Struct): def rt_shm(self) -> ShmArray: if self._rt_shm is None: - self._rt_shm = attach_shm_array( + self._rt_shm = attach_shm_ndarray( token=self._rt_shm_token, readonly=self._readonly, ) @@ -104,7 +104,7 @@ class Flume(Struct): ) if self._hist_shm is None: - self._hist_shm = attach_shm_array( + self._hist_shm = attach_shm_ndarray( token=self._hist_shm_token, readonly=self._readonly, ) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index bb2dea50..c42391fc 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -37,12 +37,12 @@ import numpy as np import tractor from tractor.msg import NamespacePath -from ..data._sharedmem import ( +from tractor.ipc._shm import ( ShmArray, - maybe_open_shm_array, - attach_shm_array, - _Token, + NDToken, + attach_shm_ndarray, ) +from ..data._sharedmem import maybe_open_shm_array from ..log import get_logger log = get_logger(__name__) @@ -78,8 +78,8 @@ class Fsp: # + the consuming fsp *to* the consumers output # shm flow. _flow_registry: dict[ - tuple[_Token, str], - tuple[_Token, Optional[ShmArray]], + tuple[NDToken, str], + tuple[NDToken, Optional[ShmArray]], ] = {} def __init__( @@ -148,7 +148,7 @@ class Fsp: # times as possible as per: # - https://github.com/pikers/piker/issues/359 # - https://github.com/pikers/piker/issues/332 - maybe_array := attach_shm_array(dst_token) + maybe_array := attach_shm_ndarray(dst_token) ) return maybe_array diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index b7806719..8795a1fb 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -40,7 +40,7 @@ from ..log import ( ) from .. import data from ..data.flows import Flume -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray from ..data._sampling import ( _default_delay_s, open_sample_stream, @@ -49,7 +49,7 @@ from ..accounting import MktPair from ._api import ( Fsp, _load_builtins, - _Token, + NDToken, ) from ..toolz import Profiler @@ -414,7 +414,7 @@ async def cascade( dst_flume_addr: dict, ns_path: NamespacePath, - shm_registry: dict[str, _Token], + shm_registry: dict[str, NDToken], zero_on_step: bool = False, loglevel: str|None = None, @@ -465,9 +465,9 @@ async def cascade( # not sure how else to do it. for (token, fsp_name, dst_token) in shm_registry: Fsp._flow_registry[( - _Token.from_msg(token), + NDToken.from_msg(token), fsp_name, - )] = _Token.from_msg(dst_token), None + )] = NDToken.from_msg(dst_token), None fsp: Fsp = reg.get( NamespacePath(ns_path) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index d1463c22..829f5f45 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -25,7 +25,7 @@ from numba import jit, float64, optional, int64 from ._api import fsp from ..data import iterticks -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray @jit( diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 594e80e4..d0edfeb3 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -21,7 +21,7 @@ from tractor.trionics._broadcast import AsyncReceiver from ._api import fsp from ..data import iterticks -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray from ._momo import _wma from ..log import get_logger diff --git a/piker/storage/cli.py b/piker/storage/cli.py index c73d3b6d..b8015904 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -37,9 +37,7 @@ import typer from piker.service import open_piker_runtime from piker.cli import cli -from piker.data import ( - ShmArray, -) +from tractor.ipc._shm import ShmArray from piker import tsp from . import log from . import ( diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py index 8a948cab..edb3a362 100644 --- a/piker/storage/nativedb.py +++ b/piker/storage/nativedb.py @@ -64,10 +64,8 @@ from pendulum import ( from piker import config from piker import tsp -from piker.data import ( - def_iohlcv_fields, - ShmArray, -) +from tractor.ipc._shm import ShmArray +from piker.data import def_iohlcv_fields from piker.log import get_logger from . import TimeseriesNotFound diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py index 0e75162f..fcc1d331 100644 --- a/piker/tsp/_history.py +++ b/piker/tsp/_history.py @@ -59,11 +59,11 @@ from piker.brokers import NoData from piker.accounting import ( MktPair, ) -from piker.log import get_logger -from ..data._sharedmem import ( - maybe_open_shm_array, - ShmArray, +from piker.log import ( + get_logger, ) +from tractor.ipc._shm import ShmArray +from ..data._sharedmem import maybe_open_shm_array from piker.data._source import ( def_iohlcv_fields, ) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index e6dbd69f..fc24570e 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -49,7 +49,7 @@ from ._cursor import ( Cursor, ContentsLabel, ) -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray from ._ohlc import BarItems from ._curve import ( Curve, diff --git a/piker/ui/_dataviz.py b/piker/ui/_dataviz.py index cc4529be..eeece1fa 100644 --- a/piker/ui/_dataviz.py +++ b/piker/ui/_dataviz.py @@ -42,9 +42,7 @@ from numpy import ( import pyqtgraph as pg from piker.ui.qt import QLineF -from ..data._sharedmem import ( - ShmArray, -) +from tractor.ipc._shm import ShmArray from ..data.flows import Flume from ..data._formatters import ( IncrementalFormatter, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 7a2df5e6..8d18beee 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -44,14 +44,12 @@ from piker.fsp import ( dolla_vlm, flow_rates, ) -from piker.data import ( - Flume, +from tractor.ipc._shm import ( ShmArray, + NDToken, ) -from piker.data._sharedmem import ( - _Token, - try_read, -) +from piker.data import Flume +from piker.data._sharedmem import try_read from piker.log import get_logger from piker.toolz import Profiler from piker.types import Struct @@ -382,7 +380,7 @@ class FspAdmin: tuple, tuple[tractor.MsgStream, ShmArray] ] = {} - self._flow_registry: dict[_Token, str] = {} + self._flow_registry: dict[NDToken, str] = {} # TODO: make this a `.src_flume` and add # a `dst_flume`?