From d78b8c4df3e81dafd3ab3d79c5bcc7e5289ca058 Mon Sep 17 00:00:00 2001 From: goodboy Date: Sun, 1 Feb 2026 20:23:52 -0500 Subject: [PATCH] Add batch-submit API for gap annotations Introduce `AnnotCtl.add_batch()` and `serve_rc_annots()` batch handler to submit 1000s of gaps in single IPC msg instead of per-annot round-trips. Server builds `GapAnnotations` from specs and handles vectorized timestamp-to-index lookups. Deats, - add `'cmd': 'batch'` handler in `serve_rc_annots()` - vectorized timestamp lookup via `np.searchsorted()` + masking - build `gap_specs: list[dict]` from rect+arrow specs client-side - create single `GapAnnotations` item for all gaps server-side - handle `GapAnnotations.reposition()` in redraw handler - add profiling to batch path for perf measurement - support optional individual arrows for A/B comparison Also, - refactor `markup_gaps()` to collect specs + single batch call - add `no_qt_updates()` context mgr for batch render ops - add profiling to annotation teardown path - add `GapAnnotations` case to `rm_annot()` match block (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/tsp/_annotate.py | 119 +++++++---- piker/ui/_remote_ctl.py | 453 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 532 insertions(+), 40 deletions(-) diff --git a/piker/tsp/_annotate.py b/piker/tsp/_annotate.py index 7e91300f..bbf50437 100644 --- a/piker/tsp/_annotate.py +++ b/piker/tsp/_annotate.py @@ -30,6 +30,11 @@ import tractor from piker.data._formatters import BGM from piker.storage import log +from piker.toolz.profile import ( + Profiler, + pg_profile_enabled, + ms_slower_then, +) from piker.ui._style import get_fonts if TYPE_CHECKING: @@ -92,12 +97,22 @@ async def markup_gaps( # gap's duration. show_txt: bool = False, + # A/B comparison: render individual arrows alongside batch + # for visual comparison + show_individual_arrows: bool = False, + ) -> dict[int, dict]: ''' Remote annotate time-gaps in a dt-fielded ts (normally OHLC) with rectangles. ''' + profiler = Profiler( + msg=f'markup_gaps() for {gaps.height} gaps', + disabled=False, + ms_threshold=0.0, + ) + # XXX: force chart redraw FIRST to ensure PlotItem coordinate # system is properly initialized before we position annotations! # Without this, annotations may be misaligned on first creation @@ -106,6 +121,19 @@ async def markup_gaps( fqme=fqme, timeframe=timeframe, ) + profiler('first `.redraw()` before annot creation') + + log.info( + f'markup_gaps() called:\n' + f' fqme: {fqme}\n' + f' timeframe: {timeframe}s\n' + f' gaps.height: {gaps.height}\n' + ) + + # collect all annotation specs for batch submission + rect_specs: list[dict] = [] + arrow_specs: list[dict] = [] + text_specs: list[dict] = [] aids: dict[int] = {} for i in range(gaps.height): @@ -217,56 +245,38 @@ async def markup_gaps( # 1: 'wine', # down-gap # }[sgn] - rect_kwargs: dict[str, Any] = dict( - fqme=fqme, - timeframe=timeframe, + # collect rect spec (no fqme/timeframe, added by batch + # API) + rect_spec: dict[str, Any] = dict( + meth='set_view_pos', start_pos=lc, end_pos=ro, color=color, + update_label=False, start_time=start_time, end_time=end_time, ) + rect_specs.append(rect_spec) - # add up/down rects - aid: int|None = await actl.add_rect(**rect_kwargs) - if aid is None: - log.error( - f'Failed to add rect for,\n' - f'{rect_kwargs!r}\n' - f'\n' - f'Skipping to next gap!\n' - ) - continue - - assert aid - aids[aid] = rect_kwargs direction: str = ( 'down' if down_gap else 'up' ) - # TODO! mk this a `msgspec.Struct` which we deserialize - # on the server side! - # XXX: send timestamp for server-side index lookup - # to ensure alignment with current shm state + + # collect arrow spec gap_time: float = row['time'][0] - arrow_kwargs: dict[str, Any] = dict( - fqme=fqme, - timeframe=timeframe, + arrow_spec: dict[str, Any] = dict( x=iend, # fallback if timestamp lookup fails y=cls, time=gap_time, # for server-side index lookup color=color, alpha=169, pointing=direction, - # TODO: expose these as params to markup_gaps()? headLen=10, headWidth=2.222, pxMode=True, ) - - aid: int = await actl.add_arrow( - **arrow_kwargs - ) + arrow_specs.append(arrow_spec) # add duration label to RHS of arrow if up_gap: @@ -278,15 +288,12 @@ async def markup_gaps( assert flat anchor = (0, 0) # up from bottom - # use a slightly smaller font for gap label txt. - font, small_font = get_fonts() - font_size: int = small_font.px_size - 1 - assert isinstance(font_size, int) - + # collect text spec if enabled if show_txt: - text_aid: int = await actl.add_text( - fqme=fqme, - timeframe=timeframe, + font, small_font = get_fonts() + font_size: int = small_font.px_size - 1 + + text_spec: dict[str, Any] = dict( text=gap_label, x=iend + 1, # fallback if timestamp lookup fails y=cls, @@ -295,12 +302,46 @@ async def markup_gaps( anchor=anchor, font_size=font_size, ) - aids[text_aid] = {'text': gap_label} + text_specs.append(text_spec) - # tell chart to redraw all its - # graphics view layers Bo + # submit all annotations in single batch IPC msg + log.info( + f'Submitting batch annotations:\n' + f' rects: {len(rect_specs)}\n' + f' arrows: {len(arrow_specs)}\n' + f' texts: {len(text_specs)}\n' + ) + profiler('built all annotation specs') + + result: dict[str, list[int]] = await actl.add_batch( + fqme=fqme, + timeframe=timeframe, + rects=rect_specs, + arrows=arrow_specs, + texts=text_specs, + show_individual_arrows=show_individual_arrows, + ) + profiler('batch `.add_batch()` IPC call complete') + + # build aids dict from batch results + for aid in result['rects']: + aids[aid] = {'type': 'rect'} + for aid in result['arrows']: + aids[aid] = {'type': 'arrow'} + for aid in result['texts']: + aids[aid] = {'type': 'text'} + + log.info( + f'Batch submission complete: {len(aids)} annotation(s) ' + f'created' + ) + profiler('built aids result dict') + + # tell chart to redraw all its graphics view layers await actl.redraw( fqme=fqme, timeframe=timeframe, ) + profiler('final `.redraw()` after annot creation') + return aids diff --git a/piker/ui/_remote_ctl.py b/piker/ui/_remote_ctl.py index f67f80ad..6db07d4e 100644 --- a/piker/ui/_remote_ctl.py +++ b/piker/ui/_remote_ctl.py @@ -22,6 +22,7 @@ a chart from some other actor. from __future__ import annotations from contextlib import ( asynccontextmanager as acm, + contextmanager as cm, AsyncExitStack, ) from functools import partial @@ -46,6 +47,7 @@ from piker.log import get_logger from piker.types import Struct from piker.service import find_service from piker.brokers import SymbolNotFound +from piker.toolz import Profiler from piker.ui.qt import ( QGraphicsItem, ) @@ -98,6 +100,8 @@ def rm_annot( annot: ArrowEditor|SelectRect|pg.TextItem ) -> bool: global _editors + from piker.ui._annotate import GapAnnotations + match annot: case pg.ArrowItem(): editor = _editors[annot._uid] @@ -122,9 +126,35 @@ def rm_annot( scene.removeItem(annot) return True + case GapAnnotations(): + scene = annot.scene() + if scene: + scene.removeItem(annot) + return True + return False +@cm +def no_qt_updates(*items): + ''' + Disable Qt widget/item updates during context to batch + render operations and only trigger single repaint on exit. + + Accepts both QWidgets and QGraphicsItems. + + ''' + for item in items: + if hasattr(item, 'setUpdatesEnabled'): + item.setUpdatesEnabled(False) + try: + yield + finally: + for item in items: + if hasattr(item, 'setUpdatesEnabled'): + item.setUpdatesEnabled(True) + + async def serve_rc_annots( ipc_key: str, annot_req_stream: MsgStream, @@ -429,6 +459,333 @@ async def serve_rc_annots( aids.add(aid) await annot_req_stream.send(aid) + case { + 'cmd': 'batch', + 'fqme': fqme, + 'timeframe': timeframe, + 'rects': list(rect_specs), + 'arrows': list(arrow_specs), + 'texts': list(text_specs), + 'show_individual_arrows': bool(show_individual_arrows), + }: + # batch submission handler - process multiple + # annotations in single IPC round-trip + ds: DisplayState = _dss[fqme] + try: + chart: ChartPlotWidget = { + 60: ds.hist_chart, + 1: ds.chart, + }[timeframe] + except KeyError: + msg: str = ( + f'No chart for timeframe={timeframe}s, ' + f'skipping batch annotation' + ) + log.error(msg) + await annot_req_stream.send({'error': msg}) + continue + + cv: ChartView = chart.cv + viz: Viz = chart.get_viz(fqme) + shm = viz.shm + arr = shm.array + + result: dict[str, list[int]] = { + 'rects': [], + 'arrows': [], + 'texts': [], + } + + profiler = Profiler( + msg=( + f'Batch annotate {len(rect_specs)} gaps ' + f'on {fqme}@{timeframe}s' + ), + disabled=False, + delayed=False, + ) + + aids_set: set[int] = ctxs[ipc_key][1] + + # build unified gap_specs for GapAnnotations class + from piker.ui._annotate import GapAnnotations + + gap_specs: list[dict] = [] + n_gaps: int = max( + len(rect_specs), + len(arrow_specs), + ) + profiler('setup batch annot creation') + + # collect all unique timestamps for vectorized lookup + timestamps: list[float] = [] + for rect_spec in rect_specs: + if start_time := rect_spec.get('start_time'): + timestamps.append(start_time) + if end_time := rect_spec.get('end_time'): + timestamps.append(end_time) + for arrow_spec in arrow_specs: + if time_val := arrow_spec.get('time'): + timestamps.append(time_val) + + profiler('collect `timestamps: list` complet!') + + # build timestamp -> row mapping using binary search + # O(m log n) instead of O(n*m) with np.isin + time_to_row: dict[float, dict] = {} + if timestamps: + import numpy as np + time_arr = arr['time'] + ts_array = np.array(timestamps) + + # binary search for each timestamp in sorted time array + search_indices = np.searchsorted( + time_arr, + ts_array, + ) + + profiler('`np.searchsorted()` complete!') + + # vectorized bounds check and exact match verification + valid_mask = ( + (search_indices < len(arr)) + & (time_arr[search_indices] == ts_array) + ) + + # get all valid indices and timestamps + valid_indices = search_indices[valid_mask] + valid_timestamps = ts_array[valid_mask] + + # use fancy indexing to get all rows at once + matched_rows = arr[valid_indices] + + # extract fields to plain arrays BEFORE dict building + indices_arr = matched_rows['index'].astype(float) + opens_arr = matched_rows['open'].astype(float) + closes_arr = matched_rows['close'].astype(float) + + profiler('extracted field arrays') + + # build dict from plain arrays (much faster) + time_to_row: dict[float, dict] = { + float(ts): { + 'index': idx, + 'open': opn, + 'close': cls, + } + for ( + ts, + idx, + opn, + cls, + ) in zip( + valid_timestamps, + indices_arr, + opens_arr, + closes_arr, + ) + } + + profiler('`time_to_row` creation complete!') + + profiler(f'built timestamp lookup for {len(timestamps)} times') + + # build gap_specs from rect+arrow specs + for i in range(n_gaps): + gap_spec: dict = {} + + # get rect spec for this gap + if i < len(rect_specs): + rect_spec: dict = rect_specs[i].copy() + start_time = rect_spec.get('start_time') + end_time = rect_spec.get('end_time') + + if ( + start_time is not None + and end_time is not None + ): + # lookup from pre-built mapping + start_row = time_to_row.get(start_time) + end_row = time_to_row.get(end_time) + + if ( + start_row is None + or end_row is None + ): + log.warning( + f'Timestamp lookup failed for ' + f'gap[{i}], skipping' + ) + continue + + start_idx = start_row['index'] + end_idx = end_row['index'] + start_close = start_row['close'] + end_open = end_row['open'] + + from_idx: float = 0.16 - 0.06 + gap_spec['start_pos'] = ( + start_idx + 1 - from_idx, + start_close, + ) + gap_spec['end_pos'] = ( + end_idx + from_idx, + end_open, + ) + gap_spec['start_time'] = start_time + gap_spec['end_time'] = end_time + gap_spec['color'] = rect_spec.get( + 'color', + 'dad_blue', + ) + + # get arrow spec for this gap + if i < len(arrow_specs): + arrow_spec: dict = arrow_specs[i].copy() + x: float = float(arrow_spec.get('x', 0)) + y: float = float(arrow_spec.get('y', 0)) + time_val: float|None = arrow_spec.get('time') + + # timestamp-based index lookup (only for x, NOT y!) + # y is already set to the PREVIOUS bar's close + if time_val is not None: + arrow_row = time_to_row.get(time_val) + if arrow_row is not None: + x = arrow_row['index'] + # NOTE: do NOT update y! it's the + # previous bar's close, not current + else: + log.warning( + f'Arrow timestamp {time_val} not ' + f'found for gap[{i}], using x={x}' + ) + + gap_spec['arrow_x'] = x + gap_spec['arrow_y'] = y + gap_spec['time'] = time_val + gap_spec['pointing'] = arrow_spec.get( + 'pointing', + 'down', + ) + gap_spec['alpha'] = arrow_spec.get('alpha', 169) + + gap_specs.append(gap_spec) + + profiler(f'built {len(gap_specs)} gap_specs') + + # create single GapAnnotations item for all gaps + if gap_specs: + gaps_item = GapAnnotations( + gap_specs=gap_specs, + array=arr, + color=gap_specs[0].get('color', 'dad_blue'), + alpha=gap_specs[0].get('alpha', 169), + arrow_size=10.0, + fqme=fqme, + timeframe=timeframe, + ) + chart.plotItem.addItem(gaps_item) + + # register single item for repositioning + aid: int = id(gaps_item) + annots[aid] = gaps_item + aids_set.add(aid) + result['rects'].append(aid) + profiler( + f'created GapAnnotations item for {len(gap_specs)} ' + f'gaps' + ) + + # A/B comparison: optionally create individual arrows + # alongside batch for visual comparison + if show_individual_arrows: + godw = chart.linked.godwidget + arrows: ArrowEditor = ArrowEditor(godw=godw) + for i, spec in enumerate(gap_specs): + if 'arrow_x' not in spec: + continue + + aid_str: str = str(uuid4()) + arrow: pg.ArrowItem = arrows.add( + plot=chart.plotItem, + uid=aid_str, + x=spec['arrow_x'], + y=spec['arrow_y'], + pointing=spec['pointing'], + color='bracket', # different color + alpha=spec.get('alpha', 169), + headLen=10.0, + headWidth=2.222, + pxMode=True, + ) + arrow._abs_x = spec['arrow_x'] + arrow._abs_y = spec['arrow_y'] + + annots[aid_str] = arrow + _editors[aid_str] = arrows + aids_set.add(aid_str) + result['arrows'].append(aid_str) + + profiler( + f'created {len(gap_specs)} individual arrows ' + f'for comparison' + ) + + # handle text items separately (less common, keep + # individual items) + n_texts: int = 0 + for text_spec in text_specs: + kwargs: dict = text_spec.copy() + text: str = kwargs.pop('text') + x: float = float(kwargs.pop('x')) + y: float = float(kwargs.pop('y')) + time_val: float|None = kwargs.pop('time', None) + + # timestamp-based index lookup + if time_val is not None: + matches = arr[arr['time'] == time_val] + if len(matches) > 0: + x = float(matches[0]['index']) + y = float(matches[0]['close']) + + color = kwargs.pop('color', 'dad_blue') + anchor = kwargs.pop('anchor', (0, 1)) + font_size = kwargs.pop('font_size', None) + + text_item: pg.TextItem = pg.TextItem( + text, + color=hcolor(color), + anchor=anchor, + ) + + if font_size is None: + from ._style import get_fonts + font, font_small = get_fonts() + font_size = font_small.px_size - 1 + + qfont: QFont = text_item.textItem.font() + qfont.setPixelSize(font_size) + text_item.setFont(qfont) + + text_item.setPos(float(x), float(y)) + chart.plotItem.addItem(text_item) + + text_item._abs_x = float(x) + text_item._abs_y = float(y) + + aid: str = str(uuid4()) + annots[aid] = text_item + aids_set.add(aid) + result['texts'].append(aid) + n_texts += 1 + + profiler( + f'created text annotations: {n_texts} texts' + ) + profiler.finish() + + await annot_req_stream.send(result) + case { 'cmd': 'remove', 'aid': int(aid)|str(aid), @@ -471,10 +828,26 @@ async def serve_rc_annots( # XXX: reposition all annotations to ensure they # stay aligned with viz data after reset (eg during # backfill when abs-index range changes) + chart: ChartPlotWidget = { + 60: ds.hist_chart, + 1: ds.chart, + }[timeframe] + viz: Viz = chart.get_viz(fqme) + arr = viz.shm.array + n_repositioned: int = 0 for aid, annot in annots.items(): + # GapAnnotations batch items have .reposition() + if hasattr(annot, 'reposition'): + annot.reposition( + array=arr, + fqme=fqme, + timeframe=timeframe, + ) + n_repositioned += 1 + # arrows and text items use abs x,y coords - if ( + elif ( hasattr(annot, '_abs_x') and hasattr(annot, '_abs_y') @@ -539,12 +912,21 @@ async def remote_annotate( finally: # ensure all annots for this connection are deleted # on any final teardown + profiler = Profiler( + msg=f'Annotation teardown for ctx {ctx.cid}', + disabled=False, + ms_threshold=0.0, + ) (_ctx, aids) = _ctxs[ctx.cid] assert _ctx is ctx + profiler(f'got {len(aids)} aids to remove') + for aid in aids: annot: QGraphicsItem = _annots[aid] assert rm_annot(annot) + profiler(f'removed all {len(aids)} annotations') + class AnnotCtl(Struct): ''' @@ -746,6 +1128,64 @@ class AnnotCtl(Struct): ) return aid + async def add_batch( + self, + fqme: str, + timeframe: float, + rects: list[dict]|None = None, + arrows: list[dict]|None = None, + texts: list[dict]|None = None, + show_individual_arrows: bool = False, + + from_acm: bool = False, + + ) -> dict[str, list[int]]: + ''' + Batch submit multiple annotations in single IPC msg for + much faster remote annotation vs. per-annot round-trips. + + Returns dict of annotation IDs: + { + 'rects': [aid1, aid2, ...], + 'arrows': [aid3, aid4, ...], + 'texts': [aid5, aid6, ...], + } + + ''' + ipc: MsgStream = self._get_ipc(fqme) + with trio.fail_after(10): + await ipc.send({ + 'fqme': fqme, + 'cmd': 'batch', + 'timeframe': timeframe, + 'rects': rects or [], + 'arrows': arrows or [], + 'texts': texts or [], + 'show_individual_arrows': show_individual_arrows, + }) + result: dict = await ipc.receive() + match result: + case {'error': str(msg)}: + log.error(msg) + return { + 'rects': [], + 'arrows': [], + 'texts': [], + } + + # register all AIDs with their IPC streams + for aid_list in result.values(): + for aid in aid_list: + self._ipcs[aid] = ipc + if not from_acm: + self._annot_stack.push_async_callback( + partial( + self.remove, + aid, + ) + ) + return result + async def add_text( self, fqme: str, @@ -881,3 +1321,14 @@ async def open_annot_ctl( _annot_stack=annots_stack, ) yield client + + # client exited, measure teardown time + teardown_profiler = Profiler( + msg='Client AnnotCtl teardown', + disabled=False, + ms_threshold=0.0, + ) + teardown_profiler('exiting annots_stack') + + teardown_profiler('annots_stack exited') + teardown_profiler('exiting gather_contexts')