Factor step format data gen into `to_step_format()`

Yet another path ops routine which converts a 1d array into a data
format suitable for rendering a "step curve" graphics path (aka a "bar
graph" but implemented as a continuous line).

Also, factor the `BarItems` rendering logic (which determines whether to
render the literal bars lines or a downsampled curve) into a routine
`render_baritems()` until we figure out the right abstraction layer for
it.
pre_flow
Tyler Goodlet 2022-05-15 16:54:50 -04:00
parent 06d3cadcc0
commit 3cb6b7221c
2 changed files with 282 additions and 262 deletions

View File

@ -53,6 +53,7 @@ from .._profile import (
from ._pathops import (
gen_ohlc_qpath,
ohlc_to_line,
to_step_format,
)
from ._ohlc import (
BarItems,
@ -140,6 +141,243 @@ def mk_ohlc_flat_copy(
return y
def render_baritems(
flow: Flow,
graphics: BarItems,
read: tuple[
int, int, np.ndarray,
int, int, np.ndarray,
],
profiler: pg.debug.Profiler,
**kwargs,
) -> None:
'''
Graphics management logic for a ``BarItems`` object.
Mostly just logic to determine when and how to downsample an OHLC
lines curve into a flattened line graphic and when to display one
graphic or the other.
TODO: this should likely be moved into some kind of better abstraction
layer, if not a `Renderer` then something just above it?
'''
(
xfirst, xlast, array,
ivl, ivr, in_view,
) = read
# if no source data renderer exists create one.
self = flow
r = self._src_r
if not r:
# OHLC bars path renderer
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
draw_path=gen_ohlc_qpath,
last_read=read,
)
ds_curve_r = Renderer(
flow=self,
# just swap in the flat view
# data_t=lambda array: self.gy.array,
last_read=read,
draw_path=partial(
rowarr_to_path,
x_basis=None,
),
)
curve = FastAppendCurve(
name='OHLC',
color=graphics._color,
)
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold.
self._render_table[0] = (
ds_curve_r,
curve,
)
dsc_r, curve = self._render_table[0]
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update..
# - if insteam we are in a downsamplig state then we to
x_gt = 6
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
should_line
and uppx < x_gt
):
print('FLIPPING TO BARS')
should_line = False
elif (
not should_line
and uppx >= x_gt
):
print('FLIPPING TO LINE')
should_line = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
fields = ['open', 'high', 'low', 'close']
if self.gy is None:
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm
(
self._iflat_first,
self._iflat_last,
self.gx,
self.gy,
) = ohlc_to_line(
shm,
fields=fields,
)
# print(f'unstruct diff: {time.time() - start}')
gy = self.gy
# update flatted ohlc copy
(
iflat_first,
iflat,
ishm_last,
ishm_first,
) = (
self._iflat_first,
self._iflat_last,
self.shm._last.value,
self.shm._first.value
)
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
# write newly prepended data to flattened copy
gy[
ishm_first:iflat_first
] = rfn.structured_to_unstructured(
self.shm._array[fields][ishm_first:iflat_first]
)
self._iflat_first = ishm_first
to_update = rfn.structured_to_unstructured(
self.shm._array[iflat:ishm_last][fields]
)
gy[iflat:ishm_last][:] = to_update
profiler('updated ustruct OHLC data')
# slice out up-to-last step contents
y_flat = gy[ishm_first:ishm_last]
x_flat = self.gx[ishm_first:ishm_last]
# update local last-index tracking
self._iflat_last = ishm_last
# reshape to 1d for graphics rendering
y = y_flat.reshape(-1)
x = x_flat.reshape(-1)
profiler('flattened ustruct OHLC data')
# do all the same for only in-view data
y_iv_flat = y_flat[ivl:ivr]
x_iv_flat = x_flat[ivl:ivr]
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
profiler('flattened ustruct in-view OHLC data')
# pass into curve graphics processing
curve.update_from_array(
x,
y,
x_iv=x_iv,
y_iv=y_iv,
view_range=(ivl, ivr), # hack
profiler=profiler,
# should_redraw=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs,
)
curve.show()
profiler('updated ds curve')
else:
# render incremental or in-view update
# and apply ouput (path) to graphics.
path, last = r.render(
read,
only_in_view=True,
)
graphics.path = path
graphics.draw_last(last)
# NOTE: on appends we used to have to flip the coords
# cache thought it doesn't seem to be required any more?
# graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# graphics.prepareGeometryChange()
graphics.update()
if (
not in_line
and should_line
):
# change to line graphic
log.info(
f'downsampling to line graphic {self.name}'
)
graphics.hide()
# graphics.update()
curve.show()
curve.update()
elif in_line and not should_line:
log.info(f'showing bars graphic {self.name}')
curve.hide()
graphics.show()
graphics.update()
# update our pre-downsample-ready data and then pass that
# new data the downsampler algo for incremental update.
# graphics.update_from_array(
# array,
# in_view,
# view_range=(ivl, ivr) if use_vr else None,
# **kwargs,
# )
# generate and apply path to graphics obj
# graphics.path, last = r.render(
# read,
# only_in_view=True,
# )
# graphics.draw_last(last)
class Flow(msgspec.Struct): # , frozen=True):
'''
(Financial Signal-)Flow compound type which wraps a real-time
@ -355,276 +593,30 @@ class Flow(msgspec.Struct): # , frozen=True):
graphics = self.graphics
if isinstance(graphics, BarItems):
# if no source data renderer exists create one.
r = self._src_r
if not r:
# OHLC bars path renderer
r = self._src_r = Renderer(
flow=self,
# TODO: rename this to something with ohlc
draw_path=gen_ohlc_qpath,
last_read=read,
)
ds_curve_r = Renderer(
flow=self,
# just swap in the flat view
# data_t=lambda array: self.gy.array,
last_read=read,
draw_path=partial(
rowarr_to_path,
x_basis=None,
),
)
curve = FastAppendCurve(
name='OHLC',
color=graphics._color,
)
curve.hide()
self.plot.addItem(curve)
# baseline "line" downsampled OHLC curve that should
# kick on only when we reach a certain uppx threshold.
self._render_table[0] = (
ds_curve_r,
curve,
)
dsc_r, curve = self._render_table[0]
# do checks for whether or not we require downsampling:
# - if we're **not** downsampling then we simply want to
# render the bars graphics curve and update..
# - if insteam we are in a downsamplig state then we to
x_gt = 6
uppx = curve.x_uppx()
in_line = should_line = curve.isVisible()
if (
should_line
and uppx < x_gt
):
print('FLIPPING TO BARS')
should_line = False
elif (
not should_line
and uppx >= x_gt
):
print('FLIPPING TO LINE')
should_line = True
profiler(f'ds logic complete line={should_line}')
# do graphics updates
if should_line:
fields = ['open', 'high', 'low', 'close']
if self.gy is None:
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm
(
self._iflat_first,
self._iflat_last,
self.gx,
self.gy,
) = ohlc_to_line(shm)
# self.gy = self.shm.ustruct(fields)
# first = self._iflat_first = self.shm._first.value
# last = self._iflat_last = self.shm._last.value
# # write pushed data to flattened copy
# self.gy[first:last] = rfn.structured_to_unstructured(
# self.shm.array[fields]
# )
# # generate an flat-interpolated x-domain
# self.gx = (
# np.broadcast_to(
# shm._array['index'][:, None],
# (
# shm._array.size,
# # 4, # only ohlc
# self.gy.shape[1],
# ),
# ) + np.array([-0.5, 0, 0, 0.5])
# )
# assert self.gy.any()
# print(f'unstruct diff: {time.time() - start}')
# profiler('read unstr view bars to line')
# start = self.gy._first.value
# update flatted ohlc copy
(
iflat_first,
iflat,
ishm_last,
ishm_first,
) = (
self._iflat_first,
self._iflat_last,
self.shm._last.value,
self.shm._first.value
)
# check for shm prepend updates since last read.
if iflat_first != ishm_first:
# write newly prepended data to flattened copy
self.gy[
ishm_first:iflat_first
] = rfn.structured_to_unstructured(
self.shm._array[fields][ishm_first:iflat_first]
)
self._iflat_first = ishm_first
# # flat = self.gy = self.shm.unstruct_view(fields)
# self.gy = self.shm.ustruct(fields)
# # self._iflat_last = self.shm._last.value
# # self._iflat_first = self.shm._first.value
# # do an update for the most recent prepend
# # index
# iflat = ishm_first
to_update = rfn.structured_to_unstructured(
self.shm._array[iflat:ishm_last][fields]
)
self.gy[iflat:ishm_last][:] = to_update
profiler('updated ustruct OHLC data')
# slice out up-to-last step contents
y_flat = self.gy[ishm_first:ishm_last]
x_flat = self.gx[ishm_first:ishm_last]
# update local last-index tracking
self._iflat_last = ishm_last
# reshape to 1d for graphics rendering
y = y_flat.reshape(-1)
x = x_flat.reshape(-1)
profiler('flattened ustruct OHLC data')
# do all the same for only in-view data
y_iv_flat = y_flat[ivl:ivr]
x_iv_flat = x_flat[ivl:ivr]
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
profiler('flattened ustruct in-view OHLC data')
# legacy full-recompute-everytime method
# x, y = ohlc_flatten(array)
# x_iv, y_iv = ohlc_flatten(in_view)
# profiler('flattened OHLC data')
curve.update_from_array(
x,
y,
x_iv=x_iv,
y_iv=y_iv,
view_range=(ivl, ivr), # hack
profiler=profiler,
# should_redraw=False,
# NOTE: already passed through by display loop?
# do_append=uppx < 16,
**kwargs,
)
curve.show()
profiler('updated ds curve')
else:
# render incremental or in-view update
# and apply ouput (path) to graphics.
path, last = r.render(
read,
only_in_view=True,
)
graphics.path = path
graphics.draw_last(last)
# NOTE: on appends we used to have to flip the coords
# cache thought it doesn't seem to be required any more?
# graphics.setCacheMode(QtWidgets.QGraphicsItem.NoCache)
# graphics.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
# graphics.prepareGeometryChange()
graphics.update()
if (
not in_line
and should_line
):
# change to line graphic
log.info(
f'downsampling to line graphic {self.name}'
)
graphics.hide()
# graphics.update()
curve.show()
curve.update()
elif in_line and not should_line:
log.info(f'showing bars graphic {self.name}')
curve.hide()
graphics.show()
graphics.update()
# update our pre-downsample-ready data and then pass that
# new data the downsampler algo for incremental update.
# graphics.update_from_array(
# array,
# in_view,
# view_range=(ivl, ivr) if use_vr else None,
# **kwargs,
# )
# generate and apply path to graphics obj
# graphics.path, last = r.render(
# read,
# only_in_view=True,
# )
# graphics.draw_last(last)
render_baritems(
self,
graphics,
read,
profiler,
**kwargs,
)
else:
# ``FastAppendCurve`` case:
array_key = array_key or self.name
uppx = graphics.x_uppx()
profiler('read uppx')
profiler(f'read uppx {uppx}')
if graphics._step_mode and self.gy is None:
self._iflat_first = self.shm._first.value
# create a flattened view onto the OHLC array
# which can be read as a line-style format
shm = self.shm
# fields = ['index', array_key]
i = shm._array['index'].copy()
out = shm._array[array_key].copy()
self.gx = np.broadcast_to(
i[:, None],
(i.size, 2),
) + np.array([-0.5, 0.5])
# self.gy = np.broadcast_to(
# out[:, None], (out.size, 2),
# )
self.gy = np.empty((len(out), 2), dtype=out.dtype)
self.gy[:] = out[:, np.newaxis]
# start y at origin level
self.gy[0, 0] = 0
(
self._iflat_first,
self.gx,
self.gy,
) = to_step_format(
shm,
array_key,
)
profiler('generated step mode data')
if graphics._step_mode:

View File

@ -226,3 +226,31 @@ def ohlc_to_line(
x_out,
y_out,
)
def to_step_format(
shm: ShmArray,
data_field: str,
index_field: str = 'index',
) -> tuple[int, np.ndarray, np.ndarray]:
'''
Convert an input 1d shm array to a "step array" format
for use by path graphics generation.
'''
first = shm._first.value
i = shm._array['index'].copy()
out = shm._array[data_field].copy()
x_out = np.broadcast_to(
i[:, None],
(i.size, 2),
) + np.array([-0.5, 0.5])
y_out = np.empty((len(out), 2), dtype=out.dtype)
y_out[:] = out[:, np.newaxis]
# start y at origin level
y_out[0, 0] = 0
return first, x_out, y_out