Move all pre-path formatting routines to `._pathops`, proto formatter type

epoch_index_backup
Tyler Goodlet 2022-11-22 13:20:44 -05:00
parent 226e84d15f
commit 01b470faf4
2 changed files with 187 additions and 137 deletions

View File

@ -31,7 +31,6 @@ from typing import (
import msgspec
import numpy as np
from numpy.lib import recfunctions as rfn
import pyqtgraph as pg
from PyQt5.QtGui import QPainterPath
from PyQt5.QtCore import QLineF
@ -44,9 +43,21 @@ from .._profile import (
# ms_slower_then,
)
from ._pathops import (
by_index_and_key,
# Plain OHLC renderer
gen_ohlc_qpath,
# OHLC -> line renderer
ohlc_to_line,
update_ohlc_to_line,
ohlc_flat_to_xy,
# step curve renderer
to_step_format,
update_step_xy,
step_to_xy,
xy_downsample,
)
from ._ohlc import (
@ -75,55 +86,6 @@ log = get_logger(__name__)
# flows: dict[str, np.ndarray] = {}
def update_ohlc_to_line(
src_shm: ShmArray,
array_key: str,
src_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> np.ndarray:
fields = ['open', 'high', 'low', 'close']
return (
rfn.structured_to_unstructured(src_update[fields]),
slc,
)
def ohlc_flat_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
# TODO: in the case of an existing ``.update_xy()``
# should we be passing in array as an xy arrays tuple?
# 2 more datum-indexes to capture zero at end
x_flat = r.x_data[r._xy_first:r._xy_last]
y_flat = r.y_data[r._xy_first:r._xy_last]
# slice to view
ivl, ivr = vr
x_iv_flat = x_flat[ivl:ivr]
y_iv_flat = y_flat[ivl:ivr]
# reshape to 1d for graphics rendering
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
return x_iv, y_iv, 'all'
def render_baritems(
flow: Flow,
graphics: BarItems,
@ -253,77 +215,6 @@ def render_baritems(
)
def update_step_xy(
src_shm: ShmArray,
array_key: str,
y_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> np.ndarray:
# for a step curve we slice from one datum prior
# to the current "update slice" to get the previous
# "level".
if is_append:
start = max(last - 1, 0)
end = src_shm._last.value
new_y = src_shm._array[start:end][array_key]
slc = slice(start, end)
else:
new_y = y_update
return (
np.broadcast_to(
new_y[:, None], (new_y.size, 2),
),
slc,
)
def step_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
# 2 more datum-indexes to capture zero at end
x_step = r.x_data[r._xy_first:r._xy_last+2]
y_step = r.y_data[r._xy_first:r._xy_last+2]
lasts = array[['index', array_key]]
last = lasts[array_key][-1]
y_step[-1] = last
# slice out in-view data
ivl, ivr = vr
ys_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr+1]
# flatten to 1d
y_iv = ys_iv.reshape(ys_iv.size)
x_iv = xs_iv.reshape(xs_iv.size)
# print(
# f'ys_iv : {ys_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n'
# f'xs_iv: {xs_iv[-s:]}\n'
# f'x_iv: {x_iv[-s:]}\n'
# )
return x_iv, y_iv, 'all'
class Flow(msgspec.Struct): # , frozen=True):
'''
(Financial Signal-)Flow compound type which wraps a real-time
@ -786,20 +677,6 @@ class Flow(msgspec.Struct): # , frozen=True):
g.update()
def by_index_and_key(
renderer: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
return array['index'], array[array_key], 'all'
class Renderer(msgspec.Struct):
flow: Flow
@ -1210,7 +1087,7 @@ class Renderer(msgspec.Struct):
and do_append
and not should_redraw
):
# print(f'{array_key} append len: {append_length}')
print(f'{array_key} append len: {append_length}')
new_x = x_out[-append_length - 2:] # slice_to_head]
new_y = y_out[-append_length - 2:] # slice_to_head]
profiler('sliced append path')
@ -1236,6 +1113,7 @@ class Renderer(msgspec.Struct):
profiler('generated append qpath')
if use_fpath:
print(f'{self.flow.name}: FAST PATH')
# an attempt at trying to make append-updates faster..
if fast_path is None:
fast_path = append_path

View File

@ -19,10 +19,12 @@ Super fast ``QPainterPath`` generation related operator routines.
"""
from __future__ import annotations
from typing import (
# Optional,
Optional,
Callable,
TYPE_CHECKING,
)
import msgspec
import numpy as np
from numpy.lib import recfunctions as rfn
from numba import njit, float64, int64 # , optional
@ -42,6 +44,56 @@ if TYPE_CHECKING:
from ._flows import Renderer
def by_index_and_key(
renderer: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.ndarray,
np.ndarray,
]:
return array['index'], array[array_key], 'all'
class IncrementalFormatter(msgspec.Struct):
shm: ShmArray
# optional pre-graphics xy formatted data which
# is incrementally updated in sync with the source data.
allocate_xy_nd: Optional[Callable[
[int, slice],
tuple[np.ndarray, np.nd.array]
]] = None
incr_update_xy_nd: Optional[Callable[
[int, slice], None]
] = None
# default just returns index, and named array from data
format_xy_nd_to_1d: Callable[
[np.ndarray, str],
tuple[np.ndarray]
] = by_index_and_key
x_nd: Optional[np.ndarray] = None
y_nd: Optional[np.ndarray] = None
x_1d: Optional[np.ndarray] = None
y_1d: Optional[np.ndarray] = None
# indexes which slice into the above arrays (which are allocated
# based on source data shm input size) and allow retrieving
# incrementally updated data.
# _xy_first: int = 0
# _xy_last: int = 0
xy_nd_start: int = 0
xy_nd_end: int = 0
def xy_downsample(
x,
y,
@ -214,6 +266,55 @@ def ohlc_to_line(
)
def update_ohlc_to_line(
src_shm: ShmArray,
array_key: str,
src_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> np.ndarray:
fields = ['open', 'high', 'low', 'close']
return (
rfn.structured_to_unstructured(src_update[fields]),
slc,
)
def ohlc_flat_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
# TODO: in the case of an existing ``.update_xy()``
# should we be passing in array as an xy arrays tuple?
# 2 more datum-indexes to capture zero at end
x_flat = r.x_data[r._xy_first:r._xy_last]
y_flat = r.y_data[r._xy_first:r._xy_last]
# slice to view
ivl, ivr = vr
x_iv_flat = x_flat[ivl:ivr]
y_iv_flat = y_flat[ivl:ivr]
# reshape to 1d for graphics rendering
y_iv = y_iv_flat.reshape(-1)
x_iv = x_iv_flat.reshape(-1)
return x_iv, y_iv, 'all'
def to_step_format(
shm: ShmArray,
data_field: str,
@ -239,3 +340,74 @@ def to_step_format(
# start y at origin level
y_out[0, 0] = 0
return x_out, y_out
def update_step_xy(
src_shm: ShmArray,
array_key: str,
y_update: np.ndarray,
slc: slice,
ln: int,
first: int,
last: int,
is_append: bool,
) -> np.ndarray:
# for a step curve we slice from one datum prior
# to the current "update slice" to get the previous
# "level".
if is_append:
start = max(last - 1, 0)
end = src_shm._last.value
new_y = src_shm._array[start:end][array_key]
slc = slice(start, end)
else:
new_y = y_update
return (
np.broadcast_to(
new_y[:, None], (new_y.size, 2),
),
slc,
)
def step_to_xy(
r: Renderer,
array: np.ndarray,
array_key: str,
vr: tuple[int, int],
) -> tuple[
np.ndarray,
np.nd.array,
str,
]:
# 2 more datum-indexes to capture zero at end
x_step = r.x_data[r._xy_first:r._xy_last+2]
y_step = r.y_data[r._xy_first:r._xy_last+2]
lasts = array[['index', array_key]]
last = lasts[array_key][-1]
y_step[-1] = last
# slice out in-view data
ivl, ivr = vr
ys_iv = y_step[ivl:ivr+1]
xs_iv = x_step[ivl:ivr+1]
# flatten to 1d
y_iv = ys_iv.reshape(ys_iv.size)
x_iv = xs_iv.reshape(xs_iv.size)
# print(
# f'ys_iv : {ys_iv[-s:]}\n'
# f'y_iv: {y_iv[-s:]}\n'
# f'xs_iv: {xs_iv[-s:]}\n'
# f'x_iv: {x_iv[-s:]}\n'
# )
return x_iv, y_iv, 'all'