Simplify formatter update methodology

Don't expect values (array + slice) to be returned and applied by
`.incr_update_xy_nd()` and instead presume this will implemented
internally in each (sub)formatter.

Attempt to simplify some incr-update routines, (particularly in the step
curve formatter, though most of it was reverted to just a simpler form
of the original implementation XD) including:
- dropping the need for the `slice_to_head: int` control.
- using the `xy_nd_start/stop` index counters over custom lookups.
multichartz_backup
Tyler Goodlet 2022-11-29 09:05:06 -05:00
parent 331569c5b8
commit 9f1de263a7
1 changed files with 68 additions and 83 deletions

View File

@ -271,15 +271,7 @@ class IncrementalFormatter(msgspec.Struct):
profiler('allocated xy history') profiler('allocated xy history')
if prepend_len: if prepend_len:
y_prepend = shm._array[pre_slice] self.incr_update_xy_nd(
# if read_src_from_key:
# y_prepend = y_prepend[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm, shm,
array_key, array_key,
@ -289,7 +281,7 @@ class IncrementalFormatter(msgspec.Struct):
# step curves) the updater routine may want to do # step curves) the updater routine may want to do
# the source history-data reading itself, so we pass # the source history-data reading itself, so we pass
# both here. # both here.
y_prepend, shm._array[pre_slice],
pre_slice, pre_slice,
prepend_len, prepend_len,
@ -298,30 +290,16 @@ class IncrementalFormatter(msgspec.Struct):
is_append=False, is_append=False,
) )
# y_nd_view = self.y_nd[y_nd_slc] # self.y_nd[y_nd_slc] = new_y_nd
self.y_nd[y_nd_slc] = new_y_nd
# if read_src_from_key:
# y_nd_view[:][array_key] = new_y_nd
# else:
# y_nd_view[:] = new_y_nd
self.xy_nd_start = shm._first.value self.xy_nd_start = shm._first.value
profiler('prepended xy history: {prepend_length}') profiler('prepended xy history: {prepend_length}')
if append_len: if append_len:
y_append = shm._array[post_slice] self.incr_update_xy_nd(
# if read_src_from_key:
# y_append = y_append[array_key]
(
new_y_nd,
y_nd_slc,
) = self.incr_update_xy_nd(
shm, shm,
array_key, array_key,
y_append, shm._array[post_slice],
post_slice, post_slice,
append_len, append_len,
@ -329,10 +307,6 @@ class IncrementalFormatter(msgspec.Struct):
self.xy_nd_stop, self.xy_nd_stop,
is_append=True, is_append=True,
) )
# self.y_nd[post_slice] = new_y_nd
# self.y_nd[xy_slice or post_slice] = xy_data
self.y_nd[y_nd_slc] = new_y_nd
self.xy_nd_stop = shm._last.value self.xy_nd_stop = shm._last.value
profiler('appened xy history: {append_length}') profiler('appened xy history: {append_length}')
@ -391,11 +365,11 @@ class IncrementalFormatter(msgspec.Struct):
# update the last "in view data range" # update the last "in view data range"
if len(x_1d): if len(x_1d):
self._last_ivdr = x_1d[0], x_1d[slice_to_head] self._last_ivdr = x_1d[0], x_1d[slice_to_head]
profiler('.format_to_1d()')
if (x_1d[-1] == 0.5).any(): if (x_1d[-1] == 0.5).any():
breakpoint() breakpoint()
profiler('.format_to_1d()')
return ( return (
x_1d, x_1d,
y_1d, y_1d,
@ -450,10 +424,7 @@ class IncrementalFormatter(msgspec.Struct):
is_append: bool, is_append: bool,
) -> tuple[ ) -> None:
np.ndarray,
slice,
]:
# write pushed data to flattened copy # write pushed data to flattened copy
new_y_nd = new_from_src[data_field] new_y_nd = new_from_src[data_field]
@ -466,7 +437,7 @@ class IncrementalFormatter(msgspec.Struct):
x_nd_new = self.x_nd[read_slc] x_nd_new = self.x_nd[read_slc]
x_nd_new[:] = new_from_src[index_field] x_nd_new[:] = new_from_src[index_field]
return new_y_nd, read_slc self.y_nd[read_slc] = new_y_nd
# XXX: was ``.format_xy()`` # XXX: was ``.format_xy()``
def format_xy_nd_to_1d( def format_xy_nd_to_1d(
@ -488,8 +459,8 @@ class IncrementalFormatter(msgspec.Struct):
''' '''
return ( return (
array[self.index_field], array[self.index_field][:-1],
array[array_key], array[array_key][:-1],
# 1d connection array or style-key to # 1d connection array or style-key to
# ``pg.functions.arrayToQPath()`` # ``pg.functions.arrayToQPath()``
@ -672,10 +643,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
is_append: bool, is_append: bool,
) -> tuple[ ) -> None:
np.ndarray,
slice,
]:
# write newly pushed data to flattened copy # write newly pushed data to flattened copy
# a struct-arr is always passed in. # a struct-arr is always passed in.
new_y_nd = rfn.structured_to_unstructured( new_y_nd = rfn.structured_to_unstructured(
@ -694,7 +662,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
if (self.x_nd[self.xy_slice] == 0.5).any(): if (self.x_nd[self.xy_slice] == 0.5).any():
breakpoint() breakpoint()
return new_y_nd, read_slc self.y_nd[read_slc] = new_y_nd
class OHLCBarsAsCurveFmtr(OHLCBarsFmtr): class OHLCBarsAsCurveFmtr(OHLCBarsFmtr):
@ -764,8 +732,10 @@ class StepCurveFmtr(IncrementalFormatter):
# fill in (current) values from source shm buffer # fill in (current) values from source shm buffer
y_out[:] = out[:, np.newaxis] y_out[:] = out[:, np.newaxis]
# TODO: pretty sure we can drop this?
# start y at origin level # start y at origin level
y_out[self.xy_nd_start] = 0 # y_out[0, 0] = 0
# y_out[self.xy_nd_start] = 0
return x_out, y_out return x_out, y_out
def incr_update_xy_nd( def incr_update_xy_nd(
@ -790,13 +760,29 @@ class StepCurveFmtr(IncrementalFormatter):
# for a step curve we slice from one datum prior # for a step curve we slice from one datum prior
# to the current "update slice" to get the previous # to the current "update slice" to get the previous
# "level". # "level".
# if is_append: last_2 = slice(
# start = max(last - 1, 0) read_slc.start,
# end = src_shm._last.value read_slc.stop+1,
# new_y = src_shm._array[start:end][array_key] )
# append_slc = slice(start, end) y_nd_new = self.y_nd[last_2]
y_nd_new[:] = src_shm._array[last_2][array_key][:, None]
new_y = new_from_src[array_key][:, np.newaxis] # NOTE: we can't use the append slice since we need to "look
# forward" one step to get the current level and copy it as
# well? (though i still don't really grok why..)
# y_nd_new[:] = new_from_src[array_key][:, None]
# XXX: old approach now duplicated above (we can probably drop
# this since the key part was the ``nd_stop + 1``
# if is_append:
# start = max(nd_stop - 1, 0)
# end = src_shm._last.value
# y_nd_new = src_shm._array[start:end][array_key]#[:, np.newaxis]
# slc = slice(start, end)
# self.y_nd[slc] = np.broadcast_to(
# y_nd_new[:, None],
# (y_nd_new.size, 2),
# )
index_field = self.index_field index_field = self.index_field
if index_field != 'index': if index_field != 'index':
@ -806,15 +792,6 @@ class StepCurveFmtr(IncrementalFormatter):
if (self.x_nd[self.xy_slice][-1] == 0.5).any(): if (self.x_nd[self.xy_slice][-1] == 0.5).any():
breakpoint() breakpoint()
return (
new_y,
# np.broadcast_to(
# new_x[:, None],
# (new_y.size, 2),
# ),
read_slc,
)
def format_xy_nd_to_1d( def format_xy_nd_to_1d(
self, self,
@ -830,11 +807,7 @@ class StepCurveFmtr(IncrementalFormatter):
last_t, last = array[-1][[self.index_field, array_key]] last_t, last = array[-1][[self.index_field, array_key]]
start = self.xy_nd_start start = self.xy_nd_start
stop = self.xy_nd_stop
# 2 more datum-indexes to capture zero at end
# XXX: can we drop this ``extra`` bit?
extra = 2
stop = self.xy_nd_stop + extra
x_step = self.x_nd[start:stop] x_step = self.x_nd[start:stop]
y_step = self.y_nd[start:stop] y_step = self.y_nd[start:stop]
@ -843,32 +816,44 @@ class StepCurveFmtr(IncrementalFormatter):
# breakpoint() # breakpoint()
# pack in duplicate final value to complete last step level # pack in duplicate final value to complete last step level
x_step[-1] = last_t # x_step[-1] = last_t
y_step[-1] = last # y_step[-1] = last
# x_step[-1, 1] = last_t
y_step[-1, 1] = last
# if y_step.any():
# s = 3
# print(
# f'x_step:\n{x_step[-s:]}\n'
# f'y_step:\n{y_step[-s:]}\n\n'
# )
# slice out in-view data # slice out in-view data
ivl, ivr = vr ivl, ivr = vr
# ys_iv = y_step[ivl:ivr+1] # TODO: WHY do we need the extra +1 index?
# xs_iv = x_step[ivl:ivr+1] x_step_iv = x_step[ivl:ivr+1]
ys_iv = y_step[ivl:ivr] y_step_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr]
# flatten to 1d # flatten to 1d
y_iv = ys_iv.reshape(ys_iv.size) x_1d = x_step_iv.reshape(x_step_iv.size)
x_iv = xs_iv.reshape(xs_iv.size) y_1d = y_step_iv.reshape(y_step_iv.size)
if (x_iv[-1] == 0.5).any(): if not x_1d.size == y_1d.size:
breakpoint() breakpoint()
# s = 100 if x_1d.any() and (x_1d[-1] == 0.5).any():
breakpoint()
# if y_1d.any():
# s = 6
# print( # print(
# f'ys_iv : {ys_iv[-s:]}\n' # f'x_step_iv:\n{x_step_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n' # f'y_step_iv:\n{y_step_iv[-s:]}\n\n'
# f'xs_iv: {xs_iv[-s:]}\n' # f'x_1d:\n{x_1d[-s:]}\n'
# f'x_iv: {x_iv[-s:]}\n' # f'y_1d:\n{y_1d[-s:]}\n'
# ) # )
return x_iv, y_iv, 'all' return x_1d, y_1d, 'all'
def xy_downsample( def xy_downsample(