Add cached dispersion methods to `Viz`

In an effort to make overlay calcs cleaner and leverage caching of view
range -> dispersion measures, this adds the following new methods:

- `._dispersion()` an lru cached returns scalar calculator given input
  y-range and y-ref values.
- `.disp_from_range()` which calls the above method and returns variable
  output depending on requested calc `method: str`.
- `.i_from_t()` a currently unused cached method for slicing the
  in-view's array index from time stamp (though not working yet due to
  needing to parameterize the cache by the input `.vs.xrange`).

Further refinements/adjustments:
- rename `.view_state: ViewState` -> `.vs`.
- drop the `.bars_range()` method as it's no longer used anywhere else
  in the code base.
- always set the `ViewState.in_view: np.ndarray` inside `.read()`.
- return the start array index (from slice) and `yref` value @ `xref`
  from `.scalars_from_index()` to aid with "pin to curve" rescaling
  caused by out-of-range pinned-minor curves.
storage_cli
Tyler Goodlet 2023-02-27 14:18:41 -05:00
parent c5a9cc22c2
commit 55ec9ef5a0
1 changed files with 96 additions and 33 deletions

View File

@ -279,7 +279,7 @@ class Viz(Struct): # , frozen=True):
flume: Flume
graphics: Curve | BarItems
view_state: ViewState = field(default_factory=ViewState)
vs: ViewState = field(default_factory=ViewState)
# last calculated y-mn/mx from m4 downsample code, this
# is updated in the body of `Renderer.render()`.
@ -520,6 +520,7 @@ class Viz(Struct): # , frozen=True):
# cache result for input range
assert mxmn
self._mxmns[ixrng] = (read_slc, mxmn)
self.vs.yrange = mxmn
profiler(f'yrange mxmn cacheing: {x_range} -> {mxmn}')
return (
ixrng,
@ -538,15 +539,6 @@ class Viz(Struct): # , frozen=True):
vr.right(),
)
def bars_range(self) -> tuple[int, int, int, int]:
'''
Return a range tuple for the left-view, left-datum, right-datum
and right-view x-indices.
'''
l, start, datum_start, datum_stop, stop, r = self.datums_range()
return l, datum_start, datum_stop, r
def datums_range(
self,
view_range: None | tuple[float, float] = None,
@ -574,11 +566,6 @@ class Viz(Struct): # , frozen=True):
first: int = floor(index[0])
last: int = ceil(index[-1])
# first and last datums in view determined by
# l -> r view range.
leftmost: int = floor(l)
rightmost: int = ceil(r)
# invalid view state
if (
r < l
@ -593,13 +580,15 @@ class Viz(Struct): # , frozen=True):
rightmost: int = last
else:
# determine first and last datums in view determined by
# l -> r view range.
rightmost = max(
min(last, rightmost),
min(last, ceil(r)),
first,
)
leftmost = min(
max(first, leftmost),
max(first, floor(l)),
last,
rightmost - 1,
)
@ -607,7 +596,7 @@ class Viz(Struct): # , frozen=True):
# sanity
# assert leftmost < rightmost
self.view_state.xrange = leftmost, rightmost
self.vs.xrange = leftmost, rightmost
return (
l, # left x-in-view
@ -671,14 +660,14 @@ class Viz(Struct): # , frozen=True):
# above?
in_view = array[read_slc]
if in_view.size:
self.view_state.in_view = in_view
self.vs.in_view = in_view
abs_indx = in_view['index']
abs_slc = slice(
int(abs_indx[0]),
int(abs_indx[-1]),
)
else:
self.view_state.in_view = None
self.vs.in_view = None
if profiler:
profiler(
@ -699,7 +688,7 @@ class Viz(Struct): # , frozen=True):
# BUT the ``in_view`` slice DOES..
read_slc = slice(lbar_i, rbar_i)
in_view = array[lbar_i: rbar_i + 1]
self.view_state.in_view = in_view
self.vs.in_view = in_view
# in_view = array[lbar_i-1: rbar_i+1]
# XXX: same as ^
# to_draw = array[lbar - ifirst:(rbar - ifirst) + 1]
@ -1323,27 +1312,101 @@ class Viz(Struct): # , frozen=True):
return np.median(in_view[self.name])
@lru_cache(maxsize=6116)
def dispersion(
start: int,
stop: int,
def _dispersion(
self,
# xrange: tuple[float, float],
ymn: float,
ymx: float,
yref: float,
) -> float:
pass
) -> tuple[float, float]:
return (
(ymx - yref) / yref,
(ymn - yref) / yref,
)
def disp_from_range(
self,
xrange: tuple[float, float] | None = None,
yref: float | None = None,
method: Literal[
'up',
'down',
'full', # both sides
'both', # both up and down as separate scalars
] = 'full',
) -> float | tuple[float, float] | None:
'''
Return a dispersion metric referenced from an optionally
provided ``yref`` or the left-most datum level by default.
'''
vs = self.vs
yrange = vs.yrange
if yrange is None:
return None
ymn, ymx = yrange
key = 'open' if self.is_ohlc else self.name
yref = yref or vs.in_view[0][key]
# xrange = xrange or vs.xrange
# call into the lru_cache-d sigma calculator method
r_up, r_down = self._dispersion(ymn, ymx, yref)
match method:
case 'full':
return r_up - r_down
case 'up':
return r_up
case 'down':
return r_up
case 'both':
return r_up, r_down
@lru_cache(maxsize=6116)
def i_from_t(
self,
t: float,
) -> int:
return slice_from_time(
self.vs.in_view,
start_t=t,
stop_t=t,
step=self.index_step(),
).start
def scalars_from_index(
self,
xref: float,
xref: float | None = None,
) -> tuple[int, float, float, float]:
vs = self.vs
arr = vs.in_view
# TODO: make this work by parametrizing over input
# .vs.xrange input for caching?
# read_slc_start = self.i_from_t(xref)
) -> tuple[float, float]:
arr = self.view_state.in_view
slc = slice_from_time(
arr=self.view_state.in_view,
arr=self.vs.in_view,
start_t=xref,
stop_t=xref,
)
yref = arr[slc.start]
ymn, ymx = self.view_state.yrange
read_slc_start = slc.start
key = 'open' if self.is_ohlc else self.name
yref = arr[read_slc_start][key]
ymn, ymx = self.vs.yrange
# print(
# f'INTERSECT xref: {read_slc_start}\n'
# f'ymn, ymx: {(ymn, ymx)}\n'
# )
return (
(ymn - yref) / yref,
read_slc_start,
yref,
(ymx - yref) / yref,
(ymn - yref) / yref,
)