diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 7c9870fa..0b7caf63 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -471,6 +471,9 @@ async def graphics_update_loop( await tractor.pause() try: + # XXX TODO: we need to do _dss UPDATE here so that when + # a feed-view is switched you can still remote annotate the + # prior view.. from . import _remote_ctl _remote_ctl._dss = dss @@ -526,7 +529,7 @@ async def graphics_update_loop( finally: # XXX: cancel any remote annotation control ctxs _remote_ctl._dss = None - for ctx in _remote_ctl._ctxs: + for cid, (ctx, aids) in _remote_ctl._ctxs.items(): await ctx.cancel() diff --git a/piker/ui/_remote_ctl.py b/piker/ui/_remote_ctl.py index 4c2a5161..62c7eec5 100644 --- a/piker/ui/_remote_ctl.py +++ b/piker/ui/_remote_ctl.py @@ -45,11 +45,11 @@ from PyQt5.QtWidgets import ( from piker.log import get_logger from piker.types import Struct from piker.service import find_service +from piker.brokers import SymbolNotFound from ._display import DisplayState from ._interaction import ChartView from ._editors import SelectRect from ._chart import ChartPlotWidget -from ..brokers import SymbolNotFound log = get_logger(__name__) @@ -64,13 +64,117 @@ _dss: dict[str, DisplayState] | None = None # be cancelled on shutdown/error. # TODO: make `tractor.Context` hashable via is `.cid: str`? # _ctxs: set[Context] = set() -_ctxs: list[Context] = [] +# TODO: use type statements from 3.12+ +IpcCtxTable = dict[ + str, # each `Context.cid` + tuple[ + Context, # handle for ctx-cancellation + set[int] # set of annotation (instance) ids + ] +] + +_ctxs: IpcCtxTable = {} # XXX: global map of all uniquely created annotation-graphics so # that they can be mutated (eventually) by a client. # NOTE: this map is only populated on the `chart` actor side (aka # the "annotations server" which actually renders to a Qt canvas). -_annots: dict[int, QGraphicsItem] = {} +# type AnnotsTable = dict[int, QGraphicsItem] +AnnotsTable = dict[int, QGraphicsItem] + +_annots: AnnotsTable = {} + + +async def serve_rc_annots( + ipc_key: str, + annot_req_stream: MsgStream, + dss: dict[str, DisplayState], + ctxs: IpcCtxTable, + annots: AnnotsTable, + +) -> None: + async for msg in annot_req_stream: + match msg: + case { + 'annot': 'SelectRect', + 'fqme': fqme, + 'timeframe': timeframe, + 'meth': str(meth), + 'kwargs': dict(kwargs), + }: + + ds: DisplayState = _dss[fqme] + chart: ChartPlotWidget = { + 60: ds.hist_chart, + 1: ds.chart, + }[timeframe] + cv: ChartView = chart.cv + + # sanity + if timeframe == 60: + assert ( + chart.linked.godwidget.hist_linked.chart.view + is + cv + ) + + # annot type lookup from cmd + rect = SelectRect( + viewbox=cv, + + # TODO: make this more dynamic? + # -[ ] pull from conf.toml? + # -[ ] add `.set_color()` method to type? + # -[ ] make a green/red based on direction + # instead of default static color? + color=kwargs.pop('color', None), + ) + # XXX NOTE: this is REQUIRED to set the rect + # resize callback! + rect.chart: ChartPlotWidget = chart + + # delegate generically to the requested method + getattr(rect, meth)(**kwargs) + rect.show() + aid: int = id(rect) + annots[aid] = rect + aids: set[int] = ctxs[ipc_key][1] + aids.add(aid) + await annot_req_stream.send(aid) + + case { + 'rm_annot': int(aid), + }: + # NOTE: this is normally entered on + # a client's annotation de-alloc normally + # prior to detach or modify. + annot: QGraphicsItem = annots[aid] + annot.delete() + + # respond to client indicating annot + # was indeed deleted. + await annot_req_stream.send(aid) + + case { + 'fqme': fqme, + 'render': int(aid), + 'viz_name': str(viz_name), + 'timeframe': timeframe, + }: + # | { + # 'backfilling': (str(viz_name), timeframe), + # }: + ds: DisplayState = _dss[viz_name] + chart: ChartPlotWidget = { + 60: ds.hist_chart, + 1: ds.chart, + }[timeframe] + + case _: + log.error( + 'Unknown remote annotation cmd:\n' + f'{pformat(msg)}' + ) @tractor.context @@ -81,77 +185,29 @@ async def remote_annotate( global _dss, _ctxs assert _dss - _ctxs.append(ctx) + _ctxs[ctx.cid] = (ctx, set()) # send back full fqme symbology to caller await ctx.started(list(_dss)) + # open annot request handler stream async with ctx.open_stream() as annot_req_stream: - async for msg in annot_req_stream: - match msg: - case { - 'annot': 'SelectRect', - 'fqme': fqme, - 'timeframe': timeframe, - 'meth': str(meth), - 'kwargs': dict(kwargs), - }: - - ds: DisplayState = _dss[fqme] - chart: ChartPlotWidget = { - 60: ds.hist_chart, - 1: ds.chart, - }[timeframe] - cv: ChartView = chart.cv - - # sanity - if timeframe == 60: - assert ( - chart.linked.godwidget.hist_linked.chart.view - is - cv - ) - - # annot type lookup from cmd - rect = SelectRect( - viewbox=cv, - - # TODO: make this more dynamic? - # -[ ] pull from conf.toml? - # -[ ] add `.set_color()` method to type? - # -[ ] make a green/red based on direction - # instead of default static color? - color=kwargs.pop('color', None), - ) - # XXX NOTE: this is REQUIRED to set the rect - # resize callback! - rect.chart: ChartPlotWidget = chart - - # delegate generically to the requested method - getattr(rect, meth)(**kwargs) - rect.show() - aid: int = id(rect) - _annots[aid] = rect - await annot_req_stream.send(aid) - - case { - 'rm_annot': int(aid), - }: - # NOTE: this is normally entered on - # a client's annotation de-alloc normally - # prior to detach or modify. - annot: 'QGraphicsItem' = _annots[aid] - annot.delete() - - # respond to client indicating annot - # was indeed deleted. - await annot_req_stream.send(aid) - - case _: - log.error( - 'Unknown remote annotation cmd:\n' - f'{pformat(msg)}' - ) + try: + await serve_rc_annots( + ipc_key=ctx.cid, + annot_req_stream=annot_req_stream, + dss=_dss, + ctxs=_ctxs, + annots=_annots, + ) + finally: + # ensure all annots for this connection are deleted + # on any final teardown + (_ctx, aids) = _ctxs[ctx.cid] + assert _ctx is ctx + for aid in aids: + annot: QGraphicsItem = _annots[aid] + annot.delete() class AnnotCtl(Struct): @@ -301,6 +357,7 @@ async def open_annot_ctl( ctx2fqmes: dict[str, set[str]] = {} fqme2ipc: dict[str, MsgStream] = {} stream_ctxs: list[AsyncContextManager] = [] + async with ( trionics.gather_contexts(ctx_mngrs) as ctxs, ): @@ -332,6 +389,10 @@ async def open_annot_ctl( # NOTE: on graceful teardown we always attempt to # remove all annots that were created by the # entering client. + # TODO: should we maybe instead/also do this on the + # server-actor side so that when a client + # disconnects we always delete all annotations by + # default instaead of expecting the client to? async with AsyncExitStack() as annots_stack: client = AnnotCtl( ctx2fqmes=ctx2fqmes,