Factor UI-rc loop into ctx-free func
In theory the `async for msg` loop can be re-purposed without having to always call `remote_annotate()` so factor it into a new `serve_rc_annots()` and then just call it from the former (for now) with the wrapping `try:` block outside to delete per-client-ctx annotation instance sets. Also, use some type aliases instead of repeatedly defining the same complex `dict`-table defs B)distribute_dis
							parent
							
								
									d4b07cc95a
								
							
						
					
					
						commit
						ad565936ec
					
				| 
						 | 
					@ -471,6 +471,9 @@ async def graphics_update_loop(
 | 
				
			||||||
            await tractor.pause()
 | 
					            await tractor.pause()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    try:
 | 
					    try:
 | 
				
			||||||
 | 
					        # XXX TODO: we need to do _dss UPDATE here so that when
 | 
				
			||||||
 | 
					        # a feed-view is switched you can still remote annotate the
 | 
				
			||||||
 | 
					        # prior view..
 | 
				
			||||||
        from . import _remote_ctl
 | 
					        from . import _remote_ctl
 | 
				
			||||||
        _remote_ctl._dss = dss
 | 
					        _remote_ctl._dss = dss
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -526,7 +529,7 @@ async def graphics_update_loop(
 | 
				
			||||||
    finally:
 | 
					    finally:
 | 
				
			||||||
        # XXX: cancel any remote annotation control ctxs
 | 
					        # XXX: cancel any remote annotation control ctxs
 | 
				
			||||||
        _remote_ctl._dss = None
 | 
					        _remote_ctl._dss = None
 | 
				
			||||||
        for ctx in _remote_ctl._ctxs:
 | 
					        for cid, (ctx, aids) in _remote_ctl._ctxs.items():
 | 
				
			||||||
            await ctx.cancel()
 | 
					            await ctx.cancel()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -45,11 +45,11 @@ from PyQt5.QtWidgets import (
 | 
				
			||||||
from piker.log import get_logger
 | 
					from piker.log import get_logger
 | 
				
			||||||
from piker.types import Struct
 | 
					from piker.types import Struct
 | 
				
			||||||
from piker.service import find_service
 | 
					from piker.service import find_service
 | 
				
			||||||
 | 
					from piker.brokers import SymbolNotFound
 | 
				
			||||||
from ._display import DisplayState
 | 
					from ._display import DisplayState
 | 
				
			||||||
from ._interaction import ChartView
 | 
					from ._interaction import ChartView
 | 
				
			||||||
from ._editors import SelectRect
 | 
					from ._editors import SelectRect
 | 
				
			||||||
from ._chart import ChartPlotWidget
 | 
					from ._chart import ChartPlotWidget
 | 
				
			||||||
from ..brokers import SymbolNotFound
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
log = get_logger(__name__)
 | 
					log = get_logger(__name__)
 | 
				
			||||||
| 
						 | 
					@ -64,13 +64,117 @@ _dss: dict[str, DisplayState] | None = None
 | 
				
			||||||
# be cancelled on shutdown/error.
 | 
					# be cancelled on shutdown/error.
 | 
				
			||||||
# TODO: make `tractor.Context` hashable via is `.cid: str`?
 | 
					# TODO: make `tractor.Context` hashable via is `.cid: str`?
 | 
				
			||||||
# _ctxs: set[Context] = set()
 | 
					# _ctxs: set[Context] = set()
 | 
				
			||||||
_ctxs: list[Context] = []
 | 
					# TODO: use type statements from 3.12+
 | 
				
			||||||
 | 
					IpcCtxTable = dict[
 | 
				
			||||||
 | 
					    str,  # each `Context.cid`
 | 
				
			||||||
 | 
					    tuple[
 | 
				
			||||||
 | 
					        Context,  # handle for ctx-cancellation
 | 
				
			||||||
 | 
					        set[int]  # set of annotation (instance) ids
 | 
				
			||||||
 | 
					    ]
 | 
				
			||||||
 | 
					]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					_ctxs: IpcCtxTable = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# XXX: global map of all uniquely created annotation-graphics so
 | 
					# XXX: global map of all uniquely created annotation-graphics so
 | 
				
			||||||
# that they can be mutated (eventually) by a client.
 | 
					# that they can be mutated (eventually) by a client.
 | 
				
			||||||
# NOTE: this map is only populated on the `chart` actor side (aka
 | 
					# NOTE: this map is only populated on the `chart` actor side (aka
 | 
				
			||||||
# the "annotations server" which actually renders to a Qt canvas).
 | 
					# the "annotations server" which actually renders to a Qt canvas).
 | 
				
			||||||
_annots: dict[int, QGraphicsItem] = {}
 | 
					# type AnnotsTable = dict[int, QGraphicsItem]
 | 
				
			||||||
 | 
					AnnotsTable = dict[int, QGraphicsItem]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					_annots: AnnotsTable  = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					async def serve_rc_annots(
 | 
				
			||||||
 | 
					    ipc_key: str,
 | 
				
			||||||
 | 
					    annot_req_stream: MsgStream,
 | 
				
			||||||
 | 
					    dss: dict[str, DisplayState],
 | 
				
			||||||
 | 
					    ctxs: IpcCtxTable,
 | 
				
			||||||
 | 
					    annots: AnnotsTable,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					) -> None:
 | 
				
			||||||
 | 
					    async for msg in annot_req_stream:
 | 
				
			||||||
 | 
					        match msg:
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					                'annot': 'SelectRect',
 | 
				
			||||||
 | 
					                'fqme': fqme,
 | 
				
			||||||
 | 
					                'timeframe': timeframe,
 | 
				
			||||||
 | 
					                'meth': str(meth),
 | 
				
			||||||
 | 
					                'kwargs': dict(kwargs),
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                ds: DisplayState = _dss[fqme]
 | 
				
			||||||
 | 
					                chart: ChartPlotWidget = {
 | 
				
			||||||
 | 
					                    60: ds.hist_chart,
 | 
				
			||||||
 | 
					                    1: ds.chart,
 | 
				
			||||||
 | 
					                }[timeframe]
 | 
				
			||||||
 | 
					                cv: ChartView = chart.cv
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # sanity
 | 
				
			||||||
 | 
					                if timeframe == 60:
 | 
				
			||||||
 | 
					                    assert (
 | 
				
			||||||
 | 
					                        chart.linked.godwidget.hist_linked.chart.view
 | 
				
			||||||
 | 
					                        is
 | 
				
			||||||
 | 
					                        cv
 | 
				
			||||||
 | 
					                    )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # annot type lookup from cmd
 | 
				
			||||||
 | 
					                rect = SelectRect(
 | 
				
			||||||
 | 
					                    viewbox=cv,
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                    # TODO: make this more dynamic?
 | 
				
			||||||
 | 
					                    # -[ ] pull from conf.toml?
 | 
				
			||||||
 | 
					                    # -[ ] add `.set_color()` method to type?
 | 
				
			||||||
 | 
					                    # -[ ] make a green/red based on direction
 | 
				
			||||||
 | 
					                    #    instead of default static color?
 | 
				
			||||||
 | 
					                    color=kwargs.pop('color', None),
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					                # XXX NOTE: this is REQUIRED to set the rect
 | 
				
			||||||
 | 
					                # resize callback!
 | 
				
			||||||
 | 
					                rect.chart: ChartPlotWidget = chart
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # delegate generically to the requested method
 | 
				
			||||||
 | 
					                getattr(rect, meth)(**kwargs)
 | 
				
			||||||
 | 
					                rect.show()
 | 
				
			||||||
 | 
					                aid: int = id(rect)
 | 
				
			||||||
 | 
					                annots[aid] = rect
 | 
				
			||||||
 | 
					                aids: set[int] = ctxs[ipc_key][1]
 | 
				
			||||||
 | 
					                aids.add(aid)
 | 
				
			||||||
 | 
					                await annot_req_stream.send(aid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					                'rm_annot': int(aid),
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					                # NOTE: this is normally entered on
 | 
				
			||||||
 | 
					                # a client's annotation de-alloc normally
 | 
				
			||||||
 | 
					                # prior to detach or modify.
 | 
				
			||||||
 | 
					                annot: QGraphicsItem = annots[aid]
 | 
				
			||||||
 | 
					                annot.delete()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					                # respond to client indicating annot
 | 
				
			||||||
 | 
					                # was indeed deleted.
 | 
				
			||||||
 | 
					                await annot_req_stream.send(aid)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            case {
 | 
				
			||||||
 | 
					                'fqme': fqme,
 | 
				
			||||||
 | 
					                'render': int(aid),
 | 
				
			||||||
 | 
					                'viz_name': str(viz_name),
 | 
				
			||||||
 | 
					                'timeframe': timeframe,
 | 
				
			||||||
 | 
					            }:
 | 
				
			||||||
 | 
					            # | {
 | 
				
			||||||
 | 
					            #     'backfilling': (str(viz_name), timeframe),
 | 
				
			||||||
 | 
					            # }:
 | 
				
			||||||
 | 
					                ds: DisplayState = _dss[viz_name]
 | 
				
			||||||
 | 
					                chart: ChartPlotWidget = {
 | 
				
			||||||
 | 
					                    60: ds.hist_chart,
 | 
				
			||||||
 | 
					                    1: ds.chart,
 | 
				
			||||||
 | 
					                }[timeframe]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					            case _:
 | 
				
			||||||
 | 
					                log.error(
 | 
				
			||||||
 | 
					                    'Unknown remote annotation cmd:\n'
 | 
				
			||||||
 | 
					                    f'{pformat(msg)}'
 | 
				
			||||||
 | 
					                )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@tractor.context
 | 
					@tractor.context
 | 
				
			||||||
| 
						 | 
					@ -81,77 +185,29 @@ async def remote_annotate(
 | 
				
			||||||
    global _dss, _ctxs
 | 
					    global _dss, _ctxs
 | 
				
			||||||
    assert _dss
 | 
					    assert _dss
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    _ctxs.append(ctx)
 | 
					    _ctxs[ctx.cid] = (ctx, set())
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # send back full fqme symbology to caller
 | 
					    # send back full fqme symbology to caller
 | 
				
			||||||
    await ctx.started(list(_dss))
 | 
					    await ctx.started(list(_dss))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # open annot request handler stream
 | 
				
			||||||
    async with ctx.open_stream() as annot_req_stream:
 | 
					    async with ctx.open_stream() as annot_req_stream:
 | 
				
			||||||
        async for msg in annot_req_stream:
 | 
					        try:
 | 
				
			||||||
            match msg:
 | 
					            await serve_rc_annots(
 | 
				
			||||||
                case {
 | 
					                ipc_key=ctx.cid,
 | 
				
			||||||
                    'annot': 'SelectRect',
 | 
					                annot_req_stream=annot_req_stream,
 | 
				
			||||||
                    'fqme': fqme,
 | 
					                dss=_dss,
 | 
				
			||||||
                    'timeframe': timeframe,
 | 
					                ctxs=_ctxs,
 | 
				
			||||||
                    'meth': str(meth),
 | 
					                annots=_annots,
 | 
				
			||||||
                    'kwargs': dict(kwargs),
 | 
					            )
 | 
				
			||||||
                }:
 | 
					        finally:
 | 
				
			||||||
 | 
					            # ensure all annots for this connection are deleted
 | 
				
			||||||
                    ds: DisplayState = _dss[fqme]
 | 
					            # on any final teardown
 | 
				
			||||||
                    chart: ChartPlotWidget = {
 | 
					            (_ctx, aids) = _ctxs[ctx.cid]
 | 
				
			||||||
                        60: ds.hist_chart,
 | 
					            assert _ctx is ctx
 | 
				
			||||||
                        1: ds.chart,
 | 
					            for aid in aids:
 | 
				
			||||||
                    }[timeframe]
 | 
					                annot: QGraphicsItem = _annots[aid]
 | 
				
			||||||
                    cv: ChartView = chart.cv
 | 
					                annot.delete()
 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # sanity
 | 
					 | 
				
			||||||
                    if timeframe == 60:
 | 
					 | 
				
			||||||
                        assert (
 | 
					 | 
				
			||||||
                            chart.linked.godwidget.hist_linked.chart.view
 | 
					 | 
				
			||||||
                            is
 | 
					 | 
				
			||||||
                            cv
 | 
					 | 
				
			||||||
                        )
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # annot type lookup from cmd
 | 
					 | 
				
			||||||
                    rect = SelectRect(
 | 
					 | 
				
			||||||
                        viewbox=cv,
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                        # TODO: make this more dynamic?
 | 
					 | 
				
			||||||
                        # -[ ] pull from conf.toml?
 | 
					 | 
				
			||||||
                        # -[ ] add `.set_color()` method to type?
 | 
					 | 
				
			||||||
                        # -[ ] make a green/red based on direction
 | 
					 | 
				
			||||||
                        #    instead of default static color?
 | 
					 | 
				
			||||||
                        color=kwargs.pop('color', None),
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
                    # XXX NOTE: this is REQUIRED to set the rect
 | 
					 | 
				
			||||||
                    # resize callback!
 | 
					 | 
				
			||||||
                    rect.chart: ChartPlotWidget = chart
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # delegate generically to the requested method
 | 
					 | 
				
			||||||
                    getattr(rect, meth)(**kwargs)
 | 
					 | 
				
			||||||
                    rect.show()
 | 
					 | 
				
			||||||
                    aid: int = id(rect)
 | 
					 | 
				
			||||||
                    _annots[aid] = rect
 | 
					 | 
				
			||||||
                    await annot_req_stream.send(aid)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                case {
 | 
					 | 
				
			||||||
                    'rm_annot': int(aid),
 | 
					 | 
				
			||||||
                }:
 | 
					 | 
				
			||||||
                    # NOTE: this is normally entered on
 | 
					 | 
				
			||||||
                    # a client's annotation de-alloc normally
 | 
					 | 
				
			||||||
                    # prior to detach or modify.
 | 
					 | 
				
			||||||
                    annot: 'QGraphicsItem' = _annots[aid]
 | 
					 | 
				
			||||||
                    annot.delete()
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                    # respond to client indicating annot
 | 
					 | 
				
			||||||
                    # was indeed deleted.
 | 
					 | 
				
			||||||
                    await annot_req_stream.send(aid)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
                case _:
 | 
					 | 
				
			||||||
                    log.error(
 | 
					 | 
				
			||||||
                        'Unknown remote annotation cmd:\n'
 | 
					 | 
				
			||||||
                        f'{pformat(msg)}'
 | 
					 | 
				
			||||||
                    )
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class AnnotCtl(Struct):
 | 
					class AnnotCtl(Struct):
 | 
				
			||||||
| 
						 | 
					@ -301,6 +357,7 @@ async def open_annot_ctl(
 | 
				
			||||||
        ctx2fqmes: dict[str, set[str]] = {}
 | 
					        ctx2fqmes: dict[str, set[str]] = {}
 | 
				
			||||||
        fqme2ipc: dict[str, MsgStream] = {}
 | 
					        fqme2ipc: dict[str, MsgStream] = {}
 | 
				
			||||||
        stream_ctxs: list[AsyncContextManager] = []
 | 
					        stream_ctxs: list[AsyncContextManager] = []
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        async with (
 | 
					        async with (
 | 
				
			||||||
            trionics.gather_contexts(ctx_mngrs) as ctxs,
 | 
					            trionics.gather_contexts(ctx_mngrs) as ctxs,
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
| 
						 | 
					@ -332,6 +389,10 @@ async def open_annot_ctl(
 | 
				
			||||||
                # NOTE: on graceful teardown we always attempt to
 | 
					                # NOTE: on graceful teardown we always attempt to
 | 
				
			||||||
                # remove all annots that were created by the
 | 
					                # remove all annots that were created by the
 | 
				
			||||||
                # entering client.
 | 
					                # entering client.
 | 
				
			||||||
 | 
					                # TODO: should we maybe instead/also do this on the
 | 
				
			||||||
 | 
					                # server-actor side so that when a client
 | 
				
			||||||
 | 
					                # disconnects we always delete all annotations by
 | 
				
			||||||
 | 
					                # default instaead of expecting the client to?
 | 
				
			||||||
                async with AsyncExitStack() as annots_stack:
 | 
					                async with AsyncExitStack() as annots_stack:
 | 
				
			||||||
                    client = AnnotCtl(
 | 
					                    client = AnnotCtl(
 | 
				
			||||||
                        ctx2fqmes=ctx2fqmes,
 | 
					                        ctx2fqmes=ctx2fqmes,
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in New Issue