First attempt, field-index agnostic formatting

Remove hardcoded `'index'` field refs from all formatters in a first
attempt at moving towards epoch-time alignment (though don't actually
use it yet).

Adjustments to the formatter interface:
- property `.xy_nd` for the x/y nd arrays.
- property `.xy_slice` for the nd format array(s) start->stop index
  slice.

Internal routine tweaks:
- drop `read_src_from_key` and always pass full source array on updates
  and adjust handlers to expect to have to index the data field of
  interest.
- set `.last_read` right after update calls instead of after 1d
  conversion.
- drop `slice_to_head` array read slicing.
- add some debug points for testing 'time' indexing (though not used
  here yet).
- add `.x_nd` array update logic for when the `.index_field` is not
  'index' - i.e. when we begin to try and support epoch time.
- simplify some new y_nd updates to not require use of `np.broadcast()`
  where possible.
epoch_indexing_and_dataviz_layer
Tyler Goodlet 2022-11-28 13:35:38 -05:00
parent be21f9829e
commit 696c6f8897
1 changed files with 132 additions and 76 deletions

View File

@ -28,14 +28,12 @@ from msgspec import field
import numpy as np import numpy as np
from numpy.lib import recfunctions as rfn from numpy.lib import recfunctions as rfn
from numba import ( from numba import (
types, # types,
njit, njit,
float64, float64,
int64, int64,
optional, # optional,
) )
from numba.core.types.misc import StringLiteral
# from numba.extending import as_numba_type
from ._sharedmem import ( from ._sharedmem import (
ShmArray, ShmArray,
@ -78,6 +76,40 @@ class IncrementalFormatter(msgspec.Struct):
def last_read(self) -> tuple | None: def last_read(self) -> tuple | None:
return self._last_read return self._last_read
# Incrementally updated xy ndarray formatted data, a pre-1d
# format which is updated and cached independently of the final
# pre-graphics-path 1d format.
x_nd: Optional[np.ndarray] = None
y_nd: Optional[np.ndarray] = None
@property
def xy_nd(self) -> tuple[np.ndarray, np.ndarray]:
return (
self.x_nd[self.xy_slice],
self.y_nd[self.xy_slice],
)
@property
def xy_slice(self) -> slice:
return slice(
self.xy_nd_start,
self.xy_nd_stop,
)
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
xy_nd_start: int = 0
xy_nd_stop: int = 0
# TODO: eventually incrementally update 1d-pre-graphics path data?
# x_1d: Optional[np.ndarray] = None
# y_1d: Optional[np.ndarray] = None
# incremental view-change state(s) tracking
_last_vr: tuple[float, float] | None = None
_last_ivdr: tuple[float, float] | None = None
def __repr__(self) -> str: def __repr__(self) -> str:
msg = ( msg = (
f'{type(self)}: ->\n\n' f'{type(self)}: ->\n\n'
@ -87,8 +119,8 @@ class IncrementalFormatter(msgspec.Struct):
f'last_vr={self._last_vr}\n' f'last_vr={self._last_vr}\n'
f'last_ivdr={self._last_ivdr}\n\n' f'last_ivdr={self._last_ivdr}\n\n'
f'xy_nd_start={self.xy_nd_start}\n' f'xy_slice={self.xy_slice}\n'
f'xy_nd_stop={self.xy_nd_stop}\n\n' # f'xy_nd_stop={self.xy_nd_stop}\n\n'
) )
x_nd_len = 0 x_nd_len = 0
@ -138,6 +170,12 @@ class IncrementalFormatter(msgspec.Struct):
prepend_length = int(last_xfirst - xfirst) prepend_length = int(last_xfirst - xfirst)
append_length = int(xlast - last_xlast) append_length = int(xlast - last_xlast)
if (
prepend_length < 0
or append_length < 0
):
breakpoint()
# blah blah blah # blah blah blah
# do diffing for prepend, append and last entry # do diffing for prepend, append and last entry
return ( return (
@ -147,26 +185,6 @@ class IncrementalFormatter(msgspec.Struct):
slice(last_xlast, xlast), slice(last_xlast, xlast),
) )
# Incrementally updated xy ndarray formatted data, a pre-1d
# format which is updated and cached independently of the final
# pre-graphics-path 1d format.
x_nd: Optional[np.ndarray] = None
y_nd: Optional[np.ndarray] = None
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
xy_nd_start: int = 0
xy_nd_stop: int = 0
# TODO: eventually incrementally update 1d-pre-graphics path data?
# x_1d: Optional[np.ndarray] = None
# y_1d: Optional[np.ndarray] = None
# incremental view-change state(s) tracking
_last_vr: tuple[float, float] | None = None
_last_ivdr: tuple[float, float] | None = None
def _track_inview_range( def _track_inview_range(
self, self,
view_range: tuple[int, int], view_range: tuple[int, int],
@ -245,18 +263,18 @@ class IncrementalFormatter(msgspec.Struct):
if self.y_nd is None: if self.y_nd is None:
# we first need to allocate xy data arrays # we first need to allocate xy data arrays
# from the source data. # from the source data.
self.xy_nd_start = shm._first.value
self.xy_nd_stop = shm._last.value
self.x_nd, self.y_nd = self.allocate_xy_nd( self.x_nd, self.y_nd = self.allocate_xy_nd(
shm, shm,
array_key, array_key,
) )
self.xy_nd_start = shm._first.value
self.xy_nd_stop = shm._last.value
profiler('allocated xy history') profiler('allocated xy history')
if prepend_len: if prepend_len:
y_prepend = shm._array[pre_slice] y_prepend = shm._array[pre_slice]
if read_src_from_key: # if read_src_from_key:
y_prepend = y_prepend[array_key] # y_prepend = y_prepend[array_key]
( (
new_y_nd, new_y_nd,
@ -293,8 +311,8 @@ class IncrementalFormatter(msgspec.Struct):
if append_len: if append_len:
y_append = shm._array[post_slice] y_append = shm._array[post_slice]
if read_src_from_key: # if read_src_from_key:
y_append = y_append[array_key] # y_append = y_append[array_key]
( (
new_y_nd, new_y_nd,
@ -315,14 +333,16 @@ class IncrementalFormatter(msgspec.Struct):
# self.y_nd[post_slice] = new_y_nd # self.y_nd[post_slice] = new_y_nd
# self.y_nd[xy_slice or post_slice] = xy_data # self.y_nd[xy_slice or post_slice] = xy_data
self.y_nd[y_nd_slc] = new_y_nd self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
self.xy_nd_stop = shm._last.value self.xy_nd_stop = shm._last.value
profiler('appened xy history: {append_length}') profiler('appened xy history: {append_length}')
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self._last_read = new_read
view_changed: bool = False view_changed: bool = False
view_range: tuple[int, int] = (ivl, ivr) view_range: tuple[int, int] = (ivl, ivr)
if slice_to_inview: if slice_to_inview:
@ -330,11 +350,14 @@ class IncrementalFormatter(msgspec.Struct):
array = in_view array = in_view
profiler(f'{self.viz.name} view range slice {view_range}') profiler(f'{self.viz.name} view range slice {view_range}')
hist = array[:slice_to_head] # hist = array[:slice_to_head]
# XXX: WOA WTF TRACTOR DEBUGGING BUGGG
# assert 0
# xy-path data transform: convert source data to a format # xy-path data transform: convert source data to a format
# able to be passed to a `QPainterPath` rendering routine. # able to be passed to a `QPainterPath` rendering routine.
if not len(hist): if not len(array):
# XXX: this might be why the profiler only has exits? # XXX: this might be why the profiler only has exits?
return return
@ -342,7 +365,7 @@ class IncrementalFormatter(msgspec.Struct):
# x/y_data in the case where allocate_xy is # x/y_data in the case where allocate_xy is
# defined? # defined?
x_1d, y_1d, connect = self.format_xy_nd_to_1d( x_1d, y_1d, connect = self.format_xy_nd_to_1d(
hist, array,
array_key, array_key,
view_range, view_range,
) )
@ -370,13 +393,10 @@ class IncrementalFormatter(msgspec.Struct):
if len(x_1d): if len(x_1d):
self._last_ivdr = x_1d[0], x_1d[slice_to_head] self._last_ivdr = x_1d[0], x_1d[slice_to_head]
# TODO: eventually maybe we can implement some kind of
# transform on the ``QPainterPath`` that will more or less
# detect the diff in "elements" terms?
# update diff state since we've now rendered paths.
self._last_read = new_read
profiler('.format_to_1d()') profiler('.format_to_1d()')
if (x_1d[-1] == 0.5).any():
breakpoint()
return ( return (
x_1d, x_1d,
y_1d, y_1d,
@ -430,21 +450,22 @@ class IncrementalFormatter(msgspec.Struct):
nd_stop: int, nd_stop: int,
is_append: bool, is_append: bool,
index_field: str = 'index',
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
slice, slice,
]: ]:
# write pushed data to flattened copy # write pushed data to flattened copy
new_y_nd = new_from_src new_y_nd = new_from_src[data_field]
# XXX # XXX
# TODO: this should be returned and written by caller! # TODO: this should be returned and written by caller!
# XXX # XXX
# generate same-valued-per-row x support based on y shape # generate same-valued-per-row x support with Nx1 shape
index_field = self.index_field
if index_field != 'index': if index_field != 'index':
self.x_nd[read_slc, :] = new_from_src[index_field] x_nd_new = self.x_nd[read_slc]
x_nd_new[:] = new_from_src[index_field]
return new_y_nd, read_slc return new_y_nd, read_slc
@ -576,6 +597,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
low = q['low'] low = q['low']
close = q['close'] close = q['close']
# index = float64(q[index_field]) # index = float64(q[index_field])
# index = float64(q['time'])
index = float64(q['index']) index = float64(q['index'])
istart = i * 6 istart = i * 6
@ -652,7 +674,6 @@ class OHLCBarsFmtr(IncrementalFormatter):
nd_stop: int, nd_stop: int,
is_append: bool, is_append: bool,
index_field: str = 'index',
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
@ -668,8 +689,13 @@ class OHLCBarsFmtr(IncrementalFormatter):
# TODO: this should be returned and written by caller! # TODO: this should be returned and written by caller!
# XXX # XXX
# generate same-valued-per-row x support based on y shape # generate same-valued-per-row x support based on y shape
index_field: str = self.index_field
if index_field != 'index': if index_field != 'index':
self.x_nd[read_slc, :] = new_from_src[index_field] x_nd_new = self.x_nd[read_slc]
x_nd_new[:] = new_from_src[index_field][:, np.newaxis]
if (self.x_nd[self.xy_slice] == 0.5).any():
breakpoint()
return new_y_nd, read_slc return new_y_nd, read_slc
@ -715,8 +741,6 @@ class StepCurveFmtr(IncrementalFormatter):
shm: ShmArray, shm: ShmArray,
data_field: str, data_field: str,
index_field: str = 'index',
) -> tuple[ ) -> tuple[
np.ndarray, # x np.ndarray, # x
np.nd.array # y np.nd.array # y
@ -734,11 +758,17 @@ class StepCurveFmtr(IncrementalFormatter):
(i.size, 2), (i.size, 2),
) + np.array([-0.5, 0.5]) ) + np.array([-0.5, 0.5])
y_out = np.empty((len(out), 2), dtype=out.dtype) # fill out Nx2 array to hold each step's left + right vertices.
y_out = np.empty(
# (len(out), 2),
x_out.shape,
dtype=out.dtype,
)
# fill in (current) values from source shm buffer
y_out[:] = out[:, np.newaxis] y_out[:] = out[:, np.newaxis]
# start y at origin level # start y at origin level
y_out[0, 0] = 0 y_out[self.xy_nd_start] = 0
return x_out, y_out return x_out, y_out
def incr_update_xy_nd( def incr_update_xy_nd(
@ -747,12 +777,12 @@ class StepCurveFmtr(IncrementalFormatter):
src_shm: ShmArray, src_shm: ShmArray,
array_key: str, array_key: str,
src_update: np.ndarray, # portion of source that was updated new_from_src: np.ndarray, # portion of source that was updated
slc: slice, read_slc: slice,
ln: int, # len of updated ln: int, # len of updated
first: int, nd_start: int,
last: int, nd_stop: int,
is_append: bool, is_append: bool,
@ -763,20 +793,29 @@ class StepCurveFmtr(IncrementalFormatter):
# for a step curve we slice from one datum prior # for a step curve we slice from one datum prior
# to the current "update slice" to get the previous # to the current "update slice" to get the previous
# "level". # "level".
if is_append: # if is_append:
start = max(last - 1, 0) # start = max(last - 1, 0)
end = src_shm._last.value # end = src_shm._last.value
new_y = src_shm._array[start:end][array_key] # new_y = src_shm._array[start:end][array_key]
slc = slice(start, end) # append_slc = slice(start, end)
else: new_y = new_from_src[array_key][:, np.newaxis]
new_y = src_update
index_field = self.index_field
if index_field != 'index':
x_nd_new = self.x_nd[read_slc]
x_nd_new[:] = new_from_src[index_field][:, np.newaxis]
if (self.x_nd[self.xy_slice][-1] == 0.5).any():
breakpoint()
return ( return (
np.broadcast_to( new_y,
new_y[:, None], (new_y.size, 2), # np.broadcast_to(
), # new_x[:, None],
slc, # (new_y.size, 2),
# ),
read_slc,
) )
def format_xy_nd_to_1d( def format_xy_nd_to_1d(
@ -791,23 +830,40 @@ class StepCurveFmtr(IncrementalFormatter):
np.ndarray, np.ndarray,
str, str,
]: ]:
lasts = array[['index', array_key]] last_t, last = array[-1][[self.index_field, array_key]]
last = lasts[array_key][-1]
start = self.xy_nd_start
# 2 more datum-indexes to capture zero at end # 2 more datum-indexes to capture zero at end
x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2] # XXX: can we drop this ``extra`` bit?
y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2] extra = 2
stop = self.xy_nd_stop + extra
x_step = self.x_nd[start:stop]
y_step = self.y_nd[start:stop]
# if (x_step[-1] == 0.5).any():
# breakpoint()
# pack in duplicate final value to complete last step level
x_step[-1] = last_t
y_step[-1] = last y_step[-1] = last
# slice out in-view data # slice out in-view data
ivl, ivr = vr ivl, ivr = vr
ys_iv = y_step[ivl:ivr+1] # ys_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr+1] # xs_iv = x_step[ivl:ivr+1]
ys_iv = y_step[ivl:ivr]
xs_iv = x_step[ivl:ivr]
# flatten to 1d # flatten to 1d
y_iv = ys_iv.reshape(ys_iv.size) y_iv = ys_iv.reshape(ys_iv.size)
x_iv = xs_iv.reshape(xs_iv.size) x_iv = xs_iv.reshape(xs_iv.size)
if (x_iv[-1] == 0.5).any():
breakpoint()
# s = 100
# print( # print(
# f'ys_iv : {ys_iv[-s:]}\n' # f'ys_iv : {ys_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n' # f'y_iv: {y_iv[-s:]}\n'