Compare commits

..

No commits in common. "a681b2f0bbc7fe03a00b0fb931527b4c901f5140" and "1f9a4976378f863193572a0b64a5dba5e4aaf672" have entirely different histories.

20 changed files with 390 additions and 1113 deletions

View File

@ -843,7 +843,6 @@ class Client:
self, self,
contract: Contract, contract: Contract,
timeout: float = 1, timeout: float = 1,
tries: int = 100,
raise_on_timeout: bool = False, raise_on_timeout: bool = False,
) -> Ticker | None: ) -> Ticker | None:
@ -858,30 +857,30 @@ class Client:
ready: ticker.TickerUpdateEvent = ticker.updateEvent ready: ticker.TickerUpdateEvent = ticker.updateEvent
# ensure a last price gets filled in before we deliver quote # ensure a last price gets filled in before we deliver quote
timeouterr: Exception | None = None
warnset: bool = False warnset: bool = False
for _ in range(tries): for _ in range(100):
# wait for a first update(Event) indicatingn a
# live quote feed.
if isnan(ticker.last): if isnan(ticker.last):
# wait for a first update(Event)
try: try:
tkr = await asyncio.wait_for( tkr = await asyncio.wait_for(
ready, ready,
timeout=timeout, timeout=timeout,
) )
except TimeoutError:
import pdbp
pdbp.set_trace()
if raise_on_timeout:
raise
return None
if tkr: if tkr:
break break
except TimeoutError as err:
timeouterr = err
await asyncio.sleep(0.01)
continue
else: else:
if not warnset: if not warnset:
log.warning( log.warning(
f'Quote req timed out..maybe venue is closed?\n' f'Quote for {contract} timed out: market is closed?'
f'{asdict(contract)}'
) )
warnset = True warnset = True
@ -889,11 +888,6 @@ class Client:
log.info(f'Got first quote for {contract}') log.info(f'Got first quote for {contract}')
break break
else: else:
if timeouterr and raise_on_timeout:
import pdbp
pdbp.set_trace()
raise timeouterr
if not warnset: if not warnset:
log.warning( log.warning(
f'Contract {contract} is not returning a quote ' f'Contract {contract} is not returning a quote '
@ -901,8 +895,6 @@ class Client:
) )
warnset = True warnset = True
return None
return ticker return ticker
# async to be consistent for the client proxy, and cuz why not. # async to be consistent for the client proxy, and cuz why not.

View File

@ -943,11 +943,6 @@ async def stream_quotes(
contract=con, contract=con,
raise_on_timeout=True, raise_on_timeout=True,
) )
first_quote: dict = normalize(first_ticker)
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope | None = None cs: trio.CancelScope | None = None
startup: bool = True startup: bool = True
while ( while (

View File

@ -134,19 +134,14 @@ def get_app_dir(
_click_config_dir: Path = Path(get_app_dir('piker')) _click_config_dir: Path = Path(get_app_dir('piker'))
_config_dir: Path = _click_config_dir _config_dir: Path = _click_config_dir
_parent_user: str = os.environ.get('SUDO_USER')
# NOTE: when using `sudo` we attempt to determine the non-root user if _parent_user:
# and still use their normal config dir.
if (
(_parent_user := os.environ.get('SUDO_USER'))
and
_parent_user != 'root'
):
non_root_user_dir = Path( non_root_user_dir = Path(
os.path.expanduser(f'~{_parent_user}') os.path.expanduser(f'~{_parent_user}')
) )
root: str = 'root' root: str = 'root'
_ccds: str = str(_click_config_dir) # click config dir as string _ccds: str = str(_click_config_dir) # click config dir string
i_tail: int = int(_ccds.rfind(root) + len(root)) i_tail: int = int(_ccds.rfind(root) + len(root))
_config_dir = ( _config_dir = (
non_root_user_dir non_root_user_dir

View File

@ -45,7 +45,10 @@ import trio
from trio.abc import ReceiveChannel from trio.abc import ReceiveChannel
from trio_typing import TaskStatus from trio_typing import TaskStatus
import tractor import tractor
from tractor import trionics from tractor.trionics import (
maybe_open_context,
gather_contexts,
)
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
@ -66,7 +69,7 @@ from .validate import (
FeedInit, FeedInit,
validate_backend, validate_backend,
) )
from ..tsp import ( from .history import (
manage_history, manage_history,
) )
from .ingest import get_ingestormod from .ingest import get_ingestormod
@ -121,8 +124,6 @@ class _FeedsBus(Struct):
trio.CancelScope] = trio.TASK_STATUS_IGNORED, trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None: ) -> None:
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
# TODO: shouldn't this be a direct await to avoid
# cancellation contagion to the bus nursery!?!?!
await self.nursery.start( await self.nursery.start(
target, target,
*args, *args,
@ -329,6 +330,7 @@ async def allocate_persistent_feed(
) = await bus.nursery.start( ) = await bus.nursery.start(
manage_history, manage_history,
mod, mod,
bus,
mkt, mkt,
some_data_ready, some_data_ready,
feed_is_live, feed_is_live,
@ -405,12 +407,6 @@ async def allocate_persistent_feed(
rt_shm.array['time'][1] = ts + 1 rt_shm.array['time'][1] = ts + 1
elif hist_shm.array.size == 0: elif hist_shm.array.size == 0:
for i in range(100):
await trio.sleep(0.1)
if hist_shm.array.size > 0:
break
else:
await tractor.pause()
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?') raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
# wait the spawning parent task to register its subscriber # wait the spawning parent task to register its subscriber
@ -457,12 +453,8 @@ async def open_feed_bus(
if loglevel is None: if loglevel is None:
loglevel = tractor.current_actor().loglevel loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker # XXX: required to propagate ``tractor`` loglevel to piker logging
# logging get_console_log(loglevel or tractor.current_actor().loglevel)
get_console_log(
loglevel
or tractor.current_actor().loglevel
)
# local state sanity checks # local state sanity checks
# TODO: check for any stale shm entries for this symbol # TODO: check for any stale shm entries for this symbol
@ -472,7 +464,7 @@ async def open_feed_bus(
assert 'brokerd' in servicename assert 'brokerd' in servicename
assert brokername in servicename assert brokername in servicename
bus: _FeedsBus = get_feed_bus(brokername) bus = get_feed_bus(brokername)
sub_registered = trio.Event() sub_registered = trio.Event()
flumes: dict[str, Flume] = {} flumes: dict[str, Flume] = {}
@ -776,7 +768,7 @@ async def maybe_open_feed(
''' '''
fqme = fqmes[0] fqme = fqmes[0]
async with trionics.maybe_open_context( async with maybe_open_context(
acm_func=open_feed, acm_func=open_feed,
kwargs={ kwargs={
'fqmes': fqmes, 'fqmes': fqmes,
@ -796,7 +788,7 @@ async def maybe_open_feed(
# add a new broadcast subscription for the quote stream # add a new broadcast subscription for the quote stream
# if this feed is likely already in use # if this feed is likely already in use
async with trionics.gather_contexts( async with gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()] mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams: ) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()): for bstream, flume in zip(bstreams, feed.flumes.values()):
@ -856,7 +848,7 @@ async def open_feed(
) )
portals: tuple[tractor.Portal] portals: tuple[tractor.Portal]
async with trionics.gather_contexts( async with gather_contexts(
brokerd_ctxs, brokerd_ctxs,
) as portals: ) as portals:
@ -908,7 +900,7 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals) assert len(feed.mods) == len(feed.portals)
async with ( async with (
trionics.gather_contexts(bus_ctxs) as ctxs, gather_contexts(bus_ctxs) as ctxs,
): ):
stream_ctxs: list[tractor.MsgStream] = [] stream_ctxs: list[tractor.MsgStream] = []
for ( for (
@ -950,7 +942,7 @@ async def open_feed(
brokermod: ModuleType brokermod: ModuleType
fqmes: list[str] fqmes: list[str]
async with ( async with (
trionics.gather_contexts(stream_ctxs) as streams, gather_contexts(stream_ctxs) as streams,
): ):
for ( for (
stream, stream,
@ -966,12 +958,6 @@ async def open_feed(
if brokermod.name == flume.mkt.broker: if brokermod.name == flume.mkt.broker:
flume.stream = stream flume.stream = stream
assert ( assert len(feed.mods) == len(feed.portals) == len(feed.streams)
len(feed.mods)
==
len(feed.portals)
==
len(feed.streams)
)
yield feed yield feed

View File

@ -32,7 +32,6 @@ from __future__ import annotations
from datetime import datetime from datetime import datetime
from functools import partial from functools import partial
from pathlib import Path from pathlib import Path
from pprint import pformat
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
Callable, Callable,
@ -54,64 +53,25 @@ import polars as pl
from ..accounting import ( from ..accounting import (
MktPair, MktPair,
) )
from ..data._util import ( from ._util import (
log, log,
) )
from ..data._sharedmem import ( from ._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
ShmArray, ShmArray,
) )
from ..data._source import def_iohlcv_fields from ._source import def_iohlcv_fields
from ..data._sampling import ( from ._sampling import (
open_sample_stream, open_sample_stream,
) )
from ._anal import ( from .tsp import (
dedupe,
get_null_segs, get_null_segs,
iter_null_segs, iter_null_segs,
Frame,
Seq,
# codec-ish
np2pl,
pl2np,
# `numpy` only
slice_from_time,
# `polars` specific
dedupe,
with_dts,
detect_time_gaps,
sort_diff, sort_diff,
Frame,
# TODO: # Seq,
detect_price_gaps
) )
__all__: list[str] = [
'dedupe',
'get_null_segs',
'iter_null_segs',
'sort_diff',
'slice_from_time',
'Frame',
'Seq',
'np2pl',
'pl2np',
'slice_from_time',
'with_dts',
'detect_time_gaps',
'sort_diff',
# TODO:
'detect_price_gaps'
]
# TODO: break up all this shite into submods!
from ..brokers._util import ( from ..brokers._util import (
DataUnavailable, DataUnavailable,
) )
@ -269,7 +229,6 @@ async def maybe_fill_null_segments(
# - remember that in the display side, only refersh this # - remember that in the display side, only refersh this
# if the respective history is actually "in view". # if the respective history is actually "in view".
# loop # loop
try:
await sampler_stream.send({ await sampler_stream.send({
'broadcast_all': { 'broadcast_all': {
@ -280,9 +239,6 @@ async def maybe_fill_null_segments(
'backfilling': (mkt.fqme, timeframe), 'backfilling': (mkt.fqme, timeframe),
}, },
}) })
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
null_segs_detected.set() null_segs_detected.set()
# RECHECK for more null-gaps # RECHECK for more null-gaps
@ -296,54 +252,21 @@ async def maybe_fill_null_segments(
and and
len(null_segs[-1]) len(null_segs[-1])
): ):
( await tractor.pause()
iabs_slices,
iabs_zero_rows,
zero_t,
) = null_segs
log.warning(
f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
f'{pformat(iabs_slices)}'
)
# TODO: always backfill gaps with the earliest (price) datum's array = shm.array
zeros = array[array['low'] == 0]
# always backfill gaps with the earliest (price) datum's
# value to avoid the y-ranger including zeros and completely # value to avoid the y-ranger including zeros and completely
# stretching the y-axis.. # stretching the y-axis..
# array: np.ndarray = shm.array if 0 < zeros.size:
# zeros = array[array['low'] == 0] zeros[[
ohlc_fields: list[str] = [
'open', 'open',
'high', 'high',
'low', 'low',
'close', 'close',
] ]] = shm._array[zeros['index'][0] - 1]['close']
for istart, istop in iabs_slices:
# get view into buffer for null-segment
gap: np.ndarray = shm._array[istart:istop]
# copy the oldest OHLC samples forward
gap[ohlc_fields] = shm._array[istart]['close']
start_t: float = shm._array[istart]['time']
t_diff: float = (istop - istart)*timeframe
gap['time'] = np.arange(
start=start_t,
stop=start_t + t_diff,
step=timeframe,
)
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
# TODO: interatively step through any remaining # TODO: interatively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller # time-gaps/null-segments and spawn piecewise backfiller
@ -354,10 +277,14 @@ async def maybe_fill_null_segments(
# parallel possible no matter the backend? # parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then # -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?" # earliest, then latest.. etc?"
# await tractor.pause() # if (
# next_end_dt not in frame[
# ):
# pass
async def start_backfill( async def start_backfill(
tn: trio.Nursery,
get_hist, get_hist,
mod: ModuleType, mod: ModuleType,
mkt: MktPair, mkt: MktPair,
@ -411,6 +338,7 @@ async def start_backfill(
# settings above when the tsdb is detected as being empty. # settings above when the tsdb is detected as being empty.
backfill_until_dt = backfill_from_dt.subtract(**period_duration) backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# STAGE NOTE: "backward history gap filling": # STAGE NOTE: "backward history gap filling":
# - we push to the shm buffer until we have history back # - we push to the shm buffer until we have history back
# until the latest entry loaded from the tsdb's table B) # until the latest entry loaded from the tsdb's table B)
@ -754,8 +682,6 @@ async def back_load_from_tsdb(
async def push_latest_frame( async def push_latest_frame(
# box-type only that should get packed with the datetime
# objects received for the latest history frame
dt_eps: list[DateTime, DateTime], dt_eps: list[DateTime, DateTime],
shm: ShmArray, shm: ShmArray,
get_hist: Callable[ get_hist: Callable[
@ -765,11 +691,8 @@ async def push_latest_frame(
timeframe: float, timeframe: float,
config: dict, config: dict,
task_status: TaskStatus[ task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
Exception | list[datetime, datetime] ):
] = trio.TASK_STATUS_IGNORED,
) -> list[datetime, datetime] | None:
# get latest query's worth of history all the way # get latest query's worth of history all the way
# back to what is recorded in the tsdb # back to what is recorded in the tsdb
try: try:
@ -786,19 +709,17 @@ async def push_latest_frame(
mr_start_dt, mr_start_dt,
mr_end_dt, mr_end_dt,
]) ])
task_status.started(dt_eps)
# XXX: timeframe not supported for backend (since # XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since # above exception type), terminate immediately since
# there's no backfilling possible. # there's no backfilling possible.
except DataUnavailable: except DataUnavailable:
task_status.started(None) task_status.started()
if timeframe > 1: if timeframe > 1:
await tractor.pause() await tractor.pause()
# prolly tf not supported return
return None
# NOTE: on the first history, most recent history # NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index # frame we PREPEND from the current shm ._last index
@ -810,16 +731,11 @@ async def push_latest_frame(
prepend=True, # append on first frame prepend=True, # append on first frame
) )
return dt_eps
async def load_tsdb_hist( async def load_tsdb_hist(
storage: StorageClient, storage: StorageClient,
mkt: MktPair, mkt: MktPair,
timeframe: float, timeframe: float,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> tuple[ ) -> tuple[
np.ndarray, np.ndarray,
DateTime, DateTime,
@ -905,12 +821,6 @@ async def tsdb_backfill(
config, config,
) )
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)
# tell parent task to continue # tell parent task to continue
# TODO: really we'd want this the other way with the # TODO: really we'd want this the other way with the
# tsdb load happening asap and the since the latest # tsdb load happening asap and the since the latest
@ -918,36 +828,28 @@ async def tsdb_backfill(
# latency? # latency?
task_status.started() task_status.started()
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)
# NOTE: iabs to start backfilling from, reverse chronological, # NOTE: iabs to start backfilling from, reverse chronological,
# ONLY AFTER the first history frame has been pushed to # ONLY AFTER the first history frame has been pushed to
# mem! # mem!
backfill_gap_from_shm_index: int = shm._first.value + 1 backfill_gap_from_shm_index: int = shm._first.value + 1
# Prepend any tsdb history into the rt-shm-buffer which
# should NOW be getting filled with the most recent history
# pulled from the data-backend.
if dt_eps:
# well then, unpack the latest (gap) backfilled frame dts
( (
mr_start_dt, mr_start_dt,
mr_end_dt, mr_end_dt,
) = dt_eps ) = dt_eps
# NOTE: when there's no offline data, there's 2 cases: async with trio.open_nursery() as tn:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
# we shouldn't be here!), or
# - no prior history has been stored (yet) and we need
# todo full backfill of the history now.
if tsdb_entry is None:
# indicate to backfill task to fill the whole
# shm buffer as much as it can!
last_tsdb_dt = None
# there's existing tsdb history from (offline) storage # Prepend any tsdb history to the shm buffer which should
# so only backfill the gap between the # now be full of the most recent history pulled from the
# most-recent-frame (mrf) and that latest sample. # backend's last frame.
else: if tsdb_entry:
( (
tsdb_history, tsdb_history,
first_tsdb_dt, first_tsdb_dt,
@ -957,11 +859,10 @@ async def tsdb_backfill(
# if there is a gap to backfill from the first # if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb # history frame until the last datum loaded from the tsdb
# continue that now in the background # continue that now in the background
async with trio.open_nursery() as tn:
bf_done = await tn.start( bf_done = await tn.start(
partial( partial(
start_backfill, start_backfill,
tn=tn,
get_hist=get_hist, get_hist=get_hist,
mod=mod, mod=mod,
mkt=mkt, mkt=mkt,
@ -970,16 +871,13 @@ async def tsdb_backfill(
backfill_from_shm_index=backfill_gap_from_shm_index, backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt, backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream, sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt, backfill_until_dt=last_tsdb_dt,
storage=storage, storage=storage,
write_tsdb=True, write_tsdb=True,
) )
) )
nulls_detected: trio.Event | None = None
if last_tsdb_dt is not None:
# calc the index from which the tsdb data should be # calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the # prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest # latest frame (loaded/read above) and the latest
@ -1064,10 +962,8 @@ async def tsdb_backfill(
mkt=mkt, mkt=mkt,
)) ))
# 2nd nursery END
# TODO: who would want to? # TODO: who would want to?
if nulls_detected:
await nulls_detected.wait() await nulls_detected.wait()
await bf_done.wait() await bf_done.wait()
@ -1114,6 +1010,7 @@ async def tsdb_backfill(
async def manage_history( async def manage_history(
mod: ModuleType, mod: ModuleType,
bus: _FeedsBus,
mkt: MktPair, mkt: MktPair,
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
@ -1270,6 +1167,7 @@ async def manage_history(
tsdb_backfill, tsdb_backfill,
mod=mod, mod=mod,
storemod=storemod, storemod=storemod,
# bus,
storage=client, storage=client,
mkt=mkt, mkt=mkt,
shm=tf2mem[timeframe], shm=tf2mem[timeframe],
@ -1354,11 +1252,13 @@ def iter_dfs_from_shms(
assert not opened assert not opened
ohlcv = shm.array ohlcv = shm.array
from ._anal import np2pl from ..data import tsp
df: pl.DataFrame = np2pl(ohlcv) df: pl.DataFrame = tsp.np2pl(ohlcv)
yield ( yield (
shmfile, shmfile,
shm, shm,
df, df,
) )

View File

@ -319,8 +319,9 @@ def get_null_segs(
if num_gaps < 1: if num_gaps < 1:
if absi_zeros.size > 1: if absi_zeros.size > 1:
absi_zsegs = [[ absi_zsegs = [[
# TODO: maybe mk these max()/min() limits func # see `get_hist()` in backend, should ALWAYS be
# consts instead of called more then once? # able to handle a `start_dt=None`!
# None,
max( max(
absi_zeros[0] - 1, absi_zeros[0] - 1,
0, 0,
@ -358,10 +359,7 @@ def get_null_segs(
# corresponding to the first zero-segment's row, we add it # corresponding to the first zero-segment's row, we add it
# manually here. # manually here.
absi_zsegs.append([ absi_zsegs.append([
max(
absi_zeros[0] - 1, absi_zeros[0] - 1,
0,
),
None, None,
]) ])
@ -402,18 +400,14 @@ def get_null_segs(
else: else:
if 0 < num_gaps < 2: if 0 < num_gaps < 2:
absi_zsegs[-1][1] = min( absi_zsegs[-1][1] = absi_zeros[-1] + 1
absi_zeros[-1] + 1,
frame['index'][-1],
)
iabs_first: int = frame['index'][0] iabs_first: int = frame['index'][0]
for start, end in absi_zsegs: for start, end in absi_zsegs:
ts_start: float = times[start - iabs_first] ts_start: float = times[start - iabs_first]
ts_end: float = times[end - iabs_first] ts_end: float = times[end - iabs_first]
if ( if (
(ts_start == 0 and not start == 0) ts_start == 0
or or
ts_end == 0 ts_end == 0
): ):
@ -457,13 +451,11 @@ def iter_null_segs(
], ],
None, None,
]: ]:
if not ( if null_segs is None:
null_segs := get_null_segs( null_segs: tuple = get_null_segs(
frame, frame,
period=timeframe, period=timeframe,
) )
):
return
absi_pairs_zsegs: list[list[float, float]] absi_pairs_zsegs: list[list[float, float]]
izeros: Seq izeros: Seq
@ -510,7 +502,6 @@ def iter_null_segs(
) )
# TODO: move to ._pl_anal
def with_dts( def with_dts(
df: pl.DataFrame, df: pl.DataFrame,
time_col: str = 'time', time_col: str = 'time',
@ -526,7 +517,7 @@ def with_dts(
pl.from_epoch(pl.col(time_col)).alias('dt'), pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([ ]).with_columns([
pl.from_epoch( pl.from_epoch(
column=pl.col(f'{time_col}_prev'), pl.col(f'{time_col}_prev')
).alias('dt_prev'), ).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'), pl.col('dt').diff().alias('dt_diff'),
]) #.with_columns( ]) #.with_columns(
@ -534,6 +525,19 @@ def with_dts(
# ) # )
def dedup_dt(
df: pl.DataFrame,
) -> pl.DataFrame:
'''
Drop duplicate date-time rows (normally from an OHLC frame).
'''
return df.unique(
subset=['dt'],
maintain_order=True,
)
t_unit: Literal = Literal[ t_unit: Literal = Literal[
'days', 'days',
'hours', 'hours',
@ -647,11 +651,7 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
) )
# remove duplicated datetime samples/sections # remove duplicated datetime samples/sections
deduped: pl.DataFrame = df.unique( deduped: pl.DataFrame = dedup_dt(df)
subset=['dt'],
maintain_order=True,
)
deduped_gaps = detect_time_gaps(deduped) deduped_gaps = detect_time_gaps(deduped)
diff: int = ( diff: int = (

View File

@ -100,10 +100,6 @@ async def open_piker_runtime(
or [_default_reg_addr] or [_default_reg_addr]
) )
if ems := tractor_kwargs.get('enable_modules'):
# import pdbp; pdbp.set_trace()
enable_modules.extend(ems)
async with ( async with (
tractor.open_root_actor( tractor.open_root_actor(

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) # Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by # it under the terms of the GNU Affero General Public License as published by
@ -19,7 +19,6 @@ Storage middle-ware CLIs.
""" """
from __future__ import annotations from __future__ import annotations
# from datetime import datetime
from pathlib import Path from pathlib import Path
import time import time
@ -37,8 +36,11 @@ from piker.cli import cli
from piker.config import get_conf_dir from piker.config import get_conf_dir
from piker.data import ( from piker.data import (
ShmArray, ShmArray,
tsp,
)
from piker.data.history import (
iter_dfs_from_shms,
) )
from piker import tsp
from . import ( from . import (
log, log,
) )
@ -187,8 +189,8 @@ def anal(
frame=history, frame=history,
period=period, period=period,
) )
# TODO: do tsp queries to backcend to fill i missing if null_segs:
# history and then prolly write it to tsdb! await tractor.pause()
shm_df: pl.DataFrame = await client.as_df( shm_df: pl.DataFrame = await client.as_df(
fqme, fqme,
@ -204,23 +206,14 @@ def anal(
diff, diff,
) = tsp.dedupe(shm_df) ) = tsp.dedupe(shm_df)
write_edits: bool = True
if (
write_edits
and (
diff
or null_segs
)
):
await tractor.pause()
if diff:
await client.write_ohlcv( await client.write_ohlcv(
fqme, fqme,
ohlcv=deduped, ohlcv=deduped,
timeframe=period, timeframe=period,
) )
else:
# TODO: something better with tab completion.. # TODO: something better with tab completion..
# is there something more minimal but nearly as # is there something more minimal but nearly as
# functional as ipython? # functional as ipython?
@ -250,11 +243,11 @@ def ldshm(
), ),
): ):
df: pl.DataFrame | None = None df: pl.DataFrame | None = None
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme): for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
# compute ohlc properties for naming # compute ohlc properties for naming
times: np.ndarray = shm.array['time'] times: np.ndarray = shm.array['time']
period_s: float = float(times[-1] - times[-2]) period_s: float = times[-1] - times[-2]
if period_s < 1.: if period_s < 1.:
raise ValueError( raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}' f'Something is wrong with time period for {shm}:\n{times}'
@ -277,85 +270,10 @@ def ldshm(
# TODO: maybe only optionally enter this depending # TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection? # on some CLI flags and/or gap detection?
if ( if not gaps.is_empty():
not gaps.is_empty() await tractor.pause()
or null_segs
):
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
annot_ctl: AnnotCtl
async with open_annot_ctl() as annot_ctl:
for i in range(gaps.height):
row: pl.DataFrame = gaps[i] if null_segs:
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = df.filter(
pl.col('index') == gaps[0]['index'] - 1
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
# await tractor.pause()
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
aid: int = await annot_ctl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
await tractor.pause() await tractor.pause()
# write to parquet file? # write to parquet file?

View File

@ -56,6 +56,8 @@ from datetime import datetime
from pathlib import Path from pathlib import Path
import time import time
# from bidict import bidict
# import tractor
import numpy as np import numpy as np
import polars as pl import polars as pl
from pendulum import ( from pendulum import (
@ -63,10 +65,10 @@ from pendulum import (
) )
from piker import config from piker import config
from piker import tsp
from piker.data import ( from piker.data import (
def_iohlcv_fields, def_iohlcv_fields,
ShmArray, ShmArray,
tsp,
) )
from piker.log import get_logger from piker.log import get_logger
from . import TimeseriesNotFound from . import TimeseriesNotFound

View File

@ -272,15 +272,10 @@ class ContentsLabels:
x_in: int, x_in: int,
) -> None: ) -> None:
for ( for chart, name, label, update in self._labels:
chart,
name,
label,
update,
)in self._labels:
viz = chart.get_viz(name) viz = chart.get_viz(name)
array: np.ndarray = viz.shm._array array = viz.shm.array
index = array[viz.index_field] index = array[viz.index_field]
start = index[0] start = index[0]
stop = index[-1] stop = index[-1]
@ -291,7 +286,7 @@ class ContentsLabels:
): ):
# out of range # out of range
print('WTF out of range?') print('WTF out of range?')
# continue continue
# call provided update func with data point # call provided update func with data point
try: try:
@ -299,7 +294,6 @@ class ContentsLabels:
ix = np.searchsorted(index, x_in) ix = np.searchsorted(index, x_in)
if ix > len(array): if ix > len(array):
breakpoint() breakpoint()
update(ix, array) update(ix, array)
except IndexError: except IndexError:

View File

@ -56,8 +56,8 @@ _line_styles: dict[str, int] = {
class FlowGraphic(pg.GraphicsObject): class FlowGraphic(pg.GraphicsObject):
''' '''
Base class with minimal interface for `QPainterPath` Base class with minimal interface for `QPainterPath` implemented,
implemented, real-time updated "data flow" graphics. real-time updated "data flow" graphics.
See subtypes below. See subtypes below.
@ -167,12 +167,11 @@ class FlowGraphic(pg.GraphicsObject):
return None return None
# XXX: due to a variety of weird jitter bugs and "smearing" # XXX: due to a variety of weird jitter bugs and "smearing"
# artifacts when click-drag panning and viewing history time # artifacts when click-drag panning and viewing history time series,
# series, we offer this ctx-mngr interface to allow temporarily # we offer this ctx-mngr interface to allow temporarily disabling
# disabling Qt's graphics caching mode; this is now currently # Qt's graphics caching mode; this is now currently used from
# used from ``ChartView.start/signal_ic()`` methods which also # ``ChartView.start/signal_ic()`` methods which also disable the
# disable the rt-display loop when the user is moving around # rt-display loop when the user is moving around a view.
# a view.
@cm @cm
def reset_cache(self) -> None: def reset_cache(self) -> None:
try: try:

View File

@ -49,7 +49,7 @@ from ..data._formatters import (
OHLCBarsAsCurveFmtr, # OHLC converted to line OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm) StepCurveFmtr, # "step" curve (like for vlm)
) )
from ..tsp import ( from ..data.tsp import (
slice_from_time, slice_from_time,
) )
from ._ohlc import ( from ._ohlc import (
@ -563,8 +563,7 @@ class Viz(Struct):
def view_range(self) -> tuple[int, int]: def view_range(self) -> tuple[int, int]:
''' '''
Return the start and stop x-indexes for the managed Return the start and stop x-indexes for the managed ``ViewBox``.
``ViewBox``.
''' '''
vr = self.plot.viewRect() vr = self.plot.viewRect()

View File

@ -470,10 +470,6 @@ async def graphics_update_loop(
if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']: if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
await tractor.pause() await tractor.pause()
try:
from . import _remote_ctl
_remote_ctl._dss = dss
# main real-time quotes update loop # main real-time quotes update loop
stream: tractor.MsgStream stream: tractor.MsgStream
async with feed.open_multi_stream() as stream: async with feed.open_multi_stream() as stream:
@ -523,12 +519,6 @@ async def graphics_update_loop(
quote, quote,
) )
finally:
# XXX: cancel any remote annotation control ctxs
_remote_ctl._dss = None
for ctx in _remote_ctl._ctxs:
await ctx.cancel()
def graphics_update_cycle( def graphics_update_cycle(
ds: DisplayState, ds: DisplayState,
@ -1245,7 +1235,7 @@ async def display_symbol_data(
fast from a cached watch-list. fast from a cached watch-list.
''' '''
# sbar = godwidget.window.status_bar sbar = godwidget.window.status_bar
# historical data fetch # historical data fetch
# brokermod = brokers.get_brokermod(provider) # brokermod = brokers.get_brokermod(provider)
@ -1255,11 +1245,11 @@ async def display_symbol_data(
# group_key=loading_sym_key, # group_key=loading_sym_key,
# ) # )
# for fqme in fqmes: for fqme in fqmes:
# loading_sym_key = sbar.open_status( loading_sym_key = sbar.open_status(
# f'loading {fqme} ->', f'loading {fqme} ->',
# group_key=True group_key=True
# ) )
# (TODO: make this not so shit XD) # (TODO: make this not so shit XD)
# close group status once a symbol feed fully loads to view. # close group status once a symbol feed fully loads to view.
@ -1432,7 +1422,7 @@ async def display_symbol_data(
start_fsp_displays, start_fsp_displays,
rt_linked, rt_linked,
flume, flume,
# loading_sym_key, loading_sym_key,
loglevel, loglevel,
) )

View File

@ -21,8 +21,7 @@ Higher level annotation editors.
from __future__ import annotations from __future__ import annotations
from collections import defaultdict from collections import defaultdict
from typing import ( from typing import (
Sequence, TYPE_CHECKING
TYPE_CHECKING,
) )
import pyqtgraph as pg import pyqtgraph as pg
@ -32,37 +31,24 @@ from pyqtgraph import (
QtCore, QtCore,
QtWidgets, QtWidgets,
) )
from PyQt5.QtCore import (
QPointF,
QRectF,
)
from PyQt5.QtGui import ( from PyQt5.QtGui import (
QColor, QColor,
QTransform,
) )
from PyQt5.QtWidgets import ( from PyQt5.QtWidgets import (
QGraphicsProxyWidget,
QGraphicsScene,
QLabel, QLabel,
) )
from pyqtgraph import functions as fn from pyqtgraph import functions as fn
from PyQt5.QtCore import QPointF
import numpy as np import numpy as np
from piker.types import Struct from piker.types import Struct
from ._style import ( from ._style import hcolor, _font
hcolor,
_font,
)
from ._lines import LevelLine from ._lines import LevelLine
from ..log import get_logger from ..log import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from ._chart import ( from ._chart import GodWidget
GodWidget,
ChartPlotWidget,
)
from ._interaction import ChartView
log = get_logger(__name__) log = get_logger(__name__)
@ -79,7 +65,7 @@ class ArrowEditor(Struct):
uid: str, uid: str,
x: float, x: float,
y: float, y: float,
color: str = 'default', color='default',
pointing: str | None = None, pointing: str | None = None,
) -> pg.ArrowItem: ) -> pg.ArrowItem:
@ -265,56 +251,27 @@ class LineEditor(Struct):
return lines return lines
def as_point(
pair: Sequence[float, float] | QPointF,
) -> list[QPointF, QPointF]:
'''
Case any input tuple of floats to a a list of `QPoint` objects
for use in Qt geometry routines.
'''
if isinstance(pair, QPointF):
return pair
return QPointF(pair[0], pair[1])
# TODO: maybe implement better, something something RectItemProxy??
# -[ ] dig into details of how proxy's work?
# https://doc.qt.io/qt-5/qgraphicsscene.html#addWidget
# -[ ] consider using `.addRect()` maybe?
class SelectRect(QtWidgets.QGraphicsRectItem): class SelectRect(QtWidgets.QGraphicsRectItem):
'''
A data-view "selection rectangle": the most fundamental
geometry for annotating data views.
- https://doc.qt.io/qt-5/qgraphicsrectitem.html
- https://doc.qt.io/qt-6/qgraphicsrectitem.html
'''
def __init__( def __init__(
self, self,
viewbox: ViewBox, viewbox: ViewBox,
color: str | None = None, color: str = 'dad_blue',
) -> None: ) -> None:
super().__init__(0, 0, 1, 1) super().__init__(0, 0, 1, 1)
# self.rbScaleBox = QGraphicsRectItem(0, 0, 1, 1) # self.rbScaleBox = QGraphicsRectItem(0, 0, 1, 1)
self.vb: ViewBox = viewbox self.vb = viewbox
self._chart: 'ChartPlotWidget' = None # noqa
self._chart: ChartPlotWidget | None = None # noqa # override selection box color
# TODO: maybe allow this to be dynamic via a method?
#l override selection box color
color: str = color or 'dad_blue'
color = QColor(hcolor(color)) color = QColor(hcolor(color))
self.setPen(fn.mkPen(color, width=1)) self.setPen(fn.mkPen(color, width=1))
color.setAlpha(66) color.setAlpha(66)
self.setBrush(fn.mkBrush(color)) self.setBrush(fn.mkBrush(color))
self.setZValue(1e9) self.setZValue(1e9)
self.hide() self.hide()
self._label = None
label = self._label = QLabel() label = self._label = QLabel()
label.setTextFormat(0) # markdown label.setTextFormat(0) # markdown
@ -324,15 +281,13 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
QtCore.Qt.AlignLeft QtCore.Qt.AlignLeft
# | QtCore.Qt.AlignVCenter # | QtCore.Qt.AlignVCenter
) )
label.hide() # always right after init
# proxy is created after containing scene is initialized # proxy is created after containing scene is initialized
self._label_proxy: QGraphicsProxyWidget | None = None self._label_proxy = None
self._abs_top_right: Point | None = None self._abs_top_right = None
# TODO: "swing %" might be handy here (data's max/min # TODO: "swing %" might be handy here (data's max/min # % change)
# # % change)? self._contents = [
self._contents: list[str] = [
'change: {pchng:.2f} %', 'change: {pchng:.2f} %',
'range: {rng:.2f}', 'range: {rng:.2f}',
'bars: {nbars}', 'bars: {nbars}',
@ -342,30 +297,12 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
'sigma: {std:.2f}', 'sigma: {std:.2f}',
] ]
self.add_to_view(viewbox)
def add_to_view(
self,
view: ChartView,
) -> None:
'''
Self-defined view hookup impl which will
also re-assign the internal ref.
'''
view.addItem(
self,
ignoreBounds=True,
)
if self.vb is not view:
self.vb = view
@property @property
def chart(self) -> ChartPlotWidget: # noqa def chart(self) -> 'ChartPlotWidget': # noqa
return self._chart return self._chart
@chart.setter @chart.setter
def chart(self, chart: ChartPlotWidget) -> None: # noqa def chart(self, chart: 'ChartPlotWidget') -> None: # noqa
self._chart = chart self._chart = chart
chart.sigRangeChanged.connect(self.update_on_resize) chart.sigRangeChanged.connect(self.update_on_resize)
palette = self._label.palette() palette = self._label.palette()
@ -378,155 +315,57 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
) )
def update_on_resize(self, vr, r): def update_on_resize(self, vr, r):
''' """Re-position measure label on view range change.
Re-position measure label on view range change.
''' """
if self._abs_top_right: if self._abs_top_right:
self._label_proxy.setPos( self._label_proxy.setPos(
self.vb.mapFromView(self._abs_top_right) self.vb.mapFromView(self._abs_top_right)
) )
def set_scen_pos( def mouse_drag_released(
self, self,
scen_p1: QPointF, p1: QPointF,
scen_p2: QPointF, p2: QPointF
update_label: bool = True,
) -> None: ) -> None:
''' """Called on final button release for mouse drag with start and
Set position from scene coords of selection rect (normally end positions.
from mouse position) and accompanying label, move label to
match.
''' """
# NOTE XXX: apparently just setting it doesn't work!? self.set_pos(p1, p2)
# i have no idea why but it's pretty weird we have to do
# this transform thing which was basically pulled verbatim
# from the `pg.ViewBox.updateScaleBox()` method.
view_rect: QRectF = self.vb.childGroup.mapRectFromScene(
QRectF(
scen_p1,
scen_p2,
)
)
self.setPos(view_rect.topLeft())
# XXX: does not work..!?!?
# https://doc.qt.io/qt-5/qgraphicsrectitem.html#setRect
# self.setRect(view_rect)
tr = QTransform.fromScale( def set_pos(
view_rect.width(),
view_rect.height(),
)
self.setTransform(tr)
# XXX: never got this working, was always offset
# / transformed completely wrong (and off to the far right
# from the cursor?)
# self.set_view_pos(
# view_rect=view_rect,
# # self.vwqpToView(p1),
# # self.vb.mapToView(p2),
# # start_pos=self.vb.mapToScene(p1),
# # end_pos=self.vb.mapToScene(p2),
# )
self.show()
if update_label:
self.init_label(view_rect)
def set_view_pos(
self, self,
p1: QPointF,
start_pos: QPointF | Sequence[float, float] | None = None, p2: QPointF
end_pos: QPointF | Sequence[float, float] | None = None,
view_rect: QRectF | None = None,
update_label: bool = True,
) -> None: ) -> None:
''' """Set position of selection rect and accompanying label, move
Set position from `ViewBox` coords (i.e. from the actual label to match.
data domain) of rect (and any accompanying label which is
moved to match).
''' """
if self._chart is None: if self._label_proxy is None:
raise RuntimeError( # https://doc.qt.io/qt-5/qgraphicsproxywidget.html
'You MUST assign a `SelectRect.chart: ChartPlotWidget`!' self._label_proxy = self.vb.scene().addWidget(self._label)
)
if view_rect is None: start_pos = self.vb.mapToView(p1)
# ensure point casting end_pos = self.vb.mapToView(p2)
start_pos: QPointF = as_point(start_pos)
end_pos: QPointF = as_point(end_pos)
# map to view coords and update area # map to view coords and update area
view_rect = QtCore.QRectF( r = QtCore.QRectF(start_pos, end_pos)
start_pos,
end_pos,
)
self.setPos(view_rect.topLeft()) # old way; don't need right?
# lr = QtCore.QRectF(p1, p2)
# r = self.vb.childGroup.mapRectFromParent(lr)
# NOTE: SERIOUSLY NO IDEA WHY THIS WORKS... self.setPos(r.topLeft())
# but it does and all the other commented stuff above self.resetTransform()
# dint, dawg.. self.setRect(r)
# self.resetTransform()
# self.setRect(view_rect)
tr = QTransform.fromScale(
view_rect.width(),
view_rect.height(),
)
self.setTransform(tr)
if update_label:
self.init_label(view_rect)
print(
'SelectRect modify:\n'
f'QRectF: {view_rect}\n'
f'start_pos: {start_pos}\n'
f'end_pos: {end_pos}\n'
)
self.show() self.show()
def init_label( y1, y2 = start_pos.y(), end_pos.y()
self, x1, x2 = start_pos.x(), end_pos.x()
view_rect: QRectF,
) -> QLabel:
# should be init-ed in `.__init__()` # TODO: heh, could probably use a max-min streamin algo here too
label: QLabel = self._label
cv: ChartView = self.vb
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
if self._label_proxy is None:
scen: QGraphicsScene = cv.scene()
# NOTE: specifically this is passing a widget
# pointer to the scene's `.addWidget()` as per,
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html#embedding-a-widget-with-qgraphicsproxywidget
self._label_proxy: QGraphicsProxyWidget = scen.addWidget(label)
# get label startup coords
tl: QPointF = view_rect.topLeft()
br: QPointF = view_rect.bottomRight()
x1, y1 = tl.x(), tl.y()
x2, y2 = br.x(), br.y()
# TODO: to remove, previous label corner point unpacking
# x1, y1 = start_pos.x(), start_pos.y()
# x2, y2 = end_pos.x(), end_pos.y()
# y1, y2 = start_pos.y(), end_pos.y()
# x1, x2 = start_pos.x(), end_pos.x()
# TODO: heh, could probably use a max-min streamin algo
# here too?
_, xmn = min(y1, y2), min(x1, x2) _, xmn = min(y1, y2), min(x1, x2)
ymx, xmx = max(y1, y2), max(x1, x2) ymx, xmx = max(y1, y2), max(x1, x2)
@ -536,35 +375,26 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
ixmn, ixmx = round(xmn), round(xmx) ixmn, ixmx = round(xmn), round(xmx)
nbars = ixmx - ixmn + 1 nbars = ixmx - ixmn + 1
chart: ChartPlotWidget = self._chart chart = self._chart
data: np.ndarray = chart.get_viz( data = chart.get_viz(chart.name).shm.array[ixmn:ixmx]
chart.name
).shm.array[ixmn:ixmx]
if len(data): if len(data):
std: float = data['close'].std() std = data['close'].std()
dmx: float = data['high'].max() dmx = data['high'].max()
dmn: float = data['low'].min() dmn = data['low'].min()
else: else:
dmn = dmx = std = np.nan dmn = dmx = std = np.nan
# update label info # update label info
label.setText('\n'.join(self._contents).format( self._label.setText('\n'.join(self._contents).format(
pchng=pchng, pchng=pchng, rng=rng, nbars=nbars,
rng=rng, std=std, dmx=dmx, dmn=dmn,
nbars=nbars,
std=std,
dmx=dmx,
dmn=dmn,
)) ))
# print(f'x2, y2: {(x2, y2)}') # print(f'x2, y2: {(x2, y2)}')
# print(f'xmn, ymn: {(xmn, ymx)}') # print(f'xmn, ymn: {(xmn, ymx)}')
label_anchor = Point( label_anchor = Point(xmx + 2, ymx)
xmx + 2,
ymx,
)
# XXX: in the drag bottom-right -> top-left case we don't # XXX: in the drag bottom-right -> top-left case we don't
# want the label to overlay the box. # want the label to overlay the box.
@ -573,16 +403,13 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
# # label_anchor = Point(x2, y2 + self._label.height()) # # label_anchor = Point(x2, y2 + self._label.height())
# label_anchor = Point(xmn, ymn) # label_anchor = Point(xmn, ymn)
self._abs_top_right: Point = label_anchor self._abs_top_right = label_anchor
self._label_proxy.setPos( self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
cv.mapFromView(label_anchor) # self._label.show()
)
label.show()
def clear(self): def clear(self):
''' """Clear the selection box from view.
Clear the selection box from view.
''' """
self._label.hide() self._label.hide()
self.hide() self.hide()

View File

@ -181,10 +181,7 @@ async def open_fsp_sidepane(
async def open_fsp_actor_cluster( async def open_fsp_actor_cluster(
names: list[str] = ['fsp_0', 'fsp_1'], names: list[str] = ['fsp_0', 'fsp_1'],
) -> AsyncGenerator[ ) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
int,
dict[str, tractor.Portal]
]:
from tractor._clustering import open_actor_cluster from tractor._clustering import open_actor_cluster
@ -560,7 +557,7 @@ class FspAdmin:
conf: dict, # yeah probably dumb.. conf: dict, # yeah probably dumb..
loglevel: str = 'error', loglevel: str = 'error',
) -> trio.Event: ) -> (trio.Event, ChartPlotWidget):
flume, started = await self.start_engine_task( flume, started = await self.start_engine_task(
target, target,
@ -927,7 +924,7 @@ async def start_fsp_displays(
linked: LinkedSplits, linked: LinkedSplits,
flume: Flume, flume: Flume,
# group_status_key: str, group_status_key: str,
loglevel: str, loglevel: str,
) -> None: ) -> None:
@ -974,23 +971,21 @@ async def start_fsp_displays(
flume, flume,
) as admin, ) as admin,
): ):
statuses: list[trio.Event] = [] statuses = []
for target, conf in fsp_conf.items(): for target, conf in fsp_conf.items():
started: trio.Event = await admin.open_fsp_chart( started = await admin.open_fsp_chart(
target, target,
conf, conf,
) )
# done = linked.window().status_bar.open_status( done = linked.window().status_bar.open_status(
# f'loading fsp, {target}..', f'loading fsp, {target}..',
# group_key=group_status_key, group_key=group_status_key,
# ) )
# statuses.append((started, done)) statuses.append((started, done))
statuses.append(started)
# for fsp_loaded, status_cb in statuses: for fsp_loaded, status_cb in statuses:
for fsp_loaded in statuses:
await fsp_loaded.wait() await fsp_loaded.wait()
profiler(f'attached to fsp portal: {target}') profiler(f'attached to fsp portal: {target}')
# status_cb() status_cb()
# blocks on nursery until all fsp actors complete # blocks on nursery until all fsp actors complete

View File

@ -30,11 +30,7 @@ from typing import (
) )
import pyqtgraph as pg import pyqtgraph as pg
# NOTE XXX: pg is super annoying and re-implements it's own mouse # from pyqtgraph.GraphicsScene import mouseEvents
# event subsystem.. we should really look into re-working/writing
# this down the road.. Bo
from pyqtgraph.GraphicsScene import mouseEvents as mevs
# from pyqtgraph.GraphicsScene.mouseEvents import MouseDragEvent
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
from PyQt5.QtGui import ( from PyQt5.QtGui import (
QWheelEvent, QWheelEvent,
@ -470,7 +466,6 @@ class ChartView(ViewBox):
mode_name: str = 'view' mode_name: str = 'view'
def_delta: float = 616 * 6 def_delta: float = 616 * 6
def_scale_factor: float = 1.016 ** (def_delta * -1 / 20) def_scale_factor: float = 1.016 ** (def_delta * -1 / 20)
# annots: dict[int, GraphicsObject] = {}
def __init__( def __init__(
self, self,
@ -491,7 +486,6 @@ class ChartView(ViewBox):
# defaultPadding=0., # defaultPadding=0.,
**kwargs **kwargs
) )
# for "known y-range style" # for "known y-range style"
self._static_yrange = static_yrange self._static_yrange = static_yrange
@ -506,11 +500,7 @@ class ChartView(ViewBox):
# add our selection box annotator # add our selection box annotator
self.select_box = SelectRect(self) self.select_box = SelectRect(self)
# self.select_box.add_to_view(self) self.addItem(self.select_box, ignoreBounds=True)
# self.addItem(
# self.select_box,
# ignoreBounds=True,
# )
self.mode = None self.mode = None
self.order_mode: bool = False self.order_mode: bool = False
@ -721,18 +711,17 @@ class ChartView(ViewBox):
def mouseDragEvent( def mouseDragEvent(
self, self,
ev: mevs.MouseDragEvent, ev,
axis: int | None = None, axis: int | None = None,
) -> None: ) -> None:
pos: Point = ev.pos() pos = ev.pos()
lastPos: Point = ev.lastPos() lastPos = ev.lastPos()
dif: Point = (pos - lastPos) * -1 dif = pos - lastPos
# dif: Point = pos - lastPos dif = dif * -1
# dif: Point = dif * -1
# NOTE: if axis is specified, event will only affect that axis. # NOTE: if axis is specified, event will only affect that axis.
btn = ev.button() button = ev.button()
# Ignore axes if mouse is disabled # Ignore axes if mouse is disabled
mouseEnabled = np.array( mouseEnabled = np.array(
@ -744,7 +733,7 @@ class ChartView(ViewBox):
mask[1-axis] = 0.0 mask[1-axis] = 0.0
# Scale or translate based on mouse button # Scale or translate based on mouse button
if btn & ( if button & (
QtCore.Qt.LeftButton | QtCore.Qt.MidButton QtCore.Qt.LeftButton | QtCore.Qt.MidButton
): ):
# zoom y-axis ONLY when click-n-drag on it # zoom y-axis ONLY when click-n-drag on it
@ -767,55 +756,34 @@ class ChartView(ViewBox):
# XXX: WHY # XXX: WHY
ev.accept() ev.accept()
down_pos: Point = ev.buttonDownPos( down_pos = ev.buttonDownPos()
btn=btn,
)
scen_pos: Point = ev.scenePos()
scen_down_pos: Point = ev.buttonDownScenePos(
btn=btn,
)
# This is the final position in the drag # This is the final position in the drag
if ev.isFinish(): if ev.isFinish():
# import pdbp; pdbp.set_trace() self.select_box.mouse_drag_released(down_pos, pos)
# NOTE: think of this as a `.mouse_drag_release()`
# (bc HINT that's what i called the shit ass
# method that wrapped this call [yes, as a single
# fucking call] originally.. you bish, guille)
# Bo.. oraleeee
self.select_box.set_scen_pos(
# down_pos,
# pos,
scen_down_pos,
scen_pos,
)
# this is the zoom transform cmd
ax = QtCore.QRectF(down_pos, pos) ax = QtCore.QRectF(down_pos, pos)
ax = self.childGroup.mapRectFromParent(ax) ax = self.childGroup.mapRectFromParent(ax)
# self.showAxRect(ax)
# this is the zoom transform cmd
self.showAxRect(ax)
# axis history tracking # axis history tracking
self.axHistoryPointer += 1 self.axHistoryPointer += 1
self.axHistory = self.axHistory[ self.axHistory = self.axHistory[
:self.axHistoryPointer] + [ax] :self.axHistoryPointer] + [ax]
else: else:
self.select_box.set_scen_pos( print('drag finish?')
# down_pos, self.select_box.set_pos(down_pos, pos)
# pos,
scen_down_pos,
scen_pos,
)
# update shape of scale box # update shape of scale box
# self.updateScaleBox(ev.buttonDownPos(), ev.pos()) # self.updateScaleBox(ev.buttonDownPos(), ev.pos())
# breakpoint() self.updateScaleBox(
# self.updateScaleBox( down_pos,
# down_pos, ev.pos(),
# ev.pos(), )
# )
# PANNING MODE # PANNING MODE
else: else:
@ -854,7 +822,7 @@ class ChartView(ViewBox):
# ev.accept() # ev.accept()
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE # WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif btn & QtCore.Qt.RightButton: elif button & QtCore.Qt.RightButton:
if self.state['aspectLocked'] is not False: if self.state['aspectLocked'] is not False:
mask[0] = 0 mask[0] = 0

View File

@ -24,6 +24,8 @@ view transforms.
""" """
import pyqtgraph as pg import pyqtgraph as pg
from ._axes import Axis
def invertQTransform(tr): def invertQTransform(tr):
"""Return a QTransform that is the inverse of *tr*. """Return a QTransform that is the inverse of *tr*.
@ -51,9 +53,6 @@ def _do_overrides() -> None:
pg.functions.invertQTransform = invertQTransform pg.functions.invertQTransform = invertQTransform
pg.PlotItem = PlotItem pg.PlotItem = PlotItem
from ._axes import Axis
pg.Axis = Axis
# enable "QPainterPathPrivate for faster arrayToQPath" from # enable "QPainterPathPrivate for faster arrayToQPath" from
# https://github.com/pyqtgraph/pyqtgraph/pull/2324 # https://github.com/pyqtgraph/pyqtgraph/pull/2324
pg.setConfigOption('enableExperimental', True) pg.setConfigOption('enableExperimental', True)
@ -235,7 +234,7 @@ class PlotItem(pg.PlotItem):
# ``ViewBox`` geometry bug.. where a gap for the # ``ViewBox`` geometry bug.. where a gap for the
# 'bottom' axis is somehow left in? # 'bottom' axis is somehow left in?
# axis = pg.AxisItem(orientation=name, parent=self) # axis = pg.AxisItem(orientation=name, parent=self)
axis = pg.Axis( axis = Axis(
self, self,
orientation=name, orientation=name,
parent=self, parent=self,

View File

@ -1,273 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote control tasks for sending annotations (and maybe more cmds)
to a chart from some other actor.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
AsyncContextManager,
)
import tractor
from tractor import trionics
from tractor import (
Portal,
Context,
MsgStream,
)
from PyQt5.QtWidgets import (
QGraphicsItem,
)
from piker.log import get_logger
from piker.types import Struct
from piker.service import find_service
from ._display import DisplayState
from ._interaction import ChartView
from ._editors import SelectRect
from ._chart import ChartPlotWidget
log = get_logger(__name__)
# NOTE: this is set by the `._display.graphics_update_loop()` once
# all chart widgets / Viz per flume have been initialized allowing
# for remote annotation (control) of any chart-actor's mkt feed by
# fqme lookup Bo
_dss: dict[str, DisplayState] | None = None
# stash each and every client connection so that they can all
# be cancelled on shutdown/error.
# TODO: make `tractor.Context` hashable via is `.cid: str`?
# _ctxs: set[Context] = set()
_ctxs: list[Context] = []
# global map of all uniquely created annotation-graphics
# so that they can be mutated (eventually) by a client.
_annots: dict[int, QGraphicsItem] = {}
@tractor.context
async def remote_annotate(
ctx: Context,
) -> None:
global _dss, _ctxs
assert _dss
_ctxs.append(ctx)
# send back full fqme symbology to caller
await ctx.started(list(_dss))
async with ctx.open_stream() as annot_req_stream:
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
await annot_req_stream.send(id(rect))
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
class AnnotCtl(Struct):
'''
A control for remote "data annotations".
You know those "squares they always show in machine vision
UIs.." this API allows you to remotely control stuff like that
in some other graphics actor.
'''
ctx2fqmes: dict[str, str]
fqme2stream: dict[str, MsgStream]
async def add_rect(
self,
fqme: str,
timeframe: float,
start_pos: tuple[float, float],
end_pos: tuple[float, float],
# TODO: a `Literal['view', 'scene']` for this?
domain: str = 'view', # or 'scene'
color: str = 'dad_blue',
) -> int:
'''
Add a `SelectRect` annotation to the target view, return
the instances `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self.fqme2stream[fqme]
await ipc.send({
'fqme': fqme,
'annot': 'SelectRect',
'timeframe': timeframe,
# 'meth': str(meth),
'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos',
'kwargs': {
'start_pos': tuple(start_pos),
'end_pos': tuple(end_pos),
'color': color,
'update_label': False,
},
})
return (await ipc.receive())
async def modify(
self,
aid: int, # annotation id
meth: str, # far end graphics object method to invoke
params: dict[str, Any], # far end `meth(**kwargs)`
) -> bool:
'''
Modify an existing (remote) annotation's graphics
paramters, thus changing it's appearance / state in real
time.
'''
raise NotImplementedError
async def remove(
self,
uid: int,
) -> bool:
'''
Remove an existing annotation by instance id.
'''
raise NotImplementedError
@acm
async def open_annot_ctl(
uid: tuple[str, str] | None = None,
) -> AnnotCtl:
# TODO: load connetion to a specific chart actor
# -[ ] pull from either service scan or config
# -[ ] return some kinda client/proxy thinger?
# -[ ] maybe we should finally just provide this as
# a `tractor.hilevel.CallableProxy` or wtv?
# -[ ] use this from the storage.cli stuff to mark up gaps!
maybe_portals: list[Portal] | None
fqmes: list[str]
async with find_service(
service_name='chart',
first_only=False,
) as maybe_portals:
ctx_mngrs: list[AsyncContextManager] = []
# TODO: print the current discoverable actor UID set
# here as well?
if not maybe_portals:
raise RuntimeError('No chart UI actors found in service domain?')
for portal in maybe_portals:
ctx_mngrs.append(
portal.open_context(remote_annotate)
)
ctx2fqmes: dict[str, set[str]] = {}
fqme2stream: dict[str, MsgStream] = {}
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2stream=fqme2stream,
)
stream_ctxs: list[AsyncContextManager] = []
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
for (ctx, fqmes) in ctxs:
stream_ctxs.append(ctx.open_stream())
# fill lookup table of mkt addrs to IPC ctxs
for fqme in fqmes:
if other := fqme2stream.get(fqme):
raise ValueError(
f'More then one chart displays {fqme}!?\n'
'Other UI actor info:\n'
f'channel: {other._ctx.chan}]\n'
f'actor uid: {other._ctx.chan.uid}]\n'
f'ctx id: {other._ctx.cid}]\n'
)
ctx2fqmes.setdefault(
ctx.cid,
set(),
).add(fqme)
async with trionics.gather_contexts(stream_ctxs) as streams:
for stream in streams:
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
for fqme in fqmes:
fqme2stream[fqme] = stream
yield client
# TODO: on graceful teardown should we try to
# remove all annots that were created/modded?

View File

@ -228,10 +228,5 @@ def chart(
'loglevel': tractorloglevel, 'loglevel': tractorloglevel,
'name': 'chart', 'name': 'chart',
'registry_addrs': list(set(regaddrs)), 'registry_addrs': list(set(regaddrs)),
'enable_modules': [
# remote data-view annotations Bo
'piker.ui._remote_ctl',
],
}, },
) )

View File

@ -31,7 +31,7 @@ import pendulum
import pyqtgraph as pg import pyqtgraph as pg
from piker.types import Struct from piker.types import Struct
from ..tsp import slice_from_time from ..data.tsp import slice_from_time
from ..log import get_logger from ..log import get_logger
from ..toolz import Profiler from ..toolz import Profiler