Compare commits

..

8 Commits

Author SHA1 Message Date
Tyler Goodlet c82ca812a8 Pass display state table to interaction handlers
This took a teensie bit of reworking in some `.ui` modules
more or less in the following order of functional dependence:

- add a `Ctl-R` kb-binding to trigger a `Viz.reset_graphics()` in
  the kb-handler task `handle_viewmode_kb_inputs()`.
  - call the new method on all `Viz`s (& for all sample-rates) and
    `DisplayState` refs provided in a (new input)
    `dss: dict[str, DisplayState]` table, which was originally inite-ed
    from the multi-feed display loop (so orig in `.graphics_update_loop()`
    but now provided as an input to that func, see below..)
- `._interaction`: allow binding in `async_handler()` kwargs (`via
  a `functools.partial`) passed to `ChartView.open_async_input_handler()`
  such that arbitrary inputs to our kb+mouse handler funcs can accept
  "wtv we desire".
  - use ^ to bind in the aforementioned `dss` display-state table to
    said handlers!
- define the `dss` table (as mentioned) inside `._display.display_symbol_data()`
  and pass it into the update loop funcs as well as the newly augmented
  `.open_async_input_handler()` calls,
  - drop calling `chart.view.open_async_input_handler()` from the
    `.order_mode.open_order_mode()`'s enter block and instead factor it
    into the caller to support passing the `dss` table to the kb
    handlers.
  - comment out the original history update loop handling of forced `Viz`
    redraws entirely since we now have a manual method via `Ctl-R`.
  - now, just update the `._remote_ctl.dss: dict` with this table since
    we want to also provide rc for **all** loaded feeds, not just the
    currently shown one/set.
- docs, naming and typing tweaks to `._event.open_handlers()`
2023-12-28 21:06:06 -05:00
Tyler Goodlet a7ad50cf8f Add `Viz.reset_graphics()` for "force re-render"
Since we're now using it multiple layers probably makes sense to impl
and wrap it more correctly / publicly. The main (recent) use case is
where editing an underlying time series and then wanting to refresh the
graphics layers to reflect the changes in a chart. Part of this also
obviously includes wiping the y-range mx/mn cache.

Also ensure that `force_redraw` is proxying through to any `BarItems`
via the new `render_baritems()` func kwarg even when switching between
downsampled-line vs. bars modes.
2023-12-28 18:00:26 -05:00
Tyler Goodlet 661805695e Reimpl axis dt label contents gen with `polars`
Since `polars` has a more sane set of (time-zone aware) datetime APIs it
makes more sense and is definitely no slower then the previous `numpy`
impl. Also, actually use the sample-rate specific formats defined in
`DynamicDateAxis.tick_tpl`: dict[int, str]` finally using the new
`Viz.time_step()` property.
2023-12-28 11:08:29 -05:00
Tyler Goodlet 3de7c9a9eb Add `Viz.time_step()`, the sample step-size in time
Since we end up needing the actual (OHLC sampled) time step info (at
least in seconds) for various purposes (in this specific follow up use
case to determine sample-rate specific `datetime` format strings for
a charted time series x-axis label), allow always reading it from the
viz with the presumption (at least for now) the underlying data-frame
will have an epoch `'time'` col/field.
2023-12-28 11:02:06 -05:00
Tyler Goodlet 59536bd284 Use `import <name> as <name>,` in `.tsp`
Thanks to oremanj in the `trio` room for this hot style tip which i much
prefer to have less LOC and places to change sub-pkg name exports!

Also drop expecting a `gaps` frame output from `dedupe()`.
2023-12-28 10:58:22 -05:00
Tyler Goodlet 5702e422d8 Drop gap detection from `dedupe()`, expect caller to handle it 2023-12-28 10:40:08 -05:00
Tyler Goodlet 07331a160e Expose "bar gap margin" as `.ui._formatters.BGM: float` 2023-12-28 10:37:20 -05:00
Tyler Goodlet 0d18cb65c3 Lul, actually detect gaps for 1s OHLC
Turns out we were always filtering to time gaps longer then a day smh..
Instead tweak `detect_time_gaps()` to only return venue-gaps when
a `gap_dt_unit: str` is passed and pass `'days'` (like it was by default
before) from `dedupe()` though we should really pass in an actual venue
gap duration in the future.
2023-12-27 16:55:00 -05:00
9 changed files with 400 additions and 252 deletions

View File

@ -41,6 +41,11 @@ if TYPE_CHECKING:
) )
from piker.toolz import Profiler from piker.toolz import Profiler
# default gap between bars: "bar gap multiplier"
# - 0.5 is no overlap between OC arms,
# - 1.0 is full overlap on each neighbor sample
BGM: float = 0.16
class IncrementalFormatter(msgspec.Struct): class IncrementalFormatter(msgspec.Struct):
''' '''
@ -513,6 +518,7 @@ class IncrementalFormatter(msgspec.Struct):
class OHLCBarsFmtr(IncrementalFormatter): class OHLCBarsFmtr(IncrementalFormatter):
x_offset: np.ndarray = np.array([ x_offset: np.ndarray = np.array([
-0.5, -0.5,
0, 0,
@ -604,8 +610,9 @@ class OHLCBarsFmtr(IncrementalFormatter):
vr: tuple[int, int], vr: tuple[int, int],
start: int = 0, # XXX: do we need this? start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap # 0.5 is no overlap between arms, 1.0 is full overlap
w: float = 0.16, gap: float = BGM,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
@ -622,7 +629,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
array[:-1], array[:-1],
start, start,
bar_w=self.index_step_size, bar_w=self.index_step_size,
bar_gap=w * self.index_step_size, bar_gap=gap * self.index_step_size,
# XXX: don't ask, due to a ``numba`` bug.. # XXX: don't ask, due to a ``numba`` bug..
use_time_index=(self.index_field == 'time'), use_time_index=(self.index_field == 'time'),

View File

@ -67,50 +67,28 @@ from ..data._sampling import (
) )
from ._anal import ( from ._anal import (
get_null_segs, get_null_segs as get_null_segs,
iter_null_segs, iter_null_segs as iter_null_segs,
Frame, Frame as Frame,
Seq, Seq as Seq,
# codec-ish # codec-ish
np2pl, np2pl as np2pl,
pl2np, pl2np as pl2np,
# `numpy` only # `numpy` only
slice_from_time, slice_from_time as slice_from_time,
# `polars` specific # `polars` specific
dedupe, dedupe as dedupe,
with_dts, with_dts as with_dts,
detect_time_gaps, detect_time_gaps as detect_time_gaps,
sort_diff, sort_diff as sort_diff,
# TODO: # TODO:
detect_price_gaps detect_price_gaps as detect_price_gaps
) )
__all__: list[str] = [
'dedupe',
'get_null_segs',
'iter_null_segs',
'sort_diff',
'slice_from_time',
'Frame',
'Seq',
'np2pl',
'pl2np',
'slice_from_time',
'with_dts',
'detect_time_gaps',
'sort_diff',
# TODO:
'detect_price_gaps'
]
# TODO: break up all this shite into submods! # TODO: break up all this shite into submods!
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
@ -440,8 +418,11 @@ async def start_backfill(
# broker says there never was or is no more history to pull # broker says there never was or is no more history to pull
except DataUnavailable: except DataUnavailable:
log.warning( log.warning(
f'NO-MORE-DATA: backend {mod.name} halted history:\n' f'NO-MORE-DATA in range?\n'
f'{timeframe}@{mkt.fqme}' f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
) )
# ugh, what's a better way? # ugh, what's a better way?
@ -586,13 +567,12 @@ async def start_backfill(
load_from_offline=False, load_from_offline=False,
) )
( (
df, wdts,
gaps,
deduped, deduped,
diff, diff,
) = dedupe(df) ) = dedupe(df)
if diff: # if diff:
sort_diff(df) # sort_diff(df)
else: else:
# finally filled gap # finally filled gap

View File

@ -380,10 +380,6 @@ def get_null_segs(
None, # backfilled on next iter None, # backfilled on next iter
]) ])
# row = zero_t[fi]
# absi_pre_zseg = row['index'][0] - 1
# absi_pre_zseg = absi - 1
# final iter case, backfill FINAL end iabs! # final iter case, backfill FINAL end iabs!
if (i + 1) == fi_zgaps.size: if (i + 1) == fi_zgaps.size:
absi_zsegs[-1][1] = absi_zeros[-1] + 1 absi_zsegs[-1][1] = absi_zeros[-1] + 1
@ -510,10 +506,10 @@ def iter_null_segs(
) )
# TODO: move to ._pl_anal
def with_dts( def with_dts(
df: pl.DataFrame, df: pl.DataFrame,
time_col: str = 'time', time_col: str = 'time',
) -> pl.DataFrame: ) -> pl.DataFrame:
''' '''
Insert datetime (casted) columns to a (presumably) OHLC sampled Insert datetime (casted) columns to a (presumably) OHLC sampled
@ -529,9 +525,7 @@ def with_dts(
column=pl.col(f'{time_col}_prev'), column=pl.col(f'{time_col}_prev'),
).alias('dt_prev'), ).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'), pl.col('dt').diff().alias('dt_diff'),
]) #.with_columns( ])
# pl.col('dt').diff().dt.days().alias('days_dt_diff'),
# )
t_unit: Literal = Literal[ t_unit: Literal = Literal[
@ -546,25 +540,23 @@ t_unit: Literal = Literal[
def detect_time_gaps( def detect_time_gaps(
df: pl.DataFrame, w_dts: pl.DataFrame,
time_col: str = 'time', time_col: str = 'time',
# epoch sampling step diff # epoch sampling step diff
expect_period: float = 60, expect_period: float = 60,
# datetime diff unit and gap value
# crypto mkts
# gap_dt_unit: t_unit = 'minutes',
# gap_thresh: int = 1,
# NOTE: legacy stock mkts have venue operating hours # NOTE: legacy stock mkts have venue operating hours
# and thus gaps normally no more then 1-2 days at # and thus gaps normally no more then 1-2 days at
# a time. # a time.
gap_thresh: float = 1.,
# TODO: allow passing in a frame of operating hours?
# -[ ] durations/ranges for faster legit gap checks?
# XXX -> must be valid ``polars.Expr.dt.<name>`` # XXX -> must be valid ``polars.Expr.dt.<name>``
# TODO: allow passing in a frame of operating hours # like 'days' which a sane default for venue closures
# durations/ranges for faster legit gap checks. # though will detect weekend gaps which are normal :o
gap_dt_unit: t_unit = 'days', gap_dt_unit: t_unit | None = None,
gap_thresh: int = 1,
) -> pl.DataFrame: ) -> pl.DataFrame:
''' '''
@ -574,19 +566,24 @@ def detect_time_gaps(
actual missing data segments. actual missing data segments.
''' '''
return ( # first select by any sample-period (in seconds unit) step size
with_dts(df) # greater then expected.
# First by a seconds unit step size step_gaps: pl.DataFrame = w_dts.filter(
.filter( pl.col('s_diff').abs() > expect_period
pl.col('s_diff').abs() > expect_period )
)
.filter( if gap_dt_unit is None:
# Second by an arbitrary dt-unit step size return step_gaps
getattr(
pl.col('dt_diff').dt, # NOTE: this flag is to indicate that on this (sampling) time
gap_dt_unit, # scale we expect to only be filtering against larger venue
)().abs() > gap_thresh # closures-scale time gaps.
) return step_gaps.filter(
# Second by an arbitrary dt-unit step size
getattr(
pl.col('dt_diff').dt,
gap_dt_unit,
)().abs() > gap_thresh
) )
@ -622,7 +619,10 @@ def detect_price_gaps(
def dedupe( def dedupe(
src_df: pl.DataFrame, src_df: pl.DataFrame,
time_gaps: pl.DataFrame | None = None,
sort: bool = True, sort: bool = True,
period: float = 60,
) -> tuple[ ) -> tuple[
pl.DataFrame, # with dts pl.DataFrame, # with dts
@ -637,44 +637,28 @@ def dedupe(
dt-deduplicated frame. dt-deduplicated frame.
''' '''
df: pl.DataFrame = with_dts(src_df) wdts: pl.DataFrame = with_dts(src_df)
# TODO: enable passing existing `with_dts` df for speedup? # maybe sort on any time field
gaps: pl.DataFrame = detect_time_gaps(df) if sort:
wdts = wdts.sort(by='time')
# if no gaps detected just return carbon copies # TODO: detect out-of-order segments which were corrected!
# and no len diff. # -[ ] report in log msg
if gaps.is_empty(): # -[ ] possibly return segment sections which were moved?
return (
df,
gaps,
df,
0,
)
# remove duplicated datetime samples/sections # remove duplicated datetime samples/sections
deduped: pl.DataFrame = df.unique( deduped: pl.DataFrame = wdts.unique(
subset=['dt'], subset=['dt'],
maintain_order=True, maintain_order=True,
) )
if sort:
deduped = deduped.sort(by='time')
deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
diff: int = ( diff: int = (
df.height wdts.height
- -
deduped.height deduped.height
) )
log.warning(
f'TIME GAPs FOUND:\n'
# f'{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
return ( return (
df, wdts,
gaps,
deduped, deduped,
diff, diff,
) )
@ -708,7 +692,7 @@ def sort_diff(
# to go from numpy struct-arrays to polars dataframes and back: # to go from numpy struct-arrays to polars dataframes and back:
# https://stackoverflow.com/a/72054819 # https://stackoverflow.com/a/72054819
def np2pl(array: np.ndarray) -> pl.DataFrame: def np2pl(array: np.ndarray) -> pl.DataFrame:
start = time.time() start: float = time.time()
# XXX: thanks to this SO answer for this conversion tip: # XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819 # https://stackoverflow.com/a/72054819

View File

@ -23,7 +23,7 @@ from functools import lru_cache
from typing import Callable from typing import Callable
from math import floor from math import floor
import numpy as np import polars as pl
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5 import QtCore, QtGui, QtWidgets from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QPointF from PyQt5.QtCore import QPointF
@ -33,6 +33,7 @@ from ..accounting._mktinfo import float_digits
from ._label import Label from ._label import Label
from ._style import DpiAwareFont, hcolor, _font from ._style import DpiAwareFont, hcolor, _font
from ._interaction import ChartView from ._interaction import ChartView
from ._dataviz import Viz
_axis_pen = pg.mkPen(hcolor('bracket')) _axis_pen = pg.mkPen(hcolor('bracket'))
@ -287,9 +288,7 @@ class DynamicDateAxis(Axis):
# time formats mapped by seconds between bars # time formats mapped by seconds between bars
tick_tpl = { tick_tpl = {
60 * 60 * 24: '%Y-%b-%d', 60 * 60 * 24: '%Y-%b-%d',
60: '%H:%M', 60: '%Y-%b-%d(%H:%M)',
30: '%H:%M:%S',
5: '%H:%M:%S',
1: '%H:%M:%S', 1: '%H:%M:%S',
} }
@ -305,10 +304,10 @@ class DynamicDateAxis(Axis):
# XX: ARGGGGG AG:LKSKDJF:LKJSDFD # XX: ARGGGGG AG:LKSKDJF:LKJSDFD
chart = self.pi.chart_widget chart = self.pi.chart_widget
viz = chart._vizs[chart.name] viz: Viz = chart._vizs[chart.name]
shm = viz.shm shm = viz.shm
array = shm.array array = shm.array
ifield = viz.index_field ifield: str = viz.index_field
index = array[ifield] index = array[ifield]
i_0, i_l = index[0], index[-1] i_0, i_l = index[0], index[-1]
@ -329,7 +328,7 @@ class DynamicDateAxis(Axis):
arr_len = index.shape[0] arr_len = index.shape[0]
first = shm._first.value first = shm._first.value
times = array['time'] times = array['time']
epochs = times[ epochs: list[int] = times[
list( list(
map( map(
int, int,
@ -341,23 +340,30 @@ class DynamicDateAxis(Axis):
) )
] ]
else: else:
epochs = list(map(int, indexes)) epochs: list[int] = list(map(int, indexes))
# TODO: **don't** have this hard coded shift to EST # TODO: **don't** have this hard coded shift to EST
# delay = times[-1] - times[-2] delay: float = viz.time_step()
dts = np.array( if delay > 1:
# NOTE: use less granular dt-str when using 1M+ OHLC
fmtstr: str = self.tick_tpl[delay]
else:
fmtstr: str = '%Y-%m-%d(%H:%M:%S)'
# https://pola-rs.github.io/polars/py-polars/html/reference/expressions/api/polars.from_epoch.html#polars-from-epoch
pl_dts: pl.Series = pl.from_epoch(
epochs, epochs,
dtype='datetime64[s]', time_unit='s',
# NOTE: kinda weird we can pass it to `.from_epoch()` no?
).dt.replace_time_zone(
time_zone='UTC'
).dt.convert_time_zone(
# TODO: pull this from either:
# -[ ] the mkt venue tz by default
# -[ ] the user's config under `sys.mkt_timezone: str`
'EST'
) )
return pl_dts.dt.to_string(fmtstr).to_list()
# see units listing:
# https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
return list(np.datetime_as_string(dts))
# TODO: per timeframe formatting?
# - we probably need this based on zoom now right?
# prec = self.np_dt_precision[delay]
# return dts.strftime(self.tick_tpl[delay])
def tickStrings( def tickStrings(
self, self,

View File

@ -36,6 +36,9 @@ from msgspec import (
field, field,
) )
import numpy as np import numpy as np
from numpy import (
ndarray,
)
import pyqtgraph as pg import pyqtgraph as pg
from PyQt5.QtCore import QLineF from PyQt5.QtCore import QLineF
@ -82,10 +85,11 @@ def render_baritems(
viz: Viz, viz: Viz,
graphics: BarItems, graphics: BarItems,
read: tuple[ read: tuple[
int, int, np.ndarray, int, int, ndarray,
int, int, np.ndarray, int, int, ndarray,
], ],
profiler: Profiler, profiler: Profiler,
force_redraw: bool = False,
**kwargs, **kwargs,
) -> None: ) -> None:
@ -216,9 +220,11 @@ def render_baritems(
viz._in_ds = should_line viz._in_ds = should_line
should_redraw = ( should_redraw = (
changed_to_line force_redraw
or changed_to_line
or not should_line or not should_line
) )
# print(f'should_redraw: {should_redraw}')
return ( return (
graphics, graphics,
r, r,
@ -250,7 +256,7 @@ class ViewState(Struct):
] | None = None ] | None = None
# last in view ``ShmArray.array[read_slc]`` data # last in view ``ShmArray.array[read_slc]`` data
in_view: np.ndarray | None = None in_view: ndarray | None = None
class Viz(Struct): class Viz(Struct):
@ -313,6 +319,7 @@ class Viz(Struct):
_last_uppx: float = 0 _last_uppx: float = 0
_in_ds: bool = False _in_ds: bool = False
_index_step: float | None = None _index_step: float | None = None
_time_step: float | None = None
# map from uppx -> (downsampled data, incremental graphics) # map from uppx -> (downsampled data, incremental graphics)
_src_r: Renderer | None = None _src_r: Renderer | None = None
@ -359,7 +366,8 @@ class Viz(Struct):
def index_step( def index_step(
self, self,
reset: bool = False, index_field: str | None = None,
) -> float: ) -> float:
''' '''
Return the size between sample steps in the units of the Return the size between sample steps in the units of the
@ -367,12 +375,17 @@ class Viz(Struct):
epoch time in seconds. epoch time in seconds.
''' '''
# attempt to dectect the best step size by scanning a sample of # attempt to detect the best step size by scanning a sample
# the source data. # of the source data.
if self._index_step is None: if (
self._index_step is None
index: np.ndarray = self.shm.array[self.index_field] or index_field is not None
isample: np.ndarray = index[-16:] ):
index: ndarray = self.shm.array[
index_field
or self.index_field
]
isample: ndarray = index[-16:]
mxdiff: None | float = None mxdiff: None | float = None
for step in np.diff(isample): for step in np.diff(isample):
@ -386,7 +399,15 @@ class Viz(Struct):
) )
mxdiff = step mxdiff = step
self._index_step = max(mxdiff, 1) step: float = max(mxdiff, 1)
# only SET the internal index step if an explicit
# field name is NOT passed, since in such cases this
# is likely just being called from `.time_step()`.
if index_field is not None:
return step
self._index_step = step
if ( if (
mxdiff < 1 mxdiff < 1
or 1 < mxdiff < 60 or 1 < mxdiff < 60
@ -397,6 +418,17 @@ class Viz(Struct):
return self._index_step return self._index_step
def time_step(self) -> float:
'''
Attempt to determine the per-sample time-step period by
forcing an epoch-index and calling `.index_step()`.
'''
if self._time_step is None:
self._time_step: float = self.index_step(index_field='time')
return self._time_step
def maxmin( def maxmin(
self, self,
@ -404,6 +436,9 @@ class Viz(Struct):
i_read_range: tuple[int, int] | None = None, i_read_range: tuple[int, int] | None = None,
use_caching: bool = True, use_caching: bool = True,
# XXX: internal debug
_do_print: bool = False
) -> tuple[float, float] | None: ) -> tuple[float, float] | None:
''' '''
Compute the cached max and min y-range values for a given Compute the cached max and min y-range values for a given
@ -423,15 +458,14 @@ class Viz(Struct):
if shm is None: if shm is None:
return None return None
do_print: bool = False arr: ndarray = shm.array
arr = shm.array
if i_read_range is not None: if i_read_range is not None:
read_slc = slice(*i_read_range) read_slc = slice(*i_read_range)
index = arr[read_slc][self.index_field] index: float | int = arr[read_slc][self.index_field]
if not index.size: if not index.size:
return None return None
ixrng = (index[0], index[-1]) ixrng: tuple[int, int] = (index[0], index[-1])
else: else:
if x_range is None: if x_range is None:
@ -449,15 +483,24 @@ class Viz(Struct):
# TODO: hash the slice instead maybe? # TODO: hash the slice instead maybe?
# https://stackoverflow.com/a/29980872 # https://stackoverflow.com/a/29980872
ixrng = lbar, rbar = round(x_range[0]), round(x_range[1]) ixrng = lbar, rbar = (
round(x_range[0]),
round(x_range[1]),
)
if ( if (
use_caching use_caching
and self._mxmn_cache_enabled and self._mxmn_cache_enabled
): ):
# TODO: is there a way to ONLY clear ranges containing
# a certain sub-range?
# -[ ] currently we have a problem where a previously
# cached mxmn will persist even if the viz is "hard
# re-rendered" (usually bc underlying data was
# corrected)
cached_result = self._mxmns.get(ixrng) cached_result = self._mxmns.get(ixrng)
if cached_result: if cached_result:
if do_print: if _do_print:
print( print(
f'{self.name} CACHED maxmin\n' f'{self.name} CACHED maxmin\n'
f'{ixrng} -> {cached_result}' f'{ixrng} -> {cached_result}'
@ -487,7 +530,7 @@ class Viz(Struct):
(rbar - ifirst) + 1 (rbar - ifirst) + 1
) )
slice_view = arr[read_slc] slice_view: ndarray = arr[read_slc]
if not slice_view.size: if not slice_view.size:
log.warning( log.warning(
@ -498,7 +541,7 @@ class Viz(Struct):
elif self.ds_yrange: elif self.ds_yrange:
mxmn = self.ds_yrange mxmn = self.ds_yrange
if do_print: if _do_print:
print( print(
f'{self.name} M4 maxmin:\n' f'{self.name} M4 maxmin:\n'
f'{ixrng} -> {mxmn}' f'{ixrng} -> {mxmn}'
@ -515,7 +558,7 @@ class Viz(Struct):
mxmn = ylow, yhigh mxmn = ylow, yhigh
if ( if (
do_print _do_print
): ):
s = 3 s = 3
print( print(
@ -529,14 +572,23 @@ class Viz(Struct):
# cache result for input range # cache result for input range
ylow, yhi = mxmn ylow, yhi = mxmn
diff: float = yhi - ylow
# order-of-magnitude check
# TODO: really we should be checking the hi or low
# against the previous sample to catch stuff like,
# - rando stock (reverse-)split
# - null-segments written by some prior
# crash-during-backfil
if diff > 0:
omg: float = abs(logf(diff, 10))
else:
omg: float = 0
try: try:
prolly_anomaly: bool = ( prolly_anomaly: bool = (
( # diff == 0
abs(logf(ylow, 10)) > 16 (ylow and omg > 10)
if ylow
else False
)
or ( or (
isnan(ylow) or isnan(yhi) isnan(ylow) or isnan(yhi)
) )
@ -577,7 +629,7 @@ class Viz(Struct):
self, self,
view_range: None | tuple[float, float] = None, view_range: None | tuple[float, float] = None,
index_field: str | None = None, index_field: str | None = None,
array: np.ndarray | None = None, array: ndarray | None = None,
) -> tuple[ ) -> tuple[
int, int, int, int, int, int int, int, int, int, int, int
@ -648,8 +700,8 @@ class Viz(Struct):
profiler: None | Profiler = None, profiler: None | Profiler = None,
) -> tuple[ ) -> tuple[
int, int, np.ndarray, int, int, ndarray,
int, int, np.ndarray, int, int, ndarray,
]: ]:
''' '''
Read the underlying shm array buffer and Read the underlying shm array buffer and
@ -819,6 +871,10 @@ class Viz(Struct):
graphics, graphics,
read, read,
profiler, profiler,
# NOTE: only set when caller says to
force_redraw=should_redraw,
**kwargs, **kwargs,
) )
@ -981,6 +1037,39 @@ class Viz(Struct):
graphics, graphics,
) )
def reset_graphics(
self,
# TODO: allow only resetting within some x-domain range?
# ixrng: tuple[int, int] | None = None,
) -> None:
'''
Hard reset all graphics (rendering) layers for this
data viz including clearing the mxmn auto-y-range
cache.
Normally called when the underlying data set is modified
(probably by some `.tsp` correcting/editing routine) and
the (now cached) graphics need to be fully re-rendered from
source.
'''
log.warning(
f'Forcing hard Viz graphihcs RESET:\n'
f'.name: {self.name}\n'
f'.index_field: {self.index_field}\n'
f'.index_step(): {self.index_step()}\n'
f'.time_step(): {self.time_step()}\n'
)
# XXX: always clear the mxn y-range cache
# to avoid old data (anomalies) from being
# retained in auto-yrange output.
self._mxmn_cache_enabled = False
self._mxmns.clear()
self.update_graphics(force_redraw=True)
self._mxmn_cache_enabled = True
def draw_last( def draw_last(
self, self,
array_key: str | None = None, array_key: str | None = None,
@ -1073,7 +1162,7 @@ class Viz(Struct):
''' '''
shm: ShmArray = self.shm shm: ShmArray = self.shm
array: np.ndarray = shm.array array: ndarray = shm.array
view: ChartView = self.plot.vb view: ChartView = self.plot.vb
( (
vl, vl,

View File

@ -210,9 +210,9 @@ async def increment_history_view(
): ):
hist_chart: ChartPlotWidget = ds.hist_chart hist_chart: ChartPlotWidget = ds.hist_chart
hist_viz: Viz = ds.hist_viz hist_viz: Viz = ds.hist_viz
viz: Viz = ds.viz # viz: Viz = ds.viz
assert 'hist' in hist_viz.shm.token['shm_name'] assert 'hist' in hist_viz.shm.token['shm_name']
name: str = hist_viz.name # name: str = hist_viz.name
# TODO: seems this is more reliable at keeping the slow # TODO: seems this is more reliable at keeping the slow
# chart incremented in view more correctly? # chart incremented in view more correctly?
@ -225,7 +225,8 @@ async def increment_history_view(
# draw everything from scratch on first entry! # draw everything from scratch on first entry!
for curve_name, hist_viz in hist_chart._vizs.items(): for curve_name, hist_viz in hist_chart._vizs.items():
log.info(f'Forcing hard redraw -> {curve_name}') log.info(f'Forcing hard redraw -> {curve_name}')
hist_viz.update_graphics(force_redraw=True) hist_viz.reset_graphics()
# hist_viz.update_graphics(force_redraw=True)
async with open_sample_stream(1.) as min_istream: async with open_sample_stream(1.) as min_istream:
async for msg in min_istream: async for msg in min_istream:
@ -248,27 +249,27 @@ async def increment_history_view(
# - samplerd could emit the actual update range via # - samplerd could emit the actual update range via
# tuple and then we only enter the below block if that # tuple and then we only enter the below block if that
# range is detected as in-view? # range is detected as in-view?
match msg: # match msg:
case { # case {
'backfilling': (viz_name, timeframe), # 'backfilling': (viz_name, timeframe),
} if ( # } if (
viz_name == name # viz_name == name
): # ):
log.warning( # log.warning(
f'Forcing HARD REDRAW:\n' # f'Forcing HARD REDRAW:\n'
f'name: {name}\n' # f'name: {name}\n'
f'timeframe: {timeframe}\n' # f'timeframe: {timeframe}\n'
) # )
# TODO: only allow this when the data is IN VIEW! # # TODO: only allow this when the data is IN VIEW!
# also, we probably can do this more efficiently # # also, we probably can do this more efficiently
# / smarter by only redrawing the portion of the # # / smarter by only redrawing the portion of the
# path necessary? # # path necessary?
{ # {
60: hist_viz, # 60: hist_viz,
1: viz, # 1: viz,
}[timeframe].update_graphics( # }[timeframe].update_graphics(
force_redraw=True # force_redraw=True
) # )
# check if slow chart needs an x-domain shift and/or # check if slow chart needs an x-domain shift and/or
# y-range resize. # y-range resize.
@ -309,6 +310,7 @@ async def increment_history_view(
async def graphics_update_loop( async def graphics_update_loop(
dss: dict[str, DisplayState],
nurse: trio.Nursery, nurse: trio.Nursery,
godwidget: GodWidget, godwidget: GodWidget,
feed: Feed, feed: Feed,
@ -350,8 +352,6 @@ async def graphics_update_loop(
'i_last_slow_t': 0, # multiview-global slow (1m) step index 'i_last_slow_t': 0, # multiview-global slow (1m) step index
} }
dss: dict[str, DisplayState] = {}
for fqme, flume in feed.flumes.items(): for fqme, flume in feed.flumes.items():
ohlcv = flume.rt_shm ohlcv = flume.rt_shm
hist_ohlcv = flume.hist_shm hist_ohlcv = flume.hist_shm
@ -470,67 +470,68 @@ async def graphics_update_loop(
if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']: if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
await tractor.pause() await tractor.pause()
try: # try:
# XXX TODO: we need to do _dss UPDATE here so that when
# a feed-view is switched you can still remote annotate the
# prior view..
from . import _remote_ctl
_remote_ctl._dss = dss
# main real-time quotes update loop # XXX TODO: we need to do _dss UPDATE here so that when
stream: tractor.MsgStream # a feed-view is switched you can still remote annotate the
async with feed.open_multi_stream() as stream: # prior view..
assert stream from . import _remote_ctl
async for quotes in stream: _remote_ctl._dss.update(dss)
quote_period = time.time() - last_quote_s
quote_rate = round( # main real-time quotes update loop
1/quote_period, 1) if quote_period > 0 else float('inf') stream: tractor.MsgStream
async with feed.open_multi_stream() as stream:
# assert stream
async for quotes in stream:
quote_period = time.time() - last_quote_s
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
if (
quote_period <= 1/_quote_throttle_rate
# in the absolute worst case we shouldn't see more then
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
):
pass
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
last_quote_s: float = time.time()
for fqme, quote in quotes.items():
ds = dss[fqme]
ds.quotes = quote
rt_pi, hist_pi = pis[fqme]
# chart isn't active/shown so skip render cycle and
# pause feed(s)
if ( if (
quote_period <= 1/_quote_throttle_rate fast_chart.linked.isHidden()
or not rt_pi.isVisible()
# in the absolute worst case we shouldn't see more then
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
): ):
pass print(f'{fqme} skipping update for HIDDEN CHART')
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}') fast_chart.pause_all_feeds()
continue
last_quote_s = time.time() ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
for fqme, quote in quotes.items(): # sync call to update all graphics/UX components.
ds = dss[fqme] graphics_update_cycle(
ds.quotes = quote ds,
rt_pi, hist_pi = pis[fqme] quote,
)
# chart isn't active/shown so skip render cycle and # finally:
# pause feed(s) # # XXX: cancel any remote annotation control ctxs
if ( # _remote_ctl._dss = None
fast_chart.linked.isHidden() # for cid, (ctx, aids) in _remote_ctl._ctxs.items():
or not rt_pi.isVisible() # await ctx.cancel()
):
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
# sync call to update all graphics/UX components.
graphics_update_cycle(
ds,
quote,
)
finally:
# XXX: cancel any remote annotation control ctxs
_remote_ctl._dss = None
for cid, (ctx, aids) in _remote_ctl._ctxs.items():
await ctx.cancel()
def graphics_update_cycle( def graphics_update_cycle(
@ -1554,8 +1555,10 @@ async def display_symbol_data(
) )
# start update loop task # start update loop task
dss: dict[str, DisplayState] = {}
ln.start_soon( ln.start_soon(
graphics_update_loop, graphics_update_loop,
dss,
ln, ln,
godwidget, godwidget,
feed, feed,
@ -1569,15 +1572,31 @@ async def display_symbol_data(
order_ctl_fqme: str = fqmes[0] order_ctl_fqme: str = fqmes[0]
mode: OrderMode mode: OrderMode
async with ( async with (
open_order_mode( open_order_mode(
feed, feed,
godwidget, godwidget,
order_ctl_fqme, order_ctl_fqme,
order_mode_started, order_mode_started,
loglevel=loglevel loglevel=loglevel
) as mode ) as mode,
):
# TODO: maybe have these startup sooner before
# order mode fully boots? but we gotta,
# -[ ] decouple the order mode bindings until
# the mode has fully booted..
# -[ ] maybe do an Event to sync?
# start input handling for ``ChartView`` input
# (i.e. kb + mouse handling loops)
rt_chart.view.open_async_input_handler(
dss=dss,
),
hist_chart.view.open_async_input_handler(
dss=dss,
),
):
rt_linked.mode = mode rt_linked.mode = mode
rt_viz = rt_chart.get_viz(order_ctl_fqme) rt_viz = rt_chart.get_viz(order_ctl_fqme)

View File

@ -201,8 +201,8 @@ async def open_signal_handler(
async for args in recv: async for args in recv:
await async_handler(*args) await async_handler(*args)
async with trio.open_nursery() as n: async with trio.open_nursery() as tn:
n.start_soon(proxy_to_handler) tn.start_soon(proxy_to_handler)
async with send: async with send:
yield yield
@ -212,18 +212,48 @@ async def open_handlers(
source_widgets: list[QWidget], source_widgets: list[QWidget],
event_types: set[QEvent], event_types: set[QEvent],
async_handler: Callable[[QWidget, trio.abc.ReceiveChannel], None],
**kwargs, # NOTE: if you want to bind in additional kwargs to the handler
# pass in a `partial()` instead!
async_handler: Callable[
[QWidget, trio.abc.ReceiveChannel], # required handler args
None
],
# XXX: these are ONLY inputs available to the
# `open_event_stream()` event-relay to mem-chan factor above!
**open_ev_stream_kwargs,
) -> None: ) -> None:
'''
Connect and schedule an async handler function to receive an
arbitrary `QWidget`'s events with kb/mouse msgs repacked into
structs (see above) and shuttled over a mem-chan to the input
`async_handler` to allow interaction-IO processing from
a `trio` func-as-task.
'''
widget: QWidget
streams: list[trio.abc.ReceiveChannel]
async with ( async with (
trio.open_nursery() as n, trio.open_nursery() as tn,
gather_contexts([ gather_contexts([
open_event_stream(widget, event_types, **kwargs) open_event_stream(
widget,
event_types,
**open_ev_stream_kwargs,
)
for widget in source_widgets for widget in source_widgets
]) as streams, ]) as streams,
): ):
for widget, event_recv_stream in zip(source_widgets, streams): for widget, event_recv_stream in zip(
n.start_soon(async_handler, widget, event_recv_stream) source_widgets,
streams,
):
tn.start_soon(
async_handler,
widget,
event_recv_stream,
)
yield yield

View File

@ -23,6 +23,7 @@ from contextlib import (
asynccontextmanager, asynccontextmanager,
ExitStack, ExitStack,
) )
from functools import partial
import time import time
from typing import ( from typing import (
Callable, Callable,
@ -74,6 +75,7 @@ if TYPE_CHECKING:
) )
from ._dataviz import Viz from ._dataviz import Viz
from .order_mode import OrderMode from .order_mode import OrderMode
from ._display import DisplayState
log = get_logger(__name__) log = get_logger(__name__)
@ -102,6 +104,7 @@ async def handle_viewmode_kb_inputs(
view: ChartView, view: ChartView,
recv_chan: trio.abc.ReceiveChannel, recv_chan: trio.abc.ReceiveChannel,
dss: dict[str, DisplayState],
) -> None: ) -> None:
@ -177,17 +180,42 @@ async def handle_viewmode_kb_inputs(
Qt.Key_P, Qt.Key_P,
} }
): ):
import tractor
feed = order_mode.feed # noqa feed = order_mode.feed # noqa
chart = order_mode.chart # noqa chart = order_mode.chart # noqa
viz = chart.main_viz # noqa viz = chart.main_viz # noqa
vlm_chart = chart.linked.subplots['volume'] # noqa vlm_chart = chart.linked.subplots['volume'] # noqa
vlm_viz = vlm_chart.main_viz # noqa vlm_viz = vlm_chart.main_viz # noqa
dvlm_pi = vlm_chart._vizs['dolla_vlm'].plot # noqa dvlm_pi = vlm_chart._vizs['dolla_vlm'].plot # noqa
import tractor
await tractor.pause() await tractor.pause()
view.interact_graphics_cycle() view.interact_graphics_cycle()
# SEARCH MODE # # FORCE graphics reset-and-render of all currently
# shown data `Viz`s for the current chart app.
if (
ctrl
and key in {
Qt.Key_R,
}
):
fqme: str
ds: DisplayState
for fqme, ds in dss.items():
viz: Viz
for tf, viz in {
60: ds.hist_viz,
1: ds.viz,
}.items():
# TODO: only allow this when the data is IN VIEW!
# also, we probably can do this more efficiently
# / smarter by only redrawing the portion of the
# path necessary?
viz.reset_graphics()
# ------ - ------
# SEARCH MODE
# ------ - ------
# ctlr-<space>/<l> for "lookup", "search" -> open search tree # ctlr-<space>/<l> for "lookup", "search" -> open search tree
if ( if (
ctrl ctrl
@ -247,8 +275,10 @@ async def handle_viewmode_kb_inputs(
delta=-view.def_delta, delta=-view.def_delta,
) )
elif key == Qt.Key_R: elif (
not ctrl
and key == Qt.Key_R
):
# NOTE: seems that if we don't yield a Qt render # NOTE: seems that if we don't yield a Qt render
# cycle then the m4 downsampled curves will show here # cycle then the m4 downsampled curves will show here
# without another reset.. # without another reset..
@ -431,6 +461,7 @@ async def handle_viewmode_mouse(
view: ChartView, view: ChartView,
recv_chan: trio.abc.ReceiveChannel, recv_chan: trio.abc.ReceiveChannel,
dss: dict[str, DisplayState],
) -> None: ) -> None:
@ -567,6 +598,7 @@ class ChartView(ViewBox):
@asynccontextmanager @asynccontextmanager
async def open_async_input_handler( async def open_async_input_handler(
self, self,
**handler_kwargs,
) -> ChartView: ) -> ChartView:
@ -577,14 +609,20 @@ class ChartView(ViewBox):
QEvent.KeyPress, QEvent.KeyPress,
QEvent.KeyRelease, QEvent.KeyRelease,
}, },
async_handler=handle_viewmode_kb_inputs, async_handler=partial(
handle_viewmode_kb_inputs,
**handler_kwargs,
),
), ),
_event.open_handlers( _event.open_handlers(
[self], [self],
event_types={ event_types={
gs_mouse.GraphicsSceneMousePress, gs_mouse.GraphicsSceneMousePress,
}, },
async_handler=handle_viewmode_mouse, async_handler=partial(
handle_viewmode_mouse,
**handler_kwargs,
),
), ),
): ):
yield self yield self

View File

@ -930,13 +930,8 @@ async def open_order_mode(
msg, msg,
) )
# start async input handling for chart's view
async with ( async with (
# ``ChartView`` input async handler startup
chart.view.open_async_input_handler(),
hist_chart.view.open_async_input_handler(),
# pp pane kb inputs # pp pane kb inputs
open_form_input_handling( open_form_input_handling(
form, form,