Use `tractor.ipc._shm` types directly across codebase

Port all 16 internal import sites from re-exporting
via the `piker.data._sharedmem` shim to importing core
shm types directly from `tractor.ipc._shm`.

Details:
- `ShmArray` now imported from tractor in 10 files.
- `_Token` renamed to `NDToken` everywhere (5 files).
- `attach_shm_array` → `attach_shm_ndarray` at all
  call sites.
- `data/__init__.py` sources `ShmArray`,
  `get_shm_token` from tractor; keeps
  `open/attach_shm_array` as public API aliases.
- Trim shim to only piker-specific wrappers:
  `_make_token()`, `maybe_open_shm_array()`,
  `try_read()`.
- Drop `Optional` usage in shim, use `|None`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
Gud Boi 2026-03-14 17:11:42 -04:00
parent 14e4eb495e
commit fa4130aeae
16 changed files with 65 additions and 84 deletions

View File

@ -32,7 +32,7 @@ import tractor
from piker.brokers import open_cached_client from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log from piker.log import get_logger, get_console_log
from piker.data import ShmArray from tractor.ipc._shm import ShmArray
from piker.brokers._util import ( from piker.brokers._util import (
BrokerError, BrokerError,
DataUnavailable, DataUnavailable,

View File

@ -23,13 +23,13 @@ sharing live streams over a network.
""" """
from .ticktools import iterticks from .ticktools import iterticks
from ._sharedmem import ( from tractor.ipc._shm import (
maybe_open_shm_array,
attach_shm_array,
open_shm_array,
get_shm_token,
ShmArray, ShmArray,
get_shm_token,
open_shm_ndarray as open_shm_array,
attach_shm_ndarray as attach_shm_array,
) )
from ._sharedmem import maybe_open_shm_array
from ._source import ( from ._source import (
def_iohlcv_fields, def_iohlcv_fields,
def_ohlcv_fields, def_ohlcv_fields,

View File

@ -28,9 +28,7 @@ from msgspec import field
import numpy as np import numpy as np
from numpy.lib import recfunctions as rfn from numpy.lib import recfunctions as rfn
from ._sharedmem import ( from tractor.ipc._shm import ShmArray
ShmArray,
)
from ._pathops import ( from ._pathops import (
path_arrays_from_ohlc, path_arrays_from_ohlc,
) )

View File

@ -55,9 +55,7 @@ from ._util import (
from ..service import maybe_spawn_daemon from ..service import maybe_spawn_daemon
if TYPE_CHECKING: if TYPE_CHECKING:
from ._sharedmem import ( from tractor.ipc._shm import ShmArray
ShmArray,
)
from .feed import ( from .feed import (
_FeedsBus, _FeedsBus,
Sub, Sub,
@ -378,16 +376,16 @@ async def register_with_sampler(
# feed_is_live.is_set() # feed_is_live.is_set()
# ^TODO? pass it in instead? # ^TODO? pass it in instead?
): ):
from ._sharedmem import ( from tractor.ipc._shm import (
attach_shm_array, attach_shm_ndarray,
_Token, NDToken,
) )
for period in shms_by_period: for period in shms_by_period:
# load and register shm handles # load and register shm handles
shm_token_msg = shms_by_period[period] shm_token_msg = shms_by_period[period]
shm = attach_shm_array( shm = attach_shm_ndarray(
_Token.from_msg(shm_token_msg), NDToken.from_msg(shm_token_msg),
readonly=False, readonly=False,
) )
shms_by_period[period] = shm shms_by_period[period] = shm

View File

@ -18,31 +18,23 @@
# <https://www.gnu.org/licenses/>. # <https://www.gnu.org/licenses/>.
''' '''
NumPy shared memory buffers for real-time IPC streaming. Piker-specific shared memory helpers.
Thin shim over ``tractor.ipc._shm`` providing Thin shim providing piker-only wrappers around
backward-compatible aliases for piker's historical API. ``tractor.ipc._shm``; all core types and functions
are now imported directly from tractor throughout
the codebase.
''' '''
from typing import Optional
import numpy as np import numpy as np
from tractor.ipc._shm import ( from tractor.ipc._shm import (
SharedInt, NDToken,
ShmArray, ShmArray,
ShmList,
NDToken as _Token,
open_shm_ndarray as open_shm_array,
attach_shm_ndarray as attach_shm_array,
open_shm_list,
attach_shm_list,
get_shm_token,
_known_tokens, _known_tokens,
_make_token as _tractor_make_token, _make_token as _tractor_make_token,
open_shm_ndarray,
attach_shm_ndarray,
) )
from ._util import log from ._util import log
@ -51,8 +43,8 @@ from ._util import log
def _make_token( def _make_token(
key: str, key: str,
size: int, size: int,
dtype: Optional[np.dtype] = None, dtype: np.dtype|None = None,
) -> _Token: ) -> NDToken:
''' '''
Wrap tractor's ``_make_token()`` with piker's Wrap tractor's ``_make_token()`` with piker's
default dtype fallback to ``def_iohlcv_fields``. default dtype fallback to ``def_iohlcv_fields``.
@ -90,15 +82,16 @@ def maybe_open_shm_array(
historical defaults (``readonly=False``, historical defaults (``readonly=False``,
``append_start_index=None``). ``append_start_index=None``).
If you know the explicit ``_Token`` for your If you know the explicit ``NDToken`` for your
memory segment instead use ``attach_shm_array``. memory segment instead use
``tractor.ipc._shm.attach_shm_ndarray()``.
''' '''
try: try:
# see if we already know this key # see if we already know this key
token = _known_tokens[key] token = _known_tokens[key]
return ( return (
attach_shm_array( attach_shm_ndarray(
token=token, token=token,
readonly=readonly, readonly=readonly,
), ),
@ -116,7 +109,7 @@ def maybe_open_shm_array(
) )
try: try:
return ( return (
attach_shm_array( attach_shm_ndarray(
token=token, token=token,
**kwargs, **kwargs,
), ),
@ -134,7 +127,7 @@ def maybe_open_shm_array(
# to fail if a block has been allocated # to fail if a block has been allocated
# on the OS by someone else. # on the OS by someone else.
return ( return (
open_shm_array( open_shm_ndarray(
key=key, key=key,
size=size, size=size,
dtype=dtype, dtype=dtype,
@ -147,7 +140,7 @@ def maybe_open_shm_array(
def try_read( def try_read(
array: np.ndarray, array: np.ndarray,
) -> Optional[np.ndarray]: ) -> np.ndarray|None:
''' '''
Try to read the last row from a shared mem Try to read the last row from a shared mem
array or ``None`` if the array read returns array or ``None`` if the array read returns

View File

@ -31,10 +31,10 @@ import pendulum
import numpy as np import numpy as np
from piker.types import Struct from piker.types import Struct
from ._sharedmem import ( from tractor.ipc._shm import (
attach_shm_array,
ShmArray, ShmArray,
_Token, NDToken,
attach_shm_ndarray,
) )
from piker.accounting import MktPair from piker.accounting import MktPair
@ -64,11 +64,11 @@ class Flume(Struct):
''' '''
mkt: MktPair mkt: MktPair
first_quote: dict first_quote: dict
_rt_shm_token: _Token _rt_shm_token: NDToken
# optional since some data flows won't have a "downsampled" history # optional since some data flows won't have a "downsampled" history
# buffer/stream (eg. FSPs). # buffer/stream (eg. FSPs).
_hist_shm_token: _Token | None = None _hist_shm_token: NDToken|None = None
# private shm refs loaded dynamically from tokens # private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None _hist_shm: ShmArray | None = None
@ -88,7 +88,7 @@ class Flume(Struct):
def rt_shm(self) -> ShmArray: def rt_shm(self) -> ShmArray:
if self._rt_shm is None: if self._rt_shm is None:
self._rt_shm = attach_shm_array( self._rt_shm = attach_shm_ndarray(
token=self._rt_shm_token, token=self._rt_shm_token,
readonly=self._readonly, readonly=self._readonly,
) )
@ -104,7 +104,7 @@ class Flume(Struct):
) )
if self._hist_shm is None: if self._hist_shm is None:
self._hist_shm = attach_shm_array( self._hist_shm = attach_shm_ndarray(
token=self._hist_shm_token, token=self._hist_shm_token,
readonly=self._readonly, readonly=self._readonly,
) )

View File

@ -37,12 +37,12 @@ import numpy as np
import tractor import tractor
from tractor.msg import NamespacePath from tractor.msg import NamespacePath
from ..data._sharedmem import ( from tractor.ipc._shm import (
ShmArray, ShmArray,
maybe_open_shm_array, NDToken,
attach_shm_array, attach_shm_ndarray,
_Token,
) )
from ..data._sharedmem import maybe_open_shm_array
from ..log import get_logger from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
@ -78,8 +78,8 @@ class Fsp:
# + the consuming fsp *to* the consumers output # + the consuming fsp *to* the consumers output
# shm flow. # shm flow.
_flow_registry: dict[ _flow_registry: dict[
tuple[_Token, str], tuple[NDToken, str],
tuple[_Token, Optional[ShmArray]], tuple[NDToken, Optional[ShmArray]],
] = {} ] = {}
def __init__( def __init__(
@ -148,7 +148,7 @@ class Fsp:
# times as possible as per: # times as possible as per:
# - https://github.com/pikers/piker/issues/359 # - https://github.com/pikers/piker/issues/359
# - https://github.com/pikers/piker/issues/332 # - https://github.com/pikers/piker/issues/332
maybe_array := attach_shm_array(dst_token) maybe_array := attach_shm_ndarray(dst_token)
) )
return maybe_array return maybe_array

View File

@ -40,7 +40,7 @@ from ..log import (
) )
from .. import data from .. import data
from ..data.flows import Flume from ..data.flows import Flume
from ..data._sharedmem import ShmArray from tractor.ipc._shm import ShmArray
from ..data._sampling import ( from ..data._sampling import (
_default_delay_s, _default_delay_s,
open_sample_stream, open_sample_stream,
@ -49,7 +49,7 @@ from ..accounting import MktPair
from ._api import ( from ._api import (
Fsp, Fsp,
_load_builtins, _load_builtins,
_Token, NDToken,
) )
from ..toolz import Profiler from ..toolz import Profiler
@ -414,7 +414,7 @@ async def cascade(
dst_flume_addr: dict, dst_flume_addr: dict,
ns_path: NamespacePath, ns_path: NamespacePath,
shm_registry: dict[str, _Token], shm_registry: dict[str, NDToken],
zero_on_step: bool = False, zero_on_step: bool = False,
loglevel: str|None = None, loglevel: str|None = None,
@ -465,9 +465,9 @@ async def cascade(
# not sure how else to do it. # not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry: for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[( Fsp._flow_registry[(
_Token.from_msg(token), NDToken.from_msg(token),
fsp_name, fsp_name,
)] = _Token.from_msg(dst_token), None )] = NDToken.from_msg(dst_token), None
fsp: Fsp = reg.get( fsp: Fsp = reg.get(
NamespacePath(ns_path) NamespacePath(ns_path)

View File

@ -25,7 +25,7 @@ from numba import jit, float64, optional, int64
from ._api import fsp from ._api import fsp
from ..data import iterticks from ..data import iterticks
from ..data._sharedmem import ShmArray from tractor.ipc._shm import ShmArray
@jit( @jit(

View File

@ -21,7 +21,7 @@ from tractor.trionics._broadcast import AsyncReceiver
from ._api import fsp from ._api import fsp
from ..data import iterticks from ..data import iterticks
from ..data._sharedmem import ShmArray from tractor.ipc._shm import ShmArray
from ._momo import _wma from ._momo import _wma
from ..log import get_logger from ..log import get_logger

View File

@ -37,9 +37,7 @@ import typer
from piker.service import open_piker_runtime from piker.service import open_piker_runtime
from piker.cli import cli from piker.cli import cli
from piker.data import ( from tractor.ipc._shm import ShmArray
ShmArray,
)
from piker import tsp from piker import tsp
from . import log from . import log
from . import ( from . import (

View File

@ -64,10 +64,8 @@ from pendulum import (
from piker import config from piker import config
from piker import tsp from piker import tsp
from piker.data import ( from tractor.ipc._shm import ShmArray
def_iohlcv_fields, from piker.data import def_iohlcv_fields
ShmArray,
)
from piker.log import get_logger from piker.log import get_logger
from . import TimeseriesNotFound from . import TimeseriesNotFound

View File

@ -59,11 +59,11 @@ from piker.brokers import NoData
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
) )
from piker.log import get_logger from piker.log import (
from ..data._sharedmem import ( get_logger,
maybe_open_shm_array,
ShmArray,
) )
from tractor.ipc._shm import ShmArray
from ..data._sharedmem import maybe_open_shm_array
from piker.data._source import ( from piker.data._source import (
def_iohlcv_fields, def_iohlcv_fields,
) )

View File

@ -49,7 +49,7 @@ from ._cursor import (
Cursor, Cursor,
ContentsLabel, ContentsLabel,
) )
from ..data._sharedmem import ShmArray from tractor.ipc._shm import ShmArray
from ._ohlc import BarItems from ._ohlc import BarItems
from ._curve import ( from ._curve import (
Curve, Curve,

View File

@ -42,9 +42,7 @@ from numpy import (
import pyqtgraph as pg import pyqtgraph as pg
from piker.ui.qt import QLineF from piker.ui.qt import QLineF
from ..data._sharedmem import ( from tractor.ipc._shm import ShmArray
ShmArray,
)
from ..data.flows import Flume from ..data.flows import Flume
from ..data._formatters import ( from ..data._formatters import (
IncrementalFormatter, IncrementalFormatter,

View File

@ -44,14 +44,12 @@ from piker.fsp import (
dolla_vlm, dolla_vlm,
flow_rates, flow_rates,
) )
from piker.data import ( from tractor.ipc._shm import (
Flume,
ShmArray, ShmArray,
NDToken,
) )
from piker.data._sharedmem import ( from piker.data import Flume
_Token, from piker.data._sharedmem import try_read
try_read,
)
from piker.log import get_logger from piker.log import get_logger
from piker.toolz import Profiler from piker.toolz import Profiler
from piker.types import Struct from piker.types import Struct
@ -382,7 +380,7 @@ class FspAdmin:
tuple, tuple,
tuple[tractor.MsgStream, ShmArray] tuple[tractor.MsgStream, ShmArray]
] = {} ] = {}
self._flow_registry: dict[_Token, str] = {} self._flow_registry: dict[NDToken, str] = {}
# TODO: make this a `.src_flume` and add # TODO: make this a `.src_flume` and add
# a `dst_flume`? # a `dst_flume`?