Compare commits
8 Commits: ad565936ec ... c82ca812a8

Author | SHA1 | Date
---|---|---
Tyler Goodlet | c82ca812a8 |
Tyler Goodlet | a7ad50cf8f |
Tyler Goodlet | 661805695e |
Tyler Goodlet | 3de7c9a9eb |
Tyler Goodlet | 59536bd284 |
Tyler Goodlet | 5702e422d8 |
Tyler Goodlet | 07331a160e |
Tyler Goodlet | 0d18cb65c3 |
@@ -41,6 +41,11 @@ if TYPE_CHECKING:
 )
 from piker.toolz import Profiler
 
+# default gap between bars: "bar gap multiplier"
+# - 0.5 is no overlap between OC arms,
+# - 1.0 is full overlap on each neighbor sample
+BGM: float = 0.16
+
 
 class IncrementalFormatter(msgspec.Struct):
     '''
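Note: the new `BGM` constant centralizes the bar-gap ratio that was previously hard-coded as `w: float = 0.16` in `OHLCBarsFmtr.format_xy()` (see the hunks below). A minimal sketch of how such a multiplier scales the inter-bar gap against the sample-index step; everything here besides `BGM` itself is illustrative:

```python
# illustrative only: a "bar gap multiplier" scales inter-bar spacing
# relative to the sample-index step size
BGM: float = 0.16
index_step_size: float = 60.0  # e.g. 1m OHLC indexed in epoch seconds

bar_w: float = index_step_size          # one full sample step per bar
bar_gap: float = BGM * index_step_size  # ~16% of each step left empty
print(bar_w, bar_gap)  # 60.0 9.6
```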
@@ -513,6 +518,7 @@ class IncrementalFormatter(msgspec.Struct):
 
+
 class OHLCBarsFmtr(IncrementalFormatter):
 
     x_offset: np.ndarray = np.array([
         -0.5,
         0,
@@ -604,8 +610,9 @@ class OHLCBarsFmtr(IncrementalFormatter):
         vr: tuple[int, int],
 
         start: int = 0,  # XXX: do we need this?
 
-        # 0.5 is no overlap between arms, 1.0 is full overlap
-        w: float = 0.16,
+        gap: float = BGM,
 
     ) -> tuple[
         np.ndarray,
@@ -622,7 +629,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
             array[:-1],
             start,
             bar_w=self.index_step_size,
-            bar_gap=w * self.index_step_size,
+            bar_gap=gap * self.index_step_size,
 
             # XXX: don't ask, due to a ``numba`` bug..
             use_time_index=(self.index_field == 'time'),

@@ -67,50 +67,28 @@ from ..data._sampling import (
 )
 from ._anal import (
 
-    get_null_segs,
-    iter_null_segs,
-    Frame,
-    Seq,
+    get_null_segs as get_null_segs,
+    iter_null_segs as iter_null_segs,
+    Frame as Frame,
+    Seq as Seq,
 
     # codec-ish
-    np2pl,
-    pl2np,
+    np2pl as np2pl,
+    pl2np as pl2np,
 
     # `numpy` only
-    slice_from_time,
+    slice_from_time as slice_from_time,
 
     # `polars` specific
-    dedupe,
-    with_dts,
-    detect_time_gaps,
-    sort_diff,
+    dedupe as dedupe,
+    with_dts as with_dts,
+    detect_time_gaps as detect_time_gaps,
+    sort_diff as sort_diff,
 
     # TODO:
-    detect_price_gaps
+    detect_price_gaps as detect_price_gaps
 )
 
-__all__: list[str] = [
-    'dedupe',
-    'get_null_segs',
-    'iter_null_segs',
-    'sort_diff',
-    'slice_from_time',
-    'Frame',
-    'Seq',
-
-    'np2pl',
-    'pl2np',
-
-    'slice_from_time',
-
-    'with_dts',
-    'detect_time_gaps',
-    'sort_diff',
-
-    # TODO:
-    'detect_price_gaps'
-]
-
 # TODO: break up all this shite into submods!
 from ..brokers._util import (
     DataUnavailable,
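Note: the `name as name` import form used above is the explicit re-export idiom that strict type checkers (mypy's `no_implicit_reexport`, pyright strict mode) recognize, which is what lets the separate `__all__` registry be dropped. A generic sketch with a hypothetical package layout:

```python
# pkg/__init__.py -- hypothetical layout, illustrative only
from ._impl import (
    helper as helper,  # explicit re-export: `from pkg import helper`
                       # is treated as public API by strict checkers
)

# the older equivalent required a parallel, easy-to-drift registry:
# from ._impl import helper
# __all__ = ['helper']
```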
@@ -440,8 +418,11 @@ async def start_backfill(
             # broker says there never was or is no more history to pull
             except DataUnavailable:
                 log.warning(
-                    f'NO-MORE-DATA: backend {mod.name} halted history:\n'
-                    f'{timeframe}@{mkt.fqme}'
+                    f'NO-MORE-DATA in range?\n'
+                    f'`{mod.name}` halted history:\n'
+                    f'tf@fqme: {timeframe}@{mkt.fqme}\n'
+                    'bf_until <- last_start_dt:\n'
+                    f'{backfill_until_dt} <- {last_start_dt}\n'
                 )
 
                 # ugh, what's a better way?
@@ -586,13 +567,12 @@ async def start_backfill(
                     load_from_offline=False,
                 )
                 (
-                    df,
-                    gaps,
+                    wdts,
                     deduped,
                     diff,
                 ) = dedupe(df)
-                if diff:
-                    sort_diff(df)
+                # if diff:
+                #     sort_diff(df)
 
             else:
                 # finally filled gap

@@ -380,10 +380,6 @@ def get_null_segs(
                 None,  # backfilled on next iter
             ])
 
-            # row = zero_t[fi]
-            # absi_pre_zseg = row['index'][0] - 1
-            # absi_pre_zseg = absi - 1
-
         # final iter case, backfill FINAL end iabs!
         if (i + 1) == fi_zgaps.size:
             absi_zsegs[-1][1] = absi_zeros[-1] + 1
@@ -510,10 +506,10 @@ def iter_null_segs(
     )
 
 
 # TODO: move to ._pl_anal
 def with_dts(
     df: pl.DataFrame,
     time_col: str = 'time',
 
 ) -> pl.DataFrame:
     '''
     Insert datetime (casted) columns to a (presumably) OHLC sampled
@@ -529,9 +525,7 @@ def with_dts(
             column=pl.col(f'{time_col}_prev'),
         ).alias('dt_prev'),
         pl.col('dt').diff().alias('dt_diff'),
-    ])  # .with_columns(
-    #     pl.col('dt').diff().dt.days().alias('days_dt_diff'),
-    # )
+    ])
 
 
 t_unit: Literal = Literal[
@@ -546,25 +540,23 @@ t_unit: Literal = Literal[
 
 
 def detect_time_gaps(
-    df: pl.DataFrame,
+    w_dts: pl.DataFrame,
 
-    time_col: str = 'time',
     # epoch sampling step diff
     expect_period: float = 60,
 
     # datetime diff unit and gap value
     # crypto mkts
     # gap_dt_unit: t_unit = 'minutes',
     # gap_thresh: int = 1,
 
     # NOTE: legacy stock mkts have venue operating hours
     # and thus gaps normally no more then 1-2 days at
     # a time.
-    # TODO: allow passing in a frame of operating hours
-    # durations/ranges for faster legit gap checks.
-    gap_dt_unit: t_unit = 'days',
-    gap_thresh: int = 1,
+    gap_thresh: float = 1.,
+
+    # TODO: allow passing in a frame of operating hours?
+    # -[ ] durations/ranges for faster legit gap checks?
+    # XXX -> must be valid ``polars.Expr.dt.<name>``
+    # like 'days' which a sane default for venue closures
+    # though will detect weekend gaps which are normal :o
+    gap_dt_unit: t_unit | None = None,
 
 ) -> pl.DataFrame:
     '''
@@ -574,20 +566,25 @@ def detect_time_gaps(
     actual missing data segments.
 
     '''
-    return (
-        with_dts(df)
-        # First by a seconds unit step size
-        .filter(
-            pl.col('s_diff').abs() > expect_period
-        )
-        .filter(
+    # first select by any sample-period (in seconds unit) step size
+    # greater then expected.
+    step_gaps: pl.DataFrame = w_dts.filter(
+        pl.col('s_diff').abs() > expect_period
+    )
+
+    if gap_dt_unit is None:
+        return step_gaps
+
+    # NOTE: this flag is to indicate that on this (sampling) time
+    # scale we expect to only be filtering against larger venue
+    # closures-scale time gaps.
+    return step_gaps.filter(
         # Second by an arbitrary dt-unit step size
         getattr(
             pl.col('dt_diff').dt,
             gap_dt_unit,
         )().abs() > gap_thresh
     )
-    )
 
 
 def detect_price_gaps(
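Note: `detect_time_gaps()` now takes a frame pre-annotated by `with_dts()` and splits detection into two stages: a sample-period filter on `s_diff` and an optional dt-unit filter for venue-closure scale gaps. A hedged usage sketch, assuming `with_dts()` also populates the epoch-seconds `s_diff` column referenced by the first filter:

```python
import polars as pl
from piker.tsp import (  # as re-exported above
    with_dts,
    detect_time_gaps,
)

# toy 1m OHLC frame with a 3-minute hole before the final row
ohlcv = pl.DataFrame({
    'time': [0, 60, 120, 300],
    'close': [1.0, 1.1, 1.05, 1.2],
})
wdts: pl.DataFrame = with_dts(ohlcv)

# stage 1 only: any abs step larger than the expected 60s period
step_gaps: pl.DataFrame = detect_time_gaps(wdts, expect_period=60)

# stage 1 + 2: additionally require a >= 1 day dt-diff, i.e. only
# venue-closure scale gaps
closure_gaps: pl.DataFrame = detect_time_gaps(
    wdts,
    expect_period=60,
    gap_dt_unit='days',
    gap_thresh=1,
)
```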
@@ -622,7 +619,10 @@ def detect_price_gaps(
 
 def dedupe(
     src_df: pl.DataFrame,
 
+    time_gaps: pl.DataFrame | None = None,
     sort: bool = True,
+    period: float = 60,
 
 ) -> tuple[
     pl.DataFrame,  # with dts
@@ -637,44 +637,28 @@ def dedupe(
     dt-deduplicated frame.
 
     '''
-    df: pl.DataFrame = with_dts(src_df)
-    # TODO: enable passing existing `with_dts` df for speedup?
-    gaps: pl.DataFrame = detect_time_gaps(df)
-
-    # if no gaps detected just return carbon copies
-    # and no len diff.
-    if gaps.is_empty():
-        return (
-            df,
-            gaps,
-            df,
-            0,
-        )
+    wdts: pl.DataFrame = with_dts(src_df)
+
+    # maybe sort on any time field
+    if sort:
+        wdts = wdts.sort(by='time')
+        # TODO: detect out-of-order segments which were corrected!
+        # -[ ] report in log msg
+        # -[ ] possibly return segment sections which were moved?
 
     # remove duplicated datetime samples/sections
-    deduped: pl.DataFrame = df.unique(
+    deduped: pl.DataFrame = wdts.unique(
         subset=['dt'],
         maintain_order=True,
     )
-    if sort:
-        deduped = deduped.sort(by='time')
-
-    deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
 
     diff: int = (
-        df.height
+        wdts.height
         -
         deduped.height
     )
-    log.warning(
-        f'TIME GAPs FOUND:\n'
-        # f'{gaps}\n'
-        f'deduped Gaps found:\n{deduped_gaps}'
-    )
     return (
-        df,
-        gaps,
+        wdts,
         deduped,
         diff,
     )
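Note: `dedupe()` now returns a 3-tuple and no longer runs gap detection (or warning-logs) internally; callers compose `detect_time_gaps()` themselves. A sketch of the reworked call pattern, mirroring the `start_backfill()` call-site change above:

```python
import polars as pl
from piker.tsp import dedupe  # as re-exported above

# toy frame with one duplicated 60s sample
df = pl.DataFrame({
    'time': [0, 60, 60, 120],
    'close': [1.0, 1.1, 1.1, 1.2],
})

(
    wdts,     # dt-annotated (and time-sorted, by default) input copy
    deduped,  # unique-'dt' rows, input order maintained
    diff,     # row-count delta: number of dupes dropped (1 here)
) = dedupe(df)
assert diff == wdts.height - deduped.height
```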
@@ -708,7 +692,7 @@ def sort_diff(
 # to go from numpy struct-arrays to polars dataframes and back:
 # https://stackoverflow.com/a/72054819
 def np2pl(array: np.ndarray) -> pl.DataFrame:
-    start = time.time()
+    start: float = time.time()
 
     # XXX: thanks to this SO answer for this conversion tip:
     # https://stackoverflow.com/a/72054819

@@ -23,7 +23,7 @@ from functools import lru_cache
 from typing import Callable
 from math import floor
 
-import numpy as np
+import polars as pl
 import pyqtgraph as pg
 from PyQt5 import QtCore, QtGui, QtWidgets
 from PyQt5.QtCore import QPointF
@@ -33,6 +33,7 @@ from ..accounting._mktinfo import float_digits
 from ._label import Label
 from ._style import DpiAwareFont, hcolor, _font
 from ._interaction import ChartView
+from ._dataviz import Viz
 
 _axis_pen = pg.mkPen(hcolor('bracket'))
 
@@ -287,9 +288,7 @@ class DynamicDateAxis(Axis):
     # time formats mapped by seconds between bars
     tick_tpl = {
         60 * 60 * 24: '%Y-%b-%d',
-        60: '%H:%M',
-        30: '%H:%M:%S',
-        5: '%H:%M:%S',
+        60: '%Y-%b-%d(%H:%M)',
         1: '%H:%M:%S',
     }
 
@@ -305,10 +304,10 @@ class DynamicDateAxis(Axis):
         # XX: ARGGGGG AG:LKSKDJF:LKJSDFD
         chart = self.pi.chart_widget
 
-        viz = chart._vizs[chart.name]
+        viz: Viz = chart._vizs[chart.name]
         shm = viz.shm
         array = shm.array
-        ifield = viz.index_field
+        ifield: str = viz.index_field
         index = array[ifield]
         i_0, i_l = index[0], index[-1]
 
@@ -329,7 +328,7 @@ class DynamicDateAxis(Axis):
         arr_len = index.shape[0]
         first = shm._first.value
         times = array['time']
-        epochs = times[
+        epochs: list[int] = times[
             list(
                 map(
                     int,
@@ -341,23 +340,30 @@ class DynamicDateAxis(Axis):
                 )
             ]
         else:
-            epochs = list(map(int, indexes))
+            epochs: list[int] = list(map(int, indexes))
 
         # TODO: **don't** have this hard coded shift to EST
-        dts = np.array(
+        # delay = times[-1] - times[-2]
+        delay: float = viz.time_step()
+        if delay > 1:
+            # NOTE: use less granular dt-str when using 1M+ OHLC
+            fmtstr: str = self.tick_tpl[delay]
+        else:
+            fmtstr: str = '%Y-%m-%d(%H:%M:%S)'
+
+        # https://pola-rs.github.io/polars/py-polars/html/reference/expressions/api/polars.from_epoch.html#polars-from-epoch
+        pl_dts: pl.Series = pl.from_epoch(
             epochs,
-            dtype='datetime64[s]',
+            time_unit='s',
+        # NOTE: kinda weird we can pass it to `.from_epoch()` no?
+        ).dt.replace_time_zone(
+            time_zone='UTC'
+        ).dt.convert_time_zone(
+            # TODO: pull this from either:
+            # -[ ] the mkt venue tz by default
+            # -[ ] the user's config under `sys.mkt_timezone: str`
+            'EST'
         )
 
-        # see units listing:
-        # https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
-        return list(np.datetime_as_string(dts))
-
         # TODO: per timeframe formatting?
         # - we probably need this based on zoom now right?
         # prec = self.np_dt_precision[delay]
         # return dts.strftime(self.tick_tpl[delay])
+        return pl_dts.dt.to_string(fmtstr).to_list()
 
     def tickStrings(
         self,

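Note: tick label rendering above moves from `np.datetime_as_string()` to a polars pipeline. A standalone sketch of the same conversion chain using polars' public API (`from_epoch`, `dt.replace_time_zone`, `dt.convert_time_zone`, `dt.to_string`):

```python
import polars as pl

epochs: list[int] = [1_700_000_000, 1_700_000_060]

labels: list[str] = (
    pl.from_epoch(pl.Series(epochs), time_unit='s')
    .dt.replace_time_zone('UTC')   # mark the naive stamps as UTC
    .dt.convert_time_zone('EST')   # then shift for display
    .dt.to_string('%Y-%m-%d(%H:%M:%S)')
    .to_list()
)
print(labels)  # ['2023-11-14(17:13:20)', '2023-11-14(17:14:20)']
```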
@@ -36,6 +36,9 @@ from msgspec import (
     field,
 )
 import numpy as np
+from numpy import (
+    ndarray,
+)
 import pyqtgraph as pg
 from PyQt5.QtCore import QLineF
 
@@ -82,10 +85,11 @@ def render_baritems(
     viz: Viz,
     graphics: BarItems,
     read: tuple[
-        int, int, np.ndarray,
-        int, int, np.ndarray,
+        int, int, ndarray,
+        int, int, ndarray,
     ],
     profiler: Profiler,
+    force_redraw: bool = False,
     **kwargs,
 
 ) -> None:
@@ -216,9 +220,11 @@ def render_baritems(
     viz._in_ds = should_line
 
     should_redraw = (
-        changed_to_line
+        force_redraw
+        or changed_to_line
         or not should_line
     )
     # print(f'should_redraw: {should_redraw}')
     return (
        graphics,
        r,
@@ -250,7 +256,7 @@ class ViewState(Struct):
     ] | None = None
 
     # last in view ``ShmArray.array[read_slc]`` data
-    in_view: np.ndarray | None = None
+    in_view: ndarray | None = None
 
 
 class Viz(Struct):
@@ -313,6 +319,7 @@ class Viz(Struct):
     _last_uppx: float = 0
     _in_ds: bool = False
     _index_step: float | None = None
+    _time_step: float | None = None
 
     # map from uppx -> (downsampled data, incremental graphics)
     _src_r: Renderer | None = None
@@ -359,7 +366,8 @@ class Viz(Struct):
 
     def index_step(
         self,
         reset: bool = False,
+        index_field: str | None = None,
 
     ) -> float:
         '''
         Return the size between sample steps in the units of the
@@ -367,12 +375,17 @@ class Viz(Struct):
         epoch time in seconds.
 
         '''
-        # attempt to dectect the best step size by scanning a sample of
-        # the source data.
-        if self._index_step is None:
-
-            index: np.ndarray = self.shm.array[self.index_field]
-            isample: np.ndarray = index[-16:]
+        # attempt to detect the best step size by scanning a sample
+        # of the source data.
+        if (
+            self._index_step is None
+            or index_field is not None
+        ):
+            index: ndarray = self.shm.array[
+                index_field
+                or self.index_field
+            ]
+            isample: ndarray = index[-16:]
 
             mxdiff: None | float = None
             for step in np.diff(isample):
@@ -386,7 +399,15 @@ class Viz(Struct):
                     )
                 mxdiff = step
 
-            self._index_step = max(mxdiff, 1)
+            step: float = max(mxdiff, 1)
+
+            # only SET the internal index step if an explicit
+            # field name is NOT passed, since in such cases this
+            # is likely just being called from `.time_step()`.
+            if index_field is not None:
+                return step
+
+            self._index_step = step
             if (
                 mxdiff < 1
                 or 1 < mxdiff < 60
@@ -397,6 +418,17 @@ class Viz(Struct):
 
         return self._index_step
 
+    def time_step(self) -> float:
+        '''
+        Attempt to determine the per-sample time-step period by
+        forcing an epoch-index and calling `.index_step()`.
+
+        '''
+        if self._time_step is None:
+            self._time_step: float = self.index_step(index_field='time')
+
+        return self._time_step
+
     def maxmin(
         self,
 
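Note: with the `index_field` override, `.index_step()` now doubles as the probe behind the new `.time_step()`: the `_index_step` cache is only written when no explicit field is passed, while `.time_step()` memoizes its own `_time_step`. The core scan is just a max-diff over a trailing sample window; a runnable sketch of that piece (helper name hypothetical):

```python
import numpy as np

def detect_step(index: np.ndarray) -> float:
    '''
    Largest sample-to-sample diff over a small trailing window,
    floored at 1 -- mirroring the scan loop in `.index_step()` above.

    '''
    isample = index[-16:]
    return max(float(np.diff(isample).max()), 1.0)

print(detect_step(np.arange(0, 600, 60)))  # 1m epoch index -> 60.0
```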
@@ -404,6 +436,9 @@ class Viz(Struct):
         i_read_range: tuple[int, int] | None = None,
         use_caching: bool = True,
 
+        # XXX: internal debug
+        _do_print: bool = False
+
     ) -> tuple[float, float] | None:
         '''
         Compute the cached max and min y-range values for a given
@@ -423,15 +458,14 @@ class Viz(Struct):
         if shm is None:
             return None
 
-        do_print: bool = False
-        arr = shm.array
+        arr: ndarray = shm.array
 
         if i_read_range is not None:
             read_slc = slice(*i_read_range)
-            index = arr[read_slc][self.index_field]
+            index: float | int = arr[read_slc][self.index_field]
             if not index.size:
                 return None
-            ixrng = (index[0], index[-1])
+            ixrng: tuple[int, int] = (index[0], index[-1])
 
         else:
             if x_range is None:
@@ -449,15 +483,24 @@ class Viz(Struct):
 
             # TODO: hash the slice instead maybe?
             # https://stackoverflow.com/a/29980872
-            ixrng = lbar, rbar = round(x_range[0]), round(x_range[1])
+            ixrng = lbar, rbar = (
+                round(x_range[0]),
+                round(x_range[1]),
+            )
 
         if (
             use_caching
+            and self._mxmn_cache_enabled
         ):
+            # TODO: is there a way to ONLY clear ranges containing
+            # a certain sub-range?
+            # -[ ] currently we have a problem where a previously
+            #    cached mxmn will persist even if the viz is "hard
+            #    re-rendered" (usually bc underlying data was
+            #    corrected)
             cached_result = self._mxmns.get(ixrng)
             if cached_result:
-                if do_print:
+                if _do_print:
                     print(
                         f'{self.name} CACHED maxmin\n'
                         f'{ixrng} -> {cached_result}'
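Note: `maxmin()` memoizes y-ranges on the rounded x-range tuple, and the new `_mxmn_cache_enabled` flag exists so `reset_graphics()` (added further below) can bypass and flush stale entries. A minimal runnable sketch of the tuple-keyed caching idiom (names hypothetical):

```python
import numpy as np

_mxmns: dict[tuple[int, int], tuple[float, float]] = {}

def maxmin(arr: np.ndarray, lbar: float, rbar: float) -> tuple[float, float]:
    ixrng = (round(lbar), round(rbar))  # rounded range = hashable key
    if (cached := _mxmns.get(ixrng)) is not None:
        return cached
    view = arr[ixrng[0]:ixrng[1] + 1]
    result = (float(view.min()), float(view.max()))
    _mxmns[ixrng] = result
    return result

data = np.array([3.0, 1.0, 4.0, 1.5, 9.0])
assert maxmin(data, 0, 4) == (1.0, 9.0)
assert maxmin(data, 0.2, 3.9) == (1.0, 9.0)  # cache hit after rounding
```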
@@ -487,7 +530,7 @@ class Viz(Struct):
                 (rbar - ifirst) + 1
             )
 
-            slice_view = arr[read_slc]
+            slice_view: ndarray = arr[read_slc]
 
         if not slice_view.size:
             log.warning(
@@ -498,7 +541,7 @@ class Viz(Struct):
 
         elif self.ds_yrange:
             mxmn = self.ds_yrange
-            if do_print:
+            if _do_print:
                 print(
                     f'{self.name} M4 maxmin:\n'
                     f'{ixrng} -> {mxmn}'
@@ -515,7 +558,7 @@ class Viz(Struct):
 
             mxmn = ylow, yhigh
             if (
-                do_print
+                _do_print
             ):
                 s = 3
                 print(
@@ -529,14 +572,23 @@ class Viz(Struct):
 
         # cache result for input range
         ylow, yhi = mxmn
+        diff: float = yhi - ylow
+
+        # order-of-magnitude check
+        # TODO: really we should be checking the hi or low
+        # against the previous sample to catch stuff like,
+        # - rando stock (reverse-)split
+        # - null-segments written by some prior
+        #   crash-during-backfil
+        if diff > 0:
+            omg: float = abs(logf(diff, 10))
+        else:
+            omg: float = 0
 
         try:
             prolly_anomaly: bool = (
-                (
-                    abs(logf(ylow, 10)) > 16
-                    if ylow
-                    else False
-                )
+                # diff == 0
+                (ylow and omg > 10)
                 or (
                     isnan(ylow) or isnan(yhi)
                 )
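Note: the rewritten anomaly guard swaps the old absolute `abs(log10(ylow)) > 16` test for an order-of-magnitude check on the y-span itself. A worked toy example of the new predicate:

```python
from math import log as logf, isnan

ylow, yhi = 1e-4, 1e12   # an absurd ~16-decade span: probably bad data
diff = yhi - ylow
omg = abs(logf(diff, 10)) if diff > 0 else 0

prolly_anomaly: bool = (
    (ylow and omg > 10)             # >10 decades of y-range
    or (isnan(ylow) or isnan(yhi))  # or NaNs from null segments
)
print(prolly_anomaly)  # True: log10(~1e12) == ~12 > 10
```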
@@ -577,7 +629,7 @@ class Viz(Struct):
         self,
         view_range: None | tuple[float, float] = None,
         index_field: str | None = None,
-        array: np.ndarray | None = None,
+        array: ndarray | None = None,
 
     ) -> tuple[
         int, int, int, int, int, int
@@ -648,8 +700,8 @@ class Viz(Struct):
         profiler: None | Profiler = None,
 
     ) -> tuple[
-        int, int, np.ndarray,
-        int, int, np.ndarray,
+        int, int, ndarray,
+        int, int, ndarray,
     ]:
         '''
         Read the underlying shm array buffer and
@@ -819,6 +871,10 @@ class Viz(Struct):
                 graphics,
                 read,
                 profiler,
+
+                # NOTE: only set when caller says to
+                force_redraw=should_redraw,
+
                 **kwargs,
             )
 
@@ -981,6 +1037,39 @@ class Viz(Struct):
             graphics,
         )
 
+    def reset_graphics(
+        self,
+
+        # TODO: allow only resetting within some x-domain range?
+        # ixrng: tuple[int, int] | None = None,
+
+    ) -> None:
+        '''
+        Hard reset all graphics (rendering) layers for this
+        data viz including clearing the mxmn auto-y-range
+        cache.
+
+        Normally called when the underlying data set is modified
+        (probably by some `.tsp` correcting/editing routine) and
+        the (now cached) graphics need to be fully re-rendered from
+        source.
+
+        '''
+        log.warning(
+            f'Forcing hard Viz graphihcs RESET:\n'
+            f'.name: {self.name}\n'
+            f'.index_field: {self.index_field}\n'
+            f'.index_step(): {self.index_step()}\n'
+            f'.time_step(): {self.time_step()}\n'
+        )
+        # XXX: always clear the mxn y-range cache
+        # to avoid old data (anomalies) from being
+        # retained in auto-yrange output.
+        self._mxmn_cache_enabled = False
+        self._mxmns.clear()
+        self.update_graphics(force_redraw=True)
+        self._mxmn_cache_enabled = True
+
     def draw_last(
         self,
         array_key: str | None = None,
@@ -1073,7 +1162,7 @@ class Viz(Struct):
 
         '''
         shm: ShmArray = self.shm
-        array: np.ndarray = shm.array
+        array: ndarray = shm.array
         view: ChartView = self.plot.vb
         (
             vl,

@@ -210,9 +210,9 @@ async def increment_history_view(
 ):
     hist_chart: ChartPlotWidget = ds.hist_chart
     hist_viz: Viz = ds.hist_viz
-    viz: Viz = ds.viz
+    # viz: Viz = ds.viz
     assert 'hist' in hist_viz.shm.token['shm_name']
-    name: str = hist_viz.name
+    # name: str = hist_viz.name
 
     # TODO: seems this is more reliable at keeping the slow
     # chart incremented in view more correctly?
@@ -225,7 +225,8 @@ async def increment_history_view(
         # draw everything from scratch on first entry!
         for curve_name, hist_viz in hist_chart._vizs.items():
             log.info(f'Forcing hard redraw -> {curve_name}')
-            hist_viz.update_graphics(force_redraw=True)
+            hist_viz.reset_graphics()
+            # hist_viz.update_graphics(force_redraw=True)
 
     async with open_sample_stream(1.) as min_istream:
         async for msg in min_istream:
@@ -248,27 +249,27 @@ async def increment_history_view(
             # - samplerd could emit the actual update range via
             #   tuple and then we only enter the below block if that
             #   range is detected as in-view?
-            match msg:
-                case {
-                    'backfilling': (viz_name, timeframe),
-                } if (
-                    viz_name == name
-                ):
-                    log.warning(
-                        f'Forcing HARD REDRAW:\n'
-                        f'name: {name}\n'
-                        f'timeframe: {timeframe}\n'
-                    )
-                    # TODO: only allow this when the data is IN VIEW!
-                    # also, we probably can do this more efficiently
-                    # / smarter by only redrawing the portion of the
-                    # path necessary?
-                    {
-                        60: hist_viz,
-                        1: viz,
-                    }[timeframe].update_graphics(
-                        force_redraw=True
-                    )
+            # match msg:
+            #     case {
+            #         'backfilling': (viz_name, timeframe),
+            #     } if (
+            #         viz_name == name
+            #     ):
+            #         log.warning(
+            #             f'Forcing HARD REDRAW:\n'
+            #             f'name: {name}\n'
+            #             f'timeframe: {timeframe}\n'
+            #         )
+            #         # TODO: only allow this when the data is IN VIEW!
+            #         # also, we probably can do this more efficiently
+            #         # / smarter by only redrawing the portion of the
+            #         # path necessary?
+            #         {
+            #             60: hist_viz,
+            #             1: viz,
+            #         }[timeframe].update_graphics(
+            #             force_redraw=True
+            #         )
 
             # check if slow chart needs an x-domain shift and/or
             # y-range resize.
@@ -309,6 +310,7 @@ async def increment_history_view(
 
 async def graphics_update_loop(
 
+    dss: dict[str, DisplayState],
     nurse: trio.Nursery,
     godwidget: GodWidget,
     feed: Feed,
@@ -350,8 +352,6 @@ async def graphics_update_loop(
         'i_last_slow_t': 0,  # multiview-global slow (1m) step index
     }
 
-    dss: dict[str, DisplayState] = {}
-
     for fqme, flume in feed.flumes.items():
         ohlcv = flume.rt_shm
         hist_ohlcv = flume.hist_shm
@@ -470,17 +470,18 @@ async def graphics_update_loop(
     if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
         await tractor.pause()
 
-    try:
+    # try:
 
     # XXX TODO: we need to do _dss UPDATE here so that when
     # a feed-view is switched you can still remote annotate the
     # prior view..
     from . import _remote_ctl
-    _remote_ctl._dss = dss
+    _remote_ctl._dss.update(dss)
 
     # main real-time quotes update loop
     stream: tractor.MsgStream
     async with feed.open_multi_stream() as stream:
-        assert stream
+        # assert stream
         async for quotes in stream:
             quote_period = time.time() - last_quote_s
             quote_rate = round(
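Note: switching from rebinding `_remote_ctl._dss = dss` to `._dss.update(dss)` means the module-level mapping object keeps its identity, so any reference already handed out stays in sync. A tiny sketch of why that matters:

```python
# illustrative only: mutate-vs-rebind semantics
shared: dict[str, int] = {}
alias = shared          # stands in for a ref held by `_remote_ctl`

shared.update(btc=1)    # mutation: alias sees the new entry
assert alias == {'btc': 1}

shared = {'eth': 2}     # rebind: alias is now orphaned/stale
assert alias == {'btc': 1}
```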
@@ -496,7 +497,7 @@ async def graphics_update_loop(
                 pass
                 # log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
 
-            last_quote_s = time.time()
+            last_quote_s: float = time.time()
 
             for fqme, quote in quotes.items():
                 ds = dss[fqme]
@@ -526,11 +527,11 @@ async def graphics_update_loop(
                     quote,
                 )
 
-    finally:
-        # XXX: cancel any remote annotation control ctxs
-        _remote_ctl._dss = None
-        for cid, (ctx, aids) in _remote_ctl._ctxs.items():
-            await ctx.cancel()
+    # finally:
+    #     # XXX: cancel any remote annotation control ctxs
+    #     _remote_ctl._dss = None
+    #     for cid, (ctx, aids) in _remote_ctl._ctxs.items():
+    #         await ctx.cancel()
 
 
 def graphics_update_cycle(
@@ -1554,8 +1555,10 @@ async def display_symbol_data(
         )
 
         # start update loop task
+        dss: dict[str, DisplayState] = {}
        ln.start_soon(
             graphics_update_loop,
+            dss,
             ln,
             godwidget,
             feed,
@@ -1569,15 +1572,31 @@ async def display_symbol_data(
         order_ctl_fqme: str = fqmes[0]
         mode: OrderMode
         async with (
             open_order_mode(
                 feed,
                 godwidget,
                 order_ctl_fqme,
                 order_mode_started,
                 loglevel=loglevel
-            ) as mode
-        ):
+            ) as mode,
+
+            # TODO: maybe have these startup sooner before
+            # order mode fully boots? but we gotta,
+            # -[ ] decouple the order mode bindings until
+            #    the mode has fully booted..
+            # -[ ] maybe do an Event to sync?
+
+            # start input handling for ``ChartView`` input
+            # (i.e. kb + mouse handling loops)
+            rt_chart.view.open_async_input_handler(
+                dss=dss,
+            ),
+            hist_chart.view.open_async_input_handler(
+                dss=dss,
+            ),
+        ):
             rt_linked.mode = mode
 
             rt_viz = rt_chart.get_viz(order_ctl_fqme)

@@ -201,8 +201,8 @@ async def open_signal_handler(
         async for args in recv:
             await async_handler(*args)
 
-    async with trio.open_nursery() as n:
-        n.start_soon(proxy_to_handler)
+    async with trio.open_nursery() as tn:
+        tn.start_soon(proxy_to_handler)
         async with send:
             yield
 
@@ -212,18 +212,48 @@ async def open_handlers(
 
     source_widgets: list[QWidget],
     event_types: set[QEvent],
-    async_handler: Callable[[QWidget, trio.abc.ReceiveChannel], None],
-    **kwargs,
+
+    # NOTE: if you want to bind in additional kwargs to the handler
+    # pass in a `partial()` instead!
+    async_handler: Callable[
+        [QWidget, trio.abc.ReceiveChannel],  # required handler args
+        None
+    ],
+
+    # XXX: these are ONLY inputs available to the
+    # `open_event_stream()` event-relay to mem-chan factor above!
+    **open_ev_stream_kwargs,
 
 ) -> None:
+    '''
+    Connect and schedule an async handler function to receive an
+    arbitrary `QWidget`'s events with kb/mouse msgs repacked into
+    structs (see above) and shuttled over a mem-chan to the input
+    `async_handler` to allow interaction-IO processing from
+    a `trio` func-as-task.
+
+    '''
+    widget: QWidget
+    streams: list[trio.abc.ReceiveChannel]
     async with (
-        trio.open_nursery() as n,
+        trio.open_nursery() as tn,
         gather_contexts([
-            open_event_stream(widget, event_types, **kwargs)
+            open_event_stream(
+                widget,
+                event_types,
+                **open_ev_stream_kwargs,
+            )
             for widget in source_widgets
         ]) as streams,
     ):
-        for widget, event_recv_stream in zip(source_widgets, streams):
-            n.start_soon(async_handler, widget, event_recv_stream)
+        for widget, event_recv_stream in zip(
+            source_widgets,
+            streams,
+        ):
+            tn.start_soon(
+                async_handler,
+                widget,
+                event_recv_stream,
+            )
 
         yield
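Note: since `open_handlers()` now forwards `**open_ev_stream_kwargs` only to `open_event_stream()`, any extra handler inputs must be pre-bound with `functools.partial`, exactly as the `open_async_input_handler()` change further below does with `dss`. A sketch (handler name hypothetical):

```python
from functools import partial

async def my_handler(widget, recv_chan, dss=None):
    '''Hypothetical handler taking one extra kwarg.'''
    ...

# pre-bind the extra kwarg; `open_handlers()` then only supplies the
# required (widget, event_recv_stream) positional args:
bound = partial(my_handler, dss={'fqme': object()})
# tn.start_soon(bound, widget, event_recv_stream)
```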
@@ -23,6 +23,7 @@ from contextlib import (
     asynccontextmanager,
     ExitStack,
 )
+from functools import partial
 import time
 from typing import (
     Callable,
@@ -74,6 +75,7 @@ if TYPE_CHECKING:
     )
     from ._dataviz import Viz
     from .order_mode import OrderMode
+    from ._display import DisplayState
 
 
 log = get_logger(__name__)
@@ -102,6 +104,7 @@ async def handle_viewmode_kb_inputs(
 
     view: ChartView,
     recv_chan: trio.abc.ReceiveChannel,
+    dss: dict[str, DisplayState],
 
 ) -> None:
 
@@ -177,17 +180,42 @@ async def handle_viewmode_kb_inputs(
             Qt.Key_P,
         }
     ):
-        import tractor
         feed = order_mode.feed  # noqa
         chart = order_mode.chart  # noqa
         viz = chart.main_viz  # noqa
         vlm_chart = chart.linked.subplots['volume']  # noqa
         vlm_viz = vlm_chart.main_viz  # noqa
         dvlm_pi = vlm_chart._vizs['dolla_vlm'].plot  # noqa
+        import tractor
         await tractor.pause()
         view.interact_graphics_cycle()
 
-    # SEARCH MODE #
+    # FORCE graphics reset-and-render of all currently
+    # shown data `Viz`s for the current chart app.
+    if (
+        ctrl
+        and key in {
+            Qt.Key_R,
+        }
+    ):
+        fqme: str
+        ds: DisplayState
+        for fqme, ds in dss.items():
+
+            viz: Viz
+            for tf, viz in {
+                60: ds.hist_viz,
+                1: ds.viz,
+            }.items():
+                # TODO: only allow this when the data is IN VIEW!
+                # also, we probably can do this more efficiently
+                # / smarter by only redrawing the portion of the
+                # path necessary?
+                viz.reset_graphics()
+
+    # ------ - ------
+    # SEARCH MODE
+    # ------ - ------
     # ctlr-<space>/<l> for "lookup", "search" -> open search tree
     if (
         ctrl
@@ -247,8 +275,10 @@ async def handle_viewmode_kb_inputs(
             delta=-view.def_delta,
         )
 
-    elif key == Qt.Key_R:
+    elif (
+        not ctrl
+        and key == Qt.Key_R
+    ):
         # NOTE: seems that if we don't yield a Qt render
         # cycle then the m4 downsampled curves will show here
         # without another reset..
@@ -431,6 +461,7 @@ async def handle_viewmode_mouse(
 
     view: ChartView,
     recv_chan: trio.abc.ReceiveChannel,
+    dss: dict[str, DisplayState],
 
 ) -> None:
 
@@ -567,6 +598,7 @@ class ChartView(ViewBox):
     @asynccontextmanager
     async def open_async_input_handler(
         self,
+        **handler_kwargs,
 
     ) -> ChartView:
 
@@ -577,14 +609,20 @@ class ChartView(ViewBox):
                 QEvent.KeyPress,
                 QEvent.KeyRelease,
             },
-            async_handler=handle_viewmode_kb_inputs,
+            async_handler=partial(
+                handle_viewmode_kb_inputs,
+                **handler_kwargs,
+            ),
         ),
         _event.open_handlers(
             [self],
             event_types={
                 gs_mouse.GraphicsSceneMousePress,
             },
-            async_handler=handle_viewmode_mouse,
+            async_handler=partial(
+                handle_viewmode_mouse,
+                **handler_kwargs,
+            ),
         ),
     ):
         yield self

@@ -930,13 +930,8 @@ async def open_order_mode(
             msg,
         )
 
-    # start async input handling for chart's view
     async with (
-
-        # ``ChartView`` input async handler startup
-        chart.view.open_async_input_handler(),
-        hist_chart.view.open_async_input_handler(),
-
+
         # pp pane kb inputs
         open_form_input_handling(
             form,