Compare commits
No commits in common. "a681b2f0bbc7fe03a00b0fb931527b4c901f5140" and "1f9a4976378f863193572a0b64a5dba5e4aaf672" have entirely different histories.
a681b2f0bb
...
1f9a497637
|
@ -843,7 +843,6 @@ class Client:
|
|||
self,
|
||||
contract: Contract,
|
||||
timeout: float = 1,
|
||||
tries: int = 100,
|
||||
raise_on_timeout: bool = False,
|
||||
|
||||
) -> Ticker | None:
|
||||
|
@ -858,30 +857,30 @@ class Client:
|
|||
ready: ticker.TickerUpdateEvent = ticker.updateEvent
|
||||
|
||||
# ensure a last price gets filled in before we deliver quote
|
||||
timeouterr: Exception | None = None
|
||||
warnset: bool = False
|
||||
for _ in range(tries):
|
||||
|
||||
# wait for a first update(Event) indicatingn a
|
||||
# live quote feed.
|
||||
for _ in range(100):
|
||||
if isnan(ticker.last):
|
||||
|
||||
# wait for a first update(Event)
|
||||
try:
|
||||
tkr = await asyncio.wait_for(
|
||||
ready,
|
||||
timeout=timeout,
|
||||
)
|
||||
if tkr:
|
||||
break
|
||||
except TimeoutError as err:
|
||||
timeouterr = err
|
||||
await asyncio.sleep(0.01)
|
||||
continue
|
||||
except TimeoutError:
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
|
||||
if raise_on_timeout:
|
||||
raise
|
||||
return None
|
||||
|
||||
if tkr:
|
||||
break
|
||||
else:
|
||||
if not warnset:
|
||||
log.warning(
|
||||
f'Quote req timed out..maybe venue is closed?\n'
|
||||
f'{asdict(contract)}'
|
||||
f'Quote for {contract} timed out: market is closed?'
|
||||
)
|
||||
warnset = True
|
||||
|
||||
|
@ -889,11 +888,6 @@ class Client:
|
|||
log.info(f'Got first quote for {contract}')
|
||||
break
|
||||
else:
|
||||
if timeouterr and raise_on_timeout:
|
||||
import pdbp
|
||||
pdbp.set_trace()
|
||||
raise timeouterr
|
||||
|
||||
if not warnset:
|
||||
log.warning(
|
||||
f'Contract {contract} is not returning a quote '
|
||||
|
@ -901,8 +895,6 @@ class Client:
|
|||
)
|
||||
warnset = True
|
||||
|
||||
return None
|
||||
|
||||
return ticker
|
||||
|
||||
# async to be consistent for the client proxy, and cuz why not.
|
||||
|
|
|
@ -943,11 +943,6 @@ async def stream_quotes(
|
|||
contract=con,
|
||||
raise_on_timeout=True,
|
||||
)
|
||||
first_quote: dict = normalize(first_ticker)
|
||||
log.info(
|
||||
'Rxed init quote:\n'
|
||||
f'{pformat(first_quote)}'
|
||||
)
|
||||
cs: trio.CancelScope | None = None
|
||||
startup: bool = True
|
||||
while (
|
||||
|
|
|
@ -134,19 +134,14 @@ def get_app_dir(
|
|||
|
||||
_click_config_dir: Path = Path(get_app_dir('piker'))
|
||||
_config_dir: Path = _click_config_dir
|
||||
_parent_user: str = os.environ.get('SUDO_USER')
|
||||
|
||||
# NOTE: when using `sudo` we attempt to determine the non-root user
|
||||
# and still use their normal config dir.
|
||||
if (
|
||||
(_parent_user := os.environ.get('SUDO_USER'))
|
||||
and
|
||||
_parent_user != 'root'
|
||||
):
|
||||
if _parent_user:
|
||||
non_root_user_dir = Path(
|
||||
os.path.expanduser(f'~{_parent_user}')
|
||||
)
|
||||
root: str = 'root'
|
||||
_ccds: str = str(_click_config_dir) # click config dir as string
|
||||
_ccds: str = str(_click_config_dir) # click config dir string
|
||||
i_tail: int = int(_ccds.rfind(root) + len(root))
|
||||
_config_dir = (
|
||||
non_root_user_dir
|
||||
|
|
|
@ -45,7 +45,10 @@ import trio
|
|||
from trio.abc import ReceiveChannel
|
||||
from trio_typing import TaskStatus
|
||||
import tractor
|
||||
from tractor import trionics
|
||||
from tractor.trionics import (
|
||||
maybe_open_context,
|
||||
gather_contexts,
|
||||
)
|
||||
|
||||
from piker.accounting import (
|
||||
MktPair,
|
||||
|
@ -66,7 +69,7 @@ from .validate import (
|
|||
FeedInit,
|
||||
validate_backend,
|
||||
)
|
||||
from ..tsp import (
|
||||
from .history import (
|
||||
manage_history,
|
||||
)
|
||||
from .ingest import get_ingestormod
|
||||
|
@ -121,8 +124,6 @@ class _FeedsBus(Struct):
|
|||
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
|
||||
) -> None:
|
||||
with trio.CancelScope() as cs:
|
||||
# TODO: shouldn't this be a direct await to avoid
|
||||
# cancellation contagion to the bus nursery!?!?!
|
||||
await self.nursery.start(
|
||||
target,
|
||||
*args,
|
||||
|
@ -329,6 +330,7 @@ async def allocate_persistent_feed(
|
|||
) = await bus.nursery.start(
|
||||
manage_history,
|
||||
mod,
|
||||
bus,
|
||||
mkt,
|
||||
some_data_ready,
|
||||
feed_is_live,
|
||||
|
@ -405,13 +407,7 @@ async def allocate_persistent_feed(
|
|||
rt_shm.array['time'][1] = ts + 1
|
||||
|
||||
elif hist_shm.array.size == 0:
|
||||
for i in range(100):
|
||||
await trio.sleep(0.1)
|
||||
if hist_shm.array.size > 0:
|
||||
break
|
||||
else:
|
||||
await tractor.pause()
|
||||
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
|
||||
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
|
||||
|
||||
# wait the spawning parent task to register its subscriber
|
||||
# send-stream entry before we start the sample loop.
|
||||
|
@ -457,12 +453,8 @@ async def open_feed_bus(
|
|||
if loglevel is None:
|
||||
loglevel = tractor.current_actor().loglevel
|
||||
|
||||
# XXX: required to propagate ``tractor`` loglevel to piker
|
||||
# logging
|
||||
get_console_log(
|
||||
loglevel
|
||||
or tractor.current_actor().loglevel
|
||||
)
|
||||
# XXX: required to propagate ``tractor`` loglevel to piker logging
|
||||
get_console_log(loglevel or tractor.current_actor().loglevel)
|
||||
|
||||
# local state sanity checks
|
||||
# TODO: check for any stale shm entries for this symbol
|
||||
|
@ -472,7 +464,7 @@ async def open_feed_bus(
|
|||
assert 'brokerd' in servicename
|
||||
assert brokername in servicename
|
||||
|
||||
bus: _FeedsBus = get_feed_bus(brokername)
|
||||
bus = get_feed_bus(brokername)
|
||||
sub_registered = trio.Event()
|
||||
|
||||
flumes: dict[str, Flume] = {}
|
||||
|
@ -776,7 +768,7 @@ async def maybe_open_feed(
|
|||
'''
|
||||
fqme = fqmes[0]
|
||||
|
||||
async with trionics.maybe_open_context(
|
||||
async with maybe_open_context(
|
||||
acm_func=open_feed,
|
||||
kwargs={
|
||||
'fqmes': fqmes,
|
||||
|
@ -796,7 +788,7 @@ async def maybe_open_feed(
|
|||
# add a new broadcast subscription for the quote stream
|
||||
# if this feed is likely already in use
|
||||
|
||||
async with trionics.gather_contexts(
|
||||
async with gather_contexts(
|
||||
mngrs=[stream.subscribe() for stream in feed.streams.values()]
|
||||
) as bstreams:
|
||||
for bstream, flume in zip(bstreams, feed.flumes.values()):
|
||||
|
@ -856,7 +848,7 @@ async def open_feed(
|
|||
)
|
||||
|
||||
portals: tuple[tractor.Portal]
|
||||
async with trionics.gather_contexts(
|
||||
async with gather_contexts(
|
||||
brokerd_ctxs,
|
||||
) as portals:
|
||||
|
||||
|
@ -908,7 +900,7 @@ async def open_feed(
|
|||
assert len(feed.mods) == len(feed.portals)
|
||||
|
||||
async with (
|
||||
trionics.gather_contexts(bus_ctxs) as ctxs,
|
||||
gather_contexts(bus_ctxs) as ctxs,
|
||||
):
|
||||
stream_ctxs: list[tractor.MsgStream] = []
|
||||
for (
|
||||
|
@ -950,7 +942,7 @@ async def open_feed(
|
|||
brokermod: ModuleType
|
||||
fqmes: list[str]
|
||||
async with (
|
||||
trionics.gather_contexts(stream_ctxs) as streams,
|
||||
gather_contexts(stream_ctxs) as streams,
|
||||
):
|
||||
for (
|
||||
stream,
|
||||
|
@ -966,12 +958,6 @@ async def open_feed(
|
|||
if brokermod.name == flume.mkt.broker:
|
||||
flume.stream = stream
|
||||
|
||||
assert (
|
||||
len(feed.mods)
|
||||
==
|
||||
len(feed.portals)
|
||||
==
|
||||
len(feed.streams)
|
||||
)
|
||||
assert len(feed.mods) == len(feed.portals) == len(feed.streams)
|
||||
|
||||
yield feed
|
||||
|
|
|
@ -32,7 +32,6 @@ from __future__ import annotations
|
|||
from datetime import datetime
|
||||
from functools import partial
|
||||
from pathlib import Path
|
||||
from pprint import pformat
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
Callable,
|
||||
|
@ -54,64 +53,25 @@ import polars as pl
|
|||
from ..accounting import (
|
||||
MktPair,
|
||||
)
|
||||
from ..data._util import (
|
||||
from ._util import (
|
||||
log,
|
||||
)
|
||||
from ..data._sharedmem import (
|
||||
from ._sharedmem import (
|
||||
maybe_open_shm_array,
|
||||
ShmArray,
|
||||
)
|
||||
from ..data._source import def_iohlcv_fields
|
||||
from ..data._sampling import (
|
||||
from ._source import def_iohlcv_fields
|
||||
from ._sampling import (
|
||||
open_sample_stream,
|
||||
)
|
||||
from ._anal import (
|
||||
|
||||
from .tsp import (
|
||||
dedupe,
|
||||
get_null_segs,
|
||||
iter_null_segs,
|
||||
Frame,
|
||||
Seq,
|
||||
|
||||
# codec-ish
|
||||
np2pl,
|
||||
pl2np,
|
||||
|
||||
# `numpy` only
|
||||
slice_from_time,
|
||||
|
||||
# `polars` specific
|
||||
dedupe,
|
||||
with_dts,
|
||||
detect_time_gaps,
|
||||
sort_diff,
|
||||
|
||||
# TODO:
|
||||
detect_price_gaps
|
||||
Frame,
|
||||
# Seq,
|
||||
)
|
||||
|
||||
__all__: list[str] = [
|
||||
'dedupe',
|
||||
'get_null_segs',
|
||||
'iter_null_segs',
|
||||
'sort_diff',
|
||||
'slice_from_time',
|
||||
'Frame',
|
||||
'Seq',
|
||||
|
||||
'np2pl',
|
||||
'pl2np',
|
||||
|
||||
'slice_from_time',
|
||||
|
||||
'with_dts',
|
||||
'detect_time_gaps',
|
||||
'sort_diff',
|
||||
|
||||
# TODO:
|
||||
'detect_price_gaps'
|
||||
]
|
||||
|
||||
# TODO: break up all this shite into submods!
|
||||
from ..brokers._util import (
|
||||
DataUnavailable,
|
||||
)
|
||||
|
@ -269,20 +229,16 @@ async def maybe_fill_null_segments(
|
|||
# - remember that in the display side, only refersh this
|
||||
# if the respective history is actually "in view".
|
||||
# loop
|
||||
try:
|
||||
await sampler_stream.send({
|
||||
'broadcast_all': {
|
||||
await sampler_stream.send({
|
||||
'broadcast_all': {
|
||||
|
||||
# XXX NOTE XXX: see the
|
||||
# `.ui._display.increment_history_view()` if block
|
||||
# that looks for this info to FORCE a hard viz
|
||||
# redraw!
|
||||
'backfilling': (mkt.fqme, timeframe),
|
||||
},
|
||||
})
|
||||
except tractor.ContextCancelled:
|
||||
# log.exception
|
||||
await tractor.pause()
|
||||
# XXX NOTE XXX: see the
|
||||
# `.ui._display.increment_history_view()` if block
|
||||
# that looks for this info to FORCE a hard viz
|
||||
# redraw!
|
||||
'backfilling': (mkt.fqme, timeframe),
|
||||
},
|
||||
})
|
||||
|
||||
null_segs_detected.set()
|
||||
# RECHECK for more null-gaps
|
||||
|
@ -296,68 +252,39 @@ async def maybe_fill_null_segments(
|
|||
and
|
||||
len(null_segs[-1])
|
||||
):
|
||||
(
|
||||
iabs_slices,
|
||||
iabs_zero_rows,
|
||||
zero_t,
|
||||
) = null_segs
|
||||
log.warning(
|
||||
f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
|
||||
f'{pformat(iabs_slices)}'
|
||||
)
|
||||
await tractor.pause()
|
||||
|
||||
# TODO: always backfill gaps with the earliest (price) datum's
|
||||
# value to avoid the y-ranger including zeros and completely
|
||||
# stretching the y-axis..
|
||||
# array: np.ndarray = shm.array
|
||||
# zeros = array[array['low'] == 0]
|
||||
ohlc_fields: list[str] = [
|
||||
array = shm.array
|
||||
zeros = array[array['low'] == 0]
|
||||
|
||||
# always backfill gaps with the earliest (price) datum's
|
||||
# value to avoid the y-ranger including zeros and completely
|
||||
# stretching the y-axis..
|
||||
if 0 < zeros.size:
|
||||
zeros[[
|
||||
'open',
|
||||
'high',
|
||||
'low',
|
||||
'close',
|
||||
]
|
||||
]] = shm._array[zeros['index'][0] - 1]['close']
|
||||
|
||||
for istart, istop in iabs_slices:
|
||||
|
||||
# get view into buffer for null-segment
|
||||
gap: np.ndarray = shm._array[istart:istop]
|
||||
|
||||
# copy the oldest OHLC samples forward
|
||||
gap[ohlc_fields] = shm._array[istart]['close']
|
||||
|
||||
start_t: float = shm._array[istart]['time']
|
||||
t_diff: float = (istop - istart)*timeframe
|
||||
gap['time'] = np.arange(
|
||||
start=start_t,
|
||||
stop=start_t + t_diff,
|
||||
step=timeframe,
|
||||
)
|
||||
|
||||
await sampler_stream.send({
|
||||
'broadcast_all': {
|
||||
|
||||
# XXX NOTE XXX: see the
|
||||
# `.ui._display.increment_history_view()` if block
|
||||
# that looks for this info to FORCE a hard viz
|
||||
# redraw!
|
||||
'backfilling': (mkt.fqme, timeframe),
|
||||
},
|
||||
})
|
||||
|
||||
# TODO: interatively step through any remaining
|
||||
# time-gaps/null-segments and spawn piecewise backfiller
|
||||
# tasks in a nursery?
|
||||
# -[ ] not sure that's going to work so well on the ib
|
||||
# backend but worth a shot?
|
||||
# -[ ] mk new history connections to make it properly
|
||||
# parallel possible no matter the backend?
|
||||
# -[ ] fill algo: do queries in alternating "latest, then
|
||||
# earliest, then latest.. etc?"
|
||||
# await tractor.pause()
|
||||
# TODO: interatively step through any remaining
|
||||
# time-gaps/null-segments and spawn piecewise backfiller
|
||||
# tasks in a nursery?
|
||||
# -[ ] not sure that's going to work so well on the ib
|
||||
# backend but worth a shot?
|
||||
# -[ ] mk new history connections to make it properly
|
||||
# parallel possible no matter the backend?
|
||||
# -[ ] fill algo: do queries in alternating "latest, then
|
||||
# earliest, then latest.. etc?"
|
||||
# if (
|
||||
# next_end_dt not in frame[
|
||||
# ):
|
||||
# pass
|
||||
|
||||
|
||||
async def start_backfill(
|
||||
tn: trio.Nursery,
|
||||
get_hist,
|
||||
mod: ModuleType,
|
||||
mkt: MktPair,
|
||||
|
@ -411,6 +338,7 @@ async def start_backfill(
|
|||
# settings above when the tsdb is detected as being empty.
|
||||
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
|
||||
|
||||
|
||||
# STAGE NOTE: "backward history gap filling":
|
||||
# - we push to the shm buffer until we have history back
|
||||
# until the latest entry loaded from the tsdb's table B)
|
||||
|
@ -754,8 +682,6 @@ async def back_load_from_tsdb(
|
|||
|
||||
|
||||
async def push_latest_frame(
|
||||
# box-type only that should get packed with the datetime
|
||||
# objects received for the latest history frame
|
||||
dt_eps: list[DateTime, DateTime],
|
||||
shm: ShmArray,
|
||||
get_hist: Callable[
|
||||
|
@ -765,11 +691,8 @@ async def push_latest_frame(
|
|||
timeframe: float,
|
||||
config: dict,
|
||||
|
||||
task_status: TaskStatus[
|
||||
Exception | list[datetime, datetime]
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> list[datetime, datetime] | None:
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
# get latest query's worth of history all the way
|
||||
# back to what is recorded in the tsdb
|
||||
try:
|
||||
|
@ -786,19 +709,17 @@ async def push_latest_frame(
|
|||
mr_start_dt,
|
||||
mr_end_dt,
|
||||
])
|
||||
task_status.started(dt_eps)
|
||||
|
||||
# XXX: timeframe not supported for backend (since
|
||||
# above exception type), terminate immediately since
|
||||
# there's no backfilling possible.
|
||||
except DataUnavailable:
|
||||
task_status.started(None)
|
||||
task_status.started()
|
||||
|
||||
if timeframe > 1:
|
||||
await tractor.pause()
|
||||
|
||||
# prolly tf not supported
|
||||
return None
|
||||
return
|
||||
|
||||
# NOTE: on the first history, most recent history
|
||||
# frame we PREPEND from the current shm ._last index
|
||||
|
@ -810,16 +731,11 @@ async def push_latest_frame(
|
|||
prepend=True, # append on first frame
|
||||
)
|
||||
|
||||
return dt_eps
|
||||
|
||||
|
||||
async def load_tsdb_hist(
|
||||
storage: StorageClient,
|
||||
mkt: MktPair,
|
||||
timeframe: float,
|
||||
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> tuple[
|
||||
np.ndarray,
|
||||
DateTime,
|
||||
|
@ -905,63 +821,48 @@ async def tsdb_backfill(
|
|||
config,
|
||||
)
|
||||
|
||||
# tell parent task to continue
|
||||
# TODO: really we'd want this the other way with the
|
||||
# tsdb load happening asap and the since the latest
|
||||
# frame query will normally be the main source of
|
||||
# latency?
|
||||
task_status.started()
|
||||
|
||||
tsdb_entry: tuple = await load_tsdb_hist(
|
||||
storage,
|
||||
mkt,
|
||||
timeframe,
|
||||
)
|
||||
|
||||
# tell parent task to continue
|
||||
# TODO: really we'd want this the other way with the
|
||||
# tsdb load happening asap and the since the latest
|
||||
# frame query will normally be the main source of
|
||||
# latency?
|
||||
task_status.started()
|
||||
|
||||
# NOTE: iabs to start backfilling from, reverse chronological,
|
||||
# ONLY AFTER the first history frame has been pushed to
|
||||
# mem!
|
||||
backfill_gap_from_shm_index: int = shm._first.value + 1
|
||||
|
||||
# Prepend any tsdb history into the rt-shm-buffer which
|
||||
# should NOW be getting filled with the most recent history
|
||||
# pulled from the data-backend.
|
||||
if dt_eps:
|
||||
# well then, unpack the latest (gap) backfilled frame dts
|
||||
(
|
||||
mr_start_dt,
|
||||
mr_end_dt,
|
||||
) = dt_eps
|
||||
(
|
||||
mr_start_dt,
|
||||
mr_end_dt,
|
||||
) = dt_eps
|
||||
|
||||
# NOTE: when there's no offline data, there's 2 cases:
|
||||
# - data backend doesn't support timeframe/sample
|
||||
# period (in which case `dt_eps` should be `None` and
|
||||
# we shouldn't be here!), or
|
||||
# - no prior history has been stored (yet) and we need
|
||||
# todo full backfill of the history now.
|
||||
if tsdb_entry is None:
|
||||
# indicate to backfill task to fill the whole
|
||||
# shm buffer as much as it can!
|
||||
last_tsdb_dt = None
|
||||
async with trio.open_nursery() as tn:
|
||||
|
||||
# there's existing tsdb history from (offline) storage
|
||||
# so only backfill the gap between the
|
||||
# most-recent-frame (mrf) and that latest sample.
|
||||
else:
|
||||
# Prepend any tsdb history to the shm buffer which should
|
||||
# now be full of the most recent history pulled from the
|
||||
# backend's last frame.
|
||||
if tsdb_entry:
|
||||
(
|
||||
tsdb_history,
|
||||
first_tsdb_dt,
|
||||
last_tsdb_dt,
|
||||
) = tsdb_entry
|
||||
|
||||
# if there is a gap to backfill from the first
|
||||
# history frame until the last datum loaded from the tsdb
|
||||
# continue that now in the background
|
||||
async with trio.open_nursery() as tn:
|
||||
|
||||
# if there is a gap to backfill from the first
|
||||
# history frame until the last datum loaded from the tsdb
|
||||
# continue that now in the background
|
||||
bf_done = await tn.start(
|
||||
partial(
|
||||
start_backfill,
|
||||
tn=tn,
|
||||
get_hist=get_hist,
|
||||
mod=mod,
|
||||
mkt=mkt,
|
||||
|
@ -970,107 +871,102 @@ async def tsdb_backfill(
|
|||
|
||||
backfill_from_shm_index=backfill_gap_from_shm_index,
|
||||
backfill_from_dt=mr_start_dt,
|
||||
|
||||
sampler_stream=sampler_stream,
|
||||
backfill_until_dt=last_tsdb_dt,
|
||||
|
||||
storage=storage,
|
||||
write_tsdb=True,
|
||||
)
|
||||
)
|
||||
nulls_detected: trio.Event | None = None
|
||||
if last_tsdb_dt is not None:
|
||||
# calc the index from which the tsdb data should be
|
||||
# prepended, presuming there is a gap between the
|
||||
# latest frame (loaded/read above) and the latest
|
||||
# sample loaded from the tsdb.
|
||||
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
||||
offset_s: float = backfill_diff.in_seconds()
|
||||
|
||||
# XXX EDGE CASEs: the most recent frame overlaps with
|
||||
# prior tsdb history!!
|
||||
# - so the latest frame's start time is earlier then
|
||||
# the tsdb's latest sample.
|
||||
# - alternatively this may also more generally occur
|
||||
# when the venue was closed (say over the weeknd)
|
||||
# causing a timeseries gap, AND the query frames size
|
||||
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
|
||||
# GREATER THAN the current venue-market's operating
|
||||
# session (time) we will receive datums from BEFORE THE
|
||||
# CLOSURE GAP and thus the `offset_s` value will be
|
||||
# NEGATIVE! In this case we need to ensure we don't try
|
||||
# to push datums that have already been recorded in the
|
||||
# tsdb. In this case we instead only retreive and push
|
||||
# the series portion missing from the db's data set.
|
||||
# if offset_s < 0:
|
||||
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||
# non_overlap_offset_s: float = backfill_diff.in_seconds()
|
||||
# calc the index from which the tsdb data should be
|
||||
# prepended, presuming there is a gap between the
|
||||
# latest frame (loaded/read above) and the latest
|
||||
# sample loaded from the tsdb.
|
||||
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
|
||||
offset_s: float = backfill_diff.in_seconds()
|
||||
|
||||
offset_samples: int = round(offset_s / timeframe)
|
||||
# XXX EDGE CASEs: the most recent frame overlaps with
|
||||
# prior tsdb history!!
|
||||
# - so the latest frame's start time is earlier then
|
||||
# the tsdb's latest sample.
|
||||
# - alternatively this may also more generally occur
|
||||
# when the venue was closed (say over the weeknd)
|
||||
# causing a timeseries gap, AND the query frames size
|
||||
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
|
||||
# GREATER THAN the current venue-market's operating
|
||||
# session (time) we will receive datums from BEFORE THE
|
||||
# CLOSURE GAP and thus the `offset_s` value will be
|
||||
# NEGATIVE! In this case we need to ensure we don't try
|
||||
# to push datums that have already been recorded in the
|
||||
# tsdb. In this case we instead only retreive and push
|
||||
# the series portion missing from the db's data set.
|
||||
# if offset_s < 0:
|
||||
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
|
||||
# non_overlap_offset_s: float = backfill_diff.in_seconds()
|
||||
|
||||
# TODO: see if there's faster multi-field reads:
|
||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||
# re-index with a `time` and index field
|
||||
if offset_s > 0:
|
||||
# NOTE XXX: ONLY when there is an actual gap
|
||||
# between the earliest sample in the latest history
|
||||
# frame do we want to NOT stick the latest tsdb
|
||||
# history adjacent to that latest frame!
|
||||
prepend_start = shm._first.value - offset_samples + 1
|
||||
to_push = tsdb_history[-prepend_start:]
|
||||
else:
|
||||
# when there is overlap we want to remove the
|
||||
# overlapping samples from the tsdb portion (taking
|
||||
# instead the latest frame's values since THEY
|
||||
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
|
||||
# to the latest frame!
|
||||
# TODO: assert the overlap segment array contains
|
||||
# the same values!?!
|
||||
prepend_start = shm._first.value
|
||||
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
|
||||
offset_samples: int = round(offset_s / timeframe)
|
||||
|
||||
# tsdb history is so far in the past we can't fit it in
|
||||
# shm buffer space so simply don't load it!
|
||||
if prepend_start > 0:
|
||||
shm.push(
|
||||
to_push,
|
||||
# TODO: see if there's faster multi-field reads:
|
||||
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
|
||||
# re-index with a `time` and index field
|
||||
if offset_s > 0:
|
||||
# NOTE XXX: ONLY when there is an actual gap
|
||||
# between the earliest sample in the latest history
|
||||
# frame do we want to NOT stick the latest tsdb
|
||||
# history adjacent to that latest frame!
|
||||
prepend_start = shm._first.value - offset_samples + 1
|
||||
to_push = tsdb_history[-prepend_start:]
|
||||
else:
|
||||
# when there is overlap we want to remove the
|
||||
# overlapping samples from the tsdb portion (taking
|
||||
# instead the latest frame's values since THEY
|
||||
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
|
||||
# to the latest frame!
|
||||
# TODO: assert the overlap segment array contains
|
||||
# the same values!?!
|
||||
prepend_start = shm._first.value
|
||||
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
|
||||
|
||||
# insert the history pre a "days worth" of samples
|
||||
# to leave some real-time buffer space at the end.
|
||||
prepend=True,
|
||||
# update_first=False,
|
||||
start=prepend_start,
|
||||
field_map=storemod.ohlc_key_map,
|
||||
)
|
||||
# tsdb history is so far in the past we can't fit it in
|
||||
# shm buffer space so simply don't load it!
|
||||
if prepend_start > 0:
|
||||
shm.push(
|
||||
to_push,
|
||||
|
||||
log.info(f'Loaded {to_push.shape} datums from storage')
|
||||
# insert the history pre a "days worth" of samples
|
||||
# to leave some real-time buffer space at the end.
|
||||
prepend=True,
|
||||
# update_first=False,
|
||||
start=prepend_start,
|
||||
field_map=storemod.ohlc_key_map,
|
||||
)
|
||||
|
||||
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
|
||||
# seemingly missing (null-time) segments..
|
||||
# TODO: ideally these can never exist!
|
||||
# -[ ] somehow it seems sometimes we're writing zero-ed
|
||||
# segments to tsdbs during teardown?
|
||||
# -[ ] can we ensure that the backcfiller tasks do this
|
||||
# work PREVENTAVELY instead?
|
||||
# -[ ] fill in non-zero epoch time values ALWAYS!
|
||||
# await maybe_fill_null_segments(
|
||||
nulls_detected: trio.Event = await tn.start(partial(
|
||||
maybe_fill_null_segments,
|
||||
log.info(f'Loaded {to_push.shape} datums from storage')
|
||||
|
||||
shm=shm,
|
||||
timeframe=timeframe,
|
||||
get_hist=get_hist,
|
||||
sampler_stream=sampler_stream,
|
||||
mkt=mkt,
|
||||
))
|
||||
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
|
||||
# seemingly missing (null-time) segments..
|
||||
# TODO: ideally these can never exist!
|
||||
# -[ ] somehow it seems sometimes we're writing zero-ed
|
||||
# segments to tsdbs during teardown?
|
||||
# -[ ] can we ensure that the backcfiller tasks do this
|
||||
# work PREVENTAVELY instead?
|
||||
# -[ ] fill in non-zero epoch time values ALWAYS!
|
||||
# await maybe_fill_null_segments(
|
||||
nulls_detected: trio.Event = await tn.start(partial(
|
||||
maybe_fill_null_segments,
|
||||
|
||||
# 2nd nursery END
|
||||
shm=shm,
|
||||
timeframe=timeframe,
|
||||
get_hist=get_hist,
|
||||
sampler_stream=sampler_stream,
|
||||
mkt=mkt,
|
||||
))
|
||||
|
||||
# TODO: who would want to?
|
||||
if nulls_detected:
|
||||
await nulls_detected.wait()
|
||||
|
||||
await bf_done.wait()
|
||||
# TODO: who would want to?
|
||||
await nulls_detected.wait()
|
||||
|
||||
await bf_done.wait()
|
||||
# TODO: maybe start history anal and load missing "history
|
||||
# gaps" via backend..
|
||||
|
||||
|
@ -1114,6 +1010,7 @@ async def tsdb_backfill(
|
|||
|
||||
async def manage_history(
|
||||
mod: ModuleType,
|
||||
bus: _FeedsBus,
|
||||
mkt: MktPair,
|
||||
some_data_ready: trio.Event,
|
||||
feed_is_live: trio.Event,
|
||||
|
@ -1270,6 +1167,7 @@ async def manage_history(
|
|||
tsdb_backfill,
|
||||
mod=mod,
|
||||
storemod=storemod,
|
||||
# bus,
|
||||
storage=client,
|
||||
mkt=mkt,
|
||||
shm=tf2mem[timeframe],
|
||||
|
@ -1354,11 +1252,13 @@ def iter_dfs_from_shms(
|
|||
assert not opened
|
||||
ohlcv = shm.array
|
||||
|
||||
from ._anal import np2pl
|
||||
df: pl.DataFrame = np2pl(ohlcv)
|
||||
from ..data import tsp
|
||||
df: pl.DataFrame = tsp.np2pl(ohlcv)
|
||||
|
||||
yield (
|
||||
shmfile,
|
||||
shm,
|
||||
df,
|
||||
)
|
||||
|
||||
|
|
@ -319,8 +319,9 @@ def get_null_segs(
|
|||
if num_gaps < 1:
|
||||
if absi_zeros.size > 1:
|
||||
absi_zsegs = [[
|
||||
# TODO: maybe mk these max()/min() limits func
|
||||
# consts instead of called more then once?
|
||||
# see `get_hist()` in backend, should ALWAYS be
|
||||
# able to handle a `start_dt=None`!
|
||||
# None,
|
||||
max(
|
||||
absi_zeros[0] - 1,
|
||||
0,
|
||||
|
@ -358,10 +359,7 @@ def get_null_segs(
|
|||
# corresponding to the first zero-segment's row, we add it
|
||||
# manually here.
|
||||
absi_zsegs.append([
|
||||
max(
|
||||
absi_zeros[0] - 1,
|
||||
0,
|
||||
),
|
||||
absi_zeros[0] - 1,
|
||||
None,
|
||||
])
|
||||
|
||||
|
@ -402,18 +400,14 @@ def get_null_segs(
|
|||
|
||||
else:
|
||||
if 0 < num_gaps < 2:
|
||||
absi_zsegs[-1][1] = min(
|
||||
absi_zeros[-1] + 1,
|
||||
frame['index'][-1],
|
||||
)
|
||||
absi_zsegs[-1][1] = absi_zeros[-1] + 1
|
||||
|
||||
iabs_first: int = frame['index'][0]
|
||||
for start, end in absi_zsegs:
|
||||
|
||||
ts_start: float = times[start - iabs_first]
|
||||
ts_end: float = times[end - iabs_first]
|
||||
if (
|
||||
(ts_start == 0 and not start == 0)
|
||||
ts_start == 0
|
||||
or
|
||||
ts_end == 0
|
||||
):
|
||||
|
@ -457,13 +451,11 @@ def iter_null_segs(
|
|||
],
|
||||
None,
|
||||
]:
|
||||
if not (
|
||||
null_segs := get_null_segs(
|
||||
if null_segs is None:
|
||||
null_segs: tuple = get_null_segs(
|
||||
frame,
|
||||
period=timeframe,
|
||||
)
|
||||
):
|
||||
return
|
||||
|
||||
absi_pairs_zsegs: list[list[float, float]]
|
||||
izeros: Seq
|
||||
|
@ -510,7 +502,6 @@ def iter_null_segs(
|
|||
)
|
||||
|
||||
|
||||
# TODO: move to ._pl_anal
|
||||
def with_dts(
|
||||
df: pl.DataFrame,
|
||||
time_col: str = 'time',
|
||||
|
@ -526,7 +517,7 @@ def with_dts(
|
|||
pl.from_epoch(pl.col(time_col)).alias('dt'),
|
||||
]).with_columns([
|
||||
pl.from_epoch(
|
||||
column=pl.col(f'{time_col}_prev'),
|
||||
pl.col(f'{time_col}_prev')
|
||||
).alias('dt_prev'),
|
||||
pl.col('dt').diff().alias('dt_diff'),
|
||||
]) #.with_columns(
|
||||
|
@ -534,6 +525,19 @@ def with_dts(
|
|||
# )
|
||||
|
||||
|
||||
def dedup_dt(
|
||||
df: pl.DataFrame,
|
||||
) -> pl.DataFrame:
|
||||
'''
|
||||
Drop duplicate date-time rows (normally from an OHLC frame).
|
||||
|
||||
'''
|
||||
return df.unique(
|
||||
subset=['dt'],
|
||||
maintain_order=True,
|
||||
)
|
||||
|
||||
|
||||
t_unit: Literal = Literal[
|
||||
'days',
|
||||
'hours',
|
||||
|
@ -647,11 +651,7 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
|
|||
)
|
||||
|
||||
# remove duplicated datetime samples/sections
|
||||
deduped: pl.DataFrame = df.unique(
|
||||
subset=['dt'],
|
||||
maintain_order=True,
|
||||
)
|
||||
|
||||
deduped: pl.DataFrame = dedup_dt(df)
|
||||
deduped_gaps = detect_time_gaps(deduped)
|
||||
|
||||
diff: int = (
|
|
@ -100,10 +100,6 @@ async def open_piker_runtime(
|
|||
or [_default_reg_addr]
|
||||
)
|
||||
|
||||
if ems := tractor_kwargs.get('enable_modules'):
|
||||
# import pdbp; pdbp.set_trace()
|
||||
enable_modules.extend(ems)
|
||||
|
||||
async with (
|
||||
tractor.open_root_actor(
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
|
||||
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
|
@ -19,7 +19,6 @@ Storage middle-ware CLIs.
|
|||
|
||||
"""
|
||||
from __future__ import annotations
|
||||
# from datetime import datetime
|
||||
from pathlib import Path
|
||||
import time
|
||||
|
||||
|
@ -37,8 +36,11 @@ from piker.cli import cli
|
|||
from piker.config import get_conf_dir
|
||||
from piker.data import (
|
||||
ShmArray,
|
||||
tsp,
|
||||
)
|
||||
from piker.data.history import (
|
||||
iter_dfs_from_shms,
|
||||
)
|
||||
from piker import tsp
|
||||
from . import (
|
||||
log,
|
||||
)
|
||||
|
@ -187,8 +189,8 @@ def anal(
|
|||
frame=history,
|
||||
period=period,
|
||||
)
|
||||
# TODO: do tsp queries to backcend to fill i missing
|
||||
# history and then prolly write it to tsdb!
|
||||
if null_segs:
|
||||
await tractor.pause()
|
||||
|
||||
shm_df: pl.DataFrame = await client.as_df(
|
||||
fqme,
|
||||
|
@ -204,27 +206,18 @@ def anal(
|
|||
diff,
|
||||
) = tsp.dedupe(shm_df)
|
||||
|
||||
write_edits: bool = True
|
||||
if (
|
||||
write_edits
|
||||
and (
|
||||
diff
|
||||
or null_segs
|
||||
)
|
||||
):
|
||||
await tractor.pause()
|
||||
|
||||
if diff:
|
||||
await client.write_ohlcv(
|
||||
fqme,
|
||||
ohlcv=deduped,
|
||||
timeframe=period,
|
||||
)
|
||||
|
||||
else:
|
||||
# TODO: something better with tab completion..
|
||||
# is there something more minimal but nearly as
|
||||
# functional as ipython?
|
||||
await tractor.pause()
|
||||
# TODO: something better with tab completion..
|
||||
# is there something more minimal but nearly as
|
||||
# functional as ipython?
|
||||
await tractor.pause()
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
@ -250,11 +243,11 @@ def ldshm(
|
|||
),
|
||||
):
|
||||
df: pl.DataFrame | None = None
|
||||
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
|
||||
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
|
||||
|
||||
# compute ohlc properties for naming
|
||||
times: np.ndarray = shm.array['time']
|
||||
period_s: float = float(times[-1] - times[-2])
|
||||
period_s: float = times[-1] - times[-2]
|
||||
if period_s < 1.:
|
||||
raise ValueError(
|
||||
f'Something is wrong with time period for {shm}:\n{times}'
|
||||
|
@ -277,86 +270,11 @@ def ldshm(
|
|||
|
||||
# TODO: maybe only optionally enter this depending
|
||||
# on some CLI flags and/or gap detection?
|
||||
if (
|
||||
not gaps.is_empty()
|
||||
or null_segs
|
||||
):
|
||||
from piker.ui._remote_ctl import (
|
||||
open_annot_ctl,
|
||||
AnnotCtl,
|
||||
)
|
||||
annot_ctl: AnnotCtl
|
||||
async with open_annot_ctl() as annot_ctl:
|
||||
for i in range(gaps.height):
|
||||
if not gaps.is_empty():
|
||||
await tractor.pause()
|
||||
|
||||
row: pl.DataFrame = gaps[i]
|
||||
|
||||
# TODO: can we eventually remove this
|
||||
# once we figure out why the epoch cols
|
||||
# don't match?
|
||||
iend: int = row['index'][0]
|
||||
# dt: datetime = row['dt'][0]
|
||||
# dt_prev: datetime = row['dt_prev'][0]
|
||||
|
||||
# the gap's right-most bar's OPEN value
|
||||
# at that time (sample) step.
|
||||
# dt_end_t: float = dt.timestamp()
|
||||
|
||||
# TODO: FIX HOW/WHY these aren't matching
|
||||
# and are instead off by 4hours (EST
|
||||
# vs. UTC?!?!)
|
||||
# end_t: float = row['time']
|
||||
# assert (
|
||||
# dt.timestamp()
|
||||
# ==
|
||||
# end_t
|
||||
# )
|
||||
|
||||
# the gap's left-most bar's CLOSE value
|
||||
# at that time (sample) step.
|
||||
|
||||
prev_r: pl.DataFrame = df.filter(
|
||||
pl.col('index') == gaps[0]['index'] - 1
|
||||
)
|
||||
istart: int = prev_r['index'][0]
|
||||
# dt_start_t: float = dt_prev.timestamp()
|
||||
|
||||
# start_t: float = prev_r['time']
|
||||
# assert (
|
||||
# dt_start_t
|
||||
# ==
|
||||
# start_t
|
||||
# )
|
||||
|
||||
# TODO: implement px-col width measure
|
||||
# and ensure at least as many px-cols
|
||||
# shown per rect as configured by user.
|
||||
gap_w: float = abs((iend - istart))
|
||||
# await tractor.pause()
|
||||
if gap_w < 6:
|
||||
margin: float = 6
|
||||
iend += margin
|
||||
istart -= margin
|
||||
|
||||
ro: tuple[float, float] = (
|
||||
# dt_end_t,
|
||||
iend,
|
||||
row['open'][0],
|
||||
)
|
||||
lc: tuple[float, float] = (
|
||||
# dt_start_t,
|
||||
istart,
|
||||
prev_r['close'][0],
|
||||
)
|
||||
|
||||
aid: int = await annot_ctl.add_rect(
|
||||
fqme=fqme,
|
||||
timeframe=period_s,
|
||||
start_pos=lc,
|
||||
end_pos=ro,
|
||||
)
|
||||
assert aid
|
||||
await tractor.pause()
|
||||
if null_segs:
|
||||
await tractor.pause()
|
||||
|
||||
# write to parquet file?
|
||||
if write_parquet:
|
||||
|
|
|
@ -56,6 +56,8 @@ from datetime import datetime
|
|||
from pathlib import Path
|
||||
import time
|
||||
|
||||
# from bidict import bidict
|
||||
# import tractor
|
||||
import numpy as np
|
||||
import polars as pl
|
||||
from pendulum import (
|
||||
|
@ -63,10 +65,10 @@ from pendulum import (
|
|||
)
|
||||
|
||||
from piker import config
|
||||
from piker import tsp
|
||||
from piker.data import (
|
||||
def_iohlcv_fields,
|
||||
ShmArray,
|
||||
tsp,
|
||||
)
|
||||
from piker.log import get_logger
|
||||
from . import TimeseriesNotFound
|
||||
|
|
|
@ -272,15 +272,10 @@ class ContentsLabels:
|
|||
x_in: int,
|
||||
|
||||
) -> None:
|
||||
for (
|
||||
chart,
|
||||
name,
|
||||
label,
|
||||
update,
|
||||
)in self._labels:
|
||||
for chart, name, label, update in self._labels:
|
||||
|
||||
viz = chart.get_viz(name)
|
||||
array: np.ndarray = viz.shm._array
|
||||
array = viz.shm.array
|
||||
index = array[viz.index_field]
|
||||
start = index[0]
|
||||
stop = index[-1]
|
||||
|
@ -291,7 +286,7 @@ class ContentsLabels:
|
|||
):
|
||||
# out of range
|
||||
print('WTF out of range?')
|
||||
# continue
|
||||
continue
|
||||
|
||||
# call provided update func with data point
|
||||
try:
|
||||
|
@ -299,7 +294,6 @@ class ContentsLabels:
|
|||
ix = np.searchsorted(index, x_in)
|
||||
if ix > len(array):
|
||||
breakpoint()
|
||||
|
||||
update(ix, array)
|
||||
|
||||
except IndexError:
|
||||
|
|
|
@ -56,8 +56,8 @@ _line_styles: dict[str, int] = {
|
|||
|
||||
class FlowGraphic(pg.GraphicsObject):
|
||||
'''
|
||||
Base class with minimal interface for `QPainterPath`
|
||||
implemented, real-time updated "data flow" graphics.
|
||||
Base class with minimal interface for `QPainterPath` implemented,
|
||||
real-time updated "data flow" graphics.
|
||||
|
||||
See subtypes below.
|
||||
|
||||
|
@ -167,12 +167,11 @@ class FlowGraphic(pg.GraphicsObject):
|
|||
return None
|
||||
|
||||
# XXX: due to a variety of weird jitter bugs and "smearing"
|
||||
# artifacts when click-drag panning and viewing history time
|
||||
# series, we offer this ctx-mngr interface to allow temporarily
|
||||
# disabling Qt's graphics caching mode; this is now currently
|
||||
# used from ``ChartView.start/signal_ic()`` methods which also
|
||||
# disable the rt-display loop when the user is moving around
|
||||
# a view.
|
||||
# artifacts when click-drag panning and viewing history time series,
|
||||
# we offer this ctx-mngr interface to allow temporarily disabling
|
||||
# Qt's graphics caching mode; this is now currently used from
|
||||
# ``ChartView.start/signal_ic()`` methods which also disable the
|
||||
# rt-display loop when the user is moving around a view.
|
||||
@cm
|
||||
def reset_cache(self) -> None:
|
||||
try:
|
||||
|
|
|
@ -49,7 +49,7 @@ from ..data._formatters import (
|
|||
OHLCBarsAsCurveFmtr, # OHLC converted to line
|
||||
StepCurveFmtr, # "step" curve (like for vlm)
|
||||
)
|
||||
from ..tsp import (
|
||||
from ..data.tsp import (
|
||||
slice_from_time,
|
||||
)
|
||||
from ._ohlc import (
|
||||
|
@ -563,8 +563,7 @@ class Viz(Struct):
|
|||
|
||||
def view_range(self) -> tuple[int, int]:
|
||||
'''
|
||||
Return the start and stop x-indexes for the managed
|
||||
``ViewBox``.
|
||||
Return the start and stop x-indexes for the managed ``ViewBox``.
|
||||
|
||||
'''
|
||||
vr = self.plot.viewRect()
|
||||
|
|
|
@ -470,64 +470,54 @@ async def graphics_update_loop(
|
|||
if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
|
||||
await tractor.pause()
|
||||
|
||||
try:
|
||||
from . import _remote_ctl
|
||||
_remote_ctl._dss = dss
|
||||
# main real-time quotes update loop
|
||||
stream: tractor.MsgStream
|
||||
async with feed.open_multi_stream() as stream:
|
||||
assert stream
|
||||
async for quotes in stream:
|
||||
quote_period = time.time() - last_quote_s
|
||||
quote_rate = round(
|
||||
1/quote_period, 1) if quote_period > 0 else float('inf')
|
||||
if (
|
||||
quote_period <= 1/_quote_throttle_rate
|
||||
|
||||
# main real-time quotes update loop
|
||||
stream: tractor.MsgStream
|
||||
async with feed.open_multi_stream() as stream:
|
||||
assert stream
|
||||
async for quotes in stream:
|
||||
quote_period = time.time() - last_quote_s
|
||||
quote_rate = round(
|
||||
1/quote_period, 1) if quote_period > 0 else float('inf')
|
||||
# in the absolute worst case we shouldn't see more then
|
||||
# twice the expected throttle rate right!?
|
||||
# and quote_rate >= _quote_throttle_rate * 2
|
||||
and quote_rate >= display_rate
|
||||
):
|
||||
pass
|
||||
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
|
||||
|
||||
last_quote_s = time.time()
|
||||
|
||||
for fqme, quote in quotes.items():
|
||||
ds = dss[fqme]
|
||||
ds.quotes = quote
|
||||
rt_pi, hist_pi = pis[fqme]
|
||||
|
||||
# chart isn't active/shown so skip render cycle and
|
||||
# pause feed(s)
|
||||
if (
|
||||
quote_period <= 1/_quote_throttle_rate
|
||||
|
||||
# in the absolute worst case we shouldn't see more then
|
||||
# twice the expected throttle rate right!?
|
||||
# and quote_rate >= _quote_throttle_rate * 2
|
||||
and quote_rate >= display_rate
|
||||
fast_chart.linked.isHidden()
|
||||
or not rt_pi.isVisible()
|
||||
):
|
||||
pass
|
||||
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
|
||||
print(f'{fqme} skipping update for HIDDEN CHART')
|
||||
fast_chart.pause_all_feeds()
|
||||
continue
|
||||
|
||||
last_quote_s = time.time()
|
||||
ic = fast_chart.view._in_interact
|
||||
if ic:
|
||||
fast_chart.pause_all_feeds()
|
||||
print(f'{fqme} PAUSING DURING INTERACTION')
|
||||
await ic.wait()
|
||||
fast_chart.resume_all_feeds()
|
||||
|
||||
for fqme, quote in quotes.items():
|
||||
ds = dss[fqme]
|
||||
ds.quotes = quote
|
||||
rt_pi, hist_pi = pis[fqme]
|
||||
|
||||
# chart isn't active/shown so skip render cycle and
|
||||
# pause feed(s)
|
||||
if (
|
||||
fast_chart.linked.isHidden()
|
||||
or not rt_pi.isVisible()
|
||||
):
|
||||
print(f'{fqme} skipping update for HIDDEN CHART')
|
||||
fast_chart.pause_all_feeds()
|
||||
continue
|
||||
|
||||
ic = fast_chart.view._in_interact
|
||||
if ic:
|
||||
fast_chart.pause_all_feeds()
|
||||
print(f'{fqme} PAUSING DURING INTERACTION')
|
||||
await ic.wait()
|
||||
fast_chart.resume_all_feeds()
|
||||
|
||||
# sync call to update all graphics/UX components.
|
||||
graphics_update_cycle(
|
||||
ds,
|
||||
quote,
|
||||
)
|
||||
|
||||
finally:
|
||||
# XXX: cancel any remote annotation control ctxs
|
||||
_remote_ctl._dss = None
|
||||
for ctx in _remote_ctl._ctxs:
|
||||
await ctx.cancel()
|
||||
# sync call to update all graphics/UX components.
|
||||
graphics_update_cycle(
|
||||
ds,
|
||||
quote,
|
||||
)
|
||||
|
||||
|
||||
def graphics_update_cycle(
|
||||
|
@ -1245,7 +1235,7 @@ async def display_symbol_data(
|
|||
fast from a cached watch-list.
|
||||
|
||||
'''
|
||||
# sbar = godwidget.window.status_bar
|
||||
sbar = godwidget.window.status_bar
|
||||
# historical data fetch
|
||||
# brokermod = brokers.get_brokermod(provider)
|
||||
|
||||
|
@ -1255,11 +1245,11 @@ async def display_symbol_data(
|
|||
# group_key=loading_sym_key,
|
||||
# )
|
||||
|
||||
# for fqme in fqmes:
|
||||
# loading_sym_key = sbar.open_status(
|
||||
# f'loading {fqme} ->',
|
||||
# group_key=True
|
||||
# )
|
||||
for fqme in fqmes:
|
||||
loading_sym_key = sbar.open_status(
|
||||
f'loading {fqme} ->',
|
||||
group_key=True
|
||||
)
|
||||
|
||||
# (TODO: make this not so shit XD)
|
||||
# close group status once a symbol feed fully loads to view.
|
||||
|
@ -1432,7 +1422,7 @@ async def display_symbol_data(
|
|||
start_fsp_displays,
|
||||
rt_linked,
|
||||
flume,
|
||||
# loading_sym_key,
|
||||
loading_sym_key,
|
||||
loglevel,
|
||||
)
|
||||
|
||||
|
|
|
@ -21,8 +21,7 @@ Higher level annotation editors.
|
|||
from __future__ import annotations
|
||||
from collections import defaultdict
|
||||
from typing import (
|
||||
Sequence,
|
||||
TYPE_CHECKING,
|
||||
TYPE_CHECKING
|
||||
)
|
||||
|
||||
import pyqtgraph as pg
|
||||
|
@ -32,37 +31,24 @@ from pyqtgraph import (
|
|||
QtCore,
|
||||
QtWidgets,
|
||||
)
|
||||
from PyQt5.QtCore import (
|
||||
QPointF,
|
||||
QRectF,
|
||||
)
|
||||
from PyQt5.QtGui import (
|
||||
QColor,
|
||||
QTransform,
|
||||
)
|
||||
from PyQt5.QtWidgets import (
|
||||
QGraphicsProxyWidget,
|
||||
QGraphicsScene,
|
||||
QLabel,
|
||||
)
|
||||
|
||||
from pyqtgraph import functions as fn
|
||||
from PyQt5.QtCore import QPointF
|
||||
import numpy as np
|
||||
|
||||
from piker.types import Struct
|
||||
from ._style import (
|
||||
hcolor,
|
||||
_font,
|
||||
)
|
||||
from ._style import hcolor, _font
|
||||
from ._lines import LevelLine
|
||||
from ..log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ._chart import (
|
||||
GodWidget,
|
||||
ChartPlotWidget,
|
||||
)
|
||||
from ._interaction import ChartView
|
||||
from ._chart import GodWidget
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
@ -79,7 +65,7 @@ class ArrowEditor(Struct):
|
|||
uid: str,
|
||||
x: float,
|
||||
y: float,
|
||||
color: str = 'default',
|
||||
color='default',
|
||||
pointing: str | None = None,
|
||||
|
||||
) -> pg.ArrowItem:
|
||||
|
@ -265,56 +251,27 @@ class LineEditor(Struct):
|
|||
return lines
|
||||
|
||||
|
||||
def as_point(
|
||||
pair: Sequence[float, float] | QPointF,
|
||||
) -> list[QPointF, QPointF]:
|
||||
'''
|
||||
Case any input tuple of floats to a a list of `QPoint` objects
|
||||
for use in Qt geometry routines.
|
||||
|
||||
'''
|
||||
if isinstance(pair, QPointF):
|
||||
return pair
|
||||
|
||||
return QPointF(pair[0], pair[1])
|
||||
|
||||
|
||||
# TODO: maybe implement better, something something RectItemProxy??
|
||||
# -[ ] dig into details of how proxy's work?
|
||||
# https://doc.qt.io/qt-5/qgraphicsscene.html#addWidget
|
||||
# -[ ] consider using `.addRect()` maybe?
|
||||
|
||||
class SelectRect(QtWidgets.QGraphicsRectItem):
|
||||
'''
|
||||
A data-view "selection rectangle": the most fundamental
|
||||
geometry for annotating data views.
|
||||
|
||||
- https://doc.qt.io/qt-5/qgraphicsrectitem.html
|
||||
- https://doc.qt.io/qt-6/qgraphicsrectitem.html
|
||||
|
||||
'''
|
||||
def __init__(
|
||||
self,
|
||||
viewbox: ViewBox,
|
||||
color: str | None = None,
|
||||
color: str = 'dad_blue',
|
||||
) -> None:
|
||||
super().__init__(0, 0, 1, 1)
|
||||
|
||||
# self.rbScaleBox = QGraphicsRectItem(0, 0, 1, 1)
|
||||
self.vb: ViewBox = viewbox
|
||||
self.vb = viewbox
|
||||
self._chart: 'ChartPlotWidget' = None # noqa
|
||||
|
||||
self._chart: ChartPlotWidget | None = None # noqa
|
||||
|
||||
# TODO: maybe allow this to be dynamic via a method?
|
||||
#l override selection box color
|
||||
color: str = color or 'dad_blue'
|
||||
# override selection box color
|
||||
color = QColor(hcolor(color))
|
||||
|
||||
self.setPen(fn.mkPen(color, width=1))
|
||||
color.setAlpha(66)
|
||||
self.setBrush(fn.mkBrush(color))
|
||||
self.setZValue(1e9)
|
||||
self.hide()
|
||||
self._label = None
|
||||
|
||||
label = self._label = QLabel()
|
||||
label.setTextFormat(0) # markdown
|
||||
|
@ -324,15 +281,13 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
QtCore.Qt.AlignLeft
|
||||
# | QtCore.Qt.AlignVCenter
|
||||
)
|
||||
label.hide() # always right after init
|
||||
|
||||
# proxy is created after containing scene is initialized
|
||||
self._label_proxy: QGraphicsProxyWidget | None = None
|
||||
self._abs_top_right: Point | None = None
|
||||
self._label_proxy = None
|
||||
self._abs_top_right = None
|
||||
|
||||
# TODO: "swing %" might be handy here (data's max/min
|
||||
# # % change)?
|
||||
self._contents: list[str] = [
|
||||
# TODO: "swing %" might be handy here (data's max/min # % change)
|
||||
self._contents = [
|
||||
'change: {pchng:.2f} %',
|
||||
'range: {rng:.2f}',
|
||||
'bars: {nbars}',
|
||||
|
@ -342,30 +297,12 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
'sigma: {std:.2f}',
|
||||
]
|
||||
|
||||
self.add_to_view(viewbox)
|
||||
|
||||
def add_to_view(
|
||||
self,
|
||||
view: ChartView,
|
||||
) -> None:
|
||||
'''
|
||||
Self-defined view hookup impl which will
|
||||
also re-assign the internal ref.
|
||||
|
||||
'''
|
||||
view.addItem(
|
||||
self,
|
||||
ignoreBounds=True,
|
||||
)
|
||||
if self.vb is not view:
|
||||
self.vb = view
|
||||
|
||||
@property
|
||||
def chart(self) -> ChartPlotWidget: # noqa
|
||||
def chart(self) -> 'ChartPlotWidget': # noqa
|
||||
return self._chart
|
||||
|
||||
@chart.setter
|
||||
def chart(self, chart: ChartPlotWidget) -> None: # noqa
|
||||
def chart(self, chart: 'ChartPlotWidget') -> None: # noqa
|
||||
self._chart = chart
|
||||
chart.sigRangeChanged.connect(self.update_on_resize)
|
||||
palette = self._label.palette()
|
||||
|
@ -378,155 +315,57 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
)
|
||||
|
||||
def update_on_resize(self, vr, r):
|
||||
'''
|
||||
Re-position measure label on view range change.
|
||||
"""Re-position measure label on view range change.
|
||||
|
||||
'''
|
||||
"""
|
||||
if self._abs_top_right:
|
||||
self._label_proxy.setPos(
|
||||
self.vb.mapFromView(self._abs_top_right)
|
||||
)
|
||||
|
||||
def set_scen_pos(
|
||||
def mouse_drag_released(
|
||||
self,
|
||||
scen_p1: QPointF,
|
||||
scen_p2: QPointF,
|
||||
|
||||
update_label: bool = True,
|
||||
|
||||
p1: QPointF,
|
||||
p2: QPointF
|
||||
) -> None:
|
||||
'''
|
||||
Set position from scene coords of selection rect (normally
|
||||
from mouse position) and accompanying label, move label to
|
||||
match.
|
||||
"""Called on final button release for mouse drag with start and
|
||||
end positions.
|
||||
|
||||
'''
|
||||
# NOTE XXX: apparently just setting it doesn't work!?
|
||||
# i have no idea why but it's pretty weird we have to do
|
||||
# this transform thing which was basically pulled verbatim
|
||||
# from the `pg.ViewBox.updateScaleBox()` method.
|
||||
view_rect: QRectF = self.vb.childGroup.mapRectFromScene(
|
||||
QRectF(
|
||||
scen_p1,
|
||||
scen_p2,
|
||||
)
|
||||
)
|
||||
self.setPos(view_rect.topLeft())
|
||||
# XXX: does not work..!?!?
|
||||
# https://doc.qt.io/qt-5/qgraphicsrectitem.html#setRect
|
||||
# self.setRect(view_rect)
|
||||
"""
|
||||
self.set_pos(p1, p2)
|
||||
|
||||
tr = QTransform.fromScale(
|
||||
view_rect.width(),
|
||||
view_rect.height(),
|
||||
)
|
||||
self.setTransform(tr)
|
||||
|
||||
# XXX: never got this working, was always offset
|
||||
# / transformed completely wrong (and off to the far right
|
||||
# from the cursor?)
|
||||
# self.set_view_pos(
|
||||
# view_rect=view_rect,
|
||||
# # self.vwqpToView(p1),
|
||||
# # self.vb.mapToView(p2),
|
||||
# # start_pos=self.vb.mapToScene(p1),
|
||||
# # end_pos=self.vb.mapToScene(p2),
|
||||
# )
|
||||
self.show()
|
||||
|
||||
if update_label:
|
||||
self.init_label(view_rect)
|
||||
|
||||
def set_view_pos(
|
||||
def set_pos(
|
||||
self,
|
||||
|
||||
start_pos: QPointF | Sequence[float, float] | None = None,
|
||||
end_pos: QPointF | Sequence[float, float] | None = None,
|
||||
view_rect: QRectF | None = None,
|
||||
|
||||
update_label: bool = True,
|
||||
|
||||
p1: QPointF,
|
||||
p2: QPointF
|
||||
) -> None:
|
||||
'''
|
||||
Set position from `ViewBox` coords (i.e. from the actual
|
||||
data domain) of rect (and any accompanying label which is
|
||||
moved to match).
|
||||
"""Set position of selection rect and accompanying label, move
|
||||
label to match.
|
||||
|
||||
'''
|
||||
if self._chart is None:
|
||||
raise RuntimeError(
|
||||
'You MUST assign a `SelectRect.chart: ChartPlotWidget`!'
|
||||
)
|
||||
|
||||
if view_rect is None:
|
||||
# ensure point casting
|
||||
start_pos: QPointF = as_point(start_pos)
|
||||
end_pos: QPointF = as_point(end_pos)
|
||||
|
||||
# map to view coords and update area
|
||||
view_rect = QtCore.QRectF(
|
||||
start_pos,
|
||||
end_pos,
|
||||
)
|
||||
|
||||
self.setPos(view_rect.topLeft())
|
||||
|
||||
# NOTE: SERIOUSLY NO IDEA WHY THIS WORKS...
|
||||
# but it does and all the other commented stuff above
|
||||
# dint, dawg..
|
||||
|
||||
# self.resetTransform()
|
||||
# self.setRect(view_rect)
|
||||
|
||||
tr = QTransform.fromScale(
|
||||
view_rect.width(),
|
||||
view_rect.height(),
|
||||
)
|
||||
self.setTransform(tr)
|
||||
|
||||
if update_label:
|
||||
self.init_label(view_rect)
|
||||
|
||||
print(
|
||||
'SelectRect modify:\n'
|
||||
f'QRectF: {view_rect}\n'
|
||||
f'start_pos: {start_pos}\n'
|
||||
f'end_pos: {end_pos}\n'
|
||||
)
|
||||
self.show()
|
||||
|
||||
def init_label(
|
||||
self,
|
||||
view_rect: QRectF,
|
||||
) -> QLabel:
|
||||
|
||||
# should be init-ed in `.__init__()`
|
||||
label: QLabel = self._label
|
||||
cv: ChartView = self.vb
|
||||
|
||||
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
|
||||
"""
|
||||
if self._label_proxy is None:
|
||||
scen: QGraphicsScene = cv.scene()
|
||||
# NOTE: specifically this is passing a widget
|
||||
# pointer to the scene's `.addWidget()` as per,
|
||||
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html#embedding-a-widget-with-qgraphicsproxywidget
|
||||
self._label_proxy: QGraphicsProxyWidget = scen.addWidget(label)
|
||||
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
|
||||
self._label_proxy = self.vb.scene().addWidget(self._label)
|
||||
|
||||
# get label startup coords
|
||||
tl: QPointF = view_rect.topLeft()
|
||||
br: QPointF = view_rect.bottomRight()
|
||||
start_pos = self.vb.mapToView(p1)
|
||||
end_pos = self.vb.mapToView(p2)
|
||||
|
||||
x1, y1 = tl.x(), tl.y()
|
||||
x2, y2 = br.x(), br.y()
|
||||
# map to view coords and update area
|
||||
r = QtCore.QRectF(start_pos, end_pos)
|
||||
|
||||
# TODO: to remove, previous label corner point unpacking
|
||||
# x1, y1 = start_pos.x(), start_pos.y()
|
||||
# x2, y2 = end_pos.x(), end_pos.y()
|
||||
# y1, y2 = start_pos.y(), end_pos.y()
|
||||
# x1, x2 = start_pos.x(), end_pos.x()
|
||||
# old way; don't need right?
|
||||
# lr = QtCore.QRectF(p1, p2)
|
||||
# r = self.vb.childGroup.mapRectFromParent(lr)
|
||||
|
||||
# TODO: heh, could probably use a max-min streamin algo
|
||||
# here too?
|
||||
self.setPos(r.topLeft())
|
||||
self.resetTransform()
|
||||
self.setRect(r)
|
||||
self.show()
|
||||
|
||||
y1, y2 = start_pos.y(), end_pos.y()
|
||||
x1, x2 = start_pos.x(), end_pos.x()
|
||||
|
||||
# TODO: heh, could probably use a max-min streamin algo here too
|
||||
_, xmn = min(y1, y2), min(x1, x2)
|
||||
ymx, xmx = max(y1, y2), max(x1, x2)
|
||||
|
||||
|
@ -536,35 +375,26 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
ixmn, ixmx = round(xmn), round(xmx)
|
||||
nbars = ixmx - ixmn + 1
|
||||
|
||||
chart: ChartPlotWidget = self._chart
|
||||
data: np.ndarray = chart.get_viz(
|
||||
chart.name
|
||||
).shm.array[ixmn:ixmx]
|
||||
chart = self._chart
|
||||
data = chart.get_viz(chart.name).shm.array[ixmn:ixmx]
|
||||
|
||||
if len(data):
|
||||
std: float = data['close'].std()
|
||||
dmx: float = data['high'].max()
|
||||
dmn: float = data['low'].min()
|
||||
std = data['close'].std()
|
||||
dmx = data['high'].max()
|
||||
dmn = data['low'].min()
|
||||
else:
|
||||
dmn = dmx = std = np.nan
|
||||
|
||||
# update label info
|
||||
label.setText('\n'.join(self._contents).format(
|
||||
pchng=pchng,
|
||||
rng=rng,
|
||||
nbars=nbars,
|
||||
std=std,
|
||||
dmx=dmx,
|
||||
dmn=dmn,
|
||||
self._label.setText('\n'.join(self._contents).format(
|
||||
pchng=pchng, rng=rng, nbars=nbars,
|
||||
std=std, dmx=dmx, dmn=dmn,
|
||||
))
|
||||
|
||||
# print(f'x2, y2: {(x2, y2)}')
|
||||
# print(f'xmn, ymn: {(xmn, ymx)}')
|
||||
|
||||
label_anchor = Point(
|
||||
xmx + 2,
|
||||
ymx,
|
||||
)
|
||||
label_anchor = Point(xmx + 2, ymx)
|
||||
|
||||
# XXX: in the drag bottom-right -> top-left case we don't
|
||||
# want the label to overlay the box.
|
||||
|
@ -573,16 +403,13 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
|
|||
# # label_anchor = Point(x2, y2 + self._label.height())
|
||||
# label_anchor = Point(xmn, ymn)
|
||||
|
||||
self._abs_top_right: Point = label_anchor
|
||||
self._label_proxy.setPos(
|
||||
cv.mapFromView(label_anchor)
|
||||
)
|
||||
label.show()
|
||||
self._abs_top_right = label_anchor
|
||||
self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
|
||||
# self._label.show()
|
||||
|
||||
def clear(self):
|
||||
'''
|
||||
Clear the selection box from view.
|
||||
"""Clear the selection box from view.
|
||||
|
||||
'''
|
||||
"""
|
||||
self._label.hide()
|
||||
self.hide()
|
||||
|
|
|
@ -181,10 +181,7 @@ async def open_fsp_sidepane(
|
|||
async def open_fsp_actor_cluster(
|
||||
names: list[str] = ['fsp_0', 'fsp_1'],
|
||||
|
||||
) -> AsyncGenerator[
|
||||
int,
|
||||
dict[str, tractor.Portal]
|
||||
]:
|
||||
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
|
||||
|
||||
from tractor._clustering import open_actor_cluster
|
||||
|
||||
|
@ -560,7 +557,7 @@ class FspAdmin:
|
|||
conf: dict, # yeah probably dumb..
|
||||
loglevel: str = 'error',
|
||||
|
||||
) -> trio.Event:
|
||||
) -> (trio.Event, ChartPlotWidget):
|
||||
|
||||
flume, started = await self.start_engine_task(
|
||||
target,
|
||||
|
@ -927,7 +924,7 @@ async def start_fsp_displays(
|
|||
|
||||
linked: LinkedSplits,
|
||||
flume: Flume,
|
||||
# group_status_key: str,
|
||||
group_status_key: str,
|
||||
loglevel: str,
|
||||
|
||||
) -> None:
|
||||
|
@ -974,23 +971,21 @@ async def start_fsp_displays(
|
|||
flume,
|
||||
) as admin,
|
||||
):
|
||||
statuses: list[trio.Event] = []
|
||||
statuses = []
|
||||
for target, conf in fsp_conf.items():
|
||||
started: trio.Event = await admin.open_fsp_chart(
|
||||
started = await admin.open_fsp_chart(
|
||||
target,
|
||||
conf,
|
||||
)
|
||||
# done = linked.window().status_bar.open_status(
|
||||
# f'loading fsp, {target}..',
|
||||
# group_key=group_status_key,
|
||||
# )
|
||||
# statuses.append((started, done))
|
||||
statuses.append(started)
|
||||
done = linked.window().status_bar.open_status(
|
||||
f'loading fsp, {target}..',
|
||||
group_key=group_status_key,
|
||||
)
|
||||
statuses.append((started, done))
|
||||
|
||||
# for fsp_loaded, status_cb in statuses:
|
||||
for fsp_loaded in statuses:
|
||||
for fsp_loaded, status_cb in statuses:
|
||||
await fsp_loaded.wait()
|
||||
profiler(f'attached to fsp portal: {target}')
|
||||
# status_cb()
|
||||
status_cb()
|
||||
|
||||
# blocks on nursery until all fsp actors complete
|
||||
|
|
|
@ -30,11 +30,7 @@ from typing import (
|
|||
)
|
||||
|
||||
import pyqtgraph as pg
|
||||
# NOTE XXX: pg is super annoying and re-implements it's own mouse
|
||||
# event subsystem.. we should really look into re-working/writing
|
||||
# this down the road.. Bo
|
||||
from pyqtgraph.GraphicsScene import mouseEvents as mevs
|
||||
# from pyqtgraph.GraphicsScene.mouseEvents import MouseDragEvent
|
||||
# from pyqtgraph.GraphicsScene import mouseEvents
|
||||
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
|
||||
from PyQt5.QtGui import (
|
||||
QWheelEvent,
|
||||
|
@ -470,7 +466,6 @@ class ChartView(ViewBox):
|
|||
mode_name: str = 'view'
|
||||
def_delta: float = 616 * 6
|
||||
def_scale_factor: float = 1.016 ** (def_delta * -1 / 20)
|
||||
# annots: dict[int, GraphicsObject] = {}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
@ -491,7 +486,6 @@ class ChartView(ViewBox):
|
|||
# defaultPadding=0.,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
# for "known y-range style"
|
||||
self._static_yrange = static_yrange
|
||||
|
||||
|
@ -506,11 +500,7 @@ class ChartView(ViewBox):
|
|||
|
||||
# add our selection box annotator
|
||||
self.select_box = SelectRect(self)
|
||||
# self.select_box.add_to_view(self)
|
||||
# self.addItem(
|
||||
# self.select_box,
|
||||
# ignoreBounds=True,
|
||||
# )
|
||||
self.addItem(self.select_box, ignoreBounds=True)
|
||||
|
||||
self.mode = None
|
||||
self.order_mode: bool = False
|
||||
|
@ -721,18 +711,17 @@ class ChartView(ViewBox):
|
|||
|
||||
def mouseDragEvent(
|
||||
self,
|
||||
ev: mevs.MouseDragEvent,
|
||||
ev,
|
||||
axis: int | None = None,
|
||||
|
||||
) -> None:
|
||||
pos: Point = ev.pos()
|
||||
lastPos: Point = ev.lastPos()
|
||||
dif: Point = (pos - lastPos) * -1
|
||||
# dif: Point = pos - lastPos
|
||||
# dif: Point = dif * -1
|
||||
pos = ev.pos()
|
||||
lastPos = ev.lastPos()
|
||||
dif = pos - lastPos
|
||||
dif = dif * -1
|
||||
|
||||
# NOTE: if axis is specified, event will only affect that axis.
|
||||
btn = ev.button()
|
||||
button = ev.button()
|
||||
|
||||
# Ignore axes if mouse is disabled
|
||||
mouseEnabled = np.array(
|
||||
|
@ -744,7 +733,7 @@ class ChartView(ViewBox):
|
|||
mask[1-axis] = 0.0
|
||||
|
||||
# Scale or translate based on mouse button
|
||||
if btn & (
|
||||
if button & (
|
||||
QtCore.Qt.LeftButton | QtCore.Qt.MidButton
|
||||
):
|
||||
# zoom y-axis ONLY when click-n-drag on it
|
||||
|
@ -767,55 +756,34 @@ class ChartView(ViewBox):
|
|||
# XXX: WHY
|
||||
ev.accept()
|
||||
|
||||
down_pos: Point = ev.buttonDownPos(
|
||||
btn=btn,
|
||||
)
|
||||
scen_pos: Point = ev.scenePos()
|
||||
scen_down_pos: Point = ev.buttonDownScenePos(
|
||||
btn=btn,
|
||||
)
|
||||
down_pos = ev.buttonDownPos()
|
||||
|
||||
# This is the final position in the drag
|
||||
if ev.isFinish():
|
||||
|
||||
# import pdbp; pdbp.set_trace()
|
||||
self.select_box.mouse_drag_released(down_pos, pos)
|
||||
|
||||
# NOTE: think of this as a `.mouse_drag_release()`
|
||||
# (bc HINT that's what i called the shit ass
|
||||
# method that wrapped this call [yes, as a single
|
||||
# fucking call] originally.. you bish, guille)
|
||||
# Bo.. oraleeee
|
||||
self.select_box.set_scen_pos(
|
||||
# down_pos,
|
||||
# pos,
|
||||
scen_down_pos,
|
||||
scen_pos,
|
||||
)
|
||||
|
||||
# this is the zoom transform cmd
|
||||
ax = QtCore.QRectF(down_pos, pos)
|
||||
ax = self.childGroup.mapRectFromParent(ax)
|
||||
# self.showAxRect(ax)
|
||||
|
||||
# this is the zoom transform cmd
|
||||
self.showAxRect(ax)
|
||||
|
||||
# axis history tracking
|
||||
self.axHistoryPointer += 1
|
||||
self.axHistory = self.axHistory[
|
||||
:self.axHistoryPointer] + [ax]
|
||||
|
||||
else:
|
||||
self.select_box.set_scen_pos(
|
||||
# down_pos,
|
||||
# pos,
|
||||
scen_down_pos,
|
||||
scen_pos,
|
||||
)
|
||||
print('drag finish?')
|
||||
self.select_box.set_pos(down_pos, pos)
|
||||
|
||||
# update shape of scale box
|
||||
# self.updateScaleBox(ev.buttonDownPos(), ev.pos())
|
||||
# breakpoint()
|
||||
# self.updateScaleBox(
|
||||
# down_pos,
|
||||
# ev.pos(),
|
||||
# )
|
||||
self.updateScaleBox(
|
||||
down_pos,
|
||||
ev.pos(),
|
||||
)
|
||||
|
||||
# PANNING MODE
|
||||
else:
|
||||
|
@ -854,7 +822,7 @@ class ChartView(ViewBox):
|
|||
# ev.accept()
|
||||
|
||||
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
|
||||
elif btn & QtCore.Qt.RightButton:
|
||||
elif button & QtCore.Qt.RightButton:
|
||||
|
||||
if self.state['aspectLocked'] is not False:
|
||||
mask[0] = 0
|
||||
|
|
|
@ -24,6 +24,8 @@ view transforms.
|
|||
"""
|
||||
import pyqtgraph as pg
|
||||
|
||||
from ._axes import Axis
|
||||
|
||||
|
||||
def invertQTransform(tr):
|
||||
"""Return a QTransform that is the inverse of *tr*.
|
||||
|
@ -51,9 +53,6 @@ def _do_overrides() -> None:
|
|||
pg.functions.invertQTransform = invertQTransform
|
||||
pg.PlotItem = PlotItem
|
||||
|
||||
from ._axes import Axis
|
||||
pg.Axis = Axis
|
||||
|
||||
# enable "QPainterPathPrivate for faster arrayToQPath" from
|
||||
# https://github.com/pyqtgraph/pyqtgraph/pull/2324
|
||||
pg.setConfigOption('enableExperimental', True)
|
||||
|
@ -235,7 +234,7 @@ class PlotItem(pg.PlotItem):
|
|||
# ``ViewBox`` geometry bug.. where a gap for the
|
||||
# 'bottom' axis is somehow left in?
|
||||
# axis = pg.AxisItem(orientation=name, parent=self)
|
||||
axis = pg.Axis(
|
||||
axis = Axis(
|
||||
self,
|
||||
orientation=name,
|
||||
parent=self,
|
||||
|
|
|
@ -1,273 +0,0 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Remote control tasks for sending annotations (and maybe more cmds)
|
||||
to a chart from some other actor.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import asynccontextmanager as acm
|
||||
from pprint import pformat
|
||||
from typing import (
|
||||
Any,
|
||||
AsyncContextManager,
|
||||
)
|
||||
|
||||
import tractor
|
||||
from tractor import trionics
|
||||
from tractor import (
|
||||
Portal,
|
||||
Context,
|
||||
MsgStream,
|
||||
)
|
||||
from PyQt5.QtWidgets import (
|
||||
QGraphicsItem,
|
||||
)
|
||||
|
||||
from piker.log import get_logger
|
||||
from piker.types import Struct
|
||||
from piker.service import find_service
|
||||
from ._display import DisplayState
|
||||
from ._interaction import ChartView
|
||||
from ._editors import SelectRect
|
||||
from ._chart import ChartPlotWidget
|
||||
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
# NOTE: this is set by the `._display.graphics_update_loop()` once
|
||||
# all chart widgets / Viz per flume have been initialized allowing
|
||||
# for remote annotation (control) of any chart-actor's mkt feed by
|
||||
# fqme lookup Bo
|
||||
_dss: dict[str, DisplayState] | None = None
|
||||
|
||||
# stash each and every client connection so that they can all
|
||||
# be cancelled on shutdown/error.
|
||||
# TODO: make `tractor.Context` hashable via is `.cid: str`?
|
||||
# _ctxs: set[Context] = set()
|
||||
_ctxs: list[Context] = []
|
||||
|
||||
# global map of all uniquely created annotation-graphics
|
||||
# so that they can be mutated (eventually) by a client.
|
||||
_annots: dict[int, QGraphicsItem] = {}
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def remote_annotate(
|
||||
ctx: Context,
|
||||
) -> None:
|
||||
|
||||
global _dss, _ctxs
|
||||
assert _dss
|
||||
|
||||
_ctxs.append(ctx)
|
||||
|
||||
# send back full fqme symbology to caller
|
||||
await ctx.started(list(_dss))
|
||||
|
||||
async with ctx.open_stream() as annot_req_stream:
|
||||
async for msg in annot_req_stream:
|
||||
match msg:
|
||||
case {
|
||||
'annot': 'SelectRect',
|
||||
'fqme': fqme,
|
||||
'timeframe': timeframe,
|
||||
'meth': str(meth),
|
||||
'kwargs': dict(kwargs),
|
||||
}:
|
||||
|
||||
ds: DisplayState = _dss[fqme]
|
||||
chart: ChartPlotWidget = {
|
||||
60: ds.hist_chart,
|
||||
1: ds.chart,
|
||||
}[timeframe]
|
||||
cv: ChartView = chart.cv
|
||||
|
||||
# sanity
|
||||
if timeframe == 60:
|
||||
assert (
|
||||
chart.linked.godwidget.hist_linked.chart.view
|
||||
is
|
||||
cv
|
||||
)
|
||||
|
||||
# annot type lookup from cmd
|
||||
rect = SelectRect(
|
||||
viewbox=cv,
|
||||
|
||||
# TODO: make this more dynamic?
|
||||
# -[ ] pull from conf.toml?
|
||||
# -[ ] add `.set_color()` method to type?
|
||||
# -[ ] make a green/red based on direction
|
||||
# instead of default static color?
|
||||
color=kwargs.pop('color', None),
|
||||
)
|
||||
# XXX NOTE: this is REQUIRED to set the rect
|
||||
# resize callback!
|
||||
rect.chart: ChartPlotWidget = chart
|
||||
|
||||
# delegate generically to the requested method
|
||||
getattr(rect, meth)(**kwargs)
|
||||
rect.show()
|
||||
await annot_req_stream.send(id(rect))
|
||||
|
||||
case _:
|
||||
log.error(
|
||||
'Unknown remote annotation cmd:\n'
|
||||
f'{pformat(msg)}'
|
||||
)
|
||||
|
||||
|
||||
class AnnotCtl(Struct):
|
||||
'''
|
||||
A control for remote "data annotations".
|
||||
|
||||
You know those "squares they always show in machine vision
|
||||
UIs.." this API allows you to remotely control stuff like that
|
||||
in some other graphics actor.
|
||||
|
||||
'''
|
||||
ctx2fqmes: dict[str, str]
|
||||
fqme2stream: dict[str, MsgStream]
|
||||
|
||||
async def add_rect(
|
||||
self,
|
||||
fqme: str,
|
||||
timeframe: float,
|
||||
start_pos: tuple[float, float],
|
||||
end_pos: tuple[float, float],
|
||||
|
||||
# TODO: a `Literal['view', 'scene']` for this?
|
||||
domain: str = 'view', # or 'scene'
|
||||
color: str = 'dad_blue',
|
||||
|
||||
) -> int:
|
||||
'''
|
||||
Add a `SelectRect` annotation to the target view, return
|
||||
the instances `id(obj)` from the remote UI actor.
|
||||
|
||||
'''
|
||||
ipc: MsgStream = self.fqme2stream[fqme]
|
||||
await ipc.send({
|
||||
'fqme': fqme,
|
||||
'annot': 'SelectRect',
|
||||
'timeframe': timeframe,
|
||||
# 'meth': str(meth),
|
||||
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
|
||||
'kwargs': {
|
||||
'start_pos': tuple(start_pos),
|
||||
'end_pos': tuple(end_pos),
|
||||
'color': color,
|
||||
'update_label': False,
|
||||
},
|
||||
})
|
||||
return (await ipc.receive())
|
||||
|
||||
async def modify(
|
||||
self,
|
||||
aid: int, # annotation id
|
||||
meth: str, # far end graphics object method to invoke
|
||||
params: dict[str, Any], # far end `meth(**kwargs)`
|
||||
) -> bool:
|
||||
'''
|
||||
Modify an existing (remote) annotation's graphics
|
||||
paramters, thus changing it's appearance / state in real
|
||||
time.
|
||||
|
||||
'''
|
||||
raise NotImplementedError
|
||||
|
||||
async def remove(
|
||||
self,
|
||||
uid: int,
|
||||
) -> bool:
|
||||
'''
|
||||
Remove an existing annotation by instance id.
|
||||
|
||||
'''
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
|
||||
@acm
|
||||
async def open_annot_ctl(
|
||||
uid: tuple[str, str] | None = None,
|
||||
|
||||
) -> AnnotCtl:
|
||||
# TODO: load connetion to a specific chart actor
|
||||
# -[ ] pull from either service scan or config
|
||||
# -[ ] return some kinda client/proxy thinger?
|
||||
# -[ ] maybe we should finally just provide this as
|
||||
# a `tractor.hilevel.CallableProxy` or wtv?
|
||||
# -[ ] use this from the storage.cli stuff to mark up gaps!
|
||||
|
||||
maybe_portals: list[Portal] | None
|
||||
fqmes: list[str]
|
||||
async with find_service(
|
||||
service_name='chart',
|
||||
first_only=False,
|
||||
) as maybe_portals:
|
||||
|
||||
ctx_mngrs: list[AsyncContextManager] = []
|
||||
|
||||
# TODO: print the current discoverable actor UID set
|
||||
# here as well?
|
||||
if not maybe_portals:
|
||||
raise RuntimeError('No chart UI actors found in service domain?')
|
||||
|
||||
for portal in maybe_portals:
|
||||
ctx_mngrs.append(
|
||||
portal.open_context(remote_annotate)
|
||||
)
|
||||
|
||||
ctx2fqmes: dict[str, set[str]] = {}
|
||||
fqme2stream: dict[str, MsgStream] = {}
|
||||
client = AnnotCtl(
|
||||
ctx2fqmes=ctx2fqmes,
|
||||
fqme2stream=fqme2stream,
|
||||
)
|
||||
|
||||
stream_ctxs: list[AsyncContextManager] = []
|
||||
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
|
||||
for (ctx, fqmes) in ctxs:
|
||||
stream_ctxs.append(ctx.open_stream())
|
||||
|
||||
# fill lookup table of mkt addrs to IPC ctxs
|
||||
for fqme in fqmes:
|
||||
if other := fqme2stream.get(fqme):
|
||||
raise ValueError(
|
||||
f'More then one chart displays {fqme}!?\n'
|
||||
'Other UI actor info:\n'
|
||||
f'channel: {other._ctx.chan}]\n'
|
||||
f'actor uid: {other._ctx.chan.uid}]\n'
|
||||
f'ctx id: {other._ctx.cid}]\n'
|
||||
)
|
||||
|
||||
ctx2fqmes.setdefault(
|
||||
ctx.cid,
|
||||
set(),
|
||||
).add(fqme)
|
||||
|
||||
async with trionics.gather_contexts(stream_ctxs) as streams:
|
||||
for stream in streams:
|
||||
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
|
||||
for fqme in fqmes:
|
||||
fqme2stream[fqme] = stream
|
||||
|
||||
yield client
|
||||
# TODO: on graceful teardown should we try to
|
||||
# remove all annots that were created/modded?
|
|
@ -228,10 +228,5 @@ def chart(
|
|||
'loglevel': tractorloglevel,
|
||||
'name': 'chart',
|
||||
'registry_addrs': list(set(regaddrs)),
|
||||
'enable_modules': [
|
||||
|
||||
# remote data-view annotations Bo
|
||||
'piker.ui._remote_ctl',
|
||||
],
|
||||
},
|
||||
)
|
||||
|
|
|
@ -31,7 +31,7 @@ import pendulum
|
|||
import pyqtgraph as pg
|
||||
|
||||
from piker.types import Struct
|
||||
from ..tsp import slice_from_time
|
||||
from ..data.tsp import slice_from_time
|
||||
from ..log import get_logger
|
||||
from ..toolz import Profiler
|
||||
|
||||
|
|
Loading…
Reference in New Issue