Compare commits

...

17 Commits

Author SHA1 Message Date
Tyler Goodlet a681b2f0bb Drop passing `bus` to `tsp.manage_history()` in feed allocator 2023-12-22 21:44:38 -05:00
Tyler Goodlet 5b0c94933b `.config`: don't hack the user config dir if user is 'root' and sudo was NOT used.. 2023-12-22 21:41:51 -05:00
Tyler Goodlet 61e52213b2 Oof, fix no-tsdb-entry since needs full backfill case!
Got borked by the logic re-factoring to get more conc going around
tsdb vs. latest frame loads with nested nurseries. So, repair all that
such that we can still backfill symbols previously not loaded as well as
drop all the `_FeedBus` instance passing to subtasks where it's
definitely not needed.

Toss in a pause point around sampler stream `'backfilling'` msgs as well
since there's seems to be a weird ctx-cancelled propagation going on
when a feed client disconnects during backfill and this might be where
the src `tractor.ContextCancelled` is getting bubbled from?
2023-12-22 21:34:31 -05:00
Tyler Goodlet b064a5f94d A working remote annotations controller B)
Obvi took a little `.ui` component fixing (as per prior commits) but
this is now a working PoC for gap detection and markup from a remote
(data) non-`chart` actor!

Iface and impl deats from `.ui._remote_ctl`:
- add new `open_annot_ctl()` mngr which attaches to all locally
  discoverable chart actors, gathers annot-ctl streams per fqme set, and
  delivers a new `AnnotCtl` client which allows adding annotation
  rectangles via a `.add_rect()` method.
  - also template out some other soon-to-get methods for removing and
    modifying pre-exiting annotations on some `ChartView` 💥
- ensure the `chart` CLI subcmd starts the (`qtloops`) guest-mode init
  with the `.ui._remote_ctl` module enabled.
- actually use this stuff in the `piker store ldshm` CLI to submit
  markup rects around any detected null/time gaps in the tsdb data!

Still lots to do:
-  probably colorization of gaps depending on if they're venue
   closures (aka real mkt gaps) vs. "missing data" from the backend (aka
   timeseries consistency gaps).
- run gap detection and markup as part of the std `.tsp` sub-sys
   runtime such that gap annots are a std "built-in" feature of
   charting.
- support for epoch time stamp AND abs-shm-index rect x-values
  (depending on chart operational state).
2023-12-22 15:19:20 -05:00
Tyler Goodlet e7fa841263 Pass scene-points to `.select_box` as per prior comments
As mentioned in a prior commit this was the (seemingly, and so far) only
way to make our `.select_box` annotator shift-click rect work properly
(and the same as) by adopting the code around `ViewBox.rbScaleBox`
(which we now also disable). That means also passing the scene coords to
the `SelectRect.set_scen_pos()`. Also add in the proper `ev:
pyqtgraph.GraphicsScene.mouseEvents.MouseDragEvent` so we can actually
figure out wut the hell all this pg custom mouse-event stuff is XD
2023-12-22 12:09:08 -05:00
Tyler Goodlet 1f346483a0 Always pass full `ShmArray._array` buf to `ContentsLables` updates so the label can be used outside the "backfilled-valid" range 2023-12-22 12:06:55 -05:00
Tyler Goodlet d006ecce7e Fix `._pg_overrides` import cycle caused by our `Axis` override 2023-12-22 12:05:18 -05:00
Tyler Goodlet 69368f20c2 Finally fix our `SelectRect` for use with cursor..
Turns out using the `.setRect()` method was the main cause of the issue
(though still don't really understand how or why) and this instead
adopts verbatim the code from `pg.ViewBox.updateScaleBox()` which uses
a scaling transform to set the rect for the "zoom scale box" thingy.

Further add a shite ton more improvements and interface tweaks in
support of the new remote-annotation control msging subsys:
- re-impl `.set_scen_pos()` to expect `QGraphicsScene` coordinates (i.e.
  passed from the interaction loop and pass scene `QPointF`s from
  `ViewBox.mouseDragEvent()` using the `MouseDragEvent.scenePos()` and
  friends; this is required to properly use the transform setting
  approach to resize the select-rect as mentioned above.
- add `as_point()` converter to maybe-cast python `tuple[float, float]`
  inputs (prolly from IPC msgs) to equivalent `QPointF`s.
- add a ton more detailed Qt-obj-related typing throughout our deriv.
- call `.add_to_view()` from init so that wtv view is passed in during
  instantiation is always set as the `.vb` after creation.
- factor the (proxy widget) label creation into a new `.init_label()`
  so that both the `set_scen/view_pos()` methods can call it and just
  generally decouple rect-pos mods from label content mods.
2023-12-22 11:47:31 -05:00
Tyler Goodlet 31fa0b02f5 Append any `enable_modules` specc-ed in the chart guest-mode runner 2023-12-21 20:40:00 -05:00
Tyler Goodlet 5a60974990 Use explicit `.data.feed` import of `tractor.trionics` 2023-12-21 20:26:45 -05:00
Tyler Goodlet 8d324acf91 First (untested) draft remote annotation ctl API
Since we can and want to eventually allow remote control of pretty much
all UIs, this drafts out a new `.ui._remote_ctl` module with a new
`@tractor.context` called `remote_annotate()` which simply starts a msg
loop which allows for (eventual) initial control of a `SelectRect`
through IPC msgs.

Remote controller impl deats:
- make `._display.graphics_update_loop()` set a `._remote_ctl._dss:
  dict` for all chart actor-global `DisplayState` instances which can
  then be controlled from the `remote_annotate()` handler task.
- also stash any remote client controller `tractor.Context` handles in
  a module var for broadband IPC cancellation on any display loop
  shutdown.
- draft a further global map to track graphics object instances since
  likely we'll want to support remote mutation where the client can use
  the `id(obj): int` key as an IPC handle/uuid.
- just draft out a client-side `@acm` for now: `open_annots_client()` to
  be filled out in up coming commits.

UI component tweaks in support of the above:
- change/add `SelectRect.set_view_pos()` and `.set_scene_pos()` to allow
  specifying the rect coords in either of the scene or viewbox domains.
  - use these new apis in the interaction loop.
- add a `SelectRect.add_to_view()` to avoid having annotation client
  code knowing "how" a graphics obj needs to be added and can instead
  just pass only the target `ChartView` during init.
- drop all the status label updates from the display loop since they
  don't really work all the time, and probably it's not a feature we
  want to keep in the longer term (over just console output and/or using
  the status bar for simpler "current state / mkt" infos).
  - allows a bit of simplification of `.ui._fsp` method APIs to not pass
    around status (bar) callbacks as well!
2023-12-19 15:36:54 -05:00
Tyler Goodlet ab84303da7 Drop `SelectRect.mouse_drag_released()` since it was a dumb method 2023-12-18 20:32:17 -05:00
Tyler Goodlet 659649ec48 Bah, fix nursery indents for maybe tsdb backloading
Can't ref `dt_eps` and `tsdb_entry` if they don't exist.. like for 1s
sampling from `binance` (which dne). So make sure to add better logic
guard and only open the finaly backload nursery if we actually need to
fill the gap between latest history and where tsdb history ends.

TO CHERRY #486
2023-12-18 19:46:59 -05:00
Tyler Goodlet f7cc43ee0b Add pauses to `store anal/ldshm` only on bad segs
Particularly halting before maybe writing the repaired timeseries
history in `store anal` to optionally allow user to avoid writing to
storage.
2023-12-18 11:56:57 -05:00
Tyler Goodlet f5dc21d3f4 Adjust all `.tsp` imports to use new sub-pkg
Also toss in a poll loop around the `hist_shm: ShmArray` backfill
read-check in the `.data.allocate_persisten_feed()`  init to cope with
possible racy-ness from the increased tsdb history loading concurrency
now implemented.
2023-12-18 11:54:28 -05:00
Tyler Goodlet 4568c55f17 Create `piker.tsp` "time series processing" subpkg
Move `.data.history` -> `.tsp.__init__.py` for now as main pkg-mod
and `.data.tsp` -> `.tsp._anal` (for analysis).

Obviously follow commits will change surrounding codebase (imports) to
match..
2023-12-18 11:53:27 -05:00
Tyler Goodlet d5d68f75ea ib: only raise first quote timeout err after tries
Previously we were actually failing silently too fast instead of
actually trying multiple times (now we do for 100) before finally
raising any timeout in the final loop `else:` block.
2023-12-18 11:45:19 -05:00
20 changed files with 1118 additions and 395 deletions

View File

@ -843,6 +843,7 @@ class Client:
self,
contract: Contract,
timeout: float = 1,
tries: int = 100,
raise_on_timeout: bool = False,
) -> Ticker | None:
@ -857,30 +858,30 @@ class Client:
ready: ticker.TickerUpdateEvent = ticker.updateEvent
# ensure a last price gets filled in before we deliver quote
timeouterr: Exception | None = None
warnset: bool = False
for _ in range(100):
if isnan(ticker.last):
for _ in range(tries):
# wait for a first update(Event)
# wait for a first update(Event) indicatingn a
# live quote feed.
if isnan(ticker.last):
try:
tkr = await asyncio.wait_for(
ready,
timeout=timeout,
)
except TimeoutError:
import pdbp
pdbp.set_trace()
if tkr:
break
except TimeoutError as err:
timeouterr = err
await asyncio.sleep(0.01)
continue
if raise_on_timeout:
raise
return None
if tkr:
break
else:
if not warnset:
log.warning(
f'Quote for {contract} timed out: market is closed?'
f'Quote req timed out..maybe venue is closed?\n'
f'{asdict(contract)}'
)
warnset = True
@ -888,6 +889,11 @@ class Client:
log.info(f'Got first quote for {contract}')
break
else:
if timeouterr and raise_on_timeout:
import pdbp
pdbp.set_trace()
raise timeouterr
if not warnset:
log.warning(
f'Contract {contract} is not returning a quote '
@ -895,6 +901,8 @@ class Client:
)
warnset = True
return None
return ticker
# async to be consistent for the client proxy, and cuz why not.

View File

@ -943,6 +943,11 @@ async def stream_quotes(
contract=con,
raise_on_timeout=True,
)
first_quote: dict = normalize(first_ticker)
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope | None = None
startup: bool = True
while (

View File

@ -134,14 +134,19 @@ def get_app_dir(
_click_config_dir: Path = Path(get_app_dir('piker'))
_config_dir: Path = _click_config_dir
_parent_user: str = os.environ.get('SUDO_USER')
if _parent_user:
# NOTE: when using `sudo` we attempt to determine the non-root user
# and still use their normal config dir.
if (
(_parent_user := os.environ.get('SUDO_USER'))
and
_parent_user != 'root'
):
non_root_user_dir = Path(
os.path.expanduser(f'~{_parent_user}')
)
root: str = 'root'
_ccds: str = str(_click_config_dir) # click config dir string
_ccds: str = str(_click_config_dir) # click config dir as string
i_tail: int = int(_ccds.rfind(root) + len(root))
_config_dir = (
non_root_user_dir

View File

@ -45,10 +45,7 @@ import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
from tractor.trionics import (
maybe_open_context,
gather_contexts,
)
from tractor import trionics
from piker.accounting import (
MktPair,
@ -69,7 +66,7 @@ from .validate import (
FeedInit,
validate_backend,
)
from .history import (
from ..tsp import (
manage_history,
)
from .ingest import get_ingestormod
@ -124,6 +121,8 @@ class _FeedsBus(Struct):
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
with trio.CancelScope() as cs:
# TODO: shouldn't this be a direct await to avoid
# cancellation contagion to the bus nursery!?!?!
await self.nursery.start(
target,
*args,
@ -330,7 +329,6 @@ async def allocate_persistent_feed(
) = await bus.nursery.start(
manage_history,
mod,
bus,
mkt,
some_data_ready,
feed_is_live,
@ -407,7 +405,13 @@ async def allocate_persistent_feed(
rt_shm.array['time'][1] = ts + 1
elif hist_shm.array.size == 0:
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
for i in range(100):
await trio.sleep(0.1)
if hist_shm.array.size > 0:
break
else:
await tractor.pause()
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
# wait the spawning parent task to register its subscriber
# send-stream entry before we start the sample loop.
@ -453,8 +457,12 @@ async def open_feed_bus(
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# XXX: required to propagate ``tractor`` loglevel to piker
# logging
get_console_log(
loglevel
or tractor.current_actor().loglevel
)
# local state sanity checks
# TODO: check for any stale shm entries for this symbol
@ -464,7 +472,7 @@ async def open_feed_bus(
assert 'brokerd' in servicename
assert brokername in servicename
bus = get_feed_bus(brokername)
bus: _FeedsBus = get_feed_bus(brokername)
sub_registered = trio.Event()
flumes: dict[str, Flume] = {}
@ -768,7 +776,7 @@ async def maybe_open_feed(
'''
fqme = fqmes[0]
async with maybe_open_context(
async with trionics.maybe_open_context(
acm_func=open_feed,
kwargs={
'fqmes': fqmes,
@ -788,7 +796,7 @@ async def maybe_open_feed(
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with gather_contexts(
async with trionics.gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()):
@ -848,7 +856,7 @@ async def open_feed(
)
portals: tuple[tractor.Portal]
async with gather_contexts(
async with trionics.gather_contexts(
brokerd_ctxs,
) as portals:
@ -900,7 +908,7 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals)
async with (
gather_contexts(bus_ctxs) as ctxs,
trionics.gather_contexts(bus_ctxs) as ctxs,
):
stream_ctxs: list[tractor.MsgStream] = []
for (
@ -942,7 +950,7 @@ async def open_feed(
brokermod: ModuleType
fqmes: list[str]
async with (
gather_contexts(stream_ctxs) as streams,
trionics.gather_contexts(stream_ctxs) as streams,
):
for (
stream,
@ -958,6 +966,12 @@ async def open_feed(
if brokermod.name == flume.mkt.broker:
flume.stream = stream
assert len(feed.mods) == len(feed.portals) == len(feed.streams)
assert (
len(feed.mods)
==
len(feed.portals)
==
len(feed.streams)
)
yield feed

View File

@ -100,6 +100,10 @@ async def open_piker_runtime(
or [_default_reg_addr]
)
if ems := tractor_kwargs.get('enable_modules'):
# import pdbp; pdbp.set_trace()
enable_modules.extend(ems)
async with (
tractor.open_root_actor(

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -19,6 +19,7 @@ Storage middle-ware CLIs.
"""
from __future__ import annotations
# from datetime import datetime
from pathlib import Path
import time
@ -36,11 +37,8 @@ from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
ShmArray,
tsp,
)
from piker.data.history import (
iter_dfs_from_shms,
)
from piker import tsp
from . import (
log,
)
@ -189,8 +187,8 @@ def anal(
frame=history,
period=period,
)
if null_segs:
await tractor.pause()
# TODO: do tsp queries to backcend to fill i missing
# history and then prolly write it to tsdb!
shm_df: pl.DataFrame = await client.as_df(
fqme,
@ -206,18 +204,27 @@ def anal(
diff,
) = tsp.dedupe(shm_df)
write_edits: bool = True
if (
write_edits
and (
diff
or null_segs
)
):
await tractor.pause()
if diff:
await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period,
)
# TODO: something better with tab completion..
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
else:
# TODO: something better with tab completion..
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
trio.run(main)
@ -243,11 +250,11 @@ def ldshm(
),
):
df: pl.DataFrame | None = None
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
period_s: float = times[-1] - times[-2]
period_s: float = float(times[-1] - times[-2])
if period_s < 1.:
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
@ -270,11 +277,86 @@ def ldshm(
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if not gaps.is_empty():
await tractor.pause()
if (
not gaps.is_empty()
or null_segs
):
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
annot_ctl: AnnotCtl
async with open_annot_ctl() as annot_ctl:
for i in range(gaps.height):
if null_segs:
await tractor.pause()
row: pl.DataFrame = gaps[i]
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = df.filter(
pl.col('index') == gaps[0]['index'] - 1
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
# await tractor.pause()
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
aid: int = await annot_ctl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
await tractor.pause()
# write to parquet file?
if write_parquet:

View File

@ -56,8 +56,6 @@ from datetime import datetime
from pathlib import Path
import time
# from bidict import bidict
# import tractor
import numpy as np
import polars as pl
from pendulum import (
@ -65,10 +63,10 @@ from pendulum import (
)
from piker import config
from piker import tsp
from piker.data import (
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.log import get_logger
from . import TimeseriesNotFound

View File

@ -32,6 +32,7 @@ from __future__ import annotations
from datetime import datetime
from functools import partial
from pathlib import Path
from pprint import pformat
from types import ModuleType
from typing import (
Callable,
@ -53,25 +54,64 @@ import polars as pl
from ..accounting import (
MktPair,
)
from ._util import (
from ..data._util import (
log,
)
from ._sharedmem import (
from ..data._sharedmem import (
maybe_open_shm_array,
ShmArray,
)
from ._source import def_iohlcv_fields
from ._sampling import (
from ..data._source import def_iohlcv_fields
from ..data._sampling import (
open_sample_stream,
)
from .tsp import (
dedupe,
from ._anal import (
get_null_segs,
iter_null_segs,
sort_diff,
Frame,
# Seq,
Seq,
# codec-ish
np2pl,
pl2np,
# `numpy` only
slice_from_time,
# `polars` specific
dedupe,
with_dts,
detect_time_gaps,
sort_diff,
# TODO:
detect_price_gaps
)
__all__: list[str] = [
'dedupe',
'get_null_segs',
'iter_null_segs',
'sort_diff',
'slice_from_time',
'Frame',
'Seq',
'np2pl',
'pl2np',
'slice_from_time',
'with_dts',
'detect_time_gaps',
'sort_diff',
# TODO:
'detect_price_gaps'
]
# TODO: break up all this shite into submods!
from ..brokers._util import (
DataUnavailable,
)
@ -229,16 +269,20 @@ async def maybe_fill_null_segments(
# - remember that in the display side, only refersh this
# if the respective history is actually "in view".
# loop
await sampler_stream.send({
'broadcast_all': {
try:
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
null_segs_detected.set()
# RECHECK for more null-gaps
@ -252,39 +296,68 @@ async def maybe_fill_null_segments(
and
len(null_segs[-1])
):
await tractor.pause()
(
iabs_slices,
iabs_zero_rows,
zero_t,
) = null_segs
log.warning(
f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
f'{pformat(iabs_slices)}'
)
array = shm.array
zeros = array[array['low'] == 0]
# always backfill gaps with the earliest (price) datum's
# value to avoid the y-ranger including zeros and completely
# stretching the y-axis..
if 0 < zeros.size:
zeros[[
# TODO: always backfill gaps with the earliest (price) datum's
# value to avoid the y-ranger including zeros and completely
# stretching the y-axis..
# array: np.ndarray = shm.array
# zeros = array[array['low'] == 0]
ohlc_fields: list[str] = [
'open',
'high',
'low',
'close',
]] = shm._array[zeros['index'][0] - 1]['close']
]
# TODO: interatively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# if (
# next_end_dt not in frame[
# ):
# pass
for istart, istop in iabs_slices:
# get view into buffer for null-segment
gap: np.ndarray = shm._array[istart:istop]
# copy the oldest OHLC samples forward
gap[ohlc_fields] = shm._array[istart]['close']
start_t: float = shm._array[istart]['time']
t_diff: float = (istop - istart)*timeframe
gap['time'] = np.arange(
start=start_t,
stop=start_t + t_diff,
step=timeframe,
)
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
# TODO: interatively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# await tractor.pause()
async def start_backfill(
tn: trio.Nursery,
get_hist,
mod: ModuleType,
mkt: MktPair,
@ -338,7 +411,6 @@ async def start_backfill(
# settings above when the tsdb is detected as being empty.
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# STAGE NOTE: "backward history gap filling":
# - we push to the shm buffer until we have history back
# until the latest entry loaded from the tsdb's table B)
@ -682,6 +754,8 @@ async def back_load_from_tsdb(
async def push_latest_frame(
# box-type only that should get packed with the datetime
# objects received for the latest history frame
dt_eps: list[DateTime, DateTime],
shm: ShmArray,
get_hist: Callable[
@ -691,8 +765,11 @@ async def push_latest_frame(
timeframe: float,
config: dict,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
):
task_status: TaskStatus[
Exception | list[datetime, datetime]
] = trio.TASK_STATUS_IGNORED,
) -> list[datetime, datetime] | None:
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
@ -709,17 +786,19 @@ async def push_latest_frame(
mr_start_dt,
mr_end_dt,
])
task_status.started(dt_eps)
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
task_status.started(None)
if timeframe > 1:
await tractor.pause()
return
# prolly tf not supported
return None
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
@ -731,11 +810,16 @@ async def push_latest_frame(
prepend=True, # append on first frame
)
return dt_eps
async def load_tsdb_hist(
storage: StorageClient,
mkt: MktPair,
timeframe: float,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> tuple[
np.ndarray,
DateTime,
@ -821,48 +905,63 @@ async def tsdb_backfill(
config,
)
# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap and the since the latest
# frame query will normally be the main source of
# latency?
task_status.started()
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)
# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap and the since the latest
# frame query will normally be the main source of
# latency?
task_status.started()
# NOTE: iabs to start backfilling from, reverse chronological,
# ONLY AFTER the first history frame has been pushed to
# mem!
backfill_gap_from_shm_index: int = shm._first.value + 1
(
mr_start_dt,
mr_end_dt,
) = dt_eps
# Prepend any tsdb history into the rt-shm-buffer which
# should NOW be getting filled with the most recent history
# pulled from the data-backend.
if dt_eps:
# well then, unpack the latest (gap) backfilled frame dts
(
mr_start_dt,
mr_end_dt,
) = dt_eps
async with trio.open_nursery() as tn:
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
# we shouldn't be here!), or
# - no prior history has been stored (yet) and we need
# todo full backfill of the history now.
if tsdb_entry is None:
# indicate to backfill task to fill the whole
# shm buffer as much as it can!
last_tsdb_dt = None
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if tsdb_entry:
# there's existing tsdb history from (offline) storage
# so only backfill the gap between the
# most-recent-frame (mrf) and that latest sample.
else:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
async with trio.open_nursery() as tn:
bf_done = await tn.start(
partial(
start_backfill,
tn=tn,
get_hist=get_hist,
mod=mod,
mkt=mkt,
@ -871,102 +970,107 @@ async def tsdb_backfill(
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
nulls_detected: trio.Event | None = None
if last_tsdb_dt is not None:
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier then
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weeknd)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retreive and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier then
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weeknd)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retreive and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
offset_samples: int = round(offset_s / timeframe)
offset_samples: int = round(offset_s / timeframe)
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
log.info(f'Loaded {to_push.shape} datums from storage')
log.info(f'Loaded {to_push.shape} datums from storage')
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backcfiller tasks do this
# work PREVENTAVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backcfiller tasks do this
# work PREVENTAVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
shm=shm,
timeframe=timeframe,
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
))
shm=shm,
timeframe=timeframe,
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
))
# 2nd nursery END
# TODO: who would want to?
if nulls_detected:
await nulls_detected.wait()
# TODO: who would want to?
await nulls_detected.wait()
await bf_done.wait()
await bf_done.wait()
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
@ -1010,7 +1114,6 @@ async def tsdb_backfill(
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
mkt: MktPair,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
@ -1167,7 +1270,6 @@ async def manage_history(
tsdb_backfill,
mod=mod,
storemod=storemod,
# bus,
storage=client,
mkt=mkt,
shm=tf2mem[timeframe],
@ -1252,13 +1354,11 @@ def iter_dfs_from_shms(
assert not opened
ohlcv = shm.array
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
from ._anal import np2pl
df: pl.DataFrame = np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)

View File

@ -319,9 +319,8 @@ def get_null_segs(
if num_gaps < 1:
if absi_zeros.size > 1:
absi_zsegs = [[
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
# TODO: maybe mk these max()/min() limits func
# consts instead of called more then once?
max(
absi_zeros[0] - 1,
0,
@ -359,7 +358,10 @@ def get_null_segs(
# corresponding to the first zero-segment's row, we add it
# manually here.
absi_zsegs.append([
absi_zeros[0] - 1,
max(
absi_zeros[0] - 1,
0,
),
None,
])
@ -400,14 +402,18 @@ def get_null_segs(
else:
if 0 < num_gaps < 2:
absi_zsegs[-1][1] = absi_zeros[-1] + 1
absi_zsegs[-1][1] = min(
absi_zeros[-1] + 1,
frame['index'][-1],
)
iabs_first: int = frame['index'][0]
for start, end in absi_zsegs:
ts_start: float = times[start - iabs_first]
ts_end: float = times[end - iabs_first]
if (
ts_start == 0
(ts_start == 0 and not start == 0)
or
ts_end == 0
):
@ -451,11 +457,13 @@ def iter_null_segs(
],
None,
]:
if null_segs is None:
null_segs: tuple = get_null_segs(
if not (
null_segs := get_null_segs(
frame,
period=timeframe,
)
):
return
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
@ -502,6 +510,7 @@ def iter_null_segs(
)
# TODO: move to ._pl_anal
def with_dts(
df: pl.DataFrame,
time_col: str = 'time',
@ -517,7 +526,7 @@ def with_dts(
pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([
pl.from_epoch(
pl.col(f'{time_col}_prev')
column=pl.col(f'{time_col}_prev'),
).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'),
]) #.with_columns(
@ -525,19 +534,6 @@ def with_dts(
# )
def dedup_dt(
df: pl.DataFrame,
) -> pl.DataFrame:
'''
Drop duplicate date-time rows (normally from an OHLC frame).
'''
return df.unique(
subset=['dt'],
maintain_order=True,
)
t_unit: Literal = Literal[
'days',
'hours',
@ -651,7 +647,11 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
)
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = dedup_dt(df)
deduped: pl.DataFrame = df.unique(
subset=['dt'],
maintain_order=True,
)
deduped_gaps = detect_time_gaps(deduped)
diff: int = (

View File

@ -272,10 +272,15 @@ class ContentsLabels:
x_in: int,
) -> None:
for chart, name, label, update in self._labels:
for (
chart,
name,
label,
update,
)in self._labels:
viz = chart.get_viz(name)
array = viz.shm.array
array: np.ndarray = viz.shm._array
index = array[viz.index_field]
start = index[0]
stop = index[-1]
@ -286,7 +291,7 @@ class ContentsLabels:
):
# out of range
print('WTF out of range?')
continue
# continue
# call provided update func with data point
try:
@ -294,6 +299,7 @@ class ContentsLabels:
ix = np.searchsorted(index, x_in)
if ix > len(array):
breakpoint()
update(ix, array)
except IndexError:

View File

@ -56,8 +56,8 @@ _line_styles: dict[str, int] = {
class FlowGraphic(pg.GraphicsObject):
'''
Base class with minimal interface for `QPainterPath` implemented,
real-time updated "data flow" graphics.
Base class with minimal interface for `QPainterPath`
implemented, real-time updated "data flow" graphics.
See subtypes below.
@ -167,11 +167,12 @@ class FlowGraphic(pg.GraphicsObject):
return None
# XXX: due to a variety of weird jitter bugs and "smearing"
# artifacts when click-drag panning and viewing history time series,
# we offer this ctx-mngr interface to allow temporarily disabling
# Qt's graphics caching mode; this is now currently used from
# ``ChartView.start/signal_ic()`` methods which also disable the
# rt-display loop when the user is moving around a view.
# artifacts when click-drag panning and viewing history time
# series, we offer this ctx-mngr interface to allow temporarily
# disabling Qt's graphics caching mode; this is now currently
# used from ``ChartView.start/signal_ic()`` methods which also
# disable the rt-display loop when the user is moving around
# a view.
@cm
def reset_cache(self) -> None:
try:

View File

@ -49,7 +49,7 @@ from ..data._formatters import (
OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm)
)
from ..data.tsp import (
from ..tsp import (
slice_from_time,
)
from ._ohlc import (
@ -563,7 +563,8 @@ class Viz(Struct):
def view_range(self) -> tuple[int, int]:
'''
Return the start and stop x-indexes for the managed ``ViewBox``.
Return the start and stop x-indexes for the managed
``ViewBox``.
'''
vr = self.plot.viewRect()

View File

@ -470,54 +470,64 @@ async def graphics_update_loop(
if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
await tractor.pause()
# main real-time quotes update loop
stream: tractor.MsgStream
async with feed.open_multi_stream() as stream:
assert stream
async for quotes in stream:
quote_period = time.time() - last_quote_s
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
if (
quote_period <= 1/_quote_throttle_rate
try:
from . import _remote_ctl
_remote_ctl._dss = dss
# in the absolute worst case we shouldn't see more then
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
):
pass
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
last_quote_s = time.time()
for fqme, quote in quotes.items():
ds = dss[fqme]
ds.quotes = quote
rt_pi, hist_pi = pis[fqme]
# chart isn't active/shown so skip render cycle and
# pause feed(s)
# main real-time quotes update loop
stream: tractor.MsgStream
async with feed.open_multi_stream() as stream:
assert stream
async for quotes in stream:
quote_period = time.time() - last_quote_s
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
if (
fast_chart.linked.isHidden()
or not rt_pi.isVisible()
quote_period <= 1/_quote_throttle_rate
# in the absolute worst case we shouldn't see more then
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
):
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
pass
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
last_quote_s = time.time()
# sync call to update all graphics/UX components.
graphics_update_cycle(
ds,
quote,
)
for fqme, quote in quotes.items():
ds = dss[fqme]
ds.quotes = quote
rt_pi, hist_pi = pis[fqme]
# chart isn't active/shown so skip render cycle and
# pause feed(s)
if (
fast_chart.linked.isHidden()
or not rt_pi.isVisible()
):
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
# sync call to update all graphics/UX components.
graphics_update_cycle(
ds,
quote,
)
finally:
# XXX: cancel any remote annotation control ctxs
_remote_ctl._dss = None
for ctx in _remote_ctl._ctxs:
await ctx.cancel()
def graphics_update_cycle(
@ -1235,7 +1245,7 @@ async def display_symbol_data(
fast from a cached watch-list.
'''
sbar = godwidget.window.status_bar
# sbar = godwidget.window.status_bar
# historical data fetch
# brokermod = brokers.get_brokermod(provider)
@ -1245,11 +1255,11 @@ async def display_symbol_data(
# group_key=loading_sym_key,
# )
for fqme in fqmes:
loading_sym_key = sbar.open_status(
f'loading {fqme} ->',
group_key=True
)
# for fqme in fqmes:
# loading_sym_key = sbar.open_status(
# f'loading {fqme} ->',
# group_key=True
# )
# (TODO: make this not so shit XD)
# close group status once a symbol feed fully loads to view.
@ -1422,7 +1432,7 @@ async def display_symbol_data(
start_fsp_displays,
rt_linked,
flume,
loading_sym_key,
# loading_sym_key,
loglevel,
)

View File

@ -21,7 +21,8 @@ Higher level annotation editors.
from __future__ import annotations
from collections import defaultdict
from typing import (
TYPE_CHECKING
Sequence,
TYPE_CHECKING,
)
import pyqtgraph as pg
@ -31,24 +32,37 @@ from pyqtgraph import (
QtCore,
QtWidgets,
)
from PyQt5.QtCore import (
QPointF,
QRectF,
)
from PyQt5.QtGui import (
QColor,
QTransform,
)
from PyQt5.QtWidgets import (
QGraphicsProxyWidget,
QGraphicsScene,
QLabel,
)
from pyqtgraph import functions as fn
from PyQt5.QtCore import QPointF
import numpy as np
from piker.types import Struct
from ._style import hcolor, _font
from ._style import (
hcolor,
_font,
)
from ._lines import LevelLine
from ..log import get_logger
if TYPE_CHECKING:
from ._chart import GodWidget
from ._chart import (
GodWidget,
ChartPlotWidget,
)
from ._interaction import ChartView
log = get_logger(__name__)
@ -65,7 +79,7 @@ class ArrowEditor(Struct):
uid: str,
x: float,
y: float,
color='default',
color: str = 'default',
pointing: str | None = None,
) -> pg.ArrowItem:
@ -251,27 +265,56 @@ class LineEditor(Struct):
return lines
class SelectRect(QtWidgets.QGraphicsRectItem):
def as_point(
pair: Sequence[float, float] | QPointF,
) -> list[QPointF, QPointF]:
'''
Case any input tuple of floats to a a list of `QPoint` objects
for use in Qt geometry routines.
'''
if isinstance(pair, QPointF):
return pair
return QPointF(pair[0], pair[1])
# TODO: maybe implement better, something something RectItemProxy??
# -[ ] dig into details of how proxy's work?
# https://doc.qt.io/qt-5/qgraphicsscene.html#addWidget
# -[ ] consider using `.addRect()` maybe?
class SelectRect(QtWidgets.QGraphicsRectItem):
'''
A data-view "selection rectangle": the most fundamental
geometry for annotating data views.
- https://doc.qt.io/qt-5/qgraphicsrectitem.html
- https://doc.qt.io/qt-6/qgraphicsrectitem.html
'''
def __init__(
self,
viewbox: ViewBox,
color: str = 'dad_blue',
color: str | None = None,
) -> None:
super().__init__(0, 0, 1, 1)
# self.rbScaleBox = QGraphicsRectItem(0, 0, 1, 1)
self.vb = viewbox
self._chart: 'ChartPlotWidget' = None # noqa
self.vb: ViewBox = viewbox
# override selection box color
self._chart: ChartPlotWidget | None = None # noqa
# TODO: maybe allow this to be dynamic via a method?
#l override selection box color
color: str = color or 'dad_blue'
color = QColor(hcolor(color))
self.setPen(fn.mkPen(color, width=1))
color.setAlpha(66)
self.setBrush(fn.mkBrush(color))
self.setZValue(1e9)
self.hide()
self._label = None
label = self._label = QLabel()
label.setTextFormat(0) # markdown
@ -281,13 +324,15 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
QtCore.Qt.AlignLeft
# | QtCore.Qt.AlignVCenter
)
label.hide() # always right after init
# proxy is created after containing scene is initialized
self._label_proxy = None
self._abs_top_right = None
self._label_proxy: QGraphicsProxyWidget | None = None
self._abs_top_right: Point | None = None
# TODO: "swing %" might be handy here (data's max/min # % change)
self._contents = [
# TODO: "swing %" might be handy here (data's max/min
# # % change)?
self._contents: list[str] = [
'change: {pchng:.2f} %',
'range: {rng:.2f}',
'bars: {nbars}',
@ -297,12 +342,30 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
'sigma: {std:.2f}',
]
self.add_to_view(viewbox)
def add_to_view(
self,
view: ChartView,
) -> None:
'''
Self-defined view hookup impl which will
also re-assign the internal ref.
'''
view.addItem(
self,
ignoreBounds=True,
)
if self.vb is not view:
self.vb = view
@property
def chart(self) -> 'ChartPlotWidget': # noqa
def chart(self) -> ChartPlotWidget: # noqa
return self._chart
@chart.setter
def chart(self, chart: 'ChartPlotWidget') -> None: # noqa
def chart(self, chart: ChartPlotWidget) -> None: # noqa
self._chart = chart
chart.sigRangeChanged.connect(self.update_on_resize)
palette = self._label.palette()
@ -315,57 +378,155 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
)
def update_on_resize(self, vr, r):
"""Re-position measure label on view range change.
'''
Re-position measure label on view range change.
"""
'''
if self._abs_top_right:
self._label_proxy.setPos(
self.vb.mapFromView(self._abs_top_right)
)
def mouse_drag_released(
def set_scen_pos(
self,
p1: QPointF,
p2: QPointF
scen_p1: QPointF,
scen_p2: QPointF,
update_label: bool = True,
) -> None:
"""Called on final button release for mouse drag with start and
end positions.
'''
Set position from scene coords of selection rect (normally
from mouse position) and accompanying label, move label to
match.
"""
self.set_pos(p1, p2)
'''
# NOTE XXX: apparently just setting it doesn't work!?
# i have no idea why but it's pretty weird we have to do
# this transform thing which was basically pulled verbatim
# from the `pg.ViewBox.updateScaleBox()` method.
view_rect: QRectF = self.vb.childGroup.mapRectFromScene(
QRectF(
scen_p1,
scen_p2,
)
)
self.setPos(view_rect.topLeft())
# XXX: does not work..!?!?
# https://doc.qt.io/qt-5/qgraphicsrectitem.html#setRect
# self.setRect(view_rect)
def set_pos(
self,
p1: QPointF,
p2: QPointF
) -> None:
"""Set position of selection rect and accompanying label, move
label to match.
tr = QTransform.fromScale(
view_rect.width(),
view_rect.height(),
)
self.setTransform(tr)
"""
if self._label_proxy is None:
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
self._label_proxy = self.vb.scene().addWidget(self._label)
start_pos = self.vb.mapToView(p1)
end_pos = self.vb.mapToView(p2)
# map to view coords and update area
r = QtCore.QRectF(start_pos, end_pos)
# old way; don't need right?
# lr = QtCore.QRectF(p1, p2)
# r = self.vb.childGroup.mapRectFromParent(lr)
self.setPos(r.topLeft())
self.resetTransform()
self.setRect(r)
# XXX: never got this working, was always offset
# / transformed completely wrong (and off to the far right
# from the cursor?)
# self.set_view_pos(
# view_rect=view_rect,
# # self.vwqpToView(p1),
# # self.vb.mapToView(p2),
# # start_pos=self.vb.mapToScene(p1),
# # end_pos=self.vb.mapToScene(p2),
# )
self.show()
y1, y2 = start_pos.y(), end_pos.y()
x1, x2 = start_pos.x(), end_pos.x()
if update_label:
self.init_label(view_rect)
# TODO: heh, could probably use a max-min streamin algo here too
def set_view_pos(
self,
start_pos: QPointF | Sequence[float, float] | None = None,
end_pos: QPointF | Sequence[float, float] | None = None,
view_rect: QRectF | None = None,
update_label: bool = True,
) -> None:
'''
Set position from `ViewBox` coords (i.e. from the actual
data domain) of rect (and any accompanying label which is
moved to match).
'''
if self._chart is None:
raise RuntimeError(
'You MUST assign a `SelectRect.chart: ChartPlotWidget`!'
)
if view_rect is None:
# ensure point casting
start_pos: QPointF = as_point(start_pos)
end_pos: QPointF = as_point(end_pos)
# map to view coords and update area
view_rect = QtCore.QRectF(
start_pos,
end_pos,
)
self.setPos(view_rect.topLeft())
# NOTE: SERIOUSLY NO IDEA WHY THIS WORKS...
# but it does and all the other commented stuff above
# dint, dawg..
# self.resetTransform()
# self.setRect(view_rect)
tr = QTransform.fromScale(
view_rect.width(),
view_rect.height(),
)
self.setTransform(tr)
if update_label:
self.init_label(view_rect)
print(
'SelectRect modify:\n'
f'QRectF: {view_rect}\n'
f'start_pos: {start_pos}\n'
f'end_pos: {end_pos}\n'
)
self.show()
def init_label(
self,
view_rect: QRectF,
) -> QLabel:
# should be init-ed in `.__init__()`
label: QLabel = self._label
cv: ChartView = self.vb
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
if self._label_proxy is None:
scen: QGraphicsScene = cv.scene()
# NOTE: specifically this is passing a widget
# pointer to the scene's `.addWidget()` as per,
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html#embedding-a-widget-with-qgraphicsproxywidget
self._label_proxy: QGraphicsProxyWidget = scen.addWidget(label)
# get label startup coords
tl: QPointF = view_rect.topLeft()
br: QPointF = view_rect.bottomRight()
x1, y1 = tl.x(), tl.y()
x2, y2 = br.x(), br.y()
# TODO: to remove, previous label corner point unpacking
# x1, y1 = start_pos.x(), start_pos.y()
# x2, y2 = end_pos.x(), end_pos.y()
# y1, y2 = start_pos.y(), end_pos.y()
# x1, x2 = start_pos.x(), end_pos.x()
# TODO: heh, could probably use a max-min streamin algo
# here too?
_, xmn = min(y1, y2), min(x1, x2)
ymx, xmx = max(y1, y2), max(x1, x2)
@ -375,26 +536,35 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
ixmn, ixmx = round(xmn), round(xmx)
nbars = ixmx - ixmn + 1
chart = self._chart
data = chart.get_viz(chart.name).shm.array[ixmn:ixmx]
chart: ChartPlotWidget = self._chart
data: np.ndarray = chart.get_viz(
chart.name
).shm.array[ixmn:ixmx]
if len(data):
std = data['close'].std()
dmx = data['high'].max()
dmn = data['low'].min()
std: float = data['close'].std()
dmx: float = data['high'].max()
dmn: float = data['low'].min()
else:
dmn = dmx = std = np.nan
# update label info
self._label.setText('\n'.join(self._contents).format(
pchng=pchng, rng=rng, nbars=nbars,
std=std, dmx=dmx, dmn=dmn,
label.setText('\n'.join(self._contents).format(
pchng=pchng,
rng=rng,
nbars=nbars,
std=std,
dmx=dmx,
dmn=dmn,
))
# print(f'x2, y2: {(x2, y2)}')
# print(f'xmn, ymn: {(xmn, ymx)}')
label_anchor = Point(xmx + 2, ymx)
label_anchor = Point(
xmx + 2,
ymx,
)
# XXX: in the drag bottom-right -> top-left case we don't
# want the label to overlay the box.
@ -403,13 +573,16 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
# # label_anchor = Point(x2, y2 + self._label.height())
# label_anchor = Point(xmn, ymn)
self._abs_top_right = label_anchor
self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
# self._label.show()
self._abs_top_right: Point = label_anchor
self._label_proxy.setPos(
cv.mapFromView(label_anchor)
)
label.show()
def clear(self):
"""Clear the selection box from view.
'''
Clear the selection box from view.
"""
'''
self._label.hide()
self.hide()

View File

@ -181,7 +181,10 @@ async def open_fsp_sidepane(
async def open_fsp_actor_cluster(
names: list[str] = ['fsp_0', 'fsp_1'],
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
) -> AsyncGenerator[
int,
dict[str, tractor.Portal]
]:
from tractor._clustering import open_actor_cluster
@ -557,7 +560,7 @@ class FspAdmin:
conf: dict, # yeah probably dumb..
loglevel: str = 'error',
) -> (trio.Event, ChartPlotWidget):
) -> trio.Event:
flume, started = await self.start_engine_task(
target,
@ -924,7 +927,7 @@ async def start_fsp_displays(
linked: LinkedSplits,
flume: Flume,
group_status_key: str,
# group_status_key: str,
loglevel: str,
) -> None:
@ -971,21 +974,23 @@ async def start_fsp_displays(
flume,
) as admin,
):
statuses = []
statuses: list[trio.Event] = []
for target, conf in fsp_conf.items():
started = await admin.open_fsp_chart(
started: trio.Event = await admin.open_fsp_chart(
target,
conf,
)
done = linked.window().status_bar.open_status(
f'loading fsp, {target}..',
group_key=group_status_key,
)
statuses.append((started, done))
# done = linked.window().status_bar.open_status(
# f'loading fsp, {target}..',
# group_key=group_status_key,
# )
# statuses.append((started, done))
statuses.append(started)
for fsp_loaded, status_cb in statuses:
# for fsp_loaded, status_cb in statuses:
for fsp_loaded in statuses:
await fsp_loaded.wait()
profiler(f'attached to fsp portal: {target}')
status_cb()
# status_cb()
# blocks on nursery until all fsp actors complete

View File

@ -30,7 +30,11 @@ from typing import (
)
import pyqtgraph as pg
# from pyqtgraph.GraphicsScene import mouseEvents
# NOTE XXX: pg is super annoying and re-implements it's own mouse
# event subsystem.. we should really look into re-working/writing
# this down the road.. Bo
from pyqtgraph.GraphicsScene import mouseEvents as mevs
# from pyqtgraph.GraphicsScene.mouseEvents import MouseDragEvent
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
from PyQt5.QtGui import (
QWheelEvent,
@ -466,6 +470,7 @@ class ChartView(ViewBox):
mode_name: str = 'view'
def_delta: float = 616 * 6
def_scale_factor: float = 1.016 ** (def_delta * -1 / 20)
# annots: dict[int, GraphicsObject] = {}
def __init__(
self,
@ -486,6 +491,7 @@ class ChartView(ViewBox):
# defaultPadding=0.,
**kwargs
)
# for "known y-range style"
self._static_yrange = static_yrange
@ -500,7 +506,11 @@ class ChartView(ViewBox):
# add our selection box annotator
self.select_box = SelectRect(self)
self.addItem(self.select_box, ignoreBounds=True)
# self.select_box.add_to_view(self)
# self.addItem(
# self.select_box,
# ignoreBounds=True,
# )
self.mode = None
self.order_mode: bool = False
@ -711,17 +721,18 @@ class ChartView(ViewBox):
def mouseDragEvent(
self,
ev,
ev: mevs.MouseDragEvent,
axis: int | None = None,
) -> None:
pos = ev.pos()
lastPos = ev.lastPos()
dif = pos - lastPos
dif = dif * -1
pos: Point = ev.pos()
lastPos: Point = ev.lastPos()
dif: Point = (pos - lastPos) * -1
# dif: Point = pos - lastPos
# dif: Point = dif * -1
# NOTE: if axis is specified, event will only affect that axis.
button = ev.button()
btn = ev.button()
# Ignore axes if mouse is disabled
mouseEnabled = np.array(
@ -733,7 +744,7 @@ class ChartView(ViewBox):
mask[1-axis] = 0.0
# Scale or translate based on mouse button
if button & (
if btn & (
QtCore.Qt.LeftButton | QtCore.Qt.MidButton
):
# zoom y-axis ONLY when click-n-drag on it
@ -756,34 +767,55 @@ class ChartView(ViewBox):
# XXX: WHY
ev.accept()
down_pos = ev.buttonDownPos()
down_pos: Point = ev.buttonDownPos(
btn=btn,
)
scen_pos: Point = ev.scenePos()
scen_down_pos: Point = ev.buttonDownScenePos(
btn=btn,
)
# This is the final position in the drag
if ev.isFinish():
self.select_box.mouse_drag_released(down_pos, pos)
# import pdbp; pdbp.set_trace()
ax = QtCore.QRectF(down_pos, pos)
ax = self.childGroup.mapRectFromParent(ax)
# NOTE: think of this as a `.mouse_drag_release()`
# (bc HINT that's what i called the shit ass
# method that wrapped this call [yes, as a single
# fucking call] originally.. you bish, guille)
# Bo.. oraleeee
self.select_box.set_scen_pos(
# down_pos,
# pos,
scen_down_pos,
scen_pos,
)
# this is the zoom transform cmd
self.showAxRect(ax)
ax = QtCore.QRectF(down_pos, pos)
ax = self.childGroup.mapRectFromParent(ax)
# self.showAxRect(ax)
# axis history tracking
self.axHistoryPointer += 1
self.axHistory = self.axHistory[
:self.axHistoryPointer] + [ax]
else:
print('drag finish?')
self.select_box.set_pos(down_pos, pos)
self.select_box.set_scen_pos(
# down_pos,
# pos,
scen_down_pos,
scen_pos,
)
# update shape of scale box
# self.updateScaleBox(ev.buttonDownPos(), ev.pos())
self.updateScaleBox(
down_pos,
ev.pos(),
)
# breakpoint()
# self.updateScaleBox(
# down_pos,
# ev.pos(),
# )
# PANNING MODE
else:
@ -822,7 +854,7 @@ class ChartView(ViewBox):
# ev.accept()
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif button & QtCore.Qt.RightButton:
elif btn & QtCore.Qt.RightButton:
if self.state['aspectLocked'] is not False:
mask[0] = 0

View File

@ -24,8 +24,6 @@ view transforms.
"""
import pyqtgraph as pg
from ._axes import Axis
def invertQTransform(tr):
"""Return a QTransform that is the inverse of *tr*.
@ -53,6 +51,9 @@ def _do_overrides() -> None:
pg.functions.invertQTransform = invertQTransform
pg.PlotItem = PlotItem
from ._axes import Axis
pg.Axis = Axis
# enable "QPainterPathPrivate for faster arrayToQPath" from
# https://github.com/pyqtgraph/pyqtgraph/pull/2324
pg.setConfigOption('enableExperimental', True)
@ -234,7 +235,7 @@ class PlotItem(pg.PlotItem):
# ``ViewBox`` geometry bug.. where a gap for the
# 'bottom' axis is somehow left in?
# axis = pg.AxisItem(orientation=name, parent=self)
axis = Axis(
axis = pg.Axis(
self,
orientation=name,
parent=self,

View File

@ -0,0 +1,273 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote control tasks for sending annotations (and maybe more cmds)
to a chart from some other actor.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
AsyncContextManager,
)
import tractor
from tractor import trionics
from tractor import (
Portal,
Context,
MsgStream,
)
from PyQt5.QtWidgets import (
QGraphicsItem,
)
from piker.log import get_logger
from piker.types import Struct
from piker.service import find_service
from ._display import DisplayState
from ._interaction import ChartView
from ._editors import SelectRect
from ._chart import ChartPlotWidget
log = get_logger(__name__)
# NOTE: this is set by the `._display.graphics_update_loop()` once
# all chart widgets / Viz per flume have been initialized allowing
# for remote annotation (control) of any chart-actor's mkt feed by
# fqme lookup Bo
_dss: dict[str, DisplayState] | None = None
# stash each and every client connection so that they can all
# be cancelled on shutdown/error.
# TODO: make `tractor.Context` hashable via is `.cid: str`?
# _ctxs: set[Context] = set()
_ctxs: list[Context] = []
# global map of all uniquely created annotation-graphics
# so that they can be mutated (eventually) by a client.
_annots: dict[int, QGraphicsItem] = {}
@tractor.context
async def remote_annotate(
ctx: Context,
) -> None:
global _dss, _ctxs
assert _dss
_ctxs.append(ctx)
# send back full fqme symbology to caller
await ctx.started(list(_dss))
async with ctx.open_stream() as annot_req_stream:
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
await annot_req_stream.send(id(rect))
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
class AnnotCtl(Struct):
'''
A control for remote "data annotations".
You know those "squares they always show in machine vision
UIs.." this API allows you to remotely control stuff like that
in some other graphics actor.
'''
ctx2fqmes: dict[str, str]
fqme2stream: dict[str, MsgStream]
async def add_rect(
self,
fqme: str,
timeframe: float,
start_pos: tuple[float, float],
end_pos: tuple[float, float],
# TODO: a `Literal['view', 'scene']` for this?
domain: str = 'view', # or 'scene'
color: str = 'dad_blue',
) -> int:
'''
Add a `SelectRect` annotation to the target view, return
the instances `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self.fqme2stream[fqme]
await ipc.send({
'fqme': fqme,
'annot': 'SelectRect',
'timeframe': timeframe,
# 'meth': str(meth),
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
'kwargs': {
'start_pos': tuple(start_pos),
'end_pos': tuple(end_pos),
'color': color,
'update_label': False,
},
})
return (await ipc.receive())
async def modify(
self,
aid: int, # annotation id
meth: str, # far end graphics object method to invoke
params: dict[str, Any], # far end `meth(**kwargs)`
) -> bool:
'''
Modify an existing (remote) annotation's graphics
paramters, thus changing it's appearance / state in real
time.
'''
raise NotImplementedError
async def remove(
self,
uid: int,
) -> bool:
'''
Remove an existing annotation by instance id.
'''
raise NotImplementedError
@acm
async def open_annot_ctl(
uid: tuple[str, str] | None = None,
) -> AnnotCtl:
# TODO: load connetion to a specific chart actor
# -[ ] pull from either service scan or config
# -[ ] return some kinda client/proxy thinger?
# -[ ] maybe we should finally just provide this as
# a `tractor.hilevel.CallableProxy` or wtv?
# -[ ] use this from the storage.cli stuff to mark up gaps!
maybe_portals: list[Portal] | None
fqmes: list[str]
async with find_service(
service_name='chart',
first_only=False,
) as maybe_portals:
ctx_mngrs: list[AsyncContextManager] = []
# TODO: print the current discoverable actor UID set
# here as well?
if not maybe_portals:
raise RuntimeError('No chart UI actors found in service domain?')
for portal in maybe_portals:
ctx_mngrs.append(
portal.open_context(remote_annotate)
)
ctx2fqmes: dict[str, set[str]] = {}
fqme2stream: dict[str, MsgStream] = {}
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2stream=fqme2stream,
)
stream_ctxs: list[AsyncContextManager] = []
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
for (ctx, fqmes) in ctxs:
stream_ctxs.append(ctx.open_stream())
# fill lookup table of mkt addrs to IPC ctxs
for fqme in fqmes:
if other := fqme2stream.get(fqme):
raise ValueError(
f'More then one chart displays {fqme}!?\n'
'Other UI actor info:\n'
f'channel: {other._ctx.chan}]\n'
f'actor uid: {other._ctx.chan.uid}]\n'
f'ctx id: {other._ctx.cid}]\n'
)
ctx2fqmes.setdefault(
ctx.cid,
set(),
).add(fqme)
async with trionics.gather_contexts(stream_ctxs) as streams:
for stream in streams:
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
for fqme in fqmes:
fqme2stream[fqme] = stream
yield client
# TODO: on graceful teardown should we try to
# remove all annots that were created/modded?

View File

@ -228,5 +228,10 @@ def chart(
'loglevel': tractorloglevel,
'name': 'chart',
'registry_addrs': list(set(regaddrs)),
'enable_modules': [
# remote data-view annotations Bo
'piker.ui._remote_ctl',
],
},
)

View File

@ -31,7 +31,7 @@ import pendulum
import pyqtgraph as pg
from piker.types import Struct
from ..data.tsp import slice_from_time
from ..tsp import slice_from_time
from ..log import get_logger
from ..toolz import Profiler