Compare commits


10 Commits

Author SHA1 Message Date
Tyler Goodlet ad565936ec Factor UI-rc loop into ctx-free func
In theory the `async for msg` loop can be re-purposed without always
having to call `remote_annotate()`, so factor it into a new
`serve_rc_annots()` and just call it from the former (for now), keeping
the wrapping `try:` block outside so per-client-ctx annotation
instance sets get deleted on teardown. Also, use some type aliases
instead of repeatedly defining the same complex `dict`-table defs B)
2023-12-26 20:56:04 -05:00
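
A minimal sketch of the factoring pattern from this commit (simplified names, not the exact `piker` internals), assuming `tractor`'s ctx/stream API:

```python
import tractor

async def serve_msgs(
    stream: tractor.MsgStream,
    table: dict[int, object],
) -> None:
    # ctx-free msg loop: re-usable without entering the full
    # `remote_annotate()`-style endpoint.
    async for msg in stream:
        ...  # dispatch on msg contents

@tractor.context
async def endpoint(ctx: tractor.Context) -> None:
    table: dict[int, object] = {}
    await ctx.started()
    async with ctx.open_stream() as stream:
        try:
            await serve_msgs(stream, table)
        finally:
            # per-client-ctx instance sets are deleted here,
            # *outside* the factored-out loop.
            table.clear()
```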
Tyler Goodlet d4b07cc95a `ui._lines`: more direct Qt imports for typing 2023-12-26 20:49:07 -05:00
Tyler Goodlet 1231c459aa Track data feed subscribers using a new `Sub(Struct)`
In prep for supporting reverse-ipc connect-back to UI actors from
middle-ware systems (for the purposes of triggering data-view canvas
re-renders and built-in tsp annotations), add a new struct type to
better generalize the management of remote feed subscriptions. Include
a `Sub.rc_ui: bool` for now (with nearby todo-comment) and expose an
`allow_remote_ctl_ui: bool` through the feed endpoints to help drive
/ prep for all that ^

Rework all the sampler tasks to expect the `Sub`'s new iface:

- split up the `Sub.ipc: MsgStream` and `.send_chan` as separate fields
  since we're handling the throttle case in separate
  `sample_and_broadcast()` logic blocks anyway, which avoids needing to
  monkey-patch on the `._ctx` malarky..
- explicitly provide the optional handle to the `_throttle_cs:
  CancelScope` again for the case where throttling/event-downsampling is
  requested.
- add `_FeedsBus.subs_items()` as a public iterator.
2023-12-26 20:48:06 -05:00
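
For reference, a hedged sketch of how a fan-out loop consumes the new `Sub` iface; the dataclass below is a stand-in mirroring the fields named above, not the actual `piker` `Struct`:

```python
from dataclasses import dataclass
from typing import Any

@dataclass(frozen=True)
class Sub:
    ipc: Any                            # the `tractor.MsgStream`
    send_chan: Any | None = None        # trio mem-chan when throttled
    throttle_rate: float | None = None  # tick throttle in Hz
    rc_ui: bool = False

async def fan_out(
    subs: set[Sub],
    fqme: str,
    quote: dict,
) -> None:
    for sub in subs.copy():
        if sub.throttle_rate:
            # throttled: queue into the mem-chan drained by
            # a bg `uniform_rate_send()` task at max Hz.
            sub.send_chan.send_nowait((fqme, quote))
        else:
            # un-throttled: relay directly over IPC.
            await sub.ipc.send({fqme: quote})
```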
Tyler Goodlet 88f415e5b8 Cannot delete when the rect has no scene.. 2023-12-26 17:36:34 -05:00
Tyler Goodlet d9c574e291 Add `.sort()` support to `dedupe()` 2023-12-26 17:35:38 -05:00
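
The new flag boils down to an optional `polars` re-sort after the de-dup pass, roughly:

```python
import polars as pl

df = pl.DataFrame({
    'time': [3, 1, 2, 2],
    'close': [30., 10., 20., 20.],
})
deduped: pl.DataFrame = df.unique(
    subset=['time'],
    maintain_order=True,
)
# what `dedupe(.., sort=True)` now does before returning:
deduped = deduped.sort(by='time')
```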
Tyler Goodlet a86573b5a2 Fix .parquet filenaming..
Apparently `.storage.nativedb.mk_ohlcv_shm_keyed_filepath()` was always
kinda broken if you passed a `period: float` with an actual non-`int`
value to the format string? Fixed it to strictly cast to `int()` before
str-ifying so that you don't get a weird `60.0s.parquet` in there..

Further this rejigs the `store ldshm` gap correction-annotation loop to,
- use `StorageClient.write_ohlcv()` instead of hackily re-implementing
  it.. now that the problem from above is fixed!
- use a `needs_correction: bool` var to determine if gap markup and
  de-duplicated data should be pushed to the shm buffer,
- go back to using `AnnotCtl.add_rect()` for all detected gaps such that
  they all persist (and thus are shown together) until the client
  disconnects.
2023-12-26 17:14:26 -05:00
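
The filenaming bug in miniature: a `float` period leaks its decimal point into the f-string unless cast first.

```python
period: float = 60.0
# before: '60.0s' -> 'mkt.ohlcv60.0s.parquet' (broken)
assert f'{period}s' == '60.0s'
# after: strict `int()` cast -> 'mkt.ohlcv60s.parquet'
assert f'{int(period)}s' == '60s'
```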
Tyler Goodlet 1d7e97a295 Woops, need to use `.push_async_callback()`
For non-full-`.__aexit__()` handlers you need this method instead (facepalm).
Also create and assign the `AnnotCtl._annot_stack: AsyncExitStack` just
before yielding the client since it's not needed prior and ensures annot
removal happens **before** ipc teardown.
2023-12-24 15:08:44 -05:00
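
For context, the stdlib distinction at play (a toy repro, not `piker` code): `AsyncExitStack.push_async_exit()` expects an async-cm (or an `.__aexit__()`-signature callable) while a plain coroutine callback needs `.push_async_callback()`:

```python
from contextlib import AsyncExitStack
import trio

async def remove(aid: int) -> None:
    print(f'removing annot {aid}')

async def main() -> None:
    async with AsyncExitStack() as stack:
        # args are bound now; callbacks are awaited LIFO when
        # the stack closes, i.e. **before** any outer teardown.
        stack.push_async_callback(remove, 42)
        print('client running..')

trio.run(main)
```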
Tyler Goodlet bbb98597a0 Add annot removal via client methods or ctx-mngr
Since leaking annots to a remote `chart` actor probably isn't a thing we
want to do (often), add a removal/deletion handler block to the
`remote_annotate()` ctx which can be triggered using a `{rm_annot: aid}`
msg.

Augment the `AnnotCtl` with,
- a `.remove()` which sends said msg (from above) and returns a `bool`
  indicating success.
- add an `.open_rect()` acm which does the `.add_rect()` / `.remove()`
  calls underneath for scope-oriented client usage.
- add a `._annot_stack: AsyncExitStack` which will always have any/all
  non-`.open_rect()` calls to `.add_rect()` register removal on client
  teardown, to avoid leaking annots when a client finally disconnects.
- comment out the `.modify()` meth idea for now.
- rename all `Xstream` var-tags to `Xipc` names.
2023-12-24 14:42:12 -05:00
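
A hedged usage sketch of both removal styles (names are from this commit and the diff below; the fqme and positions are made up):

```python
from piker.ui._remote_ctl import (
    open_annot_ctl,
    AnnotCtl,
)

async def main() -> None:
    actl: AnnotCtl
    async with open_annot_ctl() as actl:
        # scoped: `.open_rect()` pairs `.add_rect()` with
        # `.remove()` on exit of the acm block.
        async with actl.open_rect(
            fqme='btcusdt.binance',  # made-up mkt
            timeframe=60,
            start_pos=(100, 42_000.0),
            end_pos=(106, 42_690.0),
        ) as aid:
            assert aid

        # manual: `.add_rect()` registers removal on
        # `._annot_stack` so the `{'rm_annot': aid}` msg still
        # fires on client teardown even without an explicit
        # `.remove()` call.
        aid: int = await actl.add_rect(
            fqme='btcusdt.binance',
            timeframe=60,
            start_pos=(100, 42_000.0),
            end_pos=(106, 42_690.0),
        )
        assert await actl.remove(aid)
```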
Tyler Goodlet e33d6333ec Woops, remove the label-proxy, not the widget.. 2023-12-24 13:59:16 -05:00
Tyler Goodlet 263a5a8d07 Add `SelectRect.delete()` for permanent scene dealloc 2023-12-23 13:37:47 -05:00
10 changed files with 553 additions and 294 deletions

View File

@ -33,6 +33,11 @@ from typing import (
)
import tractor
from tractor import (
Context,
MsgStream,
Channel,
)
from tractor.trionics import (
maybe_open_nursery,
)
@ -53,7 +58,10 @@ if TYPE_CHECKING:
from ._sharedmem import (
ShmArray,
)
from .feed import _FeedsBus
from .feed import (
_FeedsBus,
Sub,
)
# highest frequency sample step is 1 second by default, though in
@ -94,7 +102,7 @@ class Sampler:
float,
list[
float,
set[tractor.MsgStream]
set[MsgStream]
],
] = defaultdict(
lambda: [
@ -258,8 +266,8 @@ class Sampler:
f'broadcasting {period_s} -> {last_ts}\n'
# f'consumers: {subs}'
)
borked: set[tractor.MsgStream] = set()
sent: set[tractor.MsgStream] = set()
borked: set[MsgStream] = set()
sent: set[MsgStream] = set()
while True:
try:
for stream in (subs - sent):
@ -314,7 +322,7 @@ class Sampler:
@tractor.context
async def register_with_sampler(
ctx: tractor.Context,
ctx: Context,
period_s: float,
shms_by_period: dict[float, dict] | None = None,
@ -649,12 +657,7 @@ async def sample_and_broadcast(
# eventually block this producer end of the feed and
# thus other consumers still attached.
sub_key: str = broker_symbol.lower()
subs: list[
tuple[
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
] = bus.get_subs(sub_key)
subs: set[Sub] = bus.get_subs(sub_key)
# NOTE: by default the broker backend doesn't append
# its own "name" into the fqme schema (but maybe it
@ -663,34 +666,40 @@ async def sample_and_broadcast(
fqme: str = f'{broker_symbol}.{brokername}'
lags: int = 0
# TODO: speed up this loop in an AOT compiled lang (like
# rust or nim or zig) and/or instead of doing a fan out to
# TCP sockets here, we add a shm-style tick queue which
# readers can pull from instead of placing the burden of
# broadcast on solely on this `brokerd` actor. see issues:
# XXX TODO XXX: speed up this loop in an AOT compiled
# lang (like rust or nim or zig)!
# AND/OR instead of doing a fan out to TCP sockets
# here, we add a shm-style tick queue which readers can
# pull from instead of placing the burden of broadcast
# solely on this `brokerd` actor. see issues:
# - https://github.com/pikers/piker/issues/98
# - https://github.com/pikers/piker/issues/107
for (stream, tick_throttle) in subs.copy():
# for (stream, tick_throttle) in subs.copy():
for sub in subs.copy():
ipc: MsgStream = sub.ipc
throttle: float = sub.throttle_rate
try:
with trio.move_on_after(0.2) as cs:
if tick_throttle:
if throttle:
send_chan: trio.abc.SendChannel = sub.send_chan
# this is a send mem chan that likely
# pushes to the ``uniform_rate_send()`` below.
try:
stream.send_nowait(
send_chan.send_nowait(
(fqme, quote)
)
except trio.WouldBlock:
overruns[sub_key] += 1
ctx = stream._ctx
chan = ctx.chan
ctx: Context = ipc._ctx
chan: Channel = ctx.chan
log.warning(
f'Feed OVERRUN {sub_key}'
'@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n'
f'throttle = {tick_throttle} Hz'
f'throttle = {throttle} Hz'
)
if overruns[sub_key] > 6:
@ -707,10 +716,10 @@ async def sample_and_broadcast(
f'{sub_key}:'
f'{ctx.cid}@{chan.uid}'
)
await stream.aclose()
await ipc.aclose()
raise trio.BrokenResourceError
else:
await stream.send(
await ipc.send(
{fqme: quote}
)
@ -724,16 +733,16 @@ async def sample_and_broadcast(
trio.ClosedResourceError,
trio.EndOfChannel,
):
ctx = stream._ctx
chan = ctx.chan
ctx: Context = ipc._ctx
chan: Channel = ctx.chan
if ctx:
log.warning(
'Dropped `brokerd`-quotes-feed connection:\n'
f'{broker_symbol}:'
f'{ctx.cid}@{chan.uid}'
)
if tick_throttle:
assert stream._closed
if sub.throttle_rate:
assert ipc._closed
# XXX: do we need to deregister here
# if it's done in the feed bus code?
@ -742,7 +751,7 @@ async def sample_and_broadcast(
# since there seems to be some kinda race..
bus.remove_subs(
sub_key,
{(stream, tick_throttle)},
{sub},
)
@ -750,7 +759,7 @@ async def uniform_rate_send(
rate: float,
quote_stream: trio.abc.ReceiveChannel,
stream: tractor.MsgStream,
stream: MsgStream,
task_status: TaskStatus = trio.TASK_STATUS_IGNORED,

View File

@ -28,6 +28,7 @@ module.
from __future__ import annotations
from collections import (
defaultdict,
abc,
)
from contextlib import asynccontextmanager as acm
from functools import partial
@ -36,7 +37,6 @@ from types import ModuleType
from typing import (
Any,
AsyncContextManager,
Optional,
Awaitable,
Sequence,
)
@ -76,6 +76,31 @@ from ._sampling import (
)
class Sub(Struct, frozen=True):
'''
A live feed subscription entry.
Contains meta-data on the remote-actor type (in functionality
terms) as well as refs to IPC streams and sampler runtime
params.
'''
ipc: tractor.MsgStream
send_chan: trio.abc.SendChannel | None = None
# tick throttle rate in Hz; determines how live
# quotes/ticks should be downsampled before relay
# to the receiving remote consumer (process).
throttle_rate: float | None = None
_throttle_cs: trio.CancelScope | None = None
# TODO: actually stash comms info for the far end to allow
# `.tsp`, `.fsp` and `.data._sampling` sub-systems to re-render
# the data view as needed via msging with the `._remote_ctl`
# ipc ctx.
rc_ui: bool = False
class _FeedsBus(Struct):
'''
Data feeds broadcaster and persistence management.
@ -100,13 +125,7 @@ class _FeedsBus(Struct):
_subscribers: defaultdict[
str,
set[
tuple[
tractor.MsgStream | trio.MemorySendChannel,
# tractor.Context,
float | None, # tick throttle in Hz
]
]
set[Sub]
] = defaultdict(set)
async def start_task(
@ -140,31 +159,28 @@ class _FeedsBus(Struct):
def get_subs(
self,
key: str,
) -> set[
tuple[
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]
]:
) -> set[Sub]:
'''
Get the ``set`` of consumer subscription entries for the given key.
'''
return self._subscribers[key]
def subs_items(self) -> abc.ItemsView[str, set[Sub]]:
return self._subscribers.items()
def add_subs(
self,
key: str,
subs: set[tuple[
tractor.MsgStream | trio.MemorySendChannel,
float | None, # tick throttle in Hz
]],
) -> set[tuple]:
subs: set[Sub],
) -> set[Sub]:
'''
Add a ``set`` of consumer subscription entries for the given key.
'''
_subs: set[tuple] = self._subscribers[key]
_subs: set[Sub] = self._subscribers.setdefault(key, set())
_subs.update(subs)
return _subs
@ -441,8 +457,9 @@ async def open_feed_bus(
symbols: list[str], # normally expected to the broker-specific fqme
loglevel: str = 'error',
tick_throttle: Optional[float] = None,
tick_throttle: float | None = None,
start_stream: bool = True,
allow_remote_ctl_ui: bool = False,
) -> dict[
str, # fqme
@ -519,10 +536,10 @@ async def open_feed_bus(
# pack for ``.started()`` sync msg
flumes[fqme] = flume
# we use the broker-specific fqme (bs_fqme) for the
# sampler subscription since the backend isn't (yet) expected to
# append it's own name to the fqme, so we filter on keys which
# *do not* include that name (e.g .ib) .
# we use the broker-specific fqme (bs_fqme) for the sampler
# subscription since the backend isn't (yet) expected to
# append its own name to the fqme, so we filter on keys
# which *do not* include that name (e.g. `.ib`).
bus._subscribers.setdefault(bs_fqme, set())
# sync feed subscribers with flume handles
@ -561,49 +578,60 @@ async def open_feed_bus(
# that the ``sample_and_broadcast()`` task (spawned inside
# ``allocate_persistent_feed()``) will push real-time quote
# (ticks) to this new consumer.
cs: trio.CancelScope | None = None
send: trio.MemorySendChannel | None = None
if tick_throttle:
flume.throttle_rate = tick_throttle
# open a bg task which receives quotes over a mem chan
# and only pushes them to the target actor-consumer at
# a max ``tick_throttle`` instantaneous rate.
# open a bg task which receives quotes over a mem
# chan and only pushes them to the target
# actor-consumer at a max ``tick_throttle``
# (instantaneous) rate.
send, recv = trio.open_memory_channel(2**10)
cs = await bus.start_task(
# NOTE: the ``.send`` channel here is a swapped-in
# trio mem chan which gets `.send()`-ed by the normal
# sampler task but instead of being sent directly
# over the IPC msg stream, it's the throttle task
# that does the work of incrementally forwarding to the
# IPC stream at the throttle rate.
cs: trio.CancelScope = await bus.start_task(
uniform_rate_send,
tick_throttle,
recv,
stream,
)
# NOTE: so the ``send`` channel here is actually a swapped
# in trio mem chan which gets pushed by the normal sampler
# task but instead of being sent directly over the IPC msg
# stream it's the throttle task does the work of
# incrementally forwarding to the IPC stream at the throttle
# rate.
send._ctx = ctx # mock internal ``tractor.MsgStream`` ref
sub = (send, tick_throttle)
else:
sub = (stream, tick_throttle)
sub = Sub(
ipc=stream,
send_chan=send,
throttle_rate=tick_throttle,
_throttle_cs=cs,
rc_ui=allow_remote_ctl_ui,
)
# TODO: add an api for this on the bus?
# maybe use the current task-id to key the sub list that's
# added / removed? Or maybe we can add a general
# pause-resume by sub-key api?
bs_fqme = fqme.removesuffix(f'.{brokername}')
local_subs.setdefault(bs_fqme, set()).add(sub)
bus.add_subs(bs_fqme, {sub})
local_subs.setdefault(
bs_fqme,
set()
).add(sub)
bus.add_subs(
bs_fqme,
{sub}
)
# sync caller with all subs registered state
sub_registered.set()
uid = ctx.chan.uid
uid: tuple[str, str] = ctx.chan.uid
try:
# ctrl protocol for start/stop of quote streams based on UI
# state (eg. don't need a stream when a symbol isn't being
# displayed).
# ctrl protocol for start/stop of live quote streams
# based on UI state (eg. don't need a stream when
# a symbol isn't being displayed).
async for msg in stream:
if msg == 'pause':
@ -760,7 +788,7 @@ async def install_brokerd_search(
async def maybe_open_feed(
fqmes: list[str],
loglevel: Optional[str] = None,
loglevel: str | None = None,
**kwargs,
@ -820,6 +848,8 @@ async def open_feed(
start_stream: bool = True,
tick_throttle: float | None = None, # Hz
allow_remote_ctl_ui: bool = False,
) -> Feed:
'''
Open a "data feed" which provides streamed real-time quotes.
@ -902,6 +932,12 @@ async def open_feed(
# of these stream open sequences sequentially per
# backend? .. need some thot!
allow_overruns=True,
# NOTE: UI actors (like charts) can allow
# remote control of certain graphics rendering
# capabilities via the
# `.ui._remote_ctl.remote_annotate()` msg loop.
allow_remote_ctl_ui=allow_remote_ctl_ui,
)
)

View File

@ -20,8 +20,12 @@ Storage middle-ware CLIs.
"""
from __future__ import annotations
# from datetime import datetime
# from contextlib import (
# AsyncExitStack,
# )
from pathlib import Path
import time
from types import ModuleType
import polars as pl
import numpy as np
@ -34,7 +38,6 @@ import typer
from piker.service import open_piker_runtime
from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
ShmArray,
)
@ -45,6 +48,7 @@ from . import (
from . import (
__tsdbs__,
open_storage_client,
StorageClient,
)
@ -232,7 +236,8 @@ def anal(
@store.command()
def ldshm(
fqme: str,
write_parquet: bool = False,
write_parquet: bool = True,
reload_parquet_to_shm: bool = True,
) -> None:
'''
@ -242,15 +247,32 @@ def ldshm(
'''
async def main():
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
actl: AnnotCtl
mod: ModuleType
client: StorageClient
async with (
open_piker_runtime(
'polars_boi',
enable_modules=['piker.data._sharedmem'],
debug_mode=True,
),
open_storage_client() as (
mod,
client,
),
open_annot_ctl() as actl,
):
df: pl.DataFrame | None = None
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
shm_df: pl.DataFrame | None = None
for (
shmfile,
shm,
# parquet_path,
shm_df,
) in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
@ -275,122 +297,136 @@ def ldshm(
period=period_s,
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if (
needs_correction: bool = (
not gaps.is_empty()
or null_segs
):
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
annot_ctl: AnnotCtl
async with open_annot_ctl() as annot_ctl:
for i in range(gaps.height):
)
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if needs_correction:
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
row: pl.DataFrame = gaps[i]
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = df.filter(
pl.col('index') == iend - 1
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
prev_r: pl.DataFrame = df.filter(
pl.col('index') == gaps[0]['index'] - 1
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
# async with actl.open_rect(
# ) as aid:
aid: int = await actl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
# write to parquet file?
if (
write_parquet
):
# write to fs
start = time.time()
path: Path = await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period_s,
)
write_delay: float = round(
time.time() - start,
ndigits=6,
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
read_delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {write_delay} secs\n'
f'file path: {path}'
f'parquet read took {read_delay} secs\n'
f'polars df: {read_df}'
)
if reload_parquet_to_shm:
new = tsp.pl2np(
deduped,
dtype=shm.array.dtype,
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
# await tractor.pause()
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
# since normally readonly
shm._array.setflags(
write=int(1),
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
shm.push(
new,
prepend=True,
start=new['index'][-1],
update_first=False, # don't update ._first
)
aid: int = await annot_ctl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
await tractor.pause()
await tractor.pause()
assert diff
# write to parquet file?
if write_parquet:
timeframe: str = f'{period_s}s'
else:
# allow interaction even when no ts problems.
await tractor.pause()
assert not diff
datadir: Path = get_conf_dir() / 'nativedb'
if not datadir.is_dir():
datadir.mkdir()
path: Path = datadir / f'{fqme}.{timeframe}.parquet'
# write to fs
start = time.time()
df.write_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
log.info(
f'parquet write took {delay} secs\n'
f'file path: {path}'
)
# read back from fs
start = time.time()
read_df: pl.DataFrame = pl.read_parquet(path)
delay: float = round(
time.time() - start,
ndigits=6,
)
print(
f'parquet read took {delay} secs\n'
f'polars df: {read_df}'
)
if df is None:
log.error(f'No matching shm buffers for {fqme} ?')

View File

@ -95,16 +95,19 @@ def detect_period(shm: ShmArray) -> float:
def mk_ohlcv_shm_keyed_filepath(
fqme: str,
period: float, # otherwise known as the "timeframe"
period: float | int, # otherwise known as the "timeframe"
datadir: Path,
) -> str:
) -> Path:
if period < 1.:
raise ValueError('Sample period should be >= 1.!?')
period_s: str = f'{period}s'
path: Path = datadir / f'{fqme}.ohlcv{period_s}.parquet'
path: Path = (
datadir
/
f'{fqme}.ohlcv{int(period)}s.parquet'
)
return path
@ -227,6 +230,7 @@ class NativeStorageClient:
self,
fqme: str,
period: float,
) -> Path:
return mk_ohlcv_shm_keyed_filepath(
fqme=fqme,
@ -239,6 +243,7 @@ class NativeStorageClient:
fqme: str,
df: pl.DataFrame,
timeframe: float,
) -> None:
# cache df for later usage since we (currently) need to
# convert to np.ndarrays to push to our `ShmArray` rt

View File

@ -120,7 +120,7 @@ from ..storage import TimeseriesNotFound
if TYPE_CHECKING:
from bidict import bidict
from ..service.marketstore import StorageClient
from .feed import _FeedsBus
# from .feed import _FeedsBus
# `ShmArray` buffer sizing configuration:
@ -1352,9 +1352,7 @@ def iter_dfs_from_shms(
readonly=True,
)
assert not opened
ohlcv = shm.array
from ._anal import np2pl
ohlcv: np.ndarray = shm.array
df: pl.DataFrame = np2pl(ohlcv)
yield (

View File

@ -620,7 +620,11 @@ def detect_price_gaps(
...
def dedupe(src_df: pl.DataFrame) -> tuple[
def dedupe(
src_df: pl.DataFrame,
sort: bool = True,
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # gaps
pl.DataFrame, # with deduplicated dts (aka gap/repeat removal)
@ -634,6 +638,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
'''
df: pl.DataFrame = with_dts(src_df)
# TODO: enable passing existing `with_dts` df for speedup?
gaps: pl.DataFrame = detect_time_gaps(df)
# if no gaps detected just return carbon copies
@ -651,8 +657,10 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
subset=['dt'],
maintain_order=True,
)
if sort:
deduped = deduped.sort(by='time')
deduped_gaps = detect_time_gaps(deduped)
deduped_gaps: pl.DataFrame = detect_time_gaps(deduped)
diff: int = (
df.height
@ -660,7 +668,8 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
deduped.height
)
log.warning(
f'Gaps found:\n{gaps}\n'
f'TIME GAPs FOUND:\n'
# f'{gaps}\n'
f'deduped Gaps found:\n{deduped_gaps}'
)
return (

View File

@ -471,6 +471,9 @@ async def graphics_update_loop(
await tractor.pause()
try:
# XXX TODO: we need to do _dss UPDATE here so that when
# a feed-view is switched you can still remote annotate the
# prior view..
from . import _remote_ctl
_remote_ctl._dss = dss
@ -526,7 +529,7 @@ async def graphics_update_loop(
finally:
# XXX: cancel any remote annotation control ctxs
_remote_ctl._dss = None
for ctx in _remote_ctl._ctxs:
for cid, (ctx, aids) in _remote_ctl._ctxs.items():
await ctx.cancel()

View File

@ -314,7 +314,6 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
color.setAlpha(66)
self.setBrush(fn.mkBrush(color))
self.setZValue(1e9)
self.hide()
label = self._label = QLabel()
label.setTextFormat(0) # markdown
@ -343,6 +342,7 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
]
self.add_to_view(viewbox)
self.hide()
def add_to_view(
self,
@ -579,10 +579,34 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
)
label.show()
def clear(self):
def hide(self):
'''
Clear the selection box from view.
Clear the selection box from its graphics scene but
don't delete it permanently.
'''
super().hide()
self._label.hide()
self.hide()
# TODO: ensure no one else is using dis.
clear = hide
def delete(self) -> None:
'''
De-allocate this rect from its rendering graphics scene.
Like a permanent hide.
'''
scen: QGraphicsScene = self.scene()
if scen is None:
return
scen.removeItem(self)
if (
self._label
and
self._label_proxy
):
scen.removeItem(self._label_proxy)

View File

@ -28,7 +28,17 @@ from typing import (
import pyqtgraph as pg
from pyqtgraph import Point, functions as fn
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5 import (
QtCore,
QtGui,
)
from PyQt5.QtWidgets import (
QGraphicsPathItem,
QStyleOptionGraphicsItem,
QGraphicsItem,
QGraphicsScene,
QWidget,
)
from PyQt5.QtCore import QPointF
from ._annotate import LevelMarker
@ -130,7 +140,7 @@ class LevelLine(pg.InfiniteLine):
self._right_end_sc: float = 0
# use px caching
self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache)
self.setCacheMode(QGraphicsItem.DeviceCoordinateCache)
def txt_offsets(self) -> tuple[int, int]:
return 0, 0
@ -308,7 +318,7 @@ class LevelLine(pg.InfiniteLine):
Remove this line from containing chart/view/scene.
'''
scene = self.scene()
scene: QGraphicsScene = self.scene()
if scene:
for label in self._labels:
label.delete()
@ -339,8 +349,8 @@ class LevelLine(pg.InfiniteLine):
self,
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget
opt: QStyleOptionGraphicsItem,
w: QWidget
) -> None:
'''
@ -417,9 +427,9 @@ class LevelLine(pg.InfiniteLine):
def add_marker(
self,
path: QtWidgets.QGraphicsPathItem,
path: QGraphicsPathItem,
) -> QtWidgets.QGraphicsPathItem:
) -> QGraphicsPathItem:
self._marker = path
self._marker.setPen(self.currentPen)

View File

@ -20,10 +20,14 @@ to a chart from some other actor.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from contextlib import (
asynccontextmanager as acm,
AsyncExitStack,
)
from functools import partial
from pprint import pformat
from typing import (
Any,
# Any,
AsyncContextManager,
)
@ -41,6 +45,7 @@ from PyQt5.QtWidgets import (
from piker.log import get_logger
from piker.types import Struct
from piker.service import find_service
from piker.brokers import SymbolNotFound
from ._display import DisplayState
from ._interaction import ChartView
from ._editors import SelectRect
@ -59,11 +64,117 @@ _dss: dict[str, DisplayState] | None = None
# be cancelled on shutdown/error.
# TODO: make `tractor.Context` hashable via its `.cid: str`?
# _ctxs: set[Context] = set()
_ctxs: list[Context] = []
# TODO: use type statements from 3.12+
IpcCtxTable = dict[
str, # each `Context.cid`
tuple[
Context, # handle for ctx-cancellation
set[int] # set of annotation (instance) ids
]
]
# global map of all uniquely created annotation-graphics
# so that they can be mutated (eventually) by a client.
_annots: dict[int, QGraphicsItem] = {}
_ctxs: IpcCtxTable = {}
# XXX: global map of all uniquely created annotation-graphics so
# that they can be mutated (eventually) by a client.
# NOTE: this map is only populated on the `chart` actor side (aka
# the "annotations server" which actually renders to a Qt canvas).
# type AnnotsTable = dict[int, QGraphicsItem]
AnnotsTable = dict[int, QGraphicsItem]
_annots: AnnotsTable = {}
async def serve_rc_annots(
ipc_key: str,
annot_req_stream: MsgStream,
dss: dict[str, DisplayState],
ctxs: IpcCtxTable,
annots: AnnotsTable,
) -> None:
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
aid: int = id(rect)
annots[aid] = rect
aids: set[int] = ctxs[ipc_key][1]
aids.add(aid)
await annot_req_stream.send(aid)
case {
'rm_annot': int(aid),
}:
# NOTE: this is normally entered on
# a client's annotation de-alloc,
# prior to detach or modify.
annot: QGraphicsItem = annots[aid]
annot.delete()
# respond to client indicating annot
# was indeed deleted.
await annot_req_stream.send(aid)
case {
'fqme': fqme,
'render': int(aid),
'viz_name': str(viz_name),
'timeframe': timeframe,
}:
# | {
# 'backfilling': (str(viz_name), timeframe),
# }:
ds: DisplayState = _dss[viz_name]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
@tractor.context
@ -74,62 +185,29 @@ async def remote_annotate(
global _dss, _ctxs
assert _dss
_ctxs.append(ctx)
_ctxs[ctx.cid] = (ctx, set())
# send back full fqme symbology to caller
await ctx.started(list(_dss))
# open annot request handler stream
async with ctx.open_stream() as annot_req_stream:
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
await annot_req_stream.send(id(rect))
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
try:
await serve_rc_annots(
ipc_key=ctx.cid,
annot_req_stream=annot_req_stream,
dss=_dss,
ctxs=_ctxs,
annots=_annots,
)
finally:
# ensure all annots for this connection are deleted
# on any final teardown
(_ctx, aids) = _ctxs[ctx.cid]
assert _ctx is ctx
for aid in aids:
annot: QGraphicsItem = _annots[aid]
annot.delete()
class AnnotCtl(Struct):
@ -142,7 +220,12 @@ class AnnotCtl(Struct):
'''
ctx2fqmes: dict[str, str]
fqme2stream: dict[str, MsgStream]
fqme2ipc: dict[str, MsgStream]
_annot_stack: AsyncExitStack
# runtime-populated mapping of all annotation
# ids to their equivalent IPC msg-streams.
_ipcs: dict[int, MsgStream] = {}
async def add_rect(
self,
@ -155,13 +238,21 @@ class AnnotCtl(Struct):
domain: str = 'view', # or 'scene'
color: str = 'dad_blue',
from_acm: bool = False,
) -> int:
'''
Add a `SelectRect` annotation to the target view, return
the instance's `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self.fqme2stream[fqme]
ipc: MsgStream = self.fqme2ipc.get(fqme)
if ipc is None:
raise SymbolNotFound(
'No chart (actor) seems to have mkt feed loaded?\n'
f'{fqme}'
)
await ipc.send({
'fqme': fqme,
'annot': 'SelectRect',
@ -175,32 +266,61 @@ class AnnotCtl(Struct):
'update_label': False,
},
})
return (await ipc.receive())
async def modify(
self,
aid: int, # annotation id
meth: str, # far end graphics object method to invoke
params: dict[str, Any], # far end `meth(**kwargs)`
) -> bool:
'''
Modify an existing (remote) annotation's graphics
paramters, thus changing it's appearance / state in real
time.
'''
raise NotImplementedError
aid: int = await ipc.receive()
self._ipcs[aid] = ipc
if not from_acm:
self._annot_stack.push_async_callback(
partial(
self.remove,
aid,
)
)
return aid
async def remove(
self,
uid: int,
aid: int,
) -> bool:
'''
Remove an existing annotation by instance id.
'''
raise NotImplementedError
ipc: MsgStream = self._ipcs[aid]
await ipc.send({
'rm_annot': aid,
})
removed: bool = await ipc.receive()
return removed
@acm
async def open_rect(
self,
**kwargs,
) -> int:
try:
aid: int = await self.add_rect(
from_acm=True,
**kwargs,
)
yield aid
finally:
await self.remove(aid)
# TODO: do we even need this?
# async def modify(
# self,
# aid: int, # annotation id
# meth: str, # far end graphics object method to invoke
# params: dict[str, Any], # far end `meth(**kwargs)`
# ) -> bool:
# '''
# Modify an existing (remote) annotation's graphics
# parameters, thus changing its appearance / state in real
# time.
# '''
# raise NotImplementedError
@acm
@ -235,20 +355,18 @@ async def open_annot_ctl(
)
ctx2fqmes: dict[str, set[str]] = {}
fqme2stream: dict[str, MsgStream] = {}
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2stream=fqme2stream,
)
fqme2ipc: dict[str, MsgStream] = {}
stream_ctxs: list[AsyncContextManager] = []
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
async with (
trionics.gather_contexts(ctx_mngrs) as ctxs,
):
for (ctx, fqmes) in ctxs:
stream_ctxs.append(ctx.open_stream())
# fill lookup table of mkt addrs to IPC ctxs
for fqme in fqmes:
if other := fqme2stream.get(fqme):
if other := fqme2ipc.get(fqme):
raise ValueError(
f'More than one chart displays {fqme}!?\n'
'Other UI actor info:\n'
@ -266,8 +384,19 @@ async def open_annot_ctl(
for stream in streams:
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
for fqme in fqmes:
fqme2stream[fqme] = stream
fqme2ipc[fqme] = stream
yield client
# TODO: on graceful teardown should we try to
# remove all annots that were created/modded?
# NOTE: on graceful teardown we always attempt to
# remove all annots that were created by the
# entering client.
# TODO: should we maybe instead/also do this on the
# server-actor side so that when a client
# disconnects we always delete all annotations by
# default instead of expecting the client to?
async with AsyncExitStack() as annots_stack:
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2ipc=fqme2ipc,
_annot_stack=annots_stack,
)
yield client