211 lines
6.3 KiB
Python
211 lines
6.3 KiB
Python
# piker: trading gear for hackers
|
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
"""
|
|
abstractions for organizing, managing and generally operating-on
|
|
real-time data processing data-structures.
|
|
|
|
"Streams, flumes, cascades and flows.."
|
|
|
|
"""
|
|
from __future__ import annotations
|
|
from typing import (
|
|
TYPE_CHECKING,
|
|
)
|
|
|
|
import tractor
|
|
import pendulum
|
|
import numpy as np
|
|
|
|
from .types import Struct
|
|
from ._source import (
|
|
Symbol,
|
|
)
|
|
from ._sharedmem import (
|
|
attach_shm_array,
|
|
ShmArray,
|
|
_Token,
|
|
)
|
|
# from .._profile import (
|
|
# Profiler,
|
|
# pg_profile_enabled,
|
|
# )
|
|
|
|
if TYPE_CHECKING:
|
|
# from pyqtgraph import PlotItem
|
|
from .feed import Feed
|
|
|
|
|
|
# TODO: ideas for further abstractions as per
|
|
# https://github.com/pikers/piker/issues/216 and
|
|
# https://github.com/pikers/piker/issues/270:
|
|
# - a ``Cascade`` would be the minimal "connection" of 2 ``Flumes``
|
|
# as per circuit parlance:
|
|
# https://en.wikipedia.org/wiki/Two-port_network#Cascade_connection
|
|
# - could cover the combination of our `FspAdmin` and the
|
|
# backend `.fsp._engine` related machinery to "connect" one flume
|
|
# to another?
|
|
# - a (financial signal) ``Flow`` would be the a "collection" of such
|
|
# minmial cascades. Some engineering based jargon concepts:
|
|
# - https://en.wikipedia.org/wiki/Signal_chain
|
|
# - https://en.wikipedia.org/wiki/Daisy_chain_(electrical_engineering)
|
|
# - https://en.wikipedia.org/wiki/Audio_signal_flow
|
|
# - https://en.wikipedia.org/wiki/Digital_signal_processing#Implementation
|
|
# - https://en.wikipedia.org/wiki/Dataflow_programming
|
|
# - https://en.wikipedia.org/wiki/Signal_programming
|
|
# - https://en.wikipedia.org/wiki/Incremental_computing
|
|
|
|
|
|
class Flume(Struct):
|
|
'''
|
|
Composite reference type which points to all the addressing handles
|
|
and other meta-data necessary for the read, measure and management
|
|
of a set of real-time updated data flows.
|
|
|
|
Can be thought of as a "flow descriptor" or "flow frame" which
|
|
describes the high level properties of a set of data flows that can
|
|
be used seamlessly across process-memory boundaries.
|
|
|
|
Each instance's sub-components normally includes:
|
|
- a msg oriented quote stream provided via an IPC transport
|
|
- history and real-time shm buffers which are both real-time
|
|
updated and backfilled.
|
|
- associated startup indexing information related to both buffer
|
|
real-time-append and historical prepend addresses.
|
|
- low level APIs to read and measure the updated data and manage
|
|
queuing properties.
|
|
|
|
'''
|
|
symbol: Symbol
|
|
first_quote: dict
|
|
_rt_shm_token: _Token
|
|
|
|
# optional since some data flows won't have a "downsampled" history
|
|
# buffer/stream (eg. FSPs).
|
|
_hist_shm_token: _Token | None = None
|
|
|
|
# private shm refs loaded dynamically from tokens
|
|
_hist_shm: ShmArray | None = None
|
|
_rt_shm: ShmArray | None = None
|
|
|
|
stream: tractor.MsgStream | None = None
|
|
izero_hist: int = 0
|
|
izero_rt: int = 0
|
|
throttle_rate: int | None = None
|
|
|
|
# TODO: do we need this really if we can pull the `Portal` from
|
|
# ``tractor``'s internals?
|
|
feed: Feed | None = None
|
|
|
|
@property
|
|
def rt_shm(self) -> ShmArray:
|
|
|
|
if self._rt_shm is None:
|
|
self._rt_shm = attach_shm_array(
|
|
token=self._rt_shm_token,
|
|
readonly=True,
|
|
)
|
|
|
|
return self._rt_shm
|
|
|
|
@property
|
|
def hist_shm(self) -> ShmArray:
|
|
|
|
if self._hist_shm_token is None:
|
|
raise RuntimeError(
|
|
'No shm token has been set for the history buffer?'
|
|
)
|
|
|
|
if (
|
|
self._hist_shm is None
|
|
):
|
|
self._hist_shm = attach_shm_array(
|
|
token=self._hist_shm_token,
|
|
readonly=True,
|
|
)
|
|
|
|
return self._hist_shm
|
|
|
|
async def receive(self) -> dict:
|
|
return await self.stream.receive()
|
|
|
|
def get_ds_info(
|
|
self,
|
|
) -> tuple[float, float, float]:
|
|
'''
|
|
Compute the "downsampling" ratio info between the historical shm
|
|
buffer and the real-time (HFT) one.
|
|
|
|
Return a tuple of the fast sample period, historical sample
|
|
period and ratio between them.
|
|
|
|
'''
|
|
times = self.hist_shm.array['time']
|
|
end = pendulum.from_timestamp(times[-1])
|
|
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
|
hist_step_size_s = (end - start).seconds
|
|
|
|
times = self.rt_shm.array['time']
|
|
end = pendulum.from_timestamp(times[-1])
|
|
start = pendulum.from_timestamp(times[times != times[-1]][-1])
|
|
rt_step_size_s = (end - start).seconds
|
|
|
|
ratio = hist_step_size_s / rt_step_size_s
|
|
return (
|
|
rt_step_size_s,
|
|
hist_step_size_s,
|
|
ratio,
|
|
)
|
|
|
|
# TODO: get native msgspec decoding for these workinn
|
|
def to_msg(self) -> dict:
|
|
msg = self.to_dict()
|
|
msg['symbol'] = msg['symbol'].to_dict()
|
|
|
|
# can't serialize the stream or feed objects, it's expected
|
|
# you'll have a ref to it since this msg should be rxed on
|
|
# a stream on whatever far end IPC..
|
|
msg.pop('stream')
|
|
msg.pop('feed')
|
|
return msg
|
|
|
|
@classmethod
|
|
def from_msg(cls, msg: dict) -> dict:
|
|
symbol = Symbol(**msg.pop('symbol'))
|
|
return cls(
|
|
symbol=symbol,
|
|
**msg,
|
|
)
|
|
|
|
def get_index(
|
|
self,
|
|
time_s: float,
|
|
array: np.ndarray,
|
|
|
|
) -> int | float:
|
|
'''
|
|
Return array shm-buffer index for for epoch time.
|
|
|
|
'''
|
|
times = array['time']
|
|
first = np.searchsorted(
|
|
times,
|
|
time_s,
|
|
side='left',
|
|
)
|
|
imx = times.shape[0] - 1
|
|
return min(first, imx)
|