diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
index 49fe3322..94d9bfa1 100644
--- a/piker/fsp/_engine.py
+++ b/piker/fsp/_engine.py
@@ -114,7 +114,7 @@ async def fsp_compute(
         dict[str, np.ndarray],  # multi-output case
         np.ndarray,  # single output case
     ]
-    history_output = await out_stream.__anext__()
+    history_output = await anext(out_stream)
 
     func_name = func.__name__
     profiler(f'{func_name} generated history')
@@ -374,7 +374,8 @@ async def cascade(
                 'key': dst_shm_token,
                 'first': dst._first.value,
                 'last': dst._last.value,
-            }})
+            }
+        })
         return tracker, index
 
     def is_synced(
diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py
index 7b40f0d7..688b97eb 100644
--- a/piker/ui/_chart.py
+++ b/piker/ui/_chart.py
@@ -230,25 +230,26 @@ class GodWidget(QWidget):
         # - we'll probably want per-instrument/provider state here?
         #   change the order config form over to the new chart
 
-        # XXX: since the pp config is a singleton widget we have to
-        # also switch it over to the new chart's interal-layout
-        # self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
-        chart = linkedsplits.chart
-
         # chart is already in memory so just focus it
         linkedsplits.show()
         linkedsplits.focus()
         linkedsplits.graphics_cycle()
         await trio.sleep(0)
 
-        # resume feeds *after* rendering chart view asap
-        chart.resume_all_feeds()
+        # XXX: since the pp config is a singleton widget we have to
+        # also switch it over to the new chart's interal-layout
+        # self.linkedsplits.chart.qframe.hbox.removeWidget(self.pp_pane)
+        chart = linkedsplits.chart
 
-        # TODO: we need a check to see if the chart
-        # last had the xlast in view, if so then shift so it's
-        # still in view, if the user was viewing history then
-        # do nothing yah?
-        chart.default_view()
+        # resume feeds *after* rendering chart view asap
+        if chart:
+            chart.resume_all_feeds()
+
+            # TODO: we need a check to see if the chart
+            # last had the xlast in view, if so then shift so it's
+            # still in view, if the user was viewing history then
+            # do nothing yah?
+            chart.default_view()
 
         self.linkedsplits = linkedsplits
         symbol = linkedsplits.symbol
@@ -760,9 +761,18 @@ class ChartPlotWidget(pg.PlotWidget):
 
         self.pi_overlay: PlotItemOverlay = PlotItemOverlay(self.plotItem)
 
+        # indempotent startup flag for auto-yrange subsys
+        # to detect the "first time" y-domain graphics begin
+        # to be shown in the (main) graphics view.
+        self._on_screen: bool = False
+
     def resume_all_feeds(self):
-        for feed in self._feeds.values():
-            self.linked.godwidget._root_n.start_soon(feed.resume)
+        try:
+            for feed in self._feeds.values():
+                self.linked.godwidget._root_n.start_soon(feed.resume)
+        except RuntimeError:
+            # TODO: cancel the qtractor runtime here?
+            raise
 
     def pause_all_feeds(self):
         for feed in self._feeds.values():
@@ -859,7 +869,8 @@ class ChartPlotWidget(pg.PlotWidget):
 
     def default_view(
         self,
-        bars_from_y: int = 3000,
+        bars_from_y: int = 616,
+        do_ds: bool = True,
 
     ) -> None:
         '''
@@ -920,8 +931,11 @@ class ChartPlotWidget(pg.PlotWidget):
             max=end,
             padding=0,
         )
-        self.view.maybe_downsample_graphics()
-        view._set_yrange()
+
+        if do_ds:
+            self.view.maybe_downsample_graphics()
+            view._set_yrange()
+
         try:
             self.linked.graphics_cycle()
         except IndexError:
@@ -1255,7 +1269,6 @@ class ChartPlotWidget(pg.PlotWidget):
         If ``bars_range`` is provided use that range.
 
         '''
-        # print(f'Chart[{self.name}].maxmin()')
         profiler = pg.debug.Profiler(
            msg=f'`{str(self)}.maxmin(name={name})`: `{self.name}`',
            disabled=not pg_profile_enabled(),
@@ -1287,11 +1300,18 @@ class ChartPlotWidget(pg.PlotWidget):
 
             key = round(lbar), round(rbar)
             res = flow.maxmin(*key)
-            if res == (None, None):
-                log.error(
+
+            if (
+                res is None
+            ):
+                log.warning(
                     f"{flow_key} no mxmn for bars_range => {key} !?"
                 )
                 res = 0, 0
+                if not self._on_screen:
+                    self.default_view(do_ds=False)
+                    self._on_screen = True
 
         profiler(f'yrange mxmn: {key} -> {res}')
+        # print(f'{flow_key} yrange mxmn: {key} -> {res}')
         return res
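The `_chart.py` hunks above add an idempotent `_on_screen` flag so that `default_view()` (which now takes a `do_ds` switch) is triggered at most once, the first time a y-range lookup comes back empty. A minimal sketch of that guard; `FakeChart` and its methods are hypothetical stand-ins, not piker API:

# Minimal sketch of the "first y-range on screen" guard added above.
# `FakeChart` is a hypothetical stand-in, not piker API.

class FakeChart:
    def __init__(self) -> None:
        # mirrors the new `ChartPlotWidget._on_screen` startup flag
        self._on_screen: bool = False

    def default_view(self, do_ds: bool = True) -> None:
        print(f'default_view(do_ds={do_ds})')

    def maxmin(self, res):
        if res is None:
            # no y-range could be computed (yet): fall back to (0, 0)
            # and schedule the one-time "first show" view reset.
            res = 0, 0
            if not self._on_screen:
                self.default_view(do_ds=False)
                self._on_screen = True
        return res


chart = FakeChart()
chart.maxmin(None)               # triggers default_view(do_ds=False) once
chart.maxmin(None)               # guard already set: no second trigger
print(chart.maxmin((1.0, 2.0)))  # -> (1.0, 2.0)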
diff --git a/piker/ui/_compression.py b/piker/ui/_compression.py
index e9564359..c66b3e58 100644
--- a/piker/ui/_compression.py
+++ b/piker/ui/_compression.py
@@ -223,14 +223,20 @@ def ds_m4(
     assert frames >= (xrange / uppx)
 
     # call into ``numba``
-    nb, i_win, y_out = _m4(
+    (
+        nb,
+        x_out,
+        y_out,
+        ymn,
+        ymx,
+    ) = _m4(
         x,
         y,
 
         frames,
 
         # TODO: see func below..
-        # i_win,
+        # x_out,
         # y_out,
 
         # first index in x data to start at
@@ -243,10 +249,11 @@ def ds_m4(
     # filter out any overshoot in the input allocation arrays by
     # removing zero-ed tail entries which should start at a certain
     # index.
-    i_win = i_win[i_win != 0]
-    y_out = y_out[:i_win.size]
+    x_out = x_out[x_out != 0]
+    y_out = y_out[:x_out.size]
 
-    return nb, i_win, y_out
+    # print(f'M4 output ymn, ymx: {ymn},{ymx}')
+    return nb, x_out, y_out, ymn, ymx
 
 
 @jit(
@@ -260,8 +267,8 @@ def _m4(
 
     frames: int,
 
-    # TODO: using this approach by having the ``.zeros()`` alloc lines
-    # below, in put python was causing segs faults and alloc crashes..
+    # TODO: using this approach, having the ``.zeros()`` alloc lines
+    # below in pure python, there were segs faults and alloc crashes..
     # we might need to see how it behaves with shm arrays and consider
     # allocating them once at startup?
 
@@ -274,14 +281,22 @@ def _m4(
     x_start: int,
     step: float,
 
-) -> int:
-    # nbins = len(i_win)
-    # count = len(xs)
+) -> tuple[
+    int,
+    np.ndarray,
+    np.ndarray,
+    float,
+    float,
+]:
+    '''
+    Implementation of the m4 algorithm in ``numba``:
+    http://www.vldb.org/pvldb/vol7/p797-jugel.pdf
+    '''
 
     # these are pre-allocated and mutated by ``numba``
     # code in-place.
     y_out = np.zeros((frames, 4), ys.dtype)
-    i_win = np.zeros(frames, xs.dtype)
+    x_out = np.zeros(frames, xs.dtype)
 
     bincount = 0
     x_left = x_start
@@ -295,24 +310,34 @@ def _m4(
 
     # set all bins in the left-most entry to the starting left-most x value
     # (aka a row broadcast).
-    i_win[bincount] = x_left
+    x_out[bincount] = x_left
     # set all y-values to the first value passed in.
     y_out[bincount] = ys[0]
 
+    # full input y-data mx and mn
+    mx: float = -np.inf
+    mn: float = np.inf
+
+    # compute OHLC style max / min values per window sized x-frame.
     for i in range(len(xs)):
+
         x = xs[i]
        y = ys[i]
+
         if x < x_left + step:   # the current window "step" is [bin, bin+1)
-            y_out[bincount, 1] = min(y, y_out[bincount, 1])
-            y_out[bincount, 2] = max(y, y_out[bincount, 2])
+            ymn = y_out[bincount, 1] = min(y, y_out[bincount, 1])
+            ymx = y_out[bincount, 2] = max(y, y_out[bincount, 2])
             y_out[bincount, 3] = y
+            mx = max(mx, ymx)
+            mn = min(mn, ymn)
+
         else:
             # Find the next bin
             while x >= x_left + step:
                 x_left += step
 
             bincount += 1
-            i_win[bincount] = x_left
+            x_out[bincount] = x_left
             y_out[bincount] = y
 
-    return bincount, i_win, y_out
+    return bincount, x_out, y_out, mn, mx
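The `_m4()` kernel above implements the M4 downsampling reduction (per the VLDB paper linked in its new docstring): every x-window keeps an OHLC-style (first, min, max, last) row, and the kernel now also reports the overall y-min/max of the output. A rough pure-numpy restatement of that reduction for clarity; `m4_frames_ref` is an illustrative helper, not the numba kernel shipped in the patch:

import numpy as np

def m4_frames_ref(
    xs: np.ndarray,
    ys: np.ndarray,
    step: float,
):
    # assign each x to a fixed-width "frame" starting at the first x
    bins = ((xs - xs[0]) // step).astype(int)

    frames = []
    for b in np.unique(bins):
        y = ys[bins == b]
        # OHLC-style row per frame: (first, min, max, last)
        frames.append((y[0], y.min(), y.max(), y[-1]))

    out = np.array(frames)
    # overall y-range of the downsampled output, analogous to the
    # (ymn, ymx) values `_m4()`/`ds_m4()` now return.
    return out, out[:, 1].min(), out[:, 2].max()


xs = np.arange(10, dtype=float)
ys = np.array([3., 1., 4., 1., 5., 9., 2., 6., 5., 3.])
out, ymn, ymx = m4_frames_ref(xs, ys, step=4)
print(out)       # one (first, min, max, last) row per 4-wide frame
print(ymn, ymx)  # 1.0 9.0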
diff --git a/piker/ui/_display.py b/piker/ui/_display.py
index 9ad59e30..d63ee17f 100644
--- a/piker/ui/_display.py
+++ b/piker/ui/_display.py
@@ -105,6 +105,10 @@ def chart_maxmin(
 
     mn, mx = out
     mx_vlm_in_view = 0
+
+    # TODO: we need to NOT call this to avoid a manual
+    # np.max/min trigger and especially on the vlm_chart
+    # flows which aren't shown.. like vlm?
     if vlm_chart:
         out = vlm_chart.maxmin()
         if out:
@@ -222,33 +226,9 @@ async def graphics_update_loop(
     tick_margin = 3 * tick_size
 
     chart.show()
-    # view = chart.view
     last_quote = time.time()
     i_last = ohlcv.index
 
-    # async def iter_drain_quotes():
-    #     # NOTE: all code below this loop is expected to be synchronous
-    #     # and thus draw instructions are not picked up jntil the next
-    #     # wait / iteration.
-    #     async for quotes in stream:
-    #         while True:
-    #             try:
-    #                 moar = stream.receive_nowait()
-    #             except trio.WouldBlock:
-    #                 yield quotes
-    #                 break
-    #             else:
-    #                 for sym, quote in moar.items():
-    #                     ticks_frame = quote.get('ticks')
-    #                     if ticks_frame:
-    #                         quotes[sym].setdefault(
-    #                             'ticks', []).extend(ticks_frame)
-    #                 print('pulled extra')
-
-    #     yield quotes
-
-    # async for quotes in iter_drain_quotes():
-
     ds = linked.display_state = DisplayState(**{
         'quotes': {},
         'linked': linked,
@@ -293,6 +273,7 @@ async def graphics_update_loop(
 
         # chart isn't active/shown so skip render cycle and pause feed(s)
         if chart.linked.isHidden():
+            print('skipping update')
             chart.pause_all_feeds()
             continue
 
@@ -416,10 +397,8 @@ def graphics_update_cycle(
         )
         or trigger_all
     ):
-        # TODO: we should track and compute whether the last
-        # pixel in a curve should show new data based on uppx
-        # and then iff update curves and shift?
         chart.increment_view(steps=i_diff)
+        # chart.increment_view(steps=i_diff + round(append_diff - uppx))
 
         if vlm_chart:
             vlm_chart.increment_view(steps=i_diff)
@@ -477,7 +456,6 @@ def graphics_update_cycle(
     ):
         chart.update_graphics_from_flow(
             chart.name,
-            # do_append=uppx < update_uppx,
            do_append=do_append,
         )
 
diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py
index 2c2d0b66..48bd89d0 100644
--- a/piker/ui/_flows.py
+++ b/piker/ui/_flows.py
@@ -337,6 +337,7 @@ class Flow(msgspec.Struct):  # , frozen=True):
     name: str
     plot: pg.PlotItem
     graphics: Union[Curve, BarItems]
+    yrange: tuple[float, float] = None
 
     # in some cases a flow may want to change its
     # graphical "type" or, "form" when downsampling,
@@ -386,10 +387,11 @@ class Flow(msgspec.Struct):  # , frozen=True):
         lbar: int,
         rbar: int,
 
-    ) -> tuple[float, float]:
+    ) -> Optional[tuple[float, float]]:
         '''
         Compute the cached max and min y-range values for a given
-        x-range determined by ``lbar`` and ``rbar``.
+        x-range determined by ``lbar`` and ``rbar`` or ``None``
+        if no range can be determined (yet).
 
         '''
         rkey = (lbar, rbar)
@@ -399,40 +401,44 @@ class Flow(msgspec.Struct):  # , frozen=True):
 
         shm = self.shm
         if shm is None:
-            mxmn = None
+            return None
 
-        else:  # new block for profiling?..
-            arr = shm.array
+        arr = shm.array
 
-            # build relative indexes into shm array
-            # TODO: should we just add/use a method
-            # on the shm to do this?
-            ifirst = arr[0]['index']
-            slice_view = arr[
-                lbar - ifirst:
-                (rbar - ifirst) + 1
-            ]
+        # build relative indexes into shm array
+        # TODO: should we just add/use a method
+        # on the shm to do this?
+        ifirst = arr[0]['index']
+        slice_view = arr[
+            lbar - ifirst:
+            (rbar - ifirst) + 1
+        ]
 
-            if not slice_view.size:
-                mxmn = None
+        if not slice_view.size:
+            return None
+
+        elif self.yrange:
+            mxmn = self.yrange
+            # print(f'{self.name} M4 maxmin: {mxmn}')
+
+        else:
+            if self.is_ohlc:
+                ylow = np.min(slice_view['low'])
+                yhigh = np.max(slice_view['high'])
 
             else:
-                if self.is_ohlc:
-                    ylow = np.min(slice_view['low'])
-                    yhigh = np.max(slice_view['high'])
+                view = slice_view[self.name]
+                ylow = np.min(view)
+                yhigh = np.max(view)
 
-                else:
-                    view = slice_view[self.name]
-                    ylow = np.min(view)
-                    yhigh = np.max(view)
+            mxmn = ylow, yhigh
+            # print(f'{self.name} MANUAL maxmin: {mxmin}')
 
-                mxmn = ylow, yhigh
+        # cache result for input range
+        assert mxmn
+        self._mxmns[rkey] = mxmn
 
-            if mxmn is not None:
-                # cache new mxmn result
-                self._mxmns[rkey] = mxmn
-
-            return mxmn
+        return mxmn
 
     def view_range(self) -> tuple[int, int]:
         '''
@@ -628,10 +634,13 @@ class Flow(msgspec.Struct):  # , frozen=True):
             # source data so we clear our path data in prep
             # to generate a new one from original source data.
             new_sample_rate = True
-            showing_src_data = True
             should_ds = False
             should_redraw = True
 
+            showing_src_data = True
+            # reset yrange to be computed from source data
+            self.yrange = None
+
         # MAIN RENDER LOGIC:
         # - determine in view data and redraw on range change
         # - determine downsampling ops if needed
@@ -657,6 +666,10 @@ class Flow(msgspec.Struct):  # , frozen=True):
                 **rkwargs,
             )
 
+            if showing_src_data:
+                # print(f"{self.name} SHOWING SOURCE")
+                # reset yrange to be computed from source data
+                self.yrange = None
 
             if not out:
                 log.warning(f'{self.name} failed to render!?')
@@ -664,6 +677,9 @@ class Flow(msgspec.Struct):  # , frozen=True):
 
             path, data, reset = out
 
+            # if self.yrange:
+            #     print(f'flow {self.name} yrange from m4: {self.yrange}')
+
             # XXX: SUPER UGGGHHH... without this we get stale cache
             # graphics that don't update until you downsampler again..
             if reset:
@@ -1058,6 +1074,7 @@ class Renderer(msgspec.Struct):
         # xy-path data transform: convert source data to a format
         # able to be passed to a `QPainterPath` rendering routine.
         if not len(hist):
+            # XXX: this might be why the profiler only has exits?
             return
 
         x_out, y_out, connect = self.format_xy(
@@ -1144,11 +1161,14 @@ class Renderer(msgspec.Struct):
 
             elif should_ds and uppx > 1:
 
-                x_out, y_out = xy_downsample(
+                x_out, y_out, ymn, ymx = xy_downsample(
                     x_out,
                     y_out,
                     uppx,
                 )
+                self.flow.yrange = ymn, ymx
+                # print(f'{self.flow.name} post ds: ymn, ymx: {ymn},{ymx}')
+
                 reset = True
                 profiler(f'FULL PATH downsample redraw={should_ds}')
                 self._in_ds = True
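The `Flow.maxmin()` rewrite above changes the y-range resolution order: a cached result for the same `(lbar, rbar)` key wins, then a `yrange` published by the downsampler, and only then the manual `np.min`/`np.max` scan over the in-view slice. A small sketch of that ordering; `MiniFlow` is a hypothetical stand-in for the real `Flow` struct:

import numpy as np

class MiniFlow:
    '''
    Hypothetical stand-in for the real `Flow` struct.
    '''
    def __init__(self, array: np.ndarray) -> None:
        self.array = array
        self.yrange = None      # (ymn, ymx) published by the downsampler
        self._mxmns: dict = {}  # cache keyed by (lbar, rbar)

    def maxmin(self, lbar: int, rbar: int):
        key = (lbar, rbar)
        cached = self._mxmns.get(key)
        if cached:
            return cached

        view = self.array[lbar:rbar + 1]
        if not view.size:
            return None

        if self.yrange:
            # prefer the range the M4 render pass already computed
            mxmn = self.yrange
        else:
            # manual scan fallback over the in-view slice
            mxmn = float(np.min(view)), float(np.max(view))

        self._mxmns[key] = mxmn
        return mxmn


flow = MiniFlow(np.array([2., 8., 4., 6.]))
print(flow.maxmin(0, 3))   # manual scan -> (2.0, 8.0)
flow.yrange = (1.0, 9.0)   # e.g. set from the M4 render pass
print(flow.maxmin(1, 2))   # new key -> downsampler range (1.0, 9.0)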
diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py
index 80bdf8d2..5301db6a 100644
--- a/piker/ui/_fsp.py
+++ b/piker/ui/_fsp.py
@@ -639,20 +639,25 @@ async def open_vlm_displays(
         names: list[str],
 
     ) -> tuple[float, float]:
+        '''
+        Flows "group" maxmin loop; assumes all named flows
+        are in the same co-domain and thus can be sorted
+        as one set.
+        Iterates all the named flows and calls the chart
+        api to find their range values and return.
+
+        TODO: really we should probably have a more built-in API
+        for this?
+
+        '''
         mx = 0
         for name in names:
-
-            mxmn = chart.maxmin(name=name)
-            if mxmn:
-                ymax = mxmn[1]
-                if ymax > mx:
-                    mx = ymax
+            ymn, ymx = chart.maxmin(name=name)
+            mx = max(mx, ymx)
 
         return 0, mx
 
-    chart.view.maxmin = partial(multi_maxmin, names=['volume'])
-
     # TODO: fix the x-axis label issue where if you put
     # the axis on the left it's totally not lined up...
     # show volume units value on LHS (for dinkus)
 
@@ -776,6 +781,7 @@ async def open_vlm_displays(
 
         ) -> None:
             for name in names:
+
                 if 'dark' in name:
                     color = dark_vlm_color
                 elif 'rate' in name:
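With `chart.maxmin()` now always handing back a `(ymn, ymx)` pair, the `multi_maxmin()` group-maxmin in `_fsp.py` above collapses to a plain `max()` fold over flows sharing one co-domain. A toy version of that fold; `fake_maxmin` and the `ranges` table are made-up stand-ins for the chart API:

ranges = {
    'volume': (0.0, 1500.0),
    'dark_vlm': (0.0, 900.0),
}

def fake_maxmin(name: str) -> tuple:
    # stand-in for `chart.maxmin(name=name)`
    return ranges[name]

def multi_maxmin(names: list) -> tuple:
    mx = 0.0
    for name in names:
        ymn, ymx = fake_maxmin(name)
        mx = max(mx, ymx)
    # volume-style flows are bounded below by zero
    return 0.0, mx

print(multi_maxmin(['volume', 'dark_vlm']))  # (0.0, 1500.0)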
diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py
index a659612a..d8f65dd9 100644
--- a/piker/ui/_interaction.py
+++ b/piker/ui/_interaction.py
@@ -923,6 +923,7 @@ class ChartView(ViewBox):
                 # XXX: super important to be aware of this.
                 # or not flow.graphics.isVisible()
             ):
+                # print(f'skipping {flow.name}')
                 continue
 
             # pass in no array which will read and render from the last
diff --git a/piker/ui/_pathops.py b/piker/ui/_pathops.py
index 83b46f43..bbdde19a 100644
--- a/piker/ui/_pathops.py
+++ b/piker/ui/_pathops.py
@@ -49,12 +49,17 @@ def xy_downsample(
 
     x_spacer: float = 0.5,
 
-) -> tuple[np.ndarray, np.ndarray]:
+) -> tuple[
+    np.ndarray,
+    np.ndarray,
+    float,
+    float,
+]:
 
     # downsample whenever more then 1 pixels per datum can be shown.
     # always refresh data bounds until we get diffing
     # working properly, see above..
-    bins, x, y = ds_m4(
+    bins, x, y, ymn, ymx = ds_m4(
         x,
         y,
         uppx,
@@ -67,7 +72,7 @@ def xy_downsample(
     )).flatten()
     y = y.flatten()
 
-    return x, y
+    return x, y, ymn, ymx
 
 
 @njit(
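Downstream of `ds_m4()`, the `xy_downsample()` wrapper above flattens the `(frames, 4)` M4 output into a single polyline and now passes the `(ymn, ymx)` pair through, so the renderer can set `Flow.yrange` without re-scanning the data. A sketch of that flattening step; the x-spacer broadcast shown here is an assumption, since only the `.flatten()` calls are visible in the hunk:

import numpy as np

x_out = np.array([0., 4., 8.])   # one x per downsampled frame
y_out = np.array([               # (first, min, max, last) rows from M4
    [3., 1., 4., 1.],
    [5., 2., 9., 6.],
    [5., 3., 5., 3.],
])
x_spacer = 0.5

# repeat each frame's x 4 times (with a small spacer) so the
# (first, min, max, last) points line up as one polyline
x_flat = (np.broadcast_to(x_out[:, None], y_out.shape)
          + np.array([-x_spacer, 0, 0, x_spacer])).flatten()
y_flat = y_out.flatten()

# the y-range that now rides along with the downsampled output
ymn, ymx = y_out[:, 1].min(), y_out[:, 2].max()

print(x_flat)    # 12 x points, 4 per frame
print(y_flat)    # 12 y points forming the min/max envelope
print(ymn, ymx)  # 1.0 9.0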