1335 lines
46 KiB
Python
1335 lines
46 KiB
Python
# piker: trading gear for hackers
|
|
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
|
|
|
# This program is free software: you can redistribute it and/or modify
|
|
# it under the terms of the GNU Affero General Public License as published by
|
|
# the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU Affero General Public License for more details.
|
|
|
|
# You should have received a copy of the GNU Affero General Public License
|
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
'''
|
|
Remote control tasks for sending annotations (and maybe more cmds) to
|
|
a chart from some other actor.
|
|
|
|
'''
|
|
from __future__ import annotations
|
|
from contextlib import (
|
|
asynccontextmanager as acm,
|
|
contextmanager as cm,
|
|
AsyncExitStack,
|
|
)
|
|
from functools import partial
|
|
from pprint import pformat
|
|
from typing import (
|
|
AsyncContextManager,
|
|
Literal,
|
|
)
|
|
from uuid import uuid4
|
|
|
|
import pyqtgraph as pg
|
|
import tractor
|
|
import trio
|
|
from tractor import trionics
|
|
from tractor import (
|
|
Portal,
|
|
Context,
|
|
MsgStream,
|
|
)
|
|
|
|
from piker.log import get_logger
|
|
from piker.types import Struct
|
|
from piker.service import find_service
|
|
from piker.brokers import SymbolNotFound
|
|
from piker.toolz import Profiler
|
|
from piker.ui.qt import (
|
|
QGraphicsItem,
|
|
)
|
|
from PyQt6.QtGui import QFont
|
|
from ._display import DisplayState
|
|
from ._interaction import ChartView
|
|
from ._editors import (
|
|
SelectRect,
|
|
ArrowEditor,
|
|
)
|
|
from ._chart import ChartPlotWidget
|
|
from ._dataviz import Viz
|
|
from ._style import hcolor
|
|
|
|
log = get_logger(__name__)
|
|
|
|
# NOTE: this is UPDATED by the `._display.graphics_update_loop()`
|
|
# once all chart widgets / Viz per flume have been initialized
|
|
# allowing for remote annotation (control) of any chart-actor's mkt
|
|
# feed by fqme lookup Bo
|
|
_dss: dict[str, DisplayState] = {}
|
|
|
|
# stash each and every client connection so that they can all
|
|
# be cancelled on shutdown/error.
|
|
# TODO: make `tractor.Context` hashable via is `.cid: str`?
|
|
# _ctxs: set[Context] = set()
|
|
# TODO: use type statements from 3.12+
|
|
IpcCtxTable = dict[
|
|
str, # each `Context.cid`
|
|
tuple[
|
|
Context, # handle for ctx-cancellation
|
|
set[int] # set of annotation (instance) ids
|
|
]
|
|
]
|
|
|
|
_ctxs: IpcCtxTable = {}
|
|
|
|
# XXX: global map of all uniquely created annotation-graphics so
|
|
# that they can be mutated (eventually) by a client.
|
|
# NOTE: this map is only populated on the `chart` actor side (aka
|
|
# the "annotations server" which actually renders to a Qt canvas).
|
|
# type AnnotsTable = dict[int, QGraphicsItem]
|
|
AnnotsTable = dict[int, QGraphicsItem]
|
|
EditorsTable = dict[int, ArrowEditor]
|
|
|
|
_annots: AnnotsTable = {}
|
|
_editors: EditorsTable = {}
|
|
|
|
def rm_annot(
|
|
annot: ArrowEditor|SelectRect|pg.TextItem
|
|
) -> bool:
|
|
global _editors
|
|
from piker.ui._annotate import GapAnnotations
|
|
|
|
match annot:
|
|
case pg.ArrowItem():
|
|
editor = _editors[annot._uid]
|
|
editor.remove(annot)
|
|
# ^TODO? only remove each arrow or all?
|
|
# if editor._arrows:
|
|
# editor.remove_all()
|
|
# else:
|
|
# log.warning(
|
|
# f'Annot already removed!\n'
|
|
# f'{annot!r}\n'
|
|
# )
|
|
return True
|
|
|
|
case SelectRect():
|
|
annot.delete()
|
|
return True
|
|
|
|
case pg.TextItem():
|
|
scene = annot.scene()
|
|
if scene:
|
|
scene.removeItem(annot)
|
|
return True
|
|
|
|
case GapAnnotations():
|
|
scene = annot.scene()
|
|
if scene:
|
|
scene.removeItem(annot)
|
|
return True
|
|
|
|
return False
|
|
|
|
|
|
@cm
|
|
def no_qt_updates(*items):
|
|
'''
|
|
Disable Qt widget/item updates during context to batch
|
|
render operations and only trigger single repaint on exit.
|
|
|
|
Accepts both QWidgets and QGraphicsItems.
|
|
|
|
'''
|
|
for item in items:
|
|
if hasattr(item, 'setUpdatesEnabled'):
|
|
item.setUpdatesEnabled(False)
|
|
try:
|
|
yield
|
|
finally:
|
|
for item in items:
|
|
if hasattr(item, 'setUpdatesEnabled'):
|
|
item.setUpdatesEnabled(True)
|
|
|
|
|
|
async def serve_rc_annots(
|
|
ipc_key: str,
|
|
annot_req_stream: MsgStream,
|
|
dss: dict[str, DisplayState],
|
|
ctxs: IpcCtxTable,
|
|
annots: AnnotsTable,
|
|
|
|
) -> None:
|
|
'''
|
|
A small viz(ualization) server for remote ctl of chart
|
|
annotations.
|
|
|
|
'''
|
|
global _editors
|
|
async for msg in annot_req_stream:
|
|
match msg:
|
|
case {
|
|
'cmd': 'SelectRect',
|
|
'fqme': fqme,
|
|
'timeframe': timeframe,
|
|
'meth': str(meth),
|
|
'kwargs': dict(kwargs),
|
|
}:
|
|
ds: DisplayState = _dss[fqme]
|
|
try:
|
|
chart: ChartPlotWidget = {
|
|
60: ds.hist_chart,
|
|
1: ds.chart,
|
|
}[timeframe]
|
|
except KeyError:
|
|
msg: str = (
|
|
f'No chart for timeframe={timeframe}s, '
|
|
f'skipping rect annotation'
|
|
)
|
|
log.exeception(msg)
|
|
await annot_req_stream.send({'error': msg})
|
|
continue
|
|
|
|
cv: ChartView = chart.cv
|
|
|
|
# NEW: if timestamps provided, lookup current indices
|
|
# from shm to ensure alignment with current buffer
|
|
# state
|
|
start_time = kwargs.pop('start_time', None)
|
|
end_time = kwargs.pop('end_time', None)
|
|
if (
|
|
start_time is not None
|
|
and end_time is not None
|
|
):
|
|
viz: Viz = chart.get_viz(fqme)
|
|
shm = viz.shm
|
|
arr = shm.array
|
|
|
|
# lookup start index
|
|
start_matches = arr[arr['time'] == start_time]
|
|
if len(start_matches) == 0:
|
|
msg: str = (
|
|
f'No shm entry for start_time={start_time}, '
|
|
f'skipping rect'
|
|
)
|
|
log.error(msg)
|
|
await annot_req_stream.send({'error': msg})
|
|
continue
|
|
|
|
# lookup end index
|
|
end_matches = arr[arr['time'] == end_time]
|
|
if len(end_matches) == 0:
|
|
msg: str = (
|
|
f'No shm entry for end_time={end_time}, '
|
|
f'skipping rect'
|
|
)
|
|
log.error(msg)
|
|
await annot_req_stream.send({'error': msg})
|
|
continue
|
|
|
|
# get close price from start bar, open from end
|
|
# bar
|
|
start_idx = float(start_matches[0]['index'])
|
|
end_idx = float(end_matches[0]['index'])
|
|
start_close = float(start_matches[0]['close'])
|
|
end_open = float(end_matches[0]['open'])
|
|
|
|
# reconstruct start_pos and end_pos with
|
|
# looked-up indices
|
|
from_idx: float = 0.16 - 0.06 # BGM offset
|
|
kwargs['start_pos'] = (
|
|
start_idx + 1 - from_idx,
|
|
start_close,
|
|
)
|
|
kwargs['end_pos'] = (
|
|
end_idx + from_idx,
|
|
end_open,
|
|
)
|
|
|
|
# annot type lookup from cmd
|
|
rect = SelectRect(
|
|
viewbox=cv,
|
|
|
|
# TODO: make this more dynamic?
|
|
# -[ ] pull from conf.toml?
|
|
# -[ ] add `.set_color()` method to type?
|
|
# -[ ] make a green/red based on direction
|
|
# instead of default static color?
|
|
color=kwargs.pop('color', None),
|
|
)
|
|
# XXX NOTE: this is REQUIRED to set the rect
|
|
# resize callback!
|
|
rect.chart: ChartPlotWidget = chart
|
|
|
|
# delegate generically to the requested method
|
|
getattr(rect, meth)(**kwargs)
|
|
rect.show()
|
|
|
|
# XXX: store absolute coords for repositioning
|
|
# during viz redraws (eg backfill updates)
|
|
rect._meth = meth
|
|
rect._kwargs = kwargs
|
|
|
|
aid: int = id(rect)
|
|
annots[aid] = rect
|
|
aids: set[int] = ctxs[ipc_key][1]
|
|
aids.add(aid)
|
|
await annot_req_stream.send(aid)
|
|
|
|
case {
|
|
'cmd': 'ArrowEditor',
|
|
'fqme': fqme,
|
|
'timeframe': timeframe,
|
|
'meth': 'add'|'remove' as meth,
|
|
'kwargs': {
|
|
'x': float(x),
|
|
'y': float(y),
|
|
'pointing': pointing,
|
|
'color': color,
|
|
'aid': str()|None as aid,
|
|
'alpha': int(alpha),
|
|
'headLen': int()|float()|None as headLen,
|
|
'headWidth': int()|float()|None as headWidth,
|
|
'tailLen': int()|float()|None as tailLen,
|
|
'tailWidth': int()|float()|None as tailWidth,
|
|
'pxMode': bool(pxMode),
|
|
'time': int()|float()|None as timestamp,
|
|
},
|
|
# ?TODO? split based on method fn-sigs?
|
|
# 'pointing',
|
|
}:
|
|
ds: DisplayState = _dss[fqme]
|
|
try:
|
|
chart: ChartPlotWidget = {
|
|
60: ds.hist_chart,
|
|
1: ds.chart,
|
|
}[timeframe]
|
|
except KeyError:
|
|
log.warning(
|
|
f'No chart for timeframe={timeframe}s, '
|
|
f'skipping arrow annotation'
|
|
)
|
|
# return -1 to indicate failure
|
|
await annot_req_stream.send(-1)
|
|
continue
|
|
cv: ChartView = chart.cv
|
|
godw = chart.linked.godwidget
|
|
|
|
# NEW: if timestamp provided, lookup current index
|
|
# from shm to ensure alignment with current buffer
|
|
# state
|
|
if timestamp is not None:
|
|
viz: Viz = chart.get_viz(fqme)
|
|
shm = viz.shm
|
|
arr = shm.array
|
|
# find index where time matches timestamp
|
|
matches = arr[arr['time'] == timestamp]
|
|
if len(matches) == 0:
|
|
log.error(
|
|
f'No shm entry for timestamp={timestamp}, '
|
|
f'skipping arrow annotation'
|
|
)
|
|
await annot_req_stream.send(-1)
|
|
continue
|
|
# use the matched row's index as x
|
|
x = float(matches[0]['index'])
|
|
|
|
arrows = ArrowEditor(godw=godw)
|
|
# `.add/.remove()` API
|
|
if meth != 'add':
|
|
# await tractor.pause()
|
|
raise ValueError(
|
|
f'Invalid arrow-edit request ?\n'
|
|
f'{msg!r}\n'
|
|
)
|
|
|
|
aid: str = str(uuid4())
|
|
arrow: pg.ArrowItem = arrows.add(
|
|
plot=chart.plotItem,
|
|
uid=aid,
|
|
x=x,
|
|
y=y,
|
|
pointing=pointing,
|
|
color=color,
|
|
alpha=alpha,
|
|
headLen=headLen,
|
|
headWidth=headWidth,
|
|
tailLen=tailLen,
|
|
tailWidth=tailWidth,
|
|
pxMode=pxMode,
|
|
)
|
|
# XXX: store absolute coords for repositioning
|
|
# during viz redraws (eg backfill updates)
|
|
arrow._abs_x = x
|
|
arrow._abs_y = y
|
|
|
|
annots[aid] = arrow
|
|
_editors[aid] = arrows
|
|
aids: set[int] = ctxs[ipc_key][1]
|
|
aids.add(aid)
|
|
await annot_req_stream.send(aid)
|
|
|
|
case {
|
|
'cmd': 'TextItem',
|
|
'fqme': fqme,
|
|
'timeframe': timeframe,
|
|
'kwargs': {
|
|
'text': str(text),
|
|
'x': int()|float() as x,
|
|
'y': int()|float() as y,
|
|
'color': color,
|
|
'anchor': list(anchor),
|
|
'font_size': int()|None as font_size,
|
|
'time': int()|float()|None as timestamp,
|
|
},
|
|
}:
|
|
ds: DisplayState = _dss[fqme]
|
|
try:
|
|
chart: ChartPlotWidget = {
|
|
60: ds.hist_chart,
|
|
1: ds.chart,
|
|
}[timeframe]
|
|
except KeyError:
|
|
log.warning(
|
|
f'No chart for timeframe={timeframe}s, '
|
|
f'skipping text annotation'
|
|
)
|
|
await annot_req_stream.send(-1)
|
|
continue
|
|
|
|
# NEW: if timestamp provided, lookup current index
|
|
# from shm to ensure alignment with current buffer
|
|
# state
|
|
if timestamp is not None:
|
|
viz: Viz = chart.get_viz(fqme)
|
|
shm = viz.shm
|
|
arr = shm.array
|
|
# find index where time matches timestamp
|
|
matches = arr[arr['time'] == timestamp]
|
|
if len(matches) == 0:
|
|
log.error(
|
|
f'No shm entry for timestamp={timestamp}, '
|
|
f'skipping text annotation'
|
|
)
|
|
await annot_req_stream.send(-1)
|
|
continue
|
|
# use the matched row's index as x, +1 for text
|
|
# offset
|
|
x = float(matches[0]['index']) + 1
|
|
|
|
# convert named color to hex
|
|
color_hex: str = hcolor(color)
|
|
|
|
# create text item
|
|
text_item: pg.TextItem = pg.TextItem(
|
|
text=text,
|
|
color=color_hex,
|
|
anchor=anchor,
|
|
|
|
# ?TODO, pin to github:main for this?
|
|
# legacy, can have scaling ish?
|
|
# ensureInBounds=True,
|
|
)
|
|
|
|
# apply font size (default to DpiAwareFont if not
|
|
# provided)
|
|
if font_size is None:
|
|
from ._style import get_fonts
|
|
font, font_small = get_fonts()
|
|
font_size = font_small.px_size - 1
|
|
|
|
qfont: QFont = text_item.textItem.font()
|
|
qfont.setPixelSize(font_size)
|
|
text_item.setFont(qfont)
|
|
|
|
text_item.setPos(x, y)
|
|
chart.plotItem.addItem(text_item)
|
|
|
|
# XXX: store absolute coords for repositioning
|
|
# during viz redraws (eg backfill updates)
|
|
text_item._abs_x = x
|
|
text_item._abs_y = y
|
|
|
|
aid: str = str(uuid4())
|
|
annots[aid] = text_item
|
|
aids: set[int] = ctxs[ipc_key][1]
|
|
aids.add(aid)
|
|
await annot_req_stream.send(aid)
|
|
|
|
case {
|
|
'cmd': 'batch',
|
|
'fqme': fqme,
|
|
'timeframe': timeframe,
|
|
'rects': list(rect_specs),
|
|
'arrows': list(arrow_specs),
|
|
'texts': list(text_specs),
|
|
'show_individual_arrows': bool(show_individual_arrows),
|
|
}:
|
|
# batch submission handler - process multiple
|
|
# annotations in single IPC round-trip
|
|
ds: DisplayState = _dss[fqme]
|
|
try:
|
|
chart: ChartPlotWidget = {
|
|
60: ds.hist_chart,
|
|
1: ds.chart,
|
|
}[timeframe]
|
|
except KeyError:
|
|
msg: str = (
|
|
f'No chart for timeframe={timeframe}s, '
|
|
f'skipping batch annotation'
|
|
)
|
|
log.error(msg)
|
|
await annot_req_stream.send({'error': msg})
|
|
continue
|
|
|
|
cv: ChartView = chart.cv
|
|
viz: Viz = chart.get_viz(fqme)
|
|
shm = viz.shm
|
|
arr = shm.array
|
|
|
|
result: dict[str, list[int]] = {
|
|
'rects': [],
|
|
'arrows': [],
|
|
'texts': [],
|
|
}
|
|
|
|
profiler = Profiler(
|
|
msg=(
|
|
f'Batch annotate {len(rect_specs)} gaps '
|
|
f'on {fqme}@{timeframe}s'
|
|
),
|
|
disabled=False,
|
|
delayed=False,
|
|
)
|
|
|
|
aids_set: set[int] = ctxs[ipc_key][1]
|
|
|
|
# build unified gap_specs for GapAnnotations class
|
|
from piker.ui._annotate import GapAnnotations
|
|
|
|
gap_specs: list[dict] = []
|
|
n_gaps: int = max(
|
|
len(rect_specs),
|
|
len(arrow_specs),
|
|
)
|
|
profiler('setup batch annot creation')
|
|
|
|
# collect all unique timestamps for vectorized lookup
|
|
timestamps: list[float] = []
|
|
for rect_spec in rect_specs:
|
|
if start_time := rect_spec.get('start_time'):
|
|
timestamps.append(start_time)
|
|
if end_time := rect_spec.get('end_time'):
|
|
timestamps.append(end_time)
|
|
for arrow_spec in arrow_specs:
|
|
if time_val := arrow_spec.get('time'):
|
|
timestamps.append(time_val)
|
|
|
|
profiler('collect `timestamps: list` complet!')
|
|
|
|
# build timestamp -> row mapping using binary search
|
|
# O(m log n) instead of O(n*m) with np.isin
|
|
time_to_row: dict[float, dict] = {}
|
|
if timestamps:
|
|
import numpy as np
|
|
time_arr = arr['time']
|
|
ts_array = np.array(timestamps)
|
|
|
|
# binary search for each timestamp in sorted time array
|
|
search_indices = np.searchsorted(
|
|
time_arr,
|
|
ts_array,
|
|
)
|
|
|
|
profiler('`np.searchsorted()` complete!')
|
|
|
|
# vectorized bounds check and exact match verification
|
|
valid_mask = (
|
|
(search_indices < len(arr))
|
|
& (time_arr[search_indices] == ts_array)
|
|
)
|
|
|
|
# get all valid indices and timestamps
|
|
valid_indices = search_indices[valid_mask]
|
|
valid_timestamps = ts_array[valid_mask]
|
|
|
|
# use fancy indexing to get all rows at once
|
|
matched_rows = arr[valid_indices]
|
|
|
|
# extract fields to plain arrays BEFORE dict building
|
|
indices_arr = matched_rows['index'].astype(float)
|
|
opens_arr = matched_rows['open'].astype(float)
|
|
closes_arr = matched_rows['close'].astype(float)
|
|
|
|
profiler('extracted field arrays')
|
|
|
|
# build dict from plain arrays (much faster)
|
|
time_to_row: dict[float, dict] = {
|
|
float(ts): {
|
|
'index': idx,
|
|
'open': opn,
|
|
'close': cls,
|
|
}
|
|
for (
|
|
ts,
|
|
idx,
|
|
opn,
|
|
cls,
|
|
) in zip(
|
|
valid_timestamps,
|
|
indices_arr,
|
|
opens_arr,
|
|
closes_arr,
|
|
)
|
|
}
|
|
|
|
profiler('`time_to_row` creation complete!')
|
|
|
|
profiler(f'built timestamp lookup for {len(timestamps)} times')
|
|
|
|
# build gap_specs from rect+arrow specs
|
|
for i in range(n_gaps):
|
|
gap_spec: dict = {}
|
|
|
|
# get rect spec for this gap
|
|
if i < len(rect_specs):
|
|
rect_spec: dict = rect_specs[i].copy()
|
|
start_time = rect_spec.get('start_time')
|
|
end_time = rect_spec.get('end_time')
|
|
|
|
if (
|
|
start_time is not None
|
|
and end_time is not None
|
|
):
|
|
# lookup from pre-built mapping
|
|
start_row = time_to_row.get(start_time)
|
|
end_row = time_to_row.get(end_time)
|
|
|
|
if (
|
|
start_row is None
|
|
or end_row is None
|
|
):
|
|
log.warning(
|
|
f'Timestamp lookup failed for '
|
|
f'gap[{i}], skipping'
|
|
)
|
|
continue
|
|
|
|
start_idx = start_row['index']
|
|
end_idx = end_row['index']
|
|
start_close = start_row['close']
|
|
end_open = end_row['open']
|
|
|
|
from_idx: float = 0.16 - 0.06
|
|
gap_spec['start_pos'] = (
|
|
start_idx + 1 - from_idx,
|
|
start_close,
|
|
)
|
|
gap_spec['end_pos'] = (
|
|
end_idx + from_idx,
|
|
end_open,
|
|
)
|
|
gap_spec['start_time'] = start_time
|
|
gap_spec['end_time'] = end_time
|
|
gap_spec['color'] = rect_spec.get(
|
|
'color',
|
|
'dad_blue',
|
|
)
|
|
|
|
# get arrow spec for this gap
|
|
if i < len(arrow_specs):
|
|
arrow_spec: dict = arrow_specs[i].copy()
|
|
x: float = float(arrow_spec.get('x', 0))
|
|
y: float = float(arrow_spec.get('y', 0))
|
|
time_val: float|None = arrow_spec.get('time')
|
|
|
|
# timestamp-based index lookup (only for x, NOT y!)
|
|
# y is already set to the PREVIOUS bar's close
|
|
if time_val is not None:
|
|
arrow_row = time_to_row.get(time_val)
|
|
if arrow_row is not None:
|
|
x = arrow_row['index']
|
|
# NOTE: do NOT update y! it's the
|
|
# previous bar's close, not current
|
|
else:
|
|
log.warning(
|
|
f'Arrow timestamp {time_val} not '
|
|
f'found for gap[{i}], using x={x}'
|
|
)
|
|
|
|
gap_spec['arrow_x'] = x
|
|
gap_spec['arrow_y'] = y
|
|
gap_spec['time'] = time_val
|
|
gap_spec['pointing'] = arrow_spec.get(
|
|
'pointing',
|
|
'down',
|
|
)
|
|
gap_spec['alpha'] = arrow_spec.get('alpha', 169)
|
|
|
|
gap_specs.append(gap_spec)
|
|
|
|
profiler(f'built {len(gap_specs)} gap_specs')
|
|
|
|
# create single GapAnnotations item for all gaps
|
|
if gap_specs:
|
|
gaps_item = GapAnnotations(
|
|
gap_specs=gap_specs,
|
|
array=arr,
|
|
color=gap_specs[0].get('color', 'dad_blue'),
|
|
alpha=gap_specs[0].get('alpha', 169),
|
|
arrow_size=10.0,
|
|
fqme=fqme,
|
|
timeframe=timeframe,
|
|
)
|
|
chart.plotItem.addItem(gaps_item)
|
|
|
|
# register single item for repositioning
|
|
aid: int = id(gaps_item)
|
|
annots[aid] = gaps_item
|
|
aids_set.add(aid)
|
|
result['rects'].append(aid)
|
|
profiler(
|
|
f'created GapAnnotations item for {len(gap_specs)} '
|
|
f'gaps'
|
|
)
|
|
|
|
# A/B comparison: optionally create individual arrows
|
|
# alongside batch for visual comparison
|
|
if show_individual_arrows:
|
|
godw = chart.linked.godwidget
|
|
arrows: ArrowEditor = ArrowEditor(godw=godw)
|
|
for i, spec in enumerate(gap_specs):
|
|
if 'arrow_x' not in spec:
|
|
continue
|
|
|
|
aid_str: str = str(uuid4())
|
|
arrow: pg.ArrowItem = arrows.add(
|
|
plot=chart.plotItem,
|
|
uid=aid_str,
|
|
x=spec['arrow_x'],
|
|
y=spec['arrow_y'],
|
|
pointing=spec['pointing'],
|
|
color='bracket', # different color
|
|
alpha=spec.get('alpha', 169),
|
|
headLen=10.0,
|
|
headWidth=2.222,
|
|
pxMode=True,
|
|
)
|
|
arrow._abs_x = spec['arrow_x']
|
|
arrow._abs_y = spec['arrow_y']
|
|
|
|
annots[aid_str] = arrow
|
|
_editors[aid_str] = arrows
|
|
aids_set.add(aid_str)
|
|
result['arrows'].append(aid_str)
|
|
|
|
profiler(
|
|
f'created {len(gap_specs)} individual arrows '
|
|
f'for comparison'
|
|
)
|
|
|
|
# handle text items separately (less common, keep
|
|
# individual items)
|
|
n_texts: int = 0
|
|
for text_spec in text_specs:
|
|
kwargs: dict = text_spec.copy()
|
|
text: str = kwargs.pop('text')
|
|
x: float = float(kwargs.pop('x'))
|
|
y: float = float(kwargs.pop('y'))
|
|
time_val: float|None = kwargs.pop('time', None)
|
|
|
|
# timestamp-based index lookup
|
|
if time_val is not None:
|
|
matches = arr[arr['time'] == time_val]
|
|
if len(matches) > 0:
|
|
x = float(matches[0]['index'])
|
|
y = float(matches[0]['close'])
|
|
|
|
color = kwargs.pop('color', 'dad_blue')
|
|
anchor = kwargs.pop('anchor', (0, 1))
|
|
font_size = kwargs.pop('font_size', None)
|
|
|
|
text_item: pg.TextItem = pg.TextItem(
|
|
text,
|
|
color=hcolor(color),
|
|
anchor=anchor,
|
|
)
|
|
|
|
if font_size is None:
|
|
from ._style import get_fonts
|
|
font, font_small = get_fonts()
|
|
font_size = font_small.px_size - 1
|
|
|
|
qfont: QFont = text_item.textItem.font()
|
|
qfont.setPixelSize(font_size)
|
|
text_item.setFont(qfont)
|
|
|
|
text_item.setPos(float(x), float(y))
|
|
chart.plotItem.addItem(text_item)
|
|
|
|
text_item._abs_x = float(x)
|
|
text_item._abs_y = float(y)
|
|
|
|
aid: str = str(uuid4())
|
|
annots[aid] = text_item
|
|
aids_set.add(aid)
|
|
result['texts'].append(aid)
|
|
n_texts += 1
|
|
|
|
profiler(
|
|
f'created text annotations: {n_texts} texts'
|
|
)
|
|
profiler.finish()
|
|
|
|
await annot_req_stream.send(result)
|
|
|
|
case {
|
|
'cmd': 'remove',
|
|
'aid': int(aid)|str(aid),
|
|
}:
|
|
# NOTE: this is normally entered on
|
|
# a client's annotation de-alloc normally
|
|
# prior to detach or modify.
|
|
annot: QGraphicsItem = annots[aid]
|
|
assert rm_annot(annot)
|
|
|
|
# respond to client indicating annot
|
|
# was indeed deleted.
|
|
await annot_req_stream.send(aid)
|
|
|
|
case {
|
|
'cmd': 'redraw',
|
|
'fqme': fqme,
|
|
'timeframe': timeframe,
|
|
|
|
# TODO: maybe more fields?
|
|
# 'render': int(aid),
|
|
# 'viz_name': str(viz_name),
|
|
}:
|
|
# NOTE: old match from the 60s display loop task
|
|
# | {
|
|
# 'backfilling': (str(viz_name), timeframe),
|
|
# }:
|
|
ds: DisplayState = _dss[fqme]
|
|
viz: Viz = {
|
|
60: ds.hist_viz,
|
|
1: ds.viz,
|
|
}[timeframe]
|
|
log.warning(
|
|
f'Forcing VIZ REDRAW:\n'
|
|
f'fqme: {fqme}\n'
|
|
f'timeframe: {timeframe}\n'
|
|
)
|
|
viz.reset_graphics()
|
|
|
|
# XXX: reposition all annotations to ensure they
|
|
# stay aligned with viz data after reset (eg during
|
|
# backfill when abs-index range changes)
|
|
chart: ChartPlotWidget = {
|
|
60: ds.hist_chart,
|
|
1: ds.chart,
|
|
}[timeframe]
|
|
viz: Viz = chart.get_viz(fqme)
|
|
arr = viz.shm.array
|
|
|
|
n_repositioned: int = 0
|
|
for aid, annot in annots.items():
|
|
# GapAnnotations batch items have .reposition()
|
|
if hasattr(annot, 'reposition'):
|
|
annot.reposition(
|
|
array=arr,
|
|
fqme=fqme,
|
|
timeframe=timeframe,
|
|
)
|
|
n_repositioned += 1
|
|
|
|
# arrows and text items use abs x,y coords
|
|
elif (
|
|
hasattr(annot, '_abs_x')
|
|
and
|
|
hasattr(annot, '_abs_y')
|
|
):
|
|
annot.setPos(
|
|
annot._abs_x,
|
|
annot._abs_y,
|
|
)
|
|
n_repositioned += 1
|
|
|
|
# rects use method + kwargs
|
|
elif (
|
|
hasattr(annot, '_meth')
|
|
and
|
|
hasattr(annot, '_kwargs')
|
|
):
|
|
getattr(annot, annot._meth)(**annot._kwargs)
|
|
n_repositioned += 1
|
|
|
|
if n_repositioned:
|
|
log.info(
|
|
f'Repositioned {n_repositioned} annotation(s) '
|
|
f'after viz redraw'
|
|
)
|
|
|
|
case _:
|
|
log.error(
|
|
'Unknown remote annotation cmd:\n'
|
|
f'{pformat(msg)}'
|
|
)
|
|
|
|
|
|
@tractor.context
|
|
async def remote_annotate(
|
|
ctx: Context,
|
|
) -> None:
|
|
|
|
global _dss, _ctxs
|
|
if not _dss:
|
|
raise RuntimeError(
|
|
'Race condition on chart-init state ??\n'
|
|
'Anoter actor is trying to annoate this chart '
|
|
'before it has fully spawned.\n'
|
|
)
|
|
assert _dss
|
|
|
|
_ctxs[ctx.cid] = (ctx, set())
|
|
|
|
# send back full fqme symbology to caller
|
|
await ctx.started(list(_dss))
|
|
|
|
# open annot request handler stream
|
|
async with ctx.open_stream() as annot_req_stream:
|
|
try:
|
|
await serve_rc_annots(
|
|
ipc_key=ctx.cid,
|
|
annot_req_stream=annot_req_stream,
|
|
dss=_dss,
|
|
ctxs=_ctxs,
|
|
annots=_annots,
|
|
)
|
|
finally:
|
|
# ensure all annots for this connection are deleted
|
|
# on any final teardown
|
|
profiler = Profiler(
|
|
msg=f'Annotation teardown for ctx {ctx.cid}',
|
|
disabled=False,
|
|
ms_threshold=0.0,
|
|
)
|
|
(_ctx, aids) = _ctxs[ctx.cid]
|
|
assert _ctx is ctx
|
|
profiler(f'got {len(aids)} aids to remove')
|
|
|
|
for aid in aids:
|
|
annot: QGraphicsItem = _annots[aid]
|
|
assert rm_annot(annot)
|
|
|
|
profiler(f'removed all {len(aids)} annotations')
|
|
|
|
|
|
class AnnotCtl(Struct):
|
|
'''
|
|
A control for remote "data annotations".
|
|
|
|
You know those "squares they always show in machine vision
|
|
UIs.." this API allows you to remotely control stuff like that
|
|
in some other graphics actor.
|
|
|
|
'''
|
|
ctx2fqmes: dict[str, str]
|
|
fqme2ipc: dict[str, MsgStream]
|
|
_annot_stack: AsyncExitStack
|
|
|
|
# runtime-populated mapping of all annotation
|
|
# ids to their equivalent IPC msg-streams.
|
|
_ipcs: dict[int, MsgStream] = {}
|
|
|
|
def _get_ipc(
|
|
self,
|
|
fqme: str,
|
|
) -> MsgStream:
|
|
ipc: MsgStream = self.fqme2ipc.get(fqme)
|
|
if ipc is None:
|
|
raise SymbolNotFound(
|
|
'No chart (actor) seems to have mkt feed loaded?\n'
|
|
f'{fqme}'
|
|
)
|
|
return ipc
|
|
|
|
async def add_rect(
|
|
self,
|
|
fqme: str,
|
|
timeframe: float,
|
|
start_pos: tuple[float, float],
|
|
end_pos: tuple[float, float],
|
|
|
|
# TODO: a `Literal['view', 'scene']` for this?
|
|
domain: str = 'view', # or 'scene'
|
|
color: str = 'dad_blue',
|
|
|
|
from_acm: bool = False,
|
|
|
|
# NEW: optional timestamps for server-side index lookup
|
|
start_time: float|None = None,
|
|
end_time: float|None = None,
|
|
|
|
) -> int|None:
|
|
'''
|
|
Add a `SelectRect` annotation to the target view, return
|
|
the instances `id(obj)` from the remote UI actor.
|
|
|
|
'''
|
|
ipc: MsgStream = self._get_ipc(fqme)
|
|
with trio.fail_after(3):
|
|
await ipc.send({
|
|
'fqme': fqme,
|
|
'cmd': 'SelectRect',
|
|
'timeframe': timeframe,
|
|
# 'meth': str(meth),
|
|
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
|
|
'kwargs': {
|
|
'start_pos': tuple(start_pos),
|
|
'end_pos': tuple(end_pos),
|
|
'color': color,
|
|
'update_label': False,
|
|
'start_time': start_time,
|
|
'end_time': end_time,
|
|
},
|
|
})
|
|
aid: int|dict = await ipc.receive()
|
|
match aid:
|
|
case {'error': str(msg)}:
|
|
log.error(msg)
|
|
return None
|
|
self._ipcs[aid] = ipc
|
|
if not from_acm:
|
|
self._annot_stack.push_async_callback(
|
|
partial(
|
|
self.remove,
|
|
aid,
|
|
)
|
|
)
|
|
return aid
|
|
|
|
async def remove(
|
|
self,
|
|
aid: int,
|
|
|
|
) -> bool:
|
|
'''
|
|
Remove an existing annotation by instance id.
|
|
|
|
'''
|
|
ipc: MsgStream = self._ipcs[aid]
|
|
await ipc.send({
|
|
'cmd': 'remove',
|
|
'aid': aid,
|
|
})
|
|
removed: bool = await ipc.receive()
|
|
return removed
|
|
|
|
@acm
|
|
async def open_rect(
|
|
self,
|
|
**kwargs,
|
|
) -> int:
|
|
try:
|
|
aid: int = await self.add_rect(
|
|
from_acm=True,
|
|
**kwargs,
|
|
)
|
|
yield aid
|
|
finally:
|
|
# async ipc send op
|
|
with trio.CancelScope(shield=True):
|
|
await self.remove(aid)
|
|
|
|
async def redraw(
|
|
self,
|
|
fqme: str,
|
|
timeframe: float,
|
|
) -> None:
|
|
await self._get_ipc(fqme).send({
|
|
'cmd': 'redraw',
|
|
'fqme': fqme,
|
|
# 'render': int(aid),
|
|
# 'viz_name': str(viz_name),
|
|
'timeframe': timeframe,
|
|
})
|
|
|
|
async def add_arrow(
|
|
self,
|
|
fqme: str,
|
|
timeframe: float,
|
|
x: float,
|
|
y: float,
|
|
pointing: Literal[
|
|
'up',
|
|
'down',
|
|
],
|
|
# TODO: a `Literal['view', 'scene']` for this?
|
|
# domain: str = 'view', # or 'scene'
|
|
color: str = 'dad_blue',
|
|
alpha: int = 116,
|
|
headLen: float|None = None,
|
|
headWidth: float|None = None,
|
|
tailLen: float|None = None,
|
|
tailWidth: float|None = None,
|
|
pxMode: bool = True,
|
|
|
|
from_acm: bool = False,
|
|
|
|
# NEW: optional timestamp for server-side index lookup
|
|
time: float|None = None,
|
|
|
|
) -> int|None:
|
|
'''
|
|
Add a `SelectRect` annotation to the target view, return
|
|
the instances `id(obj)` from the remote UI actor.
|
|
|
|
'''
|
|
ipc: MsgStream = self._get_ipc(fqme)
|
|
with trio.fail_after(3):
|
|
await ipc.send({
|
|
'fqme': fqme,
|
|
'cmd': 'ArrowEditor',
|
|
'timeframe': timeframe,
|
|
# 'meth': str(meth),
|
|
'meth': 'add',
|
|
'kwargs': {
|
|
'x': float(x),
|
|
'y': float(y),
|
|
'color': color,
|
|
'pointing': pointing, # up|down
|
|
'alpha': alpha,
|
|
'aid': None,
|
|
'headLen': headLen,
|
|
'headWidth': headWidth,
|
|
'tailLen': tailLen,
|
|
'tailWidth': tailWidth,
|
|
'pxMode': pxMode,
|
|
'time': time, # for server-side index lookup
|
|
},
|
|
})
|
|
aid: int|dict = await ipc.receive()
|
|
match aid:
|
|
case {'error': str(msg)}:
|
|
log.error(msg)
|
|
return None
|
|
|
|
self._ipcs[aid] = ipc
|
|
if not from_acm:
|
|
self._annot_stack.push_async_callback(
|
|
partial(
|
|
self.remove,
|
|
aid,
|
|
)
|
|
)
|
|
return aid
|
|
|
|
async def add_batch(
|
|
self,
|
|
fqme: str,
|
|
timeframe: float,
|
|
rects: list[dict]|None = None,
|
|
arrows: list[dict]|None = None,
|
|
texts: list[dict]|None = None,
|
|
show_individual_arrows: bool = False,
|
|
|
|
from_acm: bool = False,
|
|
|
|
) -> dict[str, list[int]]:
|
|
'''
|
|
Batch submit multiple annotations in single IPC msg for
|
|
much faster remote annotation vs. per-annot round-trips.
|
|
|
|
Returns dict of annotation IDs:
|
|
{
|
|
'rects': [aid1, aid2, ...],
|
|
'arrows': [aid3, aid4, ...],
|
|
'texts': [aid5, aid6, ...],
|
|
}
|
|
|
|
'''
|
|
ipc: MsgStream = self._get_ipc(fqme)
|
|
with trio.fail_after(10):
|
|
await ipc.send({
|
|
'fqme': fqme,
|
|
'cmd': 'batch',
|
|
'timeframe': timeframe,
|
|
'rects': rects or [],
|
|
'arrows': arrows or [],
|
|
'texts': texts or [],
|
|
'show_individual_arrows': show_individual_arrows,
|
|
})
|
|
result: dict = await ipc.receive()
|
|
match result:
|
|
case {'error': str(msg)}:
|
|
log.error(msg)
|
|
return {
|
|
'rects': [],
|
|
'arrows': [],
|
|
'texts': [],
|
|
}
|
|
|
|
# register all AIDs with their IPC streams
|
|
for aid_list in result.values():
|
|
for aid in aid_list:
|
|
self._ipcs[aid] = ipc
|
|
if not from_acm:
|
|
self._annot_stack.push_async_callback(
|
|
partial(
|
|
self.remove,
|
|
aid,
|
|
)
|
|
)
|
|
return result
|
|
|
|
async def add_text(
|
|
self,
|
|
fqme: str,
|
|
timeframe: float,
|
|
text: str,
|
|
x: float,
|
|
y: float,
|
|
color: str|tuple = 'dad_blue',
|
|
anchor: tuple[float, float] = (0, 1),
|
|
font_size: int|None = None,
|
|
|
|
from_acm: bool = False,
|
|
|
|
# NEW: optional timestamp for server-side index lookup
|
|
time: float|None = None,
|
|
|
|
) -> int|None:
|
|
'''
|
|
Add a `pg.TextItem` annotation to the target view.
|
|
|
|
anchor: (x, y) where (0,0) is upper-left, (1,1) is lower-right
|
|
font_size: pixel size for font, defaults to `_font.font.pixelSize()`
|
|
|
|
'''
|
|
ipc: MsgStream = self._get_ipc(fqme)
|
|
with trio.fail_after(3):
|
|
await ipc.send({
|
|
'fqme': fqme,
|
|
'cmd': 'TextItem',
|
|
'timeframe': timeframe,
|
|
'kwargs': {
|
|
'text': text,
|
|
'x': float(x),
|
|
'y': float(y),
|
|
'color': color,
|
|
'anchor': tuple(anchor),
|
|
'font_size': font_size,
|
|
'time': time, # for server-side index lookup
|
|
},
|
|
})
|
|
aid: int|dict = await ipc.receive()
|
|
match aid:
|
|
case {'error': str(msg)}:
|
|
log.error(msg)
|
|
return None
|
|
self._ipcs[aid] = ipc
|
|
if not from_acm:
|
|
self._annot_stack.push_async_callback(
|
|
partial(
|
|
self.remove,
|
|
aid,
|
|
)
|
|
)
|
|
return aid
|
|
|
|
|
|
@acm
|
|
async def open_annot_ctl(
|
|
uid: tuple[str, str] | None = None,
|
|
|
|
) -> AnnotCtl:
|
|
# TODO: load connetion to a specific chart actor
|
|
# -[ ] pull from either service scan or config
|
|
# -[ ] return some kinda client/proxy thinger?
|
|
# -[ ] maybe we should finally just provide this as
|
|
# a `tractor.hilevel.CallableProxy` or wtv?
|
|
# -[ ] use this from the storage.cli stuff to mark up gaps!
|
|
|
|
maybe_portals: list[Portal] | None
|
|
fqmes: list[str]
|
|
async with find_service(
|
|
service_name='chart',
|
|
first_only=False,
|
|
) as maybe_portals:
|
|
|
|
ctx_mngrs: list[AsyncContextManager] = []
|
|
|
|
# TODO: print the current discoverable actor UID set
|
|
# here as well?
|
|
if not maybe_portals:
|
|
raise RuntimeError(
|
|
'No chart actors found in service domain?'
|
|
)
|
|
|
|
for portal in maybe_portals:
|
|
ctx_mngrs.append(
|
|
portal.open_context(remote_annotate)
|
|
)
|
|
|
|
ctx2fqmes: dict[str, set[str]] = {}
|
|
fqme2ipc: dict[str, MsgStream] = {}
|
|
stream_ctxs: list[AsyncContextManager] = []
|
|
|
|
async with (
|
|
trionics.gather_contexts(ctx_mngrs) as ctxs,
|
|
):
|
|
for (ctx, fqmes) in ctxs:
|
|
stream_ctxs.append(ctx.open_stream())
|
|
|
|
# fill lookup table of mkt addrs to IPC ctxs
|
|
for fqme in fqmes:
|
|
if other := fqme2ipc.get(fqme):
|
|
raise ValueError(
|
|
f'More then one chart displays {fqme}!?\n'
|
|
'Other UI actor info:\n'
|
|
f'channel: {other._ctx.chan}]\n'
|
|
f'actor uid: {other._ctx.chan.uid}]\n'
|
|
f'ctx id: {other._ctx.cid}]\n'
|
|
)
|
|
|
|
ctx2fqmes.setdefault(
|
|
ctx.cid,
|
|
set(),
|
|
).add(fqme)
|
|
|
|
async with trionics.gather_contexts(stream_ctxs) as streams:
|
|
for stream in streams:
|
|
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
|
|
for fqme in fqmes:
|
|
fqme2ipc[fqme] = stream
|
|
|
|
# NOTE: on graceful teardown we always attempt to
|
|
# remove all annots that were created by the
|
|
# entering client.
|
|
# TODO: should we maybe instead/also do this on the
|
|
# server-actor side so that when a client
|
|
# disconnects we always delete all annotations by
|
|
# default instaead of expecting the client to?
|
|
async with AsyncExitStack() as annots_stack:
|
|
client = AnnotCtl(
|
|
ctx2fqmes=ctx2fqmes,
|
|
fqme2ipc=fqme2ipc,
|
|
_annot_stack=annots_stack,
|
|
)
|
|
yield client
|
|
|
|
# client exited, measure teardown time
|
|
teardown_profiler = Profiler(
|
|
msg='Client AnnotCtl teardown',
|
|
disabled=False,
|
|
ms_threshold=0.0,
|
|
)
|
|
teardown_profiler('exiting annots_stack')
|
|
|
|
teardown_profiler('annots_stack exited')
|
|
teardown_profiler('exiting gather_contexts')
|