Compare commits

..

8 Commits

Author SHA1 Message Date
Tyler Goodlet cc50932c4f TOQUASH: drop display loop old .update_ohlc_.. 2022-04-03 23:36:30 -04:00
Tyler Goodlet c62d3dd82c Add profiling to xrange update loop 2022-04-03 23:35:53 -04:00
Tyler Goodlet 7d664c55ff Drop old `pyqtgraph` downsample code 2022-04-03 23:35:32 -04:00
Tyler Goodlet 024d3661a0 Port to new `.update_graphics_from_array()`, pause quote updates on chart interaction 2022-04-03 23:34:55 -04:00
Tyler Goodlet 9befc1fb1a Toy with caching ds data, probably will revert.. 2022-04-03 23:30:10 -04:00
Tyler Goodlet 54a1397d2c If only drawing bars in view we can wait longer to ds 2022-04-03 23:29:04 -04:00
Tyler Goodlet 08d7f925b9 Establish stream before `fsp_compute` so that backfill updates work again.. 2022-04-03 23:28:30 -04:00
Tyler Goodlet 25891c6e51 WIP only-in-view paths 2022-04-03 18:00:04 -04:00
8 changed files with 962 additions and 541 deletions

View File

@ -76,7 +76,6 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
ctx: tractor.Context,
symbol: Symbol, symbol: Symbol,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -86,7 +85,7 @@ async def fsp_compute(
func: Callable, func: Callable,
attach_stream: bool = False, # attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -193,46 +192,47 @@ async def fsp_compute(
profiler(f'{func_name} pushed history') profiler(f'{func_name} pushed history')
profiler.finish() profiler.finish()
# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs) tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index)) task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
# import time # import time
# last = time.time() # last = time.time()
try: try:
# rt stream
async with ctx.open_stream() as stream:
# always trigger UI refresh after history update, async for processed in out_stream:
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await stream.send('update')
async for processed in out_stream: log.debug(f"{func_name}: {processed}")
key, output = processed
index = src.index
dst.array[-1][key] = output
log.debug(f"{func_name}: {processed}") # NOTE: for now we aren't streaming this to the consumer
key, output = processed # stream latest array index entry which basically just acts
index = src.index # as trigger msg to tell the consumer to read from shm
dst.array[-1][key] = output # TODO: further this should likely be implemented much
# like our `Feed` api where there is one background
# "service" task which computes output and then sends to
# N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem
# chans for the fan out?
# if attach_stream:
# await client_stream.send(index)
# NOTE: for now we aren't streaming this to the consumer # period = time.time() - last
# stream latest array index entry which basically just acts # hz = 1/period if period else float('nan')
# as trigger msg to tell the consumer to read from shm # if hz > 60:
if attach_stream: # log.info(f'FSP quote too fast: {hz}')
await stream.send(index) # last = time.time()
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
# last = time.time()
finally: finally:
tracker.complete.set() tracker.complete.set()
@ -323,7 +323,6 @@ async def cascade(
fsp_target = partial( fsp_target = partial(
fsp_compute, fsp_compute,
ctx=ctx,
symbol=symbol, symbol=symbol,
feed=feed, feed=feed,
quote_stream=quote_stream, quote_stream=quote_stream,
@ -332,7 +331,7 @@ async def cascade(
src=src, src=src,
dst=dst, dst=dst,
# func_name=func_name, # target
func=func func=func
) )
@ -344,90 +343,113 @@ async def cascade(
profiler(f'{func_name}: fsp up') profiler(f'{func_name}: fsp up')
async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]: # sync client
# TODO: adopt an incremental update engine/approach await ctx.started(index)
# where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
return await n.start(fsp_target)
def is_synced( # XXX: rt stream with client which we MUST
src: ShmArray, # open here (and keep it open) in order to make
dst: ShmArray # incremental "updates" as history prepends take
) -> tuple[bool, int, int]: # place.
'''Predicate to dertmine if a destination FSP async with ctx.open_stream() as client_stream:
output array is aligned to its source array.
''' # TODO: these likely should all become
step_diff = src.index - dst.index # methods of this ``TaskLifetime`` or wtv
len_diff = abs(len(src.array) - len(dst.array)) # abstraction..
return not ( async def resync(
# the source is likely backfilling and we must tracker: TaskTracker,
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be ) -> tuple[TaskTracker, int]:
# leading/lagging by a step # TODO: adopt an incremental update engine/approach
step_diff > 1 or # where possible here eventually!
step_diff < 0 log.warning(f're-syncing fsp {func_name} to source')
), step_diff, len_diff tracker.cs.cancel()
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)
async def poll_and_sync_to_step( # always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send('update')
return tracker, index
tracker: TaskTracker, def is_synced(
src: ShmArray, src: ShmArray,
dst: ShmArray, dst: ShmArray
) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP
output array is aligned to its source array.
) -> tuple[TaskTracker, int]: '''
step_diff = src.index - dst.index
len_diff = abs(len(src.array) - len(dst.array))
return not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be
# leading/lagging by a step
step_diff > 1 or
step_diff < 0
), step_diff, len_diff
async def poll_and_sync_to_step(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
return tracker, step_diff return tracker, step_diff
s, step, ld = is_synced(src, dst) s, step, ld = is_synced(src, dst)
# detect sample period step for subscription to increment # detect sample period step for subscription to increment
# signal # signal
times = src.array['time'] times = src.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async with feed.index_stream(int(delay_s)) as istream: async with feed.index_stream(
int(delay_s)
) as istream:
profiler(f'{func_name}: sample stream up') profiler(f'{func_name}: sample stream up')
profiler.finish() profiler.finish()
async for _ in istream: async for _ in istream:
# respawn the compute task if the source # respawn the compute task if the source
# array has been updated such that we compute # array has been updated such that we compute
# new history from the (prepended) source. # new history from the (prepended) source.
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
if not synced: if not synced:
tracker, step_diff = await poll_and_sync_to_step( tracker, step_diff = await poll_and_sync_to_step(
tracker, tracker,
src, src,
dst, dst,
) )
# skip adding a last bar since we should already # skip adding a last bar since we should already
# be step alinged # be step alinged
if step_diff == 0: if step_diff == 0:
continue continue
# read out last shm row, copy and write new row # read out last shm row, copy and write new row
array = dst.array array = dst.array
# some metrics like vlm should be reset # some metrics like vlm should be reset
# to zero every step. # to zero every step.
if zero_on_step: if zero_on_step:
last = zeroed last = zeroed
else: else:
last = array[-1:].copy() last = array[-1:].copy()
dst.push(last) dst.push(last)

View File

@ -838,8 +838,12 @@ class ChartPlotWidget(pg.PlotWidget):
''' '''
l, r = self.view_range() l, r = self.view_range()
array = self._arrays[self.name] array = self._arrays[self.name]
lbar = max(l, array[0]['index']) start, stop = self._xrange = (
rbar = min(r, array[-1]['index']) array[0]['index'],
array[-1]['index'],
)
lbar = max(l, start)
rbar = min(r, stop)
return l, lbar, rbar, r return l, lbar, rbar, r
def curve_width_pxs( def curve_width_pxs(
@ -907,7 +911,7 @@ class ChartPlotWidget(pg.PlotWidget):
return return
xfirst, xlast = index[0], index[-1] xfirst, xlast = index[0], index[-1]
brange = l, lbar, rbar, r = self.bars_range() l, lbar, rbar, r = self.bars_range()
marker_pos, l1_len = self.pre_l1_xs() marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1 end = xlast + l1_len + 1
@ -986,7 +990,8 @@ class ChartPlotWidget(pg.PlotWidget):
graphics = BarItems( graphics = BarItems(
self.linked, self.linked,
self.plotItem, self.plotItem,
pen_color=self.pen_color pen_color=self.pen_color,
name=name,
) )
# adds all bar/candle graphics objects for each data point in # adds all bar/candle graphics objects for each data point in
@ -1175,29 +1180,34 @@ class ChartPlotWidget(pg.PlotWidget):
) )
return last return last
def update_ohlc_from_array( # def update_ohlc_from_array(
# self,
# graphics_name: str,
# array: np.ndarray,
# **kwargs,
# ) -> pg.GraphicsObject:
# '''
# Update the named internal graphics from ``array``.
# '''
# self._index = array['index'][0]
# self._arrays[self.name] = array
# graphics = self._graphics[graphics_name]
# graphics.update_from_array(array, **kwargs)
# return graphics
# def update_curve_from_array(
def update_graphics_from_array(
self, self,
graphics_name: str, graphics_name: str,
array: np.ndarray,
**kwargs,
) -> pg.GraphicsObject: array: Optional[np.ndarray] = None,
'''
Update the named internal graphics from ``array``.
'''
self._arrays[self.name] = array
graphics = self._graphics[graphics_name]
graphics.update_from_array(array, **kwargs)
return graphics
def update_curve_from_array(
self,
graphics_name: str,
array: np.ndarray,
array_key: Optional[str] = None, array_key: Optional[str] = None,
**kwargs, **kwargs,
) -> pg.GraphicsObject: ) -> pg.GraphicsObject:
@ -1205,31 +1215,64 @@ class ChartPlotWidget(pg.PlotWidget):
Update the named internal graphics from ``array``. Update the named internal graphics from ``array``.
''' '''
assert len(array) if array is not None:
assert len(array)
data_key = array_key or graphics_name data_key = array_key or graphics_name
if graphics_name not in self._flows: if graphics_name not in self._flows:
self._arrays[self.name] = array data_key = self.name
else:
if array is not None:
# write array to internal graphics table
self._arrays[data_key] = array self._arrays[data_key] = array
else:
array = self._arrays[data_key]
curve = self._graphics[graphics_name] # array key and graphics "name" might be different..
graphics = self._graphics[graphics_name]
# NOTE: back when we weren't implementing the curve graphics # compute "in-view" indices
# ourselves you'd have updates using this method: l, lbar, rbar, r = self.bars_range()
# curve.setData(y=array[graphics_name], x=array['index'], **kwargs) indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
# NOTE: graphics **must** implement a diff based update lbar_i = max(l, ifirst) - ifirst
# operation where an internal ``FastUpdateCurve._xrange`` is rbar_i = min(r, ilast) - ifirst
# used to determine if the underlying path needs to be
# pre/ap-pended.
curve.update_from_array(
x=array['index'],
y=array[data_key],
**kwargs
)
return curve in_view = array[lbar_i: rbar_i]
if not in_view.size:
return graphics
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
# start_index = self._index
# lbar = max(l, start_index) - start_index
# rbar = min(r, ohlc[-1]['index']) - start_index
if isinstance(graphics, BarItems):
graphics.update_from_array(
array,
in_view,
view_range=(lbar_i, rbar_i),
**kwargs,
)
else:
graphics.update_from_array(
x=array['index'],
y=array[data_key],
x_iv=in_view['index'],
y_iv=in_view[data_key],
view_range=(lbar_i, rbar_i),
**kwargs
)
return graphics
# def _label_h(self, yhigh: float, ylow: float) -> float: # def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms # # compute contents label "height" in view terms
@ -1260,6 +1303,9 @@ class ChartPlotWidget(pg.PlotWidget):
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}") # print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
# TODO: pretty sure we can just call the cursor
# directly not? i don't wee why we need special "signal proxies"
# for this lul..
def enterEvent(self, ev): # noqa def enterEvent(self, ev): # noqa
# pg.PlotWidget.enterEvent(self, ev) # pg.PlotWidget.enterEvent(self, ev)
self.sig_mouse_enter.emit(self) self.sig_mouse_enter.emit(self)

View File

@ -106,53 +106,6 @@ def trace_hl(
return out return out
def downsample(
x: np.ndarray,
y: np.ndarray,
bins: int = 2,
method: str = 'peak',
**kwargs,
) -> tuple[np.ndarray, np.ndarray]:
'''
Downsample x/y data for lesser curve graphics gen.
The "peak" method is originally copied verbatim from
``pyqtgraph.PlotDataItem.getDisplayDataset()`` which gets
all credit, though we will likely drop this in favor of the M4
algo below.
'''
# py3.10 syntax
match method:
case 'peak':
if bins < 2:
log.warning('No downsampling taking place?')
ds = bins
n = len(x) // ds
x1 = np.empty((n, 2))
# start of x-values; try to select a somewhat centered point
stx = ds // 2
x1[:] = x[stx:stx+n*ds:ds, np.newaxis]
x = x1.reshape(n*2)
y1 = np.empty((n, 2))
y2 = y[:n*ds].reshape((n, ds))
y1[:, 0] = y2.max(axis=1)
y1[:, 1] = y2.min(axis=1)
y = y1.reshape(n*2)
return x, y
case 'm4':
return ds_m4(x, y, kwargs['px_width'])
def ohlc_flatten( def ohlc_flatten(
ohlc: np.ndarray, ohlc: np.ndarray,
use_mxmn: bool = False, use_mxmn: bool = False,

View File

@ -144,6 +144,8 @@ class FastAppendCurve(pg.GraphicsObject):
self.use_fpath = use_fpath self.use_fpath = use_fpath
self.fast_path: Optional[QtGui.QPainterPath] = None self.fast_path: Optional[QtGui.QPainterPath] = None
self._ds_cache: dict = {}
# TODO: we can probably just dispense with the parent since # TODO: we can probably just dispense with the parent since
# we're basically only using the pen setting now... # we're basically only using the pen setting now...
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -214,52 +216,61 @@ class FastAppendCurve(pg.GraphicsObject):
vr = self.viewRect() vr = self.viewRect()
l, r = int(vr.left()), int(vr.right()) l, r = int(vr.left()), int(vr.right())
if not self._xrange:
return 0
start, stop = self._xrange start, stop = self._xrange
lbar = max(l, start) lbar = max(l, start)
rbar = min(r, stop) rbar = min(r, stop)
return vb.mapViewToDevice( return round(vb.mapViewToDevice(
QLineF(lbar, 0, rbar, 0) QLineF(lbar, 0, rbar, 0)
).length() ).length())
def should_ds_or_redraw( # def should_ds_or_redraw(
self, # self,
) -> tuple[bool, bool]: # ) -> tuple[bool, bool]:
uppx = self.x_uppx() # uppx = self.x_uppx()
px_width = self.px_width() # px_width = self.px_width()
# uppx_diff = abs(uppx - self._last_uppx) # if not px_width:
uppx_diff = (uppx - self._last_uppx) # return False, False
self._last_uppx = uppx
should_redraw: bool = False # # uppx_diff = abs(uppx - self._last_uppx)
should_ds: bool = False # uppx_diff = (uppx - self._last_uppx)
# self._last_uppx = uppx
# print(uppx_diff) # should_redraw: bool = False
# should_ds: bool = self._in_ds
if ( # # print(uppx_diff)
uppx <= 8
):
# trigger redraw or original non-downsampled data
if self._in_ds:
print('REVERTING BACK TO SRC DATA')
# clear downsampled curve(s) and expect
# refresh of path segments.
should_redraw = True
elif ( # if (
uppx_diff >= 4 # uppx <= 8
or uppx_diff <= -2 # ):
or self._step_mode and abs(uppx_diff) >= 1 # # trigger redraw or original non-downsampled data
): # if self._in_ds:
log.info( # print('REVERTING BACK TO SRC DATA')
f'{self._name} downsampler change: {self._last_uppx} -> {uppx}' # # clear downsampled curve(s) and expect
) # # refresh of path segments.
should_ds = {'px_width': px_width, 'uppx': uppx} # should_redraw = True
should_redraw = True
return should_ds, should_redraw # elif (
# uppx_diff >= 1
# or uppx_diff <= -1
# or self._step_mode and abs(uppx_diff) >= 1
# ):
# log.info(
# f'{self._name} downsampler change: {self._last_uppx} -> {uppx}'
# )
# should_ds = {'px_width': px_width, 'uppx': uppx}
# should_redraw = True
# if should_ds:
# should_ds = {'px_width': px_width, 'uppx': uppx}
# return should_ds, should_redraw
def downsample( def downsample(
self, self,
@ -286,7 +297,7 @@ class FastAppendCurve(pg.GraphicsObject):
y = y.flatten() y = y.flatten()
# presumably? # presumably?
self._in_ds = True # self._in_ds = True
return x, y return x, y
def maybe_downsample( def maybe_downsample(
@ -303,9 +314,17 @@ class FastAppendCurve(pg.GraphicsObject):
def update_from_array( def update_from_array(
self, self,
# full array input history
x: np.ndarray, x: np.ndarray,
y: np.ndarray, y: np.ndarray,
# pre-sliced array data that's "in view"
x_iv: np.ndarray,
y_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
) -> QtGui.QPainterPath: ) -> QtGui.QPainterPath:
''' '''
Update curve from input 2-d data. Update curve from input 2-d data.
@ -315,87 +334,197 @@ class FastAppendCurve(pg.GraphicsObject):
''' '''
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg=f'{self._name}.update_from_array()', msg=f'FastAppendCurve.update_from_array(): `{self._name}`',
disabled=not pg_profile_enabled(), disabled=not pg_profile_enabled(),
gt=ms_slower_then, gt=ms_slower_then,
) )
flip_cache = False flip_cache = False
draw_full_path = True
if self._xrange: # XXX: lol brutal, the internals of `CurvePoint` (inherited by
istart, istop = self._xrange # our `LineDot`) required ``.getData()`` to work..
else: self.xData = x
self._xrange = istart, istop = x[0], x[-1] self.yData = y
# print(f"xrange: {self._xrange}")
should_ds, should_redraw = self.should_ds_or_redraw() # update internal array refs
self._x, self._y = x, y
# compute the length diffs between the first/last index entry in # compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the # the input data and the last indexes we have on record from the
# last time we updated the curve index. # last time we updated the curve index.
if self._xrange:
istart, istop = self._xrange
else:
self._xrange = istart, istop = x[0], x[-1]
prepend_length = int(istart - x[0]) prepend_length = int(istart - x[0])
append_length = int(x[-1] - istop) append_length = int(x[-1] - istop)
# print(f"xrange: {self._xrange}")
if view_range:
li, ri = view_range
# x, y = x[lbar:rbar], y[lbar:rbar]
# x, y = x_iv, y_iv
profiler(f'view range slice {view_range}')
# if self._name == 'OHLC':
# print(f'view range slice {view_range}')
# ds state checking
uppx = self.x_uppx()
px_width = self.px_width()
uppx_diff = (uppx - self._last_uppx)
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
# TODO: numba this bish
# x_out, y_out = step_path_arrays_from_1d(
# x[:-1], y[:-1]
# )
x_iv_out, y_iv_out = step_path_arrays_from_1d(
x_iv[:-1], y_iv[:-1]
)
profiler('generated step arrays')
else:
# by default we only pull data up to the last (current) index
# x_out, y_out = x[:-1], y[:-1]
x_iv_out, y_iv_out = x_iv[:-1], y_iv[:-1]
profiler('sliced array history')
# by default plan to draw the source ouput that's "in view"
x_to_path, y_to_path = x_iv_out, y_iv_out
ds_key = px_width, uppx
# always re-ds if we were dsed but the input range changes.
if self._in_ds:
# slice out the portion of the downsampled data that is
# "in view" and **only** draw a path for that.
entry = self._ds_cache.get(ds_key)
if entry:
x_ds_out, y_ds_out, first_i, last_i = entry
# if last_i == x[-1]:
log.info(
f'{self._name} has cached ds {ds_key} -> {entry}'
)
prepend_length = int(first_i - ri)
append_length = int(ri - last_i)
# x_to_path = x_ds_out
# y_to_path = y_ds_out
# else:
# log.warn(f'{self._name} ds updates unhandled!')
# DS only the new part?
# check for downsampling conditions
if (
# std m4 downsample conditions
uppx_diff >= 4
or uppx_diff <= -2
or self._step_mode and abs(uppx_diff) >= 2
# or self._in_ds and px_width > 1
):
# if not uppx_diff >= 1:
log.info(
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
)
self._last_uppx = uppx
# should_ds = {'px_width': px_width, 'uppx': uppx}
# if self._step_mode:
# # TODO: numba this bish
# x_out, y_out = step_path_arrays_from_1d(
# x_iv[:-1], y_iv[:-1]
# )
# else:
# # by default we only pull data up to the last (current) index
# x_out, y_out = x_iv[:-1], y_iv[:-1]
x_ds_out, y_ds_out = self.downsample(
x_iv_out,
y_iv_out,
px_width=px_width,
uppx=uppx,
)
profiler(
f'path downsample ds_key={ds_key}\n'
f'{x_iv_out.size}, {y_iv_out.size}'
)
# cache downsampled outputs
self._ds_cache[ds_key] = (
x_ds_out,
y_ds_out,
x[0],
x[-1],
)
x_to_path = x_ds_out
y_to_path = y_ds_out
self._in_ds = True
elif (
uppx <= 8
and self._in_ds
):
# we should de-downsample back to our original
# source data so we clear our path data in prep
# to generate a new one from original source data.
if self.path:
self.path.clear()
if self.fast_path:
self.fast_path.clear()
log.info(f'DEDOWN -> {self._name}')
profiler('path reversion to non-ds data')
self._in_ds = False
# render path graphics
# log.info(
# # f'{self._name}: last sizes {x_to_path.size}, {y_to_path.size}',
# f'{self._name}: sizes {x_to_path.size}, {y_to_path.size}',
# )
self._last_topaths = x_to_path, y_to_path
no_path_yet = self.path is None no_path_yet = self.path is None
if ( if draw_full_path:
should_redraw or should_ds
or self.path is None
or prepend_length > 0
):
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
x_out, y_out = step_path_arrays_from_1d(
x[:-1], y[:-1]
)
profiler('generated step arrays')
else:
# by default we only pull data up to the last (current) index
x_out, y_out = x[:-1], y[:-1]
if should_ds:
x_out, y_out = self.downsample(
x_out,
y_out,
**should_ds,
)
profiler(f'path downsample redraw={should_ds}')
self._in_ds = True
if should_redraw:
profiler('path reversion to non-ds')
if self.path:
self.path.clear()
if self.fast_path:
self.fast_path.clear()
if should_redraw and not should_ds:
log.info(f'DEDOWN -> {self._name}')
self._in_ds = False
# else:
self.path = pg.functions.arrayToQPath( self.path = pg.functions.arrayToQPath(
x_out, x_to_path,
y_out, y_to_path,
connect='all', connect='all',
finiteCheck=False, finiteCheck=False,
path=self.path, path=self.path,
) )
# reserve mem allocs see: profiler('generated FULL PATH -> {self._name}')
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - https://doc.qt.io/qt-5/qpainterpath.html#clear
# XXX: right now this is based on had hoc checks on a
# hidpi 3840x2160 4k monitor but we should optimize for
# the target display(s) on the sys.
if no_path_yet:
self.path.reserve(int(500e3))
profiler('generated fresh path') # reserve mem allocs see:
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - https://doc.qt.io/qt-5/qpainterpath.html#clear
# XXX: right now this is based on had hoc checks on a
# hidpi 3840x2160 4k monitor but we should optimize for
# the target display(s) on the sys.
if no_path_yet:
self.path.reserve(int(500e3))
# if self._step_mode: self._last_vr = view_range
# self.path.closeSubpath()
# TODO: get this piecewise prepend working - right now it's # TODO: get this piecewise prepend working - right now it's
# giving heck on vwap... # giving heck on vwap...
@ -414,65 +543,65 @@ class FastAppendCurve(pg.GraphicsObject):
# # self.path.moveTo(new_x[0], new_y[0]) # # self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(old_path) # self.path.connectPath(old_path)
elif ( # elif (
append_length > 0 # append_length > 0
): # ):
if self._step_mode: # if self._step_mode:
new_x, new_y = step_path_arrays_from_1d( # new_x, new_y = step_path_arrays_from_1d(
x[-append_length - 2:-1], # x[-append_length - 2:-1],
y[-append_length - 2:-1], # y[-append_length - 2:-1],
) # )
# [1:] since we don't need the vertical line normally at # # [1:] since we don't need the vertical line normally at
# the beginning of the step curve taking the first (x, # # the beginning of the step curve taking the first (x,
# y) poing down to the x-axis **because** this is an # # y) poing down to the x-axis **because** this is an
# appended path graphic. # # appended path graphic.
new_x = new_x[1:] # new_x = new_x[1:]
new_y = new_y[1:] # new_y = new_y[1:]
else: # else:
# print(f"append_length: {append_length}") # # print(f"append_length: {append_length}")
new_x = x[-append_length - 2:-1] # new_x = x[-append_length - 2:-1]
new_y = y[-append_length - 2:-1] # new_y = y[-append_length - 2:-1]
# print((new_x, new_y)) # # print((new_x, new_y))
profiler('diffed append arrays') # profiler('diffed append arrays')
if should_ds: # if should_ds:
new_x, new_y = self.downsample( # new_x, new_y = self.downsample(
new_x, # new_x,
new_y, # new_y,
**should_ds, # **should_ds,
) # )
profiler(f'fast path downsample redraw={should_ds}') # profiler(f'fast path downsample redraw={should_ds}')
append_path = pg.functions.arrayToQPath( # append_path = pg.functions.arrayToQPath(
new_x, # new_x,
new_y, # new_y,
connect='all', # connect='all',
finiteCheck=False, # finiteCheck=False,
path=self.fast_path, # path=self.fast_path,
) # )
if self.use_fpath: # if self.use_fpath:
# an attempt at trying to make append-updates faster.. # # an attempt at trying to make append-updates faster..
if self.fast_path is None: # if self.fast_path is None:
self.fast_path = append_path # self.fast_path = append_path
self.fast_path.reserve(int(6e3)) # self.fast_path.reserve(int(6e3))
else: # else:
self.fast_path.connectPath(append_path) # self.fast_path.connectPath(append_path)
size = self.fast_path.capacity() # size = self.fast_path.capacity()
profiler(f'connected fast path w size: {size}') # profiler(f'connected fast path w size: {size}')
# print(f"append_path br: {append_path.boundingRect()}") # # print(f"append_path br: {append_path.boundingRect()}")
# self.path.moveTo(new_x[0], new_y[0]) # # self.path.moveTo(new_x[0], new_y[0])
# path.connectPath(append_path) # # path.connectPath(append_path)
# XXX: lol this causes a hang.. # # XXX: lol this causes a hang..
# self.path = self.path.simplified() # # self.path = self.path.simplified()
else: # else:
size = self.path.capacity() # size = self.path.capacity()
profiler(f'connected history path w size: {size}') # profiler(f'connected history path w size: {size}')
self.path.connectPath(append_path) # self.path.connectPath(append_path)
# other merging ideas: # other merging ideas:
# https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths # https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths
@ -487,8 +616,8 @@ class FastAppendCurve(pg.GraphicsObject):
# # XXX: super slow set "union" op # # XXX: super slow set "union" op
# self.path = self.path.united(append_path).simplified() # self.path = self.path.united(append_path).simplified()
# self.disable_cache() # self.disable_cache()
# flip_cache = True # flip_cache = True
# XXX: do we need this any more? # XXX: do we need this any more?
# if ( # if (
@ -497,12 +626,7 @@ class FastAppendCurve(pg.GraphicsObject):
# self.disable_cache() # self.disable_cache()
# flip_cache = True # flip_cache = True
# XXX: lol brutal, the internals of `CurvePoint` (inherited by x_last = x[-1]
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
x0, x_last = self._xrange = x[0], x[-1]
y_last = y[-1] y_last = y[-1]
# draw the "current" step graphic segment so it lines up with # draw the "current" step graphic segment so it lines up with
@ -540,8 +664,6 @@ class FastAppendCurve(pg.GraphicsObject):
# XXX: seems to be needed to avoid artifacts (see above). # XXX: seems to be needed to avoid artifacts (see above).
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache) self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
self._x, self._y = x, y
# XXX: lol brutal, the internals of `CurvePoint` (inherited by # XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work.. # our `LineDot`) required ``.getData()`` to work..
def getData(self): def getData(self):
@ -636,6 +758,7 @@ class FastAppendCurve(pg.GraphicsObject):
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.paint(): `{self._name}`', msg=f'FastAppendCurve.paint(): `{self._name}`',
disabled=not pg_profile_enabled(), disabled=not pg_profile_enabled(),
# disabled=True,
gt=ms_slower_then, gt=ms_slower_then,
) )

View File

@ -53,6 +53,10 @@ from ._forms import (
mk_order_pane_layout, mk_order_pane_layout,
) )
from .order_mode import open_order_mode from .order_mode import open_order_mode
# from .._profile import (
# pg_profile_enabled,
# ms_slower_then,
# )
from ..log import get_logger from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
@ -284,6 +288,12 @@ async def graphics_update_loop(
chart.pause_all_feeds() chart.pause_all_feeds()
continue continue
ic = chart.view._ic
if ic:
chart.pause_all_feeds()
await ic.wait()
chart.resume_all_feeds()
# sync call to update all graphics/UX components. # sync call to update all graphics/UX components.
graphics_update_cycle(ds) graphics_update_cycle(ds)
@ -299,8 +309,9 @@ def graphics_update_cycle(
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
disabled=True, # not pg_profile_enabled(), disabled=True, # not pg_profile_enabled(),
delayed=False, gt=1/12 * 1e3,
) )
# unpack multi-referenced components # unpack multi-referenced components
chart = ds.chart chart = ds.chart
vlm_chart = ds.vlm_chart vlm_chart = ds.vlm_chart
@ -409,9 +420,11 @@ def graphics_update_cycle(
if ( if (
# if zoomed out alot don't update the last "bar" # if zoomed out alot don't update the last "bar"
(xpx < update_uppx or i_diff > 0) (xpx < update_uppx or i_diff > 0)
# and r >= i_step and r >= i_step
): ):
vlm_chart.update_curve_from_array('volume', array) # TODO: make it so this doesn't have to be called
# once the $vlm is up?
vlm_chart.update_graphics_from_array('volume', array)
if ( if (
mx_vlm_in_view != vars['last_mx_vlm'] mx_vlm_in_view != vars['last_mx_vlm']
@ -485,7 +498,7 @@ def graphics_update_cycle(
xpx < update_uppx xpx < update_uppx
or i_diff > 0 or i_diff > 0
): ):
chart.update_ohlc_from_array( chart.update_graphics_from_array(
chart.name, chart.name,
array, array,
) )
@ -524,7 +537,7 @@ def graphics_update_cycle(
if wap_in_history: if wap_in_history:
# update vwap overlay line # update vwap overlay line
chart.update_curve_from_array( chart.update_graphics_from_array(
'bar_wap', 'bar_wap',
array, array,
) )

View File

@ -89,7 +89,7 @@ def update_fsp_chart(
# update graphics # update graphics
# NOTE: this does a length check internally which allows it # NOTE: this does a length check internally which allows it
# staying above the last row check below.. # staying above the last row check below..
chart.update_curve_from_array( chart.update_graphics_from_array(
graphics_name, graphics_name,
array, array,
array_key=array_key or graphics_name, array_key=array_key or graphics_name,
@ -425,6 +425,7 @@ class FspAdmin:
) as (ctx, last_index), ) as (ctx, last_index),
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
# register output data # register output data
self._registry[ self._registry[
(fqsn, ns_path) (fqsn, ns_path)
@ -440,6 +441,7 @@ class FspAdmin:
async with stream.subscribe() as stream: async with stream.subscribe() as stream:
async for msg in stream: async for msg in stream:
if msg == 'update': if msg == 'update':
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle() self.linked.graphics_cycle()
else: else:
log.info(f'recved unexpected fsp engine msg: {msg}') log.info(f'recved unexpected fsp engine msg: {msg}')
@ -674,7 +676,7 @@ async def open_vlm_displays(
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
vlm_curve = chart.update_curve_from_array( vlm_curve = chart.update_graphics_from_array(
'volume', 'volume',
shm.array, shm.array,
) )

View File

@ -20,6 +20,7 @@ Chart view box primitives
""" """
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
# import itertools
import time import time
from typing import Optional, Callable from typing import Optional, Callable
@ -36,7 +37,8 @@ from ..log import get_logger
from ._style import _min_points_to_show from ._style import _min_points_to_show
from ._editors import SelectRect from ._editors import SelectRect
from . import _event from . import _event
from ._ohlc import BarItems from .._profile import pg_profile_enabled, ms_slower_then
# from ._ohlc import BarItems
log = get_logger(__name__) log = get_logger(__name__)
@ -319,6 +321,7 @@ async def handle_viewmode_mouse(
): ):
# when in order mode, submit execution # when in order mode, submit execution
# msg.event.accept() # msg.event.accept()
# breakpoint()
view.order_mode.submit_order() view.order_mode.submit_order()
@ -384,6 +387,29 @@ class ChartView(ViewBox):
self.order_mode: bool = False self.order_mode: bool = False
self.setFocusPolicy(QtCore.Qt.StrongFocus) self.setFocusPolicy(QtCore.Qt.StrongFocus)
self._ic = None
def start_ic(
self,
) -> None:
if self._ic is None:
self.chart.pause_all_feeds()
self._ic = trio.Event()
def signal_ic(
self,
*args,
# ev = None,
) -> None:
if args:
print(f'range change dun: {args}')
else:
print('proxy called')
if self._ic:
self._ic.set()
self._ic = None
self.chart.resume_all_feeds()
@asynccontextmanager @asynccontextmanager
async def open_async_input_handler( async def open_async_input_handler(
@ -429,11 +455,6 @@ class ChartView(ViewBox):
def maxmin(self, callback: Callable) -> None: def maxmin(self, callback: Callable) -> None:
self._maxmin = callback self._maxmin = callback
def maybe_downsample_graphics(self):
for graphic in self._chart._graphics.values():
if isinstance(graphic, BarItems):
graphic.maybe_paint_line()
def wheelEvent( def wheelEvent(
self, self,
ev, ev,
@ -542,6 +563,11 @@ class ChartView(ViewBox):
self._resetTarget() self._resetTarget()
self.scaleBy(s, focal) self.scaleBy(s, focal)
self.sigRangeChangedManually.emit(mask) self.sigRangeChangedManually.emit(mask)
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
ev.accept() ev.accept()
def mouseDragEvent( def mouseDragEvent(
@ -624,6 +650,11 @@ class ChartView(ViewBox):
# XXX: WHY # XXX: WHY
ev.accept() ev.accept()
self.start_ic()
# if self._ic is None:
# self.chart.pause_all_feeds()
# self._ic = trio.Event()
if axis == 1: if axis == 1:
self.chart._static_yrange = 'axis' self.chart._static_yrange = 'axis'
@ -641,6 +672,13 @@ class ChartView(ViewBox):
self.sigRangeChangedManually.emit(self.state['mouseEnabled']) self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
if ev.isFinish():
print('DRAG FINISH')
self.signal_ic()
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE # WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif button & QtCore.Qt.RightButton: elif button & QtCore.Qt.RightButton:
@ -788,11 +826,13 @@ class ChartView(ViewBox):
# iterate those. # iterate those.
# - only register this when certain downsampleable graphics are # - only register this when certain downsampleable graphics are
# "added to scene". # "added to scene".
vb.sigRangeChangedManually.connect(vb.maybe_downsample_graphics) vb.sigXRangeChanged.connect(vb.maybe_downsample_graphics)
# mouse wheel doesn't emit XRangeChanged # mouse wheel doesn't emit XRangeChanged
vb.sigRangeChangedManually.connect(vb._set_yrange) vb.sigRangeChangedManually.connect(vb._set_yrange)
vb.sigResized.connect(vb._set_yrange) # splitter(s) resizing
# splitter(s) resizing
vb.sigResized.connect(vb._set_yrange)
def disable_auto_yrange( def disable_auto_yrange(
self, self,
@ -808,10 +848,33 @@ class ChartView(ViewBox):
''' '''
for graphic in self._chart._graphics.values(): for graphic in self._chart._graphics.values():
# if isinstance(graphic, BarItems): xvec = graphic.pixelVectors()[0]
xpx = graphic.pixelVectors()[0].x() if xvec:
if xpx: xpx = xvec.x()
return xpx if xpx:
return xpx
else: else:
continue continue
return 1.0 return 1.0
def maybe_downsample_graphics(self):
# TODO: a faster single-loop-iterator way of doing this XD
chart = self._chart
# graphics = list(self._chart._graphics.values())
profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.update_from_array(): `{chart.name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
for name, graphics in chart._graphics.items():
# pass in no array which will read and render from the last
# passed array (normally provided by the display loop.)
chart.update_graphics_from_array(name)
profiler(f'updating {name}')
# for graphic in graphics:
# ds_meth = getattr(graphic, 'maybe_downsample', None)
# if ds_meth:
# ds_meth()

View File

@ -157,23 +157,40 @@ def path_arrays_from_ohlc(
def gen_qpath( def gen_qpath(
data, data: np.ndarray,
start, # XXX: do we need this? start: int, # XXX: do we need this?
w, w: float,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath: ) -> QtGui.QPainterPath:
path_was_none = path is None
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg=f'gen_qpath ohlc', msg='gen_qpath ohlc',
disabled=not pg_profile_enabled(), disabled=not pg_profile_enabled(),
gt=ms_slower_then, gt=ms_slower_then,
) )
x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w) x, y, c = path_arrays_from_ohlc(
data,
start,
bar_gap=w,
)
profiler("generate stream with numba") profiler("generate stream with numba")
# TODO: numba the internals of this! # TODO: numba the internals of this!
path = pg.functions.arrayToQPath(x, y, connect=c) path = pg.functions.arrayToQPath(
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath") profiler("generate path with arrayToQPath")
return path return path
@ -206,6 +223,7 @@ class BarItems(pg.GraphicsObject):
self._color = pen_color self._color = pen_color
self.bars_pen = pg.mkPen(hcolor(pen_color), width=1) self.bars_pen = pg.mkPen(hcolor(pen_color), width=1)
self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2) self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2)
self._name = name
self._ds_line_xy: Optional[ self._ds_line_xy: Optional[
tuple[np.ndarray, np.ndarray] tuple[np.ndarray, np.ndarray]
@ -226,6 +244,7 @@ class BarItems(pg.GraphicsObject):
self._xrange: tuple[int, int] self._xrange: tuple[int, int]
self._yrange: tuple[float, float] self._yrange: tuple[float, float]
self._vrange = None
# TODO: don't render the full backing array each time # TODO: don't render the full backing array each time
# self._path_data = None # self._path_data = None
@ -254,7 +273,6 @@ class BarItems(pg.GraphicsObject):
''' '''
hist, last = ohlc[:-1], ohlc[-1] hist, last = ohlc[:-1], ohlc[-1]
self.path = gen_qpath(hist, start, self.w) self.path = gen_qpath(hist, start, self.w)
# save graphics for later reference and keep track # save graphics for later reference and keep track
@ -270,65 +288,101 @@ class BarItems(pg.GraphicsObject):
# up to last to avoid double draw of last bar # up to last to avoid double draw of last bar
self._last_bar_lines = bar_from_ohlc_row(last, self.w) self._last_bar_lines = bar_from_ohlc_row(last, self.w)
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
# self.update_ds_line(
# x,
# y,
# )
# TODO: figuring out the most optimial size for the ideal
# curve-path by,
# - calcing the display's max px width `.screen()`
# - drawing a curve and figuring out it's capacity:
# https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - reserving that cap for each curve-mapped-to-shm with
# - leveraging clearing when needed to redraw the entire
# curve that does not release mem allocs:
# https://doc.qt.io/qt-5/qpainterpath.html#clear
curve = FastAppendCurve(
y=y,
x=x,
name='OHLC',
color=self._color,
)
curve.hide()
self._pi.addItem(curve)
self._ds_line = curve
self._ds_xrange = (index[0], index[-1])
# trigger render # trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update # https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update() self.update()
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
self.update_ds_line(x, y)
self._ds_xrange = (index[0], index[-1])
return self.path return self.path
def update_ds_line( # def update_ds_line(
self, # self,
x, # x,
y, # y,
) -> FastAppendCurve: # ) -> FastAppendCurve:
# determine current potential downsampling value (based on pixel # # determine current potential downsampling value (based on pixel
# scaling) and return any existing curve for it. # # scaling) and return any existing curve for it.
curve = self._ds_line # curve = self._ds_line
if not curve: # if not curve:
# TODO: figuring out the most optimial size for the ideal # # TODO: figuring out the most optimial size for the ideal
# curve-path by, # # curve-path by,
# - calcing the display's max px width `.screen()` # # - calcing the display's max px width `.screen()`
# - drawing a curve and figuring out it's capacity: # # - drawing a curve and figuring out it's capacity:
# https://doc.qt.io/qt-5/qpainterpath.html#capacity # # https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - reserving that cap for each curve-mapped-to-shm with # # - reserving that cap for each curve-mapped-to-shm with
# - leveraging clearing when needed to redraw the entire # # - leveraging clearing when needed to redraw the entire
# curve that does not release mem allocs: # # curve that does not release mem allocs:
# https://doc.qt.io/qt-5/qpainterpath.html#clear # # https://doc.qt.io/qt-5/qpainterpath.html#clear
curve = FastAppendCurve( # curve = FastAppendCurve(
y=y, # y=y,
x=x, # x=x,
name='OHLC', # name='OHLC',
color=self._color, # color=self._color,
) # )
curve.hide() # curve.hide()
self._pi.addItem(curve) # self._pi.addItem(curve)
self._ds_line = curve # self._ds_line = curve
return curve
# TODO: we should be diffing the amount of new data which # return curve
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
curve.update_from_array( # # TODO: we should be diffing the amount of new data which
y=y, # # needs to be downsampled. Ideally we actually are just
x=x, # # doing all the ds-ing in sibling actors so that the data
) # # can just be read and rendered to graphics on events of our
return curve # # choice.
# # diff = do_diff(ohlc, new_bit)
# curve.update_from_array(
# y=y,
# x=x,
# x_iv=x,
# y_iv=y,
# view_range=True, # hack
# )
# return curve
def update_from_array( def update_from_array(
self, self,
# full array input history
ohlc: np.ndarray, ohlc: np.ndarray,
just_history=False,
# pre-sliced array data that's "in view"
ohlc_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
) -> None: ) -> None:
''' '''
@ -349,6 +403,19 @@ class BarItems(pg.GraphicsObject):
gt=ms_slower_then, gt=ms_slower_then,
) )
# vr = self.viewRect()
# l, r = int(vr.left()), int(vr.right())
# # l, r = self.view_range()
# # array = self._arrays[self.name]
# indexes = ohlc['index']
# start_index = indexes[0]
# end_index = indexes[-1]
# lbar = max(l, start_index) - start_index
# rbar = min(r, end_index) - start_index
# in_view = ohlc[lbar:rbar]
# self._vrange = lbar, rbar
# index = self.start_index # index = self.start_index
istart, istop = self._xrange istart, istop = self._xrange
ds_istart, ds_istop = self._ds_xrange ds_istart, ds_istop = self._ds_xrange
@ -360,11 +427,149 @@ class BarItems(pg.GraphicsObject):
prepend_length = istart - first_index prepend_length = istart - first_index
append_length = last_index - istop append_length = last_index - istop
ds_prepend_length = ds_istart - first_index # ds_prepend_length = ds_istart - first_index
ds_append_length = last_index - ds_istop # ds_append_length = last_index - ds_istop
flip_cache = False flip_cache = False
x_gt = 6
if self._ds_line:
uppx = self._ds_line.x_uppx()
else:
uppx = 0
should_line = self._in_ds
if (
self._in_ds
and uppx < x_gt
):
should_line = False
elif (
not self._in_ds
and uppx >= x_gt
):
should_line = True
# should_ds, should_redraw = self.should_ds_or_redraw()
# print(
# f'OHLC in line: {self._in_ds}'
# f'OHLC should line: {should_line}\n'
# # f'OHLC should_redraw: {should_redraw}\n'
# )
if (
should_line
):
# update the line graphic
# x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
# x, y = self._ds_line_xy = ohlc_flatten(ohlc)
x_iv, y_iv = self._ds_line_xy = ohlc_flatten(ohlc_iv)
profiler('flattening bars to line')
curve = self._ds_line
# curve = self.update_ds_line(x, y)
# TODO: we should be diffing the amount of new data which
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
curve.update_from_array(
y=x_iv,
x=y_iv,
x_iv=x_iv,
y_iv=y_iv,
view_range=view_range, # hack
)
# we already are showing a line and should be
# self._in_ds
# check if the ds line should be resampled/drawn
# should_ds_line, should_redraw_line = self._ds_line.should_ds_or_redraw()
# print(f'OHLC DS should ds: {should_ds_line}, should_redraw: {should_redraw_line}')
# if (
# # line should be redrawn/sampled
# # should_ds_line or
# # we are flipping to line from bars mode
# not self._in_ds
# ):
# uppx = self._ds_line.x_uppx()
# self._xs_in_px = uppx
if not self._in_ds:
# hide bars and show line
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {self._name}'
)
# self._pi.addItem(curve)
curve.show()
curve.update()
self._in_ds = True
# stop here since we don't need to update bars path any more
# as we delegate to the downsample line with updates.
return
elif (
not should_line
and self._in_ds
):
# flip back to bars graphics and hide the downsample line.
log.info(f'showing bars graphic {self._name}')
curve = self._ds_line
curve.hide()
# self._pi.removeItem(curve)
# XXX: is this actually any faster?
# self._pi.addItem(self)
self.show()
self._in_ds = False
# if not self._in_ds and should_ds
# self.hide()
# # XXX: is this actually any faster?
# # self._pi.removeItem(self)
# # this should have been done in the block above
# # x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
# # curve = self.update_ds_line(x, y)
# # TODO: a `.ui()` log level?
# log.info(
# f'downsampling to line graphic {self._name}'
# )
# # self._pi.addItem(curve)
# curve.show()
# curve.update()
# self._in_ds = True
# return
# self._in_ds = False
# print('YO NOT DS OHLC')
# generate in_view path
self.path = gen_qpath(
ohlc_iv,
0,
self.w,
# path=self.path,
)
# TODO: to make the downsampling faster # TODO: to make the downsampling faster
# - allow mapping only a range of lines thus only drawing as # - allow mapping only a range of lines thus only drawing as
# many bars as exactly specified. # many bars as exactly specified.
@ -372,87 +577,97 @@ class BarItems(pg.GraphicsObject):
# - maybe move all this embedded logic to a higher # - maybe move all this embedded logic to a higher
# level type? # level type?
fx, fy = self._ds_line_xy # ohlc = in_view
if prepend_length: # if prepend_length:
# new history was added and we need to render a new path # # new history was added and we need to render a new path
prepend_bars = ohlc[:prepend_length] # prepend_bars = ohlc[:prepend_length]
if ds_prepend_length: # if ds_prepend_length:
ds_prepend_bars = ohlc[:ds_prepend_length] # ds_prepend_bars = ohlc[:ds_prepend_length]
pre_x, pre_y = ohlc_flatten(ds_prepend_bars) # pre_x, pre_y = ohlc_flatten(ds_prepend_bars)
fx = np.concatenate((pre_x, fx)) # fx = np.concatenate((pre_x, fx))
fy = np.concatenate((pre_y, fy)) # fy = np.concatenate((pre_y, fy))
profiler('ds line prepend diff complete') # profiler('ds line prepend diff complete')
if append_length: # if append_length:
# generate new graphics to match provided array # # generate new graphics to match provided array
# path appending logic: # # path appending logic:
# we need to get the previous "current bar(s)" for the time step # # we need to get the previous "current bar(s)" for the time step
# and convert it to a sub-path to append to the historical set # # and convert it to a sub-path to append to the historical set
# new_bars = ohlc[istop - 1:istop + append_length - 1] # # new_bars = ohlc[istop - 1:istop + append_length - 1]
append_bars = ohlc[-append_length - 1:-1] # append_bars = ohlc[-append_length - 1:-1]
# print(f'ohlc bars to append size: {append_bars.size}\n') # # print(f'ohlc bars to append size: {append_bars.size}\n')
if ds_append_length: # if ds_append_length:
ds_append_bars = ohlc[-ds_append_length - 1:-1] # ds_append_bars = ohlc[-ds_append_length - 1:-1]
post_x, post_y = ohlc_flatten(ds_append_bars) # post_x, post_y = ohlc_flatten(ds_append_bars)
# print(f'ds curve to append sizes: {(post_x.size, post_y.size)}') # print(
fx = np.concatenate((fx, post_x)) # f'ds curve to append sizes: {(post_x.size, post_y.size)}'
fy = np.concatenate((fy, post_y)) # )
# fx = np.concatenate((fx, post_x))
# fy = np.concatenate((fy, post_y))
profiler('ds line append diff complete') # profiler('ds line append diff complete')
profiler('array diffs complete') profiler('array diffs complete')
# does this work? # does this work?
last = ohlc[-1] last = ohlc[-1]
fy[-1] = last['close'] # fy[-1] = last['close']
# incremental update and cache line datums # # incremental update and cache line datums
self._ds_line_xy = fx, fy # self._ds_line_xy = fx, fy
# maybe downsample to line # maybe downsample to line
ds = self.maybe_downsample() # ds = self.maybe_downsample()
if ds: # if ds:
# if we downsample to a line don't bother with # # if we downsample to a line don't bother with
# any more path generation / updates # # any more path generation / updates
self._ds_xrange = first_index, last_index # self._ds_xrange = first_index, last_index
profiler('downsampled to line') # profiler('downsampled to line')
return # return
# print(in_view.size)
# if self.path:
# self.path = path
# self.path.reserve(path.capacity())
# self.path.swap(path)
# path updates # path updates
if prepend_length: # if prepend_length:
# XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path # # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# y value not matching the first value from # # y value not matching the first value from
# ohlc[prepend_length + 1] ??? # # ohlc[prepend_length + 1] ???
prepend_path = gen_qpath(prepend_bars, 0, self.w) # prepend_path = gen_qpath(prepend_bars, 0, self.w)
old_path = self.path # old_path = self.path
self.path = prepend_path # self.path = prepend_path
self.path.addPath(old_path) # self.path.addPath(old_path)
profiler('path PREPEND') # profiler('path PREPEND')
if append_length: # if append_length:
append_path = gen_qpath(append_bars, 0, self.w) # append_path = gen_qpath(append_bars, 0, self.w)
self.path.moveTo( # self.path.moveTo(
float(istop - self.w), # float(istop - self.w),
float(append_bars[0]['open']) # float(append_bars[0]['open'])
) # )
self.path.addPath(append_path) # self.path.addPath(append_path)
profiler('path APPEND') # profiler('path APPEND')
# fp = self.fast_path # fp = self.fast_path
# if fp is None: # if fp is None:
# self.fast_path = append_path # self.fast_path = append_path
# else: # else:
# fp.moveTo(float(istop - self.w), float(new_bars[0]['open'])) # fp.moveTo(
# fp.addPath(append_path) # float(istop - self.w), float(new_bars[0]['open'])
# )
# self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) # fp.addPath(append_path)
# flip_cache = True
# self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# flip_cache = True
self._xrange = first_index, last_index self._xrange = first_index, last_index
@ -559,73 +774,69 @@ class BarItems(pg.GraphicsObject):
) )
def maybe_downsample( # def should_ds_or_redraw(
self, # self,
x_gt: float = 2., # x_gt: float = 2,
) -> bool: # ) -> tuple[bool, bool]:
'''
Call this when you want to stop drawing individual
bars and instead use a ``FastAppendCurve`` intepolation
line (normally when the width of a bar (aka 1.0 in the x)
is less then a pixel width on the device).
''' # curve = self._ds_line
curve = self._ds_line # if not curve:
if not curve: # return False, False
return False
# this is the ``float`` value of the "number of x units" (in # # this is the ``float`` value of the "number of x units" (in
# view coords) that a pixel spans. # # view coords) that a pixel spans.
xs_in_px = self._ds_line.x_uppx() # uppx = self._ds_line.x_uppx()
# print(f'uppx: {uppx}')
linked = self.linked # # linked = self.linked
# should_redraw: bool = False
# should_ds: bool = False
if ( # if (
self._ds_line_xy is not None # not self._in_ds
): # and uppx >= x_gt
curve = self.update_ds_line( # ):
*self._ds_line_xy,
)
if ( # should_ds = True
not self._in_ds # should_redraw = True
and xs_in_px >= x_gt
):
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {linked.symbol.key}'
)
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
self._xs_in_px = xs_in_px # elif (
# self._in_ds
# and uppx < x_gt
# ):
# should_ds = False
# should_redraw = True
# self._pi.addItem(curve) # if self._in_ds:
curve.show() # should_ds = True
self._in_ds = True # # no curve change
# return should_ds, should_redraw
elif ( # def maybe_downsample(
self._in_ds # self,
and xs_in_px < x_gt # x_gt: float = 2,
):
log.info(f'showing bars graphic {linked.symbol.key}')
curve = self._ds_line # ) -> bool:
curve.hide() # '''
# self._pi.removeItem(curve) # Call this when you want to stop drawing individual
# bars and instead use a ``FastAppendCurve`` intepolation
# line (normally when the width of a bar (aka 1.0 in the x)
# is less then a pixel width on the device).
# XXX: is this actually any faster? # '''
# self._pi.addItem(self) # ds_xy = self._ds_line_xy
self.show() # if ds_xy:
self.update() # ds_xy.maybe_downsample()
self._in_ds = False # if (
# self._ds_line_xy is not None
# no curve change # and self._in_ds
return self._in_ds # ):
# curve = self.update_ds_line(
# *self._ds_line_xy,
# )
def paint( def paint(
self, self,
@ -657,20 +868,8 @@ class BarItems(pg.GraphicsObject):
p.setPen(self.bars_pen) p.setPen(self.bars_pen)
p.drawPath(self.path) p.drawPath(self.path)
profiler('draw history path') profiler(f'draw history path: {self.path.capacity()}')
# if self.fast_path: # if self.fast_path:
# p.drawPath(self.fast_path) # p.drawPath(self.fast_path)
# profiler('draw fast path') # profiler('draw fast path')
profiler.finish()
# NOTE: for testing paint frequency as throttled by display loop.
# now = time.time()
# global _last_draw
# print(f'DRAW RATE {1/(now - _last_draw)}')
# _last_draw = now
# import time
# _last_draw: float = time.time()