Merge pull request 'hist_backfill_fixes: working-around (some) conc issues in the tsdb backfiller' (#62)

Reviewed-on: #62
Reviewed-by: momo <dilarayalniz1@gmail.com>
main
Gud Boi 2026-02-23 17:22:24 +00:00
commit 73369fb1ef
19 changed files with 3272 additions and 1730 deletions

View File

@@ -275,9 +275,15 @@ async def open_history_client(
                 f'{times}'
             )
+            # XXX, debug any case where the latest 1m bar we get is
+            # already another "sample's-step-old"..
             if end_dt is None:
                 inow: int = round(time.time())
-                if (inow - times[-1]) > 60:
+                if (
+                    _time_step := (inow - times[-1])
+                    >
+                    timeframe * 2
+                ):
                     await tractor.pause()
             start_dt = from_timestamp(times[0])
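
A minimal sketch of the staleness predicate this hunk generalizes: the old hard-coded 60s threshold becomes two sample-steps of the requested timeframe. Names (`times`, `timeframe`) mirror the hunk; the standalone function framing is illustrative only.

import time

def is_latest_bar_stale(
    times: list[float],   # epoch stamps of the received bars
    timeframe: float,     # sample period in seconds, eg. 60 for 1m bars
) -> bool:
    # the frame is "stale" when its newest bar lags wall-clock
    # time by more than two sample-steps.
    inow: int = round(time.time())
    return (inow - times[-1]) > timeframe * 2

# eg. a 1m feed whose latest bar is 3 minutes old is stale:
assert is_latest_bar_stale([time.time() - 180], timeframe=60)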

View File

@@ -250,7 +250,9 @@ async def vnc_click_hack(
         'connection': 'r'
     }[reset_type]

-    with tractor.devx.open_crash_handler():
+    with tractor.devx.open_crash_handler(
+        ignore={TimeoutError,},
+    ):
         client = await AsyncVNCClient.connect(
             VNCConfig(
                 host=host,
@@ -331,7 +333,14 @@ def i3ipc_xdotool_manual_click_hack() -> None:
     '''
     focussed, matches = i3ipc_fin_wins_titled()
-    orig_win_id = focussed.window
+    try:
+        orig_win_id = focussed.window
+    except AttributeError:
+        # XXX if .window cucks we prolly aren't intending to
+        # use this and/or just woke up from suspend..
+        log.exception('xdotool invalid usage ya ??\n')
+        return
     try:
         for name, con in matches:
             print(f'Resetting data feed for {name}')

View File

@@ -1187,7 +1187,7 @@ async def load_aio_clients(
     # the API TCP in `ib_insync` connection can be flaky af so instead
     # retry a few times to get the client going..
     connect_retries: int = 3,
-    connect_timeout: float = 10,
+    connect_timeout: float = 30,  # in case a remote-host
     disconnect_on_exit: bool = True,

 ) -> dict[str, Client]:

View File

@@ -178,8 +178,8 @@ async def open_history_client(
     async def get_hist(
         timeframe: float,
-        end_dt: datetime | None = None,
-        start_dt: datetime | None = None,
+        end_dt: datetime|None = None,
+        start_dt: datetime|None = None,

     ) -> tuple[np.ndarray, str]:
@@ -262,7 +262,38 @@ async def open_history_client(
             vlm = bars_array['volume']
             vlm[vlm < 0] = 0

-            return bars_array, first_dt, last_dt
+            # XXX, if a start-limit was passed ensure we only
+            # return history that far back!
+            if (
+                start_dt
+                and
+                first_dt < start_dt
+            ):
+                trimmed_bars = bars_array[
+                    bars_array['time'] >= start_dt.timestamp()
+                ]
+                if (
+                    trimmed_first_dt := from_timestamp(trimmed_bars['time'][0])
+                    !=
+                    start_dt
+                ):
+                    # TODO! rm this once we're more confident it never hits!
+                    breakpoint()
+                    raise RuntimeError(
+                        f'OHLC-bars array start is gt `start_dt` limit !!\n'
+                        f'start_dt: {start_dt}\n'
+                        f'first_dt: {first_dt}\n'
+                        f'trimmed_first_dt: {trimmed_first_dt}\n'
+                    )
+
+                # XXX, overwrite with start_dt-limited frame
+                bars_array = trimmed_bars
+
+            return (
+                bars_array,
+                first_dt,
+                last_dt,
+            )

     # TODO: it seems like we can do async queries for ohlc
     # but getting the order right still isn't working and I'm not
@@ -397,7 +428,7 @@ async def get_bars(
     # blank to start which tells ib to look up the latest datum
     end_dt: str = '',
-    start_dt: str | None = '',
+    start_dt: str|None = '',

     # TODO: make this more dynamic based on measured frame rx latency?
     # how long before we trigger a feed reset (seconds)
@@ -451,6 +482,8 @@ async def get_bars(
                     dt_duration,
                 ) = await proxy.bars(
                     fqme=fqme,
+                    # XXX TODO! lol we're not using this..
+                    # start_dt=start_dt,
                     end_dt=end_dt,
                     sample_period_s=timeframe,
@@ -1082,6 +1115,7 @@ async def stream_quotes(
     con: Contract = details.contract
     first_ticker: Ticker|None = None
+    first_quote: dict[str, Any] = {}
     timeout: float = 1.6
     with trio.move_on_after(timeout) as quote_cs:
@@ -1134,15 +1168,14 @@ async def stream_quotes(
                 first_quote,
             ))

-            # it's not really live but this will unblock
-            # the brokerd feed task to tell the ui to update?
-            feed_is_live.set()
-
             # block and let data history backfill code run.
             # XXX obvi given the venue is closed, we never expect feed
             # to come up; a taskc should be the only way to
             # terminate this task.
             await trio.sleep_forever()
+            #
+            # ^^XXX^^TODO! INSTEAD impl a `trio.sleep()` for the
+            # duration until the venue opens!!

             # ?TODO, we could instead spawn a task that waits on a feed
             # to start and let it wait indefinitely..instead of this
@@ -1166,6 +1199,9 @@ async def stream_quotes(
         'Rxed init quote:\n'
         f'{pformat(first_quote)}'
     )
+    # signal `.data.feed` layer that mkt quotes are LIVE
+    feed_is_live.set()

     cs: trio.CancelScope|None = None
     startup: bool = True
     iter_quotes: trio.abc.Channel
@@ -1213,55 +1249,12 @@ async def stream_quotes(
             tn.start_soon(reset_on_feed)

             async with aclosing(iter_quotes):
-                # if syminfo.get('no_vlm', False):
-                if not init_msg.shm_write_opts['has_vlm']:
-
-                    # generally speaking these feeds don't
-                    # include vlm data.
-                    atype: str = mkt.dst.atype
-                    log.info(
-                        f'No-vlm {mkt.fqme}@{atype}, skipping quote poll'
-                    )
-
-                else:
-                    # wait for real volume on feed (trading might be
-                    # closed)
-                    while True:
-                        ticker = await iter_quotes.receive()
-
-                        # for a real volume contract we rait for
-                        # the first "real" trade to take place
-                        if (
-                            # not calc_price
-                            # and not ticker.rtTime
-                            False
-                            # not ticker.rtTime
-                        ):
-                            # spin consuming tickers until we
-                            # get a real market datum
-                            log.debug(f"New unsent ticker: {ticker}")
-                            continue
-                        else:
-                            log.debug("Received first volume tick")
-                            # ugh, clear ticks since we've
-                            # consumed them (ahem, ib_insync is
-                            # truly stateful trash)
-                            # ticker.ticks = []
-
-                            # XXX: this works because we don't use
-                            # ``aclosing()`` above?
-                            break
-
-                quote = normalize(ticker)
-                log.debug(f"First ticker received {quote}")
-
                 # tell data-layer spawner-caller that live
                 # quotes are now active despite not having
                 # necessarily received a first vlm/clearing
                 # tick.
                 ticker = await iter_quotes.receive()
-                feed_is_live.set()
+                quote = normalize(ticker)
                 fqme: str = quote['fqme']
                 await send_chan.send({fqme: quote})
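
The `start_dt` trim in `get_hist()` above is a plain boolean-mask filter over the structured OHLC array; a self-contained sketch with a toy dtype (field names mirror the shm layout assumed by the hunk):

import numpy as np

# toy OHLC frame with an epoch 'time' field
bars = np.array(
    [(1.7e9 + 60*i, 100. + i) for i in range(5)],
    dtype=[('time', 'f8'), ('close', 'f8')],
)
start_ts: float = 1.7e9 + 120  # ie. `start_dt.timestamp()`

# mask off rows older than the requested start, as in `get_hist()`
trimmed = bars[bars['time'] >= start_ts]
assert trimmed['time'][0] == start_ts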

View File

@@ -80,20 +80,20 @@ class Sampler:
     This non-instantiated type is meant to be a singleton within
     a `samplerd` actor-service spawned once by the user wishing to
     time-step-sample (real-time) quote feeds, see
-    ``.service.maybe_open_samplerd()`` and the below
-    ``register_with_sampler()``.
+    `.service.maybe_open_samplerd()` and the below
+    `register_with_sampler()`.

     '''
-    service_nursery: None | trio.Nursery = None
+    service_nursery: None|trio.Nursery = None

-    # TODO: we could stick these in a composed type to avoid
-    # angering the "i hate module scoped variables crowd" (yawn).
+    # TODO: we could stick these in a composed type to avoid angering
+    # the "i hate module scoped variables crowd" (yawn).
     ohlcv_shms: dict[float, list[ShmArray]] = {}

     # holds one-task-per-sample-period tasks which are spawned as-needed by
     # data feed requests with a given detected time step usually from
     # history loading.
-    incr_task_cs: trio.CancelScope | None = None
+    incr_task_cs: trio.CancelScope|None = None

     bcast_errors: tuple[Exception] = (
         trio.BrokenResourceError,
@@ -249,8 +249,8 @@ class Sampler:
     async def broadcast(
         self,
         period_s: float,
-        time_stamp: float | None = None,
-        info: dict | None = None,
+        time_stamp: float|None = None,
+        info: dict|None = None,

     ) -> None:
         '''
@@ -315,7 +315,7 @@ class Sampler:
     @classmethod
     async def broadcast_all(
         self,
-        info: dict | None = None,
+        info: dict|None = None,
     ) -> None:

         # NOTE: take a copy of subs since removals can happen
@@ -332,12 +332,12 @@ class Sampler:
 async def register_with_sampler(
     ctx: Context,
     period_s: float,
-    shms_by_period: dict[float, dict] | None = None,
+    shms_by_period: dict[float, dict]|None = None,

     open_index_stream: bool = True,  # open a 2way stream for sample step msgs?
     sub_for_broadcasts: bool = True,  # sampler side to send step updates?

-) -> None:
+) -> set[int]:

     get_console_log(tractor.current_actor().loglevel)
     incr_was_started: bool = False
@@ -364,7 +364,12 @@ async def register_with_sampler(
         # insert the base 1s period (for OHLC style sampling) into
         # the increment buffer set to update and shift every second.
-        if shms_by_period is not None:
+        if (
+            shms_by_period is not None
+            # and
+            # feed_is_live.is_set()
+            # ^TODO? pass it in instead?
+        ):
             from ._sharedmem import (
                 attach_shm_array,
                 _Token,
@@ -378,12 +383,17 @@ async def register_with_sampler(
                     readonly=False,
                 )
                 shms_by_period[period] = shm
-                Sampler.ohlcv_shms.setdefault(period, []).append(shm)
+                Sampler.ohlcv_shms.setdefault(
+                    period,
+                    [],
+                ).append(shm)

             assert Sampler.ohlcv_shms

         # unblock caller
-        await ctx.started(set(Sampler.ohlcv_shms.keys()))
+        await ctx.started(
+            set(Sampler.ohlcv_shms.keys())
+        )

         if open_index_stream:
             try:
@@ -429,7 +439,7 @@ async def register_with_sampler(
 async def spawn_samplerd(
-    loglevel: str | None = None,
+    loglevel: str|None = None,
     **extra_tractor_kwargs

 ) -> bool:
@@ -475,7 +485,7 @@ async def spawn_samplerd(
 @acm
 async def maybe_open_samplerd(
-    loglevel: str | None = None,
+    loglevel: str|None = None,
     **pikerd_kwargs,

 ) -> tractor.Portal:  # noqa
@@ -500,11 +510,11 @@ async def maybe_open_samplerd(
 @acm
 async def open_sample_stream(
     period_s: float,
-    shms_by_period: dict[float, dict] | None = None,
+    shms_by_period: dict[float, dict]|None = None,
     open_index_stream: bool = True,
     sub_for_broadcasts: bool = True,

-    cache_key: str | None = None,
+    cache_key: str|None = None,
     allow_new_sampler: bool = True,
     ensure_is_active: bool = False,
@@ -535,6 +545,8 @@ async def open_sample_stream(
     #     yield bistream
     # else:

+    ctx: tractor.Context
+    shm_periods: set[int]  # in `int`-seconds
     async with (
         # XXX: this should be singleton on a host,
         # a lone broker-daemon per provider should be
@@ -549,10 +561,10 @@ async def open_sample_stream(
                 'open_index_stream': open_index_stream,
                 'sub_for_broadcasts': sub_for_broadcasts,
             },
-        ) as (ctx, first)
+        ) as (ctx, shm_periods)
     ):
         if ensure_is_active:
-            assert len(first) > 1
+            assert len(shm_periods) > 1

         async with (
             ctx.open_stream(

View File

@@ -520,7 +520,10 @@ def open_shm_array(
     # "unlink" created shm on process teardown by
     # pushing teardown calls onto actor context stack
-    stack = tractor.current_actor().lifetime_stack
+    stack = tractor.current_actor(
+        err_on_no_runtime=False,
+    ).lifetime_stack
+    if stack:
         stack.callback(shmarr.close)
         stack.callback(shmarr.destroy)
@@ -607,7 +610,10 @@ def attach_shm_array(
         _known_tokens[key] = token

     # "close" attached shm on actor teardown
-    tractor.current_actor().lifetime_stack.callback(sha.close)
+    if (actor := tractor.current_actor(
+        err_on_no_runtime=False,
+    )):
+        actor.lifetime_stack.callback(sha.close)

     return sha
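
Both hunks register teardown callbacks on the actor's `lifetime_stack`, now guarded for the no-runtime case. Assuming that stack behaves like a `contextlib.ExitStack` (the `err_on_no_runtime` flag is `tractor`-specific and taken from the hunk as-is), the pattern reduces to:

from contextlib import ExitStack

class FakeShm:
    def close(self) -> None: print('closed')
    def destroy(self) -> None: print('unlinked')

# stand-in for `tractor.current_actor().lifetime_stack`:
# callbacks run LIFO when the stack unwinds at (actor) teardown.
lifetime_stack = ExitStack()
shm = FakeShm()
lifetime_stack.callback(shm.close)
lifetime_stack.callback(shm.destroy)
lifetime_stack.close()  # prints 'unlinked' then 'closed'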

View File

@@ -43,7 +43,6 @@ from typing import (
 import numpy as np

-from .. import config
 from ..service import (
     check_for_service,
@@ -152,7 +151,10 @@ class StorageConnectionError(ConnectionError):
     '''

-def get_storagemod(name: str) -> ModuleType:
+def get_storagemod(
+    name: str,
+) -> ModuleType:
     mod: ModuleType = import_module(
         '.' + name,
         'piker.storage',
@@ -165,9 +167,12 @@ def get_storagemod(name: str) -> ModuleType:
 @acm
 async def open_storage_client(
-    backend: str | None = None,
+    backend: str|None = None,

-) -> tuple[ModuleType, StorageClient]:
+) -> tuple[
+    ModuleType,
+    StorageClient,
+]:
     '''
     Load the ``StorageClient`` for named backend.
@@ -267,7 +272,10 @@ async def open_tsdb_client(
     from ..data.feed import maybe_open_feed

     async with (
-        open_storage_client() as (_, storage),
+        open_storage_client() as (
+            _,
+            storage,
+        ),

         maybe_open_feed(
             [fqme],
@@ -275,7 +283,7 @@ async def open_tsdb_client(
         ) as feed,
     ):
-        profiler(f'opened feed for {fqme}')
+        profiler(f'opened feed for {fqme!r}')

         # to_append = feed.hist_shm.array
         # to_prepend = None

View File

@@ -19,16 +19,10 @@ Storage middle-ware CLIs.

 """
 from __future__ import annotations
-# from datetime import datetime
-# from contextlib import (
-#     AsyncExitStack,
-# )
 from pathlib import Path
-from math import copysign
 import time
 from types import ModuleType
 from typing import (
-    Any,
     TYPE_CHECKING,
 )
@@ -47,7 +41,6 @@ from piker.data import (
     ShmArray,
 )
 from piker import tsp
-from piker.data._formatters import BGM
 from . import log
 from . import (
     __tsdbs__,
@@ -242,122 +235,12 @@ def anal(
     trio.run(main)


-async def markup_gaps(
-    fqme: str,
-    timeframe: float,
-    actl: AnnotCtl,
-    wdts: pl.DataFrame,
-    gaps: pl.DataFrame,
-
-) -> dict[int, dict]:
-    '''
-    Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
-    with rectangles.
-
-    '''
-    aids: dict[int] = {}
-    for i in range(gaps.height):
-        row: pl.DataFrame = gaps[i]
-
-        # the gap's RIGHT-most bar's OPEN value
-        # at that time (sample) step.
-        iend: int = row['index'][0]
-
-        # dt: datetime = row['dt'][0]
-        # dt_prev: datetime = row['dt_prev'][0]
-        # dt_end_t: float = dt.timestamp()
-
-        # TODO: can we eventually remove this
-        # once we figure out why the epoch cols
-        # don't match?
-        # TODO: FIX HOW/WHY these aren't matching
-        # and are instead off by 4hours (EST
-        # vs. UTC?!?!)
-        # end_t: float = row['time']
-        # assert (
-        #     dt.timestamp()
-        #     ==
-        #     end_t
-        # )
-
-        # the gap's LEFT-most bar's CLOSE value
-        # at that time (sample) step.
-        prev_r: pl.DataFrame = wdts.filter(
-            pl.col('index') == iend - 1
-        )
-
-        # XXX: probably a gap in the (newly sorted or de-duplicated)
-        # dt-df, so we might need to re-index first..
-        if prev_r.is_empty():
-            await tractor.pause()
-
-        istart: int = prev_r['index'][0]
-        # dt_start_t: float = dt_prev.timestamp()
-
-        # start_t: float = prev_r['time']
-        # assert (
-        #     dt_start_t
-        #     ==
-        #     start_t
-        # )
-
-        # TODO: implement px-col width measure
-        # and ensure at least as many px-cols
-        # shown per rect as configured by user.
-        # gap_w: float = abs((iend - istart))
-        # if gap_w < 6:
-        #     margin: float = 6
-        #     iend += margin
-        #     istart -= margin
-
-        rect_gap: float = BGM*3/8
-        opn: float = row['open'][0]
-        ro: tuple[float, float] = (
-            # dt_end_t,
-            iend + rect_gap + 1,
-            opn,
-        )
-        cls: float = prev_r['close'][0]
-        lc: tuple[float, float] = (
-            # dt_start_t,
-            istart - rect_gap,  # + 1 ,
-            cls,
-        )
-
-        color: str = 'dad_blue'
-        diff: float = cls - opn
-        sgn: float = copysign(1, diff)
-        color: str = {
-            -1: 'buy_green',
-            1: 'sell_red',
-        }[sgn]
-
-        rect_kwargs: dict[str, Any] = dict(
-            fqme=fqme,
-            timeframe=timeframe,
-            start_pos=lc,
-            end_pos=ro,
-            color=color,
-        )
-        aid: int = await actl.add_rect(**rect_kwargs)
-        assert aid
-        aids[aid] = rect_kwargs
-
-    # tell chart to redraw all its
-    # graphics view layers Bo
-    await actl.redraw(
-        fqme=fqme,
-        timeframe=timeframe,
-    )
-    return aids


 @store.command()
 def ldshm(
     fqme: str,
     write_parquet: bool = True,
     reload_parquet_to_shm: bool = True,
+    pdb: bool = False,  # --pdb passed?

 ) -> None:
     '''
@@ -377,7 +260,7 @@ def ldshm(
         open_piker_runtime(
             'polars_boi',
             enable_modules=['piker.data._sharedmem'],
-            debug_mode=True,
+            debug_mode=pdb,
         ),
         open_storage_client() as (
             mod,
@@ -397,6 +280,9 @@ def ldshm(
             times: np.ndarray = shm.array['time']
             d1: float = float(times[-1] - times[-2])
+            d2: float = 0
+            # XXX, take a median sample rate if sufficient data
+            if times.size > 2:
                 d2: float = float(times[-2] - times[-3])
             med: float = np.median(np.diff(times))

             if (
@@ -407,7 +293,6 @@ def ldshm(
                 raise ValueError(
                     f'Something is wrong with time period for {shm}:\n{times}'
                 )
-
             period_s: float = float(max(d1, d2, med))

             null_segs: tuple = tsp.get_null_segs(
@@ -417,6 +302,8 @@ def ldshm(
             # TODO: call null-seg fixer somehow?
             if null_segs:
+                if tractor._state.is_debug_mode():
                     await tractor.pause()
                 # async with (
                 #     trio.open_nursery() as tn,
@@ -441,9 +328,35 @@ def ldshm(
                 wdts,
                 deduped,
                 diff,
-            ) = tsp.dedupe(
+                valid_races,
+                dq_issues,
+            ) = tsp.dedupe_ohlcv_smart(
                 shm_df,
-                period=period_s,
             )

+            # Report duplicate analysis
+            if diff > 0:
+                log.info(
+                    f'Removed {diff} duplicate timestamp(s)\n'
+                )
+            if valid_races is not None:
+                identical: int = (
+                    valid_races
+                    .filter(pl.col('identical_bars'))
+                    .height
+                )
+                monotonic: int = valid_races.height - identical
+                log.info(
+                    f'Valid race conditions: {valid_races.height}\n'
+                    f'  - Identical bars: {identical}\n'
+                    f'  - Volume monotonic: {monotonic}\n'
+                )
+            if dq_issues is not None:
+                log.warning(
+                    f'DATA QUALITY ISSUES from provider: '
+                    f'{dq_issues.height} timestamp(s)\n'
+                    f'{dq_issues}\n'
+                )

             # detect gaps from in expected (uniform OHLC) sample period
@@ -460,7 +373,8 @@ def ldshm(
                 # TODO: actually pull the exact duration
                 # expected for each venue operational period?
-                gap_dt_unit='days',
+                # gap_dt_unit='day',
+                gap_dt_unit='day',
                 gap_thresh=1,
             )
@@ -471,8 +385,11 @@ def ldshm(
             if (
                 not venue_gaps.is_empty()
                 or (
-                    period_s < 60
-                    and not step_gaps.is_empty()
+                    not step_gaps.is_empty()
+                    # XXX, i presume i put this bc i was guarding
+                    # for ib venue gaps?
+                    # and
+                    # period_s < 60
                 )
             ):
                 # write repaired ts to parquet-file?
@@ -521,7 +438,7 @@ def ldshm(
                     do_markup_gaps: bool = True
                     if do_markup_gaps:
                         new_df: pl.DataFrame = tsp.np2pl(new)
-                        aids: dict = await markup_gaps(
+                        aids: dict = await tsp._annotate.markup_gaps(
                             fqme,
                             period_s,
                             actl,
@@ -530,12 +447,23 @@ def ldshm(
                         )

                         # last chance manual overwrites in REPL
                         # await tractor.pause()
-                        assert aids
+                        if not aids:
+                            log.warning(
+                                f'No gaps were found !?\n'
+                                f'fqme: {fqme!r}\n'
+                                f'timeframe: {period_s!r}\n'
+                                f"WELL THAT'S GOOD NOOZ!\n"
+                            )
                         tf2aids[period_s] = aids

             else:
-                # allow interaction even when no ts problems.
-                assert not diff
+                # No significant gaps to handle, but may have had
+                # duplicates removed (valid race conditions are ok)
+                if diff > 0 and dq_issues is not None:
+                    log.warning(
+                        'Found duplicates with data quality issues '
+                        'but no significant time gaps!\n'
+                    )

         await tractor.pause()
         log.info('Exiting TSP shm anal-izer!')
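
The sample-period detection above takes the max of the last two observed steps and the median of all steps, so a single short (duplicate-race) step can't shrink the detected period. Worked numerically:

import numpy as np

# mostly-1m stamps with one short (duplicate-ish) step
times = np.array([0., 60., 120., 121., 181.])

d1: float = float(times[-1] - times[-2])       # 60.0
d2: float = float(times[-2] - times[-3])       # 1.0, the bad step
med: float = float(np.median(np.diff(times)))  # 60.0

period_s: float = max(d1, d2, med)
assert period_s == 60.0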

File diff suppressed because it is too large

View File

@@ -275,6 +275,18 @@ def get_null_segs(
     # diff of abs index steps between each zeroed row
     absi_zdiff: np.ndarray = np.diff(absi_zeros)

+    if zero_t.size < 2:
+        try:
+            breakpoint()
+        except RuntimeError:
+            # XXX, if greenback not active from
+            # piker store ldshm cmd..
+            log.exception(
+                "Can't debug single-sample null!\n"
+            )
+        return None
+
     # scan for all frame-indices where the
     # zeroed-row-abs-index-step-diff is greater than the
     # expected increment of 1.
@@ -434,8 +446,8 @@ def get_null_segs(
 def iter_null_segs(
     timeframe: float,
-    frame: Frame | None = None,
-    null_segs: tuple | None = None,
+    frame: Frame|None = None,
+    null_segs: tuple|None = None,

 ) -> Generator[
     tuple[
@@ -487,7 +499,8 @@ def iter_null_segs(
         start_dt = None
         if (
             absi_start is not None
-            and start_t != 0
+            and
+            start_t != 0
         ):
             fi_start: int = absi_start - absi_first
             start_row: Seq = frame[fi_start]
@@ -501,8 +514,8 @@ def iter_null_segs(
         yield (
             absi_start, absi_end,  # abs indices
             fi_start, fi_end,  # relative "frame" indices
-            start_t, end_t,
-            start_dt, end_dt,
+            start_t, end_t,  # epoch times
+            start_dt, end_dt,  # dts
         )
@@ -578,11 +591,22 @@ def detect_time_gaps(
     # NOTE: this flag is to indicate that on this (sampling) time
     # scale we expect to only be filtering against larger venue
     # closures-scale time gaps.
+    #
+    # Map to total_ method since `dt_diff` is a duration type,
+    # not datetime - modern polars requires `total_*` methods
+    # for duration types (e.g. `total_days()` not `day()`)
+    # Ensure plural form for polars API (e.g. 'day' -> 'days')
+    unit_plural: str = (
+        gap_dt_unit
+        if gap_dt_unit.endswith('s')
+        else f'{gap_dt_unit}s'
+    )
+    duration_method: str = f'total_{unit_plural}'
     return step_gaps.filter(
         # Second by an arbitrary dt-unit step size
         getattr(
             pl.col('dt_diff').dt,
-            gap_dt_unit,
+            duration_method,
         )().abs() > gap_thresh
     )

View File

@@ -0,0 +1,306 @@
# piker: trading gear for hackers
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Time-series (remote) annotation APIs.

"""
from __future__ import annotations
from math import copysign
from typing import (
    Any,
    TYPE_CHECKING,
)

import polars as pl
import tractor

from piker.data._formatters import BGM
from piker.storage import log
from piker.ui._style import get_fonts

if TYPE_CHECKING:
    from piker.ui._remote_ctl import AnnotCtl


def humanize_duration(
    seconds: float,
) -> str:
    '''
    Convert duration in seconds to short human-readable form.

    Uses smallest appropriate time unit:
    - d: days
    - h: hours
    - m: minutes
    - s: seconds

    Examples:
    - 86400 -> "1d"
    - 28800 -> "8h"
    - 180 -> "3m"
    - 45 -> "45s"

    '''
    abs_secs: float = abs(seconds)
    if abs_secs >= 86400:
        days: float = abs_secs / 86400
        if days >= 10 or days == int(days):
            return f'{int(days)}d'
        return f'{days:.1f}d'

    elif abs_secs >= 3600:
        hours: float = abs_secs / 3600
        if hours >= 10 or hours == int(hours):
            return f'{int(hours)}h'
        return f'{hours:.1f}h'

    elif abs_secs >= 60:
        mins: float = abs_secs / 60
        if mins >= 10 or mins == int(mins):
            return f'{int(mins)}m'
        return f'{mins:.1f}m'

    else:
        if abs_secs >= 10 or abs_secs == int(abs_secs):
            return f'{int(abs_secs)}s'
        return f'{abs_secs:.1f}s'


async def markup_gaps(
    fqme: str,
    timeframe: float,
    actl: AnnotCtl,
    wdts: pl.DataFrame,
    gaps: pl.DataFrame,

    # XXX, switch on to see txt showing a "humanized" label of each
    # gap's duration.
    show_txt: bool = False,

) -> dict[int, dict]:
    '''
    Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
    with rectangles.

    '''
    # XXX: force chart redraw FIRST to ensure PlotItem coordinate
    # system is properly initialized before we position annotations!
    # Without this, annotations may be misaligned on first creation
    # due to Qt/pyqtgraph initialization race conditions.
    await actl.redraw(
        fqme=fqme,
        timeframe=timeframe,
    )

    aids: dict[int] = {}
    for i in range(gaps.height):
        row: pl.DataFrame = gaps[i]

        # the gap's RIGHT-most bar's OPEN value
        # at that time (sample) step.
        iend: int = row['index'][0]

        # dt: datetime = row['dt'][0]
        # dt_prev: datetime = row['dt_prev'][0]
        # dt_end_t: float = dt.timestamp()

        # TODO: can we eventually remove this
        # once we figure out why the epoch cols
        # don't match?
        # TODO: FIX HOW/WHY these aren't matching
        # and are instead off by 4hours (EST
        # vs. UTC?!?!)
        # end_t: float = row['time']
        # assert (
        #     dt.timestamp()
        #     ==
        #     end_t
        # )

        # the gap's LEFT-most bar's CLOSE value
        # at that time (sample) step.
        prev_r: pl.DataFrame = wdts.filter(
            pl.col('index') == iend - 1
        )

        # XXX: probably a gap in the (newly sorted or de-duplicated)
        # dt-df, so we might need to re-index first..
        dt: pl.Series = row['dt']
        dt_prev: pl.Series = row['dt_prev']
        if prev_r.is_empty():

            # XXX, filter out any special ignore cases,
            # - UNIX-epoch stamped datums
            # - first row
            if (
                dt_prev.dt.epoch()[0] == 0
                or
                dt.dt.epoch()[0] == 0
            ):
                log.warning('Skipping row with UNIX epoch timestamp ??')
                continue

            if wdts[0]['index'][0] == iend:  # first row
                log.warning('Skipping first-row (has no previous obvi) !!')
                continue

            # XXX, if the previous-row by shm-index is missing,
            # meaning there is a missing sample (set), get the prior
            # row by df index and attempt to use it?
            i_wdts: pl.DataFrame = wdts.with_row_index(name='i')
            i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
            prev_row_by_i = wdts[i_row]
            prev_r: pl.DataFrame = prev_row_by_i

            # debug any missing pre-row
            if tractor._state.is_debug_mode():
                await tractor.pause()

        istart: int = prev_r['index'][0]

        # TODO: implement px-col width measure
        # and ensure at least as many px-cols
        # shown per rect as configured by user.
        # gap_w: float = abs((iend - istart))
        # if gap_w < 6:
        #     margin: float = 6
        #     iend += margin
        #     istart -= margin

        opn: float = row['open'][0]
        cls: float = prev_r['close'][0]

        # get gap duration for humanized label
        gap_dur_s: float = row['s_diff'][0]
        gap_label: str = humanize_duration(gap_dur_s)

        # XXX: get timestamps for server-side index lookup
        start_time: float = prev_r['time'][0]
        end_time: float = row['time'][0]

        # BGM=0.16 is the normal diff from overlap between bars, SO
        # just go slightly "in" from that "between them".
        from_idx: int = BGM - .06  # = .10
        lc: tuple[float, float] = (
            istart + 1 - from_idx,
            cls,
        )
        ro: tuple[float, float] = (
            iend + from_idx,
            opn,
        )

        diff: float = cls - opn
        sgn: float = copysign(1, diff)
        up_gap: bool = sgn == -1
        down_gap: bool = sgn == 1
        flat: bool = sgn == 0
        color: str = 'dad_blue'
        # TODO? mks more sense to have up/down coloring?
        # color: str = {
        #     -1: 'lilypad_green',  # up-gap
        #     1: 'wine',  # down-gap
        # }[sgn]

        rect_kwargs: dict[str, Any] = dict(
            fqme=fqme,
            timeframe=timeframe,
            start_pos=lc,
            end_pos=ro,
            color=color,
            start_time=start_time,
            end_time=end_time,
        )
        # add up/down rects
        aid: int|None = await actl.add_rect(**rect_kwargs)
        if aid is None:
            log.error(
                f'Failed to add rect for,\n'
                f'{rect_kwargs!r}\n'
                f'\n'
                f'Skipping to next gap!\n'
            )
            continue

        assert aid
        aids[aid] = rect_kwargs

        direction: str = (
            'down' if down_gap
            else 'up'
        )
        # TODO! mk this a `msgspec.Struct` which we deserialize
        # on the server side!
        # XXX: send timestamp for server-side index lookup
        # to ensure alignment with current shm state
        gap_time: float = row['time'][0]
        arrow_kwargs: dict[str, Any] = dict(
            fqme=fqme,
            timeframe=timeframe,
            x=iend,  # fallback if timestamp lookup fails
            y=cls,
            time=gap_time,  # for server-side index lookup
            color=color,
            alpha=169,
            pointing=direction,

            # TODO: expose these as params to markup_gaps()?
            headLen=10,
            headWidth=2.222,
            pxMode=True,
        )
        aid: int = await actl.add_arrow(
            **arrow_kwargs
        )

        # add duration label to RHS of arrow
        if up_gap:
            anchor = (0, 0)
            # ^XXX? i dun get dese dims.. XD
        elif down_gap:
            anchor = (0, 1)  # XXX y, x?
        else:  # no-gap?
            assert flat
            anchor = (0, 0)  # up from bottom

        # use a slightly smaller font for gap label txt.
        font, small_font = get_fonts()
        font_size: int = small_font.px_size - 1
        assert isinstance(font_size, int)
        if show_txt:
            text_aid: int = await actl.add_text(
                fqme=fqme,
                timeframe=timeframe,
                text=gap_label,
                x=iend + 1,  # fallback if timestamp lookup fails
                y=cls,
                time=gap_time,  # server-side index lookup
                color=color,
                anchor=anchor,
                font_size=font_size,
            )
            aids[text_aid] = {'text': gap_label}

    # tell chart to redraw all its
    # graphics view layers Bo
    await actl.redraw(
        fqme=fqme,
        timeframe=timeframe,
    )
    return aids
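
`humanize_duration()` is pure, so its unit-picking can be sanity-checked standalone (cases from the docstring plus one fractional example; assumed importable from this new `piker.tsp._annotate` module, per the `ldshm` call-site above):

cases: dict[float, str] = {
    86400: '1d',    # exactly one day
    28800: '8h',
    180:   '3m',
    45:    '45s',
    90:    '1.5m',  # sub-10, non-integral -> one decimal
}
for secs, expected in cases.items():
    assert humanize_duration(secs) == expected, (secs, expected)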

View File

@@ -0,0 +1,206 @@
'''
Smart OHLCV deduplication with data quality validation.

Handles concurrent write conflicts by keeping the most complete bar
(highest volume) while detecting data quality anomalies.

'''
import polars as pl

from ._anal import with_dts


def dedupe_ohlcv_smart(
    src_df: pl.DataFrame,
    time_col: str = 'time',
    volume_col: str = 'volume',
    sort: bool = True,

) -> tuple[
    pl.DataFrame,  # with dts
    pl.DataFrame,  # deduped (keeping higher volume bars)
    int,  # count of dupes removed
    pl.DataFrame|None,  # valid race conditions
    pl.DataFrame|None,  # data quality violations
]:
    '''
    Smart OHLCV deduplication keeping most complete bars.

    For duplicate timestamps, keeps bar with highest volume under
    the assumption that higher volume indicates more complete/final
    data from backfill vs partial live updates.

    Returns
    -------
    Tuple of:
    - wdts: original dataframe with datetime columns added
    - deduped: deduplicated frame keeping highest-volume bars
    - diff: number of duplicate rows removed
    - valid_races: duplicates meeting expected race condition pattern
      (volume monotonic, OHLC ranges valid)
    - data_quality_issues: duplicates violating expected relationships
      indicating provider data problems

    '''
    wdts: pl.DataFrame = with_dts(src_df)

    # Find duplicate timestamps
    dupes: pl.DataFrame = wdts.filter(
        pl.col(time_col).is_duplicated()
    )
    if dupes.is_empty():
        # No duplicates, return as-is
        return (wdts, wdts, 0, None, None)

    # Analyze duplicate groups for validation
    dupe_analysis: pl.DataFrame = (
        dupes
        .sort([time_col, 'index'])
        .group_by(time_col, maintain_order=True)
        .agg([
            pl.col('index').alias('indices'),
            pl.col('volume').alias('volumes'),
            pl.col('high').alias('highs'),
            pl.col('low').alias('lows'),
            pl.col('open').alias('opens'),
            pl.col('close').alias('closes'),
            pl.col('dt').first().alias('dt'),
            pl.len().alias('count'),
        ])
    )

    # Validate OHLCV monotonicity for each duplicate group
    def check_ohlcv_validity(row) -> dict[str, bool]:
        '''
        Check if duplicate bars follow expected race condition pattern.

        For a valid live-update backfill race:
        - volume should be monotonically increasing
        - high should be monotonically non-decreasing
        - low should be monotonically non-increasing
        - open should be identical (fixed at bar start)

        Returns dict of violation flags.

        '''
        vols: list = row['volumes']
        highs: list = row['highs']
        lows: list = row['lows']
        opens: list = row['opens']

        violations: dict[str, bool] = {
            'volume_non_monotonic': False,
            'high_decreased': False,
            'low_increased': False,
            'open_mismatch': False,
            'identical_bars': False,
        }

        # Check if all bars are identical (pure duplicate)
        if (
            len(set(vols)) == 1
            and len(set(highs)) == 1
            and len(set(lows)) == 1
            and len(set(opens)) == 1
        ):
            violations['identical_bars'] = True
            return violations

        # Check volume monotonicity
        for i in range(1, len(vols)):
            if vols[i] < vols[i-1]:
                violations['volume_non_monotonic'] = True
                break

        # Check high monotonicity (can only increase or stay same)
        for i in range(1, len(highs)):
            if highs[i] < highs[i-1]:
                violations['high_decreased'] = True
                break

        # Check low monotonicity (can only decrease or stay same)
        for i in range(1, len(lows)):
            if lows[i] > lows[i-1]:
                violations['low_increased'] = True
                break

        # Check open consistency (should be fixed)
        if len(set(opens)) > 1:
            violations['open_mismatch'] = True

        return violations

    # Apply validation
    dupe_analysis = dupe_analysis.with_columns([
        pl.struct(['volumes', 'highs', 'lows', 'opens'])
        .map_elements(
            check_ohlcv_validity,
            return_dtype=pl.Struct([
                pl.Field('volume_non_monotonic', pl.Boolean),
                pl.Field('high_decreased', pl.Boolean),
                pl.Field('low_increased', pl.Boolean),
                pl.Field('open_mismatch', pl.Boolean),
                pl.Field('identical_bars', pl.Boolean),
            ])
        )
        .alias('validity')
    ])

    # Unnest validity struct
    dupe_analysis = dupe_analysis.unnest('validity')

    # Separate valid races from data quality issues
    valid_races: pl.DataFrame|None = (
        dupe_analysis
        .filter(
            # Valid if no violations OR just identical bars
            ~pl.col('volume_non_monotonic')
            & ~pl.col('high_decreased')
            & ~pl.col('low_increased')
            & ~pl.col('open_mismatch')
        )
    )
    if valid_races.is_empty():
        valid_races = None

    data_quality_issues: pl.DataFrame|None = (
        dupe_analysis
        .filter(
            # Issues if any non-identical violation exists
            (
                pl.col('volume_non_monotonic')
                | pl.col('high_decreased')
                | pl.col('low_increased')
                | pl.col('open_mismatch')
            )
            & ~pl.col('identical_bars')
        )
    )
    if data_quality_issues.is_empty():
        data_quality_issues = None

    # Deduplicate: keep highest volume bar for each timestamp
    deduped: pl.DataFrame = (
        wdts
        .sort([time_col, volume_col])
        .unique(
            subset=[time_col],
            keep='last',
            maintain_order=False,
        )
    )

    # Re-sort by time or index
    if sort:
        deduped = deduped.sort(by=time_col)

    diff: int = wdts.height - deduped.height

    return (
        wdts,
        deduped,
        diff,
        valid_races,
        data_quality_issues,
    )
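
The keep-the-most-complete-bar rule at the heart of `dedupe_ohlcv_smart()` reduces to a sort plus `unique(keep='last')`; a self-contained toy (the new module's file path is elided by the diff viewer, so this uses a bare frame):

import polars as pl

# two writes raced on the 120s bar; the higher-volume row is the
# more complete one and should win.
df = pl.DataFrame({
    'time':   [60, 120, 120, 180],
    'close':  [1.0, 2.0, 2.5, 3.0],
    'volume': [10., 5., 9., 7.],
})
deduped = (
    df
    .sort(['time', 'volume'])  # lowest volume sorts first..
    .unique(subset=['time'], keep='last', maintain_order=False)
    .sort(by='time')
)
assert deduped.height == 3
assert deduped.filter(pl.col('time') == 120)['volume'][0] == 9.0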

piker/tsp/_history.py (mode 100644, 1600 changes)

File diff suppressed because it is too large

View File

@@ -21,6 +21,7 @@ Higher level annotation editors.
 from __future__ import annotations
 from collections import defaultdict
 from typing import (
+    Literal,
     Sequence,
     TYPE_CHECKING,
 )
@@ -71,9 +72,18 @@ log = get_logger(__name__)
 class ArrowEditor(Struct):
+    '''
+    Annotate a chart-view with arrows most often used for indicating,
+    - order txns/clears,
+    - position directions,
+    - general points-of-interest like nooz events.

+    '''
     godw: GodWidget = None  # type: ignore # noqa
-    _arrows: dict[str, list[pg.ArrowItem]] = {}
+    _arrows: dict[
+        str,
+        list[pg.ArrowItem]
+    ] = {}

     def add(
         self,
@@ -81,8 +91,19 @@ class ArrowEditor(Struct):
         uid: str,
         x: float,
         y: float,
-        color: str = 'default',
-        pointing: str | None = None,
+        color: str|None = None,
+        pointing: Literal[
+            'up',
+            'down',
+            None,
+        ] = None,
+        alpha: int = 255,
+        zval: float = 1e9,
+        headLen: float|None = None,
+        headWidth: float|None = None,
+        tailLen: float|None = None,
+        tailWidth: float|None = None,
+        pxMode: bool = True,

     ) -> pg.ArrowItem:
         '''
@@ -98,29 +119,83 @@ class ArrowEditor(Struct):
         # scale arrow sizing to dpi-aware font
         size = _font.font.pixelSize() * 0.8

+        # allow caller override of head dimensions
+        if headLen is None:
+            headLen = size
+        if headWidth is None:
+            headWidth = size/2
+
+        # tail params default to None (no tail)
+        if tailWidth is None:
+            tailWidth = 3
+
+        color = color or 'default'
+        color = QColor(hcolor(color))
+        color.setAlpha(alpha)
+        pen = fn.mkPen(color, width=1)
+        brush = fn.mkBrush(color)
         arrow = pg.ArrowItem(
             angle=angle,
             baseAngle=0,
-            headLen=size,
-            headWidth=size/2,
-            tailLen=None,
-            pxMode=True,
+            headLen=headLen,
+            headWidth=headWidth,
+            tailLen=tailLen,
+            tailWidth=tailWidth,
+            pxMode=pxMode,

             # coloring
-            pen=pg.mkPen(hcolor('papas_special')),
-            brush=pg.mkBrush(hcolor(color)),
+            pen=pen,
+            brush=brush,
         )
+        arrow.setZValue(zval)
         arrow.setPos(x, y)
-        self._arrows.setdefault(uid, []).append(arrow)
-
-        # render to view
-        plot.addItem(arrow)
+        plot.addItem(arrow)  # render to view

+        # register for removal
+        arrow._uid = uid
+        self._arrows.setdefault(
+            uid, []
+        ).append(arrow)
         return arrow

-    def remove(self, arrow) -> bool:
+    def remove(
+        self,
+        arrow: pg.ArrowItem,
+    ) -> None:
+        '''
+        Remove a *single arrow* from all chart views to which it was
+        added.

+        '''
+        uid: str = arrow._uid
+        arrows: list[pg.ArrowItem] = self._arrows[uid]
+        log.info(
+            f'Removing arrow from views\n'
+            f'uid: {uid!r}\n'
+            f'{arrow!r}\n'
+        )
         for linked in self.godw.iter_linked():
-            linked.chart.plotItem.removeItem(arrow)
+            if not (chart := linked.chart):
+                continue
+            chart.plotItem.removeItem(arrow)
+
+        try:
+            arrows.remove(arrow)
+        except ValueError:
+            log.warning(
+                f'Arrow was already removed?\n'
+                f'uid: {uid!r}\n'
+                f'{arrow!r}\n'
+            )
+
+    def remove_all(self) -> set[pg.ArrowItem]:
+        '''
+        Remove all arrows added by this editor from all
+        chart-views.

+        '''
+        for uid, arrows in self._arrows.items():
+            for arrow in arrows:
+                self.remove(arrow)


 class LineEditor(Struct):
@@ -266,6 +341,9 @@ class LineEditor(Struct):

         return lines

+    # compat with ArrowEditor
+    remove = remove_line


 def as_point(
     pair: Sequence[float, float] | QPointF,
@@ -298,7 +376,7 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
     def __init__(
         self,
         viewbox: ViewBox,
-        color: str | None = None,
+        color: str|None = None,
     ) -> None:
         super().__init__(0, 0, 1, 1)
@@ -614,3 +692,6 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
         ):
             scen.removeItem(self._label_proxy)

+
+    # compat with ArrowEditor
+    remove = delete

View File

@@ -27,10 +27,12 @@ from contextlib import (
 from functools import partial
 from pprint import pformat
 from typing import (
+    # Any,
     AsyncContextManager,
+    Literal,
 )
+from uuid import uuid4

+import pyqtgraph as pg
 import tractor
 import trio
 from tractor import trionics
@@ -47,12 +49,16 @@ from piker.brokers import SymbolNotFound
 from piker.ui.qt import (
     QGraphicsItem,
 )
+from PyQt6.QtGui import QFont
 from ._display import DisplayState
 from ._interaction import ChartView
-from ._editors import SelectRect
+from ._editors import (
+    SelectRect,
+    ArrowEditor,
+)
 from ._chart import ChartPlotWidget
 from ._dataviz import Viz
+from ._style import hcolor

 log = get_logger(__name__)
@@ -83,8 +89,40 @@ _ctxs: IpcCtxTable = {}
 # the "annotations server" which actually renders to a Qt canvas).
 # type AnnotsTable = dict[int, QGraphicsItem]
 AnnotsTable = dict[int, QGraphicsItem]
+EditorsTable = dict[int, ArrowEditor]

 _annots: AnnotsTable = {}
+_editors: EditorsTable = {}
+def rm_annot(
+    annot: ArrowEditor|SelectRect|pg.TextItem
+) -> bool:
+    global _editors
+    match annot:
+        case pg.ArrowItem():
+            editor = _editors[annot._uid]
+            editor.remove(annot)
+            # ^TODO? only remove each arrow or all?
+            # if editor._arrows:
+            #     editor.remove_all()
+            # else:
+            #     log.warning(
+            #         f'Annot already removed!\n'
+            #         f'{annot!r}\n'
+            #     )
+            return True
+
+        case SelectRect():
+            annot.delete()
+            return True
+
+        case pg.TextItem():
+            scene = annot.scene()
+            if scene:
+                scene.removeItem(annot)
+            return True
+
+    return False


 async def serve_rc_annots(
@@ -95,6 +133,12 @@ async def serve_rc_annots(
     annots: AnnotsTable,

 ) -> None:
+    '''
+    A small viz(ualization) server for remote ctl of chart
+    annotations.
+
+    '''
+    global _editors
     async for msg in annot_req_stream:
         match msg:
             case {
@ -104,14 +148,77 @@ async def serve_rc_annots(
'meth': str(meth), 'meth': str(meth),
'kwargs': dict(kwargs), 'kwargs': dict(kwargs),
}: }:
ds: DisplayState = _dss[fqme] ds: DisplayState = _dss[fqme]
try:
chart: ChartPlotWidget = { chart: ChartPlotWidget = {
60: ds.hist_chart, 60: ds.hist_chart,
1: ds.chart, 1: ds.chart,
}[timeframe] }[timeframe]
except KeyError:
msg: str = (
f'No chart for timeframe={timeframe}s, '
f'skipping rect annotation'
)
log.exeception(msg)
await annot_req_stream.send({'error': msg})
continue
cv: ChartView = chart.cv cv: ChartView = chart.cv
# NEW: if timestamps provided, lookup current indices
# from shm to ensure alignment with current buffer
# state
start_time = kwargs.pop('start_time', None)
end_time = kwargs.pop('end_time', None)
if (
start_time is not None
and end_time is not None
):
viz: Viz = chart.get_viz(fqme)
shm = viz.shm
arr = shm.array
# lookup start index
start_matches = arr[arr['time'] == start_time]
if len(start_matches) == 0:
msg: str = (
f'No shm entry for start_time={start_time}, '
f'skipping rect'
)
log.error(msg)
await annot_req_stream.send({'error': msg})
continue
# lookup end index
end_matches = arr[arr['time'] == end_time]
if len(end_matches) == 0:
msg: str = (
f'No shm entry for end_time={end_time}, '
f'skipping rect'
)
log.error(msg)
await annot_req_stream.send({'error': msg})
continue
# get close price from start bar, open from end
# bar
start_idx = float(start_matches[0]['index'])
end_idx = float(end_matches[0]['index'])
start_close = float(start_matches[0]['close'])
end_open = float(end_matches[0]['open'])
# reconstruct start_pos and end_pos with
# looked-up indices
from_idx: float = 0.16 - 0.06 # BGM offset
kwargs['start_pos'] = (
start_idx + 1 - from_idx,
start_close,
)
kwargs['end_pos'] = (
end_idx + from_idx,
end_open,
)
# annot type lookup from cmd # annot type lookup from cmd
rect = SelectRect( rect = SelectRect(
viewbox=cv, viewbox=cv,
@@ -130,21 +237,207 @@ async def serve_rc_annots(
                 # delegate generically to the requested method
                 getattr(rect, meth)(**kwargs)
                 rect.show()
+
+                # XXX: store absolute coords for repositioning
+                # during viz redraws (eg backfill updates)
+                rect._meth = meth
+                rect._kwargs = kwargs
+
                 aid: int = id(rect)
                 annots[aid] = rect
                 aids: set[int] = ctxs[ipc_key][1]
                 aids.add(aid)
                 await annot_req_stream.send(aid)

+            case {
+                'cmd': 'ArrowEditor',
+                'fqme': fqme,
+                'timeframe': timeframe,
+                'meth': 'add'|'remove' as meth,
+                'kwargs': {
+                    'x': float(x),
+                    'y': float(y),
+                    'pointing': pointing,
+                    'color': color,
+                    'aid': str()|None as aid,
+                    'alpha': int(alpha),
+                    'headLen': int()|float()|None as headLen,
+                    'headWidth': int()|float()|None as headWidth,
+                    'tailLen': int()|float()|None as tailLen,
+                    'tailWidth': int()|float()|None as tailWidth,
+                    'pxMode': bool(pxMode),
+                    'time': int()|float()|None as timestamp,
+                },
+                # ?TODO? split based on method fn-sigs?
+                # 'pointing',
+            }:
+                ds: DisplayState = _dss[fqme]
+                try:
+                    chart: ChartPlotWidget = {
+                        60: ds.hist_chart,
+                        1: ds.chart,
+                    }[timeframe]
+                except KeyError:
+                    log.warning(
+                        f'No chart for timeframe={timeframe}s, '
+                        f'skipping arrow annotation'
+                    )
+                    # return -1 to indicate failure
+                    await annot_req_stream.send(-1)
+                    continue
+
+                cv: ChartView = chart.cv
+                godw = chart.linked.godwidget
+
+                # NEW: if timestamp provided, lookup current index
+                # from shm to ensure alignment with current buffer
+                # state
+                if timestamp is not None:
+                    viz: Viz = chart.get_viz(fqme)
+                    shm = viz.shm
+                    arr = shm.array
+
+                    # find index where time matches timestamp
+                    matches = arr[arr['time'] == timestamp]
+                    if len(matches) == 0:
+                        log.error(
+                            f'No shm entry for timestamp={timestamp}, '
+                            f'skipping arrow annotation'
+                        )
+                        await annot_req_stream.send(-1)
+                        continue
+
+                    # use the matched row's index as x
+                    x = float(matches[0]['index'])
+
+                arrows = ArrowEditor(godw=godw)
+
+                # `.add/.remove()` API
+                if meth != 'add':
+                    # await tractor.pause()
+                    raise ValueError(
+                        f'Invalid arrow-edit request ?\n'
+                        f'{msg!r}\n'
+                    )
+
+                aid: str = str(uuid4())
+                arrow: pg.ArrowItem = arrows.add(
+                    plot=chart.plotItem,
+                    uid=aid,
+                    x=x,
+                    y=y,
+                    pointing=pointing,
+                    color=color,
+                    alpha=alpha,
+                    headLen=headLen,
+                    headWidth=headWidth,
+                    tailLen=tailLen,
+                    tailWidth=tailWidth,
+                    pxMode=pxMode,
+                )
+                # XXX: store absolute coords for repositioning
+                # during viz redraws (eg backfill updates)
+                arrow._abs_x = x
+                arrow._abs_y = y
+
+                annots[aid] = arrow
+                _editors[aid] = arrows
+                aids: set[int] = ctxs[ipc_key][1]
+                aids.add(aid)
+                await annot_req_stream.send(aid)
+
+            case {
+                'cmd': 'TextItem',
+                'fqme': fqme,
+                'timeframe': timeframe,
+                'kwargs': {
+                    'text': str(text),
+                    'x': int()|float() as x,
+                    'y': int()|float() as y,
+                    'color': color,
+                    'anchor': list(anchor),
+                    'font_size': int()|None as font_size,
+                    'time': int()|float()|None as timestamp,
+                },
+            }:
+                ds: DisplayState = _dss[fqme]
+                try:
+                    chart: ChartPlotWidget = {
+                        60: ds.hist_chart,
+                        1: ds.chart,
+                    }[timeframe]
+                except KeyError:
+                    log.warning(
+                        f'No chart for timeframe={timeframe}s, '
+                        f'skipping text annotation'
+                    )
+                    await annot_req_stream.send(-1)
+                    continue
+
+                # NEW: if timestamp provided, lookup current index
+                # from shm to ensure alignment with current buffer
+                # state
+                if timestamp is not None:
+                    viz: Viz = chart.get_viz(fqme)
+                    shm = viz.shm
+                    arr = shm.array
+
+                    # find index where time matches timestamp
+                    matches = arr[arr['time'] == timestamp]
+                    if len(matches) == 0:
+                        log.error(
+                            f'No shm entry for timestamp={timestamp}, '
+                            f'skipping text annotation'
+                        )
+                        await annot_req_stream.send(-1)
+                        continue
+
+                    # use the matched row's index as x, +1 for text
+                    # offset
+                    x = float(matches[0]['index']) + 1
+
+                # convert named color to hex
+                color_hex: str = hcolor(color)
+
+                # create text item
+                text_item: pg.TextItem = pg.TextItem(
+                    text=text,
+                    color=color_hex,
+                    anchor=anchor,
+                    # ?TODO, pin to github:main for this?
+                    # legacy, can have scaling ish?
+                    # ensureInBounds=True,
+                )
+                # apply font size (default to DpiAwareFont if not
+                # provided)
+                if font_size is None:
+                    from ._style import get_fonts
+                    font, font_small = get_fonts()
+                    font_size = font_small.px_size - 1
+
+                qfont: QFont = text_item.textItem.font()
+                qfont.setPixelSize(font_size)
+                text_item.setFont(qfont)
+                text_item.setPos(x, y)
+                chart.plotItem.addItem(text_item)
+
+                # XXX: store absolute coords for repositioning
+                # during viz redraws (eg backfill updates)
+                text_item._abs_x = x
+                text_item._abs_y = y
+
+                aid: str = str(uuid4())
+                annots[aid] = text_item
+                aids: set[int] = ctxs[ipc_key][1]
+                aids.add(aid)
+                await annot_req_stream.send(aid)

             case {
                 'cmd': 'remove',
-                'aid': int(aid),
+                'aid': int(aid)|str(aid),
             }:
                 # NOTE: this is normally entered on
                 # a client's annotation de-alloc normally
                 # prior to detach or modify.
                 annot: QGraphicsItem = annots[aid]
-                annot.delete()
+                assert rm_annot(annot)

                 # respond to client indicating annot
                 # was indeed deleted.
@@ -175,6 +468,38 @@ async def serve_rc_annots(
                 )
                 viz.reset_graphics()

+                # XXX: reposition all annotations to ensure they
+                # stay aligned with viz data after reset (eg during
+                # backfill when abs-index range changes)
+                n_repositioned: int = 0
+                for aid, annot in annots.items():
+                    # arrows and text items use abs x,y coords
+                    if (
+                        hasattr(annot, '_abs_x')
+                        and
+                        hasattr(annot, '_abs_y')
+                    ):
+                        annot.setPos(
+                            annot._abs_x,
+                            annot._abs_y,
+                        )
+                        n_repositioned += 1
+
+                    # rects use method + kwargs
+                    elif (
+                        hasattr(annot, '_meth')
+                        and
+                        hasattr(annot, '_kwargs')
+                    ):
+                        getattr(annot, annot._meth)(**annot._kwargs)
+                        n_repositioned += 1
+
+                if n_repositioned:
+                    log.info(
+                        f'Repositioned {n_repositioned} annotation(s) '
+                        f'after viz redraw'
+                    )

             case _:
                 log.error(
                     'Unknown remote annotation cmd:\n'
@@ -188,6 +513,12 @@ async def remote_annotate(
 ) -> None:

     global _dss, _ctxs
+    if not _dss:
+        raise RuntimeError(
+            'Race condition on chart-init state ??\n'
+            'Another actor is trying to annotate this chart '
+            'before it has fully spawned.\n'
+        )
     assert _dss

     _ctxs[ctx.cid] = (ctx, set())
@@ -212,7 +543,7 @@ async def remote_annotate(
         assert _ctx is ctx
         for aid in aids:
             annot: QGraphicsItem = _annots[aid]
-            annot.delete()
+            assert rm_annot(annot)

 class AnnotCtl(Struct):
@@ -257,13 +588,18 @@ class AnnotCtl(Struct):
         from_acm: bool = False,

-    ) -> int:
+        # NEW: optional timestamps for server-side index lookup
+        start_time: float|None = None,
+        end_time: float|None = None,
+
+    ) -> int|None:
         '''
         Add a `SelectRect` annotation to the target view, return
         the instance's `id(obj)` from the remote UI actor.

         '''
         ipc: MsgStream = self._get_ipc(fqme)
+        with trio.fail_after(3):
             await ipc.send({
                 'fqme': fqme,
                 'cmd': 'SelectRect',
@@ -275,9 +611,15 @@ class AnnotCtl(Struct):
                     'end_pos': tuple(end_pos),
                     'color': color,
                     'update_label': False,
+                    'start_time': start_time,
+                    'end_time': end_time,
                 },
             })
-        aid: int = await ipc.receive()
+        aid: int|dict = await ipc.receive()
+        match aid:
+            case {'error': str(msg)}:
+                log.error(msg)
+                return None
+
         self._ipcs[aid] = ipc
         if not from_acm:
             self._annot_stack.push_async_callback(
@@ -334,20 +676,130 @@ class AnnotCtl(Struct):
             'timeframe': timeframe,
         })

-    # TODO: do we even need this?
-    # async def modify(
-    #     self,
-    #     aid: int,  # annotation id
-    #     meth: str,  # far end graphics object method to invoke
-    #     params: dict[str, Any],  # far end `meth(**kwargs)`
-    # ) -> bool:
-    #     '''
-    #     Modify an existing (remote) annotation's graphics
-    #     paramters, thus changing it's appearance / state in real
-    #     time.
-    #
-    #     '''
-    #     raise NotImplementedError
+    async def add_arrow(
+        self,
+        fqme: str,
+        timeframe: float,
+        x: float,
+        y: float,
+        pointing: Literal[
+            'up',
+            'down',
+        ],
+        # TODO: a `Literal['view', 'scene']` for this?
+        # domain: str = 'view',  # or 'scene'
+        color: str = 'dad_blue',
+        alpha: int = 116,
+        headLen: float|None = None,
+        headWidth: float|None = None,
+        tailLen: float|None = None,
+        tailWidth: float|None = None,
+        pxMode: bool = True,
+        from_acm: bool = False,
+
+        # NEW: optional timestamp for server-side index lookup
+        time: float|None = None,
+
+    ) -> int|None:
+        '''
+        Add an arrow annotation to the target view, return
+        the instance's id from the remote UI actor.
+
+        '''
+        ipc: MsgStream = self._get_ipc(fqme)
+        with trio.fail_after(3):
+            await ipc.send({
+                'fqme': fqme,
+                'cmd': 'ArrowEditor',
+                'timeframe': timeframe,
+                # 'meth': str(meth),
+                'meth': 'add',
+                'kwargs': {
+                    'x': float(x),
+                    'y': float(y),
+                    'color': color,
+                    'pointing': pointing,  # up|down
+                    'alpha': alpha,
+                    'aid': None,
+                    'headLen': headLen,
+                    'headWidth': headWidth,
+                    'tailLen': tailLen,
+                    'tailWidth': tailWidth,
+                    'pxMode': pxMode,
+                    'time': time,  # for server-side index lookup
+                },
+            })
+        aid: int|dict = await ipc.receive()
+        match aid:
+            case {'error': str(msg)}:
+                log.error(msg)
+                return None
+
+        self._ipcs[aid] = ipc
+        if not from_acm:
+            self._annot_stack.push_async_callback(
+                partial(
+                    self.remove,
+                    aid,
+                )
+            )
+        return aid
+
+    async def add_text(
+        self,
+        fqme: str,
+        timeframe: float,
+        text: str,
+        x: float,
+        y: float,
+        color: str|tuple = 'dad_blue',
+        anchor: tuple[float, float] = (0, 1),
+        font_size: int|None = None,
+        from_acm: bool = False,
+
+        # NEW: optional timestamp for server-side index lookup
+        time: float|None = None,
+
+    ) -> int|None:
+        '''
+        Add a `pg.TextItem` annotation to the target view.
+
+        anchor: (x, y) where (0,0) is upper-left, (1,1) is lower-right
+        font_size: pixel size for font, defaults to `_font.font.pixelSize()`
+
+        '''
+        ipc: MsgStream = self._get_ipc(fqme)
+        with trio.fail_after(3):
+            await ipc.send({
+                'fqme': fqme,
+                'cmd': 'TextItem',
+                'timeframe': timeframe,
+                'kwargs': {
+                    'text': text,
+                    'x': float(x),
+                    'y': float(y),
+                    'color': color,
+                    'anchor': tuple(anchor),
+                    'font_size': font_size,
+                    'time': time,  # for server-side index lookup
+                },
+            })
+        aid: int|dict = await ipc.receive()
+        match aid:
+            case {'error': str(msg)}:
+                log.error(msg)
+                return None
+
+        self._ipcs[aid] = ipc
+        if not from_acm:
+            self._annot_stack.push_async_callback(
+                partial(
+                    self.remove,
+                    aid,
+                )
+            )
+        return aid


 @acm
@@ -374,7 +826,9 @@ async def open_annot_ctl(
     # TODO: print the current discoverable actor UID set
     # here as well?
     if not maybe_portals:
-        raise RuntimeError('No chart UI actors found in service domain?')
+        raise RuntimeError(
+            'No chart actors found in service domain?'
+        )

     for portal in maybe_portals:
         ctx_mngrs.append(
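
The timestamp-keyed lookups added throughout this file share one idiom: match on the shm array's 'time' field, then read that row's *current* absolute 'index' as the x-coord so annotations stay aligned after a backfill shifts the buffer. A toy structured-array sketch (field names per the shm layout assumed above):

import numpy as np

arr = np.array(
    [(100 + i, 1.7e9 + 60*i, 50. + i) for i in range(4)],
    dtype=[('index', 'i8'), ('time', 'f8'), ('close', 'f8')],
)
ts: float = 1.7e9 + 120  # timestamp sent by the annot client

matches = arr[arr['time'] == ts]
if len(matches):
    x = float(matches[0]['index'])  # current abs-index for this bar
    assert x == 102.0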

View File

@ -61,7 +61,7 @@ class DpiAwareFont:
) -> None: ) -> None:
self._font_size_calc_key: str = _font_size_key self._font_size_calc_key: str = _font_size_key
self._font_size: int | None = None self._font_size: int|None = None
# Read preferred font size from main config file if it exists # Read preferred font size from main config file if it exists
conf, path = config.load('conf', touch_if_dne=True) conf, path = config.load('conf', touch_if_dne=True)
@ -107,7 +107,22 @@ class DpiAwareFont:
@property @property
def px_size(self) -> int: def px_size(self) -> int:
return self._qfont.pixelSize() size: int = self._qfont.pixelSize()
# XXX, when no Qt app has been spawned this will always be
# invalid..
# SO, just return any conf.toml value.
if size == -1:
if (conf_size := self._font_size) is None:
raise ValueError(
f'No valid `{type(_font).__name__}.px_size` set?\n'
f'\n'
f'-> `ui.font_size` is NOT set in `conf.toml`\n'
f'-> no Qt app is active ??\n'
)
return conf_size
return size
def configure_to_dpi(self, screen: QtGui.QScreen | None = None): def configure_to_dpi(self, screen: QtGui.QScreen | None = None):
''' '''
@ -221,6 +236,20 @@ def _config_fonts_to_screen() -> None:
_font_small.configure_to_dpi() _font_small.configure_to_dpi()
def get_fonts() -> tuple[
DpiAwareFont,
DpiAwareFont,
]:
'''
 Get the singleton font pair (of instances) around which all
 other UI/UX should be "scaled".
See `DpiAwareFont` for (internal) deats.
'''
return _font, _font_small
# TODO: re-compute font size when main widget switches screens? # TODO: re-compute font size when main widget switches screens?
# https://forum.qt.io/topic/54136/how-do-i-get-the-qscreen-my-widget-is-on-qapplication-desktop-screen-returns-a-qwidget-and-qobject_cast-qscreen-returns-null/3 # https://forum.qt.io/topic/54136/how-do-i-get-the-qscreen-my-widget-is-on-qapplication-desktop-screen-returns-a-qwidget-and-qobject_cast-qscreen-returns-null/3
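
A quick sketch exercising the new fallback path: with no live Qt app `pixelSize()` returns -1, so `px_size` should come from the `ui.font_size` entry in `conf.toml`. The import path here is assumed.

from piker.ui._style import get_fonts  # module path assumed

font, font_small = get_fonts()
try:
    # -1 from Qt triggers the conf.toml fallback added above
    print(f'font px size: {font.px_size}')
except ValueError:
    # neither an active Qt app nor a `ui.font_size` conf entry
    print('no valid font size configured!')
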

View File

@ -98,6 +98,7 @@ python-downloads = 'manual'
# https://docs.astral.sh/uv/concepts/projects/dependencies/#default-groups # https://docs.astral.sh/uv/concepts/projects/dependencies/#default-groups
default-groups = [ default-groups = [
'uis', 'uis',
'repl',
] ]
# ------ tool.uv ------ # ------ tool.uv ------
@ -116,7 +117,6 @@ uis = [
dev = [ dev = [
# https://docs.astral.sh/uv/concepts/projects/dependencies/#development-dependencies # https://docs.astral.sh/uv/concepts/projects/dependencies/#development-dependencies
"cython >=3.0.0, <4.0.0", "cython >=3.0.0, <4.0.0",
# nested deps-groups # nested deps-groups
# https://docs.astral.sh/uv/concepts/projects/dependencies/#nesting-groups # https://docs.astral.sh/uv/concepts/projects/dependencies/#nesting-groups
{include-group = 'uis'}, {include-group = 'uis'},
@ -130,10 +130,14 @@ repl = [
"greenback >=1.1.1, <2.0.0", "greenback >=1.1.1, <2.0.0",
# @goodboy's preferred console toolz # @goodboy's preferred console toolz
"xonsh", "xonsh>=0.22.2",
"prompt-toolkit ==3.0.40", "prompt-toolkit ==3.0.40",
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
 # for @claude's `snippets/claude_debug_helper.py`, used to do
 # "offline" debug/crash REPL-ing alongside a dev.
"pexpect>=4.9.0",
# ?TODO, new stuff to consider.. # ?TODO, new stuff to consider..
# "visidata" # console numerics # "visidata" # console numerics
# "xxh" # for remote `xonsh`-ing # "xxh" # for remote `xonsh`-ing
@ -191,6 +195,11 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" } tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
pyvnc = { git = "https://github.com/regulad/pyvnc.git" } pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
# to get fancy next-cmd/suggestion feats prior to 0.22.2 B)
# https://github.com/xonsh/xonsh/pull/6037
# https://github.com/xonsh/xonsh/pull/6048
# xonsh = { git = 'https://github.com/xonsh/xonsh.git', branch = 'main' }
# XXX since, we're like, always hacking new shite all-the-time. Bp # XXX since, we're like, always hacking new shite all-the-time. Bp
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" } tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" } # tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }

View File

@ -0,0 +1,256 @@
#!/usr/bin/env python
'''
Programmatic debugging helper for human-like `pdbp` REPL
interaction, built to allow `claude` to interact with crashes
and `tractor.pause()` breakpoints alongside a human dev.

Originally written by `claude` during a backfiller inspection
session with @goodboy trying to resolve duplicate/gappy ohlcv ts
issues discovered while testing the new `nativedb` tsdb.

Allows `claude` to run `pdb` commands and capture output in an
"offline" manner while generating similar output as if it were
interacting with the debug REPL.

The use of `pexpect` is heavily based on tractor's REPL UX test
suite(s), namely various `tests/devx/test_debugger.py` patterns.
'''
import sys
import os
import time
import pexpect
from pexpect.exceptions import (
TIMEOUT,
EOF,
)
PROMPT: str = r'\(Pdb\+\)'
def expect(
child: pexpect.spawn,
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last console data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before: str = (
str(child.before.decode())
if isinstance(child.before, bytes)
else str(child.before)
)
print(
f'TIMEOUT waiting for pattern: {patt}\n'
f'Last seen output:\n{before}'
)
raise
def run_pdb_commands(
commands: list[str],
initial_cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
timeout: int = 30,
print_output: bool = True,
) -> dict[str, str]:
'''
Spawn piker process, wait for pdb prompt, execute commands.
Returns dict mapping command -> output.
'''
results: dict[str, str] = {}
# Disable colored output for easier parsing
os.environ['PYTHON_COLORS'] = '0'
# Spawn the process
if print_output:
print(f'Spawning: {initial_cmd}')
child: pexpect.spawn = pexpect.spawn(
initial_cmd,
timeout=timeout,
encoding='utf-8',
echo=False,
)
# Wait for pdb prompt
try:
expect(child, PROMPT, timeout=timeout)
if print_output:
print('Reached pdb prompt!')
# Execute each command
for cmd in commands:
if print_output:
print(f'\n>>> {cmd}')
child.sendline(cmd)
time.sleep(0.1)
# Wait for next prompt
expect(child, PROMPT, timeout=timeout)
# Capture output (everything before the prompt)
output: str = (
str(child.before.decode())
if isinstance(child.before, bytes)
else str(child.before)
)
results[cmd] = output
if print_output:
print(output)
# Quit debugger gracefully
child.sendline('quit')
try:
child.expect(EOF, timeout=5)
except (TIMEOUT, EOF):
pass
except TIMEOUT as e:
print(f'Timeout: {e}')
if child.before:
before: str = (
str(child.before.decode())
if isinstance(child.before, bytes)
else str(child.before)
)
print(f'Buffer:\n{before}')
results['_error'] = str(e)
finally:
if child.isalive():
child.close(force=True)
return results
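
Batch-mode usage sketch; the inspected variable names are hypothetical and depend on whatever frame the `tractor.pause()` landed in.

outputs: dict[str, str] = run_pdb_commands(
    commands=[
        'locals().keys()',
        'deduped.shape',  # hypothetical local from the backfiller
    ],
    timeout=60,
)
for cmd, out in outputs.items():
    print(f'>>> {cmd}\n{out}')
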
class InteractivePdbSession:
'''
Interactive pdb session manager for incremental debugging.
'''
def __init__(
self,
cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
timeout: int = 30,
):
self.cmd: str = cmd
self.timeout: int = timeout
self.child: pexpect.spawn|None = None
self.history: list[tuple[str, str]] = []
def start(self) -> None:
'''
Start the piker process and wait for first prompt.
'''
os.environ['PYTHON_COLORS'] = '0'
print(f'Starting: {self.cmd}')
self.child = pexpect.spawn(
self.cmd,
timeout=self.timeout,
encoding='utf-8',
echo=False,
)
# Wait for initial prompt
expect(self.child, PROMPT, timeout=self.timeout)
print('Ready at pdb prompt!')
def run(
self,
cmd: str,
print_output: bool = True,
) -> str:
'''
Execute a single pdb command and return output.
'''
if not self.child or not self.child.isalive():
raise RuntimeError('Session not started or dead')
if print_output:
print(f'\n>>> {cmd}')
self.child.sendline(cmd)
time.sleep(0.1)
# Wait for next prompt
expect(self.child, PROMPT, timeout=self.timeout)
output: str = (
str(self.child.before.decode())
if isinstance(self.child.before, bytes)
else str(self.child.before)
)
self.history.append((cmd, output))
if print_output:
print(output)
return output
def quit(self) -> None:
'''
Exit the debugger and cleanup.
'''
if self.child and self.child.isalive():
self.child.sendline('quit')
try:
self.child.expect(EOF, timeout=5)
except (TIMEOUT, EOF):
pass
self.child.close(force=True)
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.quit()
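
The session type can also be driven manually (outside a `with` block) when commands need to be decided incrementally; a sketch using only the methods defined above:

sess = InteractivePdbSession(timeout=60)
sess.start()
try:
    keys: str = sess.run('locals().keys()')
    if 'deduped' in keys:  # only poke vars that actually exist
        sess.run('deduped.shape')
finally:
    sess.quit()
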
if __name__ == '__main__':
# Example inspection commands
inspect_cmds: list[str] = [
'locals().keys()',
'type(deduped)',
'deduped.shape',
(
'step_gaps.shape '
'if "step_gaps" in locals() '
'else "N/A"'
),
(
'venue_gaps.shape '
'if "venue_gaps" in locals() '
'else "N/A"'
),
]
# Allow commands from CLI args
if len(sys.argv) > 1:
inspect_cmds = sys.argv[1:]
# Interactive session example
with InteractivePdbSession() as session:
for cmd in inspect_cmds:
session.run(cmd)
print('\n=== Session Complete ===')

uv.lock
View File

@ -1000,6 +1000,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6e/23/e98758924d1b3aac11a626268eabf7f3cf177e7837c28d47bf84c64532d0/pendulum-3.1.0-py3-none-any.whl", hash = "sha256:f9178c2a8e291758ade1e8dd6371b1d26d08371b4c7730a6e9a3ef8b16ebae0f", size = 111799, upload-time = "2025-04-19T14:02:34.739Z" }, { url = "https://files.pythonhosted.org/packages/6e/23/e98758924d1b3aac11a626268eabf7f3cf177e7837c28d47bf84c64532d0/pendulum-3.1.0-py3-none-any.whl", hash = "sha256:f9178c2a8e291758ade1e8dd6371b1d26d08371b4c7730a6e9a3ef8b16ebae0f", size = 111799, upload-time = "2025-04-19T14:02:34.739Z" },
] ]
[[package]]
name = "pexpect"
version = "4.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "ptyprocess" },
]
sdist = { url = "https://files.pythonhosted.org/packages/42/92/cc564bf6381ff43ce1f4d06852fc19a2f11d180f23dc32d9588bee2f149d/pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f", size = 166450, upload-time = "2023-11-25T09:07:26.339Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9e/c3/059298687310d527a58bb01f3b1965787ee3b40dce76752eda8b44e9a2c5/pexpect-4.9.0-py2.py3-none-any.whl", hash = "sha256:7236d1e080e4936be2dc3e326cec0af72acf9212a7e1d060210e70a47e253523", size = 63772, upload-time = "2023-11-25T06:56:14.81Z" },
]
[[package]] [[package]]
name = "piker" name = "piker"
version = "0.1.0a0.dev0" version = "0.1.0a0.dev0"
@ -1047,6 +1059,7 @@ dev = [
{ name = "greenback" }, { name = "greenback" },
{ name = "i3ipc" }, { name = "i3ipc" },
{ name = "pdbp" }, { name = "pdbp" },
{ name = "pexpect" },
{ name = "prompt-toolkit" }, { name = "prompt-toolkit" },
{ name = "pyperclip" }, { name = "pyperclip" },
{ name = "pyqt6" }, { name = "pyqt6" },
@ -1062,6 +1075,7 @@ lint = [
repl = [ repl = [
{ name = "greenback" }, { name = "greenback" },
{ name = "pdbp" }, { name = "pdbp" },
{ name = "pexpect" },
{ name = "prompt-toolkit" }, { name = "prompt-toolkit" },
{ name = "pyperclip" }, { name = "pyperclip" },
{ name = "xonsh" }, { name = "xonsh" },
@ -1116,6 +1130,7 @@ dev = [
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" }, { name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "i3ipc", specifier = ">=2.2.1" }, { name = "i3ipc", specifier = ">=2.2.1" },
{ name = "pdbp", specifier = ">=1.8.2,<2.0.0" }, { name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
{ name = "pexpect", specifier = ">=4.9.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" }, { name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pyqt6", specifier = ">=6.7.0,<7.0.0" }, { name = "pyqt6", specifier = ">=6.7.0,<7.0.0" },
@ -1123,15 +1138,16 @@ dev = [
{ name = "pytest" }, { name = "pytest" },
{ name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" }, { name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" },
{ name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" }, { name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" },
{ name = "xonsh" }, { name = "xonsh", specifier = ">=0.22.2" },
] ]
lint = [{ name = "ruff", specifier = ">=0.9.6" }] lint = [{ name = "ruff", specifier = ">=0.9.6" }]
repl = [ repl = [
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" }, { name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "pdbp", specifier = ">=1.8.2,<2.0.0" }, { name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
{ name = "pexpect", specifier = ">=4.9.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" }, { name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh" }, { name = "xonsh", specifier = ">=0.22.2" },
] ]
testing = [{ name = "pytest" }] testing = [{ name = "pytest" }]
uis = [ uis = [
@ -1297,6 +1313,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/5b/5a/bc7b4a4ef808fa59a816c17b20c4bef6884daebbdf627ff2a161da67da19/propcache-0.4.1-py3-none-any.whl", hash = "sha256:af2a6052aeb6cf17d3e46ee169099044fd8224cbaf75c76a2ef596e8163e2237", size = 13305, upload-time = "2025-10-08T19:49:00.792Z" }, { url = "https://files.pythonhosted.org/packages/5b/5a/bc7b4a4ef808fa59a816c17b20c4bef6884daebbdf627ff2a161da67da19/propcache-0.4.1-py3-none-any.whl", hash = "sha256:af2a6052aeb6cf17d3e46ee169099044fd8224cbaf75c76a2ef596e8163e2237", size = 13305, upload-time = "2025-10-08T19:49:00.792Z" },
] ]
[[package]]
name = "ptyprocess"
version = "0.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/20/e5/16ff212c1e452235a90aeb09066144d0c5a6a8c0834397e03f5224495c4e/ptyprocess-0.7.0.tar.gz", hash = "sha256:5c5d0a3b48ceee0b48485e0c26037c0acd7d29765ca3fbb5cb3831d347423220", size = 70762, upload-time = "2020-12-28T15:15:30.155Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/22/a6/858897256d0deac81a172289110f31629fc4cee19b6f01283303e18c8db3/ptyprocess-0.7.0-py2.py3-none-any.whl", hash = "sha256:4b41f3967fce3af57cc7e94b888626c18bf37a083e3651ca8feeb66d492fef35", size = 13993, upload-time = "2020-12-28T15:15:28.35Z" },
]
[[package]] [[package]]
name = "pyarrow" name = "pyarrow"
version = "22.0.0" version = "22.0.0"
@ -1843,7 +1868,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]] [[package]]
name = "tractor" name = "tractor"
version = "0.1.0a6.dev0" version = "0.1.0a6.dev0"
source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#e232d9dd06f41b8dca997f0647f2083d27cc34f2" } source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#36307c59175a1d04fecc77ef2c28f5c943b5f3d1" }
dependencies = [ dependencies = [
{ name = "bidict" }, { name = "bidict" },
{ name = "cffi" }, { name = "cffi" },
@ -2095,13 +2120,13 @@ wheels = [
[[package]] [[package]]
name = "xonsh" name = "xonsh"
version = "0.20.0" version = "0.22.4"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/56/af/7e2ba3885da44cbe03c7ff46f90ea917ba10d91dc74d68604001ea28055f/xonsh-0.20.0.tar.gz", hash = "sha256:d44a50ee9f288ff96bd0456f0a38988ef6d4985637140ea793beeef5ec5d2d38", size = 811907, upload-time = "2025-11-24T07:50:50.847Z" } sdist = { url = "https://files.pythonhosted.org/packages/48/df/1fc9ed62b3d7c14612e1713e9eb7bd41d54f6ad1028a8fbb6b7cddebc345/xonsh-0.22.4.tar.gz", hash = "sha256:6be346563fec2db75778ba5d2caee155525e634e99d9cc8cc347626025c0b3fa", size = 826665, upload-time = "2026-02-17T07:53:39.424Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/db/1c5c057c0b2a89b8919477726558685720ae0849ea1a98a3803e93550824/xonsh-0.20.0-py311-none-any.whl", hash = "sha256:65d27ba31d558f79010d6c652751449fd3ed4df1f1eda78040a6427fa0a0f03e", size = 646312, upload-time = "2025-11-24T07:50:49.488Z" }, { url = "https://files.pythonhosted.org/packages/2e/00/7cbc0c1fb64365a0a317c54ce3a151c9644eea5a509d9cbaae61c9fd1426/xonsh-0.22.4-py311-none-any.whl", hash = "sha256:38b29b29fa85aa756462d9d9bbcaa1d85478c2108da3de6cc590a69a4bcd1a01", size = 654375, upload-time = "2026-02-17T07:53:37.702Z" },
{ url = "https://files.pythonhosted.org/packages/d2/a2/d6f7534f31489a4b8b54bd2a2496248f86f7c21a6a6ce9bfdcdd389fe4e7/xonsh-0.20.0-py312-none-any.whl", hash = "sha256:3148900e67b9c2796bef6f2eda003b0a64d4c6f50a0db23324f786d9e1af9353", size = 646323, upload-time = "2025-11-24T07:50:43.028Z" }, { url = "https://files.pythonhosted.org/packages/2e/c2/3dd498dc28d8f89cdd52e39950c5e591499ae423f61694c0bb4d03ed1d82/xonsh-0.22.4-py312-none-any.whl", hash = "sha256:4e538fac9f4c3d866ddbdeca068f0c0515469c997ed58d3bfee963878c6df5a5", size = 654300, upload-time = "2026-02-17T07:53:35.813Z" },
{ url = "https://files.pythonhosted.org/packages/bd/48/bcb1e4d329c3d522bc29b066b0b6ee86938ec392376a29c36fac0ad1c586/xonsh-0.20.0-py313-none-any.whl", hash = "sha256:c83daaf6eb2960180fc5a507459dbdf6c0d6d63e1733c43f4e43db77255c7278", size = 646830, upload-time = "2025-11-24T07:50:45.078Z" }, { url = "https://files.pythonhosted.org/packages/82/7d/1f9c7147518e9f03f6ce081b5bfc4f1aceb6ec5caba849024d005e41d3be/xonsh-0.22.4-py313-none-any.whl", hash = "sha256:cc5fabf0ad0c56a2a11bed1e6a43c4ec6416a5b30f24f126b8e768547c3793e2", size = 654818, upload-time = "2026-02-17T07:53:33.477Z" },
] ]
[[package]] [[package]]