Compare commits

..

29 Commits

Author SHA1 Message Date
Gud Boi 4b9c6938b6 Exclude crypto futes from `without_src` sym key
Extend the `col_sym_key` asset-type check in `start_backfill()`
to also exclude crypto-denominated futures (where `src` is
`'crypto_currency'` and `dst` is `'future'`) from the
`without_src=True` fqme path.

Also in `.brokers.binance` backend (it being the guilty culprit in the
discovery of this bug; and why i touched styling this code),

- reformat `make_sub()` fn sig to multiline style in
  `.binance.feed`.
- add backtick around `dict` in `make_sub()` docstring.
- reformat `or` conditionals to multiline style in
  `.binance.feed.get_mkt_info()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:14:06 -04:00
Gud Boi 2b000677d4 Drop `Flume.feed`, it's unused yet causes import cycles.. 2026-03-17 18:14:01 -04:00
Gud Boi cca1e19fa8 Just warn on single-bar nulls instead of bping
Replace the debug breakpoint with a warning-log when a single-bar
null-segment is detected in `get_null_segs()`. This lets the gap
analysis continue while still alerting about the anomaly.

Deats,
- extract the 3-bar window (before, null, after) and calculate
  a `gap: pendulum.Interval` for the warning msg.
- comment-out the old breakpoint block for optional debugging as needed.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:14:01 -04:00
Gud Boi e65b909a27 Lul, drop long unused poetry lock file 2026-03-17 18:14:01 -04:00
Gud Boi 0a08a74b33 Pin `pg` at latest official `0.14.0` release
Keep in masked GH sources lines for easy hackin against upstream
`master` branch when needed as well!
2026-03-17 18:14:01 -04:00
Gud Boi 3c13be519c .ui._editors: log multiline styling and re-leveling 2026-03-17 18:14:01 -04:00
Gud Boi e8a465eb51 .ui._lines: drop unused graphics-item import 2026-03-17 18:14:01 -04:00
Gud Boi 2ade3cfa24 Add batch-submit API for gap annotations
Introduce `AnnotCtl.add_batch()` and `serve_rc_annots()` batch
handler to submit 1000s of gaps in single IPC msg instead of
per-annot round-trips. Server builds `GapAnnotations` from specs
and handles vectorized timestamp-to-index lookups.

Deats,
- add `'cmd': 'batch'` handler in `serve_rc_annots()`
- vectorized timestamp lookup via `np.searchsorted()` + masking
- build `gap_specs: list[dict]` from rect+arrow specs client-side
- create single `GapAnnotations` item for all gaps server-side
- handle `GapAnnotations.reposition()` in redraw handler
- add profiling to batch path for perf measurement
- support optional individual arrows for A/B comparison

Also,
- refactor `markup_gaps()` to collect specs + single batch call
- add `no_qt_updates()` context mgr for batch render ops
- add profiling to annotation teardown path
- add `GapAnnotations` case to `rm_annot()` match block

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:14:01 -04:00
Gud Boi d5edb7bbea Add a `GapAnnotations` path-renderer
For a ~1000x perf gain says ol' claudy, our boi who wrote this entire
patch! Bo

Introduce `GapAnnotations` in `.ui._annotate` for batch-rendering gap
rects/arrows instead of individual `QGraphicsItem` instances. Uses
upstream's `pyqtgraph.Qt.internals.PrimitiveArray` for rects and
a `QPainterPath` for arrows. This API-replicates our prior annotator's
in view shape-graphics but now using (what we're dubbing)
"single-array-multiple-graphics" tech much like our `.ui._curve`
extensions to `pg` B)

Impl deats,
- batch draw ~1000 gaps in single paint call vs 1000 items
- arrows render in scene coords to maintain pixel size on zoom
- add vectorized timestamp-to-index lookup for repositioning
- cache bounding rect, rebuild on `reposition()` calls
- match `SelectRect` + `ArrowItem` visual style/colors
- skip reposition when timeframe doesn't match gap's period

Other,
- fix typo in `LevelMarker` docstring: "graphich" -> "graphic"
- reflow docstring in `qgo_draw_markers()` to 67 char limit

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:14:01 -04:00
Gud Boi 19e79a4926 Add info log for shm processing in `ldshm` CLI cmd
Log shm file name and detected period before null segment
processing to aid debugging.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:14:01 -04:00
Gud Boi 4957db8e74 Bump to latest official `pyqtgraph` release 2026-03-17 18:14:01 -04:00
Gud Boi 3ff5542c00 Improve styling and logging for UI font-size zoom
Refine zoom methods in `MainWindow` and font helpers
in `_style` to return `px_size` up the call chain and
log detailed zoom state on each change.

Deats,
- make `_set_qfont_px_size()` return `self.px_size`.
- make `configure_to_dpi()` and `_config_fonts_to_screen()`
  return the new `px_size` up through the call chain.
- add `font_size` to `log.info()` in `zoom_in()`,
  `zoom_out()`, and `reset_zoom()` alongside
  `zoom_step` and `zoom_level(%)`.
- reformat `has_ctrl`/`_has_shift` bitwise checks and
  key-match tuples to multiline style.
- comment out `Shift` modifier requirement for zoom
  hotkeys (now `Ctrl`-only).
- comment out unused `mn_dpi` and `dpi` locals.

Also,
- convert all single-line docstrings to `'''` multiline
  style across zoom and font methods.
- rewrap `configure_to_dpi()` docstring to 67 chars.
- move `from . import _style` to module-level import
  in `_window.py`.
- drop unused `screen` binding in `boundingRect()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:13:41 -04:00
di1ara 018dd573bc improve ui zoom defaults 2026-03-17 18:13:41 -04:00
Gud Boi 8d1927f651 Fix chart axis scaling on UI zoom level change
Again a patch (vibed) from our very own @dnks
(just a commit msg reworking using his new `/commit-msg` skill added by
@goodboy B)

Deats,
- add `Axis.update_fonts()` to recalculate tick font, text offset,
  bounding rect and `pyqtgraph`'s internal text-width/height tracking
  after a zoom change; store `_typical_max_str` at init for later reuse.
- rework `PriceAxis.size_to_values()` and
  `DynamicDateAxis.size_to_values()` to use pyqtgraph's
  `_updateWidth()`/`_updateHeight()` with `updateGeometry()` instead of
  raw `setWidth()`/ `setHeight()` so auto-expand constraints are
  respected.
- fix `GlobalZoomEventFilter` to mask out `KeypadModifier` and
  explicitly require both Ctrl+Shift, letting plain Ctrl+Plus/Minus pass
  through to chart zoom.
- add `_update_chart_axes()` to walk all plot-item axes during
  `_apply_zoom()` and call `splits.resize_sidepanes()` to sync subplot
  widths.

(this commit msg, and likely patch, was generated in some part by
[`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:13:41 -04:00
Gud Boi 084b2ee151 Add global UI font-size zoom scaling (from @dnks)
Add `Ctrl+Shift+Plus/Minus/0` shortcuts for zooming all
UI widget font sizes via a `GlobalZoomEventFilter`
installed at the `QApplication` level.

Deats,
- `.ui._window`: add `GlobalZoomEventFilter` event
  filter class and `MainWindow.zoom_in/out/reset_zoom()`
  methods that reconfigure `DpiAwareFont` with a
  `zoom_level` multiplier then propagate to all child
  widgets.
- `.ui._style`: extend `DpiAwareFont.configure_to_dpi()`
  and `_config_fonts_to_screen()` to accept a
  `zoom_level` float multiplier; cast `px_size` to `int`.
- `.ui._forms`: add `update_fonts()` to `Edit`,
  `Selection`, `FieldsForm` and `FillStatusBar` for
  stylesheet regen.
- `.ui._label`: add `FormatLabel.update_font()` method.
- `.ui._position`: add `SettingsPane.update_fonts()`.
- `.ui._search`: add `update_fonts()` to `CompleterView`
  and `SearchWidget`.
- `.ui._exec`: install the zoom filter on window show.
- `.ui.qt`: import `QObject` from `PyQt6`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:13:41 -04:00
Tyler Goodlet 5d90a332a8 Add `.xsh` script mentioned in gitea #50
Note since it's actually `xonsh` code run with either,
- most pedantically: `xonsh ./snippets/calc_ppi.xsh`
- or relying on how shebang: `./snippets/calc_ppi.xsh`
  * an sheboom.
2026-03-17 18:13:41 -04:00
Tyler Goodlet b18d3be73f Reorder imports in `qt_screen_info.py` ??
For wtv reason on nixos importing `pyqtgraph` first is causing `numpy`
to fail import?? No idea, but likely something to do with recent
`flake.nix`'s ld-lib-linking with `<nixpkgs>` marlarky?
2026-03-17 18:13:41 -04:00
Tyler Goodlet f9940ea249 Add some Qt DPI extras to `qt_screen_info.py`
- set `QT_USE_PHYSICAL_DPI='1'` env var for Qt6 high-DPI
  * we likely want to do this in `piker.ui` as well!
- move `pxr` calc from widget to per-screen in loop.
- add `unscaled_size` calc using `pxr * size`.
- switch from `.availableGeometry()` to `.geometry()` for full
  bounds.
- shorten output labels, add `!r` repr formatting
- add Qt6 DPI rounding policy TODO with doc links

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:13:41 -04:00
Tyler Goodlet 71ec6b3516 Re-fmt and `.info()` the `.configure_to_dpi()` DPI calcs for now 2026-03-17 18:13:41 -04:00
di1ara d77963fbf2 fixed spacing 2026-03-17 18:13:41 -04:00
di1ara 9ba7a3b9e4 fixed pytest test for dpi font auto calculation 2026-03-17 18:13:41 -04:00
di1ara 0fd7acbc25 added pytest, moved dependencies 2026-03-17 18:13:41 -04:00
di1ara 8f81d8a604 fix DpiAwareFont default size calculation 2026-03-17 18:13:41 -04:00
Gud Boi e7a867126b Add a console log in `.tsp._history.manage_history()`
Set the passed down `loglevel`.
2026-03-17 18:12:18 -04:00
Gud Boi 0a2059d00f Use `tractor.ipc._shm` types directly across codebase
Port all 16 internal import sites from re-exporting
via `piker.data._sharedmem` shim to importing core
shm types directly from `tractor.ipc._shm`.

Deats,
- `ShmArray` now imported from tractor in 10 files.
- `_Token` renamed to `NDToken` everywhere (5 files).
- `attach_shm_array` → `attach_shm_ndarray` at all
  call sites.
- `data/__init__.py` sources `ShmArray`,
  `get_shm_token` from tractor; keeps
  `open/attach_shm_array` as public API aliases.
- Trim shim to only piker-specific wrappers:
  `_make_token()`, `maybe_open_shm_array()`,
  `try_read()`.
- Drop `Optional` usage in shim, use `|None`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:00:29 -04:00
Gud Boi b71d0533b2 Port `_sharedmem` to thin shim over `tractor.ipc._shm`
Replace the ~716 line `piker.data._sharedmem` mod with a thin re-export
shim consuming `tractor.ipc._shm` types directly, since the `tractor`
version is the refined factoring of piker's original impl.

Deats,
- Re-export `SharedInt`, `ShmArray`, `ShmList`, `get_shm_token`,
  `_known_tokens` directly
- Alias renames: `NDToken as _Token`, `open_shm_ndarray as
  open_shm_array`, `attach_shm_ndarray as attach_shm_array`
- Keep `_make_token()` wrapper for piker's default dtype fallback to
  `def_iohlcv_fields`
- Keep `maybe_open_shm_array()` wrapper preserving piker's historical
  defaults (`readonly=False`, `append_start_index=None`)
- Keep `try_read()` race-condition guard (not in `tractor`)

All 13 import sites across piker continue to work unchanged with no
modifications needed.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 18:00:29 -04:00
Gud Boi c23d034935 Merge pull request 'Swap `tractor.to_asyncio.open_channel_from()` yield-pair order' (#90) from to_asyncio_api_update into main
Reviewed-on: https://www.pikers.dev/pikers/piker/pulls/90
Reviewed-by: guille <guillermo@telos.net>
2026-03-17 21:58:15 +00:00
Gud Boi 347a2d67f9 Pin to `tractor` upstream `main` branch 2026-03-17 16:51:15 -04:00
Gud Boi b22c59e7a0 Swap `open_channel_from()` yield-pair order
Match upstream `tractor` API change where
`open_channel_from()` now yields `(chan, first)`
instead of `(first, chan)` — i.e.
`tuple[LinkedTaskChannel, Any]`.

- `brokers/ib/api.py`
- `brokers/ib/broker.py`
- `brokers/ib/feed.py`
- `brokers/deribit/api.py` (2 sites)

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-17 15:06:30 -04:00
18 changed files with 136 additions and 696 deletions

View File

@ -32,7 +32,7 @@ import tractor
from piker.brokers import open_cached_client
from piker.log import get_logger, get_console_log
from piker.data import ShmArray
from tractor.ipc._shm import ShmArray
from piker.brokers._util import (
BrokerError,
DataUnavailable,

View File

@ -23,13 +23,13 @@ sharing live streams over a network.
"""
from .ticktools import iterticks
from ._sharedmem import (
maybe_open_shm_array,
attach_shm_array,
open_shm_array,
get_shm_token,
from tractor.ipc._shm import (
ShmArray,
get_shm_token,
open_shm_ndarray as open_shm_array,
attach_shm_ndarray as attach_shm_array,
)
from ._sharedmem import maybe_open_shm_array
from ._source import (
def_iohlcv_fields,
def_ohlcv_fields,

View File

@ -28,9 +28,7 @@ from msgspec import field
import numpy as np
from numpy.lib import recfunctions as rfn
from ._sharedmem import (
ShmArray,
)
from tractor.ipc._shm import ShmArray
from ._pathops import (
path_arrays_from_ohlc,
)

View File

@ -55,9 +55,7 @@ from ._util import (
from ..service import maybe_spawn_daemon
if TYPE_CHECKING:
from ._sharedmem import (
ShmArray,
)
from tractor.ipc._shm import ShmArray
from .feed import (
_FeedsBus,
Sub,
@ -378,16 +376,16 @@ async def register_with_sampler(
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
from ._sharedmem import (
attach_shm_array,
_Token,
from tractor.ipc._shm import (
attach_shm_ndarray,
NDToken,
)
for period in shms_by_period:
# load and register shm handles
shm_token_msg = shms_by_period[period]
shm = attach_shm_array(
_Token.from_msg(shm_token_msg),
shm = attach_shm_ndarray(
NDToken.from_msg(shm_token_msg),
readonly=False,
)
shms_by_period[period] = shm

View File

@ -1,622 +1,67 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is free software: you can redistribute it
# and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your
# option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU Affero General Public License for
# more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
# You should have received a copy of the GNU Affero General
# Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
"""
NumPy compatible shared memory buffers for real-time IPC streaming.
'''
Piker-specific shared memory helpers.
"""
from __future__ import annotations
from sys import byteorder
import time
from typing import Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
Thin shim providing piker-only wrappers around
``tractor.ipc._shm``; all core types and functions
are now imported directly from tractor throughout
the codebase.
if _USE_POSIX:
from _posixshmem import shm_unlink
# import msgspec
'''
import numpy as np
from numpy.lib import recfunctions as rfn
import tractor
from tractor.ipc._shm import (
NDToken,
ShmArray,
_known_tokens,
_make_token as _tractor_make_token,
open_shm_ndarray,
attach_shm_ndarray,
)
from ._util import log
from ._source import def_iohlcv_fields
from piker.types import Struct
def cuckoff_mantracker():
'''
Disable all ``multiprocessing``` "resource tracking" machinery since
it's an absolute multi-threaded mess of non-SC madness.
'''
from multiprocessing import resource_tracker as mantracker
# Tell the "resource tracker" thing to fuck off.
class ManTracker(mantracker.ResourceTracker):
def register(self, name, rtype):
pass
def unregister(self, name, rtype):
pass
def ensure_running(self):
pass
# "know your land and know your prey"
# https://www.dailymotion.com/video/x6ozzco
mantracker._resource_tracker = ManTracker()
mantracker.register = mantracker._resource_tracker.register
mantracker.ensure_running = mantracker._resource_tracker.ensure_running
mantracker.unregister = mantracker._resource_tracker.unregister
mantracker.getfd = mantracker._resource_tracker.getfd
cuckoff_mantracker()
class SharedInt:
"""Wrapper around a single entry shared memory array which
holds an ``int`` value used as an index counter.
"""
def __init__(
self,
shm: SharedMemory,
) -> None:
self._shm = shm
@property
def value(self) -> int:
return int.from_bytes(self._shm.buf, byteorder)
@value.setter
def value(self, value) -> None:
self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder)
def destroy(self) -> None:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
name = self._shm.name
try:
shm_unlink(name)
except FileNotFoundError:
# might be a teardown race here?
log.warning(f'Shm for {name} already unlinked?')
class _Token(Struct, frozen=True):
'''
Internal represenation of a shared memory "token"
which can be used to key a system wide post shm entry.
'''
shm_name: str # this servers as a "key" value
shm_first_index_name: str
shm_last_index_name: str
dtype_descr: tuple
size: int # in struct-array index / row terms
@property
def dtype(self) -> np.dtype:
return np.dtype(list(map(tuple, self.dtype_descr))).descr
def as_msg(self):
return self.to_dict()
@classmethod
def from_msg(cls, msg: dict) -> _Token:
if isinstance(msg, _Token):
return msg
# TODO: native struct decoding
# return _token_dec.decode(msg)
msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
return _Token(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token)
# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
# _known_tokens = trio.RunVar('shms', {})
# process-local store of keys to tokens
_known_tokens = {}
def get_shm_token(key: str) -> _Token:
"""Convenience func to check if a token
for the provided key is known by this process.
"""
return _known_tokens.get(key)
def _make_token(
key: str,
size: int,
dtype: Optional[np.dtype] = None,
) -> _Token:
'''
Create a serializable token that can be used
to access a shared array.
'''
dtype = def_iohlcv_fields if dtype is None else dtype
return _Token(
shm_name=key,
shm_first_index_name=key + "_first",
shm_last_index_name=key + "_last",
dtype_descr=tuple(np.dtype(dtype).descr),
size=size,
)
class ShmArray:
'''
A shared memory ``numpy`` (compatible) array API.
An underlying shared memory buffer is allocated based on
a user specified ``numpy.ndarray``. This fixed size array
can be read and written to by pushing data both onto the "front"
or "back" of a set index range. The indexes for the "first" and
"last" index are themselves stored in shared memory (accessed via
``SharedInt`` interfaces) values such that multiple processes can
interact with the same array using a synchronized-index.
'''
def __init__(
self,
shmarr: np.ndarray,
first: SharedInt,
last: SharedInt,
shm: SharedMemory,
# readonly: bool = True,
) -> None:
self._array = shmarr
# indexes for first and last indices corresponding
# to fille data
self._first = first
self._last = last
self._len = len(shmarr)
self._shm = shm
self._post_init: bool = False
# pushing data does not write the index (aka primary key)
dtype = shmarr.dtype
if dtype.fields:
self._write_fields = list(shmarr.dtype.fields.keys())[1:]
else:
self._write_fields = None
# TODO: ringbuf api?
@property
def _token(self) -> _Token:
return _Token(
shm_name=self._shm.name,
shm_first_index_name=self._first._shm.name,
shm_last_index_name=self._last._shm.name,
dtype_descr=tuple(self._array.dtype.descr),
size=self._len,
)
@property
def token(self) -> dict:
"""Shared memory token that can be serialized and used by
another process to attach to this array.
"""
return self._token.as_msg()
@property
def index(self) -> int:
return self._last.value % self._len
@property
def array(self) -> np.ndarray:
'''
Return an up-to-date ``np.ndarray`` view of the
so-far-written data to the underlying shm buffer.
'''
a = self._array[self._first.value:self._last.value]
# first, last = self._first.value, self._last.value
# a = self._array[first:last]
# TODO: eventually comment this once we've not seen it in the
# wild in a long time..
# XXX: race where first/last indexes cause a reader
# to load an empty array..
if len(a) == 0 and self._post_init:
raise RuntimeError('Empty array race condition hit!?')
return a
def ustruct(
self,
fields: Optional[list[str]] = None,
# type that all field values will be cast to
# in the returned view.
common_dtype: np.dtype = float,
) -> np.ndarray:
array = self._array
if fields:
selection = array[fields]
# fcount = len(fields)
else:
selection = array
# fcount = len(array.dtype.fields)
# XXX: manual ``.view()`` attempt that also doesn't work.
# uview = selection.view(
# dtype='<f16',
# ).reshape(-1, 4, order='A')
# assert len(selection) == len(uview)
u = rfn.structured_to_unstructured(
selection,
# dtype=float,
copy=True,
)
# unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
# array[:] = a[:]
return u
# return ShmArray(
# shmarr=u,
# first=self._first,
# last=self._last,
# shm=self._shm
# )
def last(
self,
length: int = 1,
) -> np.ndarray:
'''
Return the last ``length``'s worth of ("row") entries from the
array.
'''
return self.array[-length:]
def push(
self,
data: np.ndarray,
field_map: Optional[dict[str, str]] = None,
prepend: bool = False,
update_first: bool = True,
start: int | None = None,
) -> int:
'''
Ring buffer like "push" to append data
into the buffer and return updated "last" index.
NB: no actual ring logic yet to give a "loop around" on overflow
condition, lel.
'''
length = len(data)
if prepend:
index = (start or self._first.value) - length
if index < 0:
raise ValueError(
f'Array size of {self._len} was overrun during prepend.\n'
f'You have passed {abs(index)} too many datums.'
)
else:
index = start if start is not None else self._last.value
end = index + length
if field_map:
src_names, dst_names = zip(*field_map.items())
else:
dst_names = src_names = self._write_fields
try:
self._array[
list(dst_names)
][index:end] = data[list(src_names)][:]
# NOTE: there was a race here between updating
# the first and last indices and when the next reader
# tries to access ``.array`` (which due to the index
# overlap will be empty). Pretty sure we've fixed it now
# but leaving this here as a reminder.
if (
prepend
and update_first
and length
):
assert index < self._first.value
if (
index < self._first.value
and update_first
):
assert prepend, 'prepend=True not passed but index decreased?'
self._first.value = index
elif not prepend:
self._last.value = end
self._post_init = True
return end
except ValueError as err:
if field_map:
raise
# should raise if diff detected
self.diff_err_fields(data)
raise err
def diff_err_fields(
self,
data: np.ndarray,
) -> None:
# reraise with any field discrepancy
our_fields, their_fields = (
set(self._array.dtype.fields),
set(data.dtype.fields),
)
only_in_ours = our_fields - their_fields
only_in_theirs = their_fields - our_fields
if only_in_ours:
raise TypeError(
f"Input array is missing field(s): {only_in_ours}"
)
elif only_in_theirs:
raise TypeError(
f"Input array has unknown field(s): {only_in_theirs}"
)
# TODO: support "silent" prepends that don't update ._first.value?
def prepend(
self,
data: np.ndarray,
) -> int:
end = self.push(data, prepend=True)
assert end
def close(self) -> None:
self._first._shm.close()
self._last._shm.close()
self._shm.close()
def destroy(self) -> None:
if _USE_POSIX:
# We manually unlink to bypass all the "resource tracker"
# nonsense meant for non-SC systems.
shm_unlink(self._shm.name)
self._first.destroy()
self._last.destroy()
def flush(self) -> None:
# TODO: flush to storage backend like markestore?
...
def open_shm_array(
size: int,
key: str | None = None,
dtype: np.dtype|None = None,
append_start_index: int | None = None,
readonly: bool = False,
) -> ShmArray:
'''Open a memory shared ``numpy`` using the standard library.
This call unlinks (aka permanently destroys) the buffer on teardown
and thus should be used from the parent-most accessor (process).
) -> NDToken:
'''
Wrap tractor's ``_make_token()`` with piker's
default dtype fallback to ``def_iohlcv_fields``.
'''
# create new shared mem segment for which we
# have write permission
a = np.zeros(size, dtype=dtype)
a['index'] = np.arange(len(a))
shm = SharedMemory(
name=key,
create=True,
size=a.nbytes
from ._source import def_iohlcv_fields
dtype = (
def_iohlcv_fields
if dtype is None
else dtype
)
array = np.ndarray(
a.shape,
dtype=a.dtype,
buffer=shm.buf
)
array[:] = a[:]
array.setflags(write=int(not readonly))
token = _make_token(
return _tractor_make_token(
key=key,
size=size,
dtype=dtype,
)
# create single entry arrays for storing an first and last indices
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=True,
size=4, # std int
)
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=True,
size=4, # std int
)
)
# start the "real-time" updated section after 3-days worth of 1s
# sampled OHLC. this allows appending up to a days worth from
# tick/quote feeds before having to flush to a (tsdb) storage
# backend, and looks something like,
# -------------------------
# | | i
# _________________________
# <-------------> <------->
# history real-time
#
# Once fully "prepended", the history section will leave the
# ``ShmArray._start.value: int = 0`` and the yet-to-be written
# real-time section will start at ``ShmArray.index: int``.
# this sets the index to nearly 2/3rds into the the length of
# the buffer leaving at least a "days worth of second samples"
# for the real-time section.
if append_start_index is None:
append_start_index = round(size * 0.616)
last.value = first.value = append_start_index
shmarr = ShmArray(
array,
first,
last,
shm,
)
assert shmarr._token == token
_known_tokens[key] = shmarr.token
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
stack = tractor.current_actor(
err_on_no_runtime=False,
).lifetime_stack
if stack:
stack.callback(shmarr.close)
stack.callback(shmarr.destroy)
return shmarr
def attach_shm_array(
token: tuple[str, str, tuple[str, str]],
readonly: bool = True,
) -> ShmArray:
'''
Attach to an existing shared memory array previously
created by another process using ``open_shared_array``.
No new shared mem is allocated but wrapper types for read/write
access are constructed.
'''
token = _Token.from_msg(token)
key = token.shm_name
if key in _known_tokens:
assert _Token.from_msg(_known_tokens[key]) == token, "WTF"
# XXX: ugh, looks like due to the ``shm_open()`` C api we can't
# actually place files in a subdir, see discussion here:
# https://stackoverflow.com/a/11103289
# attach to array buffer and view as per dtype
_err: Optional[Exception] = None
for _ in range(3):
try:
shm = SharedMemory(
name=key,
create=False,
)
break
except OSError as oserr:
_err = oserr
time.sleep(0.1)
else:
if _err:
raise _err
shmarr = np.ndarray(
(token.size,),
dtype=token.dtype,
buffer=shm.buf
)
shmarr.setflags(write=int(not readonly))
first = SharedInt(
shm=SharedMemory(
name=token.shm_first_index_name,
create=False,
size=4, # std int
),
)
last = SharedInt(
shm=SharedMemory(
name=token.shm_last_index_name,
create=False,
size=4, # std int
),
)
# make sure we can read
first.value
sha = ShmArray(
shmarr,
first,
last,
shm,
)
# read test
sha.array
# Stash key -> token knowledge for future queries
# via `maybe_opepn_shm_array()` but only after we know
# we can attach.
if key not in _known_tokens:
_known_tokens[key] = token
# "close" attached shm on actor teardown
if (actor := tractor.current_actor(
err_on_no_runtime=False,
)):
actor.lifetime_stack.callback(sha.close)
return sha
def maybe_open_shm_array(
key: str,
@ -625,37 +70,37 @@ def maybe_open_shm_array(
append_start_index: int|None = None,
readonly: bool = False,
**kwargs,
) -> tuple[ShmArray, bool]:
'''
Attempt to attach to a shared memory block using a "key" lookup
to registered blocks in the users overall "system" registry
(presumes you don't have the block's explicit token).
Attempt to attach to a shared memory block
using a "key" lookup to registered blocks in
the user's overall "system" registry (presumes
you don't have the block's explicit token).
This function is meant to solve the problem of discovering whether
a shared array token has been allocated or discovered by the actor
running in **this** process. Systems where multiple actors may seek
to access a common block can use this function to attempt to acquire
a token as discovered by the actors who have previously stored
a "key" -> ``_Token`` map in an actor local (aka python global)
variable.
This is a thin wrapper around tractor's
``maybe_open_shm_ndarray()`` preserving piker's
historical defaults (``readonly=False``,
``append_start_index=None``).
If you know the explicit ``_Token`` for your memory segment instead
use ``attach_shm_array``.
If you know the explicit ``NDToken`` for your
memory segment instead use
``tractor.ipc._shm.attach_shm_ndarray()``.
'''
try:
# see if we already know this key
token = _known_tokens[key]
return (
attach_shm_array(
attach_shm_ndarray(
token=token,
readonly=readonly,
),
False,
)
except KeyError:
log.debug(f"Could not find {key} in shms cache")
log.debug(
f'Could not find {key} in shms cache'
)
if dtype:
token = _make_token(
key,
@ -663,9 +108,18 @@ def maybe_open_shm_array(
dtype=dtype,
)
try:
return attach_shm_array(token=token, **kwargs), False
return (
attach_shm_ndarray(
token=token,
**kwargs,
),
False,
)
except FileNotFoundError:
log.debug(f"Could not attach to shm with token {token}")
log.debug(
f'Could not attach to shm'
f' with token {token}'
)
# This actor does not know about memory
# associated with the provided "key".
@ -673,7 +127,7 @@ def maybe_open_shm_array(
# to fail if a block has been allocated
# on the OS by someone else.
return (
open_shm_array(
open_shm_ndarray(
key=key,
size=size,
dtype=dtype,
@ -683,18 +137,20 @@ def maybe_open_shm_array(
True,
)
def try_read(
array: np.ndarray
) -> Optional[np.ndarray]:
array: np.ndarray,
) -> np.ndarray|None:
'''
Try to read the last row from a shared mem array or ``None``
if the array read returns a zero-length array result.
Try to read the last row from a shared mem
array or ``None`` if the array read returns
a zero-length array result.
Can be used to check for backfilling race conditions where an array
is currently being (re-)written by a writer actor but the reader is
unaware and reads during the window where the first and last indexes
are being updated.
Can be used to check for backfilling race
conditions where an array is currently being
(re-)written by a writer actor but the reader
is unaware and reads during the window where
the first and last indexes are being updated.
'''
try:
@ -702,14 +158,13 @@ def try_read(
except IndexError:
# XXX: race condition with backfilling shm.
#
# the underlying issue is that a backfill (aka prepend) and subsequent
# shm array first/last index update could result in an empty array
# read here since the indices may be updated in such a way that
# a read delivers an empty array (though it seems like we
# *should* be able to prevent that?). also, as and alt and
# something we need anyway, maybe there should be some kind of
# signal that a prepend is taking place and this consumer can
# respond (eg. redrawing graphics) accordingly.
# the underlying issue is that a backfill
# (aka prepend) and subsequent shm array
# first/last index update could result in an
# empty array read here since the indices may
# be updated in such a way that a read delivers
# an empty array (though it seems like we
# *should* be able to prevent that?).
# the array read was emtpy
# the array read was empty
return None

View File

@ -28,10 +28,10 @@ import pendulum
import numpy as np
from piker.types import Struct
from ._sharedmem import (
attach_shm_array,
from tractor.ipc._shm import (
ShmArray,
_Token,
NDToken,
attach_shm_ndarray,
)
from piker.accounting import MktPair
@ -58,11 +58,11 @@ class Flume(Struct):
'''
mkt: MktPair
first_quote: dict
_rt_shm_token: _Token
_rt_shm_token: NDToken
# optional since some data flows won't have a "downsampled" history
# buffer/stream (eg. FSPs).
_hist_shm_token: _Token | None = None
_hist_shm_token: NDToken|None = None
# private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None
@ -78,7 +78,7 @@ class Flume(Struct):
def rt_shm(self) -> ShmArray:
if self._rt_shm is None:
self._rt_shm = attach_shm_array(
self._rt_shm = attach_shm_ndarray(
token=self._rt_shm_token,
readonly=self._readonly,
)
@ -94,7 +94,7 @@ class Flume(Struct):
)
if self._hist_shm is None:
self._hist_shm = attach_shm_array(
self._hist_shm = attach_shm_ndarray(
token=self._hist_shm_token,
readonly=self._readonly,
)

View File

@ -37,12 +37,12 @@ import numpy as np
import tractor
from tractor.msg import NamespacePath
from ..data._sharedmem import (
from tractor.ipc._shm import (
ShmArray,
maybe_open_shm_array,
attach_shm_array,
_Token,
NDToken,
attach_shm_ndarray,
)
from ..data._sharedmem import maybe_open_shm_array
from ..log import get_logger
log = get_logger(__name__)
@ -78,8 +78,8 @@ class Fsp:
# + the consuming fsp *to* the consumers output
# shm flow.
_flow_registry: dict[
tuple[_Token, str],
tuple[_Token, Optional[ShmArray]],
tuple[NDToken, str],
tuple[NDToken, Optional[ShmArray]],
] = {}
def __init__(
@ -148,7 +148,7 @@ class Fsp:
# times as possible as per:
# - https://github.com/pikers/piker/issues/359
# - https://github.com/pikers/piker/issues/332
maybe_array := attach_shm_array(dst_token)
maybe_array := attach_shm_ndarray(dst_token)
)
return maybe_array

View File

@ -40,7 +40,7 @@ from ..log import (
)
from .. import data
from ..data.flows import Flume
from ..data._sharedmem import ShmArray
from tractor.ipc._shm import ShmArray
from ..data._sampling import (
_default_delay_s,
open_sample_stream,
@ -49,7 +49,7 @@ from ..accounting import MktPair
from ._api import (
Fsp,
_load_builtins,
_Token,
NDToken,
)
from ..toolz import Profiler
@ -414,7 +414,7 @@ async def cascade(
dst_flume_addr: dict,
ns_path: NamespacePath,
shm_registry: dict[str, _Token],
shm_registry: dict[str, NDToken],
zero_on_step: bool = False,
loglevel: str|None = None,
@ -465,9 +465,9 @@ async def cascade(
# not sure how else to do it.
for (token, fsp_name, dst_token) in shm_registry:
Fsp._flow_registry[(
_Token.from_msg(token),
NDToken.from_msg(token),
fsp_name,
)] = _Token.from_msg(dst_token), None
)] = NDToken.from_msg(dst_token), None
fsp: Fsp = reg.get(
NamespacePath(ns_path)

View File

@ -25,7 +25,7 @@ from numba import jit, float64, optional, int64
from ._api import fsp
from ..data import iterticks
from ..data._sharedmem import ShmArray
from tractor.ipc._shm import ShmArray
@jit(

View File

@ -21,7 +21,7 @@ from tractor.trionics._broadcast import AsyncReceiver
from ._api import fsp
from ..data import iterticks
from ..data._sharedmem import ShmArray
from tractor.ipc._shm import ShmArray
from ._momo import _wma
from ..log import get_logger

View File

@ -37,9 +37,7 @@ import typer
from piker.service import open_piker_runtime
from piker.cli import cli
from piker.data import (
ShmArray,
)
from tractor.ipc._shm import ShmArray
from piker import tsp
from . import log
from . import (

View File

@ -64,10 +64,8 @@ from pendulum import (
from piker import config
from piker import tsp
from piker.data import (
def_iohlcv_fields,
ShmArray,
)
from tractor.ipc._shm import ShmArray
from piker.data import def_iohlcv_fields
from piker.log import get_logger
from . import TimeseriesNotFound

View File

@ -63,10 +63,8 @@ from piker.log import (
get_logger,
get_console_log,
)
from ..data._sharedmem import (
maybe_open_shm_array,
ShmArray,
)
from tractor.ipc._shm import ShmArray
from ..data._sharedmem import maybe_open_shm_array
from piker.data._source import (
def_iohlcv_fields,
)

View File

@ -49,7 +49,7 @@ from ._cursor import (
Cursor,
ContentsLabel,
)
from ..data._sharedmem import ShmArray
from tractor.ipc._shm import ShmArray
from ._ohlc import BarItems
from ._curve import (
Curve,

View File

@ -42,9 +42,7 @@ from numpy import (
import pyqtgraph as pg
from piker.ui.qt import QLineF
from ..data._sharedmem import (
ShmArray,
)
from tractor.ipc._shm import ShmArray
from ..data.flows import Flume
from ..data._formatters import (
IncrementalFormatter,

View File

@ -44,14 +44,12 @@ from piker.fsp import (
dolla_vlm,
flow_rates,
)
from piker.data import (
Flume,
from tractor.ipc._shm import (
ShmArray,
NDToken,
)
from piker.data._sharedmem import (
_Token,
try_read,
)
from piker.data import Flume
from piker.data._sharedmem import try_read
from piker.log import get_logger
from piker.toolz import Profiler
from piker.types import Struct
@ -382,7 +380,7 @@ class FspAdmin:
tuple,
tuple[tractor.MsgStream, ShmArray]
] = {}
self._flow_registry: dict[_Token, str] = {}
self._flow_registry: dict[NDToken, str] = {}
# TODO: make this a `.src_flume` and add
# a `dst_flume`?

View File

@ -206,9 +206,8 @@ pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
# xonsh = { git = 'https://github.com/xonsh/xonsh.git', branch = 'main' }
# XXX since, we're like, always hacking new shite all-the-time. Bp
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" }
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="main" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "main" }
# ------ goodboy ------
# hackin dev-envs, usually there's something new he's hackin in..
# tractor = { path = "../tractor", editable = true }

View File

@ -1034,7 +1034,7 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=piker_pin" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=main" },
{ name = "trio", specifier = ">=0.27" },
{ name = "trio-typing", specifier = ">=0.10.0" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
@ -1680,7 +1680,7 @@ wheels = [
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#566a17ba92469000a01fd18c709ea3e336e72796" }
source = { git = "https://github.com/goodboy/tractor.git?branch=main#e77198bb64f0467a50e251ed140daee439752354" }
dependencies = [
{ name = "bidict" },
{ name = "cffi" },