A working remote annotations controller B)
Obvi took a little `.ui` component fixing (as per prior commits) but
this is now a working PoC for gap detection and markup from a remote
(data) non-`chart` actor!
Iface and impl deats from `.ui._remote_ctl`:
- add new `open_annot_ctl()` mngr which attaches to all locally
discoverable chart actors, gathers annot-ctl streams per fqme set, and
delivers a new `AnnotCtl` client which allows adding annotation
rectangles via a `.add_rect()` method.
- also template out some other soon-to-get methods for removing and
modifying pre-exiting annotations on some `ChartView` 💥
- ensure the `chart` CLI subcmd starts the (`qtloops`) guest-mode init
with the `.ui._remote_ctl` module enabled.
- actually use this stuff in the `piker store ldshm` CLI to submit
markup rects around any detected null/time gaps in the tsdb data!
Still lots to do:
- probably colorization of gaps depending on if they're venue
closures (aka real mkt gaps) vs. "missing data" from the backend (aka
timeseries consistency gaps).
- run gap detection and markup as part of the std `.tsp` sub-sys
runtime such that gap annots are a std "built-in" feature of
charting.
- support for epoch time stamp AND abs-shm-index rect x-values
(depending on chart operational state).
distribute_dis
parent
e7fa841263
commit
b064a5f94d
|
@ -19,6 +19,7 @@ Storage middle-ware CLIs.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
# from datetime import datetime
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
@ -253,7 +254,7 @@ def ldshm(
|
||||||
|
|
||||||
# compute ohlc properties for naming
|
# compute ohlc properties for naming
|
||||||
times: np.ndarray = shm.array['time']
|
times: np.ndarray = shm.array['time']
|
||||||
period_s: float = times[-1] - times[-2]
|
period_s: float = float(times[-1] - times[-2])
|
||||||
if period_s < 1.:
|
if period_s < 1.:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'Something is wrong with time period for {shm}:\n{times}'
|
f'Something is wrong with time period for {shm}:\n{times}'
|
||||||
|
@ -280,6 +281,81 @@ def ldshm(
|
||||||
not gaps.is_empty()
|
not gaps.is_empty()
|
||||||
or null_segs
|
or null_segs
|
||||||
):
|
):
|
||||||
|
from piker.ui._remote_ctl import (
|
||||||
|
open_annot_ctl,
|
||||||
|
AnnotCtl,
|
||||||
|
)
|
||||||
|
annot_ctl: AnnotCtl
|
||||||
|
async with open_annot_ctl() as annot_ctl:
|
||||||
|
for i in range(gaps.height):
|
||||||
|
|
||||||
|
row: pl.DataFrame = gaps[i]
|
||||||
|
|
||||||
|
# TODO: can we eventually remove this
|
||||||
|
# once we figure out why the epoch cols
|
||||||
|
# don't match?
|
||||||
|
iend: int = row['index'][0]
|
||||||
|
# dt: datetime = row['dt'][0]
|
||||||
|
# dt_prev: datetime = row['dt_prev'][0]
|
||||||
|
|
||||||
|
# the gap's right-most bar's OPEN value
|
||||||
|
# at that time (sample) step.
|
||||||
|
# dt_end_t: float = dt.timestamp()
|
||||||
|
|
||||||
|
# TODO: FIX HOW/WHY these aren't matching
|
||||||
|
# and are instead off by 4hours (EST
|
||||||
|
# vs. UTC?!?!)
|
||||||
|
# end_t: float = row['time']
|
||||||
|
# assert (
|
||||||
|
# dt.timestamp()
|
||||||
|
# ==
|
||||||
|
# end_t
|
||||||
|
# )
|
||||||
|
|
||||||
|
# the gap's left-most bar's CLOSE value
|
||||||
|
# at that time (sample) step.
|
||||||
|
|
||||||
|
prev_r: pl.DataFrame = df.filter(
|
||||||
|
pl.col('index') == gaps[0]['index'] - 1
|
||||||
|
)
|
||||||
|
istart: int = prev_r['index'][0]
|
||||||
|
# dt_start_t: float = dt_prev.timestamp()
|
||||||
|
|
||||||
|
# start_t: float = prev_r['time']
|
||||||
|
# assert (
|
||||||
|
# dt_start_t
|
||||||
|
# ==
|
||||||
|
# start_t
|
||||||
|
# )
|
||||||
|
|
||||||
|
# TODO: implement px-col width measure
|
||||||
|
# and ensure at least as many px-cols
|
||||||
|
# shown per rect as configured by user.
|
||||||
|
gap_w: float = abs((iend - istart))
|
||||||
|
# await tractor.pause()
|
||||||
|
if gap_w < 6:
|
||||||
|
margin: float = 6
|
||||||
|
iend += margin
|
||||||
|
istart -= margin
|
||||||
|
|
||||||
|
ro: tuple[float, float] = (
|
||||||
|
# dt_end_t,
|
||||||
|
iend,
|
||||||
|
row['open'][0],
|
||||||
|
)
|
||||||
|
lc: tuple[float, float] = (
|
||||||
|
# dt_start_t,
|
||||||
|
istart,
|
||||||
|
prev_r['close'][0],
|
||||||
|
)
|
||||||
|
|
||||||
|
aid: int = await annot_ctl.add_rect(
|
||||||
|
fqme=fqme,
|
||||||
|
timeframe=period_s,
|
||||||
|
start_pos=lc,
|
||||||
|
end_pos=ro,
|
||||||
|
)
|
||||||
|
assert aid
|
||||||
await tractor.pause()
|
await tractor.pause()
|
||||||
|
|
||||||
# write to parquet file?
|
# write to parquet file?
|
||||||
|
|
|
@ -21,15 +21,28 @@ to a chart from some other actor.
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
from pprint import pformat
|
||||||
|
from typing import (
|
||||||
|
Any,
|
||||||
|
AsyncContextManager,
|
||||||
|
)
|
||||||
|
|
||||||
import tractor
|
import tractor
|
||||||
# import trio
|
from tractor import trionics
|
||||||
|
from tractor import (
|
||||||
|
Portal,
|
||||||
|
Context,
|
||||||
|
MsgStream,
|
||||||
|
)
|
||||||
from PyQt5.QtWidgets import (
|
from PyQt5.QtWidgets import (
|
||||||
QGraphicsItem,
|
QGraphicsItem,
|
||||||
)
|
)
|
||||||
|
|
||||||
from piker.log import get_logger
|
from piker.log import get_logger
|
||||||
|
from piker.types import Struct
|
||||||
|
from piker.service import find_service
|
||||||
from ._display import DisplayState
|
from ._display import DisplayState
|
||||||
|
from ._interaction import ChartView
|
||||||
from ._editors import SelectRect
|
from ._editors import SelectRect
|
||||||
from ._chart import ChartPlotWidget
|
from ._chart import ChartPlotWidget
|
||||||
|
|
||||||
|
@ -44,7 +57,9 @@ _dss: dict[str, DisplayState] | None = None
|
||||||
|
|
||||||
# stash each and every client connection so that they can all
|
# stash each and every client connection so that they can all
|
||||||
# be cancelled on shutdown/error.
|
# be cancelled on shutdown/error.
|
||||||
_ctxs: set[tractor.Context] = set()
|
# TODO: make `tractor.Context` hashable via is `.cid: str`?
|
||||||
|
# _ctxs: set[Context] = set()
|
||||||
|
_ctxs: list[Context] = []
|
||||||
|
|
||||||
# global map of all uniquely created annotation-graphics
|
# global map of all uniquely created annotation-graphics
|
||||||
# so that they can be mutated (eventually) by a client.
|
# so that they can be mutated (eventually) by a client.
|
||||||
|
@ -53,13 +68,13 @@ _annots: dict[int, QGraphicsItem] = {}
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
async def remote_annotate(
|
async def remote_annotate(
|
||||||
ctx: tractor.Context,
|
ctx: Context,
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
global _dss, _ctxs
|
global _dss, _ctxs
|
||||||
assert _dss
|
assert _dss
|
||||||
|
|
||||||
_ctxs.add(ctx)
|
_ctxs.append(ctx)
|
||||||
|
|
||||||
# send back full fqme symbology to caller
|
# send back full fqme symbology to caller
|
||||||
await ctx.started(list(_dss))
|
await ctx.started(list(_dss))
|
||||||
|
@ -68,52 +83,191 @@ async def remote_annotate(
|
||||||
async for msg in annot_req_stream:
|
async for msg in annot_req_stream:
|
||||||
match msg:
|
match msg:
|
||||||
case {
|
case {
|
||||||
|
'annot': 'SelectRect',
|
||||||
'fqme': fqme,
|
'fqme': fqme,
|
||||||
'cmd': 'SelectRect',
|
|
||||||
'color': color,
|
|
||||||
'timeframe': timeframe,
|
'timeframe': timeframe,
|
||||||
# 'meth': str(meth),
|
'meth': str(meth),
|
||||||
'meth': 'set_view_pos',
|
'kwargs': dict(kwargs),
|
||||||
'kwargs': {
|
|
||||||
'start_pos': tuple(start_pos),
|
|
||||||
'end_pos': tuple(end_pos),
|
|
||||||
},
|
|
||||||
}:
|
}:
|
||||||
|
|
||||||
ds: DisplayState = _dss[fqme]
|
ds: DisplayState = _dss[fqme]
|
||||||
chart: ChartPlotWidget = {
|
chart: ChartPlotWidget = {
|
||||||
60: ds.hist_chart,
|
60: ds.hist_chart,
|
||||||
1: ds.chart,
|
1: ds.chart,
|
||||||
}[timeframe]
|
}[timeframe]
|
||||||
|
cv: ChartView = chart.cv
|
||||||
|
|
||||||
|
# sanity
|
||||||
|
if timeframe == 60:
|
||||||
|
assert (
|
||||||
|
chart.linked.godwidget.hist_linked.chart.view
|
||||||
|
is
|
||||||
|
cv
|
||||||
|
)
|
||||||
|
|
||||||
# annot type lookup from cmd
|
# annot type lookup from cmd
|
||||||
rect = SelectRect(
|
rect = SelectRect(
|
||||||
chart.cv,
|
viewbox=cv,
|
||||||
|
|
||||||
# TODO: pull from conf.toml?
|
# TODO: make this more dynamic?
|
||||||
color=color or 'dad_blue',
|
# -[ ] pull from conf.toml?
|
||||||
)
|
# -[ ] add `.set_color()` method to type?
|
||||||
rect.set_view_pos(
|
# -[ ] make a green/red based on direction
|
||||||
start_pos=start_pos,
|
# instead of default static color?
|
||||||
end_pos=end_pos,
|
color=kwargs.pop('color', None),
|
||||||
)
|
)
|
||||||
|
# XXX NOTE: this is REQUIRED to set the rect
|
||||||
|
# resize callback!
|
||||||
|
rect.chart: ChartPlotWidget = chart
|
||||||
|
|
||||||
|
# delegate generically to the requested method
|
||||||
|
getattr(rect, meth)(**kwargs)
|
||||||
|
rect.show()
|
||||||
await annot_req_stream.send(id(rect))
|
await annot_req_stream.send(id(rect))
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
log.error(
|
log.error(
|
||||||
'Unknown remote annotation cmd:\n'
|
'Unknown remote annotation cmd:\n'
|
||||||
f'{msg}'
|
f'{pformat(msg)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
class AnnotCtl(Struct):
|
||||||
async def open_annots_client(
|
'''
|
||||||
uid: tuple[str, str],
|
A control for remote "data annotations".
|
||||||
|
|
||||||
) -> 'AnnotClient':
|
You know those "squares they always show in machine vision
|
||||||
|
UIs.." this API allows you to remotely control stuff like that
|
||||||
|
in some other graphics actor.
|
||||||
|
|
||||||
|
'''
|
||||||
|
ctx2fqmes: dict[str, str]
|
||||||
|
fqme2stream: dict[str, MsgStream]
|
||||||
|
|
||||||
|
async def add_rect(
|
||||||
|
self,
|
||||||
|
fqme: str,
|
||||||
|
timeframe: float,
|
||||||
|
start_pos: tuple[float, float],
|
||||||
|
end_pos: tuple[float, float],
|
||||||
|
|
||||||
|
# TODO: a `Literal['view', 'scene']` for this?
|
||||||
|
domain: str = 'view', # or 'scene'
|
||||||
|
color: str = 'dad_blue',
|
||||||
|
|
||||||
|
) -> int:
|
||||||
|
'''
|
||||||
|
Add a `SelectRect` annotation to the target view, return
|
||||||
|
the instances `id(obj)` from the remote UI actor.
|
||||||
|
|
||||||
|
'''
|
||||||
|
ipc: MsgStream = self.fqme2stream[fqme]
|
||||||
|
await ipc.send({
|
||||||
|
'fqme': fqme,
|
||||||
|
'annot': 'SelectRect',
|
||||||
|
'timeframe': timeframe,
|
||||||
|
# 'meth': str(meth),
|
||||||
|
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
|
||||||
|
'kwargs': {
|
||||||
|
'start_pos': tuple(start_pos),
|
||||||
|
'end_pos': tuple(end_pos),
|
||||||
|
'color': color,
|
||||||
|
'update_label': False,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return (await ipc.receive())
|
||||||
|
|
||||||
|
async def modify(
|
||||||
|
self,
|
||||||
|
aid: int, # annotation id
|
||||||
|
meth: str, # far end graphics object method to invoke
|
||||||
|
params: dict[str, Any], # far end `meth(**kwargs)`
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Modify an existing (remote) annotation's graphics
|
||||||
|
paramters, thus changing it's appearance / state in real
|
||||||
|
time.
|
||||||
|
|
||||||
|
'''
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
async def remove(
|
||||||
|
self,
|
||||||
|
uid: int,
|
||||||
|
) -> bool:
|
||||||
|
'''
|
||||||
|
Remove an existing annotation by instance id.
|
||||||
|
|
||||||
|
'''
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def open_annot_ctl(
|
||||||
|
uid: tuple[str, str] | None = None,
|
||||||
|
|
||||||
|
) -> AnnotCtl:
|
||||||
# TODO: load connetion to a specific chart actor
|
# TODO: load connetion to a specific chart actor
|
||||||
# -[ ] pull from either service scan or config
|
# -[ ] pull from either service scan or config
|
||||||
# -[ ] return some kinda client/proxy thinger?
|
# -[ ] return some kinda client/proxy thinger?
|
||||||
# -[ ] maybe we should finally just provide this as
|
# -[ ] maybe we should finally just provide this as
|
||||||
# a `tractor.hilevel.CallableProxy` or wtv?
|
# a `tractor.hilevel.CallableProxy` or wtv?
|
||||||
# -[ ] use this from the storage.cli stuff to mark up gaps!
|
# -[ ] use this from the storage.cli stuff to mark up gaps!
|
||||||
...
|
|
||||||
|
maybe_portals: list[Portal] | None
|
||||||
|
fqmes: list[str]
|
||||||
|
async with find_service(
|
||||||
|
service_name='chart',
|
||||||
|
first_only=False,
|
||||||
|
) as maybe_portals:
|
||||||
|
|
||||||
|
ctx_mngrs: list[AsyncContextManager] = []
|
||||||
|
|
||||||
|
# TODO: print the current discoverable actor UID set
|
||||||
|
# here as well?
|
||||||
|
if not maybe_portals:
|
||||||
|
raise RuntimeError('No chart UI actors found in service domain?')
|
||||||
|
|
||||||
|
for portal in maybe_portals:
|
||||||
|
ctx_mngrs.append(
|
||||||
|
portal.open_context(remote_annotate)
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx2fqmes: dict[str, set[str]] = {}
|
||||||
|
fqme2stream: dict[str, MsgStream] = {}
|
||||||
|
client = AnnotCtl(
|
||||||
|
ctx2fqmes=ctx2fqmes,
|
||||||
|
fqme2stream=fqme2stream,
|
||||||
|
)
|
||||||
|
|
||||||
|
stream_ctxs: list[AsyncContextManager] = []
|
||||||
|
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
|
||||||
|
for (ctx, fqmes) in ctxs:
|
||||||
|
stream_ctxs.append(ctx.open_stream())
|
||||||
|
|
||||||
|
# fill lookup table of mkt addrs to IPC ctxs
|
||||||
|
for fqme in fqmes:
|
||||||
|
if other := fqme2stream.get(fqme):
|
||||||
|
raise ValueError(
|
||||||
|
f'More then one chart displays {fqme}!?\n'
|
||||||
|
'Other UI actor info:\n'
|
||||||
|
f'channel: {other._ctx.chan}]\n'
|
||||||
|
f'actor uid: {other._ctx.chan.uid}]\n'
|
||||||
|
f'ctx id: {other._ctx.cid}]\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx2fqmes.setdefault(
|
||||||
|
ctx.cid,
|
||||||
|
set(),
|
||||||
|
).add(fqme)
|
||||||
|
|
||||||
|
async with trionics.gather_contexts(stream_ctxs) as streams:
|
||||||
|
for stream in streams:
|
||||||
|
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
|
||||||
|
for fqme in fqmes:
|
||||||
|
fqme2stream[fqme] = stream
|
||||||
|
|
||||||
|
yield client
|
||||||
|
# TODO: on graceful teardown should we try to
|
||||||
|
# remove all annots that were created/modded?
|
||||||
|
|
|
@ -228,5 +228,10 @@ def chart(
|
||||||
'loglevel': tractorloglevel,
|
'loglevel': tractorloglevel,
|
||||||
'name': 'chart',
|
'name': 'chart',
|
||||||
'registry_addrs': list(set(regaddrs)),
|
'registry_addrs': list(set(regaddrs)),
|
||||||
|
'enable_modules': [
|
||||||
|
|
||||||
|
# remote data-view annotations Bo
|
||||||
|
'piker.ui._remote_ctl',
|
||||||
|
],
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue