Compare commits

..

No commits in common. "cc50932c4f26ae557276cbce2d5998a4f6d3ee62" and "3a3baca9bc67d85ad70d7cee95c45ae69a234a0f" have entirely different histories.

8 changed files with 548 additions and 969 deletions

View File

@ -76,6 +76,7 @@ async def filter_quotes_by_sym(
async def fsp_compute( async def fsp_compute(
ctx: tractor.Context,
symbol: Symbol, symbol: Symbol,
feed: Feed, feed: Feed,
quote_stream: trio.abc.ReceiveChannel, quote_stream: trio.abc.ReceiveChannel,
@ -85,7 +86,7 @@ async def fsp_compute(
func: Callable, func: Callable,
# attach_stream: bool = False, attach_stream: bool = False,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
@ -192,47 +193,46 @@ async def fsp_compute(
profiler(f'{func_name} pushed history') profiler(f'{func_name} pushed history')
profiler.finish() profiler.finish()
# TODO: UGH, what is the right way to do something like this?
if not ctx._started_called:
await ctx.started(index)
# setup a respawn handle # setup a respawn handle
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# TODO: might be better to just make a "restart" method where
# the target task is spawned implicitly and then the event is
# set via some higher level api? At that poing we might as well
# be writing a one-cancels-one nursery though right?
tracker = TaskTracker(trio.Event(), cs) tracker = TaskTracker(trio.Event(), cs)
task_status.started((tracker, index)) task_status.started((tracker, index))
profiler(f'{func_name} yield last index') profiler(f'{func_name} yield last index')
# import time # import time
# last = time.time() # last = time.time()
try: try:
# rt stream
async with ctx.open_stream() as stream:
async for processed in out_stream: # always trigger UI refresh after history update,
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await stream.send('update')
log.debug(f"{func_name}: {processed}") async for processed in out_stream:
key, output = processed
index = src.index
dst.array[-1][key] = output
# NOTE: for now we aren't streaming this to the consumer log.debug(f"{func_name}: {processed}")
# stream latest array index entry which basically just acts key, output = processed
# as trigger msg to tell the consumer to read from shm index = src.index
# TODO: further this should likely be implemented much dst.array[-1][key] = output
# like our `Feed` api where there is one background
# "service" task which computes output and then sends to
# N-consumers who subscribe for the real-time output,
# which we'll likely want to implement using local-mem
# chans for the fan out?
# if attach_stream:
# await client_stream.send(index)
# period = time.time() - last # NOTE: for now we aren't streaming this to the consumer
# hz = 1/period if period else float('nan') # stream latest array index entry which basically just acts
# if hz > 60: # as trigger msg to tell the consumer to read from shm
# log.info(f'FSP quote too fast: {hz}') if attach_stream:
# last = time.time() await stream.send(index)
# period = time.time() - last
# hz = 1/period if period else float('nan')
# if hz > 60:
# log.info(f'FSP quote too fast: {hz}')
# last = time.time()
finally: finally:
tracker.complete.set() tracker.complete.set()
@ -323,6 +323,7 @@ async def cascade(
fsp_target = partial( fsp_target = partial(
fsp_compute, fsp_compute,
ctx=ctx,
symbol=symbol, symbol=symbol,
feed=feed, feed=feed,
quote_stream=quote_stream, quote_stream=quote_stream,
@ -331,7 +332,7 @@ async def cascade(
src=src, src=src,
dst=dst, dst=dst,
# target # func_name=func_name,
func=func func=func
) )
@ -343,113 +344,90 @@ async def cascade(
profiler(f'{func_name}: fsp up') profiler(f'{func_name}: fsp up')
# sync client async def resync(tracker: TaskTracker) -> tuple[TaskTracker, int]:
await ctx.started(index) # TODO: adopt an incremental update engine/approach
# where possible here eventually!
log.warning(f're-syncing fsp {func_name} to source')
tracker.cs.cancel()
await tracker.complete.wait()
return await n.start(fsp_target)
# XXX: rt stream with client which we MUST def is_synced(
# open here (and keep it open) in order to make src: ShmArray,
# incremental "updates" as history prepends take dst: ShmArray
# place. ) -> tuple[bool, int, int]:
async with ctx.open_stream() as client_stream: '''Predicate to dertmine if a destination FSP
output array is aligned to its source array.
# TODO: these likely should all become '''
# methods of this ``TaskLifetime`` or wtv step_diff = src.index - dst.index
# abstraction.. len_diff = abs(len(src.array) - len(dst.array))
async def resync( return not (
tracker: TaskTracker, # the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
) -> tuple[TaskTracker, int]: # we aren't step synced to the source and may be
# TODO: adopt an incremental update engine/approach # leading/lagging by a step
# where possible here eventually! step_diff > 1 or
log.warning(f're-syncing fsp {func_name} to source') step_diff < 0
tracker.cs.cancel() ), step_diff, len_diff
await tracker.complete.wait()
tracker, index = await n.start(fsp_target)
# always trigger UI refresh after history update, async def poll_and_sync_to_step(
# see ``piker.ui._fsp.FspAdmin.open_chain()`` and
# ``piker.ui._display.trigger_update()``.
await client_stream.send('update')
return tracker, index
def is_synced( tracker: TaskTracker,
src: ShmArray, src: ShmArray,
dst: ShmArray dst: ShmArray,
) -> tuple[bool, int, int]:
'''Predicate to dertmine if a destination FSP
output array is aligned to its source array.
''' ) -> tuple[TaskTracker, int]:
step_diff = src.index - dst.index
len_diff = abs(len(src.array) - len(dst.array))
return not (
# the source is likely backfilling and we must
# sync history calculations
len_diff > 2 or
# we aren't step synced to the source and may be
# leading/lagging by a step
step_diff > 1 or
step_diff < 0
), step_diff, len_diff
async def poll_and_sync_to_step(
tracker: TaskTracker,
src: ShmArray,
dst: ShmArray,
) -> tuple[TaskTracker, int]:
synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
while not synced:
tracker, index = await resync(tracker)
synced, step_diff, _ = is_synced(src, dst)
return tracker, step_diff return tracker, step_diff
s, step, ld = is_synced(src, dst) s, step, ld = is_synced(src, dst)
# detect sample period step for subscription to increment # detect sample period step for subscription to increment
# signal # signal
times = src.array['time'] times = src.array['time']
delay_s = times[-1] - times[times != times[-1]][-1] delay_s = times[-1] - times[times != times[-1]][-1]
# Increment the underlying shared memory buffer on every # Increment the underlying shared memory buffer on every
# "increment" msg received from the underlying data feed. # "increment" msg received from the underlying data feed.
async with feed.index_stream( async with feed.index_stream(int(delay_s)) as istream:
int(delay_s)
) as istream:
profiler(f'{func_name}: sample stream up') profiler(f'{func_name}: sample stream up')
profiler.finish() profiler.finish()
async for _ in istream: async for _ in istream:
# respawn the compute task if the source # respawn the compute task if the source
# array has been updated such that we compute # array has been updated such that we compute
# new history from the (prepended) source. # new history from the (prepended) source.
synced, step_diff, _ = is_synced(src, dst) synced, step_diff, _ = is_synced(src, dst)
if not synced: if not synced:
tracker, step_diff = await poll_and_sync_to_step( tracker, step_diff = await poll_and_sync_to_step(
tracker, tracker,
src, src,
dst, dst,
) )
# skip adding a last bar since we should already # skip adding a last bar since we should already
# be step alinged # be step alinged
if step_diff == 0: if step_diff == 0:
continue continue
# read out last shm row, copy and write new row # read out last shm row, copy and write new row
array = dst.array array = dst.array
# some metrics like vlm should be reset # some metrics like vlm should be reset
# to zero every step. # to zero every step.
if zero_on_step: if zero_on_step:
last = zeroed last = zeroed
else: else:
last = array[-1:].copy() last = array[-1:].copy()
dst.push(last) dst.push(last)

View File

@ -838,12 +838,8 @@ class ChartPlotWidget(pg.PlotWidget):
''' '''
l, r = self.view_range() l, r = self.view_range()
array = self._arrays[self.name] array = self._arrays[self.name]
start, stop = self._xrange = ( lbar = max(l, array[0]['index'])
array[0]['index'], rbar = min(r, array[-1]['index'])
array[-1]['index'],
)
lbar = max(l, start)
rbar = min(r, stop)
return l, lbar, rbar, r return l, lbar, rbar, r
def curve_width_pxs( def curve_width_pxs(
@ -911,7 +907,7 @@ class ChartPlotWidget(pg.PlotWidget):
return return
xfirst, xlast = index[0], index[-1] xfirst, xlast = index[0], index[-1]
l, lbar, rbar, r = self.bars_range() brange = l, lbar, rbar, r = self.bars_range()
marker_pos, l1_len = self.pre_l1_xs() marker_pos, l1_len = self.pre_l1_xs()
end = xlast + l1_len + 1 end = xlast + l1_len + 1
@ -990,8 +986,7 @@ class ChartPlotWidget(pg.PlotWidget):
graphics = BarItems( graphics = BarItems(
self.linked, self.linked,
self.plotItem, self.plotItem,
pen_color=self.pen_color, pen_color=self.pen_color
name=name,
) )
# adds all bar/candle graphics objects for each data point in # adds all bar/candle graphics objects for each data point in
@ -1180,34 +1175,11 @@ class ChartPlotWidget(pg.PlotWidget):
) )
return last return last
# def update_ohlc_from_array( def update_ohlc_from_array(
# self,
# graphics_name: str,
# array: np.ndarray,
# **kwargs,
# ) -> pg.GraphicsObject:
# '''
# Update the named internal graphics from ``array``.
# '''
# self._index = array['index'][0]
# self._arrays[self.name] = array
# graphics = self._graphics[graphics_name]
# graphics.update_from_array(array, **kwargs)
# return graphics
# def update_curve_from_array(
def update_graphics_from_array(
self, self,
graphics_name: str, graphics_name: str,
array: np.ndarray,
array: Optional[np.ndarray] = None,
array_key: Optional[str] = None,
**kwargs, **kwargs,
) -> pg.GraphicsObject: ) -> pg.GraphicsObject:
@ -1215,65 +1187,50 @@ class ChartPlotWidget(pg.PlotWidget):
Update the named internal graphics from ``array``. Update the named internal graphics from ``array``.
''' '''
if array is not None: self._arrays[self.name] = array
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
data_key = self.name
if array is not None:
# write array to internal graphics table
self._arrays[data_key] = array
else:
array = self._arrays[data_key]
# array key and graphics "name" might be different..
graphics = self._graphics[graphics_name] graphics = self._graphics[graphics_name]
graphics.update_from_array(array, **kwargs)
# compute "in-view" indices
l, lbar, rbar, r = self.bars_range()
indexes = array['index']
ifirst = indexes[0]
ilast = indexes[-1]
lbar_i = max(l, ifirst) - ifirst
rbar_i = min(r, ilast) - ifirst
in_view = array[lbar_i: rbar_i]
if not in_view.size:
return graphics
# TODO: we could do it this way as well no?
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
# start_index = self._index
# lbar = max(l, start_index) - start_index
# rbar = min(r, ohlc[-1]['index']) - start_index
if isinstance(graphics, BarItems):
graphics.update_from_array(
array,
in_view,
view_range=(lbar_i, rbar_i),
**kwargs,
)
else:
graphics.update_from_array(
x=array['index'],
y=array[data_key],
x_iv=in_view['index'],
y_iv=in_view[data_key],
view_range=(lbar_i, rbar_i),
**kwargs
)
return graphics return graphics
def update_curve_from_array(
self,
graphics_name: str,
array: np.ndarray,
array_key: Optional[str] = None,
**kwargs,
) -> pg.GraphicsObject:
'''
Update the named internal graphics from ``array``.
'''
assert len(array)
data_key = array_key or graphics_name
if graphics_name not in self._flows:
self._arrays[self.name] = array
else:
self._arrays[data_key] = array
curve = self._graphics[graphics_name]
# NOTE: back when we weren't implementing the curve graphics
# ourselves you'd have updates using this method:
# curve.setData(y=array[graphics_name], x=array['index'], **kwargs)
# NOTE: graphics **must** implement a diff based update
# operation where an internal ``FastUpdateCurve._xrange`` is
# used to determine if the underlying path needs to be
# pre/ap-pended.
curve.update_from_array(
x=array['index'],
y=array[data_key],
**kwargs
)
return curve
# def _label_h(self, yhigh: float, ylow: float) -> float: # def _label_h(self, yhigh: float, ylow: float) -> float:
# # compute contents label "height" in view terms # # compute contents label "height" in view terms
# # to avoid having data "contents" overlap with them # # to avoid having data "contents" overlap with them
@ -1303,9 +1260,6 @@ class ChartPlotWidget(pg.PlotWidget):
# print(f"bounds (ylow, yhigh): {(ylow, yhigh)}") # print(f"bounds (ylow, yhigh): {(ylow, yhigh)}")
# TODO: pretty sure we can just call the cursor
# directly not? i don't wee why we need special "signal proxies"
# for this lul..
def enterEvent(self, ev): # noqa def enterEvent(self, ev): # noqa
# pg.PlotWidget.enterEvent(self, ev) # pg.PlotWidget.enterEvent(self, ev)
self.sig_mouse_enter.emit(self) self.sig_mouse_enter.emit(self)

View File

@ -106,6 +106,53 @@ def trace_hl(
return out return out
def downsample(
x: np.ndarray,
y: np.ndarray,
bins: int = 2,
method: str = 'peak',
**kwargs,
) -> tuple[np.ndarray, np.ndarray]:
'''
Downsample x/y data for lesser curve graphics gen.
The "peak" method is originally copied verbatim from
``pyqtgraph.PlotDataItem.getDisplayDataset()`` which gets
all credit, though we will likely drop this in favor of the M4
algo below.
'''
# py3.10 syntax
match method:
case 'peak':
if bins < 2:
log.warning('No downsampling taking place?')
ds = bins
n = len(x) // ds
x1 = np.empty((n, 2))
# start of x-values; try to select a somewhat centered point
stx = ds // 2
x1[:] = x[stx:stx+n*ds:ds, np.newaxis]
x = x1.reshape(n*2)
y1 = np.empty((n, 2))
y2 = y[:n*ds].reshape((n, ds))
y1[:, 0] = y2.max(axis=1)
y1[:, 1] = y2.min(axis=1)
y = y1.reshape(n*2)
return x, y
case 'm4':
return ds_m4(x, y, kwargs['px_width'])
def ohlc_flatten( def ohlc_flatten(
ohlc: np.ndarray, ohlc: np.ndarray,
use_mxmn: bool = False, use_mxmn: bool = False,

View File

@ -144,8 +144,6 @@ class FastAppendCurve(pg.GraphicsObject):
self.use_fpath = use_fpath self.use_fpath = use_fpath
self.fast_path: Optional[QtGui.QPainterPath] = None self.fast_path: Optional[QtGui.QPainterPath] = None
self._ds_cache: dict = {}
# TODO: we can probably just dispense with the parent since # TODO: we can probably just dispense with the parent since
# we're basically only using the pen setting now... # we're basically only using the pen setting now...
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
@ -216,61 +214,52 @@ class FastAppendCurve(pg.GraphicsObject):
vr = self.viewRect() vr = self.viewRect()
l, r = int(vr.left()), int(vr.right()) l, r = int(vr.left()), int(vr.right())
if not self._xrange:
return 0
start, stop = self._xrange start, stop = self._xrange
lbar = max(l, start) lbar = max(l, start)
rbar = min(r, stop) rbar = min(r, stop)
return round(vb.mapViewToDevice( return vb.mapViewToDevice(
QLineF(lbar, 0, rbar, 0) QLineF(lbar, 0, rbar, 0)
).length()) ).length()
# def should_ds_or_redraw( def should_ds_or_redraw(
# self, self,
# ) -> tuple[bool, bool]: ) -> tuple[bool, bool]:
# uppx = self.x_uppx() uppx = self.x_uppx()
# px_width = self.px_width() px_width = self.px_width()
# if not px_width: # uppx_diff = abs(uppx - self._last_uppx)
# return False, False uppx_diff = (uppx - self._last_uppx)
self._last_uppx = uppx
# # uppx_diff = abs(uppx - self._last_uppx) should_redraw: bool = False
# uppx_diff = (uppx - self._last_uppx) should_ds: bool = False
# self._last_uppx = uppx
# should_redraw: bool = False # print(uppx_diff)
# should_ds: bool = self._in_ds
# # print(uppx_diff) if (
uppx <= 8
):
# trigger redraw or original non-downsampled data
if self._in_ds:
print('REVERTING BACK TO SRC DATA')
# clear downsampled curve(s) and expect
# refresh of path segments.
should_redraw = True
# if ( elif (
# uppx <= 8 uppx_diff >= 4
# ): or uppx_diff <= -2
# # trigger redraw or original non-downsampled data or self._step_mode and abs(uppx_diff) >= 1
# if self._in_ds: ):
# print('REVERTING BACK TO SRC DATA') log.info(
# # clear downsampled curve(s) and expect f'{self._name} downsampler change: {self._last_uppx} -> {uppx}'
# # refresh of path segments. )
# should_redraw = True should_ds = {'px_width': px_width, 'uppx': uppx}
should_redraw = True
# elif ( return should_ds, should_redraw
# uppx_diff >= 1
# or uppx_diff <= -1
# or self._step_mode and abs(uppx_diff) >= 1
# ):
# log.info(
# f'{self._name} downsampler change: {self._last_uppx} -> {uppx}'
# )
# should_ds = {'px_width': px_width, 'uppx': uppx}
# should_redraw = True
# if should_ds:
# should_ds = {'px_width': px_width, 'uppx': uppx}
# return should_ds, should_redraw
def downsample( def downsample(
self, self,
@ -297,7 +286,7 @@ class FastAppendCurve(pg.GraphicsObject):
y = y.flatten() y = y.flatten()
# presumably? # presumably?
# self._in_ds = True self._in_ds = True
return x, y return x, y
def maybe_downsample( def maybe_downsample(
@ -314,17 +303,9 @@ class FastAppendCurve(pg.GraphicsObject):
def update_from_array( def update_from_array(
self, self,
# full array input history
x: np.ndarray, x: np.ndarray,
y: np.ndarray, y: np.ndarray,
# pre-sliced array data that's "in view"
x_iv: np.ndarray,
y_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
) -> QtGui.QPainterPath: ) -> QtGui.QPainterPath:
''' '''
Update curve from input 2-d data. Update curve from input 2-d data.
@ -334,197 +315,87 @@ class FastAppendCurve(pg.GraphicsObject):
''' '''
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.update_from_array(): `{self._name}`', msg=f'{self._name}.update_from_array()',
disabled=not pg_profile_enabled(), disabled=not pg_profile_enabled(),
gt=ms_slower_then, gt=ms_slower_then,
) )
flip_cache = False flip_cache = False
draw_full_path = True
# XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
# update internal array refs
self._x, self._y = x, y
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
if self._xrange: if self._xrange:
istart, istop = self._xrange istart, istop = self._xrange
else: else:
self._xrange = istart, istop = x[0], x[-1] self._xrange = istart, istop = x[0], x[-1]
# print(f"xrange: {self._xrange}")
should_ds, should_redraw = self.should_ds_or_redraw()
# compute the length diffs between the first/last index entry in
# the input data and the last indexes we have on record from the
# last time we updated the curve index.
prepend_length = int(istart - x[0]) prepend_length = int(istart - x[0])
append_length = int(x[-1] - istop) append_length = int(x[-1] - istop)
# print(f"xrange: {self._xrange}")
if view_range:
li, ri = view_range
# x, y = x[lbar:rbar], y[lbar:rbar]
# x, y = x_iv, y_iv
profiler(f'view range slice {view_range}')
# if self._name == 'OHLC':
# print(f'view range slice {view_range}')
# ds state checking
uppx = self.x_uppx()
px_width = self.px_width()
uppx_diff = (uppx - self._last_uppx)
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
# TODO: numba this bish
# x_out, y_out = step_path_arrays_from_1d(
# x[:-1], y[:-1]
# )
x_iv_out, y_iv_out = step_path_arrays_from_1d(
x_iv[:-1], y_iv[:-1]
)
profiler('generated step arrays')
else:
# by default we only pull data up to the last (current) index
# x_out, y_out = x[:-1], y[:-1]
x_iv_out, y_iv_out = x_iv[:-1], y_iv[:-1]
profiler('sliced array history')
# by default plan to draw the source ouput that's "in view"
x_to_path, y_to_path = x_iv_out, y_iv_out
ds_key = px_width, uppx
# always re-ds if we were dsed but the input range changes.
if self._in_ds:
# slice out the portion of the downsampled data that is
# "in view" and **only** draw a path for that.
entry = self._ds_cache.get(ds_key)
if entry:
x_ds_out, y_ds_out, first_i, last_i = entry
# if last_i == x[-1]:
log.info(
f'{self._name} has cached ds {ds_key} -> {entry}'
)
prepend_length = int(first_i - ri)
append_length = int(ri - last_i)
# x_to_path = x_ds_out
# y_to_path = y_ds_out
# else:
# log.warn(f'{self._name} ds updates unhandled!')
# DS only the new part?
# check for downsampling conditions
if (
# std m4 downsample conditions
uppx_diff >= 4
or uppx_diff <= -2
or self._step_mode and abs(uppx_diff) >= 2
# or self._in_ds and px_width > 1
):
# if not uppx_diff >= 1:
log.info(
f'{self._name} sampler change: {self._last_uppx} -> {uppx}'
)
self._last_uppx = uppx
# should_ds = {'px_width': px_width, 'uppx': uppx}
# if self._step_mode:
# # TODO: numba this bish
# x_out, y_out = step_path_arrays_from_1d(
# x_iv[:-1], y_iv[:-1]
# )
# else:
# # by default we only pull data up to the last (current) index
# x_out, y_out = x_iv[:-1], y_iv[:-1]
x_ds_out, y_ds_out = self.downsample(
x_iv_out,
y_iv_out,
px_width=px_width,
uppx=uppx,
)
profiler(
f'path downsample ds_key={ds_key}\n'
f'{x_iv_out.size}, {y_iv_out.size}'
)
# cache downsampled outputs
self._ds_cache[ds_key] = (
x_ds_out,
y_ds_out,
x[0],
x[-1],
)
x_to_path = x_ds_out
y_to_path = y_ds_out
self._in_ds = True
elif (
uppx <= 8
and self._in_ds
):
# we should de-downsample back to our original
# source data so we clear our path data in prep
# to generate a new one from original source data.
if self.path:
self.path.clear()
if self.fast_path:
self.fast_path.clear()
log.info(f'DEDOWN -> {self._name}')
profiler('path reversion to non-ds data')
self._in_ds = False
# render path graphics
# log.info(
# # f'{self._name}: last sizes {x_to_path.size}, {y_to_path.size}',
# f'{self._name}: sizes {x_to_path.size}, {y_to_path.size}',
# )
self._last_topaths = x_to_path, y_to_path
no_path_yet = self.path is None no_path_yet = self.path is None
if draw_full_path: if (
should_redraw or should_ds
or self.path is None
or prepend_length > 0
):
# step mode: draw flat top discrete "step"
# over the index space for each datum.
if self._step_mode:
x_out, y_out = step_path_arrays_from_1d(
x[:-1], y[:-1]
)
profiler('generated step arrays')
else:
# by default we only pull data up to the last (current) index
x_out, y_out = x[:-1], y[:-1]
if should_ds:
x_out, y_out = self.downsample(
x_out,
y_out,
**should_ds,
)
profiler(f'path downsample redraw={should_ds}')
self._in_ds = True
if should_redraw:
profiler('path reversion to non-ds')
if self.path:
self.path.clear()
if self.fast_path:
self.fast_path.clear()
if should_redraw and not should_ds:
log.info(f'DEDOWN -> {self._name}')
self._in_ds = False
# else:
self.path = pg.functions.arrayToQPath( self.path = pg.functions.arrayToQPath(
x_to_path, x_out,
y_to_path, y_out,
connect='all', connect='all',
finiteCheck=False, finiteCheck=False,
path=self.path, path=self.path,
) )
profiler('generated FULL PATH -> {self._name}') # reserve mem allocs see:
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - https://doc.qt.io/qt-5/qpainterpath.html#clear
# XXX: right now this is based on had hoc checks on a
# hidpi 3840x2160 4k monitor but we should optimize for
# the target display(s) on the sys.
if no_path_yet:
self.path.reserve(int(500e3))
# reserve mem allocs see: profiler('generated fresh path')
# - https://doc.qt.io/qt-5/qpainterpath.html#reserve
# - https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - https://doc.qt.io/qt-5/qpainterpath.html#clear
# XXX: right now this is based on had hoc checks on a
# hidpi 3840x2160 4k monitor but we should optimize for
# the target display(s) on the sys.
if no_path_yet:
self.path.reserve(int(500e3))
self._last_vr = view_range # if self._step_mode:
# self.path.closeSubpath()
# TODO: get this piecewise prepend working - right now it's # TODO: get this piecewise prepend working - right now it's
# giving heck on vwap... # giving heck on vwap...
@ -543,65 +414,65 @@ class FastAppendCurve(pg.GraphicsObject):
# # self.path.moveTo(new_x[0], new_y[0]) # # self.path.moveTo(new_x[0], new_y[0])
# self.path.connectPath(old_path) # self.path.connectPath(old_path)
# elif ( elif (
# append_length > 0 append_length > 0
# ): ):
# if self._step_mode: if self._step_mode:
# new_x, new_y = step_path_arrays_from_1d( new_x, new_y = step_path_arrays_from_1d(
# x[-append_length - 2:-1], x[-append_length - 2:-1],
# y[-append_length - 2:-1], y[-append_length - 2:-1],
# ) )
# # [1:] since we don't need the vertical line normally at # [1:] since we don't need the vertical line normally at
# # the beginning of the step curve taking the first (x, # the beginning of the step curve taking the first (x,
# # y) poing down to the x-axis **because** this is an # y) poing down to the x-axis **because** this is an
# # appended path graphic. # appended path graphic.
# new_x = new_x[1:] new_x = new_x[1:]
# new_y = new_y[1:] new_y = new_y[1:]
# else: else:
# # print(f"append_length: {append_length}") # print(f"append_length: {append_length}")
# new_x = x[-append_length - 2:-1] new_x = x[-append_length - 2:-1]
# new_y = y[-append_length - 2:-1] new_y = y[-append_length - 2:-1]
# # print((new_x, new_y)) # print((new_x, new_y))
# profiler('diffed append arrays') profiler('diffed append arrays')
# if should_ds: if should_ds:
# new_x, new_y = self.downsample( new_x, new_y = self.downsample(
# new_x, new_x,
# new_y, new_y,
# **should_ds, **should_ds,
# ) )
# profiler(f'fast path downsample redraw={should_ds}') profiler(f'fast path downsample redraw={should_ds}')
# append_path = pg.functions.arrayToQPath( append_path = pg.functions.arrayToQPath(
# new_x, new_x,
# new_y, new_y,
# connect='all', connect='all',
# finiteCheck=False, finiteCheck=False,
# path=self.fast_path, path=self.fast_path,
# ) )
# if self.use_fpath: if self.use_fpath:
# # an attempt at trying to make append-updates faster.. # an attempt at trying to make append-updates faster..
# if self.fast_path is None: if self.fast_path is None:
# self.fast_path = append_path self.fast_path = append_path
# self.fast_path.reserve(int(6e3)) self.fast_path.reserve(int(6e3))
# else: else:
# self.fast_path.connectPath(append_path) self.fast_path.connectPath(append_path)
# size = self.fast_path.capacity() size = self.fast_path.capacity()
# profiler(f'connected fast path w size: {size}') profiler(f'connected fast path w size: {size}')
# # print(f"append_path br: {append_path.boundingRect()}") # print(f"append_path br: {append_path.boundingRect()}")
# # self.path.moveTo(new_x[0], new_y[0]) # self.path.moveTo(new_x[0], new_y[0])
# # path.connectPath(append_path) # path.connectPath(append_path)
# # XXX: lol this causes a hang.. # XXX: lol this causes a hang..
# # self.path = self.path.simplified() # self.path = self.path.simplified()
# else: else:
# size = self.path.capacity() size = self.path.capacity()
# profiler(f'connected history path w size: {size}') profiler(f'connected history path w size: {size}')
# self.path.connectPath(append_path) self.path.connectPath(append_path)
# other merging ideas: # other merging ideas:
# https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths # https://stackoverflow.com/questions/8936225/how-to-merge-qpainterpaths
@ -616,8 +487,8 @@ class FastAppendCurve(pg.GraphicsObject):
# # XXX: super slow set "union" op # # XXX: super slow set "union" op
# self.path = self.path.united(append_path).simplified() # self.path = self.path.united(append_path).simplified()
# self.disable_cache() # self.disable_cache()
# flip_cache = True # flip_cache = True
# XXX: do we need this any more? # XXX: do we need this any more?
# if ( # if (
@ -626,7 +497,12 @@ class FastAppendCurve(pg.GraphicsObject):
# self.disable_cache() # self.disable_cache()
# flip_cache = True # flip_cache = True
x_last = x[-1] # XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work..
self.xData = x
self.yData = y
x0, x_last = self._xrange = x[0], x[-1]
y_last = y[-1] y_last = y[-1]
# draw the "current" step graphic segment so it lines up with # draw the "current" step graphic segment so it lines up with
@ -664,6 +540,8 @@ class FastAppendCurve(pg.GraphicsObject):
# XXX: seems to be needed to avoid artifacts (see above). # XXX: seems to be needed to avoid artifacts (see above).
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache) self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
self._x, self._y = x, y
# XXX: lol brutal, the internals of `CurvePoint` (inherited by # XXX: lol brutal, the internals of `CurvePoint` (inherited by
# our `LineDot`) required ``.getData()`` to work.. # our `LineDot`) required ``.getData()`` to work..
def getData(self): def getData(self):
@ -758,7 +636,6 @@ class FastAppendCurve(pg.GraphicsObject):
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.paint(): `{self._name}`', msg=f'FastAppendCurve.paint(): `{self._name}`',
disabled=not pg_profile_enabled(), disabled=not pg_profile_enabled(),
# disabled=True,
gt=ms_slower_then, gt=ms_slower_then,
) )

View File

@ -53,10 +53,6 @@ from ._forms import (
mk_order_pane_layout, mk_order_pane_layout,
) )
from .order_mode import open_order_mode from .order_mode import open_order_mode
# from .._profile import (
# pg_profile_enabled,
# ms_slower_then,
# )
from ..log import get_logger from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
@ -288,12 +284,6 @@ async def graphics_update_loop(
chart.pause_all_feeds() chart.pause_all_feeds()
continue continue
ic = chart.view._ic
if ic:
chart.pause_all_feeds()
await ic.wait()
chart.resume_all_feeds()
# sync call to update all graphics/UX components. # sync call to update all graphics/UX components.
graphics_update_cycle(ds) graphics_update_cycle(ds)
@ -309,9 +299,8 @@ def graphics_update_cycle(
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
disabled=True, # not pg_profile_enabled(), disabled=True, # not pg_profile_enabled(),
gt=1/12 * 1e3, delayed=False,
) )
# unpack multi-referenced components # unpack multi-referenced components
chart = ds.chart chart = ds.chart
vlm_chart = ds.vlm_chart vlm_chart = ds.vlm_chart
@ -420,11 +409,9 @@ def graphics_update_cycle(
if ( if (
# if zoomed out alot don't update the last "bar" # if zoomed out alot don't update the last "bar"
(xpx < update_uppx or i_diff > 0) (xpx < update_uppx or i_diff > 0)
and r >= i_step # and r >= i_step
): ):
# TODO: make it so this doesn't have to be called vlm_chart.update_curve_from_array('volume', array)
# once the $vlm is up?
vlm_chart.update_graphics_from_array('volume', array)
if ( if (
mx_vlm_in_view != vars['last_mx_vlm'] mx_vlm_in_view != vars['last_mx_vlm']
@ -498,7 +485,7 @@ def graphics_update_cycle(
xpx < update_uppx xpx < update_uppx
or i_diff > 0 or i_diff > 0
): ):
chart.update_graphics_from_array( chart.update_ohlc_from_array(
chart.name, chart.name,
array, array,
) )
@ -537,7 +524,7 @@ def graphics_update_cycle(
if wap_in_history: if wap_in_history:
# update vwap overlay line # update vwap overlay line
chart.update_graphics_from_array( chart.update_curve_from_array(
'bar_wap', 'bar_wap',
array, array,
) )

View File

@ -89,7 +89,7 @@ def update_fsp_chart(
# update graphics # update graphics
# NOTE: this does a length check internally which allows it # NOTE: this does a length check internally which allows it
# staying above the last row check below.. # staying above the last row check below..
chart.update_graphics_from_array( chart.update_curve_from_array(
graphics_name, graphics_name,
array, array,
array_key=array_key or graphics_name, array_key=array_key or graphics_name,
@ -425,7 +425,6 @@ class FspAdmin:
) as (ctx, last_index), ) as (ctx, last_index),
ctx.open_stream() as stream, ctx.open_stream() as stream,
): ):
# register output data # register output data
self._registry[ self._registry[
(fqsn, ns_path) (fqsn, ns_path)
@ -441,7 +440,6 @@ class FspAdmin:
async with stream.subscribe() as stream: async with stream.subscribe() as stream:
async for msg in stream: async for msg in stream:
if msg == 'update': if msg == 'update':
log.info(f'Re-syncing graphics for fsp: {ns_path}')
self.linked.graphics_cycle() self.linked.graphics_cycle()
else: else:
log.info(f'recved unexpected fsp engine msg: {msg}') log.info(f'recved unexpected fsp engine msg: {msg}')
@ -676,7 +674,7 @@ async def open_vlm_displays(
last_val_sticky.update_from_data(-1, value) last_val_sticky.update_from_data(-1, value)
vlm_curve = chart.update_graphics_from_array( vlm_curve = chart.update_curve_from_array(
'volume', 'volume',
shm.array, shm.array,
) )

View File

@ -20,7 +20,6 @@ Chart view box primitives
""" """
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
# import itertools
import time import time
from typing import Optional, Callable from typing import Optional, Callable
@ -37,8 +36,7 @@ from ..log import get_logger
from ._style import _min_points_to_show from ._style import _min_points_to_show
from ._editors import SelectRect from ._editors import SelectRect
from . import _event from . import _event
from .._profile import pg_profile_enabled, ms_slower_then from ._ohlc import BarItems
# from ._ohlc import BarItems
log = get_logger(__name__) log = get_logger(__name__)
@ -321,7 +319,6 @@ async def handle_viewmode_mouse(
): ):
# when in order mode, submit execution # when in order mode, submit execution
# msg.event.accept() # msg.event.accept()
# breakpoint()
view.order_mode.submit_order() view.order_mode.submit_order()
@ -387,29 +384,6 @@ class ChartView(ViewBox):
self.order_mode: bool = False self.order_mode: bool = False
self.setFocusPolicy(QtCore.Qt.StrongFocus) self.setFocusPolicy(QtCore.Qt.StrongFocus)
self._ic = None
def start_ic(
self,
) -> None:
if self._ic is None:
self.chart.pause_all_feeds()
self._ic = trio.Event()
def signal_ic(
self,
*args,
# ev = None,
) -> None:
if args:
print(f'range change dun: {args}')
else:
print('proxy called')
if self._ic:
self._ic.set()
self._ic = None
self.chart.resume_all_feeds()
@asynccontextmanager @asynccontextmanager
async def open_async_input_handler( async def open_async_input_handler(
@ -455,6 +429,11 @@ class ChartView(ViewBox):
def maxmin(self, callback: Callable) -> None: def maxmin(self, callback: Callable) -> None:
self._maxmin = callback self._maxmin = callback
def maybe_downsample_graphics(self):
for graphic in self._chart._graphics.values():
if isinstance(graphic, BarItems):
graphic.maybe_paint_line()
def wheelEvent( def wheelEvent(
self, self,
ev, ev,
@ -563,11 +542,6 @@ class ChartView(ViewBox):
self._resetTarget() self._resetTarget()
self.scaleBy(s, focal) self.scaleBy(s, focal)
self.sigRangeChangedManually.emit(mask) self.sigRangeChangedManually.emit(mask)
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
ev.accept() ev.accept()
def mouseDragEvent( def mouseDragEvent(
@ -650,11 +624,6 @@ class ChartView(ViewBox):
# XXX: WHY # XXX: WHY
ev.accept() ev.accept()
self.start_ic()
# if self._ic is None:
# self.chart.pause_all_feeds()
# self._ic = trio.Event()
if axis == 1: if axis == 1:
self.chart._static_yrange = 'axis' self.chart._static_yrange = 'axis'
@ -672,13 +641,6 @@ class ChartView(ViewBox):
self.sigRangeChangedManually.emit(self.state['mouseEnabled']) self.sigRangeChangedManually.emit(self.state['mouseEnabled'])
if ev.isFinish():
print('DRAG FINISH')
self.signal_ic()
# self._ic.set()
# self._ic = None
# self.chart.resume_all_feeds()
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE # WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif button & QtCore.Qt.RightButton: elif button & QtCore.Qt.RightButton:
@ -826,13 +788,11 @@ class ChartView(ViewBox):
# iterate those. # iterate those.
# - only register this when certain downsampleable graphics are # - only register this when certain downsampleable graphics are
# "added to scene". # "added to scene".
vb.sigXRangeChanged.connect(vb.maybe_downsample_graphics) vb.sigRangeChangedManually.connect(vb.maybe_downsample_graphics)
# mouse wheel doesn't emit XRangeChanged # mouse wheel doesn't emit XRangeChanged
vb.sigRangeChangedManually.connect(vb._set_yrange) vb.sigRangeChangedManually.connect(vb._set_yrange)
vb.sigResized.connect(vb._set_yrange) # splitter(s) resizing
# splitter(s) resizing
vb.sigResized.connect(vb._set_yrange)
def disable_auto_yrange( def disable_auto_yrange(
self, self,
@ -848,33 +808,10 @@ class ChartView(ViewBox):
''' '''
for graphic in self._chart._graphics.values(): for graphic in self._chart._graphics.values():
xvec = graphic.pixelVectors()[0] # if isinstance(graphic, BarItems):
if xvec: xpx = graphic.pixelVectors()[0].x()
xpx = xvec.x() if xpx:
if xpx: return xpx
return xpx
else: else:
continue continue
return 1.0 return 1.0
def maybe_downsample_graphics(self):
# TODO: a faster single-loop-iterator way of doing this XD
chart = self._chart
# graphics = list(self._chart._graphics.values())
profiler = pg.debug.Profiler(
msg=f'FastAppendCurve.update_from_array(): `{chart.name}`',
disabled=not pg_profile_enabled(),
gt=ms_slower_then,
)
for name, graphics in chart._graphics.items():
# pass in no array which will read and render from the last
# passed array (normally provided by the display loop.)
chart.update_graphics_from_array(name)
profiler(f'updating {name}')
# for graphic in graphics:
# ds_meth = getattr(graphic, 'maybe_downsample', None)
# if ds_meth:
# ds_meth()

View File

@ -157,40 +157,23 @@ def path_arrays_from_ohlc(
def gen_qpath( def gen_qpath(
data: np.ndarray, data,
start: int, # XXX: do we need this? start, # XXX: do we need this?
w: float, w,
path: Optional[QtGui.QPainterPath] = None,
) -> QtGui.QPainterPath: ) -> QtGui.QPainterPath:
path_was_none = path is None
profiler = pg.debug.Profiler( profiler = pg.debug.Profiler(
msg='gen_qpath ohlc', msg=f'gen_qpath ohlc',
disabled=not pg_profile_enabled(), disabled=not pg_profile_enabled(),
gt=ms_slower_then, gt=ms_slower_then,
) )
x, y, c = path_arrays_from_ohlc( x, y, c = path_arrays_from_ohlc(data, start, bar_gap=w)
data,
start,
bar_gap=w,
)
profiler("generate stream with numba") profiler("generate stream with numba")
# TODO: numba the internals of this! # TODO: numba the internals of this!
path = pg.functions.arrayToQPath( path = pg.functions.arrayToQPath(x, y, connect=c)
x,
y,
connect=c,
path=path,
)
# avoid mem allocs if possible
if path_was_none:
path.reserve(path.capacity())
profiler("generate path with arrayToQPath") profiler("generate path with arrayToQPath")
return path return path
@ -223,7 +206,6 @@ class BarItems(pg.GraphicsObject):
self._color = pen_color self._color = pen_color
self.bars_pen = pg.mkPen(hcolor(pen_color), width=1) self.bars_pen = pg.mkPen(hcolor(pen_color), width=1)
self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2) self.last_bar_pen = pg.mkPen(hcolor(last_bar_color), width=2)
self._name = name
self._ds_line_xy: Optional[ self._ds_line_xy: Optional[
tuple[np.ndarray, np.ndarray] tuple[np.ndarray, np.ndarray]
@ -244,7 +226,6 @@ class BarItems(pg.GraphicsObject):
self._xrange: tuple[int, int] self._xrange: tuple[int, int]
self._yrange: tuple[float, float] self._yrange: tuple[float, float]
self._vrange = None
# TODO: don't render the full backing array each time # TODO: don't render the full backing array each time
# self._path_data = None # self._path_data = None
@ -273,6 +254,7 @@ class BarItems(pg.GraphicsObject):
''' '''
hist, last = ohlc[:-1], ohlc[-1] hist, last = ohlc[:-1], ohlc[-1]
self.path = gen_qpath(hist, start, self.w) self.path = gen_qpath(hist, start, self.w)
# save graphics for later reference and keep track # save graphics for later reference and keep track
@ -288,101 +270,65 @@ class BarItems(pg.GraphicsObject):
# up to last to avoid double draw of last bar # up to last to avoid double draw of last bar
self._last_bar_lines = bar_from_ohlc_row(last, self.w) self._last_bar_lines = bar_from_ohlc_row(last, self.w)
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
# self.update_ds_line(
# x,
# y,
# )
# TODO: figuring out the most optimial size for the ideal
# curve-path by,
# - calcing the display's max px width `.screen()`
# - drawing a curve and figuring out it's capacity:
# https://doc.qt.io/qt-5/qpainterpath.html#capacity
# - reserving that cap for each curve-mapped-to-shm with
# - leveraging clearing when needed to redraw the entire
# curve that does not release mem allocs:
# https://doc.qt.io/qt-5/qpainterpath.html#clear
curve = FastAppendCurve(
y=y,
x=x,
name='OHLC',
color=self._color,
)
curve.hide()
self._pi.addItem(curve)
self._ds_line = curve
self._ds_xrange = (index[0], index[-1])
# trigger render # trigger render
# https://doc.qt.io/qt-5/qgraphicsitem.html#update # https://doc.qt.io/qt-5/qgraphicsitem.html#update
self.update() self.update()
x, y = self._ds_line_xy = ohlc_flatten(ohlc)
self.update_ds_line(x, y)
self._ds_xrange = (index[0], index[-1])
return self.path return self.path
# def update_ds_line( def update_ds_line(
# self, self,
# x, x,
# y, y,
# ) -> FastAppendCurve: ) -> FastAppendCurve:
# # determine current potential downsampling value (based on pixel # determine current potential downsampling value (based on pixel
# # scaling) and return any existing curve for it. # scaling) and return any existing curve for it.
# curve = self._ds_line curve = self._ds_line
# if not curve: if not curve:
# # TODO: figuring out the most optimial size for the ideal # TODO: figuring out the most optimial size for the ideal
# # curve-path by, # curve-path by,
# # - calcing the display's max px width `.screen()` # - calcing the display's max px width `.screen()`
# # - drawing a curve and figuring out it's capacity: # - drawing a curve and figuring out it's capacity:
# # https://doc.qt.io/qt-5/qpainterpath.html#capacity # https://doc.qt.io/qt-5/qpainterpath.html#capacity
# # - reserving that cap for each curve-mapped-to-shm with # - reserving that cap for each curve-mapped-to-shm with
# # - leveraging clearing when needed to redraw the entire # - leveraging clearing when needed to redraw the entire
# # curve that does not release mem allocs: # curve that does not release mem allocs:
# # https://doc.qt.io/qt-5/qpainterpath.html#clear # https://doc.qt.io/qt-5/qpainterpath.html#clear
# curve = FastAppendCurve( curve = FastAppendCurve(
# y=y, y=y,
# x=x, x=x,
# name='OHLC', name='OHLC',
# color=self._color, color=self._color,
# ) )
# curve.hide() curve.hide()
# self._pi.addItem(curve) self._pi.addItem(curve)
# self._ds_line = curve self._ds_line = curve
return curve
# return curve # TODO: we should be diffing the amount of new data which
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
# # TODO: we should be diffing the amount of new data which curve.update_from_array(
# # needs to be downsampled. Ideally we actually are just y=y,
# # doing all the ds-ing in sibling actors so that the data x=x,
# # can just be read and rendered to graphics on events of our )
# # choice. return curve
# # diff = do_diff(ohlc, new_bit)
# curve.update_from_array(
# y=y,
# x=x,
# x_iv=x,
# y_iv=y,
# view_range=True, # hack
# )
# return curve
def update_from_array( def update_from_array(
self, self,
# full array input history
ohlc: np.ndarray, ohlc: np.ndarray,
just_history=False,
# pre-sliced array data that's "in view"
ohlc_iv: np.ndarray,
view_range: Optional[tuple[int, int]] = None,
) -> None: ) -> None:
''' '''
@ -403,19 +349,6 @@ class BarItems(pg.GraphicsObject):
gt=ms_slower_then, gt=ms_slower_then,
) )
# vr = self.viewRect()
# l, r = int(vr.left()), int(vr.right())
# # l, r = self.view_range()
# # array = self._arrays[self.name]
# indexes = ohlc['index']
# start_index = indexes[0]
# end_index = indexes[-1]
# lbar = max(l, start_index) - start_index
# rbar = min(r, end_index) - start_index
# in_view = ohlc[lbar:rbar]
# self._vrange = lbar, rbar
# index = self.start_index # index = self.start_index
istart, istop = self._xrange istart, istop = self._xrange
ds_istart, ds_istop = self._ds_xrange ds_istart, ds_istop = self._ds_xrange
@ -427,149 +360,11 @@ class BarItems(pg.GraphicsObject):
prepend_length = istart - first_index prepend_length = istart - first_index
append_length = last_index - istop append_length = last_index - istop
# ds_prepend_length = ds_istart - first_index ds_prepend_length = ds_istart - first_index
# ds_append_length = last_index - ds_istop ds_append_length = last_index - ds_istop
flip_cache = False flip_cache = False
x_gt = 6
if self._ds_line:
uppx = self._ds_line.x_uppx()
else:
uppx = 0
should_line = self._in_ds
if (
self._in_ds
and uppx < x_gt
):
should_line = False
elif (
not self._in_ds
and uppx >= x_gt
):
should_line = True
# should_ds, should_redraw = self.should_ds_or_redraw()
# print(
# f'OHLC in line: {self._in_ds}'
# f'OHLC should line: {should_line}\n'
# # f'OHLC should_redraw: {should_redraw}\n'
# )
if (
should_line
):
# update the line graphic
# x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
# x, y = self._ds_line_xy = ohlc_flatten(ohlc)
x_iv, y_iv = self._ds_line_xy = ohlc_flatten(ohlc_iv)
profiler('flattening bars to line')
curve = self._ds_line
# curve = self.update_ds_line(x, y)
# TODO: we should be diffing the amount of new data which
# needs to be downsampled. Ideally we actually are just
# doing all the ds-ing in sibling actors so that the data
# can just be read and rendered to graphics on events of our
# choice.
# diff = do_diff(ohlc, new_bit)
curve.update_from_array(
y=x_iv,
x=y_iv,
x_iv=x_iv,
y_iv=y_iv,
view_range=view_range, # hack
)
# we already are showing a line and should be
# self._in_ds
# check if the ds line should be resampled/drawn
# should_ds_line, should_redraw_line = self._ds_line.should_ds_or_redraw()
# print(f'OHLC DS should ds: {should_ds_line}, should_redraw: {should_redraw_line}')
# if (
# # line should be redrawn/sampled
# # should_ds_line or
# # we are flipping to line from bars mode
# not self._in_ds
# ):
# uppx = self._ds_line.x_uppx()
# self._xs_in_px = uppx
if not self._in_ds:
# hide bars and show line
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {self._name}'
)
# self._pi.addItem(curve)
curve.show()
curve.update()
self._in_ds = True
# stop here since we don't need to update bars path any more
# as we delegate to the downsample line with updates.
return
elif (
not should_line
and self._in_ds
):
# flip back to bars graphics and hide the downsample line.
log.info(f'showing bars graphic {self._name}')
curve = self._ds_line
curve.hide()
# self._pi.removeItem(curve)
# XXX: is this actually any faster?
# self._pi.addItem(self)
self.show()
self._in_ds = False
# if not self._in_ds and should_ds
# self.hide()
# # XXX: is this actually any faster?
# # self._pi.removeItem(self)
# # this should have been done in the block above
# # x, y = self._ds_line_xy = ohlc_flatten(ohlc_iv)
# # curve = self.update_ds_line(x, y)
# # TODO: a `.ui()` log level?
# log.info(
# f'downsampling to line graphic {self._name}'
# )
# # self._pi.addItem(curve)
# curve.show()
# curve.update()
# self._in_ds = True
# return
# self._in_ds = False
# print('YO NOT DS OHLC')
# generate in_view path
self.path = gen_qpath(
ohlc_iv,
0,
self.w,
# path=self.path,
)
# TODO: to make the downsampling faster # TODO: to make the downsampling faster
# - allow mapping only a range of lines thus only drawing as # - allow mapping only a range of lines thus only drawing as
# many bars as exactly specified. # many bars as exactly specified.
@ -577,97 +372,87 @@ class BarItems(pg.GraphicsObject):
# - maybe move all this embedded logic to a higher # - maybe move all this embedded logic to a higher
# level type? # level type?
# ohlc = in_view fx, fy = self._ds_line_xy
# if prepend_length: if prepend_length:
# # new history was added and we need to render a new path # new history was added and we need to render a new path
# prepend_bars = ohlc[:prepend_length] prepend_bars = ohlc[:prepend_length]
# if ds_prepend_length: if ds_prepend_length:
# ds_prepend_bars = ohlc[:ds_prepend_length] ds_prepend_bars = ohlc[:ds_prepend_length]
# pre_x, pre_y = ohlc_flatten(ds_prepend_bars) pre_x, pre_y = ohlc_flatten(ds_prepend_bars)
# fx = np.concatenate((pre_x, fx)) fx = np.concatenate((pre_x, fx))
# fy = np.concatenate((pre_y, fy)) fy = np.concatenate((pre_y, fy))
# profiler('ds line prepend diff complete') profiler('ds line prepend diff complete')
# if append_length: if append_length:
# # generate new graphics to match provided array # generate new graphics to match provided array
# # path appending logic: # path appending logic:
# # we need to get the previous "current bar(s)" for the time step # we need to get the previous "current bar(s)" for the time step
# # and convert it to a sub-path to append to the historical set # and convert it to a sub-path to append to the historical set
# # new_bars = ohlc[istop - 1:istop + append_length - 1] # new_bars = ohlc[istop - 1:istop + append_length - 1]
# append_bars = ohlc[-append_length - 1:-1] append_bars = ohlc[-append_length - 1:-1]
# # print(f'ohlc bars to append size: {append_bars.size}\n') # print(f'ohlc bars to append size: {append_bars.size}\n')
# if ds_append_length: if ds_append_length:
# ds_append_bars = ohlc[-ds_append_length - 1:-1] ds_append_bars = ohlc[-ds_append_length - 1:-1]
# post_x, post_y = ohlc_flatten(ds_append_bars) post_x, post_y = ohlc_flatten(ds_append_bars)
# print( # print(f'ds curve to append sizes: {(post_x.size, post_y.size)}')
# f'ds curve to append sizes: {(post_x.size, post_y.size)}' fx = np.concatenate((fx, post_x))
# ) fy = np.concatenate((fy, post_y))
# fx = np.concatenate((fx, post_x))
# fy = np.concatenate((fy, post_y))
# profiler('ds line append diff complete') profiler('ds line append diff complete')
profiler('array diffs complete') profiler('array diffs complete')
# does this work? # does this work?
last = ohlc[-1] last = ohlc[-1]
# fy[-1] = last['close'] fy[-1] = last['close']
# # incremental update and cache line datums # incremental update and cache line datums
# self._ds_line_xy = fx, fy self._ds_line_xy = fx, fy
# maybe downsample to line # maybe downsample to line
# ds = self.maybe_downsample() ds = self.maybe_downsample()
# if ds: if ds:
# # if we downsample to a line don't bother with # if we downsample to a line don't bother with
# # any more path generation / updates # any more path generation / updates
# self._ds_xrange = first_index, last_index self._ds_xrange = first_index, last_index
# profiler('downsampled to line') profiler('downsampled to line')
# return return
# print(in_view.size)
# if self.path:
# self.path = path
# self.path.reserve(path.capacity())
# self.path.swap(path)
# path updates # path updates
# if prepend_length: if prepend_length:
# # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path # XXX: SOMETHING IS MAYBE FISHY HERE what with the old_path
# # y value not matching the first value from # y value not matching the first value from
# # ohlc[prepend_length + 1] ??? # ohlc[prepend_length + 1] ???
# prepend_path = gen_qpath(prepend_bars, 0, self.w) prepend_path = gen_qpath(prepend_bars, 0, self.w)
# old_path = self.path old_path = self.path
# self.path = prepend_path self.path = prepend_path
# self.path.addPath(old_path) self.path.addPath(old_path)
# profiler('path PREPEND') profiler('path PREPEND')
# if append_length: if append_length:
# append_path = gen_qpath(append_bars, 0, self.w) append_path = gen_qpath(append_bars, 0, self.w)
# self.path.moveTo( self.path.moveTo(
# float(istop - self.w), float(istop - self.w),
# float(append_bars[0]['open']) float(append_bars[0]['open'])
# ) )
# self.path.addPath(append_path) self.path.addPath(append_path)
# profiler('path APPEND') profiler('path APPEND')
# fp = self.fast_path # fp = self.fast_path
# if fp is None: # if fp is None:
# self.fast_path = append_path # self.fast_path = append_path
# else: # else:
# fp.moveTo( # fp.moveTo(float(istop - self.w), float(new_bars[0]['open']))
# float(istop - self.w), float(new_bars[0]['open']) # fp.addPath(append_path)
# )
# fp.addPath(append_path) # self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# flip_cache = True
# self.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# flip_cache = True
self._xrange = first_index, last_index self._xrange = first_index, last_index
@ -774,69 +559,73 @@ class BarItems(pg.GraphicsObject):
) )
# def should_ds_or_redraw( def maybe_downsample(
# self, self,
# x_gt: float = 2, x_gt: float = 2.,
# ) -> tuple[bool, bool]: ) -> bool:
'''
Call this when you want to stop drawing individual
bars and instead use a ``FastAppendCurve`` intepolation
line (normally when the width of a bar (aka 1.0 in the x)
is less then a pixel width on the device).
# curve = self._ds_line '''
# if not curve: curve = self._ds_line
# return False, False if not curve:
return False
# # this is the ``float`` value of the "number of x units" (in # this is the ``float`` value of the "number of x units" (in
# # view coords) that a pixel spans. # view coords) that a pixel spans.
# uppx = self._ds_line.x_uppx() xs_in_px = self._ds_line.x_uppx()
# print(f'uppx: {uppx}')
# # linked = self.linked linked = self.linked
# should_redraw: bool = False
# should_ds: bool = False
# if ( if (
# not self._in_ds self._ds_line_xy is not None
# and uppx >= x_gt ):
# ): curve = self.update_ds_line(
*self._ds_line_xy,
)
# should_ds = True if (
# should_redraw = True not self._in_ds
and xs_in_px >= x_gt
):
# TODO: a `.ui()` log level?
log.info(
f'downsampling to line graphic {linked.symbol.key}'
)
self.hide()
# XXX: is this actually any faster?
# self._pi.removeItem(self)
# elif ( self._xs_in_px = xs_in_px
# self._in_ds
# and uppx < x_gt
# ):
# should_ds = False
# should_redraw = True
# if self._in_ds: # self._pi.addItem(curve)
# should_ds = True curve.show()
# # no curve change self._in_ds = True
# return should_ds, should_redraw
# def maybe_downsample( elif (
# self, self._in_ds
# x_gt: float = 2, and xs_in_px < x_gt
):
log.info(f'showing bars graphic {linked.symbol.key}')
# ) -> bool: curve = self._ds_line
# ''' curve.hide()
# Call this when you want to stop drawing individual # self._pi.removeItem(curve)
# bars and instead use a ``FastAppendCurve`` intepolation
# line (normally when the width of a bar (aka 1.0 in the x)
# is less then a pixel width on the device).
# ''' # XXX: is this actually any faster?
# ds_xy = self._ds_line_xy # self._pi.addItem(self)
# if ds_xy: self.show()
# ds_xy.maybe_downsample() self.update()
# if ( self._in_ds = False
# self._ds_line_xy is not None
# and self._in_ds # no curve change
# ): return self._in_ds
# curve = self.update_ds_line(
# *self._ds_line_xy,
# )
def paint( def paint(
self, self,
@ -868,8 +657,20 @@ class BarItems(pg.GraphicsObject):
p.setPen(self.bars_pen) p.setPen(self.bars_pen)
p.drawPath(self.path) p.drawPath(self.path)
profiler(f'draw history path: {self.path.capacity()}') profiler('draw history path')
# if self.fast_path: # if self.fast_path:
# p.drawPath(self.fast_path) # p.drawPath(self.fast_path)
# profiler('draw fast path') # profiler('draw fast path')
profiler.finish()
# NOTE: for testing paint frequency as throttled by display loop.
# now = time.time()
# global _last_draw
# print(f'DRAW RATE {1/(now - _last_draw)}')
# _last_draw = now
# import time
# _last_draw: float = time.time()