Compare commits

17 Commits: 1f9a497637 ... a681b2f0bb

| Author | SHA1 | Date |
|---|---|---|
| Tyler Goodlet | a681b2f0bb | |
| Tyler Goodlet | 5b0c94933b | |
| Tyler Goodlet | 61e52213b2 | |
| Tyler Goodlet | b064a5f94d | |
| Tyler Goodlet | e7fa841263 | |
| Tyler Goodlet | 1f346483a0 | |
| Tyler Goodlet | d006ecce7e | |
| Tyler Goodlet | 69368f20c2 | |
| Tyler Goodlet | 31fa0b02f5 | |
| Tyler Goodlet | 5a60974990 | |
| Tyler Goodlet | 8d324acf91 | |
| Tyler Goodlet | ab84303da7 | |
| Tyler Goodlet | 659649ec48 | |
| Tyler Goodlet | f7cc43ee0b | |
| Tyler Goodlet | f5dc21d3f4 | |
| Tyler Goodlet | 4568c55f17 | |
| Tyler Goodlet | d5d68f75ea | |
@@ -843,6 +843,7 @@ class Client:
         self,
         contract: Contract,
         timeout: float = 1,
+        tries: int = 100,
         raise_on_timeout: bool = False,

     ) -> Ticker | None:

@@ -857,30 +858,30 @@ class Client:
         ready: ticker.TickerUpdateEvent = ticker.updateEvent

         # ensure a last price gets filled in before we deliver quote
+        timeouterr: Exception | None = None
         warnset: bool = False
-        for _ in range(100):
-            if isnan(ticker.last):
+        for _ in range(tries):

-                # wait for a first update(Event)
+            # wait for a first update(Event) indicatingn a
+            # live quote feed.
+            if isnan(ticker.last):
                 try:
                     tkr = await asyncio.wait_for(
                         ready,
                         timeout=timeout,
                     )
-                except TimeoutError:
-                    import pdbp
-                    pdbp.set_trace()
+                    if tkr:
+                        break
+                except TimeoutError as err:
+                    timeouterr = err
+                    await asyncio.sleep(0.01)
+                    continue

-                    if raise_on_timeout:
-                        raise
-                    return None
-
-                if tkr:
-                    break
             else:
                 if not warnset:
                     log.warning(
-                        f'Quote for {contract} timed out: market is closed?'
+                        f'Quote req timed out..maybe venue is closed?\n'
+                        f'{asdict(contract)}'
                     )
                     warnset = True

@@ -888,6 +889,11 @@ class Client:
             log.info(f'Got first quote for {contract}')
             break
         else:
+            if timeouterr and raise_on_timeout:
+                import pdbp
+                pdbp.set_trace()
+                raise timeouterr
+
             if not warnset:
                 log.warning(
                     f'Contract {contract} is not returning a quote '

@@ -895,6 +901,8 @@ class Client:
                 )
                 warnset = True

+            return None
+
         return ticker

     # async to be consistent for the client proxy, and cuz why not.
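The reworked `get_quote()` above boils down to a bounded retry loop around an awaitable update event, stashing the last timeout error instead of raising inline. A standalone sketch of the same pattern (hypothetical names, not piker's actual API):

```python
import asyncio
from math import isnan

async def wait_first_quote(
    ticker,  # assumed: exposes `.last` and an awaitable `.updateEvent`
    timeout: float = 1,
    tries: int = 100,
    raise_on_timeout: bool = False,
):
    timeouterr: Exception | None = None
    for _ in range(tries):
        if isnan(ticker.last):
            try:
                # wait (up to `timeout`) for the next quote event
                await asyncio.wait_for(ticker.updateEvent, timeout=timeout)
            except TimeoutError as err:
                timeouterr = err
                await asyncio.sleep(0.01)
                continue
        else:
            return ticker  # a real last price arrived

    if timeouterr and raise_on_timeout:
        raise timeouterr
    return None
```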
@@ -943,6 +943,11 @@ async def stream_quotes(
         contract=con,
         raise_on_timeout=True,
     )
+    first_quote: dict = normalize(first_ticker)
+    log.info(
+        'Rxed init quote:\n'
+        f'{pformat(first_quote)}'
+    )
     cs: trio.CancelScope | None = None
     startup: bool = True
     while (

@@ -134,14 +134,19 @@ def get_app_dir(

 _click_config_dir: Path = Path(get_app_dir('piker'))
 _config_dir: Path = _click_config_dir
-_parent_user: str = os.environ.get('SUDO_USER')

-if _parent_user:
+# NOTE: when using `sudo` we attempt to determine the non-root user
+# and still use their normal config dir.
+if (
+    (_parent_user := os.environ.get('SUDO_USER'))
+    and
+    _parent_user != 'root'
+):
     non_root_user_dir = Path(
         os.path.expanduser(f'~{_parent_user}')
     )
     root: str = 'root'
-    _ccds: str = str(_click_config_dir)  # click config dir string
+    _ccds: str = str(_click_config_dir)  # click config dir as string
     i_tail: int = int(_ccds.rfind(root) + len(root))
     _config_dir = (
         non_root_user_dir
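The walrus-guarded `SUDO_USER` check above keeps the invoking user's config dir when the CLI runs under `sudo`. A reduced sketch of the idea (the `.config/piker` path is illustrative):

```python
import os
from pathlib import Path

# default: whatever dir was resolved for the current (possibly root) user
config_dir: Path = Path.home() / '.config' / 'piker'

if (
    (parent_user := os.environ.get('SUDO_USER'))
    and parent_user != 'root'
):
    # remap to the non-root invoking user's home instead
    config_dir = Path(
        os.path.expanduser(f'~{parent_user}')
    ) / '.config' / 'piker'
```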
@@ -45,10 +45,7 @@ import trio
 from trio.abc import ReceiveChannel
 from trio_typing import TaskStatus
 import tractor
-from tractor.trionics import (
-    maybe_open_context,
-    gather_contexts,
-)
+from tractor import trionics

 from piker.accounting import (
     MktPair,

@@ -69,7 +66,7 @@ from .validate import (
     FeedInit,
     validate_backend,
 )
-from .history import (
+from ..tsp import (
     manage_history,
 )
 from .ingest import get_ingestormod

@@ -124,6 +121,8 @@ class _FeedsBus(Struct):
             trio.CancelScope] = trio.TASK_STATUS_IGNORED,
     ) -> None:
         with trio.CancelScope() as cs:
+            # TODO: shouldn't this be a direct await to avoid
+            # cancellation contagion to the bus nursery!?!?!
             await self.nursery.start(
                 target,
                 *args,

@@ -330,7 +329,6 @@ async def allocate_persistent_feed(
     ) = await bus.nursery.start(
         manage_history,
         mod,
-        bus,
         mkt,
         some_data_ready,
         feed_is_live,

@@ -407,7 +405,13 @@ async def allocate_persistent_feed(
         rt_shm.array['time'][1] = ts + 1

     elif hist_shm.array.size == 0:
-        raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
+        for i in range(100):
+            await trio.sleep(0.1)
+            if hist_shm.array.size > 0:
+                break
+        else:
+            await tractor.pause()
+            raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')

     # wait the spawning parent task to register its subscriber
     # send-stream entry before we start the sample loop.
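The `for ... else` poll that replaces the hard `RuntimeError` only raises after every retry fails (the `else` arm runs iff the loop never `break`s). The same shape in isolation, assuming a trio context and any object with a growable `.array`:

```python
import trio

async def wait_nonempty(shm, tries: int = 100) -> None:
    for _ in range(tries):
        await trio.sleep(0.1)
        if shm.array.size > 0:
            break
    else:
        # reached only when all `tries` polls saw an empty buffer
        raise RuntimeError('shm buffer still empty after polling!?')
```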
@@ -453,8 +457,12 @@ async def open_feed_bus(
     if loglevel is None:
         loglevel = tractor.current_actor().loglevel

-    # XXX: required to propagate ``tractor`` loglevel to piker logging
-    get_console_log(loglevel or tractor.current_actor().loglevel)
+    # XXX: required to propagate ``tractor`` loglevel to piker
+    # logging
+    get_console_log(
+        loglevel
+        or tractor.current_actor().loglevel
+    )

     # local state sanity checks
     # TODO: check for any stale shm entries for this symbol

@@ -464,7 +472,7 @@ async def open_feed_bus(
     assert 'brokerd' in servicename
     assert brokername in servicename

-    bus = get_feed_bus(brokername)
+    bus: _FeedsBus = get_feed_bus(brokername)
     sub_registered = trio.Event()

     flumes: dict[str, Flume] = {}

@@ -768,7 +776,7 @@ async def maybe_open_feed(
     '''
     fqme = fqmes[0]

-    async with maybe_open_context(
+    async with trionics.maybe_open_context(
         acm_func=open_feed,
         kwargs={
             'fqmes': fqmes,

@@ -788,7 +796,7 @@ async def maybe_open_feed(
     # add a new broadcast subscription for the quote stream
     # if this feed is likely already in use

-    async with gather_contexts(
+    async with trionics.gather_contexts(
         mngrs=[stream.subscribe() for stream in feed.streams.values()]
     ) as bstreams:
         for bstream, flume in zip(bstreams, feed.flumes.values()):

@@ -848,7 +856,7 @@ async def open_feed(
     )

     portals: tuple[tractor.Portal]
-    async with gather_contexts(
+    async with trionics.gather_contexts(
         brokerd_ctxs,
     ) as portals:

@@ -900,7 +908,7 @@ async def open_feed(
     assert len(feed.mods) == len(feed.portals)

     async with (
-        gather_contexts(bus_ctxs) as ctxs,
+        trionics.gather_contexts(bus_ctxs) as ctxs,
     ):
         stream_ctxs: list[tractor.MsgStream] = []
         for (

@@ -942,7 +950,7 @@ async def open_feed(
         brokermod: ModuleType
         fqmes: list[str]
         async with (
-            gather_contexts(stream_ctxs) as streams,
+            trionics.gather_contexts(stream_ctxs) as streams,
         ):
             for (
                 stream,

@@ -958,6 +966,12 @@ async def open_feed(
                     if brokermod.name == flume.mkt.broker:
                         flume.stream = stream

-            assert len(feed.mods) == len(feed.portals) == len(feed.streams)
+            assert (
+                len(feed.mods)
+                ==
+                len(feed.portals)
+                ==
+                len(feed.streams)
+            )

             yield feed
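All the former direct imports of `maybe_open_context()`/`gather_contexts()` now go through the `trionics` module namespace; call sites are otherwise unchanged. A usage sketch, assuming the `tractor.trionics` API shape this diff implies:

```python
from contextlib import asynccontextmanager as acm

import trio
from tractor import trionics

@acm
async def one(label: str):
    yield f'entered {label}'

async def main():
    # enter several async context managers concurrently,
    # receiving their yielded values as a sequence
    async with trionics.gather_contexts(
        [one('a'), one('b'), one('c')]
    ) as results:
        print(list(results))

trio.run(main)
```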
@@ -100,6 +100,10 @@ async def open_piker_runtime(
         or [_default_reg_addr]
     )

+    if ems := tractor_kwargs.get('enable_modules'):
+        # import pdbp; pdbp.set_trace()
+        enable_modules.extend(ems)
+
     async with (
         tractor.open_root_actor(
@@ -1,5 +1,5 @@
 # piker: trading gear for hackers
-# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
+# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)

 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Affero General Public License as published by

@@ -19,6 +19,7 @@ Storage middle-ware CLIs.

 """
 from __future__ import annotations
+# from datetime import datetime
 from pathlib import Path
 import time

@@ -36,11 +37,8 @@ from piker.cli import cli
 from piker.config import get_conf_dir
 from piker.data import (
     ShmArray,
-    tsp,
-)
-from piker.data.history import (
-    iter_dfs_from_shms,
 )
+from piker import tsp
 from . import (
     log,
 )
@@ -189,8 +187,8 @@ def anal(
             frame=history,
             period=period,
         )
-        if null_segs:
-            await tractor.pause()
+        # TODO: do tsp queries to backcend to fill i missing
+        # history and then prolly write it to tsdb!

         shm_df: pl.DataFrame = await client.as_df(
             fqme,

@@ -206,18 +204,27 @@ def anal(
             diff,
         ) = tsp.dedupe(shm_df)

-        if diff:
+        write_edits: bool = True
+        if (
+            write_edits
+            and (
+                diff
+                or null_segs
+            )
+        ):
+            await tractor.pause()
             await client.write_ohlcv(
                 fqme,
                 ohlcv=deduped,
                 timeframe=period,
             )

-        # TODO: something better with tab completion..
-        # is there something more minimal but nearly as
-        # functional as ipython?
-        await tractor.pause()
+        else:
+            # TODO: something better with tab completion..
+            # is there something more minimal but nearly as
+            # functional as ipython?
+            await tractor.pause()

     trio.run(main)
@@ -243,11 +250,11 @@ def ldshm(
         ),
     ):
         df: pl.DataFrame | None = None
-        for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
+        for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):

             # compute ohlc properties for naming
             times: np.ndarray = shm.array['time']
-            period_s: float = times[-1] - times[-2]
+            period_s: float = float(times[-1] - times[-2])
             if period_s < 1.:
                 raise ValueError(
                     f'Something is wrong with time period for {shm}:\n{times}'

@@ -270,11 +277,86 @@ def ldshm(

             # TODO: maybe only optionally enter this depending
             # on some CLI flags and/or gap detection?
-            if not gaps.is_empty():
-                await tractor.pause()
-
-            if null_segs:
-                await tractor.pause()
+            if (
+                not gaps.is_empty()
+                or null_segs
+            ):
+                from piker.ui._remote_ctl import (
+                    open_annot_ctl,
+                    AnnotCtl,
+                )
+                annot_ctl: AnnotCtl
+                async with open_annot_ctl() as annot_ctl:
+                    for i in range(gaps.height):
+
+                        row: pl.DataFrame = gaps[i]
+
+                        # TODO: can we eventually remove this
+                        # once we figure out why the epoch cols
+                        # don't match?
+                        iend: int = row['index'][0]
+                        # dt: datetime = row['dt'][0]
+                        # dt_prev: datetime = row['dt_prev'][0]
+
+                        # the gap's right-most bar's OPEN value
+                        # at that time (sample) step.
+                        # dt_end_t: float = dt.timestamp()
+
+                        # TODO: FIX HOW/WHY these aren't matching
+                        # and are instead off by 4hours (EST
+                        # vs. UTC?!?!)
+                        # end_t: float = row['time']
+                        # assert (
+                        #     dt.timestamp()
+                        #     ==
+                        #     end_t
+                        # )
+
+                        # the gap's left-most bar's CLOSE value
+                        # at that time (sample) step.
+
+                        prev_r: pl.DataFrame = df.filter(
+                            pl.col('index') == gaps[0]['index'] - 1
+                        )
+                        istart: int = prev_r['index'][0]
+                        # dt_start_t: float = dt_prev.timestamp()
+
+                        # start_t: float = prev_r['time']
+                        # assert (
+                        #     dt_start_t
+                        #     ==
+                        #     start_t
+                        # )
+
+                        # TODO: implement px-col width measure
+                        # and ensure at least as many px-cols
+                        # shown per rect as configured by user.
+                        gap_w: float = abs((iend - istart))
+                        # await tractor.pause()
+                        if gap_w < 6:
+                            margin: float = 6
+                            iend += margin
+                            istart -= margin
+
+                        ro: tuple[float, float] = (
+                            # dt_end_t,
+                            iend,
+                            row['open'][0],
+                        )
+                        lc: tuple[float, float] = (
+                            # dt_start_t,
+                            istart,
+                            prev_r['close'][0],
+                        )
+
+                        aid: int = await annot_ctl.add_rect(
+                            fqme=fqme,
+                            timeframe=period_s,
+                            start_pos=lc,
+                            end_pos=ro,
+                        )
+                        assert aid
+                        await tractor.pause()

             # write to parquet file?
             if write_parquet:
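Each annotation rect needs two anchor points: the close of the bar just before the gap and the open of the gap's right-most bar, both looked up by absolute `index`. The lookup, reduced to a hypothetical polars helper adapted from the loop above:

```python
import polars as pl

def gap_endpoints(
    df: pl.DataFrame,   # OHLC frame with an absolute 'index' column
    row: pl.DataFrame,  # a single gap row
) -> tuple[tuple[int, float], tuple[int, float]]:
    iend: int = row['index'][0]
    prev_r: pl.DataFrame = df.filter(
        pl.col('index') == iend - 1
    )
    istart: int = prev_r['index'][0]
    # (x, y) anchors: left edge at the prior bar's close,
    # right edge at the gap bar's open.
    return (
        (istart, prev_r['close'][0]),
        (iend, row['open'][0]),
    )
```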
@@ -56,8 +56,6 @@ from datetime import datetime
 from pathlib import Path
 import time

-# from bidict import bidict
-# import tractor
 import numpy as np
 import polars as pl
 from pendulum import (

@@ -65,10 +63,10 @@ from pendulum import (
 )

 from piker import config
+from piker import tsp
 from piker.data import (
     def_iohlcv_fields,
     ShmArray,
-    tsp,
 )
 from piker.log import get_logger
 from . import TimeseriesNotFound
@@ -32,6 +32,7 @@ from __future__ import annotations
 from datetime import datetime
 from functools import partial
 from pathlib import Path
+from pprint import pformat
 from types import ModuleType
 from typing import (
     Callable,

@@ -53,25 +54,64 @@ import polars as pl
 from ..accounting import (
     MktPair,
 )
-from ._util import (
+from ..data._util import (
     log,
 )
-from ._sharedmem import (
+from ..data._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
 )
-from ._source import def_iohlcv_fields
-from ._sampling import (
+from ..data._source import def_iohlcv_fields
+from ..data._sampling import (
     open_sample_stream,
 )
-from .tsp import (
-    dedupe,
+from ._anal import (
     get_null_segs,
     iter_null_segs,
-    sort_diff,
     Frame,
-    # Seq,
+    Seq,
+
+    # codec-ish
+    np2pl,
+    pl2np,
+
+    # `numpy` only
+    slice_from_time,
+
+    # `polars` specific
+    dedupe,
+    with_dts,
+    detect_time_gaps,
+    sort_diff,
+
+    # TODO:
+    detect_price_gaps
 )

+__all__: list[str] = [
+    'dedupe',
+    'get_null_segs',
+    'iter_null_segs',
+    'sort_diff',
+    'slice_from_time',
+    'Frame',
+    'Seq',
+
+    'np2pl',
+    'pl2np',
+
+    'slice_from_time',
+
+    'with_dts',
+    'detect_time_gaps',
+    'sort_diff',
+
+    # TODO:
+    'detect_price_gaps'
+]
+
+# TODO: break up all this shite into submods!
 from ..brokers._util import (
     DataUnavailable,
 )
@@ -229,16 +269,20 @@ async def maybe_fill_null_segments(
     # - remember that in the display side, only refersh this
     # if the respective history is actually "in view".
     # loop
-    await sampler_stream.send({
-        'broadcast_all': {
+    try:
+        await sampler_stream.send({
+            'broadcast_all': {

-            # XXX NOTE XXX: see the
-            # `.ui._display.increment_history_view()` if block
-            # that looks for this info to FORCE a hard viz
-            # redraw!
-            'backfilling': (mkt.fqme, timeframe),
-        },
-    })
+                # XXX NOTE XXX: see the
+                # `.ui._display.increment_history_view()` if block
+                # that looks for this info to FORCE a hard viz
+                # redraw!
+                'backfilling': (mkt.fqme, timeframe),
+            },
+        })
+    except tractor.ContextCancelled:
+        # log.exception
+        await tractor.pause()

     null_segs_detected.set()
     # RECHECK for more null-gaps
@@ -252,39 +296,68 @@ async def maybe_fill_null_segments(
         and
         len(null_segs[-1])
     ):
-        await tractor.pause()
+        (
+            iabs_slices,
+            iabs_zero_rows,
+            zero_t,
+        ) = null_segs
+        log.warning(
+            f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
+            f'{pformat(iabs_slices)}'
+        )

-        array = shm.array
-        zeros = array[array['low'] == 0]
-
-        # always backfill gaps with the earliest (price) datum's
-        # value to avoid the y-ranger including zeros and completely
-        # stretching the y-axis..
-        if 0 < zeros.size:
-            zeros[[
-                'open',
-                'high',
-                'low',
-                'close',
-            ]] = shm._array[zeros['index'][0] - 1]['close']
-
-        # TODO: interatively step through any remaining
-        # time-gaps/null-segments and spawn piecewise backfiller
-        # tasks in a nursery?
-        # -[ ] not sure that's going to work so well on the ib
-        # backend but worth a shot?
-        # -[ ] mk new history connections to make it properly
-        # parallel possible no matter the backend?
-        # -[ ] fill algo: do queries in alternating "latest, then
-        # earliest, then latest.. etc?"
-        # if (
-        #     next_end_dt not in frame[
-        # ):
-        #     pass
+        # TODO: always backfill gaps with the earliest (price) datum's
+        # value to avoid the y-ranger including zeros and completely
+        # stretching the y-axis..
+        # array: np.ndarray = shm.array
+        # zeros = array[array['low'] == 0]
+        ohlc_fields: list[str] = [
+            'open',
+            'high',
+            'low',
+            'close',
+        ]
+
+        for istart, istop in iabs_slices:
+
+            # get view into buffer for null-segment
+            gap: np.ndarray = shm._array[istart:istop]
+
+            # copy the oldest OHLC samples forward
+            gap[ohlc_fields] = shm._array[istart]['close']
+
+            start_t: float = shm._array[istart]['time']
+            t_diff: float = (istop - istart)*timeframe
+            gap['time'] = np.arange(
+                start=start_t,
+                stop=start_t + t_diff,
+                step=timeframe,
+            )
+
+            await sampler_stream.send({
+                'broadcast_all': {
+
+                    # XXX NOTE XXX: see the
+                    # `.ui._display.increment_history_view()` if block
+                    # that looks for this info to FORCE a hard viz
+                    # redraw!
+                    'backfilling': (mkt.fqme, timeframe),
+                },
+            })
+
+        # TODO: interatively step through any remaining
+        # time-gaps/null-segments and spawn piecewise backfiller
+        # tasks in a nursery?
+        # -[ ] not sure that's going to work so well on the ib
+        # backend but worth a shot?
+        # -[ ] mk new history connections to make it properly
+        # parallel possible no matter the backend?
+        # -[ ] fill algo: do queries in alternating "latest, then
+        # earliest, then latest.. etc?"
+        # await tractor.pause()


 async def start_backfill(
-    tn: trio.Nursery,
     get_hist,
     mod: ModuleType,
     mkt: MktPair,
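The per-segment fill writes the last known close across all OHLC fields and synthesizes an evenly stepped epoch range for the `time` column. The core array surgery, sketched with plain numpy against any structured OHLC array:

```python
import numpy as np

def fill_null_segment(
    arr: np.ndarray,   # structured array with OHLC + 'time' fields
    istart: int,
    istop: int,
    timeframe: float,  # sample period in seconds
) -> None:
    gap = arr[istart:istop]  # a view: mutates `arr` in place

    # forward-fill the oldest close across all price fields so
    # the y-ranger never sees zeros
    for field in ('open', 'high', 'low', 'close'):
        gap[field] = arr[istart]['close']

    # synthesize evenly spaced epoch stamps over the segment
    start_t: float = arr[istart]['time']
    gap['time'] = np.arange(
        start=start_t,
        stop=start_t + (istop - istart) * timeframe,
        step=timeframe,
    )
```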
@@ -338,7 +411,6 @@ async def start_backfill(
     # settings above when the tsdb is detected as being empty.
     backfill_until_dt = backfill_from_dt.subtract(**period_duration)

-
     # STAGE NOTE: "backward history gap filling":
     # - we push to the shm buffer until we have history back
     # until the latest entry loaded from the tsdb's table B)

@@ -682,6 +754,8 @@ async def back_load_from_tsdb(


 async def push_latest_frame(
+    # box-type only that should get packed with the datetime
+    # objects received for the latest history frame
     dt_eps: list[DateTime, DateTime],
     shm: ShmArray,
     get_hist: Callable[
@@ -691,8 +765,11 @@ async def push_latest_frame(
     timeframe: float,
     config: dict,

-    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
-):
+    task_status: TaskStatus[
+        Exception | list[datetime, datetime]
+    ] = trio.TASK_STATUS_IGNORED,
+
+) -> list[datetime, datetime] | None:
     # get latest query's worth of history all the way
     # back to what is recorded in the tsdb
     try:

@@ -709,17 +786,19 @@ async def push_latest_frame(
             mr_start_dt,
             mr_end_dt,
         ])
+        task_status.started(dt_eps)

     # XXX: timeframe not supported for backend (since
     # above exception type), terminate immediately since
     # there's no backfilling possible.
     except DataUnavailable:
-        task_status.started()
+        task_status.started(None)

         if timeframe > 1:
            await tractor.pause()

-        return
+        # prolly tf not supported
+        return None

     # NOTE: on the first history, most recent history
     # frame we PREPEND from the current shm ._last index

@@ -731,11 +810,16 @@ async def push_latest_frame(
         prepend=True,  # append on first frame
     )

+    return dt_eps
+

 async def load_tsdb_hist(
     storage: StorageClient,
     mkt: MktPair,
     timeframe: float,

+    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
+
 ) -> tuple[
     np.ndarray,
     DateTime,
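`push_latest_frame()` now reports its result through the standard trio task-status handshake, so the `nursery.start()` caller receives either the frame's datetime endpoints or `None` straight from the `await`. The general pattern:

```python
import trio

async def producer(
    task_status=trio.TASK_STATUS_IGNORED,
):
    await trio.sleep(0.1)  # stand-in for the history query
    # hand a value back to the `nursery.start()` caller
    task_status.started(('2024-01-01', '2024-01-02'))
    await trio.sleep_forever()  # task keeps running in the nursery

async def main():
    async with trio.open_nursery() as tn:
        # `.start()` returns whatever the task passed to `.started()`
        dt_eps = await tn.start(producer)
        print('got endpoints:', dt_eps)
        tn.cancel_scope.cancel()

trio.run(main)
```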
@@ -821,48 +905,63 @@ async def tsdb_backfill(
         config,
     )

-    # tell parent task to continue
-    # TODO: really we'd want this the other way with the
-    # tsdb load happening asap and the since the latest
-    # frame query will normally be the main source of
-    # latency?
-    task_status.started()
-
     tsdb_entry: tuple = await load_tsdb_hist(
         storage,
         mkt,
         timeframe,
     )

+    # tell parent task to continue
+    # TODO: really we'd want this the other way with the
+    # tsdb load happening asap and the since the latest
+    # frame query will normally be the main source of
+    # latency?
+    task_status.started()
+
     # NOTE: iabs to start backfilling from, reverse chronological,
     # ONLY AFTER the first history frame has been pushed to
     # mem!
     backfill_gap_from_shm_index: int = shm._first.value + 1

-    (
-        mr_start_dt,
-        mr_end_dt,
-    ) = dt_eps
+    # Prepend any tsdb history into the rt-shm-buffer which
+    # should NOW be getting filled with the most recent history
+    # pulled from the data-backend.
+    if dt_eps:
+        # well then, unpack the latest (gap) backfilled frame dts
+        (
+            mr_start_dt,
+            mr_end_dt,
+        ) = dt_eps

-    async with trio.open_nursery() as tn:
+        # NOTE: when there's no offline data, there's 2 cases:
+        # - data backend doesn't support timeframe/sample
+        # period (in which case `dt_eps` should be `None` and
+        # we shouldn't be here!), or
+        # - no prior history has been stored (yet) and we need
+        # todo full backfill of the history now.
+        if tsdb_entry is None:
+            # indicate to backfill task to fill the whole
+            # shm buffer as much as it can!
+            last_tsdb_dt = None

-        # Prepend any tsdb history to the shm buffer which should
-        # now be full of the most recent history pulled from the
-        # backend's last frame.
-        if tsdb_entry:
+        # there's existing tsdb history from (offline) storage
+        # so only backfill the gap between the
+        # most-recent-frame (mrf) and that latest sample.
+        else:
             (
                 tsdb_history,
                 first_tsdb_dt,
                 last_tsdb_dt,
             ) = tsdb_entry

         # if there is a gap to backfill from the first
         # history frame until the last datum loaded from the tsdb
         # continue that now in the background
+        async with trio.open_nursery() as tn:
+
             bf_done = await tn.start(
                 partial(
                     start_backfill,
-                    tn=tn,
                     get_hist=get_hist,
                     mod=mod,
                     mkt=mkt,

@@ -871,102 +970,107 @@ async def tsdb_backfill(

                     backfill_from_shm_index=backfill_gap_from_shm_index,
                     backfill_from_dt=mr_start_dt,

                     sampler_stream=sampler_stream,
                     backfill_until_dt=last_tsdb_dt,

                     storage=storage,
                     write_tsdb=True,
                 )
             )

-        # calc the index from which the tsdb data should be
-        # prepended, presuming there is a gap between the
-        # latest frame (loaded/read above) and the latest
-        # sample loaded from the tsdb.
-        backfill_diff: Duration = mr_start_dt - last_tsdb_dt
-        offset_s: float = backfill_diff.in_seconds()
-
-        # XXX EDGE CASEs: the most recent frame overlaps with
-        # prior tsdb history!!
-        # - so the latest frame's start time is earlier then
-        # the tsdb's latest sample.
-        # - alternatively this may also more generally occur
-        # when the venue was closed (say over the weeknd)
-        # causing a timeseries gap, AND the query frames size
-        # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
-        # GREATER THAN the current venue-market's operating
-        # session (time) we will receive datums from BEFORE THE
-        # CLOSURE GAP and thus the `offset_s` value will be
-        # NEGATIVE! In this case we need to ensure we don't try
-        # to push datums that have already been recorded in the
-        # tsdb. In this case we instead only retreive and push
-        # the series portion missing from the db's data set.
-        # if offset_s < 0:
-        # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
-        # non_overlap_offset_s: float = backfill_diff.in_seconds()
-
-        offset_samples: int = round(offset_s / timeframe)
-
-        # TODO: see if there's faster multi-field reads:
-        # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
-        # re-index with a `time` and index field
-        if offset_s > 0:
-            # NOTE XXX: ONLY when there is an actual gap
-            # between the earliest sample in the latest history
-            # frame do we want to NOT stick the latest tsdb
-            # history adjacent to that latest frame!
-            prepend_start = shm._first.value - offset_samples + 1
-            to_push = tsdb_history[-prepend_start:]
-        else:
-            # when there is overlap we want to remove the
-            # overlapping samples from the tsdb portion (taking
-            # instead the latest frame's values since THEY
-            # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
-            # to the latest frame!
-            # TODO: assert the overlap segment array contains
-            # the same values!?!
-            prepend_start = shm._first.value
-            to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
-
-        # tsdb history is so far in the past we can't fit it in
-        # shm buffer space so simply don't load it!
-        if prepend_start > 0:
-            shm.push(
-                to_push,
-
-                # insert the history pre a "days worth" of samples
-                # to leave some real-time buffer space at the end.
-                prepend=True,
-                # update_first=False,
-                start=prepend_start,
-                field_map=storemod.ohlc_key_map,
-            )
-
-            log.info(f'Loaded {to_push.shape} datums from storage')
-
-        # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
-        # seemingly missing (null-time) segments..
-        # TODO: ideally these can never exist!
-        # -[ ] somehow it seems sometimes we're writing zero-ed
-        # segments to tsdbs during teardown?
-        # -[ ] can we ensure that the backcfiller tasks do this
-        # work PREVENTAVELY instead?
-        # -[ ] fill in non-zero epoch time values ALWAYS!
-        # await maybe_fill_null_segments(
-        nulls_detected: trio.Event = await tn.start(partial(
-            maybe_fill_null_segments,
-
-            shm=shm,
-            timeframe=timeframe,
-            get_hist=get_hist,
-            sampler_stream=sampler_stream,
-            mkt=mkt,
-        ))
-
-        # TODO: who would want to?
-        await nulls_detected.wait()
-
-        await bf_done.wait()
+            nulls_detected: trio.Event | None = None
+            if last_tsdb_dt is not None:
+                # calc the index from which the tsdb data should be
+                # prepended, presuming there is a gap between the
+                # latest frame (loaded/read above) and the latest
+                # sample loaded from the tsdb.
+                backfill_diff: Duration = mr_start_dt - last_tsdb_dt
+                offset_s: float = backfill_diff.in_seconds()
+
+                # XXX EDGE CASEs: the most recent frame overlaps with
+                # prior tsdb history!!
+                # - so the latest frame's start time is earlier then
+                # the tsdb's latest sample.
+                # - alternatively this may also more generally occur
+                # when the venue was closed (say over the weeknd)
+                # causing a timeseries gap, AND the query frames size
+                # (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
+                # GREATER THAN the current venue-market's operating
+                # session (time) we will receive datums from BEFORE THE
+                # CLOSURE GAP and thus the `offset_s` value will be
+                # NEGATIVE! In this case we need to ensure we don't try
+                # to push datums that have already been recorded in the
+                # tsdb. In this case we instead only retreive and push
+                # the series portion missing from the db's data set.
+                # if offset_s < 0:
+                # non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
+                # non_overlap_offset_s: float = backfill_diff.in_seconds()
+
+                offset_samples: int = round(offset_s / timeframe)
+
+                # TODO: see if there's faster multi-field reads:
+                # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
+                # re-index with a `time` and index field
+                if offset_s > 0:
+                    # NOTE XXX: ONLY when there is an actual gap
+                    # between the earliest sample in the latest history
+                    # frame do we want to NOT stick the latest tsdb
+                    # history adjacent to that latest frame!
+                    prepend_start = shm._first.value - offset_samples + 1
+                    to_push = tsdb_history[-prepend_start:]
+                else:
+                    # when there is overlap we want to remove the
+                    # overlapping samples from the tsdb portion (taking
+                    # instead the latest frame's values since THEY
+                    # SHOULD BE THE SAME) and prepend DIRECTLY adjacent
+                    # to the latest frame!
+                    # TODO: assert the overlap segment array contains
+                    # the same values!?!
+                    prepend_start = shm._first.value
+                    to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
+
+                # tsdb history is so far in the past we can't fit it in
+                # shm buffer space so simply don't load it!
+                if prepend_start > 0:
+                    shm.push(
+                        to_push,
+
+                        # insert the history pre a "days worth" of samples
+                        # to leave some real-time buffer space at the end.
+                        prepend=True,
+                        # update_first=False,
+                        start=prepend_start,
+                        field_map=storemod.ohlc_key_map,
+                    )
+
+                    log.info(f'Loaded {to_push.shape} datums from storage')
+
+            # NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
+            # seemingly missing (null-time) segments..
+            # TODO: ideally these can never exist!
+            # -[ ] somehow it seems sometimes we're writing zero-ed
+            # segments to tsdbs during teardown?
+            # -[ ] can we ensure that the backcfiller tasks do this
+            # work PREVENTAVELY instead?
+            # -[ ] fill in non-zero epoch time values ALWAYS!
+            # await maybe_fill_null_segments(
+            nulls_detected: trio.Event = await tn.start(partial(
+                maybe_fill_null_segments,
+
+                shm=shm,
+                timeframe=timeframe,
+                get_hist=get_hist,
+                sampler_stream=sampler_stream,
+                mkt=mkt,
+            ))
+
+        # 2nd nursery END
+
+        # TODO: who would want to?
+        if nulls_detected:
+            await nulls_detected.wait()
+
+        await bf_done.wait()

     # TODO: maybe start history anal and load missing "history
     # gaps" via backend..
@@ -1010,7 +1114,6 @@ async def tsdb_backfill(

 async def manage_history(
     mod: ModuleType,
-    bus: _FeedsBus,
     mkt: MktPair,
     some_data_ready: trio.Event,
     feed_is_live: trio.Event,

@@ -1167,7 +1270,6 @@ async def manage_history(
             tsdb_backfill,
             mod=mod,
             storemod=storemod,
-            # bus,
             storage=client,
             mkt=mkt,
             shm=tf2mem[timeframe],

@@ -1252,13 +1354,11 @@ def iter_dfs_from_shms(
         assert not opened
         ohlcv = shm.array

-        from ..data import tsp
-        df: pl.DataFrame = tsp.np2pl(ohlcv)
+        from ._anal import np2pl
+        df: pl.DataFrame = np2pl(ohlcv)

         yield (
             shmfile,
             shm,
             df,
         )
@@ -319,9 +319,8 @@ def get_null_segs(
     if num_gaps < 1:
         if absi_zeros.size > 1:
             absi_zsegs = [[
-                # see `get_hist()` in backend, should ALWAYS be
-                # able to handle a `start_dt=None`!
-                # None,
+                # TODO: maybe mk these max()/min() limits func
+                # consts instead of called more then once?
                 max(
                     absi_zeros[0] - 1,
                     0,

@@ -359,7 +358,10 @@ def get_null_segs(
         # corresponding to the first zero-segment's row, we add it
         # manually here.
         absi_zsegs.append([
-            absi_zeros[0] - 1,
+            max(
+                absi_zeros[0] - 1,
+                0,
+            ),
             None,
         ])

@@ -400,14 +402,18 @@ def get_null_segs(

     else:
         if 0 < num_gaps < 2:
-            absi_zsegs[-1][1] = absi_zeros[-1] + 1
+            absi_zsegs[-1][1] = min(
+                absi_zeros[-1] + 1,
+                frame['index'][-1],
+            )

     iabs_first: int = frame['index'][0]
     for start, end in absi_zsegs:

         ts_start: float = times[start - iabs_first]
         ts_end: float = times[end - iabs_first]
         if (
-            ts_start == 0
+            (ts_start == 0 and not start == 0)
             or
             ts_end == 0
         ):

@@ -451,11 +457,13 @@ def iter_null_segs(
     ],
     None,
 ]:
-    if null_segs is None:
-        null_segs: tuple = get_null_segs(
+    if not (
+        null_segs := get_null_segs(
             frame,
             period=timeframe,
         )
+    ):
+        return

     absi_pairs_zsegs: list[list[float, float]]
     izeros: Seq

@@ -502,6 +510,7 @@ def iter_null_segs(
     )


+# TODO: move to ._pl_anal
 def with_dts(
     df: pl.DataFrame,
     time_col: str = 'time',

@@ -517,7 +526,7 @@ def with_dts(
         pl.from_epoch(pl.col(time_col)).alias('dt'),
     ]).with_columns([
         pl.from_epoch(
-            pl.col(f'{time_col}_prev')
+            column=pl.col(f'{time_col}_prev'),
         ).alias('dt_prev'),
         pl.col('dt').diff().alias('dt_diff'),
     ])  # .with_columns(
@@ -525,19 +534,6 @@ def with_dts(
     # )


-def dedup_dt(
-    df: pl.DataFrame,
-) -> pl.DataFrame:
-    '''
-    Drop duplicate date-time rows (normally from an OHLC frame).
-
-    '''
-    return df.unique(
-        subset=['dt'],
-        maintain_order=True,
-    )
-
-
 t_unit: Literal = Literal[
     'days',
     'hours',

@@ -651,7 +647,11 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
     )

     # remove duplicated datetime samples/sections
-    deduped: pl.DataFrame = dedup_dt(df)
+    deduped: pl.DataFrame = df.unique(
+        subset=['dt'],
+        maintain_order=True,
+    )

     deduped_gaps = detect_time_gaps(deduped)

     diff: int = (
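With the `dedup_dt()` wrapper gone, dedupe is just the direct polars call; for reference:

```python
import polars as pl

df = pl.DataFrame({
    'dt': [1, 1, 2, 3, 3],
    'close': [10.0, 10.0, 11.0, 12.0, 12.0],
})

# drop rows with duplicate datetimes, preserving original row order
deduped: pl.DataFrame = df.unique(
    subset=['dt'],
    maintain_order=True,
)
assert deduped.height == 3
```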
@@ -272,10 +272,15 @@ class ContentsLabels:
         x_in: int,

     ) -> None:
-        for chart, name, label, update in self._labels:
+        for (
+            chart,
+            name,
+            label,
+            update,
+        )in self._labels:

             viz = chart.get_viz(name)
-            array = viz.shm.array
+            array: np.ndarray = viz.shm._array
             index = array[viz.index_field]
             start = index[0]
             stop = index[-1]

@@ -286,7 +291,7 @@ class ContentsLabels:
             ):
                 # out of range
                 print('WTF out of range?')
-                continue
+                # continue

             # call provided update func with data point
             try:

@@ -294,6 +299,7 @@ class ContentsLabels:
                 ix = np.searchsorted(index, x_in)
                 if ix > len(array):
                     breakpoint()
+
                 update(ix, array)

             except IndexError:
@@ -56,8 +56,8 @@ _line_styles: dict[str, int] = {

 class FlowGraphic(pg.GraphicsObject):
     '''
-    Base class with minimal interface for `QPainterPath` implemented,
-    real-time updated "data flow" graphics.
+    Base class with minimal interface for `QPainterPath`
+    implemented, real-time updated "data flow" graphics.

     See subtypes below.

@@ -167,11 +167,12 @@ class FlowGraphic(pg.GraphicsObject):
         return None

     # XXX: due to a variety of weird jitter bugs and "smearing"
-    # artifacts when click-drag panning and viewing history time series,
-    # we offer this ctx-mngr interface to allow temporarily disabling
-    # Qt's graphics caching mode; this is now currently used from
-    # ``ChartView.start/signal_ic()`` methods which also disable the
-    # rt-display loop when the user is moving around a view.
+    # artifacts when click-drag panning and viewing history time
+    # series, we offer this ctx-mngr interface to allow temporarily
+    # disabling Qt's graphics caching mode; this is now currently
+    # used from ``ChartView.start/signal_ic()`` methods which also
+    # disable the rt-display loop when the user is moving around
+    # a view.
     @cm
     def reset_cache(self) -> None:
         try:

@@ -49,7 +49,7 @@ from ..data._formatters import (
     OHLCBarsAsCurveFmtr,  # OHLC converted to line
     StepCurveFmtr,  # "step" curve (like for vlm)
 )
-from ..data.tsp import (
+from ..tsp import (
     slice_from_time,
 )
 from ._ohlc import (

@@ -563,7 +563,8 @@ class Viz(Struct):

     def view_range(self) -> tuple[int, int]:
         '''
-        Return the start and stop x-indexes for the managed ``ViewBox``.
+        Return the start and stop x-indexes for the managed
+        ``ViewBox``.

         '''
         vr = self.plot.viewRect()
@@ -470,54 +470,64 @@ async def graphics_update_loop(
     if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
         await tractor.pause()

-    # main real-time quotes update loop
-    stream: tractor.MsgStream
-    async with feed.open_multi_stream() as stream:
-        assert stream
-        async for quotes in stream:
-            quote_period = time.time() - last_quote_s
-            quote_rate = round(
-                1/quote_period, 1) if quote_period > 0 else float('inf')
-            if (
-                quote_period <= 1/_quote_throttle_rate
-
-                # in the absolute worst case we shouldn't see more then
-                # twice the expected throttle rate right!?
-                # and quote_rate >= _quote_throttle_rate * 2
-                and quote_rate >= display_rate
-            ):
-                pass
-                # log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
-
-            last_quote_s = time.time()
-
-            for fqme, quote in quotes.items():
-                ds = dss[fqme]
-                ds.quotes = quote
-                rt_pi, hist_pi = pis[fqme]
-
-                # chart isn't active/shown so skip render cycle and
-                # pause feed(s)
-                if (
-                    fast_chart.linked.isHidden()
-                    or not rt_pi.isVisible()
-                ):
-                    print(f'{fqme} skipping update for HIDDEN CHART')
-                    fast_chart.pause_all_feeds()
-                    continue
-
-                ic = fast_chart.view._in_interact
-                if ic:
-                    fast_chart.pause_all_feeds()
-                    print(f'{fqme} PAUSING DURING INTERACTION')
-                    await ic.wait()
-                    fast_chart.resume_all_feeds()
-
-                # sync call to update all graphics/UX components.
-                graphics_update_cycle(
-                    ds,
-                    quote,
-                )
+    try:
+        from . import _remote_ctl
+        _remote_ctl._dss = dss
+
+        # main real-time quotes update loop
+        stream: tractor.MsgStream
+        async with feed.open_multi_stream() as stream:
+            assert stream
+            async for quotes in stream:
+                quote_period = time.time() - last_quote_s
+                quote_rate = round(
+                    1/quote_period, 1) if quote_period > 0 else float('inf')
+
+                if (
+                    quote_period <= 1/_quote_throttle_rate
+
+                    # in the absolute worst case we shouldn't see more then
+                    # twice the expected throttle rate right!?
+                    # and quote_rate >= _quote_throttle_rate * 2
+                    and quote_rate >= display_rate
+                ):
+                    pass
+                    # log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
+
+                last_quote_s = time.time()
+
+                for fqme, quote in quotes.items():
+                    ds = dss[fqme]
+                    ds.quotes = quote
+                    rt_pi, hist_pi = pis[fqme]
+
+                    # chart isn't active/shown so skip render cycle and
+                    # pause feed(s)
+                    if (
+                        fast_chart.linked.isHidden()
+                        or not rt_pi.isVisible()
+                    ):
+                        print(f'{fqme} skipping update for HIDDEN CHART')
+                        fast_chart.pause_all_feeds()
+                        continue
+
+                    ic = fast_chart.view._in_interact
+                    if ic:
+                        fast_chart.pause_all_feeds()
+                        print(f'{fqme} PAUSING DURING INTERACTION')
+                        await ic.wait()
+                        fast_chart.resume_all_feeds()
+
+                    # sync call to update all graphics/UX components.
+                    graphics_update_cycle(
+                        ds,
+                        quote,
+                    )
+
+    finally:
+        # XXX: cancel any remote annotation control ctxs
+        _remote_ctl._dss = None
+        for ctx in _remote_ctl._ctxs:
+            await ctx.cancel()


 def graphics_update_cycle(
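The loop body itself is unchanged; the structural change is the `try/finally` plus publishing `dss` to `_remote_ctl`, so remote annotation contexts always get torn down however the stream loop exits. The shape, reduced (module and attribute names as in the diff, the stream plumbing stubbed):

```python
async def run_update_loop(dss, open_stream, remote_ctl):
    try:
        remote_ctl._dss = dss  # publish state for remote annot control
        async with open_stream() as stream:
            async for quotes in stream:
                # per-tick graphics work goes here
                pass
    finally:
        # always unpublish and cancel any remote annotation ctxs,
        # even on cancellation or error
        remote_ctl._dss = None
        for ctx in remote_ctl._ctxs:
            await ctx.cancel()
```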
@@ -1235,7 +1245,7 @@ async def display_symbol_data(
     fast from a cached watch-list.

     '''
-    sbar = godwidget.window.status_bar
+    # sbar = godwidget.window.status_bar
     # historical data fetch
     # brokermod = brokers.get_brokermod(provider)

@@ -1245,11 +1255,11 @@ async def display_symbol_data(
     #     group_key=loading_sym_key,
     # )

-    for fqme in fqmes:
-        loading_sym_key = sbar.open_status(
-            f'loading {fqme} ->',
-            group_key=True
-        )
+    # for fqme in fqmes:
+    #     loading_sym_key = sbar.open_status(
+    #         f'loading {fqme} ->',
+    #         group_key=True
+    #     )

     # (TODO: make this not so shit XD)
     # close group status once a symbol feed fully loads to view.

@@ -1422,7 +1432,7 @@ async def display_symbol_data(
             start_fsp_displays,
             rt_linked,
             flume,
-            loading_sym_key,
+            # loading_sym_key,
             loglevel,
         )
@@ -21,7 +21,8 @@ Higher level annotation editors.
 from __future__ import annotations
 from collections import defaultdict
 from typing import (
-    TYPE_CHECKING
+    Sequence,
+    TYPE_CHECKING,
 )

 import pyqtgraph as pg

@@ -31,24 +32,37 @@ from pyqtgraph import (
     QtCore,
     QtWidgets,
 )
+from PyQt5.QtCore import (
+    QPointF,
+    QRectF,
+)
 from PyQt5.QtGui import (
     QColor,
+    QTransform,
 )
 from PyQt5.QtWidgets import (
+    QGraphicsProxyWidget,
+    QGraphicsScene,
     QLabel,
 )

 from pyqtgraph import functions as fn
-from PyQt5.QtCore import QPointF
 import numpy as np

 from piker.types import Struct
-from ._style import hcolor, _font
+from ._style import (
+    hcolor,
+    _font,
+)
 from ._lines import LevelLine
 from ..log import get_logger

 if TYPE_CHECKING:
-    from ._chart import GodWidget
+    from ._chart import (
+        GodWidget,
+        ChartPlotWidget,
+    )
+    from ._interaction import ChartView


 log = get_logger(__name__)

@@ -65,7 +79,7 @@ class ArrowEditor(Struct):
         uid: str,
         x: float,
         y: float,
-        color='default',
+        color: str = 'default',
         pointing: str | None = None,

     ) -> pg.ArrowItem:
@@ -251,27 +265,56 @@ class LineEditor(Struct):
         return lines
 
 
-class SelectRect(QtWidgets.QGraphicsRectItem):
+def as_point(
+    pair: Sequence[float, float] | QPointF,
+) -> QPointF:
+    '''
+    Cast any input tuple of floats to a `QPointF`
+    for use in Qt geometry routines.
+
+    '''
+    if isinstance(pair, QPointF):
+        return pair
+
+    return QPointF(pair[0], pair[1])
+
+
+# TODO: maybe implement better, something something RectItemProxy??
+# -[ ] dig into details of how proxy's work?
+#      https://doc.qt.io/qt-5/qgraphicsscene.html#addWidget
+# -[ ] consider using `.addRect()` maybe?
+
+class SelectRect(QtWidgets.QGraphicsRectItem):
+    '''
+    A data-view "selection rectangle": the most fundamental
+    geometry for annotating data views.
+
+    - https://doc.qt.io/qt-5/qgraphicsrectitem.html
+    - https://doc.qt.io/qt-6/qgraphicsrectitem.html
+
+    '''
     def __init__(
         self,
         viewbox: ViewBox,
-        color: str = 'dad_blue',
+        color: str | None = None,
     ) -> None:
         super().__init__(0, 0, 1, 1)
 
         # self.rbScaleBox = QGraphicsRectItem(0, 0, 1, 1)
-        self.vb = viewbox
+        self.vb: ViewBox = viewbox
 
-        self._chart: 'ChartPlotWidget' = None  # noqa
+        self._chart: ChartPlotWidget | None = None  # noqa
 
-        # override selection box color
+        # TODO: maybe allow this to be dynamic via a method?
+        # override selection box color
+        color: str = color or 'dad_blue'
         color = QColor(hcolor(color))
 
         self.setPen(fn.mkPen(color, width=1))
         color.setAlpha(66)
         self.setBrush(fn.mkBrush(color))
         self.setZValue(1e9)
         self.hide()
-        self._label = None
 
         label = self._label = QLabel()
         label.setTextFormat(0)  # markdown
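For reference, the new `as_point` helper above accepts either a float pair or an existing `QPointF`; a quick usage sketch (values made up, assuming the helper is importable from `piker.ui._editors`):

    from PyQt5.QtCore import QPointF
    from piker.ui._editors import as_point

    assert as_point((1.0, 2.0)) == QPointF(1.0, 2.0)  # tuple -> QPointF
    pt = QPointF(3, 4)
    assert as_point(pt) is pt  # a QPointF passes through untouched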
@@ -281,13 +324,15 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
             QtCore.Qt.AlignLeft
             # | QtCore.Qt.AlignVCenter
         )
+        label.hide()  # always right after init
 
         # proxy is created after containing scene is initialized
-        self._label_proxy = None
-        self._abs_top_right = None
+        self._label_proxy: QGraphicsProxyWidget | None = None
+        self._abs_top_right: Point | None = None
 
-        # TODO: "swing %" might be handy here (data's max/min # % change)
-        self._contents = [
+        # TODO: "swing %" might be handy here (data's max/min
+        # % change)?
+        self._contents: list[str] = [
             'change: {pchng:.2f} %',
             'range: {rng:.2f}',
             'bars: {nbars}',
@@ -297,12 +342,30 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
             'sigma: {std:.2f}',
         ]
 
+        self.add_to_view(viewbox)
+
+    def add_to_view(
+        self,
+        view: ChartView,
+    ) -> None:
+        '''
+        Self-defined view hookup impl which will
+        also re-assign the internal ref.
+
+        '''
+        view.addItem(
+            self,
+            ignoreBounds=True,
+        )
+        if self.vb is not view:
+            self.vb = view
+
     @property
-    def chart(self) -> 'ChartPlotWidget':  # noqa
+    def chart(self) -> ChartPlotWidget:  # noqa
         return self._chart
 
     @chart.setter
-    def chart(self, chart: 'ChartPlotWidget') -> None:  # noqa
+    def chart(self, chart: ChartPlotWidget) -> None:  # noqa
         self._chart = chart
         chart.sigRangeChanged.connect(self.update_on_resize)
         palette = self._label.palette()
@@ -315,57 +378,155 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
         )
 
     def update_on_resize(self, vr, r):
-        """Re-position measure label on view range change.
+        '''
+        Re-position measure label on view range change.
 
-        """
+        '''
         if self._abs_top_right:
             self._label_proxy.setPos(
                 self.vb.mapFromView(self._abs_top_right)
             )
 
-    def mouse_drag_released(
+    def set_scen_pos(
         self,
-        p1: QPointF,
-        p2: QPointF
+        scen_p1: QPointF,
+        scen_p2: QPointF,
+
+        update_label: bool = True,
+
     ) -> None:
-        """Called on final button release for mouse drag with start and
-        end positions.
-
-        """
-        self.set_pos(p1, p2)
-
-    def set_pos(
-        self,
-        p1: QPointF,
-        p2: QPointF
-    ) -> None:
-        """Set position of selection rect and accompanying label, move
-        label to match.
-
-        """
-        if self._label_proxy is None:
-            # https://doc.qt.io/qt-5/qgraphicsproxywidget.html
-            self._label_proxy = self.vb.scene().addWidget(self._label)
-
-        start_pos = self.vb.mapToView(p1)
-        end_pos = self.vb.mapToView(p2)
-
-        # map to view coords and update area
-        r = QtCore.QRectF(start_pos, end_pos)
-
-        # old way; don't need right?
-        # lr = QtCore.QRectF(p1, p2)
-        # r = self.vb.childGroup.mapRectFromParent(lr)
-
-        self.setPos(r.topLeft())
-        self.resetTransform()
-        self.setRect(r)
+        '''
+        Set position from scene coords of selection rect (normally
+        from mouse position) and accompanying label, move label to
+        match.
+
+        '''
+        # NOTE XXX: apparently just setting it doesn't work!?
+        # i have no idea why but it's pretty weird we have to do
+        # this transform thing which was basically pulled verbatim
+        # from the `pg.ViewBox.updateScaleBox()` method.
+        view_rect: QRectF = self.vb.childGroup.mapRectFromScene(
+            QRectF(
+                scen_p1,
+                scen_p2,
+            )
+        )
+        self.setPos(view_rect.topLeft())
+        # XXX: does not work..!?!?
+        # https://doc.qt.io/qt-5/qgraphicsrectitem.html#setRect
+        # self.setRect(view_rect)
+
+        tr = QTransform.fromScale(
+            view_rect.width(),
+            view_rect.height(),
+        )
+        self.setTransform(tr)
+
+        # XXX: never got this working, was always offset
+        # / transformed completely wrong (and off to the far right
+        # from the cursor?)
+        # self.set_view_pos(
+        #     view_rect=view_rect,
+        #     # self.vwqpToView(p1),
+        #     # self.vb.mapToView(p2),
+        #     # start_pos=self.vb.mapToScene(p1),
+        #     # end_pos=self.vb.mapToScene(p2),
+        # )
         self.show()
 
-        y1, y2 = start_pos.y(), end_pos.y()
-        x1, x2 = start_pos.x(), end_pos.x()
-
-        # TODO: heh, could probably use a max-min streamin algo here too
+        if update_label:
+            self.init_label(view_rect)
+
+    def set_view_pos(
+        self,
+
+        start_pos: QPointF | Sequence[float, float] | None = None,
+        end_pos: QPointF | Sequence[float, float] | None = None,
+        view_rect: QRectF | None = None,
+
+        update_label: bool = True,
+
+    ) -> None:
+        '''
+        Set position from `ViewBox` coords (i.e. from the actual
+        data domain) of rect (and any accompanying label which is
+        moved to match).
+
+        '''
+        if self._chart is None:
+            raise RuntimeError(
+                'You MUST assign a `SelectRect.chart: ChartPlotWidget`!'
+            )
+
+        if view_rect is None:
+            # ensure point casting
+            start_pos: QPointF = as_point(start_pos)
+            end_pos: QPointF = as_point(end_pos)
+
+            # map to view coords and update area
+            view_rect = QtCore.QRectF(
+                start_pos,
+                end_pos,
+            )
+
+        self.setPos(view_rect.topLeft())
+
+        # NOTE: SERIOUSLY NO IDEA WHY THIS WORKS...
+        # but it does and all the other commented stuff above
+        # didn't, dawg..
+
+        # self.resetTransform()
+        # self.setRect(view_rect)
+
+        tr = QTransform.fromScale(
+            view_rect.width(),
+            view_rect.height(),
+        )
+        self.setTransform(tr)
+
+        if update_label:
+            self.init_label(view_rect)
+
+        print(
+            'SelectRect modify:\n'
+            f'QRectF: {view_rect}\n'
+            f'start_pos: {start_pos}\n'
+            f'end_pos: {end_pos}\n'
+        )
+        self.show()
+
+    def init_label(
+        self,
+        view_rect: QRectF,
+    ) -> QLabel:
+
+        # should be init-ed in `.__init__()`
+        label: QLabel = self._label
+        cv: ChartView = self.vb
+
+        # https://doc.qt.io/qt-5/qgraphicsproxywidget.html
+        if self._label_proxy is None:
+            scen: QGraphicsScene = cv.scene()
+            # NOTE: specifically this is passing a widget
+            # pointer to the scene's `.addWidget()` as per,
+            # https://doc.qt.io/qt-5/qgraphicsproxywidget.html#embedding-a-widget-with-qgraphicsproxywidget
+            self._label_proxy: QGraphicsProxyWidget = scen.addWidget(label)
+
+        # get label startup coords
+        tl: QPointF = view_rect.topLeft()
+        br: QPointF = view_rect.bottomRight()
+
+        x1, y1 = tl.x(), tl.y()
+        x2, y2 = br.x(), br.y()
+
+        # TODO: to remove, previous label corner point unpacking
+        # x1, y1 = start_pos.x(), start_pos.y()
+        # x2, y2 = end_pos.x(), end_pos.y()
+        # y1, y2 = start_pos.y(), end_pos.y()
+        # x1, x2 = start_pos.x(), end_pos.x()
+
+        # TODO: heh, could probably use a max-min streamin algo
+        # here too?
         _, xmn = min(y1, y2), min(x1, x2)
         ymx, xmx = max(y1, y2), max(x1, x2)
@@ -375,26 +536,35 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
         ixmn, ixmx = round(xmn), round(xmx)
         nbars = ixmx - ixmn + 1
 
-        chart = self._chart
-        data = chart.get_viz(chart.name).shm.array[ixmn:ixmx]
+        chart: ChartPlotWidget = self._chart
+        data: np.ndarray = chart.get_viz(
+            chart.name
+        ).shm.array[ixmn:ixmx]
 
         if len(data):
-            std = data['close'].std()
-            dmx = data['high'].max()
-            dmn = data['low'].min()
+            std: float = data['close'].std()
+            dmx: float = data['high'].max()
+            dmn: float = data['low'].min()
         else:
             dmn = dmx = std = np.nan
 
         # update label info
-        self._label.setText('\n'.join(self._contents).format(
-            pchng=pchng, rng=rng, nbars=nbars,
-            std=std, dmx=dmx, dmn=dmn,
+        label.setText('\n'.join(self._contents).format(
+            pchng=pchng,
+            rng=rng,
+            nbars=nbars,
+            std=std,
+            dmx=dmx,
+            dmn=dmn,
         ))
 
         # print(f'x2, y2: {(x2, y2)}')
         # print(f'xmn, ymn: {(xmn, ymx)}')
 
-        label_anchor = Point(xmx + 2, ymx)
+        label_anchor = Point(
+            xmx + 2,
+            ymx,
+        )
 
         # XXX: in the drag bottom-right -> top-left case we don't
         # want the label to overlay the box.
@@ -403,13 +573,16 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
         # # label_anchor = Point(x2, y2 + self._label.height())
         # label_anchor = Point(xmn, ymn)
 
-        self._abs_top_right = label_anchor
-        self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
-        # self._label.show()
+        self._abs_top_right: Point = label_anchor
+        self._label_proxy.setPos(
+            cv.mapFromView(label_anchor)
+        )
+        label.show()
 
     def clear(self):
-        """Clear the selection box from view.
+        '''
+        Clear the selection box from view.
 
-        """
+        '''
         self._label.hide()
         self.hide()
|
||||||
async def open_fsp_actor_cluster(
|
async def open_fsp_actor_cluster(
|
||||||
names: list[str] = ['fsp_0', 'fsp_1'],
|
names: list[str] = ['fsp_0', 'fsp_1'],
|
||||||
|
|
||||||
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
|
) -> AsyncGenerator[
|
||||||
|
int,
|
||||||
|
dict[str, tractor.Portal]
|
||||||
|
]:
|
||||||
|
|
||||||
from tractor._clustering import open_actor_cluster
|
from tractor._clustering import open_actor_cluster
|
||||||
|
|
||||||
|
@ -557,7 +560,7 @@ class FspAdmin:
|
||||||
conf: dict, # yeah probably dumb..
|
conf: dict, # yeah probably dumb..
|
||||||
loglevel: str = 'error',
|
loglevel: str = 'error',
|
||||||
|
|
||||||
) -> (trio.Event, ChartPlotWidget):
|
) -> trio.Event:
|
||||||
|
|
||||||
flume, started = await self.start_engine_task(
|
flume, started = await self.start_engine_task(
|
||||||
target,
|
target,
|
||||||
|
@ -924,7 +927,7 @@ async def start_fsp_displays(
|
||||||
|
|
||||||
linked: LinkedSplits,
|
linked: LinkedSplits,
|
||||||
flume: Flume,
|
flume: Flume,
|
||||||
group_status_key: str,
|
# group_status_key: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -971,21 +974,23 @@ async def start_fsp_displays(
|
||||||
flume,
|
flume,
|
||||||
) as admin,
|
) as admin,
|
||||||
):
|
):
|
||||||
statuses = []
|
statuses: list[trio.Event] = []
|
||||||
for target, conf in fsp_conf.items():
|
for target, conf in fsp_conf.items():
|
||||||
started = await admin.open_fsp_chart(
|
started: trio.Event = await admin.open_fsp_chart(
|
||||||
target,
|
target,
|
||||||
conf,
|
conf,
|
||||||
)
|
)
|
||||||
done = linked.window().status_bar.open_status(
|
# done = linked.window().status_bar.open_status(
|
||||||
f'loading fsp, {target}..',
|
# f'loading fsp, {target}..',
|
||||||
group_key=group_status_key,
|
# group_key=group_status_key,
|
||||||
)
|
# )
|
||||||
statuses.append((started, done))
|
# statuses.append((started, done))
|
||||||
|
statuses.append(started)
|
||||||
|
|
||||||
for fsp_loaded, status_cb in statuses:
|
# for fsp_loaded, status_cb in statuses:
|
||||||
|
for fsp_loaded in statuses:
|
||||||
await fsp_loaded.wait()
|
await fsp_loaded.wait()
|
||||||
profiler(f'attached to fsp portal: {target}')
|
profiler(f'attached to fsp portal: {target}')
|
||||||
status_cb()
|
# status_cb()
|
||||||
|
|
||||||
# blocks on nursery until all fsp actors complete
|
# blocks on nursery until all fsp actors complete
|
||||||
|
|
|
@@ -30,7 +30,11 @@ from typing import (
 )
 
 import pyqtgraph as pg
-# from pyqtgraph.GraphicsScene import mouseEvents
+# NOTE XXX: pg is super annoying and re-implements its own mouse
+# event subsystem.. we should really look into re-working/writing
+# this down the road.. Bo
+from pyqtgraph.GraphicsScene import mouseEvents as mevs
+# from pyqtgraph.GraphicsScene.mouseEvents import MouseDragEvent
 from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
 from PyQt5.QtGui import (
     QWheelEvent,
@@ -466,6 +470,7 @@ class ChartView(ViewBox):
     mode_name: str = 'view'
     def_delta: float = 616 * 6
     def_scale_factor: float = 1.016 ** (def_delta * -1 / 20)
+    # annots: dict[int, GraphicsObject] = {}
 
     def __init__(
         self,
@@ -486,6 +491,7 @@ class ChartView(ViewBox):
             # defaultPadding=0.,
             **kwargs
         )
+
 
         # for "known y-range style"
         self._static_yrange = static_yrange
 
@@ -500,7 +506,11 @@ class ChartView(ViewBox):
 
         # add our selection box annotator
         self.select_box = SelectRect(self)
-        self.addItem(self.select_box, ignoreBounds=True)
+        # self.select_box.add_to_view(self)
+        # self.addItem(
+        #     self.select_box,
+        #     ignoreBounds=True,
+        # )
 
         self.mode = None
         self.order_mode: bool = False
@@ -711,17 +721,18 @@ class ChartView(ViewBox):
 
     def mouseDragEvent(
         self,
-        ev,
+        ev: mevs.MouseDragEvent,
         axis: int | None = None,
 
     ) -> None:
-        pos = ev.pos()
-        lastPos = ev.lastPos()
-        dif = pos - lastPos
-        dif = dif * -1
+        pos: Point = ev.pos()
+        lastPos: Point = ev.lastPos()
+        dif: Point = (pos - lastPos) * -1
+        # dif: Point = pos - lastPos
+        # dif: Point = dif * -1
 
         # NOTE: if axis is specified, event will only affect that axis.
-        button = ev.button()
+        btn = ev.button()
 
         # Ignore axes if mouse is disabled
         mouseEnabled = np.array(
@@ -733,7 +744,7 @@ class ChartView(ViewBox):
             mask[1-axis] = 0.0
 
         # Scale or translate based on mouse button
-        if button & (
+        if btn & (
             QtCore.Qt.LeftButton | QtCore.Qt.MidButton
         ):
             # zoom y-axis ONLY when click-n-drag on it
@@ -756,34 +767,55 @@ class ChartView(ViewBox):
                 # XXX: WHY
                 ev.accept()
 
-                down_pos = ev.buttonDownPos()
+                down_pos: Point = ev.buttonDownPos(
+                    btn=btn,
+                )
+                scen_pos: Point = ev.scenePos()
+                scen_down_pos: Point = ev.buttonDownScenePos(
+                    btn=btn,
+                )
 
                 # This is the final position in the drag
                 if ev.isFinish():
-
-                    self.select_box.mouse_drag_released(down_pos, pos)
-
-                    ax = QtCore.QRectF(down_pos, pos)
-                    ax = self.childGroup.mapRectFromParent(ax)
+                    # import pdbp; pdbp.set_trace()
+
+                    # NOTE: think of this as a `.mouse_drag_release()`
+                    # (bc HINT that's what i called the shit ass
+                    # method that wrapped this call [yes, as a single
+                    # fucking call] originally.. you bish, guille)
+                    # Bo.. oraleeee
+                    self.select_box.set_scen_pos(
+                        # down_pos,
+                        # pos,
+                        scen_down_pos,
+                        scen_pos,
+                    )
 
                     # this is the zoom transform cmd
-                    self.showAxRect(ax)
+                    ax = QtCore.QRectF(down_pos, pos)
+                    ax = self.childGroup.mapRectFromParent(ax)
+                    # self.showAxRect(ax)
+
                     # axis history tracking
                     self.axHistoryPointer += 1
                     self.axHistory = self.axHistory[
                         :self.axHistoryPointer] + [ax]
 
                 else:
-                    print('drag finish?')
-                    self.select_box.set_pos(down_pos, pos)
+                    self.select_box.set_scen_pos(
+                        # down_pos,
+                        # pos,
+                        scen_down_pos,
+                        scen_pos,
+                    )
 
                     # update shape of scale box
                     # self.updateScaleBox(ev.buttonDownPos(), ev.pos())
-                    self.updateScaleBox(
-                        down_pos,
-                        ev.pos(),
-                    )
+                    # breakpoint()
+                    # self.updateScaleBox(
+                    #     down_pos,
+                    #     ev.pos(),
+                    # )
 
         # PANNING MODE
         else:
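The handler now captures scene coords (`ev.scenePos()`/`ev.buttonDownScenePos()`) and defers the domain mapping to `SelectRect.set_scen_pos()`; the core mapping it relies on, in isolation (PyQt5/pyqtgraph assumed, helper name hypothetical):

    from PyQt5.QtCore import QPointF, QRectF

    def scene_rect_to_view(vb, p1: QPointF, p2: QPointF) -> QRectF:
        # `vb.childGroup` is the `pg.ViewBox` item whose local frame
        # is the data ("view") domain; same call as in `set_scen_pos()`.
        return vb.childGroup.mapRectFromScene(QRectF(p1, p2))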
@@ -822,7 +854,7 @@ class ChartView(ViewBox):
             # ev.accept()
 
         # WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
-        elif button & QtCore.Qt.RightButton:
+        elif btn & QtCore.Qt.RightButton:
 
             if self.state['aspectLocked'] is not False:
                 mask[0] = 0
@@ -24,8 +24,6 @@ view transforms.
 """
 import pyqtgraph as pg
 
-from ._axes import Axis
-
 
 def invertQTransform(tr):
     """Return a QTransform that is the inverse of *tr*.
@@ -53,6 +51,9 @@ def _do_overrides() -> None:
     pg.functions.invertQTransform = invertQTransform
     pg.PlotItem = PlotItem
 
+    from ._axes import Axis
+    pg.Axis = Axis
+
     # enable "QPainterPathPrivate for faster arrayToQPath" from
     # https://github.com/pyqtgraph/pyqtgraph/pull/2324
     pg.setConfigOption('enableExperimental', True)
@@ -234,7 +235,7 @@ class PlotItem(pg.PlotItem):
         # ``ViewBox`` geometry bug.. where a gap for the
         # 'bottom' axis is somehow left in?
         # axis = pg.AxisItem(orientation=name, parent=self)
-        axis = Axis(
+        axis = pg.Axis(
             self,
             orientation=name,
             parent=self,
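Moving the `Axis` import inside `_do_overrides()` and republishing it as `pg.Axis` reads like a circular-import fix; the idiom in isolation (module path assumed):

    import pyqtgraph as pg

    def _do_overrides() -> None:
        # deferred to call time so module load order can't cycle
        from piker.ui._axes import Axis
        pg.Axis = Axis  # downstream code constructs `pg.Axis(...)`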
@@ -0,0 +1,273 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <https://www.gnu.org/licenses/>.
+
+'''
+Remote control tasks for sending annotations (and maybe more cmds)
+to a chart from some other actor.
+
+'''
+from __future__ import annotations
+from contextlib import asynccontextmanager as acm
+from pprint import pformat
+from typing import (
+    Any,
+    AsyncContextManager,
+)
+
+import tractor
+from tractor import trionics
+from tractor import (
+    Portal,
+    Context,
+    MsgStream,
+)
+from PyQt5.QtWidgets import (
+    QGraphicsItem,
+)
+
+from piker.log import get_logger
+from piker.types import Struct
+from piker.service import find_service
+from ._display import DisplayState
+from ._interaction import ChartView
+from ._editors import SelectRect
+from ._chart import ChartPlotWidget
+
+
+log = get_logger(__name__)
+
+# NOTE: this is set by the `._display.graphics_update_loop()` once
+# all chart widgets / Viz per flume have been initialized allowing
+# for remote annotation (control) of any chart-actor's mkt feed by
+# fqme lookup Bo
+_dss: dict[str, DisplayState] | None = None
+
+# stash each and every client connection so that they can all
+# be cancelled on shutdown/error.
+# TODO: make `tractor.Context` hashable via its `.cid: str`?
+# _ctxs: set[Context] = set()
+_ctxs: list[Context] = []
+
+# global map of all uniquely created annotation-graphics
+# so that they can be mutated (eventually) by a client.
+_annots: dict[int, QGraphicsItem] = {}
+
+
+@tractor.context
+async def remote_annotate(
+    ctx: Context,
+) -> None:
+
+    global _dss, _ctxs
+    assert _dss
+
+    _ctxs.append(ctx)
+
+    # send back full fqme symbology to caller
+    await ctx.started(list(_dss))
+
+    async with ctx.open_stream() as annot_req_stream:
+        async for msg in annot_req_stream:
+            match msg:
+                case {
+                    'annot': 'SelectRect',
+                    'fqme': fqme,
+                    'timeframe': timeframe,
+                    'meth': str(meth),
+                    'kwargs': dict(kwargs),
+                }:
+
+                    ds: DisplayState = _dss[fqme]
+                    chart: ChartPlotWidget = {
+                        60: ds.hist_chart,
+                        1: ds.chart,
+                    }[timeframe]
+                    cv: ChartView = chart.cv
+
+                    # sanity
+                    if timeframe == 60:
+                        assert (
+                            chart.linked.godwidget.hist_linked.chart.view
+                            is
+                            cv
+                        )
+
+                    # annot type lookup from cmd
+                    rect = SelectRect(
+                        viewbox=cv,
+
+                        # TODO: make this more dynamic?
+                        # -[ ] pull from conf.toml?
+                        # -[ ] add `.set_color()` method to type?
+                        # -[ ] make a green/red based on direction
+                        #      instead of default static color?
+                        color=kwargs.pop('color', None),
+                    )
+                    # XXX NOTE: this is REQUIRED to set the rect
+                    # resize callback!
+                    rect.chart: ChartPlotWidget = chart
+
+                    # delegate generically to the requested method
+                    getattr(rect, meth)(**kwargs)
+                    rect.show()
+                    await annot_req_stream.send(id(rect))
+
+                case _:
+                    log.error(
+                        'Unknown remote annotation cmd:\n'
+                        f'{pformat(msg)}'
+                    )
+
+
+class AnnotCtl(Struct):
+    '''
+    A control for remote "data annotations".
+
+    You know those "squares they always show in machine vision
+    UIs.." this API allows you to remotely control stuff like that
+    in some other graphics actor.
+
+    '''
+    ctx2fqmes: dict[str, str]
+    fqme2stream: dict[str, MsgStream]
+
+    async def add_rect(
+        self,
+        fqme: str,
+        timeframe: float,
+        start_pos: tuple[float, float],
+        end_pos: tuple[float, float],
+
+        # TODO: a `Literal['view', 'scene']` for this?
+        domain: str = 'view',  # or 'scene'
+        color: str = 'dad_blue',
+
+    ) -> int:
+        '''
+        Add a `SelectRect` annotation to the target view, return
+        the instance's `id(obj)` from the remote UI actor.
+
+        '''
+        ipc: MsgStream = self.fqme2stream[fqme]
+        await ipc.send({
+            'fqme': fqme,
+            'annot': 'SelectRect',
+            'timeframe': timeframe,
+            # 'meth': str(meth),
+            'meth': 'set_view_pos' if domain == 'view' else 'set_scen_pos',
+            'kwargs': {
+                'start_pos': tuple(start_pos),
+                'end_pos': tuple(end_pos),
+                'color': color,
+                'update_label': False,
+            },
+        })
+        return (await ipc.receive())
+
+    async def modify(
+        self,
+        aid: int,  # annotation id
+        meth: str,  # far end graphics object method to invoke
+        params: dict[str, Any],  # far end `meth(**kwargs)`
+    ) -> bool:
+        '''
+        Modify an existing (remote) annotation's graphics
+        parameters, thus changing its appearance / state in real
+        time.
+
+        '''
+        raise NotImplementedError
+
+    async def remove(
+        self,
+        uid: int,
+    ) -> bool:
+        '''
+        Remove an existing annotation by instance id.
+
+        '''
+        raise NotImplementedError
+
+
+@acm
+async def open_annot_ctl(
+    uid: tuple[str, str] | None = None,
+
+) -> AnnotCtl:
+    # TODO: load connection to a specific chart actor
+    # -[ ] pull from either service scan or config
+    # -[ ] return some kinda client/proxy thinger?
+    # -[ ] maybe we should finally just provide this as
+    #      a `tractor.hilevel.CallableProxy` or wtv?
+    # -[ ] use this from the storage.cli stuff to mark up gaps!
+
+    maybe_portals: list[Portal] | None
+    fqmes: list[str]
+    async with find_service(
+        service_name='chart',
+        first_only=False,
+    ) as maybe_portals:
+
+        ctx_mngrs: list[AsyncContextManager] = []
+
+        # TODO: print the current discoverable actor UID set
+        # here as well?
+        if not maybe_portals:
+            raise RuntimeError('No chart UI actors found in service domain?')
+
+        for portal in maybe_portals:
+            ctx_mngrs.append(
+                portal.open_context(remote_annotate)
+            )
+
+        ctx2fqmes: dict[str, set[str]] = {}
+        fqme2stream: dict[str, MsgStream] = {}
+        client = AnnotCtl(
+            ctx2fqmes=ctx2fqmes,
+            fqme2stream=fqme2stream,
+        )
+
+        stream_ctxs: list[AsyncContextManager] = []
+        async with trionics.gather_contexts(ctx_mngrs) as ctxs:
+            for (ctx, fqmes) in ctxs:
+                stream_ctxs.append(ctx.open_stream())
+
+                # fill lookup table of mkt addrs to IPC ctxs
+                for fqme in fqmes:
+                    if other := fqme2stream.get(fqme):
+                        raise ValueError(
+                            f'More than one chart displays {fqme}!?\n'
+                            'Other UI actor info:\n'
+                            f'channel: {other._ctx.chan}\n'
+                            f'actor uid: {other._ctx.chan.uid}\n'
+                            f'ctx id: {other._ctx.cid}\n'
+                        )
+
+                    ctx2fqmes.setdefault(
+                        ctx.cid,
+                        set(),
+                    ).add(fqme)
+
+            async with trionics.gather_contexts(stream_ctxs) as streams:
+                for stream in streams:
+                    fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
+                    for fqme in fqmes:
+                        fqme2stream[fqme] = stream
+
+                yield client
+                # TODO: on graceful teardown should we try to
+                # remove all annots that were created/modded?
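A hedged driver sketch for the new client API (requires a running `piker chart` actor with `_remote_ctl` enabled plus the usual runtime entry, elided here; the fqme and coords are made up):

    from piker.ui._remote_ctl import open_annot_ctl, AnnotCtl

    async def mark_range() -> None:
        actl: AnnotCtl
        async with open_annot_ctl() as actl:
            aid: int = await actl.add_rect(
                fqme='xbtusd.kraken',
                timeframe=60,
                start_pos=(100, 40_000.0),  # (x, price) corner
                end_pos=(120, 42_500.0),    # opposite corner
            )
            print(f'created remote annot: {aid}')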
@@ -228,5 +228,10 @@ def chart(
             'loglevel': tractorloglevel,
             'name': 'chart',
             'registry_addrs': list(set(regaddrs)),
+            'enable_modules': [
+
+                # remote data-view annotations Bo
+                'piker.ui._remote_ctl',
+            ],
         },
     )
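`enable_modules` is tractor's allow-list of modules whose IPC endpoints a peer may invoke, so listing `piker.ui._remote_ctl` here is what makes `remote_annotate` reachable in the chart actor; a minimal sketch of the same knob via the nursery API (signature assumed):

    import tractor

    async def spawn_chart() -> None:
        async with tractor.open_nursery() as an:
            await an.start_actor(
                'chart',
                enable_modules=['piker.ui._remote_ctl'],
            )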
@@ -31,7 +31,7 @@ import pendulum
 import pyqtgraph as pg
 
 from piker.types import Struct
-from ..data.tsp import slice_from_time
+from ..tsp import slice_from_time
 from ..log import get_logger
 from ..toolz import Profiler