Add batch-submit API for gap annotations
Introduce `AnnotCtl.add_batch()` and `serve_rc_annots()` batch handler to submit 1000s of gaps in single IPC msg instead of per-annot round-trips. Server builds `GapAnnotations` from specs and handles vectorized timestamp-to-index lookups. Deats, - add `'cmd': 'batch'` handler in `serve_rc_annots()` - vectorized timestamp lookup via `np.searchsorted()` + masking - build `gap_specs: list[dict]` from rect+arrow specs client-side - create single `GapAnnotations` item for all gaps server-side - handle `GapAnnotations.reposition()` in redraw handler - add profiling to batch path for perf measurement - support optional individual arrows for A/B comparison Also, - refactor `markup_gaps()` to collect specs + single batch call - add `no_qt_updates()` context mgr for batch render ops - add profiling to annotation teardown path - add `GapAnnotations` case to `rm_annot()` match block (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codegap_annotator
parent
16c770a808
commit
d78b8c4df3
|
|
@ -30,6 +30,11 @@ import tractor
|
||||||
|
|
||||||
from piker.data._formatters import BGM
|
from piker.data._formatters import BGM
|
||||||
from piker.storage import log
|
from piker.storage import log
|
||||||
|
from piker.toolz.profile import (
|
||||||
|
Profiler,
|
||||||
|
pg_profile_enabled,
|
||||||
|
ms_slower_then,
|
||||||
|
)
|
||||||
from piker.ui._style import get_fonts
|
from piker.ui._style import get_fonts
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
|
@ -92,12 +97,22 @@ async def markup_gaps(
|
||||||
# gap's duration.
|
# gap's duration.
|
||||||
show_txt: bool = False,
|
show_txt: bool = False,
|
||||||
|
|
||||||
|
# A/B comparison: render individual arrows alongside batch
|
||||||
|
# for visual comparison
|
||||||
|
show_individual_arrows: bool = False,
|
||||||
|
|
||||||
) -> dict[int, dict]:
|
) -> dict[int, dict]:
|
||||||
'''
|
'''
|
||||||
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
|
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
|
||||||
with rectangles.
|
with rectangles.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
profiler = Profiler(
|
||||||
|
msg=f'markup_gaps() for {gaps.height} gaps',
|
||||||
|
disabled=False,
|
||||||
|
ms_threshold=0.0,
|
||||||
|
)
|
||||||
|
|
||||||
# XXX: force chart redraw FIRST to ensure PlotItem coordinate
|
# XXX: force chart redraw FIRST to ensure PlotItem coordinate
|
||||||
# system is properly initialized before we position annotations!
|
# system is properly initialized before we position annotations!
|
||||||
# Without this, annotations may be misaligned on first creation
|
# Without this, annotations may be misaligned on first creation
|
||||||
|
|
@ -106,6 +121,19 @@ async def markup_gaps(
|
||||||
fqme=fqme,
|
fqme=fqme,
|
||||||
timeframe=timeframe,
|
timeframe=timeframe,
|
||||||
)
|
)
|
||||||
|
profiler('first `.redraw()` before annot creation')
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
f'markup_gaps() called:\n'
|
||||||
|
f' fqme: {fqme}\n'
|
||||||
|
f' timeframe: {timeframe}s\n'
|
||||||
|
f' gaps.height: {gaps.height}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# collect all annotation specs for batch submission
|
||||||
|
rect_specs: list[dict] = []
|
||||||
|
arrow_specs: list[dict] = []
|
||||||
|
text_specs: list[dict] = []
|
||||||
|
|
||||||
aids: dict[int] = {}
|
aids: dict[int] = {}
|
||||||
for i in range(gaps.height):
|
for i in range(gaps.height):
|
||||||
|
|
@ -217,56 +245,38 @@ async def markup_gaps(
|
||||||
# 1: 'wine', # down-gap
|
# 1: 'wine', # down-gap
|
||||||
# }[sgn]
|
# }[sgn]
|
||||||
|
|
||||||
rect_kwargs: dict[str, Any] = dict(
|
# collect rect spec (no fqme/timeframe, added by batch
|
||||||
fqme=fqme,
|
# API)
|
||||||
timeframe=timeframe,
|
rect_spec: dict[str, Any] = dict(
|
||||||
|
meth='set_view_pos',
|
||||||
start_pos=lc,
|
start_pos=lc,
|
||||||
end_pos=ro,
|
end_pos=ro,
|
||||||
color=color,
|
color=color,
|
||||||
|
update_label=False,
|
||||||
start_time=start_time,
|
start_time=start_time,
|
||||||
end_time=end_time,
|
end_time=end_time,
|
||||||
)
|
)
|
||||||
|
rect_specs.append(rect_spec)
|
||||||
|
|
||||||
# add up/down rects
|
|
||||||
aid: int|None = await actl.add_rect(**rect_kwargs)
|
|
||||||
if aid is None:
|
|
||||||
log.error(
|
|
||||||
f'Failed to add rect for,\n'
|
|
||||||
f'{rect_kwargs!r}\n'
|
|
||||||
f'\n'
|
|
||||||
f'Skipping to next gap!\n'
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
assert aid
|
|
||||||
aids[aid] = rect_kwargs
|
|
||||||
direction: str = (
|
direction: str = (
|
||||||
'down' if down_gap
|
'down' if down_gap
|
||||||
else 'up'
|
else 'up'
|
||||||
)
|
)
|
||||||
# TODO! mk this a `msgspec.Struct` which we deserialize
|
|
||||||
# on the server side!
|
# collect arrow spec
|
||||||
# XXX: send timestamp for server-side index lookup
|
|
||||||
# to ensure alignment with current shm state
|
|
||||||
gap_time: float = row['time'][0]
|
gap_time: float = row['time'][0]
|
||||||
arrow_kwargs: dict[str, Any] = dict(
|
arrow_spec: dict[str, Any] = dict(
|
||||||
fqme=fqme,
|
|
||||||
timeframe=timeframe,
|
|
||||||
x=iend, # fallback if timestamp lookup fails
|
x=iend, # fallback if timestamp lookup fails
|
||||||
y=cls,
|
y=cls,
|
||||||
time=gap_time, # for server-side index lookup
|
time=gap_time, # for server-side index lookup
|
||||||
color=color,
|
color=color,
|
||||||
alpha=169,
|
alpha=169,
|
||||||
pointing=direction,
|
pointing=direction,
|
||||||
# TODO: expose these as params to markup_gaps()?
|
|
||||||
headLen=10,
|
headLen=10,
|
||||||
headWidth=2.222,
|
headWidth=2.222,
|
||||||
pxMode=True,
|
pxMode=True,
|
||||||
)
|
)
|
||||||
|
arrow_specs.append(arrow_spec)
|
||||||
aid: int = await actl.add_arrow(
|
|
||||||
**arrow_kwargs
|
|
||||||
)
|
|
||||||
|
|
||||||
# add duration label to RHS of arrow
|
# add duration label to RHS of arrow
|
||||||
if up_gap:
|
if up_gap:
|
||||||
|
|
@ -278,15 +288,12 @@ async def markup_gaps(
|
||||||
assert flat
|
assert flat
|
||||||
anchor = (0, 0) # up from bottom
|
anchor = (0, 0) # up from bottom
|
||||||
|
|
||||||
# use a slightly smaller font for gap label txt.
|
# collect text spec if enabled
|
||||||
font, small_font = get_fonts()
|
|
||||||
font_size: int = small_font.px_size - 1
|
|
||||||
assert isinstance(font_size, int)
|
|
||||||
|
|
||||||
if show_txt:
|
if show_txt:
|
||||||
text_aid: int = await actl.add_text(
|
font, small_font = get_fonts()
|
||||||
fqme=fqme,
|
font_size: int = small_font.px_size - 1
|
||||||
timeframe=timeframe,
|
|
||||||
|
text_spec: dict[str, Any] = dict(
|
||||||
text=gap_label,
|
text=gap_label,
|
||||||
x=iend + 1, # fallback if timestamp lookup fails
|
x=iend + 1, # fallback if timestamp lookup fails
|
||||||
y=cls,
|
y=cls,
|
||||||
|
|
@ -295,12 +302,46 @@ async def markup_gaps(
|
||||||
anchor=anchor,
|
anchor=anchor,
|
||||||
font_size=font_size,
|
font_size=font_size,
|
||||||
)
|
)
|
||||||
aids[text_aid] = {'text': gap_label}
|
text_specs.append(text_spec)
|
||||||
|
|
||||||
# tell chart to redraw all its
|
# submit all annotations in single batch IPC msg
|
||||||
# graphics view layers Bo
|
log.info(
|
||||||
|
f'Submitting batch annotations:\n'
|
||||||
|
f' rects: {len(rect_specs)}\n'
|
||||||
|
f' arrows: {len(arrow_specs)}\n'
|
||||||
|
f' texts: {len(text_specs)}\n'
|
||||||
|
)
|
||||||
|
profiler('built all annotation specs')
|
||||||
|
|
||||||
|
result: dict[str, list[int]] = await actl.add_batch(
|
||||||
|
fqme=fqme,
|
||||||
|
timeframe=timeframe,
|
||||||
|
rects=rect_specs,
|
||||||
|
arrows=arrow_specs,
|
||||||
|
texts=text_specs,
|
||||||
|
show_individual_arrows=show_individual_arrows,
|
||||||
|
)
|
||||||
|
profiler('batch `.add_batch()` IPC call complete')
|
||||||
|
|
||||||
|
# build aids dict from batch results
|
||||||
|
for aid in result['rects']:
|
||||||
|
aids[aid] = {'type': 'rect'}
|
||||||
|
for aid in result['arrows']:
|
||||||
|
aids[aid] = {'type': 'arrow'}
|
||||||
|
for aid in result['texts']:
|
||||||
|
aids[aid] = {'type': 'text'}
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
f'Batch submission complete: {len(aids)} annotation(s) '
|
||||||
|
f'created'
|
||||||
|
)
|
||||||
|
profiler('built aids result dict')
|
||||||
|
|
||||||
|
# tell chart to redraw all its graphics view layers
|
||||||
await actl.redraw(
|
await actl.redraw(
|
||||||
fqme=fqme,
|
fqme=fqme,
|
||||||
timeframe=timeframe,
|
timeframe=timeframe,
|
||||||
)
|
)
|
||||||
|
profiler('final `.redraw()` after annot creation')
|
||||||
|
|
||||||
return aids
|
return aids
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,7 @@ a chart from some other actor.
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
|
contextmanager as cm,
|
||||||
AsyncExitStack,
|
AsyncExitStack,
|
||||||
)
|
)
|
||||||
from functools import partial
|
from functools import partial
|
||||||
|
|
@ -46,6 +47,7 @@ from piker.log import get_logger
|
||||||
from piker.types import Struct
|
from piker.types import Struct
|
||||||
from piker.service import find_service
|
from piker.service import find_service
|
||||||
from piker.brokers import SymbolNotFound
|
from piker.brokers import SymbolNotFound
|
||||||
|
from piker.toolz import Profiler
|
||||||
from piker.ui.qt import (
|
from piker.ui.qt import (
|
||||||
QGraphicsItem,
|
QGraphicsItem,
|
||||||
)
|
)
|
||||||
|
|
@ -98,6 +100,8 @@ def rm_annot(
|
||||||
annot: ArrowEditor|SelectRect|pg.TextItem
|
annot: ArrowEditor|SelectRect|pg.TextItem
|
||||||
) -> bool:
|
) -> bool:
|
||||||
global _editors
|
global _editors
|
||||||
|
from piker.ui._annotate import GapAnnotations
|
||||||
|
|
||||||
match annot:
|
match annot:
|
||||||
case pg.ArrowItem():
|
case pg.ArrowItem():
|
||||||
editor = _editors[annot._uid]
|
editor = _editors[annot._uid]
|
||||||
|
|
@ -122,9 +126,35 @@ def rm_annot(
|
||||||
scene.removeItem(annot)
|
scene.removeItem(annot)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
case GapAnnotations():
|
||||||
|
scene = annot.scene()
|
||||||
|
if scene:
|
||||||
|
scene.removeItem(annot)
|
||||||
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def no_qt_updates(*items):
|
||||||
|
'''
|
||||||
|
Disable Qt widget/item updates during context to batch
|
||||||
|
render operations and only trigger single repaint on exit.
|
||||||
|
|
||||||
|
Accepts both QWidgets and QGraphicsItems.
|
||||||
|
|
||||||
|
'''
|
||||||
|
for item in items:
|
||||||
|
if hasattr(item, 'setUpdatesEnabled'):
|
||||||
|
item.setUpdatesEnabled(False)
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
for item in items:
|
||||||
|
if hasattr(item, 'setUpdatesEnabled'):
|
||||||
|
item.setUpdatesEnabled(True)
|
||||||
|
|
||||||
|
|
||||||
async def serve_rc_annots(
|
async def serve_rc_annots(
|
||||||
ipc_key: str,
|
ipc_key: str,
|
||||||
annot_req_stream: MsgStream,
|
annot_req_stream: MsgStream,
|
||||||
|
|
@ -429,6 +459,333 @@ async def serve_rc_annots(
|
||||||
aids.add(aid)
|
aids.add(aid)
|
||||||
await annot_req_stream.send(aid)
|
await annot_req_stream.send(aid)
|
||||||
|
|
||||||
|
case {
|
||||||
|
'cmd': 'batch',
|
||||||
|
'fqme': fqme,
|
||||||
|
'timeframe': timeframe,
|
||||||
|
'rects': list(rect_specs),
|
||||||
|
'arrows': list(arrow_specs),
|
||||||
|
'texts': list(text_specs),
|
||||||
|
'show_individual_arrows': bool(show_individual_arrows),
|
||||||
|
}:
|
||||||
|
# batch submission handler - process multiple
|
||||||
|
# annotations in single IPC round-trip
|
||||||
|
ds: DisplayState = _dss[fqme]
|
||||||
|
try:
|
||||||
|
chart: ChartPlotWidget = {
|
||||||
|
60: ds.hist_chart,
|
||||||
|
1: ds.chart,
|
||||||
|
}[timeframe]
|
||||||
|
except KeyError:
|
||||||
|
msg: str = (
|
||||||
|
f'No chart for timeframe={timeframe}s, '
|
||||||
|
f'skipping batch annotation'
|
||||||
|
)
|
||||||
|
log.error(msg)
|
||||||
|
await annot_req_stream.send({'error': msg})
|
||||||
|
continue
|
||||||
|
|
||||||
|
cv: ChartView = chart.cv
|
||||||
|
viz: Viz = chart.get_viz(fqme)
|
||||||
|
shm = viz.shm
|
||||||
|
arr = shm.array
|
||||||
|
|
||||||
|
result: dict[str, list[int]] = {
|
||||||
|
'rects': [],
|
||||||
|
'arrows': [],
|
||||||
|
'texts': [],
|
||||||
|
}
|
||||||
|
|
||||||
|
profiler = Profiler(
|
||||||
|
msg=(
|
||||||
|
f'Batch annotate {len(rect_specs)} gaps '
|
||||||
|
f'on {fqme}@{timeframe}s'
|
||||||
|
),
|
||||||
|
disabled=False,
|
||||||
|
delayed=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
aids_set: set[int] = ctxs[ipc_key][1]
|
||||||
|
|
||||||
|
# build unified gap_specs for GapAnnotations class
|
||||||
|
from piker.ui._annotate import GapAnnotations
|
||||||
|
|
||||||
|
gap_specs: list[dict] = []
|
||||||
|
n_gaps: int = max(
|
||||||
|
len(rect_specs),
|
||||||
|
len(arrow_specs),
|
||||||
|
)
|
||||||
|
profiler('setup batch annot creation')
|
||||||
|
|
||||||
|
# collect all unique timestamps for vectorized lookup
|
||||||
|
timestamps: list[float] = []
|
||||||
|
for rect_spec in rect_specs:
|
||||||
|
if start_time := rect_spec.get('start_time'):
|
||||||
|
timestamps.append(start_time)
|
||||||
|
if end_time := rect_spec.get('end_time'):
|
||||||
|
timestamps.append(end_time)
|
||||||
|
for arrow_spec in arrow_specs:
|
||||||
|
if time_val := arrow_spec.get('time'):
|
||||||
|
timestamps.append(time_val)
|
||||||
|
|
||||||
|
profiler('collect `timestamps: list` complet!')
|
||||||
|
|
||||||
|
# build timestamp -> row mapping using binary search
|
||||||
|
# O(m log n) instead of O(n*m) with np.isin
|
||||||
|
time_to_row: dict[float, dict] = {}
|
||||||
|
if timestamps:
|
||||||
|
import numpy as np
|
||||||
|
time_arr = arr['time']
|
||||||
|
ts_array = np.array(timestamps)
|
||||||
|
|
||||||
|
# binary search for each timestamp in sorted time array
|
||||||
|
search_indices = np.searchsorted(
|
||||||
|
time_arr,
|
||||||
|
ts_array,
|
||||||
|
)
|
||||||
|
|
||||||
|
profiler('`np.searchsorted()` complete!')
|
||||||
|
|
||||||
|
# vectorized bounds check and exact match verification
|
||||||
|
valid_mask = (
|
||||||
|
(search_indices < len(arr))
|
||||||
|
& (time_arr[search_indices] == ts_array)
|
||||||
|
)
|
||||||
|
|
||||||
|
# get all valid indices and timestamps
|
||||||
|
valid_indices = search_indices[valid_mask]
|
||||||
|
valid_timestamps = ts_array[valid_mask]
|
||||||
|
|
||||||
|
# use fancy indexing to get all rows at once
|
||||||
|
matched_rows = arr[valid_indices]
|
||||||
|
|
||||||
|
# extract fields to plain arrays BEFORE dict building
|
||||||
|
indices_arr = matched_rows['index'].astype(float)
|
||||||
|
opens_arr = matched_rows['open'].astype(float)
|
||||||
|
closes_arr = matched_rows['close'].astype(float)
|
||||||
|
|
||||||
|
profiler('extracted field arrays')
|
||||||
|
|
||||||
|
# build dict from plain arrays (much faster)
|
||||||
|
time_to_row: dict[float, dict] = {
|
||||||
|
float(ts): {
|
||||||
|
'index': idx,
|
||||||
|
'open': opn,
|
||||||
|
'close': cls,
|
||||||
|
}
|
||||||
|
for (
|
||||||
|
ts,
|
||||||
|
idx,
|
||||||
|
opn,
|
||||||
|
cls,
|
||||||
|
) in zip(
|
||||||
|
valid_timestamps,
|
||||||
|
indices_arr,
|
||||||
|
opens_arr,
|
||||||
|
closes_arr,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
profiler('`time_to_row` creation complete!')
|
||||||
|
|
||||||
|
profiler(f'built timestamp lookup for {len(timestamps)} times')
|
||||||
|
|
||||||
|
# build gap_specs from rect+arrow specs
|
||||||
|
for i in range(n_gaps):
|
||||||
|
gap_spec: dict = {}
|
||||||
|
|
||||||
|
# get rect spec for this gap
|
||||||
|
if i < len(rect_specs):
|
||||||
|
rect_spec: dict = rect_specs[i].copy()
|
||||||
|
start_time = rect_spec.get('start_time')
|
||||||
|
end_time = rect_spec.get('end_time')
|
||||||
|
|
||||||
|
if (
|
||||||
|
start_time is not None
|
||||||
|
and end_time is not None
|
||||||
|
):
|
||||||
|
# lookup from pre-built mapping
|
||||||
|
start_row = time_to_row.get(start_time)
|
||||||
|
end_row = time_to_row.get(end_time)
|
||||||
|
|
||||||
|
if (
|
||||||
|
start_row is None
|
||||||
|
or end_row is None
|
||||||
|
):
|
||||||
|
log.warning(
|
||||||
|
f'Timestamp lookup failed for '
|
||||||
|
f'gap[{i}], skipping'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
start_idx = start_row['index']
|
||||||
|
end_idx = end_row['index']
|
||||||
|
start_close = start_row['close']
|
||||||
|
end_open = end_row['open']
|
||||||
|
|
||||||
|
from_idx: float = 0.16 - 0.06
|
||||||
|
gap_spec['start_pos'] = (
|
||||||
|
start_idx + 1 - from_idx,
|
||||||
|
start_close,
|
||||||
|
)
|
||||||
|
gap_spec['end_pos'] = (
|
||||||
|
end_idx + from_idx,
|
||||||
|
end_open,
|
||||||
|
)
|
||||||
|
gap_spec['start_time'] = start_time
|
||||||
|
gap_spec['end_time'] = end_time
|
||||||
|
gap_spec['color'] = rect_spec.get(
|
||||||
|
'color',
|
||||||
|
'dad_blue',
|
||||||
|
)
|
||||||
|
|
||||||
|
# get arrow spec for this gap
|
||||||
|
if i < len(arrow_specs):
|
||||||
|
arrow_spec: dict = arrow_specs[i].copy()
|
||||||
|
x: float = float(arrow_spec.get('x', 0))
|
||||||
|
y: float = float(arrow_spec.get('y', 0))
|
||||||
|
time_val: float|None = arrow_spec.get('time')
|
||||||
|
|
||||||
|
# timestamp-based index lookup (only for x, NOT y!)
|
||||||
|
# y is already set to the PREVIOUS bar's close
|
||||||
|
if time_val is not None:
|
||||||
|
arrow_row = time_to_row.get(time_val)
|
||||||
|
if arrow_row is not None:
|
||||||
|
x = arrow_row['index']
|
||||||
|
# NOTE: do NOT update y! it's the
|
||||||
|
# previous bar's close, not current
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
f'Arrow timestamp {time_val} not '
|
||||||
|
f'found for gap[{i}], using x={x}'
|
||||||
|
)
|
||||||
|
|
||||||
|
gap_spec['arrow_x'] = x
|
||||||
|
gap_spec['arrow_y'] = y
|
||||||
|
gap_spec['time'] = time_val
|
||||||
|
gap_spec['pointing'] = arrow_spec.get(
|
||||||
|
'pointing',
|
||||||
|
'down',
|
||||||
|
)
|
||||||
|
gap_spec['alpha'] = arrow_spec.get('alpha', 169)
|
||||||
|
|
||||||
|
gap_specs.append(gap_spec)
|
||||||
|
|
||||||
|
profiler(f'built {len(gap_specs)} gap_specs')
|
||||||
|
|
||||||
|
# create single GapAnnotations item for all gaps
|
||||||
|
if gap_specs:
|
||||||
|
gaps_item = GapAnnotations(
|
||||||
|
gap_specs=gap_specs,
|
||||||
|
array=arr,
|
||||||
|
color=gap_specs[0].get('color', 'dad_blue'),
|
||||||
|
alpha=gap_specs[0].get('alpha', 169),
|
||||||
|
arrow_size=10.0,
|
||||||
|
fqme=fqme,
|
||||||
|
timeframe=timeframe,
|
||||||
|
)
|
||||||
|
chart.plotItem.addItem(gaps_item)
|
||||||
|
|
||||||
|
# register single item for repositioning
|
||||||
|
aid: int = id(gaps_item)
|
||||||
|
annots[aid] = gaps_item
|
||||||
|
aids_set.add(aid)
|
||||||
|
result['rects'].append(aid)
|
||||||
|
profiler(
|
||||||
|
f'created GapAnnotations item for {len(gap_specs)} '
|
||||||
|
f'gaps'
|
||||||
|
)
|
||||||
|
|
||||||
|
# A/B comparison: optionally create individual arrows
|
||||||
|
# alongside batch for visual comparison
|
||||||
|
if show_individual_arrows:
|
||||||
|
godw = chart.linked.godwidget
|
||||||
|
arrows: ArrowEditor = ArrowEditor(godw=godw)
|
||||||
|
for i, spec in enumerate(gap_specs):
|
||||||
|
if 'arrow_x' not in spec:
|
||||||
|
continue
|
||||||
|
|
||||||
|
aid_str: str = str(uuid4())
|
||||||
|
arrow: pg.ArrowItem = arrows.add(
|
||||||
|
plot=chart.plotItem,
|
||||||
|
uid=aid_str,
|
||||||
|
x=spec['arrow_x'],
|
||||||
|
y=spec['arrow_y'],
|
||||||
|
pointing=spec['pointing'],
|
||||||
|
color='bracket', # different color
|
||||||
|
alpha=spec.get('alpha', 169),
|
||||||
|
headLen=10.0,
|
||||||
|
headWidth=2.222,
|
||||||
|
pxMode=True,
|
||||||
|
)
|
||||||
|
arrow._abs_x = spec['arrow_x']
|
||||||
|
arrow._abs_y = spec['arrow_y']
|
||||||
|
|
||||||
|
annots[aid_str] = arrow
|
||||||
|
_editors[aid_str] = arrows
|
||||||
|
aids_set.add(aid_str)
|
||||||
|
result['arrows'].append(aid_str)
|
||||||
|
|
||||||
|
profiler(
|
||||||
|
f'created {len(gap_specs)} individual arrows '
|
||||||
|
f'for comparison'
|
||||||
|
)
|
||||||
|
|
||||||
|
# handle text items separately (less common, keep
|
||||||
|
# individual items)
|
||||||
|
n_texts: int = 0
|
||||||
|
for text_spec in text_specs:
|
||||||
|
kwargs: dict = text_spec.copy()
|
||||||
|
text: str = kwargs.pop('text')
|
||||||
|
x: float = float(kwargs.pop('x'))
|
||||||
|
y: float = float(kwargs.pop('y'))
|
||||||
|
time_val: float|None = kwargs.pop('time', None)
|
||||||
|
|
||||||
|
# timestamp-based index lookup
|
||||||
|
if time_val is not None:
|
||||||
|
matches = arr[arr['time'] == time_val]
|
||||||
|
if len(matches) > 0:
|
||||||
|
x = float(matches[0]['index'])
|
||||||
|
y = float(matches[0]['close'])
|
||||||
|
|
||||||
|
color = kwargs.pop('color', 'dad_blue')
|
||||||
|
anchor = kwargs.pop('anchor', (0, 1))
|
||||||
|
font_size = kwargs.pop('font_size', None)
|
||||||
|
|
||||||
|
text_item: pg.TextItem = pg.TextItem(
|
||||||
|
text,
|
||||||
|
color=hcolor(color),
|
||||||
|
anchor=anchor,
|
||||||
|
)
|
||||||
|
|
||||||
|
if font_size is None:
|
||||||
|
from ._style import get_fonts
|
||||||
|
font, font_small = get_fonts()
|
||||||
|
font_size = font_small.px_size - 1
|
||||||
|
|
||||||
|
qfont: QFont = text_item.textItem.font()
|
||||||
|
qfont.setPixelSize(font_size)
|
||||||
|
text_item.setFont(qfont)
|
||||||
|
|
||||||
|
text_item.setPos(float(x), float(y))
|
||||||
|
chart.plotItem.addItem(text_item)
|
||||||
|
|
||||||
|
text_item._abs_x = float(x)
|
||||||
|
text_item._abs_y = float(y)
|
||||||
|
|
||||||
|
aid: str = str(uuid4())
|
||||||
|
annots[aid] = text_item
|
||||||
|
aids_set.add(aid)
|
||||||
|
result['texts'].append(aid)
|
||||||
|
n_texts += 1
|
||||||
|
|
||||||
|
profiler(
|
||||||
|
f'created text annotations: {n_texts} texts'
|
||||||
|
)
|
||||||
|
profiler.finish()
|
||||||
|
|
||||||
|
await annot_req_stream.send(result)
|
||||||
|
|
||||||
case {
|
case {
|
||||||
'cmd': 'remove',
|
'cmd': 'remove',
|
||||||
'aid': int(aid)|str(aid),
|
'aid': int(aid)|str(aid),
|
||||||
|
|
@ -471,10 +828,26 @@ async def serve_rc_annots(
|
||||||
# XXX: reposition all annotations to ensure they
|
# XXX: reposition all annotations to ensure they
|
||||||
# stay aligned with viz data after reset (eg during
|
# stay aligned with viz data after reset (eg during
|
||||||
# backfill when abs-index range changes)
|
# backfill when abs-index range changes)
|
||||||
|
chart: ChartPlotWidget = {
|
||||||
|
60: ds.hist_chart,
|
||||||
|
1: ds.chart,
|
||||||
|
}[timeframe]
|
||||||
|
viz: Viz = chart.get_viz(fqme)
|
||||||
|
arr = viz.shm.array
|
||||||
|
|
||||||
n_repositioned: int = 0
|
n_repositioned: int = 0
|
||||||
for aid, annot in annots.items():
|
for aid, annot in annots.items():
|
||||||
|
# GapAnnotations batch items have .reposition()
|
||||||
|
if hasattr(annot, 'reposition'):
|
||||||
|
annot.reposition(
|
||||||
|
array=arr,
|
||||||
|
fqme=fqme,
|
||||||
|
timeframe=timeframe,
|
||||||
|
)
|
||||||
|
n_repositioned += 1
|
||||||
|
|
||||||
# arrows and text items use abs x,y coords
|
# arrows and text items use abs x,y coords
|
||||||
if (
|
elif (
|
||||||
hasattr(annot, '_abs_x')
|
hasattr(annot, '_abs_x')
|
||||||
and
|
and
|
||||||
hasattr(annot, '_abs_y')
|
hasattr(annot, '_abs_y')
|
||||||
|
|
@ -539,12 +912,21 @@ async def remote_annotate(
|
||||||
finally:
|
finally:
|
||||||
# ensure all annots for this connection are deleted
|
# ensure all annots for this connection are deleted
|
||||||
# on any final teardown
|
# on any final teardown
|
||||||
|
profiler = Profiler(
|
||||||
|
msg=f'Annotation teardown for ctx {ctx.cid}',
|
||||||
|
disabled=False,
|
||||||
|
ms_threshold=0.0,
|
||||||
|
)
|
||||||
(_ctx, aids) = _ctxs[ctx.cid]
|
(_ctx, aids) = _ctxs[ctx.cid]
|
||||||
assert _ctx is ctx
|
assert _ctx is ctx
|
||||||
|
profiler(f'got {len(aids)} aids to remove')
|
||||||
|
|
||||||
for aid in aids:
|
for aid in aids:
|
||||||
annot: QGraphicsItem = _annots[aid]
|
annot: QGraphicsItem = _annots[aid]
|
||||||
assert rm_annot(annot)
|
assert rm_annot(annot)
|
||||||
|
|
||||||
|
profiler(f'removed all {len(aids)} annotations')
|
||||||
|
|
||||||
|
|
||||||
class AnnotCtl(Struct):
|
class AnnotCtl(Struct):
|
||||||
'''
|
'''
|
||||||
|
|
@ -746,6 +1128,64 @@ class AnnotCtl(Struct):
|
||||||
)
|
)
|
||||||
return aid
|
return aid
|
||||||
|
|
||||||
|
async def add_batch(
|
||||||
|
self,
|
||||||
|
fqme: str,
|
||||||
|
timeframe: float,
|
||||||
|
rects: list[dict]|None = None,
|
||||||
|
arrows: list[dict]|None = None,
|
||||||
|
texts: list[dict]|None = None,
|
||||||
|
show_individual_arrows: bool = False,
|
||||||
|
|
||||||
|
from_acm: bool = False,
|
||||||
|
|
||||||
|
) -> dict[str, list[int]]:
|
||||||
|
'''
|
||||||
|
Batch submit multiple annotations in single IPC msg for
|
||||||
|
much faster remote annotation vs. per-annot round-trips.
|
||||||
|
|
||||||
|
Returns dict of annotation IDs:
|
||||||
|
{
|
||||||
|
'rects': [aid1, aid2, ...],
|
||||||
|
'arrows': [aid3, aid4, ...],
|
||||||
|
'texts': [aid5, aid6, ...],
|
||||||
|
}
|
||||||
|
|
||||||
|
'''
|
||||||
|
ipc: MsgStream = self._get_ipc(fqme)
|
||||||
|
with trio.fail_after(10):
|
||||||
|
await ipc.send({
|
||||||
|
'fqme': fqme,
|
||||||
|
'cmd': 'batch',
|
||||||
|
'timeframe': timeframe,
|
||||||
|
'rects': rects or [],
|
||||||
|
'arrows': arrows or [],
|
||||||
|
'texts': texts or [],
|
||||||
|
'show_individual_arrows': show_individual_arrows,
|
||||||
|
})
|
||||||
|
result: dict = await ipc.receive()
|
||||||
|
match result:
|
||||||
|
case {'error': str(msg)}:
|
||||||
|
log.error(msg)
|
||||||
|
return {
|
||||||
|
'rects': [],
|
||||||
|
'arrows': [],
|
||||||
|
'texts': [],
|
||||||
|
}
|
||||||
|
|
||||||
|
# register all AIDs with their IPC streams
|
||||||
|
for aid_list in result.values():
|
||||||
|
for aid in aid_list:
|
||||||
|
self._ipcs[aid] = ipc
|
||||||
|
if not from_acm:
|
||||||
|
self._annot_stack.push_async_callback(
|
||||||
|
partial(
|
||||||
|
self.remove,
|
||||||
|
aid,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return result
|
||||||
|
|
||||||
async def add_text(
|
async def add_text(
|
||||||
self,
|
self,
|
||||||
fqme: str,
|
fqme: str,
|
||||||
|
|
@ -881,3 +1321,14 @@ async def open_annot_ctl(
|
||||||
_annot_stack=annots_stack,
|
_annot_stack=annots_stack,
|
||||||
)
|
)
|
||||||
yield client
|
yield client
|
||||||
|
|
||||||
|
# client exited, measure teardown time
|
||||||
|
teardown_profiler = Profiler(
|
||||||
|
msg='Client AnnotCtl teardown',
|
||||||
|
disabled=False,
|
||||||
|
ms_threshold=0.0,
|
||||||
|
)
|
||||||
|
teardown_profiler('exiting annots_stack')
|
||||||
|
|
||||||
|
teardown_profiler('annots_stack exited')
|
||||||
|
teardown_profiler('exiting gather_contexts')
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue