Simplify formatter update methodology
Don't expect values (array + slice) to be returned and applied by `.incr_update_xy_nd()` and instead presume this will implemented internally in each (sub)formatter. Attempt to simplify some incr-update routines, (particularly in the step curve formatter, though most of it was reverted to just a simpler form of the original implementation XD) including: - dropping the need for the `slice_to_head: int` control. - using the `xy_nd_start/stop` index counters over custom lookups.epoch_index_backup
parent
97feb195e6
commit
bd2abcb91f
|
@ -271,15 +271,7 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
profiler('allocated xy history')
|
||||
|
||||
if prepend_len:
|
||||
y_prepend = shm._array[pre_slice]
|
||||
# if read_src_from_key:
|
||||
# y_prepend = y_prepend[array_key]
|
||||
|
||||
(
|
||||
new_y_nd,
|
||||
y_nd_slc,
|
||||
|
||||
) = self.incr_update_xy_nd(
|
||||
self.incr_update_xy_nd(
|
||||
shm,
|
||||
array_key,
|
||||
|
||||
|
@ -289,7 +281,7 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
# step curves) the updater routine may want to do
|
||||
# the source history-data reading itself, so we pass
|
||||
# both here.
|
||||
y_prepend,
|
||||
shm._array[pre_slice],
|
||||
pre_slice,
|
||||
prepend_len,
|
||||
|
||||
|
@ -298,30 +290,16 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
is_append=False,
|
||||
)
|
||||
|
||||
# y_nd_view = self.y_nd[y_nd_slc]
|
||||
self.y_nd[y_nd_slc] = new_y_nd
|
||||
# if read_src_from_key:
|
||||
# y_nd_view[:][array_key] = new_y_nd
|
||||
# else:
|
||||
# y_nd_view[:] = new_y_nd
|
||||
|
||||
# self.y_nd[y_nd_slc] = new_y_nd
|
||||
self.xy_nd_start = shm._first.value
|
||||
profiler('prepended xy history: {prepend_length}')
|
||||
|
||||
if append_len:
|
||||
y_append = shm._array[post_slice]
|
||||
# if read_src_from_key:
|
||||
# y_append = y_append[array_key]
|
||||
|
||||
(
|
||||
new_y_nd,
|
||||
y_nd_slc,
|
||||
|
||||
) = self.incr_update_xy_nd(
|
||||
self.incr_update_xy_nd(
|
||||
shm,
|
||||
array_key,
|
||||
|
||||
y_append,
|
||||
shm._array[post_slice],
|
||||
post_slice,
|
||||
append_len,
|
||||
|
||||
|
@ -329,10 +307,6 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
self.xy_nd_stop,
|
||||
is_append=True,
|
||||
)
|
||||
# self.y_nd[post_slice] = new_y_nd
|
||||
# self.y_nd[xy_slice or post_slice] = xy_data
|
||||
self.y_nd[y_nd_slc] = new_y_nd
|
||||
|
||||
self.xy_nd_stop = shm._last.value
|
||||
profiler('appened xy history: {append_length}')
|
||||
|
||||
|
@ -391,11 +365,11 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
# update the last "in view data range"
|
||||
if len(x_1d):
|
||||
self._last_ivdr = x_1d[0], x_1d[slice_to_head]
|
||||
|
||||
profiler('.format_to_1d()')
|
||||
if (x_1d[-1] == 0.5).any():
|
||||
breakpoint()
|
||||
|
||||
profiler('.format_to_1d()')
|
||||
|
||||
return (
|
||||
x_1d,
|
||||
y_1d,
|
||||
|
@ -450,10 +424,7 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
|
||||
is_append: bool,
|
||||
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
slice,
|
||||
]:
|
||||
) -> None:
|
||||
# write pushed data to flattened copy
|
||||
new_y_nd = new_from_src[data_field]
|
||||
|
||||
|
@ -466,7 +437,7 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
x_nd_new = self.x_nd[read_slc]
|
||||
x_nd_new[:] = new_from_src[index_field]
|
||||
|
||||
return new_y_nd, read_slc
|
||||
self.y_nd[read_slc] = new_y_nd
|
||||
|
||||
# XXX: was ``.format_xy()``
|
||||
def format_xy_nd_to_1d(
|
||||
|
@ -488,8 +459,8 @@ class IncrementalFormatter(msgspec.Struct):
|
|||
|
||||
'''
|
||||
return (
|
||||
array[self.index_field],
|
||||
array[array_key],
|
||||
array[self.index_field][:-1],
|
||||
array[array_key][:-1],
|
||||
|
||||
# 1d connection array or style-key to
|
||||
# ``pg.functions.arrayToQPath()``
|
||||
|
@ -672,10 +643,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
|
|||
|
||||
is_append: bool,
|
||||
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
slice,
|
||||
]:
|
||||
) -> None:
|
||||
# write newly pushed data to flattened copy
|
||||
# a struct-arr is always passed in.
|
||||
new_y_nd = rfn.structured_to_unstructured(
|
||||
|
@ -694,7 +662,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
|
|||
if (self.x_nd[self.xy_slice] == 0.5).any():
|
||||
breakpoint()
|
||||
|
||||
return new_y_nd, read_slc
|
||||
self.y_nd[read_slc] = new_y_nd
|
||||
|
||||
|
||||
class OHLCBarsAsCurveFmtr(OHLCBarsFmtr):
|
||||
|
@ -764,8 +732,10 @@ class StepCurveFmtr(IncrementalFormatter):
|
|||
# fill in (current) values from source shm buffer
|
||||
y_out[:] = out[:, np.newaxis]
|
||||
|
||||
# TODO: pretty sure we can drop this?
|
||||
# start y at origin level
|
||||
y_out[self.xy_nd_start] = 0
|
||||
# y_out[0, 0] = 0
|
||||
# y_out[self.xy_nd_start] = 0
|
||||
return x_out, y_out
|
||||
|
||||
def incr_update_xy_nd(
|
||||
|
@ -790,13 +760,29 @@ class StepCurveFmtr(IncrementalFormatter):
|
|||
# for a step curve we slice from one datum prior
|
||||
# to the current "update slice" to get the previous
|
||||
# "level".
|
||||
# if is_append:
|
||||
# start = max(last - 1, 0)
|
||||
# end = src_shm._last.value
|
||||
# new_y = src_shm._array[start:end][array_key]
|
||||
# append_slc = slice(start, end)
|
||||
last_2 = slice(
|
||||
read_slc.start,
|
||||
read_slc.stop+1,
|
||||
)
|
||||
y_nd_new = self.y_nd[last_2]
|
||||
y_nd_new[:] = src_shm._array[last_2][array_key][:, None]
|
||||
|
||||
new_y = new_from_src[array_key][:, np.newaxis]
|
||||
# NOTE: we can't use the append slice since we need to "look
|
||||
# forward" one step to get the current level and copy it as
|
||||
# well? (though i still don't really grok why..)
|
||||
# y_nd_new[:] = new_from_src[array_key][:, None]
|
||||
|
||||
# XXX: old approach now duplicated above (we can probably drop
|
||||
# this since the key part was the ``nd_stop + 1``
|
||||
# if is_append:
|
||||
# start = max(nd_stop - 1, 0)
|
||||
# end = src_shm._last.value
|
||||
# y_nd_new = src_shm._array[start:end][array_key]#[:, np.newaxis]
|
||||
# slc = slice(start, end)
|
||||
# self.y_nd[slc] = np.broadcast_to(
|
||||
# y_nd_new[:, None],
|
||||
# (y_nd_new.size, 2),
|
||||
# )
|
||||
|
||||
index_field = self.index_field
|
||||
if index_field != 'index':
|
||||
|
@ -806,15 +792,6 @@ class StepCurveFmtr(IncrementalFormatter):
|
|||
if (self.x_nd[self.xy_slice][-1] == 0.5).any():
|
||||
breakpoint()
|
||||
|
||||
return (
|
||||
new_y,
|
||||
# np.broadcast_to(
|
||||
# new_x[:, None],
|
||||
# (new_y.size, 2),
|
||||
# ),
|
||||
read_slc,
|
||||
)
|
||||
|
||||
def format_xy_nd_to_1d(
|
||||
self,
|
||||
|
||||
|
@ -830,11 +807,7 @@ class StepCurveFmtr(IncrementalFormatter):
|
|||
last_t, last = array[-1][[self.index_field, array_key]]
|
||||
|
||||
start = self.xy_nd_start
|
||||
|
||||
# 2 more datum-indexes to capture zero at end
|
||||
# XXX: can we drop this ``extra`` bit?
|
||||
extra = 2
|
||||
stop = self.xy_nd_stop + extra
|
||||
stop = self.xy_nd_stop
|
||||
|
||||
x_step = self.x_nd[start:stop]
|
||||
y_step = self.y_nd[start:stop]
|
||||
|
@ -843,32 +816,44 @@ class StepCurveFmtr(IncrementalFormatter):
|
|||
# breakpoint()
|
||||
|
||||
# pack in duplicate final value to complete last step level
|
||||
x_step[-1] = last_t
|
||||
y_step[-1] = last
|
||||
# x_step[-1] = last_t
|
||||
# y_step[-1] = last
|
||||
# x_step[-1, 1] = last_t
|
||||
y_step[-1, 1] = last
|
||||
|
||||
# if y_step.any():
|
||||
# s = 3
|
||||
# print(
|
||||
# f'x_step:\n{x_step[-s:]}\n'
|
||||
# f'y_step:\n{y_step[-s:]}\n\n'
|
||||
# )
|
||||
|
||||
# slice out in-view data
|
||||
ivl, ivr = vr
|
||||
# ys_iv = y_step[ivl:ivr+1]
|
||||
# xs_iv = x_step[ivl:ivr+1]
|
||||
ys_iv = y_step[ivl:ivr]
|
||||
xs_iv = x_step[ivl:ivr]
|
||||
# TODO: WHY do we need the extra +1 index?
|
||||
x_step_iv = x_step[ivl:ivr+1]
|
||||
y_step_iv = y_step[ivl:ivr+1]
|
||||
|
||||
# flatten to 1d
|
||||
y_iv = ys_iv.reshape(ys_iv.size)
|
||||
x_iv = xs_iv.reshape(xs_iv.size)
|
||||
x_1d = x_step_iv.reshape(x_step_iv.size)
|
||||
y_1d = y_step_iv.reshape(y_step_iv.size)
|
||||
|
||||
if (x_iv[-1] == 0.5).any():
|
||||
if not x_1d.size == y_1d.size:
|
||||
breakpoint()
|
||||
|
||||
# s = 100
|
||||
if x_1d.any() and (x_1d[-1] == 0.5).any():
|
||||
breakpoint()
|
||||
|
||||
# if y_1d.any():
|
||||
# s = 6
|
||||
# print(
|
||||
# f'ys_iv : {ys_iv[-s:]}\n'
|
||||
# f'y_iv: {y_iv[-s:]}\n'
|
||||
# f'xs_iv: {xs_iv[-s:]}\n'
|
||||
# f'x_iv: {x_iv[-s:]}\n'
|
||||
# f'x_step_iv:\n{x_step_iv[-s:]}\n'
|
||||
# f'y_step_iv:\n{y_step_iv[-s:]}\n\n'
|
||||
# f'x_1d:\n{x_1d[-s:]}\n'
|
||||
# f'y_1d:\n{y_1d[-s:]}\n'
|
||||
# )
|
||||
|
||||
return x_iv, y_iv, 'all'
|
||||
return x_1d, y_1d, 'all'
|
||||
|
||||
|
||||
def xy_downsample(
|
||||
|
|
Loading…
Reference in New Issue