Compare commits

..

No commits in common. "cd79eabe61113f1350c1418f257549b69fefc2f3" and "3a7e00e2874ba73b42c3a7926525e1aeced6d4d1" have entirely different histories.

8 changed files with 268 additions and 275 deletions

View File

@ -1592,7 +1592,7 @@ async def backfill_bars(
# on that until we have the `marketstore` daemon in place in which
# case the shm size will be driven by user config and available sys
# memory.
count: int = 22,
count: int = 59,
task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,

View File

@ -240,13 +240,7 @@ class ShmArray:
def last(
self,
length: int = 1,
) -> np.ndarray:
'''
Return the last ``length``'s worth of ("row") entries from the
array.
'''
return self.array[-length:]
def push(

View File

@ -658,6 +658,102 @@ class LinkedSplits(QWidget):
cpw.sidepane.setMaximumWidth(sp_w)
# class FlowsTable(pydantic.BaseModel):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
class Flow(msgspec.Struct): # , frozen=True):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
is_ohlc: bool = False
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
_shm: Optional[ShmArray] = None # currently, may be filled in "later"
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
result = self._mxmns.get(rkey)
if result:
return result
shm = self.shm
if shm is None:
# print(f'no shm {self.name}?')
return 0, 0
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:(rbar - ifirst) + 1]
if not slice_view.size:
# print(f'no data in view {self.name}?')
return 0, 0
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
# else:
# ylow, yhigh = 0, 0
result = ylow, yhigh
if result != (0, 0):
self._mxmns[rkey] = result
if self.name == 'drk_vlm':
print(f'{self.name} mxmn @ {rkey} -> {result}')
return result
class ChartPlotWidget(pg.PlotWidget):
'''
``GraphicsView`` subtype containing a single ``PlotItem``.
@ -990,7 +1086,6 @@ class ChartPlotWidget(pg.PlotWidget):
name=name,
plot=self.plotItem,
is_ohlc=True,
graphics=graphics,
)
self._add_sticky(name, bg_color='davies')
@ -1064,7 +1159,6 @@ class ChartPlotWidget(pg.PlotWidget):
overlay: bool = False,
color: Optional[str] = None,
add_label: bool = True,
pi: Optional[pg.PlotItem] = None,
**pdi_kwargs,
@ -1109,13 +1203,12 @@ class ChartPlotWidget(pg.PlotWidget):
self._graphics[name] = curve
self._arrays[data_key] = data
pi = pi or self.plotItem
pi = self.plotItem
self._flows[data_key] = Flow(
name=name,
plot=pi,
is_ohlc=False,
graphics=curve,
)
# TODO: this probably needs its own method?
@ -1335,7 +1428,6 @@ class ChartPlotWidget(pg.PlotWidget):
'''
profiler = pg.debug.Profiler(
msg=f'`{str(self)}.maxmin()` loop cycle for: `{self.name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
delayed=True,
@ -1352,113 +1444,16 @@ class ChartPlotWidget(pg.PlotWidget):
if (
flow is None
):
log.error(f"flow {flow_key} doesn't exist in chart {self.name} !?")
res = 0, 0
print(f"flow {flow_key} doesn't exist in chart {self.name}")
return 0, 0
else:
key = round(lbar), round(rbar)
res = flow.maxmin(*key)
profiler(f'yrange mxmn: {key} -> {res}')
if res == (None, None):
profiler(f'{key} max-min {res}')
if res == (0, 0):
log.error(
f"{flow_key} no mxmn for bars_range => {key} !?"
f"{flow_key} -> (0, 0) for bars_range = {key}"
)
res = 0, 0
return res
# class FlowsTable(pydantic.BaseModel):
# '''
# Data-AGGRegate: high level API onto multiple (categorized)
# ``Flow``s with high level processing routines for
# multi-graphics computations and display.
# '''
# flows: dict[str, np.ndarray] = {}
class Flow(msgspec.Struct): # , frozen=True):
'''
(FinancialSignal-)Flow compound type which wraps a real-time
graphics (curve) and its backing data stream together for high level
access and control.
The intention is for this type to eventually be capable of shm-passing
of incrementally updated graphics stream data between actors.
'''
name: str
plot: pg.PlotItem
is_ohlc: bool = False
graphics: pg.GraphicsObject
# TODO: hackery to be able to set a shm later
# but whilst also allowing this type to hashable,
# likely will require serializable token that is used to attach
# to the underlying shm ref after startup?
_shm: Optional[ShmArray] = None # currently, may be filled in "later"
# cache of y-range values per x-range input.
_mxmns: dict[tuple[int, int], tuple[float, float]] = {}
@property
def shm(self) -> ShmArray:
return self._shm
@shm.setter
def shm(self, shm: ShmArray) -> ShmArray:
self._shm = shm
def maxmin(
self,
lbar,
rbar,
) -> tuple[float, float]:
'''
Compute the cached max and min y-range values for a given
x-range determined by ``lbar`` and ``rbar``.
'''
rkey = (lbar, rbar)
cached_result = self._mxmns.get(rkey)
if cached_result:
return cached_result
shm = self.shm
if shm is None:
mxmn = None
else: # new block for profiling?..
arr = shm.array
# build relative indexes into shm array
# TODO: should we just add/use a method
# on the shm to do this?
ifirst = arr[0]['index']
slice_view = arr[
lbar - ifirst:
(rbar - ifirst) + 1
]
if not slice_view.size:
mxmn = None
else:
if self.is_ohlc:
ylow = np.min(slice_view['low'])
yhigh = np.max(slice_view['high'])
else:
view = slice_view[self.name]
ylow = np.min(view)
yhigh = np.max(view)
mxmn = ylow, yhigh
if mxmn is not None:
# cache new mxmn result
self._mxmns[rkey] = mxmn
return mxmn

View File

@ -260,13 +260,12 @@ def ds_m4(
if log_scale:
assert uppx, 'You must provide a `uppx` value to use log scaling!'
# scaler = 2**7 / (1 + math.log(uppx, 2))
scaler = round(
max(
# NOTE: found that a 16x px width brought greater
# detail, likely due to dpi scaling?
# px_width=px_width * 16,
2**7 / (1 + math.log(uppx, 2)),
2**6 / (1 + math.log(uppx, 2)),
1
)
)

View File

@ -408,7 +408,7 @@ class FastAppendCurve(pg.GraphicsObject):
self._in_ds = False
elif should_ds and px_width:
elif should_ds:
x_out, y_out = self.downsample(
x_out,
y_out,

View File

@ -30,6 +30,7 @@ import numpy as np
import tractor
import trio
import pyqtgraph as pg
from PyQt5.QtCore import QLineF
from .. import brokers
from ..data.feed import open_feed
@ -72,20 +73,13 @@ _tick_groups = {
}
# TODO: delegate this to each `Flow.maxmin()` which includes
# caching and further we should implement the following stream based
# approach, likely with ``numba``:
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
def chart_maxmin(
chart: ChartPlotWidget,
ohlcv_shm: ShmArray,
vlm_chart: Optional[ChartPlotWidget] = None,
) -> tuple[
tuple[int, int, int, int],
float,
float,
float,
@ -94,6 +88,11 @@ def chart_maxmin(
Compute max and min datums "in view" for range limits.
'''
# TODO: implement this
# https://arxiv.org/abs/cs/0610046
# https://github.com/lemire/pythonmaxmin
# array = chart._arrays[chart.name]
array = ohlcv_shm.array
ifirst = array[0]['index']
@ -106,23 +105,18 @@ def chart_maxmin(
chart.default_view()
return (last_bars_range, 0, 0, 0)
mx, mn = (
np.nanmax(in_view['high']),
np.nanmin(in_view['low'],)
)
mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low'])
# TODO: when we start using line charts, probably want to make
# this an overloaded call on our `DataView
# sym = chart.name
# mx, mn = np.nanmax(in_view[sym]), np.nanmin(in_view[sym])
mx_vlm_in_view = 0
if vlm_chart:
mx_vlm_in_view = np.max(
in_view['volume']
)
mx_vlm_in_view = np.max(in_view['volume'])
return (
last_bars_range,
mx,
max(mn, 0), # presuming price can't be negative?
mx_vlm_in_view,
)
return last_bars_range, mx, max(mn, 0), mx_vlm_in_view
@dataclass
@ -278,9 +272,8 @@ async def graphics_update_loop(
chart.default_view()
# main real-time quotes update loop
# main loop
async for quotes in stream:
ds.quotes = quotes
quote_period = time.time() - last_quote
quote_rate = round(
@ -317,50 +310,28 @@ def graphics_update_cycle(
wap_in_history: bool = False,
) -> None:
# TODO: eventually optimize this whole graphics stack with ``numba``
# hopefully XD
chart = ds.chart
profiler = pg.debug.Profiler(
msg=f'Graphics loop cycle for: `{chart.name}`',
disabled=True, # not pg_profile_enabled(),
gt=1/12 * 1e3,
# gt=ms_slower_then,
)
# unpack multi-referenced components
chart = ds.chart
vlm_chart = ds.vlm_chart
l1 = ds.l1
ohlcv = ds.ohlcv
array = ohlcv.array
vars = ds.vars
tick_margin = vars['tick_margin']
update_uppx = 6
update_uppx = 5
for sym, quote in ds.quotes.items():
# compute the first available graphic's x-units-per-pixel
xpx = vlm_chart.view.x_uppx()
# NOTE: vlm may be written by the ``brokerd`` backend
# event though a tick sample is not emitted.
# TODO: show dark trades differently
# https://github.com/pikers/piker/issues/116
# NOTE: this used to be implemented in a dedicated
# "increment task": ``check_for_new_bars()`` but it doesn't
# make sense to do a whole task switch when we can just do
# this simple index-diff and all the fsp sub-curve graphics
# are diffed on each draw cycle anyway; so updates to the
# "curve" length is already automatic.
# increment the view position by the sample offset.
i_step = ohlcv.index
i_diff = i_step - vars['i_last']
vars['i_last'] = i_step
(
brange,
mx_in_view,
@ -373,12 +344,83 @@ def graphics_update_cycle(
mn = mn_in_view - tick_margin
profiler('maxmin call')
# compute the first available graphic's x-units-per-pixel
xpx = vlm_chart.view.xs_in_px()
# print(f'vlm xpx {xpx}')
in_view = chart.in_view(ohlcv.array)
if lbar != rbar:
# view box width in pxs
w = chart.view.boundingRect().width()
# TODO: a better way to get this?
# i would guess the esiest way is to just
# get the ``.boundingRect()`` of the curve
# in view but maybe there's something smarter?
# Currently we're just mapping the rbar, lbar to
# pixels via:
cw = chart.view.mapViewToDevice(QLineF(lbar, 0, rbar, 0)).length()
# is this faster?
# cw = chart.mapFromView(QLineF(lbar, 0 , rbar, 0)).length()
profiler(
f'view width pxs: {w}\n'
f'curve width pxs: {cw}\n'
f'sliced in view: {in_view.size}'
)
# compress bars to m4 line(s) if uppx is high enough
# if in_view.size > cw:
# from ._compression import ds_m4, hl2mxmn
# mxmn, x = hl2mxmn(in_view)
# profiler('hl tracer')
# nb, x, y = ds_m4(
# x=x,
# y=mxmn,
# # TODO: this needs to actually be the width
# # in pixels of the visible curve since we don't
# # want to downsample any 'zeros' around the curve,
# # just the values that make up the curve graphic,
# # i think?
# px_width=cw,
# )
# profiler(
# 'm4 downsampled\n'
# f' ds bins: {nb}\n'
# f' x.shape: {x.shape}\n'
# f' y.shape: {y.shape}\n'
# f' x: {x}\n'
# f' y: {y}\n'
# )
# assert y.size == mxmn.size
# NOTE: vlm may be written by the ``brokerd`` backend
# event though a tick sample is not emitted.
# TODO: show dark trades differently
# https://github.com/pikers/piker/issues/116
# NOTE: this used to be implemented in a dedicated
# "increment tas": ``check_for_new_bars()`` but it doesn't
# make sense to do a whole task switch when we can just do
# this simple index-diff and all the fsp sub-curve graphics
# are diffed on each draw cycle anyway; so updates to the
# "curve" length is already automatic.
# increment the view position by the sample offset.
i_step = ohlcv.index
i_diff = i_step - vars['i_last']
vars['i_last'] = i_step
# don't real-time "shift" the curve to the
# left under the following conditions:
if (
i_diff > 0 # no new sample step
and xpx < update_uppx # chart is zoomed out very far
and r >= i_step # the last datum is in view
and r >= i_step # the last datum isn't in view
):
# TODO: we should track and compute whether the last
# pixel in a curve should show new data based on uppx
@ -387,9 +429,7 @@ def graphics_update_cycle(
if vlm_chart:
# always update y-label
ds.vlm_sticky.update_from_data(
*array[-1][['index', 'volume']]
)
ds.vlm_sticky.update_from_data(*array[-1][['index', 'volume']])
if (
# if zoomed out alot don't update the last "bar"
@ -420,17 +460,17 @@ def graphics_update_cycle(
mx_vlm_in_view != last_mx_vlm
or mx_vlm_in_view > last_mx_vlm
):
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
yrange = (0, mx_vlm_in_view * 1.375)
vlm_chart.view._set_yrange(
yrange=yrange,
)
# print(f'mx vlm: {last_mx_vlm} -> {mx_vlm_in_view}')
vars['last_mx_vlm'] = mx_vlm_in_view
for curve_name, flow in vlm_chart._flows.items():
update_fsp_chart(
vlm_chart,
flow,
flow.shm,
curve_name,
array_key=curve_name,
)
@ -485,6 +525,7 @@ def graphics_update_cycle(
# current) tick first order as an optimization where we only
# update from the last tick from each type class.
# last_clear_updated: bool = False
# for typ, tick in reversed(lasts.items()):
# update ohlc sampled price bars
if (
@ -496,7 +537,7 @@ def graphics_update_cycle(
array,
)
# iterate in FIFO order per tick-frame
# iterate in FIFO order per frame
for typ, tick in lasts.items():
price = tick.get('price')
@ -567,34 +608,42 @@ def graphics_update_cycle(
if (
(mx > vars['last_mx']) or (mn < vars['last_mn'])
and not chart._static_yrange == 'axis'
and r > i_step # the last datum is in view
):
main_vb = chart.view
if (
main_vb._ic is None
or not main_vb._ic.is_set()
):
main_vb._set_yrange(
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
yrange=(mn, mx),
)
# print(f'new y range: {(mn, mx)}')
chart.view._set_yrange(
yrange=(mn, mx),
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
# range_margin=0.1,
)
vars['last_mx'], vars['last_mn'] = mx, mn
# run synchronous update on all linked flows
for curve_name, flow in chart._flows.items():
# TODO: should the "main" (aka source) flow be special?
if curve_name == chart.data_key:
# run synchronous update on all derived fsp subplots
for name, subchart in ds.linked.subplots.items():
if name == 'volume':
continue
update_fsp_chart(
subchart,
subchart._shm,
# XXX: do we really needs seperate names here?
name,
array_key=name,
)
subchart.cv._set_yrange()
# TODO: all overlays on all subplots..
# run synchronous update on all derived overlays
for curve_name, flow in chart._flows.items():
update_fsp_chart(
chart,
flow,
flow.shm,
curve_name,
array_key=curve_name,
)
@ -688,7 +737,6 @@ async def display_symbol_data(
# TODO: a data view api that makes this less shit
chart._shm = ohlcv
chart._flows[chart.data_key].shm = ohlcv
# NOTE: we must immediately tell Qt to show the OHLC chart
# to avoid a race where the subplots get added/shown to
@ -745,7 +793,7 @@ async def display_symbol_data(
# that it isn't double rendered in the display loop
# above since we do a maxmin calc on the volume data to
# determine if auto-range adjustements should be made.
# linked.subplots.pop('volume', None)
linked.subplots.pop('volume', None)
# TODO: make this not so shit XD
# close group status

View File

@ -72,16 +72,12 @@ def has_vlm(ohlcv: ShmArray) -> bool:
def update_fsp_chart(
chart: ChartPlotWidget,
flow,
shm: ShmArray,
graphics_name: str,
array_key: Optional[str],
) -> None:
shm = flow.shm
if not shm:
return
array = shm.array
last_row = try_read(array)
@ -275,7 +271,6 @@ async def run_fsp_ui(
# data looked up from the chart's internal array set.
# TODO: we must get a data view api going STAT!!
chart._shm = shm
chart._flows[chart.data_key].shm = shm
# should **not** be the same sub-chart widget
assert chart.name != linkedsplits.chart.name
@ -287,7 +282,7 @@ async def run_fsp_ui(
# first UI update, usually from shm pushed history
update_fsp_chart(
chart,
chart._flows[array_key],
shm,
name,
array_key=array_key,
)
@ -446,11 +441,8 @@ class FspAdmin:
async with stream.subscribe() as stream:
async for msg in stream:
if msg == 'update':
# if the chart isn't hidden try to update
# the data on screen.
if not self.linked.isHidden():
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle()
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle()
else:
log.info(f'recved unexpected fsp engine msg: {msg}')
@ -639,7 +631,6 @@ async def open_vlm_displays(
# the curve item internals are pretty convoluted.
style='step',
)
chart._flows['volume'].shm = ohlcv
# force 0 to always be in view
def maxmin(
@ -803,16 +794,13 @@ async def open_vlm_displays(
color=color,
step_mode=step_mode,
style=style,
pi=pi,
)
# TODO: we need a better API to do this..
# specially store ref to shm for lookup in display loop
# since only a placeholder of `None` is entered in
# ``.draw_curve()``.
flow = chart._flows[name]
assert flow.plot is pi
flow.shm = shm
chart._flows[name].shm = shm
chart_curves(
fields,
@ -847,9 +835,6 @@ async def open_vlm_displays(
# liquidity events (well at least on low OHLC periods - 1s).
vlm_curve.hide()
chart.removeItem(vlm_curve)
chart._flows.pop('volume')
# avoid range sorting on volume once disabled
chart.view.disable_auto_yrange()
# Trade rate overlay
# XXX: requires an additional overlay for
@ -890,10 +875,7 @@ async def open_vlm_displays(
style='dash',
)
for pi in (
dvlm_pi,
tr_pi,
):
for pi in (dvlm_pi, tr_pi):
for name, axis_info in pi.axes.items():
# lol this sux XD
axis = axis_info['item']

View File

@ -393,11 +393,6 @@ class ChartView(ViewBox):
def start_ic(
self,
) -> None:
'''
Signal the beginning of a click-drag interaction
to any interested task waiters.
'''
if self._ic is None:
self.chart.pause_all_feeds()
self._ic = trio.Event()
@ -405,13 +400,13 @@ class ChartView(ViewBox):
def signal_ic(
self,
*args,
# ev = None,
) -> None:
'''
Signal the end of a click-drag interaction
to any waiters.
if args:
print(f'range change dun: {args}')
else:
print('proxy called')
'''
if self._ic:
self._ic.set()
self._ic = None
@ -679,6 +674,7 @@ class ChartView(ViewBox):
self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
if ev.isFinish():
print('DRAG FINISH')
self.signal_ic()
# self._ic.set()
# self._ic = None
@ -784,32 +780,27 @@ class ChartView(ViewBox):
# TODO: maybe should be a method on the
# chart widget/item?
# if False:
# if autoscale_linked_plots:
# # avoid recursion by sibling plots
# linked = self.linkedsplits
# plots = list(linked.subplots.copy().values())
# main = linked.chart
# if main:
# plots.append(main)
if autoscale_linked_plots:
# avoid recursion by sibling plots
linked = self.linkedsplits
plots = list(linked.subplots.copy().values())
main = linked.chart
if main:
plots.append(main)
# for chart in plots:
# if chart and not chart._static_yrange:
# chart.cv._set_yrange(
# bars_range=br,
# autoscale_linked_plots=False,
# )
# profiler('autoscaled linked plots')
for chart in plots:
if chart and not chart._static_yrange:
chart.cv._set_yrange(
bars_range=br,
autoscale_linked_plots=False,
)
profiler('autoscaled linked plots')
if set_range:
if not yrange:
# XXX: only compute the mxmn range
# if none is provided as input!
yrange = self._maxmin()
if yrange is None:
log.warning(f'No yrange provided for {self.name}!?')
return
yrange = self._maxmin()
if yrange is None:
return
ylow, yhigh = yrange
@ -843,17 +834,8 @@ class ChartView(ViewBox):
if src_vb is None:
src_vb = self
# such that when a linked chart changes its range
# this local view is also automatically changed and
# resized to data.
src_vb.sigXRangeChanged.connect(self._set_yrange)
# splitter(s) resizing
src_vb.sigResized.connect(self._set_yrange)
# mouse wheel doesn't emit XRangeChanged
src_vb.sigRangeChangedManually.connect(self._set_yrange)
# TODO: a smarter way to avoid calling this needlessly?
# 2 things i can think of:
# - register downsample-able graphics specially and only
@ -864,24 +846,17 @@ class ChartView(ViewBox):
self.maybe_downsample_graphics
)
# mouse wheel doesn't emit XRangeChanged
src_vb.sigRangeChangedManually.connect(self._set_yrange)
# splitter(s) resizing
src_vb.sigResized.connect(self._set_yrange)
def disable_auto_yrange(
self,
) -> None:
# self._chart._static_yrange = 'axis'
self.sigXRangeChanged.disconnect(
self._set_yrange,
)
self.sigResized.disconnect(
self._set_yrange,
)
self.sigRangeChangedManually.disconnect(
self.maybe_downsample_graphics
)
self.sigRangeChangedManually.disconnect(
self._set_yrange,
)
self._chart._static_yrange = 'axis'
def x_uppx(self) -> float:
'''
@ -927,8 +902,8 @@ class ChartView(ViewBox):
linked = self.linkedsplits
plots = linked.subplots | {chart.name: chart}
for chart_name, chart in plots.items():
for name, flow in chart._flows.items():
graphics = flow.graphics
for name, graphics in chart._graphics.items():
# print(f'maybe ds chart:{name} graphic:{name}')
use_vr = False
if isinstance(graphics, BarItems):