diff --git a/piker/data/_flumes.py b/piker/data/_flumes.py
new file mode 100644
index 00000000..82ce2947
--- /dev/null
+++ b/piker/data/_flumes.py
@@ -0,0 +1,304 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+Real-time data flow abstractions.
+
+
+
+"""
+from contextlib import asynccontextmanager as acm
+from functools import partial
+from typing import (
+ AsyncIterator,
+ TYPE_CHECKING,
+)
+
+import tractor
+from tractor.trionics import (
+ maybe_open_context,
+)
+import pendulum
+import numpy as np
+
+from .types import Struct
+from ._source import (
+ Symbol,
+)
+from ._sharedmem import (
+ attach_shm_array,
+ ShmArray,
+ _Token,
+)
+from ._sampling import (
+ iter_ohlc_periods,
+)
+
+if TYPE_CHECKING:
+ from pyqtgraph import PlotItem
+ from .feed import Feed
+
+
+class Flume(Struct):
+ '''
+ Composite reference type which points to all the addressing handles
+ and other meta-data necessary for the read, measure and management
+ of a set of real-time updated data flows.
+
+ Can be thought of as a "flow descriptor" or "flow frame" which
+ describes the high level properties of a set of data flows that can
+ be used seamlessly across process-memory boundaries.
+
+    Each instance's sub-components normally include:
+    - a msg-oriented quote stream provided via an IPC transport
+ - history and real-time shm buffers which are both real-time
+ updated and backfilled.
+ - associated startup indexing information related to both buffer
+ real-time-append and historical prepend addresses.
+ - low level APIs to read and measure the updated data and manage
+ queuing properties.
+
+ '''
+ symbol: Symbol
+ first_quote: dict
+ _hist_shm_token: _Token
+ _rt_shm_token: _Token
+
+ # private shm refs loaded dynamically from tokens
+ _hist_shm: ShmArray | None = None
+ _rt_shm: ShmArray | None = None
+
+ stream: tractor.MsgStream | None = None
+ izero_hist: int = 0
+ izero_rt: int = 0
+ throttle_rate: int | None = None
+
+ # TODO: do we need this really if we can pull the `Portal` from
+ # ``tractor``'s internals?
+ feed: Feed | None = None
+
+ @property
+ def rt_shm(self) -> ShmArray:
+
+ if self._rt_shm is None:
+ self._rt_shm = attach_shm_array(
+ token=self._rt_shm_token,
+ readonly=True,
+ )
+
+ return self._rt_shm
+
+ @property
+ def hist_shm(self) -> ShmArray:
+
+ if self._hist_shm is None:
+ self._hist_shm = attach_shm_array(
+ token=self._hist_shm_token,
+ readonly=True,
+ )
+
+ return self._hist_shm
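+
+    # NOTE: a minimal read sketch against the lazily-attached shm
+    # buffers above; the ``flume`` handle and the ``'close'`` field
+    # name are assumed for illustration:
+    #
+    #   rt: ShmArray = flume.rt_shm  # first access attaches via token
+    #   last_close = rt.array[-1]['close']
+    #   assert flume.rt_shm is rt  # the handle is cached after attach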
+
+ async def receive(self) -> dict:
+ return await self.stream.receive()
+
+ @acm
+ async def index_stream(
+ self,
+ delay_s: int = 1,
+
+ ) -> AsyncIterator[int]:
+
+ if not self.feed:
+ raise RuntimeError('This flume is not part of any ``Feed``?')
+
+ # TODO: maybe a public (property) API for this in ``tractor``?
+ portal = self.stream._ctx._portal
+ assert portal
+
+        # XXX: this should be a singleton per host; a lone
+        # broker-daemon per provider should be created for all
+        # practical purposes
+ async with maybe_open_context(
+ acm_func=partial(
+ portal.open_context,
+ iter_ohlc_periods,
+ ),
+ kwargs={'delay_s': delay_s},
+ ) as (cache_hit, (ctx, first)):
+ async with ctx.open_stream() as istream:
+ if cache_hit:
+ # add a new broadcast subscription for the quote stream
+ # if this feed is likely already in use
+ async with istream.subscribe() as bistream:
+ yield bistream
+ else:
+ yield istream
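+
+    # NOTE: a consumption sketch for the above; assumes a ``flume``
+    # already bound to a live ``Feed``:
+    #
+    #   async with flume.index_stream(delay_s=1) as istream:
+    #       async for index in istream:
+    #           ...  # fires on each sample period increment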
+
+ def get_ds_info(
+ self,
+ ) -> tuple[float, float, float]:
+ '''
+ Compute the "downsampling" ratio info between the historical shm
+ buffer and the real-time (HFT) one.
+
+ Return a tuple of the fast sample period, historical sample
+ period and ratio between them.
+
+ '''
+ times = self.hist_shm.array['time']
+ end = pendulum.from_timestamp(times[-1])
+ start = pendulum.from_timestamp(times[times != times[-1]][-1])
+ hist_step_size_s = (end - start).seconds
+
+ times = self.rt_shm.array['time']
+ end = pendulum.from_timestamp(times[-1])
+ start = pendulum.from_timestamp(times[times != times[-1]][-1])
+ rt_step_size_s = (end - start).seconds
+
+ ratio = hist_step_size_s / rt_step_size_s
+ return (
+ rt_step_size_s,
+ hist_step_size_s,
+ ratio,
+ )
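+
+    # For example, with a 1s real-time buffer and a 60s history buffer
+    # this yields ``(1, 60, 60.0)``: the history feed is a 60x
+    # downsample of the real-time one.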
+
+    # TODO: get native msgspec decoding for these working
+ def to_msg(self) -> dict:
+ msg = self.to_dict()
+ msg['symbol'] = msg['symbol'].to_dict()
+
+ # can't serialize the stream or feed objects, it's expected
+ # you'll have a ref to it since this msg should be rxed on
+ # a stream on whatever far end IPC..
+ msg.pop('stream')
+ msg.pop('feed')
+ return msg
+
+ @classmethod
+    def from_msg(cls, msg: dict) -> 'Flume':
+ symbol = Symbol(**msg.pop('symbol'))
+ return cls(
+ symbol=symbol,
+ **msg,
+ )
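+
+    # NOTE: an IPC round-trip sketch; the ``ipc_stream`` handle is
+    # hypothetical:
+    #
+    #   await ipc_stream.send(flume.to_msg())
+    #   ...
+    #   flume2 = Flume.from_msg(await ipc_stream.receive())
+    #   assert flume2.symbol == flume.symbol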
+
+ def get_index(
+ self,
+ time_s: float,
+
+ ) -> int:
+        '''
+        Return the array shm-buffer index for the given epoch time.
+
+        '''
+ array = self.rt_shm.array
+ times = array['time']
+ mask = (times >= time_s)
+
+ if any(mask):
+ return array['index'][mask][0]
+
+        # fall back to just the latest index
+        return array['index'][-1]
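+
+    # e.g. the index of the first datum at-or-after one minute ago
+    # (sketch; the ``time`` import is only for illustration):
+    #
+    #   import time
+    #   i = flume.get_index(time.time() - 60)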
+
+    def slice_from_time(
+        self,
+        start_t: float,
+        stop_t: float,
+        array: np.ndarray | None = None,
+        timeframe_s: int = 1,
+        return_data: bool = False,
+
+    ) -> tuple[slice, slice] | tuple[slice, slice, np.ndarray]:
+        '''
+        Slice an input struct array providing only datums
+        "in view" of this chart.
+
+        '''
+        if array is None:
+            # default to the shm buffer matching the sample period
+            array = {
+                1: self.rt_shm.array,
+                60: self.hist_shm.array,
+            }[timeframe_s]
+
+        times = array['time']
+        index = array['index']
+
+ # use advanced indexing to map the
+ # time range to the index range.
+ mask = (
+ (times >= start_t)
+ &
+ (times < stop_t)
+ )
+
+ # TODO: if we can ensure each time field has a uniform
+ # step we can instead do some arithmetic to determine
+ # the equivalent index like we used to?
+ # return array[
+ # lbar - ifirst:
+ # (rbar - ifirst) + 1
+ # ]
+
+ i_by_t = index[mask]
+ i_0 = i_by_t[0]
+
+ abs_slc = slice(
+ i_0,
+ i_by_t[-1],
+ )
+ # slice data by offset from the first index
+ # available in the passed datum set.
+ read_slc = slice(
+ 0,
+ i_by_t[-1] - i_0,
+ )
+ if not return_data:
+ return (
+ abs_slc,
+ read_slc,
+ )
+
+ # also return the readable data from the timerange
+ return (
+ abs_slc,
+ read_slc,
+            array[mask],
+ )
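+
+    # NOTE: a usage sketch for the above; the epoch bounds are made
+    # up:
+    #
+    #   abs_slc, read_slc, in_view = flume.slice_from_time(
+    #       start_t=1667000000,
+    #       stop_t=1667000060,
+    #       return_data=True,
+    #   )
+    #   # ``abs_slc`` indexes into the global shm buffer while
+    #   # ``read_slc`` indexes into the returned ``in_view`` datums.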
+
+ def view_data(
+ self,
+ plot: PlotItem,
+ timeframe_s: int = 1,
+
+ ) -> np.ndarray:
+
+        # get the far-side x-range of the plot view
+ vr = plot.viewRect()
+ l = vr.left()
+ r = vr.right()
+
+ (
+ abs_slc,
+ buf_slc,
+ iv_arr,
+ ) = self.slice_from_time(
+ start_t=l,
+ stop_t=r,
+ timeframe_s=timeframe_s,
+ return_data=True,
+ )
+ return iv_arr
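+
+
+# NOTE: a hypothetical chart-side read tying the above together; the
+# ``plot`` handle is a ``pyqtgraph.PlotItem`` whose x-range is assumed
+# to be in epoch time units:
+#
+#   in_view: np.ndarray = flume.view_data(plot)
+#   closes = in_view['close']  # field name assumed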
diff --git a/piker/data/feed.py b/piker/data/feed.py
index e51b5f6d..59a7b303 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -29,7 +29,6 @@ import time
from types import ModuleType
from typing import (
Any,
- AsyncIterator,
AsyncContextManager,
Callable,
Optional,
@@ -57,11 +56,10 @@ from .._daemon import (
maybe_spawn_brokerd,
check_for_service,
)
+from ._flumes import Flume
from ._sharedmem import (
maybe_open_shm_array,
- attach_shm_array,
ShmArray,
- _Token,
_secs_in_day,
)
from .ingest import get_ingestormod
@@ -76,7 +74,6 @@ from ._sampling import (
sampler,
broadcast,
increment_ohlc_buffer,
- iter_ohlc_periods,
sample_and_broadcast,
uniform_rate_send,
_default_delay_s,
@@ -87,7 +84,6 @@ from ..brokers._util import (
 
 if TYPE_CHECKING:
     from .marketstore import Storage
-    from pyqtgraph import PlotItem
 
 log = get_logger(__name__)
 
@@ -894,258 +890,6 @@ async def manage_history(
     await trio.sleep_forever()
 
 
-class Flume(Struct):
- '''
- Composite reference type which points to all the addressing handles
- and other meta-data necessary for the read, measure and management
- of a set of real-time updated data flows.
-
- Can be thought of as a "flow descriptor" or "flow frame" which
- describes the high level properties of a set of data flows that can
- be used seamlessly across process-memory boundaries.
-
- Each instance's sub-components normally includes:
- - a msg oriented quote stream provided via an IPC transport
- - history and real-time shm buffers which are both real-time
- updated and backfilled.
- - associated startup indexing information related to both buffer
- real-time-append and historical prepend addresses.
- - low level APIs to read and measure the updated data and manage
- queuing properties.
-
- '''
- symbol: Symbol
- first_quote: dict
- _hist_shm_token: _Token
- _rt_shm_token: _Token
-
- # private shm refs loaded dynamically from tokens
- _hist_shm: ShmArray | None = None
- _rt_shm: ShmArray | None = None
-
- stream: tractor.MsgStream | None = None
- izero_hist: int = 0
- izero_rt: int = 0
- throttle_rate: int | None = None
-
- # TODO: do we need this really if we can pull the `Portal` from
- # ``tractor``'s internals?
- feed: Feed | None = None
-
- @property
- def rt_shm(self) -> ShmArray:
-
- if self._rt_shm is None:
- self._rt_shm = attach_shm_array(
- token=self._rt_shm_token,
- readonly=True,
- )
-
- return self._rt_shm
-
- @property
- def hist_shm(self) -> ShmArray:
-
- if self._hist_shm is None:
- self._hist_shm = attach_shm_array(
- token=self._hist_shm_token,
- readonly=True,
- )
-
- return self._hist_shm
-
- async def receive(self) -> dict:
- return await self.stream.receive()
-
- @acm
- async def index_stream(
- self,
- delay_s: int = 1,
-
- ) -> AsyncIterator[int]:
-
- if not self.feed:
- raise RuntimeError('This flume is not part of any ``Feed``?')
-
- # TODO: maybe a public (property) API for this in ``tractor``?
- portal = self.stream._ctx._portal
- assert portal
-
- # XXX: this should be singleton on a host,
- # a lone broker-daemon per provider should be
- # created for all practical purposes
- async with maybe_open_context(
- acm_func=partial(
- portal.open_context,
- iter_ohlc_periods,
- ),
- kwargs={'delay_s': delay_s},
- ) as (cache_hit, (ctx, first)):
- async with ctx.open_stream() as istream:
- if cache_hit:
- # add a new broadcast subscription for the quote stream
- # if this feed is likely already in use
- async with istream.subscribe() as bistream:
- yield bistream
- else:
- yield istream
-
- def get_ds_info(
- self,
- ) -> tuple[float, float, float]:
- '''
- Compute the "downsampling" ratio info between the historical shm
- buffer and the real-time (HFT) one.
-
- Return a tuple of the fast sample period, historical sample
- period and ratio between them.
-
- '''
- times = self.hist_shm.array['time']
- end = pendulum.from_timestamp(times[-1])
- start = pendulum.from_timestamp(times[times != times[-1]][-1])
- hist_step_size_s = (end - start).seconds
-
- times = self.rt_shm.array['time']
- end = pendulum.from_timestamp(times[-1])
- start = pendulum.from_timestamp(times[times != times[-1]][-1])
- rt_step_size_s = (end - start).seconds
-
- ratio = hist_step_size_s / rt_step_size_s
- return (
- rt_step_size_s,
- hist_step_size_s,
- ratio,
- )
-
- # TODO: get native msgspec decoding for these workinn
- def to_msg(self) -> dict:
- msg = self.to_dict()
- msg['symbol'] = msg['symbol'].to_dict()
-
- # can't serialize the stream or feed objects, it's expected
- # you'll have a ref to it since this msg should be rxed on
- # a stream on whatever far end IPC..
- msg.pop('stream')
- msg.pop('feed')
- return msg
-
- @classmethod
- def from_msg(cls, msg: dict) -> dict:
- symbol = Symbol(**msg.pop('symbol'))
- return cls(
- symbol=symbol,
- **msg,
- )
-
- def get_index(
- self,
- time_s: float,
-
- ) -> int:
- '''
- Return array shm-buffer index for for epoch time.
-
- '''
- array = self.rt_shm.array
- times = array['time']
- mask = (times >= time_s)
-
- if any(mask):
- return array['index'][mask][0]
-
- # just the latest index
- array['index'][-1]
-
- def slice_from_time(
- self,
- array: np.ndarray,
- start_t: float,
- stop_t: float,
- timeframe_s: int = 1,
- return_data: bool = False,
-
- ) -> np.ndarray:
- '''
- Slice an input struct array providing only datums
- "in view" of this chart.
-
- '''
- arr = {
- 1: self.rt_shm.array,
- 60: self.hist_shm.arry,
- }[timeframe_s]
-
- times = arr['time']
- index = array['index']
-
- # use advanced indexing to map the
- # time range to the index range.
- mask = (
- (times >= start_t)
- &
- (times < stop_t)
- )
-
- # TODO: if we can ensure each time field has a uniform
- # step we can instead do some arithmetic to determine
- # the equivalent index like we used to?
- # return array[
- # lbar - ifirst:
- # (rbar - ifirst) + 1
- # ]
-
- i_by_t = index[mask]
- i_0 = i_by_t[0]
-
- abs_slc = slice(
- i_0,
- i_by_t[-1],
- )
- # slice data by offset from the first index
- # available in the passed datum set.
- read_slc = slice(
- 0,
- i_by_t[-1] - i_0,
- )
- if not return_data:
- return (
- abs_slc,
- read_slc,
- )
-
- # also return the readable data from the timerange
- return (
- abs_slc,
- read_slc,
- arr[mask],
- )
-
- def view_data(
- self,
- plot: PlotItem,
- timeframe_s: int = 1,
-
- ) -> np.ndarray:
-
- # get far-side x-indices plot view
- vr = plot.viewRect()
- l = vr.left()
- r = vr.right()
-
- (
- abs_slc,
- buf_slc,
- iv_arr,
- ) = self.slice_from_time(
- start_t=l,
- stop_t=r,
- timeframe_s=timeframe_s,
- return_data=True,
- )
- return iv_arr
-
-
async def allocate_persistent_feed(
bus: _FeedsBus,
sub_registered: trio.Event,