Move `Flume` to a new `.data._flumes` module

samplerd_service
Tyler Goodlet 2022-11-24 13:02:12 -05:00
parent e5e70a6011
commit eacd44dd65
2 changed files with 305 additions and 257 deletions

View File

@ -0,0 +1,304 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Real-time data flow abstractions.
<writeup on "buffers, streams, flumes and flows..">
"""
from contextlib import asynccontextmanager as acm
from functools import partial
from typing import (
AsyncIterator,
TYPE_CHECKING,
)
import tractor
from tractor.trionics import (
maybe_open_context,
)
import pendulum
import numpy as np
from .types import Struct
from ._source import (
Symbol,
)
from ._sharedmem import (
attach_shm_array,
ShmArray,
_Token,
)
from ._sampling import (
iter_ohlc_periods,
)
if TYPE_CHECKING:
from pyqtgraph import PlotItem
from .feed import Feed
class Flume(Struct):
'''
Composite reference type which points to all the addressing handles
and other meta-data necessary for the read, measure and management
of a set of real-time updated data flows.
Can be thought of as a "flow descriptor" or "flow frame" which
describes the high level properties of a set of data flows that can
be used seamlessly across process-memory boundaries.
Each instance's sub-components normally includes:
- a msg oriented quote stream provided via an IPC transport
- history and real-time shm buffers which are both real-time
updated and backfilled.
- associated startup indexing information related to both buffer
real-time-append and historical prepend addresses.
- low level APIs to read and measure the updated data and manage
queuing properties.
'''
symbol: Symbol
first_quote: dict
_hist_shm_token: _Token
_rt_shm_token: _Token
# private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None
_rt_shm: ShmArray | None = None
stream: tractor.MsgStream | None = None
izero_hist: int = 0
izero_rt: int = 0
throttle_rate: int | None = None
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed | None = None
@property
def rt_shm(self) -> ShmArray:
if self._rt_shm is None:
self._rt_shm = attach_shm_array(
token=self._rt_shm_token,
readonly=True,
)
return self._rt_shm
@property
def hist_shm(self) -> ShmArray:
if self._hist_shm is None:
self._hist_shm = attach_shm_array(
token=self._hist_shm_token,
readonly=True,
)
return self._hist_shm
async def receive(self) -> dict:
return await self.stream.receive()
@acm
async def index_stream(
self,
delay_s: int = 1,
) -> AsyncIterator[int]:
if not self.feed:
raise RuntimeError('This flume is not part of any ``Feed``?')
# TODO: maybe a public (property) API for this in ``tractor``?
portal = self.stream._ctx._portal
assert portal
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_context(
acm_func=partial(
portal.open_context,
iter_ohlc_periods,
),
kwargs={'delay_s': delay_s},
) as (cache_hit, (ctx, first)):
async with ctx.open_stream() as istream:
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
def get_ds_info(
self,
) -> tuple[float, float, float]:
'''
Compute the "downsampling" ratio info between the historical shm
buffer and the real-time (HFT) one.
Return a tuple of the fast sample period, historical sample
period and ratio between them.
'''
times = self.hist_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s = (end - start).seconds
times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
rt_step_size_s = (end - start).seconds
ratio = hist_step_size_s / rt_step_size_s
return (
rt_step_size_s,
hist_step_size_s,
ratio,
)
# TODO: get native msgspec decoding for these workinn
def to_msg(self) -> dict:
msg = self.to_dict()
msg['symbol'] = msg['symbol'].to_dict()
# can't serialize the stream or feed objects, it's expected
# you'll have a ref to it since this msg should be rxed on
# a stream on whatever far end IPC..
msg.pop('stream')
msg.pop('feed')
return msg
@classmethod
def from_msg(cls, msg: dict) -> dict:
symbol = Symbol(**msg.pop('symbol'))
return cls(
symbol=symbol,
**msg,
)
def get_index(
self,
time_s: float,
) -> int:
'''
Return array shm-buffer index for for epoch time.
'''
array = self.rt_shm.array
times = array['time']
mask = (times >= time_s)
if any(mask):
return array['index'][mask][0]
# just the latest index
array['index'][-1]
def slice_from_time(
self,
array: np.ndarray,
start_t: float,
stop_t: float,
timeframe_s: int = 1,
return_data: bool = False,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
arr = {
1: self.rt_shm.array,
60: self.hist_shm.arry,
}[timeframe_s]
times = arr['time']
index = array['index']
# use advanced indexing to map the
# time range to the index range.
mask = (
(times >= start_t)
&
(times < stop_t)
)
# TODO: if we can ensure each time field has a uniform
# step we can instead do some arithmetic to determine
# the equivalent index like we used to?
# return array[
# lbar - ifirst:
# (rbar - ifirst) + 1
# ]
i_by_t = index[mask]
i_0 = i_by_t[0]
abs_slc = slice(
i_0,
i_by_t[-1],
)
# slice data by offset from the first index
# available in the passed datum set.
read_slc = slice(
0,
i_by_t[-1] - i_0,
)
if not return_data:
return (
abs_slc,
read_slc,
)
# also return the readable data from the timerange
return (
abs_slc,
read_slc,
arr[mask],
)
def view_data(
self,
plot: PlotItem,
timeframe_s: int = 1,
) -> np.ndarray:
# get far-side x-indices plot view
vr = plot.viewRect()
l = vr.left()
r = vr.right()
(
abs_slc,
buf_slc,
iv_arr,
) = self.slice_from_time(
start_t=l,
stop_t=r,
timeframe_s=timeframe_s,
return_data=True,
)
return iv_arr

View File

@ -29,7 +29,6 @@ import time
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Any, Any,
AsyncIterator,
AsyncContextManager, AsyncContextManager,
Callable, Callable,
Optional, Optional,
@ -57,11 +56,10 @@ from .._daemon import (
maybe_spawn_brokerd, maybe_spawn_brokerd,
check_for_service, check_for_service,
) )
from ._flumes import Flume
from ._sharedmem import ( from ._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
attach_shm_array,
ShmArray, ShmArray,
_Token,
_secs_in_day, _secs_in_day,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
@ -76,7 +74,6 @@ from ._sampling import (
sampler, sampler,
broadcast, broadcast,
increment_ohlc_buffer, increment_ohlc_buffer,
iter_ohlc_periods,
sample_and_broadcast, sample_and_broadcast,
uniform_rate_send, uniform_rate_send,
_default_delay_s, _default_delay_s,
@ -87,7 +84,6 @@ from ..brokers._util import (
if TYPE_CHECKING: if TYPE_CHECKING:
from .marketstore import Storage from .marketstore import Storage
from pyqtgraph import PlotItem
log = get_logger(__name__) log = get_logger(__name__)
@ -894,258 +890,6 @@ async def manage_history(
await trio.sleep_forever() await trio.sleep_forever()
class Flume(Struct):
'''
Composite reference type which points to all the addressing handles
and other meta-data necessary for the read, measure and management
of a set of real-time updated data flows.
Can be thought of as a "flow descriptor" or "flow frame" which
describes the high level properties of a set of data flows that can
be used seamlessly across process-memory boundaries.
Each instance's sub-components normally includes:
- a msg oriented quote stream provided via an IPC transport
- history and real-time shm buffers which are both real-time
updated and backfilled.
- associated startup indexing information related to both buffer
real-time-append and historical prepend addresses.
- low level APIs to read and measure the updated data and manage
queuing properties.
'''
symbol: Symbol
first_quote: dict
_hist_shm_token: _Token
_rt_shm_token: _Token
# private shm refs loaded dynamically from tokens
_hist_shm: ShmArray | None = None
_rt_shm: ShmArray | None = None
stream: tractor.MsgStream | None = None
izero_hist: int = 0
izero_rt: int = 0
throttle_rate: int | None = None
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed | None = None
@property
def rt_shm(self) -> ShmArray:
if self._rt_shm is None:
self._rt_shm = attach_shm_array(
token=self._rt_shm_token,
readonly=True,
)
return self._rt_shm
@property
def hist_shm(self) -> ShmArray:
if self._hist_shm is None:
self._hist_shm = attach_shm_array(
token=self._hist_shm_token,
readonly=True,
)
return self._hist_shm
async def receive(self) -> dict:
return await self.stream.receive()
@acm
async def index_stream(
self,
delay_s: int = 1,
) -> AsyncIterator[int]:
if not self.feed:
raise RuntimeError('This flume is not part of any ``Feed``?')
# TODO: maybe a public (property) API for this in ``tractor``?
portal = self.stream._ctx._portal
assert portal
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
async with maybe_open_context(
acm_func=partial(
portal.open_context,
iter_ohlc_periods,
),
kwargs={'delay_s': delay_s},
) as (cache_hit, (ctx, first)):
async with ctx.open_stream() as istream:
if cache_hit:
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with istream.subscribe() as bistream:
yield bistream
else:
yield istream
def get_ds_info(
self,
) -> tuple[float, float, float]:
'''
Compute the "downsampling" ratio info between the historical shm
buffer and the real-time (HFT) one.
Return a tuple of the fast sample period, historical sample
period and ratio between them.
'''
times = self.hist_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
hist_step_size_s = (end - start).seconds
times = self.rt_shm.array['time']
end = pendulum.from_timestamp(times[-1])
start = pendulum.from_timestamp(times[times != times[-1]][-1])
rt_step_size_s = (end - start).seconds
ratio = hist_step_size_s / rt_step_size_s
return (
rt_step_size_s,
hist_step_size_s,
ratio,
)
# TODO: get native msgspec decoding for these workinn
def to_msg(self) -> dict:
msg = self.to_dict()
msg['symbol'] = msg['symbol'].to_dict()
# can't serialize the stream or feed objects, it's expected
# you'll have a ref to it since this msg should be rxed on
# a stream on whatever far end IPC..
msg.pop('stream')
msg.pop('feed')
return msg
@classmethod
def from_msg(cls, msg: dict) -> dict:
symbol = Symbol(**msg.pop('symbol'))
return cls(
symbol=symbol,
**msg,
)
def get_index(
self,
time_s: float,
) -> int:
'''
Return array shm-buffer index for for epoch time.
'''
array = self.rt_shm.array
times = array['time']
mask = (times >= time_s)
if any(mask):
return array['index'][mask][0]
# just the latest index
array['index'][-1]
def slice_from_time(
self,
array: np.ndarray,
start_t: float,
stop_t: float,
timeframe_s: int = 1,
return_data: bool = False,
) -> np.ndarray:
'''
Slice an input struct array providing only datums
"in view" of this chart.
'''
arr = {
1: self.rt_shm.array,
60: self.hist_shm.arry,
}[timeframe_s]
times = arr['time']
index = array['index']
# use advanced indexing to map the
# time range to the index range.
mask = (
(times >= start_t)
&
(times < stop_t)
)
# TODO: if we can ensure each time field has a uniform
# step we can instead do some arithmetic to determine
# the equivalent index like we used to?
# return array[
# lbar - ifirst:
# (rbar - ifirst) + 1
# ]
i_by_t = index[mask]
i_0 = i_by_t[0]
abs_slc = slice(
i_0,
i_by_t[-1],
)
# slice data by offset from the first index
# available in the passed datum set.
read_slc = slice(
0,
i_by_t[-1] - i_0,
)
if not return_data:
return (
abs_slc,
read_slc,
)
# also return the readable data from the timerange
return (
abs_slc,
read_slc,
arr[mask],
)
def view_data(
self,
plot: PlotItem,
timeframe_s: int = 1,
) -> np.ndarray:
# get far-side x-indices plot view
vr = plot.viewRect()
l = vr.left()
r = vr.right()
(
abs_slc,
buf_slc,
iv_arr,
) = self.slice_from_time(
start_t=l,
stop_t=r,
timeframe_s=timeframe_s,
return_data=True,
)
return iv_arr
async def allocate_persistent_feed( async def allocate_persistent_feed(
bus: _FeedsBus, bus: _FeedsBus,
sub_registered: trio.Event, sub_registered: trio.Event,