Compare commits


No commits in common. "a681b2f0bbc7fe03a00b0fb931527b4c901f5140" and "1f9a4976378f863193572a0b64a5dba5e4aaf672" have entirely different histories.

20 changed files with 390 additions and 1113 deletions

View File

@ -843,7 +843,6 @@ class Client:
self,
contract: Contract,
timeout: float = 1,
tries: int = 100,
raise_on_timeout: bool = False,
) -> Ticker | None:
@ -858,30 +857,30 @@ class Client:
ready: ticker.TickerUpdateEvent = ticker.updateEvent
# ensure a last price gets filled in before we deliver quote
timeouterr: Exception | None = None
warnset: bool = False
for _ in range(tries):
# wait for a first update(Event) indicating a
# live quote feed.
for _ in range(100):
if isnan(ticker.last):
# wait for a first update(Event)
try:
tkr = await asyncio.wait_for(
ready,
timeout=timeout,
)
if tkr:
break
except TimeoutError as err:
timeouterr = err
await asyncio.sleep(0.01)
continue
except TimeoutError:
import pdbp
pdbp.set_trace()
if raise_on_timeout:
raise
return None
if tkr:
break
else:
if not warnset:
log.warning(
f'Quote req timed out..maybe venue is closed?\n'
f'{asdict(contract)}'
f'Quote for {contract} timed out: market is closed?'
)
warnset = True
@ -889,11 +888,6 @@ class Client:
log.info(f'Got first quote for {contract}')
break
else:
if timeouterr and raise_on_timeout:
import pdbp
pdbp.set_trace()
raise timeouterr
if not warnset:
log.warning(
f'Contract {contract} is not returning a quote '
@ -901,8 +895,6 @@ class Client:
)
warnset = True
return None
return ticker
# async to be consistent for the client proxy, and cuz why not.
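The reworked wait-loop above bounds the first-quote wait with a retry counter around `asyncio.wait_for()`. A minimal standalone sketch of the same pattern, assuming only that the ticker exposes a `.last` float and an awaitable `.updateEvent` (as `ib_insync` tickers do); `wait_first_quote` is a hypothetical name:

import asyncio
from math import isnan


async def wait_first_quote(
    ticker,  # assumed: `.last: float` and awaitable `.updateEvent`
    timeout: float = 1,
    tries: int = 100,
    raise_on_timeout: bool = False,
):
    # retry until a non-nan last price arrives or we give up
    for _ in range(tries):
        if not isnan(ticker.last):
            return ticker
        try:
            await asyncio.wait_for(ticker.updateEvent, timeout=timeout)
        except TimeoutError:
            if raise_on_timeout:
                raise
    return None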

View File

@ -943,11 +943,6 @@ async def stream_quotes(
contract=con,
raise_on_timeout=True,
)
first_quote: dict = normalize(first_ticker)
log.info(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
cs: trio.CancelScope | None = None
startup: bool = True
while (

View File

@ -134,19 +134,14 @@ def get_app_dir(
_click_config_dir: Path = Path(get_app_dir('piker'))
_config_dir: Path = _click_config_dir
_parent_user: str = os.environ.get('SUDO_USER')
# NOTE: when using `sudo` we attempt to determine the non-root user
# and still use their normal config dir.
if (
(_parent_user := os.environ.get('SUDO_USER'))
and
_parent_user != 'root'
):
if _parent_user:
non_root_user_dir = Path(
os.path.expanduser(f'~{_parent_user}')
)
root: str = 'root'
_ccds: str = str(_click_config_dir) # click config dir as string
_ccds: str = str(_click_config_dir) # click config dir string
i_tail: int = int(_ccds.rfind(root) + len(root))
_config_dir = (
non_root_user_dir
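The hunk above re-roots `click`'s computed app-dir under the `sudo`-invoking user's home. A sketch of just the path splice, assuming the usual `/root/.config/piker` layout (`non_root_config_dir` is a hypothetical helper name):

import os
from pathlib import Path


def non_root_config_dir(click_config_dir: Path) -> Path:
    # under `sudo`, SUDO_USER names the original (non-root) user
    parent_user: str | None = os.environ.get('SUDO_USER')
    if not parent_user or parent_user == 'root':
        return click_config_dir

    user_home = Path(os.path.expanduser(f'~{parent_user}'))
    ccds: str = str(click_config_dir)  # eg. '/root/.config/piker'
    i_tail: int = ccds.rfind('root') + len('root')
    # graft the trailing '.config/piker' onto the user's home dir
    return user_home / ccds[i_tail:].lstrip(os.sep)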

View File

@ -45,7 +45,10 @@ import trio
from trio.abc import ReceiveChannel
from trio_typing import TaskStatus
import tractor
from tractor import trionics
from tractor.trionics import (
maybe_open_context,
gather_contexts,
)
from piker.accounting import (
MktPair,
@ -66,7 +69,7 @@ from .validate import (
FeedInit,
validate_backend,
)
from ..tsp import (
from .history import (
manage_history,
)
from .ingest import get_ingestormod
@ -121,8 +124,6 @@ class _FeedsBus(Struct):
trio.CancelScope] = trio.TASK_STATUS_IGNORED,
) -> None:
with trio.CancelScope() as cs:
# TODO: shouldn't this be a direct await to avoid
# cancellation contagion to the bus nursery!?!?!
await self.nursery.start(
target,
*args,
@ -329,6 +330,7 @@ async def allocate_persistent_feed(
) = await bus.nursery.start(
manage_history,
mod,
bus,
mkt,
some_data_ready,
feed_is_live,
@ -405,13 +407,7 @@ async def allocate_persistent_feed(
rt_shm.array['time'][1] = ts + 1
elif hist_shm.array.size == 0:
for i in range(100):
await trio.sleep(0.1)
if hist_shm.array.size > 0:
break
else:
await tractor.pause()
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
raise RuntimeError(f'History (1m) Shm for {fqme} is empty!?')
# wait for the spawning parent task to register its subscriber
# send-stream entry before we start the sample loop.
@ -457,12 +453,8 @@ async def open_feed_bus(
if loglevel is None:
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker
# logging
get_console_log(
loglevel
or tractor.current_actor().loglevel
)
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(loglevel or tractor.current_actor().loglevel)
# local state sanity checks
# TODO: check for any stale shm entries for this symbol
@ -472,7 +464,7 @@ async def open_feed_bus(
assert 'brokerd' in servicename
assert brokername in servicename
bus: _FeedsBus = get_feed_bus(brokername)
bus = get_feed_bus(brokername)
sub_registered = trio.Event()
flumes: dict[str, Flume] = {}
@ -776,7 +768,7 @@ async def maybe_open_feed(
'''
fqme = fqmes[0]
async with trionics.maybe_open_context(
async with maybe_open_context(
acm_func=open_feed,
kwargs={
'fqmes': fqmes,
@ -796,7 +788,7 @@ async def maybe_open_feed(
# add a new broadcast subscription for the quote stream
# if this feed is likely already in use
async with trionics.gather_contexts(
async with gather_contexts(
mngrs=[stream.subscribe() for stream in feed.streams.values()]
) as bstreams:
for bstream, flume in zip(bstreams, feed.flumes.values()):
@ -856,7 +848,7 @@ async def open_feed(
)
portals: tuple[tractor.Portal]
async with trionics.gather_contexts(
async with gather_contexts(
brokerd_ctxs,
) as portals:
@ -908,7 +900,7 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals)
async with (
trionics.gather_contexts(bus_ctxs) as ctxs,
gather_contexts(bus_ctxs) as ctxs,
):
stream_ctxs: list[tractor.MsgStream] = []
for (
@ -950,7 +942,7 @@ async def open_feed(
brokermod: ModuleType
fqmes: list[str]
async with (
trionics.gather_contexts(stream_ctxs) as streams,
gather_contexts(stream_ctxs) as streams,
):
for (
stream,
@ -966,12 +958,6 @@ async def open_feed(
if brokermod.name == flume.mkt.broker:
flume.stream = stream
assert (
len(feed.mods)
==
len(feed.portals)
==
len(feed.streams)
)
assert len(feed.mods) == len(feed.portals) == len(feed.streams)
yield feed
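All the `gather_contexts()` call-sites above rely on it entering a sequence of async context managers concurrently and yielding their results in input order (which is what lets `zip(bstreams, feed.flumes.values())` pair things up). A hedged toy sketch of that behavior:

from contextlib import asynccontextmanager as acm

import trio
from tractor.trionics import gather_contexts


@acm
async def open_thing(n: int):
    await trio.sleep(0.01)  # pretend setup latency
    yield n * 10


async def main():
    # all three managers are entered concurrently; results
    # are delivered matching the input sequence's order
    async with gather_contexts([open_thing(n) for n in range(3)]) as results:
        assert tuple(results) == (0, 10, 20)


trio.run(main)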

View File

@ -32,7 +32,6 @@ from __future__ import annotations
from datetime import datetime
from functools import partial
from pathlib import Path
from pprint import pformat
from types import ModuleType
from typing import (
Callable,
@ -54,64 +53,25 @@ import polars as pl
from ..accounting import (
MktPair,
)
from ..data._util import (
from ._util import (
log,
)
from ..data._sharedmem import (
from ._sharedmem import (
maybe_open_shm_array,
ShmArray,
)
from ..data._source import def_iohlcv_fields
from ..data._sampling import (
from ._source import def_iohlcv_fields
from ._sampling import (
open_sample_stream,
)
from ._anal import (
from .tsp import (
dedupe,
get_null_segs,
iter_null_segs,
Frame,
Seq,
# codec-ish
np2pl,
pl2np,
# `numpy` only
slice_from_time,
# `polars` specific
dedupe,
with_dts,
detect_time_gaps,
sort_diff,
# TODO:
detect_price_gaps
Frame,
# Seq,
)
__all__: list[str] = [
'dedupe',
'get_null_segs',
'iter_null_segs',
'sort_diff',
'slice_from_time',
'Frame',
'Seq',
'np2pl',
'pl2np',
'slice_from_time',
'with_dts',
'detect_time_gaps',
'sort_diff',
# TODO:
'detect_price_gaps'
]
# TODO: break up all this shite into submods!
from ..brokers._util import (
DataUnavailable,
)
@ -269,20 +229,16 @@ async def maybe_fill_null_segments(
# - remember that in the display side, only refresh this
# if the respective history is actually "in view".
# loop
try:
await sampler_stream.send({
'broadcast_all': {
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
except tractor.ContextCancelled:
# log.exception
await tractor.pause()
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
null_segs_detected.set()
# RECHECK for more null-gaps
@ -296,68 +252,39 @@ async def maybe_fill_null_segments(
and
len(null_segs[-1])
):
(
iabs_slices,
iabs_zero_rows,
zero_t,
) = null_segs
log.warning(
f'{len(iabs_slices)} NULL TIME SEGMENTS DETECTED!\n'
f'{pformat(iabs_slices)}'
)
await tractor.pause()
# TODO: always backfill gaps with the earliest (price) datum's
# value to avoid the y-ranger including zeros and completely
# stretching the y-axis..
# array: np.ndarray = shm.array
# zeros = array[array['low'] == 0]
ohlc_fields: list[str] = [
array = shm.array
zeros = array[array['low'] == 0]
# always backfill gaps with the earliest (price) datum's
# value to avoid the y-ranger including zeros and completely
# stretching the y-axis..
if 0 < zeros.size:
zeros[[
'open',
'high',
'low',
'close',
]
]] = shm._array[zeros['index'][0] - 1]['close']
for istart, istop in iabs_slices:
# get view into buffer for null-segment
gap: np.ndarray = shm._array[istart:istop]
# copy the oldest OHLC samples forward
gap[ohlc_fields] = shm._array[istart]['close']
start_t: float = shm._array[istart]['time']
t_diff: float = (istop - istart)*timeframe
gap['time'] = np.arange(
start=start_t,
stop=start_t + t_diff,
step=timeframe,
)
await sampler_stream.send({
'broadcast_all': {
# XXX NOTE XXX: see the
# `.ui._display.increment_history_view()` if block
# that looks for this info to FORCE a hard viz
# redraw!
'backfilling': (mkt.fqme, timeframe),
},
})
# TODO: iteratively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# await tractor.pause()
# TODO: iteratively step through any remaining
# time-gaps/null-segments and spawn piecewise backfiller
# tasks in a nursery?
# -[ ] not sure that's going to work so well on the ib
# backend but worth a shot?
# -[ ] mk new history connections to make it properly
# parallel possible no matter the backend?
# -[ ] fill algo: do queries in alternating "latest, then
# earliest, then latest.. etc?"
# if (
# next_end_dt not in frame[
# ):
# pass
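The fill loop above flat-fills each null segment's price fields from the segment's first `close` and synthesizes evenly spaced epoch stamps. A self-contained sketch of the same arithmetic on a plain structured array (the `shm` buffer and absolute-index slices from `get_null_segs()` are stand-ins):

import numpy as np

ohlc_dtype = np.dtype([
    ('time', 'f8'),
    ('open', 'f8'), ('high', 'f8'),
    ('low', 'f8'), ('close', 'f8'),
])


def fill_null_segment(
    arr: np.ndarray,   # full OHLC buffer
    istart: int,       # gap start index
    istop: int,        # gap stop index
    timeframe: float,  # sample period in seconds
) -> None:
    # in-place view of the zeroed gap segment
    gap = arr[istart:istop]
    # copy the oldest sample's close across all price fields
    gap[['open', 'high', 'low', 'close']] = arr[istart]['close']
    # synthesize the missing, evenly spaced epoch times
    start_t: float = arr[istart]['time']
    gap['time'] = np.arange(
        start=start_t,
        stop=start_t + (istop - istart) * timeframe,
        step=timeframe,
    )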
async def start_backfill(
tn: trio.Nursery,
get_hist,
mod: ModuleType,
mkt: MktPair,
@ -411,6 +338,7 @@ async def start_backfill(
# settings above when the tsdb is detected as being empty.
backfill_until_dt = backfill_from_dt.subtract(**period_duration)
# STAGE NOTE: "backward history gap filling":
# - we push to the shm buffer until we have history back
# until the latest entry loaded from the tsdb's table B)
@ -754,8 +682,6 @@ async def back_load_from_tsdb(
async def push_latest_frame(
# box-type only that should get packed with the datetime
# objects received for the latest history frame
dt_eps: list[DateTime, DateTime],
shm: ShmArray,
get_hist: Callable[
@ -765,11 +691,8 @@ async def push_latest_frame(
timeframe: float,
config: dict,
task_status: TaskStatus[
Exception | list[datetime, datetime]
] = trio.TASK_STATUS_IGNORED,
) -> list[datetime, datetime] | None:
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
):
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
@ -786,19 +709,17 @@ async def push_latest_frame(
mr_start_dt,
mr_end_dt,
])
task_status.started(dt_eps)
# XXX: timeframe not supported for backend (since
# above exception type), terminate immediately since
# there's no backfilling possible.
except DataUnavailable:
task_status.started(None)
task_status.started()
if timeframe > 1:
await tractor.pause()
# prolly tf not supported
return None
return
# NOTE: on the first history, most recent history
# frame we PREPEND from the current shm ._last index
@ -810,16 +731,11 @@ async def push_latest_frame(
prepend=True, # append on first frame
)
return dt_eps
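`push_latest_frame()` uses `trio`'s startup handshake: the parent's `await tn.start(...)` returns whatever the child passes to `task_status.started()`, here the frame's datetime endpoints or `None` when the backend raises `DataUnavailable`. A minimal sketch of the idiom:

import trio


async def worker(
    task_status=trio.TASK_STATUS_IGNORED,
):
    dt_eps = ['2024-01-01', '2024-01-02']  # stand-in for frame endpoints
    task_status.started(dt_eps)  # unblocks the parent's .start() call
    await trio.sleep(1)  # keep doing background (backfill) work


async def main():
    async with trio.open_nursery() as tn:
        dt_eps = await tn.start(worker)
        assert dt_eps == ['2024-01-01', '2024-01-02']
        tn.cancel_scope.cancel()  # demo done


trio.run(main)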
async def load_tsdb_hist(
storage: StorageClient,
mkt: MktPair,
timeframe: float,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> tuple[
np.ndarray,
DateTime,
@ -905,63 +821,48 @@ async def tsdb_backfill(
config,
)
# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap, since the latest
# frame query will normally be the main source of
# latency?
task_status.started()
tsdb_entry: tuple = await load_tsdb_hist(
storage,
mkt,
timeframe,
)
# tell parent task to continue
# TODO: really we'd want this the other way with the
# tsdb load happening asap, since the latest
# frame query will normally be the main source of
# latency?
task_status.started()
# NOTE: iabs to start backfilling from, reverse chronological,
# ONLY AFTER the first history frame has been pushed to
# mem!
backfill_gap_from_shm_index: int = shm._first.value + 1
# Prepend any tsdb history into the rt-shm-buffer which
# should NOW be getting filled with the most recent history
# pulled from the data-backend.
if dt_eps:
# well then, unpack the latest (gap) backfilled frame dts
(
mr_start_dt,
mr_end_dt,
) = dt_eps
(
mr_start_dt,
mr_end_dt,
) = dt_eps
# NOTE: when there's no offline data, there's 2 cases:
# - data backend doesn't support timeframe/sample
# period (in which case `dt_eps` should be `None` and
# we shouldn't be here!), or
# - no prior history has been stored (yet) and we need
# to do a full backfill of the history now.
if tsdb_entry is None:
# indicate to backfill task to fill the whole
# shm buffer as much as it can!
last_tsdb_dt = None
async with trio.open_nursery() as tn:
# there's existing tsdb history from (offline) storage
# so only backfill the gap between the
# most-recent-frame (mrf) and that latest sample.
else:
# Prepend any tsdb history to the shm buffer which should
# now be full of the most recent history pulled from the
# backend's last frame.
if tsdb_entry:
(
tsdb_history,
first_tsdb_dt,
last_tsdb_dt,
) = tsdb_entry
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
async with trio.open_nursery() as tn:
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
tn=tn,
get_hist=get_hist,
mod=mod,
mkt=mkt,
@ -970,107 +871,102 @@ async def tsdb_backfill(
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
sampler_stream=sampler_stream,
backfill_until_dt=last_tsdb_dt,
storage=storage,
write_tsdb=True,
)
)
nulls_detected: trio.Event | None = None
if last_tsdb_dt is not None:
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier than
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weekend)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retrieve and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
# calc the index from which the tsdb data should be
# prepended, presuming there is a gap between the
# latest frame (loaded/read above) and the latest
# sample loaded from the tsdb.
backfill_diff: Duration = mr_start_dt - last_tsdb_dt
offset_s: float = backfill_diff.in_seconds()
offset_samples: int = round(offset_s / timeframe)
# XXX EDGE CASEs: the most recent frame overlaps with
# prior tsdb history!!
# - so the latest frame's start time is earlier than
# the tsdb's latest sample.
# - alternatively this may also more generally occur
# when the venue was closed (say over the weekend)
# causing a timeseries gap, AND the query frames size
# (eg. for ib's 1s we rx 2k datums ~= 33.33m) IS
# GREATER THAN the current venue-market's operating
# session (time) we will receive datums from BEFORE THE
# CLOSURE GAP and thus the `offset_s` value will be
# NEGATIVE! In this case we need to ensure we don't try
# to push datums that have already been recorded in the
# tsdb. In this case we instead only retrieve and push
# the series portion missing from the db's data set.
# if offset_s < 0:
# non_overlap_diff: Duration = mr_end_dt - last_tsdb_dt
# non_overlap_offset_s: float = backfill_diff.in_seconds()
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
offset_samples: int = round(offset_s / timeframe)
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
# TODO: see if there's faster multi-field reads:
# https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields
# re-index with a `time` and index field
if offset_s > 0:
# NOTE XXX: ONLY when there is an actual gap
# between the earliest sample in the latest history
# frame do we want to NOT stick the latest tsdb
# history adjacent to that latest frame!
prepend_start = shm._first.value - offset_samples + 1
to_push = tsdb_history[-prepend_start:]
else:
# when there is overlap we want to remove the
# overlapping samples from the tsdb portion (taking
# instead the latest frame's values since THEY
# SHOULD BE THE SAME) and prepend DIRECTLY adjacent
# to the latest frame!
# TODO: assert the overlap segment array contains
# the same values!?!
prepend_start = shm._first.value
to_push = tsdb_history[-(shm._first.value):offset_samples - 1]
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
# tsdb history is so far in the past we can't fit it in
# shm buffer space so simply don't load it!
if prepend_start > 0:
shm.push(
to_push,
log.info(f'Loaded {to_push.shape} datums from storage')
# insert the history pre a "days worth" of samples
# to leave some real-time buffer space at the end.
prepend=True,
# update_first=False,
start=prepend_start,
field_map=storemod.ohlc_key_map,
)
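The branching above hinges on one quantity: the gap, in samples, between the tsdb's last stored datum and the start of the most recent frame; a positive value means a true hole to leave room for, a non-positive one means overlap to trim. A worked sketch of just that arithmetic using `pendulum` (all index values are made up):

import pendulum

timeframe: float = 60.0  # 1m bars
last_tsdb_dt = pendulum.datetime(2024, 1, 1, 12, 0)
mr_start_dt = pendulum.datetime(2024, 1, 1, 13, 0)  # latest frame start

offset_s: float = (mr_start_dt - last_tsdb_dt).in_seconds()  # 3600.0
offset_samples: int = round(offset_s / timeframe)  # 60

shm_first: int = 10_000  # hypothetical current first-index in shm
if offset_s > 0:
    # true gap: leave `offset_samples` slots open before the frame
    prepend_start = shm_first - offset_samples + 1
else:
    # overlap: drop the overlapping tsdb tail, butt up directly
    prepend_start = shm_first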
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backfiller tasks do this
# work PREVENTIVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
log.info(f'Loaded {to_push.shape} datums from storage')
shm=shm,
timeframe=timeframe,
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
))
# NOTE: ASYNC-conduct tsdb timestamp gap detection and backfill any
# seemingly missing (null-time) segments..
# TODO: ideally these can never exist!
# -[ ] somehow it seems sometimes we're writing zero-ed
# segments to tsdbs during teardown?
# -[ ] can we ensure that the backfiller tasks do this
# work PREVENTIVELY instead?
# -[ ] fill in non-zero epoch time values ALWAYS!
# await maybe_fill_null_segments(
nulls_detected: trio.Event = await tn.start(partial(
maybe_fill_null_segments,
# 2nd nursery END
shm=shm,
timeframe=timeframe,
get_hist=get_hist,
sampler_stream=sampler_stream,
mkt=mkt,
))
# TODO: who would want to?
if nulls_detected:
await nulls_detected.wait()
await bf_done.wait()
# TODO: who would want to?
await nulls_detected.wait()
await bf_done.wait()
# TODO: maybe start history anal and load missing "history
# gaps" via backend..
@ -1114,6 +1010,7 @@ async def tsdb_backfill(
async def manage_history(
mod: ModuleType,
bus: _FeedsBus,
mkt: MktPair,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
@ -1270,6 +1167,7 @@ async def manage_history(
tsdb_backfill,
mod=mod,
storemod=storemod,
# bus,
storage=client,
mkt=mkt,
shm=tf2mem[timeframe],
@ -1354,11 +1252,13 @@ def iter_dfs_from_shms(
assert not opened
ohlcv = shm.array
from ._anal import np2pl
df: pl.DataFrame = np2pl(ohlcv)
from ..data import tsp
df: pl.DataFrame = tsp.np2pl(ohlcv)
yield (
shmfile,
shm,
df,
)

View File

@ -319,8 +319,9 @@ def get_null_segs(
if num_gaps < 1:
if absi_zeros.size > 1:
absi_zsegs = [[
# TODO: maybe mk these max()/min() limits func
# consts instead of called more than once?
# see `get_hist()` in backend, should ALWAYS be
# able to handle a `start_dt=None`!
# None,
max(
absi_zeros[0] - 1,
0,
@ -358,10 +359,7 @@ def get_null_segs(
# corresponding to the first zero-segment's row, we add it
# manually here.
absi_zsegs.append([
max(
absi_zeros[0] - 1,
0,
),
absi_zeros[0] - 1,
None,
])
@ -402,18 +400,14 @@ def get_null_segs(
else:
if 0 < num_gaps < 2:
absi_zsegs[-1][1] = min(
absi_zeros[-1] + 1,
frame['index'][-1],
)
absi_zsegs[-1][1] = absi_zeros[-1] + 1
iabs_first: int = frame['index'][0]
for start, end in absi_zsegs:
ts_start: float = times[start - iabs_first]
ts_end: float = times[end - iabs_first]
if (
(ts_start == 0 and not start == 0)
ts_start == 0
or
ts_end == 0
):
@ -457,13 +451,11 @@ def iter_null_segs(
],
None,
]:
if not (
null_segs := get_null_segs(
if null_segs is None:
null_segs: tuple = get_null_segs(
frame,
period=timeframe,
)
):
return
absi_pairs_zsegs: list[list[float, float]]
izeros: Seq
@ -510,7 +502,6 @@ def iter_null_segs(
)
# TODO: move to ._pl_anal
def with_dts(
df: pl.DataFrame,
time_col: str = 'time',
@ -526,7 +517,7 @@ def with_dts(
pl.from_epoch(pl.col(time_col)).alias('dt'),
]).with_columns([
pl.from_epoch(
column=pl.col(f'{time_col}_prev'),
pl.col(f'{time_col}_prev')
).alias('dt_prev'),
pl.col('dt').diff().alias('dt_diff'),
]) #.with_columns(
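`with_dts()` above derives its datetime columns from the epoch `time` column via `pl.from_epoch()`; roughly, on a toy frame:

import polars as pl

df = pl.DataFrame({'time': [1_700_000_000, 1_700_000_060]})
out = df.with_columns([
    pl.col('time').shift().alias('time_prev'),
    pl.from_epoch(pl.col('time')).alias('dt'),
])
# `dt` is now a real datetime column; `dt.diff()` etc. follow
print(out)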
@ -534,6 +525,19 @@ def with_dts(
# )
def dedup_dt(
df: pl.DataFrame,
) -> pl.DataFrame:
'''
Drop duplicate date-time rows (normally from an OHLC frame).
'''
return df.unique(
subset=['dt'],
maintain_order=True,
)
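And a quick check of what the new `dedup_dt()` helper does; the equivalent inline `DataFrame.unique()` call is shown since the helper just wraps it:

import polars as pl

df = pl.DataFrame({
    'dt': ['2024-01-01', '2024-01-01', '2024-01-02'],
    'close': [1.0, 1.5, 2.0],
})
# one row per `dt` value, input order preserved
deduped = df.unique(subset=['dt'], maintain_order=True)
assert deduped.height == 2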
t_unit: Literal = Literal[
'days',
'hours',
@ -647,11 +651,7 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
)
# remove duplicated datetime samples/sections
deduped: pl.DataFrame = df.unique(
subset=['dt'],
maintain_order=True,
)
deduped: pl.DataFrame = dedup_dt(df)
deduped_gaps = detect_time_gaps(deduped)
diff: int = (

View File

@ -100,10 +100,6 @@ async def open_piker_runtime(
or [_default_reg_addr]
)
if ems := tractor_kwargs.get('enable_modules'):
# import pdbp; pdbp.set_trace()
enable_modules.extend(ems)
async with (
tractor.open_root_actor(

View File

@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
@ -19,7 +19,6 @@ Storage middle-ware CLIs.
"""
from __future__ import annotations
# from datetime import datetime
from pathlib import Path
import time
@ -37,8 +36,11 @@ from piker.cli import cli
from piker.config import get_conf_dir
from piker.data import (
ShmArray,
tsp,
)
from piker.data.history import (
iter_dfs_from_shms,
)
from piker import tsp
from . import (
log,
)
@ -187,8 +189,8 @@ def anal(
frame=history,
period=period,
)
# TODO: do tsp queries to backend to fill in missing
# history and then prolly write it to tsdb!
if null_segs:
await tractor.pause()
shm_df: pl.DataFrame = await client.as_df(
fqme,
@ -204,27 +206,18 @@ def anal(
diff,
) = tsp.dedupe(shm_df)
write_edits: bool = True
if (
write_edits
and (
diff
or null_segs
)
):
await tractor.pause()
if diff:
await client.write_ohlcv(
fqme,
ohlcv=deduped,
timeframe=period,
)
else:
# TODO: something better with tab completion..
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
# TODO: something better with tab completion..
# is there something more minimal but nearly as
# functional as ipython?
await tractor.pause()
trio.run(main)
@ -250,11 +243,11 @@ def ldshm(
),
):
df: pl.DataFrame | None = None
for shmfile, shm, shm_df in tsp.iter_dfs_from_shms(fqme):
for shmfile, shm, shm_df in iter_dfs_from_shms(fqme):
# compute ohlc properties for naming
times: np.ndarray = shm.array['time']
period_s: float = float(times[-1] - times[-2])
period_s: float = times[-1] - times[-2]
if period_s < 1.:
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
@ -277,86 +270,11 @@ def ldshm(
# TODO: maybe only optionally enter this depending
# on some CLI flags and/or gap detection?
if (
not gaps.is_empty()
or null_segs
):
from piker.ui._remote_ctl import (
open_annot_ctl,
AnnotCtl,
)
annot_ctl: AnnotCtl
async with open_annot_ctl() as annot_ctl:
for i in range(gaps.height):
if not gaps.is_empty():
await tractor.pause()
row: pl.DataFrame = gaps[i]
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# the gap's right-most bar's OPEN value
# at that time (sample) step.
# dt_end_t: float = dt.timestamp()
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's left-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = df.filter(
pl.col('index') == gaps[0]['index'] - 1
)
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
gap_w: float = abs((iend - istart))
# await tractor.pause()
if gap_w < 6:
margin: float = 6
iend += margin
istart -= margin
ro: tuple[float, float] = (
# dt_end_t,
iend,
row['open'][0],
)
lc: tuple[float, float] = (
# dt_start_t,
istart,
prev_r['close'][0],
)
aid: int = await annot_ctl.add_rect(
fqme=fqme,
timeframe=period_s,
start_pos=lc,
end_pos=ro,
)
assert aid
await tractor.pause()
if null_segs:
await tractor.pause()
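The removed annotation loop derives each gap rect from two corners: the left bar's close (`lc`) and the right bar's open (`ro`), padded out when the gap is under ~6 index steps wide. A toy sketch of that coordinate math (frame values are made up):

import polars as pl

# toy frame: contiguous `index` with a time gap after row 1
df = pl.DataFrame({
    'index': [7, 8, 9],
    'open':  [1.0, 2.0, 3.0],
    'close': [1.5, 2.5, 3.5],
})
row = df[2]  # the gap's right-most bar
iend: int = row['index'][0]
prev_r = df.filter(pl.col('index') == iend - 1)  # bar left of the gap
istart: int = prev_r['index'][0]

# pad skinny gaps so the rect stays visible
if abs(iend - istart) < 6:
    iend += 6
    istart -= 6

ro = (iend, row['open'][0])  # right/open corner
lc = (istart, prev_r['close'][0])  # left/close corner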
# write to parquet file?
if write_parquet:

View File

@ -56,6 +56,8 @@ from datetime import datetime
from pathlib import Path
import time
# from bidict import bidict
# import tractor
import numpy as np
import polars as pl
from pendulum import (
@ -63,10 +65,10 @@ from pendulum import (
)
from piker import config
from piker import tsp
from piker.data import (
def_iohlcv_fields,
ShmArray,
tsp,
)
from piker.log import get_logger
from . import TimeseriesNotFound

View File

@ -272,15 +272,10 @@ class ContentsLabels:
x_in: int,
) -> None:
for (
chart,
name,
label,
update,
) in self._labels:
for chart, name, label, update in self._labels:
viz = chart.get_viz(name)
array: np.ndarray = viz.shm._array
array = viz.shm.array
index = array[viz.index_field]
start = index[0]
stop = index[-1]
@ -291,7 +286,7 @@ class ContentsLabels:
):
# out of range
print('WTF out of range?')
# continue
continue
# call provided update func with data point
try:
@ -299,7 +294,6 @@ class ContentsLabels:
ix = np.searchsorted(index, x_in)
if ix > len(array):
breakpoint()
update(ix, array)
except IndexError:

View File

@ -56,8 +56,8 @@ _line_styles: dict[str, int] = {
class FlowGraphic(pg.GraphicsObject):
'''
Base class with minimal interface for `QPainterPath`
implemented, real-time updated "data flow" graphics.
Base class with minimal interface for `QPainterPath` implemented,
real-time updated "data flow" graphics.
See subtypes below.
@ -167,12 +167,11 @@ class FlowGraphic(pg.GraphicsObject):
return None
# XXX: due to a variety of weird jitter bugs and "smearing"
# artifacts when click-drag panning and viewing history time
# series, we offer this ctx-mngr interface to allow temporarily
# disabling Qt's graphics caching mode; this is now currently
# used from ``ChartView.start/signal_ic()`` methods which also
# disable the rt-display loop when the user is moving around
# a view.
# artifacts when click-drag panning and viewing history time series,
# we offer this ctx-mngr interface to allow temporarily disabling
# Qt's graphics caching mode; this is now currently used from
# ``ChartView.start/signal_ic()`` methods which also disable the
# rt-display loop when the user is moving around a view.
@cm
def reset_cache(self) -> None:
try:

View File

@ -49,7 +49,7 @@ from ..data._formatters import (
OHLCBarsAsCurveFmtr, # OHLC converted to line
StepCurveFmtr, # "step" curve (like for vlm)
)
from ..tsp import (
from ..data.tsp import (
slice_from_time,
)
from ._ohlc import (
@ -563,8 +563,7 @@ class Viz(Struct):
def view_range(self) -> tuple[int, int]:
'''
Return the start and stop x-indexes for the managed
``ViewBox``.
Return the start and stop x-indexes for the managed ``ViewBox``.
'''
vr = self.plot.viewRect()

View File

@ -470,64 +470,54 @@ async def graphics_update_loop(
if ds.hist_vars['i_last'] < ds.hist_vars['i_last_append']:
await tractor.pause()
try:
from . import _remote_ctl
_remote_ctl._dss = dss
# main real-time quotes update loop
stream: tractor.MsgStream
async with feed.open_multi_stream() as stream:
assert stream
async for quotes in stream:
quote_period = time.time() - last_quote_s
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
if (
quote_period <= 1/_quote_throttle_rate
# main real-time quotes update loop
stream: tractor.MsgStream
async with feed.open_multi_stream() as stream:
assert stream
async for quotes in stream:
quote_period = time.time() - last_quote_s
quote_rate = round(
1/quote_period, 1) if quote_period > 0 else float('inf')
# in the absolute worst case we shouldn't see more than
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
):
pass
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
last_quote_s = time.time()
for fqme, quote in quotes.items():
ds = dss[fqme]
ds.quotes = quote
rt_pi, hist_pi = pis[fqme]
# chart isn't active/shown so skip render cycle and
# pause feed(s)
if (
quote_period <= 1/_quote_throttle_rate
# in the absolute worst case we shouldn't see more than
# twice the expected throttle rate right!?
# and quote_rate >= _quote_throttle_rate * 2
and quote_rate >= display_rate
fast_chart.linked.isHidden()
or not rt_pi.isVisible()
):
pass
# log.warning(f'High quote rate {mkt.fqme}: {quote_rate}')
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
last_quote_s = time.time()
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
for fqme, quote in quotes.items():
ds = dss[fqme]
ds.quotes = quote
rt_pi, hist_pi = pis[fqme]
# chart isn't active/shown so skip render cycle and
# pause feed(s)
if (
fast_chart.linked.isHidden()
or not rt_pi.isVisible()
):
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
# sync call to update all graphics/UX components.
graphics_update_cycle(
ds,
quote,
)
finally:
# XXX: cancel any remote annotation control ctxs
_remote_ctl._dss = None
for ctx in _remote_ctl._ctxs:
await ctx.cancel()
# sync call to update all graphics/UX components.
graphics_update_cycle(
ds,
quote,
)
def graphics_update_cycle(
@ -1245,7 +1235,7 @@ async def display_symbol_data(
fast from a cached watch-list.
'''
# sbar = godwidget.window.status_bar
sbar = godwidget.window.status_bar
# historical data fetch
# brokermod = brokers.get_brokermod(provider)
@ -1255,11 +1245,11 @@ async def display_symbol_data(
# group_key=loading_sym_key,
# )
# for fqme in fqmes:
# loading_sym_key = sbar.open_status(
# f'loading {fqme} ->',
# group_key=True
# )
for fqme in fqmes:
loading_sym_key = sbar.open_status(
f'loading {fqme} ->',
group_key=True
)
# (TODO: make this not so shit XD)
# close group status once a symbol feed fully loads to view.
@ -1432,7 +1422,7 @@ async def display_symbol_data(
start_fsp_displays,
rt_linked,
flume,
# loading_sym_key,
loading_sym_key,
loglevel,
)

View File

@ -21,8 +21,7 @@ Higher level annotation editors.
from __future__ import annotations
from collections import defaultdict
from typing import (
Sequence,
TYPE_CHECKING,
TYPE_CHECKING
)
import pyqtgraph as pg
@ -32,37 +31,24 @@ from pyqtgraph import (
QtCore,
QtWidgets,
)
from PyQt5.QtCore import (
QPointF,
QRectF,
)
from PyQt5.QtGui import (
QColor,
QTransform,
)
from PyQt5.QtWidgets import (
QGraphicsProxyWidget,
QGraphicsScene,
QLabel,
)
from pyqtgraph import functions as fn
from PyQt5.QtCore import QPointF
import numpy as np
from piker.types import Struct
from ._style import (
hcolor,
_font,
)
from ._style import hcolor, _font
from ._lines import LevelLine
from ..log import get_logger
if TYPE_CHECKING:
from ._chart import (
GodWidget,
ChartPlotWidget,
)
from ._interaction import ChartView
from ._chart import GodWidget
log = get_logger(__name__)
@ -79,7 +65,7 @@ class ArrowEditor(Struct):
uid: str,
x: float,
y: float,
color: str = 'default',
color='default',
pointing: str | None = None,
) -> pg.ArrowItem:
@ -265,56 +251,27 @@ class LineEditor(Struct):
return lines
def as_point(
pair: Sequence[float, float] | QPointF,
) -> QPointF:
'''
Cast any input pair of floats to a `QPointF` object
for use in Qt geometry routines.
'''
if isinstance(pair, QPointF):
return pair
return QPointF(pair[0], pair[1])
# TODO: maybe implement better, something something RectItemProxy??
# -[ ] dig into details of how proxy's work?
# https://doc.qt.io/qt-5/qgraphicsscene.html#addWidget
# -[ ] consider using `.addRect()` maybe?
class SelectRect(QtWidgets.QGraphicsRectItem):
'''
A data-view "selection rectangle": the most fundamental
geometry for annotating data views.
- https://doc.qt.io/qt-5/qgraphicsrectitem.html
- https://doc.qt.io/qt-6/qgraphicsrectitem.html
'''
def __init__(
self,
viewbox: ViewBox,
color: str | None = None,
color: str = 'dad_blue',
) -> None:
super().__init__(0, 0, 1, 1)
# self.rbScaleBox = QGraphicsRectItem(0, 0, 1, 1)
self.vb: ViewBox = viewbox
self.vb = viewbox
self._chart: 'ChartPlotWidget' = None # noqa
self._chart: ChartPlotWidget | None = None # noqa
# TODO: maybe allow this to be dynamic via a method?
# override selection box color
color: str = color or 'dad_blue'
# override selection box color
color = QColor(hcolor(color))
self.setPen(fn.mkPen(color, width=1))
color.setAlpha(66)
self.setBrush(fn.mkBrush(color))
self.setZValue(1e9)
self.hide()
self._label = None
label = self._label = QLabel()
label.setTextFormat(0) # markdown
@ -324,15 +281,13 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
QtCore.Qt.AlignLeft
# | QtCore.Qt.AlignVCenter
)
label.hide() # always right after init
# proxy is created after containing scene is initialized
self._label_proxy: QGraphicsProxyWidget | None = None
self._abs_top_right: Point | None = None
self._label_proxy = None
self._abs_top_right = None
# TODO: "swing %" might be handy here (data's max/min
# # % change)?
self._contents: list[str] = [
# TODO: "swing %" might be handy here (data's max/min # % change)
self._contents = [
'change: {pchng:.2f} %',
'range: {rng:.2f}',
'bars: {nbars}',
@ -342,30 +297,12 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
'sigma: {std:.2f}',
]
self.add_to_view(viewbox)
def add_to_view(
self,
view: ChartView,
) -> None:
'''
Self-defined view hookup impl which will
also re-assign the internal ref.
'''
view.addItem(
self,
ignoreBounds=True,
)
if self.vb is not view:
self.vb = view
@property
def chart(self) -> ChartPlotWidget: # noqa
def chart(self) -> 'ChartPlotWidget': # noqa
return self._chart
@chart.setter
def chart(self, chart: ChartPlotWidget) -> None: # noqa
def chart(self, chart: 'ChartPlotWidget') -> None: # noqa
self._chart = chart
chart.sigRangeChanged.connect(self.update_on_resize)
palette = self._label.palette()
@ -378,155 +315,57 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
)
def update_on_resize(self, vr, r):
'''
Re-position measure label on view range change.
"""Re-position measure label on view range change.
'''
"""
if self._abs_top_right:
self._label_proxy.setPos(
self.vb.mapFromView(self._abs_top_right)
)
def set_scen_pos(
def mouse_drag_released(
self,
scen_p1: QPointF,
scen_p2: QPointF,
update_label: bool = True,
p1: QPointF,
p2: QPointF
) -> None:
'''
Set position from scene coords of selection rect (normally
from mouse position) and accompanying label, move label to
match.
"""Called on final button release for mouse drag with start and
end positions.
'''
# NOTE XXX: apparently just setting it doesn't work!?
# i have no idea why but it's pretty weird we have to do
# this transform thing which was basically pulled verbatim
# from the `pg.ViewBox.updateScaleBox()` method.
view_rect: QRectF = self.vb.childGroup.mapRectFromScene(
QRectF(
scen_p1,
scen_p2,
)
)
self.setPos(view_rect.topLeft())
# XXX: does not work..!?!?
# https://doc.qt.io/qt-5/qgraphicsrectitem.html#setRect
# self.setRect(view_rect)
"""
self.set_pos(p1, p2)
tr = QTransform.fromScale(
view_rect.width(),
view_rect.height(),
)
self.setTransform(tr)
# XXX: never got this working, was always offset
# / transformed completely wrong (and off to the far right
# from the cursor?)
# self.set_view_pos(
# view_rect=view_rect,
# # self.vwqpToView(p1),
# # self.vb.mapToView(p2),
# # start_pos=self.vb.mapToScene(p1),
# # end_pos=self.vb.mapToScene(p2),
# )
self.show()
if update_label:
self.init_label(view_rect)
def set_view_pos(
def set_pos(
self,
start_pos: QPointF | Sequence[float, float] | None = None,
end_pos: QPointF | Sequence[float, float] | None = None,
view_rect: QRectF | None = None,
update_label: bool = True,
p1: QPointF,
p2: QPointF
) -> None:
'''
Set position from `ViewBox` coords (i.e. from the actual
data domain) of rect (and any accompanying label which is
moved to match).
"""Set position of selection rect and accompanying label, move
label to match.
'''
if self._chart is None:
raise RuntimeError(
'You MUST assign a `SelectRect.chart: ChartPlotWidget`!'
)
if view_rect is None:
# ensure point casting
start_pos: QPointF = as_point(start_pos)
end_pos: QPointF = as_point(end_pos)
# map to view coords and update area
view_rect = QtCore.QRectF(
start_pos,
end_pos,
)
self.setPos(view_rect.topLeft())
# NOTE: SERIOUSLY NO IDEA WHY THIS WORKS...
# but it does and all the other commented stuff above
# dint, dawg..
# self.resetTransform()
# self.setRect(view_rect)
tr = QTransform.fromScale(
view_rect.width(),
view_rect.height(),
)
self.setTransform(tr)
if update_label:
self.init_label(view_rect)
print(
'SelectRect modify:\n'
f'QRectF: {view_rect}\n'
f'start_pos: {start_pos}\n'
f'end_pos: {end_pos}\n'
)
self.show()
def init_label(
self,
view_rect: QRectF,
) -> QLabel:
# should be init-ed in `.__init__()`
label: QLabel = self._label
cv: ChartView = self.vb
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
"""
if self._label_proxy is None:
scen: QGraphicsScene = cv.scene()
# NOTE: specifically this is passing a widget
# pointer to the scene's `.addWidget()` as per,
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html#embedding-a-widget-with-qgraphicsproxywidget
self._label_proxy: QGraphicsProxyWidget = scen.addWidget(label)
# https://doc.qt.io/qt-5/qgraphicsproxywidget.html
self._label_proxy = self.vb.scene().addWidget(self._label)
# get label startup coords
tl: QPointF = view_rect.topLeft()
br: QPointF = view_rect.bottomRight()
start_pos = self.vb.mapToView(p1)
end_pos = self.vb.mapToView(p2)
x1, y1 = tl.x(), tl.y()
x2, y2 = br.x(), br.y()
# map to view coords and update area
r = QtCore.QRectF(start_pos, end_pos)
# TODO: to remove, previous label corner point unpacking
# x1, y1 = start_pos.x(), start_pos.y()
# x2, y2 = end_pos.x(), end_pos.y()
# y1, y2 = start_pos.y(), end_pos.y()
# x1, x2 = start_pos.x(), end_pos.x()
# old way; don't need right?
# lr = QtCore.QRectF(p1, p2)
# r = self.vb.childGroup.mapRectFromParent(lr)
# TODO: heh, could probably use a max-min streaming algo
# here too?
self.setPos(r.topLeft())
self.resetTransform()
self.setRect(r)
self.show()
y1, y2 = start_pos.y(), end_pos.y()
x1, x2 = start_pos.x(), end_pos.x()
# TODO: heh, could probably use a max-min streaming algo here too
_, xmn = min(y1, y2), min(x1, x2)
ymx, xmx = max(y1, y2), max(x1, x2)
@ -536,35 +375,26 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
ixmn, ixmx = round(xmn), round(xmx)
nbars = ixmx - ixmn + 1
chart: ChartPlotWidget = self._chart
data: np.ndarray = chart.get_viz(
chart.name
).shm.array[ixmn:ixmx]
chart = self._chart
data = chart.get_viz(chart.name).shm.array[ixmn:ixmx]
if len(data):
std: float = data['close'].std()
dmx: float = data['high'].max()
dmn: float = data['low'].min()
std = data['close'].std()
dmx = data['high'].max()
dmn = data['low'].min()
else:
dmn = dmx = std = np.nan
# update label info
label.setText('\n'.join(self._contents).format(
pchng=pchng,
rng=rng,
nbars=nbars,
std=std,
dmx=dmx,
dmn=dmn,
self._label.setText('\n'.join(self._contents).format(
pchng=pchng, rng=rng, nbars=nbars,
std=std, dmx=dmx, dmn=dmn,
))
# print(f'x2, y2: {(x2, y2)}')
# print(f'xmn, ymn: {(xmn, ymx)}')
label_anchor = Point(
xmx + 2,
ymx,
)
label_anchor = Point(xmx + 2, ymx)
# XXX: in the drag bottom-right -> top-left case we don't
# want the label to overlay the box.
@ -573,16 +403,13 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
# # label_anchor = Point(x2, y2 + self._label.height())
# label_anchor = Point(xmn, ymn)
self._abs_top_right: Point = label_anchor
self._label_proxy.setPos(
cv.mapFromView(label_anchor)
)
label.show()
self._abs_top_right = label_anchor
self._label_proxy.setPos(self.vb.mapFromView(label_anchor))
# self._label.show()
def clear(self):
'''
Clear the selection box from view.
"""Clear the selection box from view.
'''
"""
self._label.hide()
self.hide()

View File

@ -181,10 +181,7 @@ async def open_fsp_sidepane(
async def open_fsp_actor_cluster(
names: list[str] = ['fsp_0', 'fsp_1'],
) -> AsyncGenerator[
int,
dict[str, tractor.Portal]
]:
) -> AsyncGenerator[int, dict[str, tractor.Portal]]:
from tractor._clustering import open_actor_cluster
@ -560,7 +557,7 @@ class FspAdmin:
conf: dict, # yeah probably dumb..
loglevel: str = 'error',
) -> trio.Event:
) -> (trio.Event, ChartPlotWidget):
flume, started = await self.start_engine_task(
target,
@ -927,7 +924,7 @@ async def start_fsp_displays(
linked: LinkedSplits,
flume: Flume,
# group_status_key: str,
group_status_key: str,
loglevel: str,
) -> None:
@ -974,23 +971,21 @@ async def start_fsp_displays(
flume,
) as admin,
):
statuses: list[trio.Event] = []
statuses = []
for target, conf in fsp_conf.items():
started: trio.Event = await admin.open_fsp_chart(
started = await admin.open_fsp_chart(
target,
conf,
)
# done = linked.window().status_bar.open_status(
# f'loading fsp, {target}..',
# group_key=group_status_key,
# )
# statuses.append((started, done))
statuses.append(started)
done = linked.window().status_bar.open_status(
f'loading fsp, {target}..',
group_key=group_status_key,
)
statuses.append((started, done))
# for fsp_loaded, status_cb in statuses:
for fsp_loaded in statuses:
for fsp_loaded, status_cb in statuses:
await fsp_loaded.wait()
profiler(f'attached to fsp portal: {target}')
# status_cb()
status_cb()
# blocks on nursery until all fsp actors complete

View File

@ -30,11 +30,7 @@ from typing import (
)
import pyqtgraph as pg
# NOTE XXX: pg is super annoying and re-implements its own mouse
# event subsystem.. we should really look into re-working/writing
# this down the road.. Bo
from pyqtgraph.GraphicsScene import mouseEvents as mevs
# from pyqtgraph.GraphicsScene.mouseEvents import MouseDragEvent
# from pyqtgraph.GraphicsScene import mouseEvents
from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse
from PyQt5.QtGui import (
QWheelEvent,
@ -470,7 +466,6 @@ class ChartView(ViewBox):
mode_name: str = 'view'
def_delta: float = 616 * 6
def_scale_factor: float = 1.016 ** (def_delta * -1 / 20)
# annots: dict[int, GraphicsObject] = {}
def __init__(
self,
@ -491,7 +486,6 @@ class ChartView(ViewBox):
# defaultPadding=0.,
**kwargs
)
# for "known y-range style"
self._static_yrange = static_yrange
@ -506,11 +500,7 @@ class ChartView(ViewBox):
# add our selection box annotator
self.select_box = SelectRect(self)
# self.select_box.add_to_view(self)
# self.addItem(
# self.select_box,
# ignoreBounds=True,
# )
self.addItem(self.select_box, ignoreBounds=True)
self.mode = None
self.order_mode: bool = False
@ -721,18 +711,17 @@ class ChartView(ViewBox):
def mouseDragEvent(
self,
ev: mevs.MouseDragEvent,
ev,
axis: int | None = None,
) -> None:
pos: Point = ev.pos()
lastPos: Point = ev.lastPos()
dif: Point = (pos - lastPos) * -1
# dif: Point = pos - lastPos
# dif: Point = dif * -1
pos = ev.pos()
lastPos = ev.lastPos()
dif = pos - lastPos
dif = dif * -1
# NOTE: if axis is specified, event will only affect that axis.
btn = ev.button()
button = ev.button()
# Ignore axes if mouse is disabled
mouseEnabled = np.array(
@ -744,7 +733,7 @@ class ChartView(ViewBox):
mask[1-axis] = 0.0
# Scale or translate based on mouse button
if btn & (
if button & (
QtCore.Qt.LeftButton | QtCore.Qt.MidButton
):
# zoom y-axis ONLY when click-n-drag on it
@ -767,55 +756,34 @@ class ChartView(ViewBox):
# XXX: WHY
ev.accept()
down_pos: Point = ev.buttonDownPos(
btn=btn,
)
scen_pos: Point = ev.scenePos()
scen_down_pos: Point = ev.buttonDownScenePos(
btn=btn,
)
down_pos = ev.buttonDownPos()
# This is the final position in the drag
if ev.isFinish():
# import pdbp; pdbp.set_trace()
self.select_box.mouse_drag_released(down_pos, pos)
# NOTE: think of this as a `.mouse_drag_release()`
# (bc HINT that's what i called the shit ass
# method that wrapped this call [yes, as a single
# fucking call] originally.. you bish, guille)
# Bo.. oraleeee
self.select_box.set_scen_pos(
# down_pos,
# pos,
scen_down_pos,
scen_pos,
)
# this is the zoom transform cmd
ax = QtCore.QRectF(down_pos, pos)
ax = self.childGroup.mapRectFromParent(ax)
# self.showAxRect(ax)
# this is the zoom transform cmd
self.showAxRect(ax)
# axis history tracking
self.axHistoryPointer += 1
self.axHistory = self.axHistory[
:self.axHistoryPointer] + [ax]
else:
self.select_box.set_scen_pos(
# down_pos,
# pos,
scen_down_pos,
scen_pos,
)
print('drag finish?')
self.select_box.set_pos(down_pos, pos)
# update shape of scale box
# self.updateScaleBox(ev.buttonDownPos(), ev.pos())
# breakpoint()
# self.updateScaleBox(
# down_pos,
# ev.pos(),
# )
self.updateScaleBox(
down_pos,
ev.pos(),
)
# PANNING MODE
else:
@ -854,7 +822,7 @@ class ChartView(ViewBox):
# ev.accept()
# WEIRD "RIGHT-CLICK CENTER ZOOM" MODE
elif btn & QtCore.Qt.RightButton:
elif button & QtCore.Qt.RightButton:
if self.state['aspectLocked'] is not False:
mask[0] = 0

View File

@ -24,6 +24,8 @@ view transforms.
"""
import pyqtgraph as pg
from ._axes import Axis
def invertQTransform(tr):
"""Return a QTransform that is the inverse of *tr*.
@ -51,9 +53,6 @@ def _do_overrides() -> None:
pg.functions.invertQTransform = invertQTransform
pg.PlotItem = PlotItem
from ._axes import Axis
pg.Axis = Axis
# enable "QPainterPathPrivate for faster arrayToQPath" from
# https://github.com/pyqtgraph/pyqtgraph/pull/2324
pg.setConfigOption('enableExperimental', True)
@ -235,7 +234,7 @@ class PlotItem(pg.PlotItem):
# ``ViewBox`` geometry bug.. where a gap for the
# 'bottom' axis is somehow left in?
# axis = pg.AxisItem(orientation=name, parent=self)
axis = pg.Axis(
axis = Axis(
self,
orientation=name,
parent=self,

View File

@ -1,273 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Remote control tasks for sending annotations (and maybe more cmds)
to a chart from some other actor.
'''
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
AsyncContextManager,
)
import tractor
from tractor import trionics
from tractor import (
Portal,
Context,
MsgStream,
)
from PyQt5.QtWidgets import (
QGraphicsItem,
)
from piker.log import get_logger
from piker.types import Struct
from piker.service import find_service
from ._display import DisplayState
from ._interaction import ChartView
from ._editors import SelectRect
from ._chart import ChartPlotWidget
log = get_logger(__name__)
# NOTE: this is set by the `._display.graphics_update_loop()` once
# all chart widgets / Viz per flume have been initialized allowing
# for remote annotation (control) of any chart-actor's mkt feed by
# fqme lookup Bo
_dss: dict[str, DisplayState] | None = None
# stash each and every client connection so that they can all
# be cancelled on shutdown/error.
# TODO: make `tractor.Context` hashable via is `.cid: str`?
# _ctxs: set[Context] = set()
_ctxs: list[Context] = []
# global map of all uniquely created annotation-graphics
# so that they can be mutated (eventually) by a client.
_annots: dict[int, QGraphicsItem] = {}
@tractor.context
async def remote_annotate(
ctx: Context,
) -> None:
global _dss, _ctxs
assert _dss
_ctxs.append(ctx)
# send back full fqme symbology to caller
await ctx.started(list(_dss))
async with ctx.open_stream() as annot_req_stream:
async for msg in annot_req_stream:
match msg:
case {
'annot': 'SelectRect',
'fqme': fqme,
'timeframe': timeframe,
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
cv: ChartView = chart.cv
# sanity
if timeframe == 60:
assert (
chart.linked.godwidget.hist_linked.chart.view
is
cv
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
# TODO: make this more dynamic?
# -[ ] pull from conf.toml?
# -[ ] add `.set_color()` method to type?
# -[ ] make a green/red based on direction
# instead of default static color?
color=kwargs.pop('color', None),
)
# XXX NOTE: this is REQUIRED to set the rect
# resize callback!
rect.chart: ChartPlotWidget = chart
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
await annot_req_stream.send(id(rect))
case _:
log.error(
'Unknown remote annotation cmd:\n'
f'{pformat(msg)}'
)
class AnnotCtl(Struct):
'''
A control for remote "data annotations".
You know those "squares they always show in machine vision
UIs.." this API allows you to remotely control stuff like that
in some other graphics actor.
'''
ctx2fqmes: dict[str, str]
fqme2stream: dict[str, MsgStream]
async def add_rect(
self,
fqme: str,
timeframe: float,
start_pos: tuple[float, float],
end_pos: tuple[float, float],
# TODO: a `Literal['view', 'scene']` for this?
domain: str = 'view', # or 'scene'
color: str = 'dad_blue',
) -> int:
'''
Add a `SelectRect` annotation to the target view, return
the instance's `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self.fqme2stream[fqme]
await ipc.send({
'fqme': fqme,
'annot': 'SelectRect',
'timeframe': timeframe,
# 'meth': str(meth),
'meth': 'set_view_pos' if domain == 'view' else 'set_scen_pos',
'kwargs': {
'start_pos': tuple(start_pos),
'end_pos': tuple(end_pos),
'color': color,
'update_label': False,
},
})
return (await ipc.receive())
async def modify(
self,
aid: int, # annotation id
meth: str, # far end graphics object method to invoke
params: dict[str, Any], # far end `meth(**kwargs)`
) -> bool:
'''
Modify an existing (remote) annotation's graphics
parameters, thus changing its appearance / state in real
time.
'''
raise NotImplementedError
async def remove(
self,
uid: int,
) -> bool:
'''
Remove an existing annotation by instance id.
'''
raise NotImplementedError
@acm
async def open_annot_ctl(
uid: tuple[str, str] | None = None,
) -> AnnotCtl:
# TODO: load connection to a specific chart actor
# -[ ] pull from either service scan or config
# -[ ] return some kinda client/proxy thinger?
# -[ ] maybe we should finally just provide this as
# a `tractor.hilevel.CallableProxy` or wtv?
# -[ ] use this from the storage.cli stuff to mark up gaps!
maybe_portals: list[Portal] | None
fqmes: list[str]
async with find_service(
service_name='chart',
first_only=False,
) as maybe_portals:
ctx_mngrs: list[AsyncContextManager] = []
# TODO: print the current discoverable actor UID set
# here as well?
if not maybe_portals:
raise RuntimeError('No chart UI actors found in service domain?')
for portal in maybe_portals:
ctx_mngrs.append(
portal.open_context(remote_annotate)
)
ctx2fqmes: dict[str, set[str]] = {}
fqme2stream: dict[str, MsgStream] = {}
client = AnnotCtl(
ctx2fqmes=ctx2fqmes,
fqme2stream=fqme2stream,
)
stream_ctxs: list[AsyncContextManager] = []
async with trionics.gather_contexts(ctx_mngrs) as ctxs:
for (ctx, fqmes) in ctxs:
stream_ctxs.append(ctx.open_stream())
# fill lookup table of mkt addrs to IPC ctxs
for fqme in fqmes:
if other := fqme2stream.get(fqme):
raise ValueError(
f'More than one chart displays {fqme}!?\n'
'Other UI actor info:\n'
f'channel: {other._ctx.chan}]\n'
f'actor uid: {other._ctx.chan.uid}]\n'
f'ctx id: {other._ctx.cid}]\n'
)
ctx2fqmes.setdefault(
ctx.cid,
set(),
).add(fqme)
async with trionics.gather_contexts(stream_ctxs) as streams:
for stream in streams:
fqmes: set[str] = ctx2fqmes[stream._ctx.cid]
for fqme in fqmes:
fqme2stream[fqme] = stream
yield client
# TODO: on graceful teardown should we try to
# remove all annots that were created/modded?
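Client-side usage of the (here removed) module looked roughly like the `ldshm` hook deleted above. A hedged sketch, assuming a `chart` UI actor is up, the fqme is being displayed, and that this runs inside an initialized piker/tractor runtime (`trio.run()` is illustrative only):

import trio

from piker.ui._remote_ctl import (
    AnnotCtl,
    open_annot_ctl,
)


async def main():
    actl: AnnotCtl
    async with open_annot_ctl() as actl:
        aid: int = await actl.add_rect(
            fqme='btcusdt.binance',  # made-up market key
            timeframe=60,
            start_pos=(100, 10_000.0),  # (x-index, price) corners
            end_pos=(120, 10_500.0),
        )
        assert aid  # remote-side id() of the new `SelectRect`


trio.run(main)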

View File

@ -228,10 +228,5 @@ def chart(
'loglevel': tractorloglevel,
'name': 'chart',
'registry_addrs': list(set(regaddrs)),
'enable_modules': [
# remote data-view annotations Bo
'piker.ui._remote_ctl',
],
},
)

View File

@ -31,7 +31,7 @@ import pendulum
import pyqtgraph as pg
from piker.types import Struct
from ..tsp import slice_from_time
from ..data.tsp import slice_from_time
from ..log import get_logger
from ..toolz import Profiler