diff --git a/piker/storage/cli.py b/piker/storage/cli.py index c15f4273..1d998558 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -19,6 +19,7 @@ Storage middle-ware CLIs. """ from __future__ import annotations +# from datetime import datetime from pathlib import Path import time @@ -253,7 +254,7 @@ def ldshm( # compute ohlc properties for naming times: np.ndarray = shm.array['time'] - period_s: float = times[-1] - times[-2] + period_s: float = float(times[-1] - times[-2]) if period_s < 1.: raise ValueError( f'Something is wrong with time period for {shm}:\n{times}' @@ -280,7 +281,82 @@ def ldshm( not gaps.is_empty() or null_segs ): - await tractor.pause() + from piker.ui._remote_ctl import ( + open_annot_ctl, + AnnotCtl, + ) + annot_ctl: AnnotCtl + async with open_annot_ctl() as annot_ctl: + for i in range(gaps.height): + + row: pl.DataFrame = gaps[i] + + # TODO: can we eventually remove this + # once we figure out why the epoch cols + # don't match? + iend: int = row['index'][0] + # dt: datetime = row['dt'][0] + # dt_prev: datetime = row['dt_prev'][0] + + # the gap's right-most bar's OPEN value + # at that time (sample) step. + # dt_end_t: float = dt.timestamp() + + # TODO: FIX HOW/WHY these aren't matching + # and are instead off by 4hours (EST + # vs. UTC?!?!) + # end_t: float = row['time'] + # assert ( + # dt.timestamp() + # == + # end_t + # ) + + # the gap's left-most bar's CLOSE value + # at that time (sample) step. + + prev_r: pl.DataFrame = df.filter( + pl.col('index') == gaps[0]['index'] - 1 + ) + istart: int = prev_r['index'][0] + # dt_start_t: float = dt_prev.timestamp() + + # start_t: float = prev_r['time'] + # assert ( + # dt_start_t + # == + # start_t + # ) + + # TODO: implement px-col width measure + # and ensure at least as many px-cols + # shown per rect as configured by user. + gap_w: float = abs((iend - istart)) + # await tractor.pause() + if gap_w < 6: + margin: float = 6 + iend += margin + istart -= margin + + ro: tuple[float, float] = ( + # dt_end_t, + iend, + row['open'][0], + ) + lc: tuple[float, float] = ( + # dt_start_t, + istart, + prev_r['close'][0], + ) + + aid: int = await annot_ctl.add_rect( + fqme=fqme, + timeframe=period_s, + start_pos=lc, + end_pos=ro, + ) + assert aid + await tractor.pause() # write to parquet file? if write_parquet: diff --git a/piker/ui/_remote_ctl.py b/piker/ui/_remote_ctl.py index ce4dcd95..c5cc27b0 100644 --- a/piker/ui/_remote_ctl.py +++ b/piker/ui/_remote_ctl.py @@ -21,15 +21,28 @@ to a chart from some other actor. ''' from __future__ import annotations from contextlib import asynccontextmanager as acm +from pprint import pformat +from typing import ( + Any, + AsyncContextManager, +) import tractor -# import trio +from tractor import trionics +from tractor import ( + Portal, + Context, + MsgStream, +) from PyQt5.QtWidgets import ( QGraphicsItem, ) from piker.log import get_logger +from piker.types import Struct +from piker.service import find_service from ._display import DisplayState +from ._interaction import ChartView from ._editors import SelectRect from ._chart import ChartPlotWidget @@ -44,7 +57,9 @@ _dss: dict[str, DisplayState] | None = None # stash each and every client connection so that they can all # be cancelled on shutdown/error. -_ctxs: set[tractor.Context] = set() +# TODO: make `tractor.Context` hashable via is `.cid: str`? +# _ctxs: set[Context] = set() +_ctxs: list[Context] = [] # global map of all uniquely created annotation-graphics # so that they can be mutated (eventually) by a client. @@ -53,13 +68,13 @@ _annots: dict[int, QGraphicsItem] = {} @tractor.context async def remote_annotate( - ctx: tractor.Context, + ctx: Context, ) -> None: global _dss, _ctxs assert _dss - _ctxs.add(ctx) + _ctxs.append(ctx) # send back full fqme symbology to caller await ctx.started(list(_dss)) @@ -68,52 +83,191 @@ async def remote_annotate( async for msg in annot_req_stream: match msg: case { + 'annot': 'SelectRect', 'fqme': fqme, - 'cmd': 'SelectRect', - 'color': color, 'timeframe': timeframe, - # 'meth': str(meth), - 'meth': 'set_view_pos', - 'kwargs': { - 'start_pos': tuple(start_pos), - 'end_pos': tuple(end_pos), - }, + 'meth': str(meth), + 'kwargs': dict(kwargs), }: + ds: DisplayState = _dss[fqme] chart: ChartPlotWidget = { 60: ds.hist_chart, 1: ds.chart, }[timeframe] + cv: ChartView = chart.cv + + # sanity + if timeframe == 60: + assert ( + chart.linked.godwidget.hist_linked.chart.view + is + cv + ) # annot type lookup from cmd rect = SelectRect( - chart.cv, + viewbox=cv, - # TODO: pull from conf.toml? - color=color or 'dad_blue', - ) - rect.set_view_pos( - start_pos=start_pos, - end_pos=end_pos, + # TODO: make this more dynamic? + # -[ ] pull from conf.toml? + # -[ ] add `.set_color()` method to type? + # -[ ] make a green/red based on direction + # instead of default static color? + color=kwargs.pop('color', None), ) + # XXX NOTE: this is REQUIRED to set the rect + # resize callback! + rect.chart: ChartPlotWidget = chart + + # delegate generically to the requested method + getattr(rect, meth)(**kwargs) + rect.show() await annot_req_stream.send(id(rect)) case _: log.error( 'Unknown remote annotation cmd:\n' - f'{msg}' + f'{pformat(msg)}' ) -@acm -async def open_annots_client( - uid: tuple[str, str], +class AnnotCtl(Struct): + ''' + A control for remote "data annotations". -) -> 'AnnotClient': + You know those "squares they always show in machine vision + UIs.." this API allows you to remotely control stuff like that + in some other graphics actor. + + ''' + ctx2fqmes: dict[str, str] + fqme2stream: dict[str, MsgStream] + + async def add_rect( + self, + fqme: str, + timeframe: float, + start_pos: tuple[float, float], + end_pos: tuple[float, float], + + # TODO: a `Literal['view', 'scene']` for this? + domain: str = 'view', # or 'scene' + color: str = 'dad_blue', + + ) -> int: + ''' + Add a `SelectRect` annotation to the target view, return + the instances `id(obj)` from the remote UI actor. + + ''' + ipc: MsgStream = self.fqme2stream[fqme] + await ipc.send({ + 'fqme': fqme, + 'annot': 'SelectRect', + 'timeframe': timeframe, + # 'meth': str(meth), + 'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos', + 'kwargs': { + 'start_pos': tuple(start_pos), + 'end_pos': tuple(end_pos), + 'color': color, + 'update_label': False, + }, + }) + return (await ipc.receive()) + + async def modify( + self, + aid: int, # annotation id + meth: str, # far end graphics object method to invoke + params: dict[str, Any], # far end `meth(**kwargs)` + ) -> bool: + ''' + Modify an existing (remote) annotation's graphics + paramters, thus changing it's appearance / state in real + time. + + ''' + raise NotImplementedError + + async def remove( + self, + uid: int, + ) -> bool: + ''' + Remove an existing annotation by instance id. + + ''' + raise NotImplementedError + + + +@acm +async def open_annot_ctl( + uid: tuple[str, str] | None = None, + +) -> AnnotCtl: # TODO: load connetion to a specific chart actor # -[ ] pull from either service scan or config # -[ ] return some kinda client/proxy thinger? # -[ ] maybe we should finally just provide this as # a `tractor.hilevel.CallableProxy` or wtv? # -[ ] use this from the storage.cli stuff to mark up gaps! - ... + + maybe_portals: list[Portal] | None + fqmes: list[str] + async with find_service( + service_name='chart', + first_only=False, + ) as maybe_portals: + + ctx_mngrs: list[AsyncContextManager] = [] + + # TODO: print the current discoverable actor UID set + # here as well? + if not maybe_portals: + raise RuntimeError('No chart UI actors found in service domain?') + + for portal in maybe_portals: + ctx_mngrs.append( + portal.open_context(remote_annotate) + ) + + ctx2fqmes: dict[str, set[str]] = {} + fqme2stream: dict[str, MsgStream] = {} + client = AnnotCtl( + ctx2fqmes=ctx2fqmes, + fqme2stream=fqme2stream, + ) + + stream_ctxs: list[AsyncContextManager] = [] + async with trionics.gather_contexts(ctx_mngrs) as ctxs: + for (ctx, fqmes) in ctxs: + stream_ctxs.append(ctx.open_stream()) + + # fill lookup table of mkt addrs to IPC ctxs + for fqme in fqmes: + if other := fqme2stream.get(fqme): + raise ValueError( + f'More then one chart displays {fqme}!?\n' + 'Other UI actor info:\n' + f'channel: {other._ctx.chan}]\n' + f'actor uid: {other._ctx.chan.uid}]\n' + f'ctx id: {other._ctx.cid}]\n' + ) + + ctx2fqmes.setdefault( + ctx.cid, + set(), + ).add(fqme) + + async with trionics.gather_contexts(stream_ctxs) as streams: + for stream in streams: + fqmes: set[str] = ctx2fqmes[stream._ctx.cid] + for fqme in fqmes: + fqme2stream[fqme] = stream + + yield client + # TODO: on graceful teardown should we try to + # remove all annots that were created/modded? diff --git a/piker/ui/cli.py b/piker/ui/cli.py index ca4bb653..dfc7c7ea 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -228,5 +228,10 @@ def chart( 'loglevel': tractorloglevel, 'name': 'chart', 'registry_addrs': list(set(regaddrs)), + 'enable_modules': [ + + # remote data-view annotations Bo + 'piker.ui._remote_ctl', + ], }, )