Compare commits

..

No commits in common. "c82ca812a83105b1be20340044be4254470bfb08" and "ad565936ec669d5e324df51963ac6f2fcf482e62" have entirely different histories.

9 changed files with 252 additions and 400 deletions

View File

@ -41,11 +41,6 @@ if TYPE_CHECKING:
)
from piker.toolz import Profiler
# default gap between bars: "bar gap multiplier"
# - 0.5 is no overlap between OC arms,
# - 1.0 is full overlap on each neighbor sample
BGM: float = 0.16
class IncrementalFormatter(msgspec.Struct):
'''
@ -518,7 +513,6 @@ class IncrementalFormatter(msgspec.Struct):
class OHLCBarsFmtr(IncrementalFormatter):
x_offset: np.ndarray = np.array([
-0.5,
0,
@ -610,9 +604,8 @@ class OHLCBarsFmtr(IncrementalFormatter):
vr: tuple[int, int],
start: int = 0, # XXX: do we need this?
# 0.5 is no overlap between arms, 1.0 is full overlap
gap: float = BGM,
w: float = 0.16,
) -> tuple[
np.ndarray,
@ -629,7 +622,7 @@ class OHLCBarsFmtr(IncrementalFormatter):
array[:-1],
start,
bar_w=self.index_step_size,
bar_gap=gap * self.index_step_size,
bar_gap=w * self.index_step_size,
# XXX: don't ask, due to a ``numba`` bug..
use_time_index=(self.index_field == 'time'),

View File

@ -67,28 +67,50 @@ from ..data._sampling import (
)
from ._anal import (
get_null_segs as get_null_segs,
iter_null_segs as iter_null_segs,
Frame as Frame,
Seq as Seq,
get_null_segs,
iter_null_segs,
Frame,
Seq,
# codec-ish
np2pl as np2pl,
pl2np as pl2np,
np2pl,
pl2np,
# `numpy` only
slice_from_time as slice_from_time,
slice_from_time,
# `polars` specific
dedupe as dedupe,
with_dts as with_dts,
detect_time_gaps as detect_time_gaps,
sort_diff as sort_diff,
dedupe,
with_dts,
detect_time_gaps,
sort_diff,
# TODO:
detect_price_gaps as detect_price_gaps
detect_price_gaps
)
__all__: list[str] = [
'dedupe',
'get_null_segs',
'iter_null_segs',
'sort_diff',
'slice_from_time',
'Frame',
'Seq',
'np2pl',
'pl2np',
'slice_from_time',
'with_dts',
'detect_time_gaps',
'sort_diff',
# TODO:
'detect_price_gaps'
]
# TODO: break up all this shite into submods!
from ..brokers._util import (
DataUnavailable,
@ -418,11 +440,8 @@ async def start_backfill(
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
f'NO-MORE-DATA in range?\n'
f'`{mod.name}` halted history:\n'
f'tf@fqme: {timeframe}@{mkt.fqme}\n'
'bf_until <- last_start_dt:\n'
f'{backfill_until_dt} <- {last_start_dt}\n'
f'NO-MORE-DATA: backend {mod.name} halted history:\n'
f'{timeframe}@{mkt.fqme}'
)
# ugh, what's a better way?
@ -567,12 +586,13 @@ async def start_backfill(
load_from_offline=False,
)
(
wdts,
df,
gaps,
deduped,
diff,
) = dedupe(df)
# if diff:
# sort_diff(df)
if diff:
sort_diff(df)
else:
# finally filled gap

View File

@ -380,6 +380,10 @@ def get_null_segs(
None, # backfilled on next iter
])
# row = zero_t[fi]
# absi_pre_zseg = row['index'][0] - 1
# absi_pre_zseg = absi - 1
# final iter case, backfill FINAL end iabs!
if (i + 1) == fi_zgaps.size:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
@ -506,10 +510,10 @@ def iter_null_segs(
)
# TODO: move to ._pl_anal
def with_dts(
df: pl.DataFrame,
time_col: str = 'time',
) -> pl.DataFrame:
'''
Insert datetime (casted) columns to a (presumably) OHLC sampled
@ -525,7 +529,9 @@ def with_dts(
column=pl.col(f'{time_col}_prev'),
).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'),
])
]) #.with_columns(
# pl.col('dt').diff().dt.days().alias('days_dt_diff'),
# )
t_unit: Literal = Literal[
@ -540,23 +546,25 @@ t_unit: Literal = Literal[
def detect_time_gaps(
w_dts: pl.DataFrame,
df: pl.DataFrame,
time_col: str = 'time',
# epoch sampling step diff
expect_period: float = 60,
# datetime diff unit and gap value
# crypto mkts
# gap_dt_unit: t_unit = 'minutes',
# gap_thresh: int = 1,
# NOTE: legacy stock mkts have venue operating hours
# and thus gaps normally no more then 1-2 days at
# a time.
gap_thresh: float = 1.,
# TODO: allow passing in a frame of operating hours?
# -[ ] durations/ranges for faster legit gap checks?
# XXX -> must be valid ``polars.Expr.dt.<name>``
# like 'days' which a sane default for venue closures
# though will detect weekend gaps which are normal :o
gap_dt_unit: t_unit | None = None,
# TODO: allow passing in a frame of operating hours
# durations/ranges for faster legit gap checks.
gap_dt_unit: t_unit = 'days',
gap_thresh: int = 1,
) -> pl.DataFrame:
'''
@ -566,24 +574,19 @@ def detect_time_gaps(
actual missing data segments.
'''
# first select by any sample-period (in seconds unit) step size
# greater then expected.
step_gaps: pl.DataFrame = w_dts.filter(
pl.col('s_diff').abs() > expect_period
)
if gap_dt_unit is None:
return step_gaps
# NOTE: this flag is to indicate that on this (sampling) time
# scale we expect to only be filtering against larger venue
# closures-scale time gaps.
return step_gaps.filter(
# Second by an arbitrary dt-unit step size
getattr(
pl.col('dt_diff').dt,
gap_dt_unit,
)().abs() > gap_thresh
return (
with_dts(df)
# First by a seconds unit step size
.filter(
pl.col('s_diff').abs() > expect_period
)
.filter(
# Second by an arbitrary dt-unit step size
getattr(
pl.col('dt_diff').dt,
gap_dt_unit,
)().abs() > gap_thresh
)
)
@ -619,10 +622,7 @@ def detect_price_gaps(
def dedupe(
src_df: pl.DataFrame,
time_gaps: pl.DataFrame | None = None,
sort: bool = True,
period: float = 60,
) -> tuple[
pl.DataFrame, # with dts
@ -637,28 +637,44 @@ def dedupe(
dt-deduplicated frame.
'''
wdts: pl.DataFrame = with_dts(src_df)
df: pl.DataFrame = with_dts(src_df)
# maybe sort on any time field
if sort:
wdts = wdts.sort(by='time')
# TODO: detect out-of-order segments which were corrected!
# -[ ] report in log msg
# -[ ] possibly return segment sections which were moved?
# TODO: enable passing existing `with_dts` df for speedup?
gaps: pl.DataFrame = detect_time_gaps(df)
# if no gaps detected just return carbon copies
# and no len diff.
if gaps.is_empty():
return (
df,
gaps,
df,
0,
)
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = wdts.unique(
deduped: pl.DataFrame = df.unique(
subset=['dt'],
maintain_order=True,
)
if sort:
deduped = deduped.sort(by='time')
deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
diff: int = (
wdts.height
df.height
-
deduped.height
)
log.warning(
f'TIME GAPs FOUND:\n'
# f'{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
return (
wdts,
df,
gaps,
deduped,
diff,
)
@ -692,7 +708,7 @@ def sort_diff(
# to go from numpy struct-arrays to polars dataframes and back:
# https://stackoverflow.com/a/72054819
def np2pl(array: np.ndarray) -> pl.DataFrame:
start: float = time.time()
start = time.time()
# XXX: thanks to this SO answer for this conversion tip:
# https://stackoverflow.com/a/72054819

View File

@ -23,7 +23,7 @@ from functools import lru_cache
from typing import Callable
from math import floor
import polars as pl
import numpy as np
import pyqtgraph as pg
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtCore import QPointF
@ -33,7 +33,6 @@ from ..accounting._mktinfo import float_digits
from ._label import Label
from ._style import DpiAwareFont, hcolor, _font
from ._interaction import ChartView
from ._dataviz import Viz
_axis_pen = pg.mkPen(hcolor('bracket'))
@ -288,7 +287,9 @@ class DynamicDateAxis(Axis):
# time formats mapped by seconds between bars
tick_tpl = {
60 * 60 * 24: '%Y-%b-%d',
60: '%Y-%b-%d(%H:%M)',
60: '%H:%M',
30: '%H:%M:%S',
5: '%H:%M:%S',
1: '%H:%M:%S',
}
@ -304,10 +305,10 @@ class DynamicDateAxis(Axis):
# XX: ARGGGGG AG:LKSKDJF:LKJSDFD
chart = self.pi.chart_widget
viz: Viz = chart._vizs[chart.name]
viz = chart._vizs[chart.name]
shm = viz.shm
array = shm.array
ifield: str = viz.index_field
ifield = viz.index_field
index = array[ifield]
i_0, i_l = index[0], index[-1]
@ -328,7 +329,7 @@ class DynamicDateAxis(Axis):
arr_len = index.shape[0]
first = shm._first.value
times = array['time']
epochs: list[int] = times[
epochs = times[
list(
map(
int,
@ -340,30 +341,23 @@ class DynamicDateAxis(Axis):
)
]
else:
epochs: list[int] = list(map(int, indexes))
epochs = list(map(int, indexes))
# TODO: **don't** have this hard coded shift to EST
delay: float = viz.time_step()
if delay > 1:
# NOTE: use less granular dt-str when using 1M+ OHLC
fmtstr: str = self.tick_tpl[delay]
else:
fmtstr: str = '%Y-%m-%d(%H:%M:%S)'
# https://pola-rs.github.io/polars/py-polars/html/reference/expressions/api/polars.from_epoch.html#polars-from-epoch
pl_dts: pl.Series = pl.from_epoch(
# delay = times[-1] - times[-2]
dts = np.array(
epochs,
time_unit='s',
# NOTE: kinda weird we can pass it to `.from_epoch()` no?
).dt.replace_time_zone(
time_zone='UTC'
).dt.convert_time_zone(
# TODO: pull this from either:
# -[ ] the mkt venue tz by default
# -[ ] the user's config under `sys.mkt_timezone: str`
'EST'
dtype='datetime64[s]',
)
return pl_dts.dt.to_string(fmtstr).to_list()
# see units listing:
# https://numpy.org/devdocs/reference/arrays.datetime.html#datetime-units
return list(np.datetime_as_string(dts))
# TODO: per timeframe formatting?
# - we probably need this based on zoom now right?
# prec = self.np_dt_precision[delay]
# return dts.strftime(self.tick_tpl[delay])
def tickStrings(
self,

View File

@ -36,9 +36,6 @@ from msgspec import (
field,
)
import numpy as np
from numpy import (
ndarray,
)
import pyqtgraph as pg
from PyQt5.QtCore import QLineF
@ -85,11 +82,10 @@ def render_baritems(
viz: Viz,
graphics: BarItems,
read: tuple[
int, int, ndarray,
int, int, ndarray,
int, int, np.ndarray,
int, int, np.ndarray,
],
profiler: Profiler,
force_redraw: bool = False,
**kwargs,
) -> None:
@ -220,11 +216,9 @@ def render_baritems(
viz._in_ds = should_line
should_redraw = (
force_redraw
or changed_to_line
changed_to_line
or not should_line
)
# print(f'should_redraw: {should_redraw}')
return (
graphics,
r,
@ -256,7 +250,7 @@ class ViewState(Struct):
] | None = None
# last in view ``ShmArray.array[read_slc]`` data
in_view: ndarray | None = None
in_view: np.ndarray | None = None
class Viz(Struct):
@ -319,7 +313,6 @@ class Viz(Struct):
_last_uppx: float = 0
_in_ds: bool = False
_index_step: float | None = None
_time_step: float | None = None
# map from uppx -> (downsampled data, incremental graphics)
_src_r: Renderer | None = None
@ -366,8 +359,7 @@ class Viz(Struct):
def index_step(
self,
index_field: str | None = None,
reset: bool = False,
) -> float:
'''
Return the size between sample steps in the units of the
@ -375,17 +367,12 @@ class Viz(Struct):
epoch time in seconds.
'''
# attempt to detect the best step size by scanning a sample
# of the source data.
if (
self._index_step is None
or index_field is not None
):
index: ndarray = self.shm.array[
index_field
or self.index_field
]
isample: ndarray = index[-16:]
# attempt to dectect the best step size by scanning a sample of
# the source data.
if self._index_step is None:
index: np.ndarray = self.shm.array[self.index_field]
isample: np.ndarray = index[-16:]
mxdiff: None | float = None
for step in np.diff(isample):
@ -399,15 +386,7 @@ class Viz(Struct):
)
mxdiff = step
step: float = max(mxdiff, 1)
# only SET the internal index step if an explicit
# field name is NOT passed, since in such cases this
# is likely just being called from `.time_step()`.
if index_field is not None:
return step
self._index_step = step
self._index_step = max(mxdiff, 1)
if (
mxdiff < 1
or 1 < mxdiff < 60
@ -418,17 +397,6 @@ class Viz(Struct):
return self._index_step
def time_step(self) -> float:
'''
Attempt to determine the per-sample time-step period by
forcing an epoch-index and calling `.index_step()`.
'''
if self._time_step is None:
self._time_step: float = self.index_step(index_field='time')
return self._time_step
def maxmin(
self,
@ -436,9 +404,6 @@ class Viz(Struct):
i_read_range: tuple[int, int] | None = None,
use_caching: bool = True,
# XXX: internal debug
_do_print: bool = False
) -> tuple[float, float] | None:
'''
Compute the cached max and min y-range values for a given
@ -458,14 +423,15 @@ class Viz(Struct):
if shm is None:
return None
arr: ndarray = shm.array
do_print: bool = False
arr = shm.array
if i_read_range is not None:
read_slc = slice(*i_read_range)
index: float | int = arr[read_slc][self.index_field]
index = arr[read_slc][self.index_field]
if not index.size:
return None
ixrng: tuple[int, int] = (index[0], index[-1])
ixrng = (index[0], index[-1])
else:
if x_range is None:
@ -483,24 +449,15 @@ class Viz(Struct):
# TODO: hash the slice instead maybe?
# https://stackoverflow.com/a/29980872
ixrng = lbar, rbar = (
round(x_range[0]),
round(x_range[1]),
)
ixrng = lbar, rbar = round(x_range[0]), round(x_range[1])
if (
use_caching
and self._mxmn_cache_enabled
):
# TODO: is there a way to ONLY clear ranges containing
# a certain sub-range?
# -[ ] currently we have a problem where a previously
# cached mxmn will persist even if the viz is "hard
# re-rendered" (usually bc underlying data was
# corrected)
cached_result = self._mxmns.get(ixrng)
if cached_result:
if _do_print:
if do_print:
print(
f'{self.name} CACHED maxmin\n'
f'{ixrng} -> {cached_result}'
@ -530,7 +487,7 @@ class Viz(Struct):
(rbar - ifirst) + 1
)
slice_view: ndarray = arr[read_slc]
slice_view = arr[read_slc]
if not slice_view.size:
log.warning(
@ -541,7 +498,7 @@ class Viz(Struct):
elif self.ds_yrange:
mxmn = self.ds_yrange
if _do_print:
if do_print:
print(
f'{self.name} M4 maxmin:\n'
f'{ixrng} -> {mxmn}'
@ -558,7 +515,7 @@ class Viz(Struct):
mxmn = ylow, yhigh
if (
_do_print
do_print
):
s = 3
print(
@ -572,23 +529,14 @@ class Viz(Struct):
# cache result for input range
ylow, yhi = mxmn
diff: float = yhi - ylow
# order-of-magnitude check
# TODO: really we should be checking the hi or low
# against the previous sample to catch stuff like,
# - rando stock (reverse-)split
# - null-segments written by some prior
# crash-during-backfil
if diff > 0:
omg: float = abs(logf(diff, 10))
else:
omg: float = 0
try:
prolly_anomaly: bool = (
# diff == 0
(ylow and omg > 10)
(
abs(logf(ylow, 10)) > 16
if ylow
else False
)
or (
isnan(ylow) or isnan(yhi)
)
@ -629,7 +577,7 @@ class Viz(Struct):
self,
view_range: None | tuple[float, float] = None,
index_field: str | None = None,
array: ndarray | None = None,
array: np.ndarray | None = None,
) -> tuple[
int, int, int, int, int, int
@ -700,8 +648,8 @@ class Viz(Struct):
profiler: None | Profiler = None,
) -> tuple[
int, int, ndarray,
int, int, ndarray,
int, int, np.ndarray,
int, int, np.ndarray,
]:
'''
Read the underlying shm array buffer and
@ -871,10 +819,6 @@ class Viz(Struct):
graphics,
read,
profiler,
# NOTE: only set when caller says to
force_redraw=should_redraw,
**kwargs,
)
@ -1037,39 +981,6 @@ class Viz(Struct):
graphics,
)
def reset_graphics(
self,
# TODO: allow only resetting within some x-domain range?
# ixrng: tuple[int, int] | None = None,
) -> None:
'''
Hard reset all graphics (rendering) layers for this
data viz including clearing the mxmn auto-y-range
cache.
Normally called when the underlying data set is modified
(probably by some `.tsp` correcting/editing routine) and
the (now cached) graphics need to be fully re-rendered from
source.
'''
log.warning(
f'Forcing hard Viz graphihcs RESET:\n'
f'.name: {self.name}\n'
f'.index_field: {self.index_field}\n'
f'.index_step(): {self.index_step()}\n'
f'.time_step(): {self.time_step()}\n'
)
# XXX: always clear the mxn y-range cache
# to avoid old data (anomalies) from being
# retained in auto-yrange output.
self._mxmn_cache_enabled = False
self._mxmns.clear()
self.update_graphics(force_redraw=True)
self._mxmn_cache_enabled = True
def draw_last(
self,
array_key: str | None = None,
@ -1162,7 +1073,7 @@ class Viz(Struct):
'''
shm: ShmArray = self.shm
array: ndarray = shm.array
array: np.ndarray = shm.array
view: ChartView = self.plot.vb
(
vl,

View File

@ -210,9 +210,9 @@ async def increment_history_view(
):
hist_chart: ChartPlotWidget = ds.hist_chart
hist_viz: Viz = ds.hist_viz
# viz: Viz = ds.viz
viz: Viz = ds.viz
assert 'hist' in hist_viz.shm.token['shm_name']
# name: str = hist_viz.name
name: str = hist_viz.name
# TODO: seems this is more reliable at keeping the slow
# chart incremented in view more correctly?
@ -225,8 +225,7 @@ async def increment_history_view(
# draw everything from scratch on first entry!
for curve_name, hist_viz in hist_chart._vizs.items():
log.info(f'Forcing hard redraw -> {curve_name}')
hist_viz.reset_graphics()
# hist_viz.update_graphics(force_redraw=True)
hist_viz.update_graphics(force_redraw=True)
async with open_sample_stream(1.) as min_istream:
async for msg in min_istream:
@ -249,27 +248,27 @@ async def increment_history_view(
# - samplerd could emit the actual update range via
# tuple and then we only enter the below block if that
# range is detected as in-view?
# match msg:
# case {
# 'backfilling': (viz_name, timeframe),
# } if (
# viz_name == name
# ):
# log.warning(
# f'Forcing HARD REDRAW:\n'
# f'name: {name}\n'
# f'timeframe: {timeframe}\n'
# )
# # TODO: only allow this when the data is IN VIEW!
# # also, we probably can do this more efficiently
# # / smarter by only redrawing the portion of the
# # path necessary?
# {
# 60: hist_viz,
# 1: viz,
# }[timeframe].update_graphics(
# force_redraw=True
# )
match msg:
case {
'backfilling': (viz_name, timeframe),
} if (
viz_name == name
):
log.warning(
f'Forcing HARD REDRAW:\n'
f'name: {name}\n'
f'timeframe: {timeframe}\n'
)
# TODO: only allow this when the data is IN VIEW!
# also, we probably can do this more efficiently
# / smarter by only redrawing the portion of the
# path necessary?
{
60: hist_viz,
1: viz,
}[timeframe].update_graphics(
force_redraw=True
)
# check if slow chart needs an x-domain shift and/or
# y-range resize.
@ -310,7 +309,6 @@ async def increment_history_view(
async def graphics_update_loop(
dss: dict[str, DisplayState],
nurse: trio.Nursery,
godwidget: GodWidget,
feed: Feed,
@ -352,6 +350,8 @@ async def graphics_update_loop(
'i_last_slow_t': 0, # multiview-global slow (1m) step index
}
dss: dict[str, DisplayState] = {}
for fqme, flume in feed.flumes.items():
ohlcv = flume.rt_shm
hist_ohlcv = flume.hist_shm
@ -470,68 +470,67 @@ async def graphics_update_loop(
if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
await tractor.pause()
# try:
try:
# XXX TODO: we need to do _dss UPDATE here so that when
# a feed-view is switched you can still remote annotate the
# prior view..
from . import _remote_ctl
_remote_ctl._dss = dss
# XXX TODO: we need to do _dss UPDATE here so that when
# a feed-view is switched you can still remote annotate the
# prior view..
from . import _remote_ctl
_remote_ctl._dss.update(dss)
# main real-time quotes update loop
stream: tractor.MsgStream
async with feed.open_multi_stream() as stream:
# assert stream
async for quotes in stream:
quote_period = time.time() - last_quote_s
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
if (
quote_period <= 1/_quote_throttle_rate
# in the absolute worst case we shouldn't see more then
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
):
pass
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
last_quote_s: float = time.time()
for fqme, quote in quotes.items():
ds = dss[fqme]
ds.quotes = quote
rt_pi, hist_pi = pis[fqme]
# chart isn't active/shown so skip render cycle and
# pause feed(s)
# main real-time quotes update loop
stream: tractor.MsgStream
async with feed.open_multi_stream() as stream:
assert stream
async for quotes in stream:
quote_period = time.time() - last_quote_s
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
if (
fast_chart.linked.isHidden()
or not rt_pi.isVisible()
quote_period <= 1/_quote_throttle_rate
# in the absolute worst case we shouldn't see more then
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
):
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
pass
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
last_quote_s = time.time()
# sync call to update all graphics/UX components.
graphics_update_cycle(
ds,
quote,
)
for fqme, quote in quotes.items():
ds = dss[fqme]
ds.quotes = quote
rt_pi, hist_pi = pis[fqme]
# finally:
# # XXX: cancel any remote annotation control ctxs
# _remote_ctl._dss = None
# for cid, (ctx, aids) in _remote_ctl._ctxs.items():
# await ctx.cancel()
# chart isn't active/shown so skip render cycle and
# pause feed(s)
if (
fast_chart.linked.isHidden()
or not rt_pi.isVisible()
):
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
# sync call to update all graphics/UX components.
graphics_update_cycle(
ds,
quote,
)
finally:
# XXX: cancel any remote annotation control ctxs
_remote_ctl._dss = None
for cid, (ctx, aids) in _remote_ctl._ctxs.items():
await ctx.cancel()
def graphics_update_cycle(
@ -1555,10 +1554,8 @@ async def display_symbol_data(
)
# start update loop task
dss: dict[str, DisplayState] = {}
ln.start_soon(
graphics_update_loop,
dss,
ln,
godwidget,
feed,
@ -1572,31 +1569,15 @@ async def display_symbol_data(
order_ctl_fqme: str = fqmes[0]
mode: OrderMode
async with (
open_order_mode(
feed,
godwidget,
order_ctl_fqme,
order_mode_started,
loglevel=loglevel
) as mode,
# TODO: maybe have these startup sooner before
# order mode fully boots? but we gotta,
# -[ ] decouple the order mode bindings until
# the mode has fully booted..
# -[ ] maybe do an Event to sync?
# start input handling for ``ChartView`` input
# (i.e. kb + mouse handling loops)
rt_chart.view.open_async_input_handler(
dss=dss,
),
hist_chart.view.open_async_input_handler(
dss=dss,
),
) as mode
):
rt_linked.mode = mode
rt_viz = rt_chart.get_viz(order_ctl_fqme)

View File

@ -201,8 +201,8 @@ async def open_signal_handler(
async for args in recv:
await async_handler(*args)
async with trio.open_nursery() as tn:
tn.start_soon(proxy_to_handler)
async with trio.open_nursery() as n:
n.start_soon(proxy_to_handler)
async with send:
yield
@ -212,48 +212,18 @@ async def open_handlers(
source_widgets: list[QWidget],
event_types: set[QEvent],
# NOTE: if you want to bind in additional kwargs to the handler
# pass in a `partial()` instead!
async_handler: Callable[
[QWidget, trio.abc.ReceiveChannel], # required handler args
None
],
# XXX: these are ONLY inputs available to the
# `open_event_stream()` event-relay to mem-chan factor above!
**open_ev_stream_kwargs,
async_handler: Callable[[QWidget, trio.abc.ReceiveChannel], None],
**kwargs,
) -> None:
'''
Connect and schedule an async handler function to receive an
arbitrary `QWidget`'s events with kb/mouse msgs repacked into
structs (see above) and shuttled over a mem-chan to the input
`async_handler` to allow interaction-IO processing from
a `trio` func-as-task.
'''
widget: QWidget
streams: list[trio.abc.ReceiveChannel]
async with (
trio.open_nursery() as tn,
trio.open_nursery() as n,
gather_contexts([
open_event_stream(
widget,
event_types,
**open_ev_stream_kwargs,
)
open_event_stream(widget, event_types, **kwargs)
for widget in source_widgets
]) as streams,
):
for widget, event_recv_stream in zip(
source_widgets,
streams,
):
tn.start_soon(
async_handler,
widget,
event_recv_stream,
)
for widget, event_recv_stream in zip(source_widgets, streams):
n.start_soon(async_handler, widget, event_recv_stream)
yield

View File

@ -23,7 +23,6 @@ from contextlib import (
asynccontextmanager,
ExitStack,
)
from functools import partial
import time
from typing import (
Callable,
@ -75,7 +74,6 @@ if TYPE_CHECKING:
)
from ._dataviz import Viz
from .order_mode import OrderMode
from ._display import DisplayState
log = get_logger(__name__)
@ -104,7 +102,6 @@ async def handle_viewmode_kb_inputs(
view: ChartView,
recv_chan: trio.abc.ReceiveChannel,
dss: dict[str, DisplayState],
) -> None:
@ -180,42 +177,17 @@ async def handle_viewmode_kb_inputs(
Qt.Key_P,
}
):
import tractor
feed = order_mode.feed # noqa
chart = order_mode.chart # noqa
viz = chart.main_viz # noqa
vlm_chart = chart.linked.subplots['volume'] # noqa
vlm_viz = vlm_chart.main_viz # noqa
dvlm_pi = vlm_chart._vizs['dolla_vlm'].plot # noqa
import tractor
await tractor.pause()
view.interact_graphics_cycle()
# FORCE graphics reset-and-render of all currently
# shown data `Viz`s for the current chart app.
if (
ctrl
and key in {
Qt.Key_R,
}
):
fqme: str
ds: DisplayState
for fqme, ds in dss.items():
viz: Viz
for tf, viz in {
60: ds.hist_viz,
1: ds.viz,
}.items():
# TODO: only allow this when the data is IN VIEW!
# also, we probably can do this more efficiently
# / smarter by only redrawing the portion of the
# path necessary?
viz.reset_graphics()
# ------ - ------
# SEARCH MODE
# ------ - ------
# SEARCH MODE #
# ctlr-<space>/<l> for "lookup", "search" -> open search tree
if (
ctrl
@ -275,10 +247,8 @@ async def handle_viewmode_kb_inputs(
delta=-view.def_delta,
)
elif (
not ctrl
and key == Qt.Key_R
):
elif key == Qt.Key_R:
# NOTE: seems that if we don't yield a Qt render
# cycle then the m4 downsampled curves will show here
# without another reset..
@ -461,7 +431,6 @@ async def handle_viewmode_mouse(
view: ChartView,
recv_chan: trio.abc.ReceiveChannel,
dss: dict[str, DisplayState],
) -> None:
@ -598,7 +567,6 @@ class ChartView(ViewBox):
@asynccontextmanager
async def open_async_input_handler(
self,
**handler_kwargs,
) -> ChartView:
@ -609,20 +577,14 @@ class ChartView(ViewBox):
QEvent.KeyPress,
QEvent.KeyRelease,
},
async_handler=partial(
handle_viewmode_kb_inputs,
**handler_kwargs,
),
async_handler=handle_viewmode_kb_inputs,
),
_event.open_handlers(
[self],
event_types={
gs_mouse.GraphicsSceneMousePress,
},
async_handler=partial(
handle_viewmode_mouse,
**handler_kwargs,
),
async_handler=handle_viewmode_mouse,
),
):
yield self

View File

@ -930,8 +930,13 @@ async def open_order_mode(
msg,
)
# start async input handling for chart's view
async with (
# ``ChartView`` input async handler startup
chart.view.open_async_input_handler(),
hist_chart.view.open_async_input_handler(),
# pp pane kb inputs
open_form_input_handling(
form,