Compare commits

..

8 Commits

Author SHA1 Message Date
Tyler Goodlet cc50932c4f TOQUASH: drop display loop old .update_ohlc_.. 2022-04-03 23:36:30 -04:00
Tyler Goodlet c62d3dd82c Add profiling to xrange update loop 2022-04-03 23:35:53 -04:00
Tyler Goodlet 7d664c55ff Drop old `pyqtgraph` downsample code 2022-04-03 23:35:32 -04:00
Tyler Goodlet 024d3661a0 Port to new `.update_graphics_from_array()`, pause quote updates on chart interaction 2022-04-03 23:34:55 -04:00
Tyler Goodlet 9befc1fb1a Toy with caching ds data, probably will revert.. 2022-04-03 23:30:10 -04:00
Tyler Goodlet 54a1397d2c If only drawing bars in view we can wait longer to ds 2022-04-03 23:29:04 -04:00
Tyler Goodlet 08d7f925b9 Establish stream before `fsp_compute` so that backfill updates work again.. 2022-04-03 23:28:30 -04:00
Tyler Goodlet 25891c6e51 WIP only-in-view paths 2022-04-03 18:00:04 -04:00
8 changed files with 962 additions and 541 deletions

View File

@ -76,7 +76,6 @@ async def filter_quotes_by_sym(
async def fsp_compute(
ctx: tractor.Context,
symbol: Symbol,
feed: Feed,
quote_stream: trio.abc.ReceiveChannel,
@ -86,7 +85,7 @@ async def fsp_compute(
func: Callable,
attach_stream: bool = False,
# attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
@ -193,27 +192,22 @@ async def fsp_compute(
profiler(f'{func_name} pushed history')
profiler.finish()
# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)
# setup a respawn handle
with trio.CancelScope() as cs:
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index))
profiler(f'{func_name} yield last index')
# import time
# last = time.time()
try:
# rt stream
async with ctx.open_stream() as stream:
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await stream.send('update')
async for processed in out_stream:
@ -225,8 +219,14 @@ async def fsp_compute(
# NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts
# as trigger msg to tell the consumer to read from shm
if attach_stream:
await stream.send(index)
# TODO: further this should likely be implemented much
# like our `Feed` api where there is one background
# "service" task which computes output and then sends to
# N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem
# chans for the fan out?
# if attach_stream:
# await client_stream.send(index)
# period = time.time() - last
# hz = 1/period if period else float('nan')
@ -323,7 +323,6 @@ async def cascade(
fsp_target = partial(
fsp_compute,
ctx=ctx,
symbol=symbol,
feed=feed,
quote_stream=quote_stream,
@ -332,7 +331,7 @@ async def cascade(
src=src,
dst=dst,
# func_name=func_name,
# target
func=func
)
@ -344,13 +343,34 @@ async def cascade(
profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
# sync client
await ctx.started(index)
# XXX: rt stream with client which we MUST
# open here (and keep it open) in order to make
# incremental "updates" as history prepends take
# place.
async with ctx.open_stream() as client_stream:
# TODO: these likely should all become
# methods of this ``TaskLifetime`` or wtv
# abstraction..
async def resync(
tracker: TaskTracker,
) -> tuple[TaskTracker, int]:
# TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
return await n.start(fsp_target)
tracker, index = await n.start(fsp_target)
# always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send('update')
return tracker, index
def is_synced(
src: ShmArray,
@ -397,7 +417,9 @@ async def cascade(
# Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed.
async with feed.index_stream(int(delay_s)) as istream:
async with feed.index_stream(
int(delay_s)
) as istream:
profiler(f'{func_name}: sample stream up')
profiler.finish()

View File

@ -838,8 +838,12 @@ class ChartPlotWidget(pg.PlotWidget):
'''
l, r = self.view_range()
array = self._arrays[self.name]
lbar = max(l, array[0]['index'])
rbar = min(r, array[-1]['index'])
start, stop = self._xrange = (
array[0]['index'],
array[-1]['index'],
)
lbar = max(l, start)
rbar = min(r, stop)
return l, lbar, rbar, r
def curve_width_pxs(
@ -907,7 +911,7 @@ class ChartPlotWidget(pg.PlotWidget):
return
xfirst, xlast = index[0], index[-1]
brange = l, lbar, rbar, r = self.bars_range()
l, lbar, rbar, r = self.bars_range()
marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1
@ -986,7 +990,8 @@ class ChartPlotWidget(pg.PlotWidget):
graphics = BarItems(
self.linked,
self.plotItem,
pen_color=self.pen_color
pen_color=self.pen_color,
name=name,
)
# adds all bar/candle graphics objects for each data point in
@ -1175,11 +1180,34 @@ class ChartPlotWidget(pg.PlotWidget):
)
return last
def update_ohlc_from_array(
self,
# def update_ohlc_from_array(
# self,
# graphics_name: str,
# array: np.ndarray,
# **kwargs,
# ) -> pg.GraphicsObject:
# '''
# Update the named internal graphics from ``array``.
# '''
# self._index = array['index'][0]
# self._arrays[self.name] = array
# graphics = self._graphics[graphics_name]
# graphics.update_from_array(array, **kwargs)
# return graphics
# def update_curve_from_array(
def update_graphics_from_array(
self,
graphics_name: str,
array: np.ndarray,
array: Optional[np.ndarray] = None,
array_key: Optional[str] = None,
**kwargs,
) -> pg.GraphicsObject:
@ -1187,49 +1215,64 @@ class ChartPlotWidget(pg.PlotWidget):
Update the named internal graphics from ``array``.
'''
self._arrays[self.name] = array
if array is not None:
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
data_key = self.name
if array is not None:
# write array to internal graphics table
self._arrays[data_key] = array
else:
array = self._arrays[data_key]
# array key and graphics "name" might be different..
graphics = self._graphics[graphics_name]
graphics.update_from_array(array, **kwargs)
# compute "in-view" indices
l, lbar, rbar, r = self.bars_range()
indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
in_view = array[lbar_i: rbar_i]
if not in_view.size:
return graphics
def update_curve_from_array(
self,
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
# start_index = self._index
# lbar = max(l, start_index) - start_index
# rbar = min(r, ohlc[-1]['index']) - start_index
if isinstance(graphics, BarItems):
graphics.update_from_array(
array,
in_view,
view_range=(lbar_i, rbar_i),
graphics_name: str,
array: np.ndarray,
array_key: Optional[str] = None,
**kwargs,
)
) -> pg.GraphicsObject:
'''
Update the named internal graphics from ``array``.
'''
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
self._arrays[self.name] = array
else:
self._arrays[data_key] = array
curve = self._graphics[graphics_name]
# NOTE: back when we weren't implementing the curve graphics
# ourselves you'd have updates using this method:
# curve.setData(y=array[graphics_name], x=array['index'], **kwargs)
# NOTE: graphics **must** implement a diff based update
# operation where an internal ``FastUpdateCurve._xrange`` is
# used to determine if the underlying path needs to be
# pre/ap-pended.
curve.update_from_array(
graphics.update_from_array(
x=array['index'],
y=array[data_key],
x_iv=in_view['index'],
y_iv=in_view[data_key],
view_range=(lbar_i, rbar_i),
**kwargs
)
return curve
return graphics
# def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms
@ -1260,6 +1303,9 @@ class ChartPlotWidget(pg.PlotWidget):
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
# TODO: pretty sure we can just call the cursor
# directly not? i don't wee why we need special "signal proxies"
# for this lul..
def enterEvent(self, ev): # noqa
# pg.PlotWidget.enterEvent(self, ev)
self.sig_mouse_enter.emit(self)

View File

@ -106,53 +106,6 @@ def trace_hl(
return out
def downsample(
x: np.ndarray,
y: np.ndarray,
bins: int = 2,
method: str = 'peak',
**kwargs,
) -> tuple[np.ndarray, np.ndarray]:
'''
Downsample x/y data for lesser curve graphics gen.
The "peak" method is originally copied verbatim from
``pyqtgraph.PlotDataItem.getDisplayDataset()`` which gets
all credit, though we will likely drop this in favor of the M4
algo below.
'''
# py3.10 syntax
match method:
case 'peak':
if bins < 2:
log.warning('No downsampling taking place?')
ds = bins
n = len(x) // ds
x1 = np.empty((n, 2))
# start of x-values; try to select a somewhat centered point
stx = ds // 2
x1[:] = x[stx:stx+n*ds:ds, np.newaxis]
x = x1.reshape(n*2)
y1 = np.empty((n, 2))
y2 = y[:n*ds].reshape((n, ds))
y1[:, 0] = y2.max(axis=1)
y1[:, 1] = y2.min(axis=1)
y = y1.reshape(n*2)
return x, y
case 'm4':
return ds_m4(x, y, kwargs['px_width'])
def ohlc_flatten(
ohlc: np.ndarray,
use_mxmn: bool = False,

View File

@ -144,6 +144,8 @@ class FastAppendCurve(pg.GraphicsObject):
self.use_fpath = use_fpath
self.fast_path: Optional[QtGui.QPainterPath] = None
self._ds_cache: dict = {}
# TODO: we can probably just dispense with the parent since
# we're basically only using the pen setting now...
super().__init__(*args, **kwargs)
@ -214,52 +216,61 @@ class FastAppendCurve(pg.GraphicsObject):
vr = self.viewRect()
l, r = int(vr.left()), int(vr.right())
if not self._xrange:
return 0
start, stop = self._xrange
lbar = max(l, start)
rbar = min(r, stop)
return vb.mapViewToDevice(
return round(vb.mapViewToDevice(
QLineF(lbar, 0, rbar, 0)
).length()
).length())
def should_ds_or_redraw(
self,
# def should_ds_or_redraw(
# self,
) -> tuple[bool, bool]:
# ) -> tuple[bool, bool]:
uppx = self.x_uppx()
px_width = self.px_width()
# uppx_diff = abs(uppx - self._last_uppx)
uppx_diff = (uppx - self._last_uppx)
self._last_uppx = uppx
# uppx = self.x_uppx()
# px_width = self.px_width()
# if not px_width:
# return False, False
should_redraw: bool = False
should_ds: bool = False
# # uppx_diff = abs(uppx - self._last_uppx)
# uppx_diff = (uppx - self._last_uppx)
# self._last_uppx = uppx
# print(uppx_diff)
# should_redraw: bool = False
# should_ds: bool = self._in_ds
if (
uppx <= 8
):
# trigger redraw or original non-downsampled data
if self._in_ds:
print('REVERTING BACK TO SRC DATA')
# clear downsampled curve(s) and expect
# refresh of path segments.
should_redraw = True
# # print(uppx_diff)
elif (
uppx_diff >= 4
or uppx_diff <= -2
or self._step_mode and abs(uppx_diff) >= 1
):
log.info(
f'{self._name} downsampler change: {self._last_uppx} -> {uppx}'
)
should_ds = {'px_width': px_width, 'uppx': uppx}
should_redraw = True
# if (
# uppx <= 8
# ):
# # trigger redraw or original non-downsampled data
# if self._in_ds:
# print('REVERTING BACK TO SRC DATA')
# # clear downsampled curve(s) and expect
# # refresh of path segments.
# should_redraw = True
return should_ds, should_redraw
# elif (
# uppx_diff >= 1
# or uppx_diff <= -1
# or self._step_mode and abs(uppx_diff) >= 1
# ):
# log.info(
# f'{self._name} downsampler change: {self._last_uppx} -> {uppx}'
# )
# should_ds = {'px_width': px_width, 'uppx': uppx}
# should_redraw = True
# if should_ds:
# should_ds = {'px_width': px_width, 'uppx': uppx}
# return should_ds, should_redraw
def downsample(
self,
@ -286,7 +297,7 @@ class FastAppendCurve(pg.GraphicsObject):
y = y.flatten()
# presumably?
self._in_ds = True
# self._in_ds = True
return x, y
def maybe_downsample(
@ -303,9 +314,17 @@ class FastAppendCurve(pg.GraphicsObject):
def update_from_array(
self,
# full array input history
x: np.ndarray,
y: np.ndarray,
# pre-sliced array data that's "in view"
x_iv: np.ndarray,
y_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
) -> QtGui.QPainterPath:
'''
Update curve from input 2-d data.
@ -315,73 +334,186 @@ class FastAppendCurve(pg.GraphicsObject):
'''
profiler = pg.debug.Profiler(
msg=f'{self._name}.update_from_array()',
msg=f'FastAppendCurve.update_from_array(): `{self._name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
flip_cache = False
draw_full_path = True
if self._xrange:
istart, istop = self._xrange
else:
self._xrange = istart, istop = x[0], x[-1]
# print(f"xrange: {self._xrange}")
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
should_ds, should_redraw = self.should_ds_or_redraw()
# update internal array refs
self._x, self._y = x, y
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
if self._xrange:
istart, istop = self._xrange
else:
self._xrange = istart, istop = x[0], x[-1]
prepend_length = int(istart - x[0])
append_length = int(x[-1] - istop)
no_path_yet = self.path is None
if (
should_redraw or should_ds
or self.path is None
or prepend_length > 0
):
# print(f"xrange: {self._xrange}")
if view_range:
li, ri = view_range
# x, y = x[lbar:rbar], y[lbar:rbar]
# x, y = x_iv, y_iv
profiler(f'view range slice {view_range}')
# if self._name == 'OHLC':
# print(f'view range slice {view_range}')
# ds state checking
uppx = self.x_uppx()
px_width = self.px_width()
uppx_diff = (uppx - self._last_uppx)
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
x_out, y_out = step_path_arrays_from_1d(
x[:-1], y[:-1]
# TODO: numba this bish
# x_out, y_out = step_path_arrays_from_1d(
# x[:-1], y[:-1]
# )
x_iv_out, y_iv_out = step_path_arrays_from_1d(
x_iv[:-1], y_iv[:-1]
)
profiler('generated step arrays')
else:
# by default we only pull data up to the last (current) index
x_out, y_out = x[:-1], y[:-1]
# x_out, y_out = x[:-1], y[:-1]
x_iv_out, y_iv_out = x_iv[:-1], y_iv[:-1]
profiler('sliced array history')
if should_ds:
x_out, y_out = self.downsample(
x_out,
y_out,
**should_ds,
# by default plan to draw the source ouput that's "in view"
x_to_path, y_to_path = x_iv_out, y_iv_out
ds_key = px_width, uppx
# always re-ds if we were dsed but the input range changes.
if self._in_ds:
# slice out the portion of the downsampled data that is
# "in view" and **only** draw a path for that.
entry = self._ds_cache.get(ds_key)
if entry:
x_ds_out, y_ds_out, first_i, last_i = entry
# if last_i == x[-1]:
log.info(
f'{self._name} has cached ds {ds_key} -> {entry}'
)
profiler(f'path downsample redraw={should_ds}')
prepend_length = int(first_i - ri)
append_length = int(ri - last_i)
# x_to_path = x_ds_out
# y_to_path = y_ds_out
# else:
# log.warn(f'{self._name} ds updates unhandled!')
# DS only the new part?
# check for downsampling conditions
if (
# std m4 downsample conditions
uppx_diff >= 4
or uppx_diff <= -2
or self._step_mode and abs(uppx_diff) >= 2
# or self._in_ds and px_width > 1
):
# if not uppx_diff >= 1:
log.info(
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
)
self._last_uppx = uppx
# should_ds = {'px_width': px_width, 'uppx': uppx}
# if self._step_mode:
# # TODO: numba this bish
# x_out, y_out = step_path_arrays_from_1d(
# x_iv[:-1], y_iv[:-1]
# )
# else:
# # by default we only pull data up to the last (current) index
# x_out, y_out = x_iv[:-1], y_iv[:-1]
x_ds_out, y_ds_out = self.downsample(
x_iv_out,
y_iv_out,
px_width=px_width,
uppx=uppx,
)
profiler(
f'path downsample ds_key={ds_key}\n'
f'{x_iv_out.size}, {y_iv_out.size}'
)
# cache downsampled outputs
self._ds_cache[ds_key] = (
x_ds_out,
y_ds_out,
x[0],
x[-1],
)
x_to_path = x_ds_out
y_to_path = y_ds_out
self._in_ds = True
if should_redraw:
profiler('path reversion to non-ds')
elif (
uppx <= 8
and self._in_ds
):
# we should de-downsample back to our original
# source data so we clear our path data in prep
# to generate a new one from original source data.
if self.path:
self.path.clear()
if self.fast_path:
self.fast_path.clear()
if should_redraw and not should_ds:
log.info(f'DEDOWN -> {self._name}')
profiler('path reversion to non-ds data')
self._in_ds = False
# else:
# render path graphics
# log.info(
# # f'{self._name}: last sizes {x_to_path.size}, {y_to_path.size}',
# f'{self._name}: sizes {x_to_path.size}, {y_to_path.size}',
# )
self._last_topaths = x_to_path, y_to_path
no_path_yet = self.path is None
if draw_full_path:
self.path = pg.functions.arrayToQPath(
x_out,
y_out,
x_to_path,
y_to_path,
connect='all',
finiteCheck=False,
path=self.path,
)
profiler('generated FULL PATH -> {self._name}')
# reserve mem allocs see:
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
@ -392,10 +524,7 @@ class FastAppendCurve(pg.GraphicsObject):
if no_path_yet:
self.path.reserve(int(500e3))
profiler('generated fresh path')
# if self._step_mode:
# self.path.closeSubpath()
self._last_vr = view_range
# TODO: get this piecewise prepend working - right now it's
# giving heck on vwap...
@ -414,65 +543,65 @@ class FastAppendCurve(pg.GraphicsObject):
# # self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(old_path)
elif (
append_length > 0
):
if self._step_mode:
new_x, new_y = step_path_arrays_from_1d(
x[-append_length - 2:-1],
y[-append_length - 2:-1],
)
# [1:] since we don't need the vertical line normally at
# the beginning of the step curve taking the first (x,
# y) poing down to the x-axis **because** this is an
# appended path graphic.
new_x = new_x[1:]
new_y = new_y[1:]
# elif (
# append_length > 0
# ):
# if self._step_mode:
# new_x, new_y = step_path_arrays_from_1d(
# x[-append_length - 2:-1],
# y[-append_length - 2:-1],
# )
# # [1:] since we don't need the vertical line normally at
# # the beginning of the step curve taking the first (x,
# # y) poing down to the x-axis **because** this is an
# # appended path graphic.
# new_x = new_x[1:]
# new_y = new_y[1:]
else:
# print(f"append_length: {append_length}")
new_x = x[-append_length - 2:-1]
new_y = y[-append_length - 2:-1]
# print((new_x, new_y))
# else:
# # print(f"append_length: {append_length}")
# new_x = x[-append_length - 2:-1]
# new_y = y[-append_length - 2:-1]
# # print((new_x, new_y))
profiler('diffed append arrays')
# profiler('diffed append arrays')
if should_ds:
new_x, new_y = self.downsample(
new_x,
new_y,
**should_ds,
)
profiler(f'fast path downsample redraw={should_ds}')
# if should_ds:
# new_x, new_y = self.downsample(
# new_x,
# new_y,
# **should_ds,
# )
# profiler(f'fast path downsample redraw={should_ds}')
append_path = pg.functions.arrayToQPath(
new_x,
new_y,
connect='all',
finiteCheck=False,
path=self.fast_path,
)
# append_path = pg.functions.arrayToQPath(
# new_x,
# new_y,
# connect='all',
# finiteCheck=False,
# path=self.fast_path,
# )
if self.use_fpath:
# an attempt at trying to make append-updates faster..
if self.fast_path is None:
self.fast_path = append_path
self.fast_path.reserve(int(6e3))
else:
self.fast_path.connectPath(append_path)
size = self.fast_path.capacity()
profiler(f'connected fast path w size: {size}')
# if self.use_fpath:
# # an attempt at trying to make append-updates faster..
# if self.fast_path is None:
# self.fast_path = append_path
# self.fast_path.reserve(int(6e3))
# else:
# self.fast_path.connectPath(append_path)
# size = self.fast_path.capacity()
# profiler(f'connected fast path w size: {size}')
# print(f"append_path br: {append_path.boundingRect()}")
# self.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path)
# # print(f"append_path br: {append_path.boundingRect()}")
# # self.path.moveTo(new_x[0], new_y[0])
# # path.connectPath(append_path)
# XXX: lol this causes a hang..
# self.path = self.path.simplified()
else:
size = self.path.capacity()
profiler(f'connected history path w size: {size}')
self.path.connectPath(append_path)
# # XXX: lol this causes a hang..
# # self.path = self.path.simplified()
# else:
# size = self.path.capacity()
# profiler(f'connected history path w size: {size}')
# self.path.connectPath(append_path)
# other merging ideas:
# https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths
@ -497,12 +626,7 @@ class FastAppendCurve(pg.GraphicsObject):
# self.disable_cache()
# flip_cache = True
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
x0, x_last = self._xrange = x[0], x[-1]
x_last = x[-1]
y_last = y[-1]
# draw the "current" step graphic segment so it lines up with
@ -540,8 +664,6 @@ class FastAppendCurve(pg.GraphicsObject):
# XXX: seems to be needed to avoid artifacts (see above).
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
self._x, self._y = x, y
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
def getData(self):
@ -636,6 +758,7 @@ class FastAppendCurve(pg.GraphicsObject):
profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.paint(): `{self._name}`',
disabled=not pg_profile_enabled(),
# disabled=True,
gt=ms_slower_then,
)

View File

@ -53,6 +53,10 @@ from ._forms import (
mk_order_pane_layout,
)
from .order_mode import open_order_mode
# from .._profile import (
# pg_profile_enabled,
# ms_slower_then,
# )
from ..log import get_logger
log = get_logger(__name__)
@ -284,6 +288,12 @@ async def graphics_update_loop(
chart.pause_all_feeds()
continue
ic = chart.view._ic
if ic:
chart.pause_all_feeds()
await ic.wait()
chart.resume_all_feeds()
# sync call to update all graphics/UX components.
graphics_update_cycle(ds)
@ -299,8 +309,9 @@ def graphics_update_cycle(
profiler = pg.debug.Profiler(
disabled=True, # not pg_profile_enabled(),
delayed=False,
gt=1/12 * 1e3,
)
# unpack multi-referenced components
chart = ds.chart
vlm_chart = ds.vlm_chart
@ -409,9 +420,11 @@ def graphics_update_cycle(
if (
# if zoomed out alot don't update the last "bar"
(xpx < update_uppx or i_diff > 0)
# and r >= i_step
and r >= i_step
):
vlm_chart.update_curve_from_array('volume', array)
# TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_array('volume', array)
if (
mx_vlm_in_view != vars['last_mx_vlm']
@ -485,7 +498,7 @@ def graphics_update_cycle(
xpx < update_uppx
or i_diff > 0
):
chart.update_ohlc_from_array(
chart.update_graphics_from_array(
chart.name,
array,
)
@ -524,7 +537,7 @@ def graphics_update_cycle(
if wap_in_history:
# update vwap overlay line
chart.update_curve_from_array(
chart.update_graphics_from_array(
'bar_wap',
array,
)

View File

@ -89,7 +89,7 @@ def update_fsp_chart(
# update graphics
# NOTE: this does a length check internally which allows it
# staying above the last row check below..
chart.update_curve_from_array(
chart.update_graphics_from_array(
graphics_name,
array,
array_key=array_key or graphics_name,
@ -425,6 +425,7 @@ class FspAdmin:
) as (ctx, last_index),
ctx.open_stream() as stream,
):
# register output data
self._registry[
(fqsn, ns_path)
@ -440,6 +441,7 @@ class FspAdmin:
async with stream.subscribe() as stream:
async for msg in stream:
if msg == 'update':
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle()
else:
log.info(f'recved unexpected fsp engine msg: {msg}')
@ -674,7 +676,7 @@ async def open_vlm_displays(
last_val_sticky.update_from_data(-1, value)
vlm_curve = chart.update_curve_from_array(
vlm_curve = chart.update_graphics_from_array(
'volume',
shm.array,
)

View File

@ -20,6 +20,7 @@ Chart view box primitives
"""
from __future__ import annotations
from contextlib import asynccontextmanager
# import itertools
import time
from typing import Optional, Callable
@ -36,7 +37,8 @@ from ..log import get_logger
from ._style import _min_points_to_show
from ._editors import SelectRect
from . import _event
from ._ohlc import BarItems
from .._profile import pg_profile_enabled, ms_slower_then
# from ._ohlc import BarItems
log = get_logger(__name__)
@ -319,6 +321,7 @@ async def handle_viewmode_mouse(
):
# when in order mode, submit execution
# msg.event.accept()
# breakpoint()
view.order_mode.submit_order()
@ -384,6 +387,29 @@ class ChartView(ViewBox):
self.order_mode: bool = False
self.setFocusPolicy(QtCore.Qt.StrongFocus)
self._ic = None
def start_ic(
self,
) -> None:
if self._ic is None:
self.chart.pause_all_feeds()
self._ic = trio.Event()
def signal_ic(
self,
*args,
# ev = None,
) -> None:
if args:
print(f'range change dun: {args}')
else:
print('proxy called')
if self._ic:
self._ic.set()
self._ic = None
self.chart.resume_all_feeds()
@asynccontextmanager
async def open_async_input_handler(
@ -429,11 +455,6 @@ class ChartView(ViewBox):
def maxmin(self, callback: Callable) -> None:
self._maxmin = callback
def maybe_downsample_graphics(self):
for graphic in self._chart._graphics.values():
if isinstance(graphic, BarItems):
graphic.maybe_paint_line()
def wheelEvent(
self,
ev,
@ -542,6 +563,11 @@ class ChartView(ViewBox):
self._resetTarget()
self.scaleBy(s, focal)
self.sigRangeChangedManually.emit(mask)
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
ev.accept()
def mouseDragEvent(
@ -624,6 +650,11 @@ class ChartView(ViewBox):
# XXX: WHY
ev.accept()
self.start_ic()
# if self._ic is None:
# self.chart.pause_all_feeds()
# self._ic = trio.Event()
if axis == 1:
self.chart._static_yrange = 'axis'
@ -641,6 +672,13 @@ class ChartView(ViewBox):
self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
if ev.isFinish():
print('DRAG FINISH')
self.signal_ic()
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif button & QtCore.Qt.RightButton:
@ -788,11 +826,13 @@ class ChartView(ViewBox):
# iterate those.
# - only register this when certain downsampleable graphics are
# "added to scene".
vb.sigRangeChangedManually.connect(vb.maybe_downsample_graphics)
vb.sigXRangeChanged.connect(vb.maybe_downsample_graphics)
# mouse wheel doesn't emit XRangeChanged
vb.sigRangeChangedManually.connect(vb._set_yrange)
vb.sigResized.connect(vb._set_yrange) # splitter(s) resizing
# splitter(s) resizing
vb.sigResized.connect(vb._set_yrange)
def disable_auto_yrange(
self,
@ -808,10 +848,33 @@ class ChartView(ViewBox):
'''
for graphic in self._chart._graphics.values():
# if isinstance(graphic, BarItems):
xpx = graphic.pixelVectors()[0].x()
xvec = graphic.pixelVectors()[0]
if xvec:
xpx = xvec.x()
if xpx:
return xpx
else:
continue
return 1.0
def maybe_downsample_graphics(self):
# TODO: a faster single-loop-iterator way of doing this XD
chart = self._chart
# graphics = list(self._chart._graphics.values())
profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.update_from_array(): `{chart.name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
for name, graphics in chart._graphics.items():
# pass in no array which will read and render from the last
# passed array (normally provided by the display loop.)
chart.update_graphics_from_array(name)
profiler(f'updating {name}')
# for graphic in graphics:
# ds_meth = getattr(graphic, 'maybe_downsample', None)
# if ds_meth:
# ds_meth()

View File

@ -157,23 +157,40 @@ def path_arrays_from_ohlc(
def gen_qpath(
data,
start, # XXX: do we need this?
w,
data: np.ndarray,
start: int, # XXX: do we need this?
w: float,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath:
path_was_none = path is None
profiler = pg.debug.Profiler(
msg=f'gen_qpath ohlc',
msg='gen_qpath ohlc',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
profiler("generate stream with numba")
# TODO: numba the internals of this!
path = pg.functions.arrayToQPath(x, y, connect=c)
path = pg.functions.arrayToQPath(
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath")
return path
@ -206,6 +223,7 @@ class BarItems(pg.GraphicsObject):
self._color = pen_color
self.bars_pen = pg.mkPen(hcolor(pen_color), width=1)
self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2)
self._name = name
self._ds_line_xy: Optional[
tuple[np.ndarray, np.ndarray]
@ -226,6 +244,7 @@ class BarItems(pg.GraphicsObject):
self._xrange: tuple[int, int]
self._yrange: tuple[float, float]
self._vrange = None
# TODO: don't render the full backing array each time
# self._path_data = None
@ -254,7 +273,6 @@ class BarItems(pg.GraphicsObject):
'''
hist, last = ohlc[:-1], ohlc[-1]
self.path = gen_qpath(hist, start, self.w)
# save graphics for later reference and keep track
@ -270,27 +288,13 @@ class BarItems(pg.GraphicsObject):
# up to last to avoid double draw of last bar
self._last_bar_lines = bar_from_ohlc_row(last, self.w)
# trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update()
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
self.update_ds_line(x, y)
self._ds_xrange = (index[0], index[-1])
return self.path
def update_ds_line(
self,
x,
y,
# self.update_ds_line(
# x,
# y,
# )
) -> FastAppendCurve:
# determine current potential downsampling value (based on pixel
# scaling) and return any existing curve for it.
curve = self._ds_line
if not curve:
# TODO: figuring out the most optimial size for the ideal
# curve-path by,
# - calcing the display's max px width `.screen()`
@ -310,25 +314,75 @@ class BarItems(pg.GraphicsObject):
curve.hide()
self._pi.addItem(curve)
self._ds_line = curve
return curve
# TODO: we should be diffing the amount of new data which
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
self._ds_xrange = (index[0], index[-1])
curve.update_from_array(
y=y,
x=x,
)
return curve
# trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update()
return self.path
# def update_ds_line(
# self,
# x,
# y,
# ) -> FastAppendCurve:
# # determine current potential downsampling value (based on pixel
# # scaling) and return any existing curve for it.
# curve = self._ds_line
# if not curve:
# # TODO: figuring out the most optimial size for the ideal
# # curve-path by,
# # - calcing the display's max px width `.screen()`
# # - drawing a curve and figuring out it's capacity:
# # https://doc.qt.io/qt-5/qpainterpath.html#capacity
# # - reserving that cap for each curve-mapped-to-shm with
# # - leveraging clearing when needed to redraw the entire
# # curve that does not release mem allocs:
# # https://doc.qt.io/qt-5/qpainterpath.html#clear
# curve = FastAppendCurve(
# y=y,
# x=x,
# name='OHLC',
# color=self._color,
# )
# curve.hide()
# self._pi.addItem(curve)
# self._ds_line = curve
# return curve
# # TODO: we should be diffing the amount of new data which
# # needs to be downsampled. Ideally we actually are just
# # doing all the ds-ing in sibling actors so that the data
# # can just be read and rendered to graphics on events of our
# # choice.
# # diff = do_diff(ohlc, new_bit)
# curve.update_from_array(
# y=y,
# x=x,
# x_iv=x,
# y_iv=y,
# view_range=True, # hack
# )
# return curve
def update_from_array(
self,
# full array input history
ohlc: np.ndarray,
just_history=False,
# pre-sliced array data that's "in view"
ohlc_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
) -> None:
'''
@ -349,6 +403,19 @@ class BarItems(pg.GraphicsObject):
gt=ms_slower_then,
)
# vr = self.viewRect()
# l, r = int(vr.left()), int(vr.right())
# # l, r = self.view_range()
# # array = self._arrays[self.name]
# indexes = ohlc['index']
# start_index = indexes[0]
# end_index = indexes[-1]
# lbar = max(l, start_index) - start_index
# rbar = min(r, end_index) - start_index
# in_view = ohlc[lbar:rbar]
# self._vrange = lbar, rbar
# index = self.start_index
istart, istop = self._xrange
ds_istart, ds_istop = self._ds_xrange
@ -360,11 +427,149 @@ class BarItems(pg.GraphicsObject):
prepend_length = istart - first_index
append_length = last_index - istop
ds_prepend_length = ds_istart - first_index
ds_append_length = last_index - ds_istop
# ds_prepend_length = ds_istart - first_index
# ds_append_length = last_index - ds_istop
flip_cache = False
x_gt = 6
if self._ds_line:
uppx = self._ds_line.x_uppx()
else:
uppx = 0
should_line = self._in_ds
if (
self._in_ds
and uppx < x_gt
):
should_line = False
elif (
not self._in_ds
and uppx >= x_gt
):
should_line = True
# should_ds, should_redraw = self.should_ds_or_redraw()
# print(
# f'OHLC in line: {self._in_ds}'
# f'OHLC should line: {should_line}\n'
# # f'OHLC should_redraw: {should_redraw}\n'
# )
if (
should_line
):
# update the line graphic
# x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
# x, y = self._ds_line_xy = ohlc_flatten(ohlc)
x_iv, y_iv = self._ds_line_xy = ohlc_flatten(ohlc_iv)
profiler('flattening bars to line')
curve = self._ds_line
# curve = self.update_ds_line(x, y)
# TODO: we should be diffing the amount of new data which
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
curve.update_from_array(
y=x_iv,
x=y_iv,
x_iv=x_iv,
y_iv=y_iv,
view_range=view_range, # hack
)
# we already are showing a line and should be
# self._in_ds
# check if the ds line should be resampled/drawn
# should_ds_line, should_redraw_line = self._ds_line.should_ds_or_redraw()
# print(f'OHLC DS should ds: {should_ds_line}, should_redraw: {should_redraw_line}')
# if (
# # line should be redrawn/sampled
# # should_ds_line or
# # we are flipping to line from bars mode
# not self._in_ds
# ):
# uppx = self._ds_line.x_uppx()
# self._xs_in_px = uppx
if not self._in_ds:
# hide bars and show line
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {self._name}'
)
# self._pi.addItem(curve)
curve.show()
curve.update()
self._in_ds = True
# stop here since we don't need to update bars path any more
# as we delegate to the downsample line with updates.
return
elif (
not should_line
and self._in_ds
):
# flip back to bars graphics and hide the downsample line.
log.info(f'showing bars graphic {self._name}')
curve = self._ds_line
curve.hide()
# self._pi.removeItem(curve)
# XXX: is this actually any faster?
# self._pi.addItem(self)
self.show()
self._in_ds = False
# if not self._in_ds and should_ds
# self.hide()
# # XXX: is this actually any faster?
# # self._pi.removeItem(self)
# # this should have been done in the block above
# # x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
# # curve = self.update_ds_line(x, y)
# # TODO: a `.ui()` log level?
# log.info(
# f'downsampling to line graphic {self._name}'
# )
# # self._pi.addItem(curve)
# curve.show()
# curve.update()
# self._in_ds = True
# return
# self._in_ds = False
# print('YO NOT DS OHLC')
# generate in_view path
self.path = gen_qpath(
ohlc_iv,
0,
self.w,
# path=self.path,
)
# TODO: to make the downsampling faster
# - allow mapping only a range of lines thus only drawing as
# many bars as exactly specified.
@ -372,88 +577,98 @@ class BarItems(pg.GraphicsObject):
# - maybe move all this embedded logic to a higher
# level type?
fx, fy = self._ds_line_xy
# ohlc = in_view
if prepend_length:
# new history was added and we need to render a new path
prepend_bars = ohlc[:prepend_length]
# if prepend_length:
# # new history was added and we need to render a new path
# prepend_bars = ohlc[:prepend_length]
if ds_prepend_length:
ds_prepend_bars = ohlc[:ds_prepend_length]
pre_x, pre_y = ohlc_flatten(ds_prepend_bars)
fx = np.concatenate((pre_x, fx))
fy = np.concatenate((pre_y, fy))
profiler('ds line prepend diff complete')
# if ds_prepend_length:
# ds_prepend_bars = ohlc[:ds_prepend_length]
# pre_x, pre_y = ohlc_flatten(ds_prepend_bars)
# fx = np.concatenate((pre_x, fx))
# fy = np.concatenate((pre_y, fy))
# profiler('ds line prepend diff complete')
if append_length:
# generate new graphics to match provided array
# path appending logic:
# we need to get the previous "current bar(s)" for the time step
# and convert it to a sub-path to append to the historical set
# new_bars = ohlc[istop - 1:istop + append_length - 1]
append_bars = ohlc[-append_length - 1:-1]
# print(f'ohlc bars to append size: {append_bars.size}\n')
# if append_length:
# # generate new graphics to match provided array
# # path appending logic:
# # we need to get the previous "current bar(s)" for the time step
# # and convert it to a sub-path to append to the historical set
# # new_bars = ohlc[istop - 1:istop + append_length - 1]
# append_bars = ohlc[-append_length - 1:-1]
# # print(f'ohlc bars to append size: {append_bars.size}\n')
if ds_append_length:
ds_append_bars = ohlc[-ds_append_length - 1:-1]
post_x, post_y = ohlc_flatten(ds_append_bars)
# print(f'ds curve to append sizes: {(post_x.size, post_y.size)}')
fx = np.concatenate((fx, post_x))
fy = np.concatenate((fy, post_y))
# if ds_append_length:
# ds_append_bars = ohlc[-ds_append_length - 1:-1]
# post_x, post_y = ohlc_flatten(ds_append_bars)
# print(
# f'ds curve to append sizes: {(post_x.size, post_y.size)}'
# )
# fx = np.concatenate((fx, post_x))
# fy = np.concatenate((fy, post_y))
profiler('ds line append diff complete')
# profiler('ds line append diff complete')
profiler('array diffs complete')
# does this work?
last = ohlc[-1]
fy[-1] = last['close']
# fy[-1] = last['close']
# incremental update and cache line datums
self._ds_line_xy = fx, fy
# # incremental update and cache line datums
# self._ds_line_xy = fx, fy
# maybe downsample to line
ds = self.maybe_downsample()
if ds:
# if we downsample to a line don't bother with
# any more path generation / updates
self._ds_xrange = first_index, last_index
profiler('downsampled to line')
return
# ds = self.maybe_downsample()
# if ds:
# # if we downsample to a line don't bother with
# # any more path generation / updates
# self._ds_xrange = first_index, last_index
# profiler('downsampled to line')
# return
# print(in_view.size)
# if self.path:
# self.path = path
# self.path.reserve(path.capacity())
# self.path.swap(path)
# path updates
if prepend_length:
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from
# ohlc[prepend_length + 1] ???
prepend_path = gen_qpath(prepend_bars, 0, self.w)
old_path = self.path
self.path = prepend_path
self.path.addPath(old_path)
profiler('path PREPEND')
# if prepend_length:
# # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# # y value not matching the first value from
# # ohlc[prepend_length + 1] ???
# prepend_path = gen_qpath(prepend_bars, 0, self.w)
# old_path = self.path
# self.path = prepend_path
# self.path.addPath(old_path)
# profiler('path PREPEND')
if append_length:
append_path = gen_qpath(append_bars, 0, self.w)
# if append_length:
# append_path = gen_qpath(append_bars, 0, self.w)
self.path.moveTo(
float(istop - self.w),
float(append_bars[0]['open'])
)
self.path.addPath(append_path)
# self.path.moveTo(
# float(istop - self.w),
# float(append_bars[0]['open'])
# )
# self.path.addPath(append_path)
profiler('path APPEND')
# profiler('path APPEND')
# fp = self.fast_path
# if fp is None:
# self.fast_path = append_path
# else:
# fp.moveTo(float(istop - self.w), float(new_bars[0]['open']))
# fp.moveTo(
# float(istop - self.w), float(new_bars[0]['open'])
# )
# fp.addPath(append_path)
# self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# flip_cache = True
self._xrange = first_index, last_index
# trigger redraw despite caching
@ -559,73 +774,69 @@ class BarItems(pg.GraphicsObject):
)
def maybe_downsample(
self,
x_gt: float = 2.,
# def should_ds_or_redraw(
# self,
# x_gt: float = 2,
) -> bool:
'''
Call this when you want to stop drawing individual
bars and instead use a ``FastAppendCurve`` intepolation
line (normally when the width of a bar (aka 1.0 in the x)
is less then a pixel width on the device).
# ) -> tuple[bool, bool]:
'''
curve = self._ds_line
if not curve:
return False
# curve = self._ds_line
# if not curve:
# return False, False
# this is the ``float`` value of the "number of x units" (in
# view coords) that a pixel spans.
xs_in_px = self._ds_line.x_uppx()
# # this is the ``float`` value of the "number of x units" (in
# # view coords) that a pixel spans.
# uppx = self._ds_line.x_uppx()
# print(f'uppx: {uppx}')
linked = self.linked
# # linked = self.linked
# should_redraw: bool = False
# should_ds: bool = False
if (
self._ds_line_xy is not None
):
curve = self.update_ds_line(
*self._ds_line_xy,
)
# if (
# not self._in_ds
# and uppx >= x_gt
# ):
if (
not self._in_ds
and xs_in_px >= x_gt
):
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {linked.symbol.key}'
)
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
# should_ds = True
# should_redraw = True
self._xs_in_px = xs_in_px
# elif (
# self._in_ds
# and uppx < x_gt
# ):
# should_ds = False
# should_redraw = True
# self._pi.addItem(curve)
curve.show()
# if self._in_ds:
# should_ds = True
self._in_ds = True
# # no curve change
# return should_ds, should_redraw
elif (
self._in_ds
and xs_in_px < x_gt
):
log.info(f'showing bars graphic {linked.symbol.key}')
# def maybe_downsample(
# self,
# x_gt: float = 2,
curve = self._ds_line
curve.hide()
# self._pi.removeItem(curve)
# ) -> bool:
# '''
# Call this when you want to stop drawing individual
# bars and instead use a ``FastAppendCurve`` intepolation
# line (normally when the width of a bar (aka 1.0 in the x)
# is less then a pixel width on the device).
# XXX: is this actually any faster?
# self._pi.addItem(self)
self.show()
self.update()
# '''
# ds_xy = self._ds_line_xy
# if ds_xy:
# ds_xy.maybe_downsample()
self._in_ds = False
# no curve change
return self._in_ds
# if (
# self._ds_line_xy is not None
# and self._in_ds
# ):
# curve = self.update_ds_line(
# *self._ds_line_xy,
# )
def paint(
self,
@ -657,20 +868,8 @@ class BarItems(pg.GraphicsObject):
p.setPen(self.bars_pen)
p.drawPath(self.path)
profiler('draw history path')
profiler(f'draw history path: {self.path.capacity()}')
# if self.fast_path:
# p.drawPath(self.fast_path)
# profiler('draw fast path')
profiler.finish()
# NOTE: for testing paint frequency as throttled by display loop.
# now = time.time()
# global _last_draw
# print(f'DRAW RATE {1/(now - _last_draw)}')
# _last_draw = now
# import time
# _last_draw: float = time.time()