# piker: trading gear for hackers # Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . """ Public abstractions for organizing, managing and generally operating-on real-time data processing data-structures. "Streams, flumes, cascades and flows.." """ from __future__ import annotations from typing import ( TYPE_CHECKING, ) import tractor import pendulum import numpy as np from piker.types import Struct from ._sharedmem import ( attach_shm_array, ShmArray, _Token, ) from piker.accounting import MktPair if TYPE_CHECKING: from piker.data.feed import Feed class Flume(Struct): ''' Composite reference type which points to all the addressing handles and other meta-data necessary for the read, measure and management of a set of real-time updated data flows. Can be thought of as a "flow descriptor" or "flow frame" which describes the high level properties of a set of data flows that can be used seamlessly across process-memory boundaries. Each instance's sub-components normally includes: - a msg oriented quote stream provided via an IPC transport - history and real-time shm buffers which are both real-time updated and backfilled. - associated startup indexing information related to both buffer real-time-append and historical prepend addresses. - low level APIs to read and measure the updated data and manage queuing properties. ''' mkt: MktPair first_quote: dict _rt_shm_token: _Token # optional since some data flows won't have a "downsampled" history # buffer/stream (eg. FSPs). _hist_shm_token: _Token | None = None # private shm refs loaded dynamically from tokens _hist_shm: ShmArray | None = None _rt_shm: ShmArray | None = None _readonly: bool = True stream: tractor.MsgStream | None = None izero_hist: int = 0 izero_rt: int = 0 throttle_rate: int | None = None # TODO: do we need this really if we can pull the `Portal` from # ``tractor``'s internals? feed: Feed|None = None @property def rt_shm(self) -> ShmArray: if self._rt_shm is None: self._rt_shm = attach_shm_array( token=self._rt_shm_token, readonly=self._readonly, ) return self._rt_shm @property def hist_shm(self) -> ShmArray: if self._hist_shm_token is None: raise RuntimeError( 'No shm token has been set for the history buffer?' ) if self._hist_shm is None: self._hist_shm = attach_shm_array( token=self._hist_shm_token, readonly=self._readonly, ) return self._hist_shm async def receive(self) -> dict: return await self.stream.receive() def get_ds_info( self, ) -> tuple[float, float, float]: ''' Compute the "downsampling" ratio info between the historical shm buffer and the real-time (HFT) one. Return a tuple of the fast sample period, historical sample period and ratio between them. ''' times: np.ndarray = self.hist_shm.array['time'] end: float | int = pendulum.from_timestamp(times[-1]) start: float | int = pendulum.from_timestamp(times[times != times[-1]][-1]) hist_step_size_s: float = (end - start).seconds times = self.rt_shm.array['time'] end = pendulum.from_timestamp(times[-1]) start = pendulum.from_timestamp(times[times != times[-1]][-1]) rt_step_size_s = (end - start).seconds ratio = hist_step_size_s / rt_step_size_s return ( rt_step_size_s, hist_step_size_s, ratio, ) # TODO: get native msgspec decoding for these workinn def to_msg(self) -> dict: msg = self.to_dict() msg['mkt'] = self.mkt.to_dict() # NOTE: pop all un-msg-serializable fields: # - `tractor.MsgStream` # - `Feed` # - `Shmarray` # it's expected the `.from_msg()` on the other side # will get instead some kind of msg-compat version # that it can load. msg.pop('stream') msg.pop('feed') msg.pop('_rt_shm') msg.pop('_hist_shm') return msg @classmethod def from_msg( cls, msg: dict, readonly: bool = True, ) -> dict: ''' Load from an IPC msg presumably in either `dict` or `msgspec.Struct` form. ''' mkt_msg = msg.pop('mkt') from ..accounting import MktPair # cycle otherwise.. mkt = MktPair.from_msg(mkt_msg) msg |= {'_readonly': readonly} return cls( mkt=mkt, **msg, ) def get_index( self, time_s: float, array: np.ndarray, ) -> int | float: ''' Return array shm-buffer index for for epoch time. ''' times = array['time'] first = np.searchsorted( times, time_s, side='left', ) imx = times.shape[0] - 1 return min(first, imx) # only set by external msg or creator, never # manually! _has_vlm: bool = True def has_vlm(self) -> bool: if not self._has_vlm: return False # make sure that the instrument supports volume history # (sometimes this is not the case for some commodities and # derivatives) vlm: np.ndarray = self.rt_shm.array['volume'] return not bool( np.all(np.isin(vlm, -1)) or np.all(np.isnan(vlm)) )