Add `Flow` type with a real chitty mxmn cacheing method

This new type wraps a shm data flow and will eventually include things
like incremental path-graphics updates and serialization + bg downsampling
techniques. The main immediate motivation was to get a cached y-range max/min
calc going since profiling revealed the `numpy` equivalents were
actually quite slow as the data set grows large. Likely we can use all
this to drive a streaming mx/mn routine that's always launched as part
of each on-host flow.

This is our official foray into use of `msgspec.Struct` B) and I have to
say, pretty impressed; we'll likely completely ditch `pydantic` from
here on out.
big_data_lines
Tyler Goodlet 2022-04-06 17:05:57 -04:00
parent a1de097a43
commit eec329a221
1 changed files with 116 additions and 44 deletions

View File

@ -34,10 +34,11 @@ from PyQt5.QtWidgets import (
QVBoxLayout,
QSplitter,
)
import msgspec
import numpy as np
# from pydantic import BaseModel
import pyqtgraph as pg
import trio
from pydantic import BaseModel
from ._axes import (
DynamicDateAxis,
@ -59,10 +60,14 @@ from ._style import (
)
from ..data.feed import Feed
from ..data._source import Symbol
from ..data._sharedmem import ShmArray
from ..data._sharedmem import (
ShmArray,
# _Token,
)
from ..log import get_logger
from ._interaction import ChartView
from ._forms import FieldsForm
from .._profile import pg_profile_enabled, ms_slower_then
from ._overlay import PlotItemOverlay
if TYPE_CHECKING:
@ -663,19 +668,90 @@ class LinkedSplits(QWidget):
# flows: dict[str, np.ndarray] = {}
class Flow(BaseModel):
class Flow(msgspec.Struct): # , frozen=True):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
'''
class Config:
arbitrary_types_allowed = True
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
shm: Optional[ShmArray] = None # may be filled in "later"
is_ohlc: bool = False
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
_shm: Optional[ShmArray] = None # currently, may be filled in "later"
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
result = self._mxmns.get(rkey)
if result:
return result
shm = self.shm
if shm is None:
# print(f'no shm {self.name}?')
return 0, 0
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:(rbar - ifirst) + 1]
if not slice_view.size:
# print(f'no data in view {self.name}?')
return 0, 0
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
# else:
# ylow, yhigh = 0, 0
result = ylow, yhigh
if result != (0, 0):
self._mxmns[rkey] = result
if self.name == 'drk_vlm':
print(f'{self.name} mxmn @ {rkey} -> {result}')
return result
class ChartPlotWidget(pg.PlotWidget):
@ -1005,6 +1081,13 @@ class ChartPlotWidget(pg.PlotWidget):
data_key = array_key or name
self._graphics[data_key] = graphics
self._flows[data_key] = Flow(
name=name,
plot=self.plotItem,
is_ohlc=True,
)
self._add_sticky(name, bg_color='davies')
return graphics, data_key
@ -1122,6 +1205,12 @@ class ChartPlotWidget(pg.PlotWidget):
pi = self.plotItem
self._flows[data_key] = Flow(
name=name,
plot=pi,
is_ohlc=False,
)
# TODO: this probably needs its own method?
if overlay:
if isinstance(overlay, pg.PlotItem):
@ -1130,10 +1219,6 @@ class ChartPlotWidget(pg.PlotWidget):
f'{overlay} must be from `.plotitem_overlay()`'
)
pi = overlay
# anchor_at = ('bottom', 'left')
self._flows[name] = Flow(name=name, plot=pi)
else:
# anchor_at = ('top', 'left')
@ -1342,46 +1427,33 @@ class ChartPlotWidget(pg.PlotWidget):
If ``bars_range`` is provided use that range.
'''
l, lbar, rbar, r = bars_range or self.bars_range()
# TODO: logic to check if end of bars in view
# extra = view_len - _min_points_to_show
# begin = self._arrays['ohlc'][0]['index'] - extra
# # end = len(self._arrays['ohlc']) - 1 + extra
# end = self._arrays['ohlc'][-1]['index'] - 1 + extra
profiler = pg.debug.Profiler(
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
delayed=True,
)
# bars_len = rbar - lbar
# log.debug(
# f"\nl: {l}, lbar: {lbar}, rbar: {rbar}, r: {r}\n"
# f"view_len: {view_len}, bars_len: {bars_len}\n"
# f"begin: {begin}, end: {end}, extra: {extra}"
# )
l, lbar, rbar, r = bars_range or self.bars_range()
profiler(f'{self.name} got bars range')
# TODO: here we should instead look up the ``Flow.shm.array``
# and read directly from shm to avoid copying to memory first
# and then reading it again here.
a = self._arrays.get(name or self.name)
if a is None:
return None
ifirst = a[0]['index']
bars = a[lbar - ifirst:(rbar - ifirst) + 1]
if not len(bars):
# likely no data loaded yet or extreme scrolling?
log.error(f"WTF bars_range = {lbar}:{rbar}")
return
flow_key = name or self.name
flow = self._flows.get(flow_key)
if (
self.data_key == self.linked.symbol.key
flow is None
):
# ohlc sampled bars hi/lo lookup
ylow = np.nanmin(bars['low'])
yhigh = np.nanmax(bars['high'])
print(f"flow {flow_key} doesn't exist in chart {self.name}")
return 0, 0
else:
view = bars[name or self.data_key]
ylow = np.nanmin(view)
yhigh = np.nanmax(view)
key = round(lbar), round(rbar)
res = flow.maxmin(*key)
profiler(f'{key} max-min {res}')
if res == (0, 0):
log.error(
f"{flow_key} -> (0, 0) for bars_range = {key}"
)
# print(f'{(ylow, yhigh)}')
return ylow, yhigh
return res