Compare commits

..

No commits in common. "ff584215653d0f714f72ee5a2265834cde1fc12a" and "19136f66a078af94cdfc9fe481607b4e77e2a8f3" have entirely different histories.

7 changed files with 176 additions and 197 deletions

View File

@ -253,17 +253,12 @@ class Sampler:
# f'consumers: {subs}'
)
borked: set[tractor.MsgStream] = set()
sent: set[tractor.MsgStream] = set()
while True:
try:
for stream in (subs - sent):
for stream in subs:
try:
await stream.send({
'index': time_stamp or last_ts,
'period': period_s,
})
sent.add(stream)
except (
trio.BrokenResourceError,
trio.ClosedResourceError
@ -272,11 +267,6 @@ class Sampler:
f'{stream._ctx.chan.uid} dropped connection'
)
borked.add(stream)
else:
break
except RuntimeError:
log.warning(f'Client subs {subs} changed while broadcasting')
continue
for stream in borked:
try:
@ -858,16 +848,6 @@ async def uniform_rate_send(
# rate timing exactly lul
try:
await stream.send({sym: first_quote})
except tractor.RemoteActorError as rme:
if rme.type is not tractor._exceptions.StreamOverrun:
raise
ctx = stream._ctx
chan = ctx.chan
log.warning(
'Throttled quote-stream overrun!\n'
f'{sym}:{ctx.cid}@{chan.uid}'
)
except (
# NOTE: any of these can be raised by ``tractor``'s IPC
# transport-layer and we want to be highly resilient

View File

@ -1589,9 +1589,6 @@ async def open_feed(
(brokermod, bfqsns),
) in zip(ctxs, providers.items()):
# NOTE: do it asap to avoid overruns during multi-feed setup?
ctx._backpressure = backpressure
for fqsn, flume_msg in flumes_msg_dict.items():
flume = Flume.from_msg(flume_msg)
assert flume.symbol.fqsn == fqsn

View File

@ -634,7 +634,6 @@ class LinkedSplits(QWidget):
axis.pi = cpw.plotItem
cpw.hideAxis('left')
# cpw.removeAxis('left')
cpw.hideAxis('bottom')
if (
@ -751,12 +750,12 @@ class LinkedSplits(QWidget):
# NOTE: back-link the new sub-chart to trigger y-autoranging in
# the (ohlc parent) main chart for this linked set.
# if self.chart:
# main_viz = self.chart.get_viz(self.chart.name)
# self.chart.view.enable_auto_yrange(
# src_vb=cpw.view,
# viz=main_viz,
# )
if self.chart:
main_viz = self.chart.get_viz(self.chart.name)
self.chart.view.enable_auto_yrange(
src_vb=cpw.view,
viz=main_viz,
)
graphics = viz.graphics
data_key = viz.name
@ -1107,12 +1106,6 @@ class ChartPlotWidget(pg.PlotWidget):
pi.chart_widget = self
pi.hideButtons()
# hide all axes not named by ``axis_side``
for axname in (
({'bottom'} | allowed_sides) - {axis_side}
):
pi.hideAxis(axname)
# compose this new plot's graphics with the current chart's
# existing one but with separate axes as neede and specified.
self.pi_overlay.add_plotitem(
@ -1216,21 +1209,17 @@ class ChartPlotWidget(pg.PlotWidget):
pi = overlay
if add_sticky:
axis = pi.getAxis(add_sticky)
if pi.name not in axis._stickies:
if pi is not self.plotItem:
# overlay = self.pi_overlay
# assert pi in overlay.overlays
overlay = self.pi_overlay
assert pi in overlay.overlays
axis = overlay.get_axis(
overlay_axis = overlay.get_axis(
pi,
add_sticky,
)
else:
axis = pi.getAxis(add_sticky)
if pi.name not in axis._stickies:
assert overlay_axis is axis
# TODO: UGH! just make this not here! we should
# be making the sticky from code which has access

View File

@ -934,17 +934,16 @@ class Viz(msgspec.Struct): # , frozen=True):
# the most recent datum is being drawn.
uppx = ceil(gfx.x_uppx())
if (
(self._in_ds or only_last_uppx)
and uppx > 0
):
alt_renderer = self._alt_r
if alt_renderer:
renderer, gfx = alt_renderer
else:
renderer = self._src_r
if (
(self._in_ds or only_last_uppx)
and uppx > 0
and renderer is not None
):
fmtr = renderer.fmtr
x = fmtr.x_1d
y = fmtr.y_1d
@ -953,7 +952,6 @@ class Viz(msgspec.Struct): # , frozen=True):
if alt_renderer:
iuppx = ceil(uppx / fmtr.flat_index_ratio)
if y is not None:
y = y[-iuppx:]
ymn, ymx = y.min(), y.max()
try:

View File

@ -144,11 +144,12 @@ def multi_maxmin(
profiler(f'vlm_viz.maxmin({read_slc})')
return (
mx,
# enforcing price can't be negative?
# TODO: do we even need this?
max(mn, 0),
mx,
mx_vlm_in_view, # vlm max
)
@ -347,8 +348,8 @@ async def graphics_update_loop(
vlm_viz = vlm_chart._vizs.get('volume') if vlm_chart else None
(
last_mn,
last_mx,
last_mn,
last_mx_vlm,
) = multi_maxmin(
None,
@ -376,7 +377,7 @@ async def graphics_update_loop(
# present differently -> likely dark vlm
tick_size = symbol.tick_size
tick_margin = 4 * tick_size
tick_margin = 3 * tick_size
fast_chart.show()
last_quote_s = time.time()
@ -543,14 +544,8 @@ def graphics_update_cycle(
# them as an additional graphic.
clear_types = _tick_groups['clears']
# TODO: fancier y-range sorting..
# https://github.com/pikers/piker/issues/325
# - a proper streaming mxmn algo as per above issue.
# - we should probably scale the view margin based on the size of
# the true range? This way you can slap in orders outside the
# current L1 (only) book range.
mx = lmx = varz['last_mx']
mn = lmn = varz['last_mn']
mx = varz['last_mx']
mn = varz['last_mn']
mx_vlm_in_view = varz['last_mx_vlm']
# update ohlc sampled price bars
@ -560,12 +555,24 @@ def graphics_update_cycle(
(liv and do_px_step)
or trigger_all
):
# TODO: i think we're double calling this right now
# since .interact_graphics_cycle() also calls it?
# I guess we can add a guard in there?
_, i_read_range, _ = main_viz.update_graphics()
profiler('`Viz.update_graphics()` call')
(
mx_in_view,
mn_in_view,
mx_vlm_in_view,
) = multi_maxmin(
i_read_range,
main_viz,
ds.vlm_viz,
profiler,
)
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
profiler('{fqsdn} `multi_maxmin()` call')
# don't real-time "shift" the curve to the
# left unless we get one of the following:
if (
@ -581,23 +588,6 @@ def graphics_update_cycle(
profiler('view incremented')
# NOTE: do this **after** the tread to ensure we take the yrange
# from the most current view x-domain.
(
mn_in_view,
mx_in_view,
mx_vlm_in_view,
) = multi_maxmin(
i_read_range,
main_viz,
ds.vlm_viz,
profiler,
)
mx = mx_in_view + tick_margin
mn = mn_in_view - tick_margin
profiler(f'{fqsn} `multi_maxmin()` call')
# iterate frames of ticks-by-type such that we only update graphics
# using the last update per type where possible.
ticks_by_type = quote.get('tbt', {})
@ -683,10 +673,14 @@ def graphics_update_cycle(
# Y-autoranging: adjust y-axis limits based on state tracking
# of previous "last" L1 values which are in view.
mn_diff = mn - lmn
lmx = varz['last_mx']
lmn = varz['last_mn']
mx_diff = mx - lmx
mn_diff = mn - lmn
if (
mx_diff or mn_diff
mx_diff
or mn_diff
):
# complain about out-of-range outliers which can show up
# in certain annoying feeds (like ib)..
@ -705,12 +699,7 @@ def graphics_update_cycle(
f'mn_diff: {mn_diff}\n'
)
# TODO: track local liv maxmin without doing a recompute all the
# time..plus, just generally the user is more likely to be
# zoomed out enough on the slow chart that this is never an
# issue (the last datum going out of y-range).
# FAST CHART y-auto-range resize case
# FAST CHART resize case
elif (
liv
and not chart._static_yrange == 'axis'
@ -721,15 +710,22 @@ def graphics_update_cycle(
main_vb._ic is None
or not main_vb._ic.is_set()
):
# print(f'SETTING Y-mxmx -> {main_viz.name}: {(mn, mx)}')
# TODO: incremenal update of the median
# and maxmin driving the y-autoranging.
# yr = (mn, mx)
main_vb.interact_graphics_cycle(
# do_overlay_scaling=False,
do_linked_charts=False,
yranges={main_viz: (mn, mx)},
)
# TODO: we should probably scale
# the view margin based on the size
# of the true range? This way you can
# slap in orders outside the current
# L1 (only) book range.
profiler('main vb y-autorange')
# SLOW CHART y-auto-range resize case
# SLOW CHART resize case
(
_,
hist_liv,
@ -744,6 +740,10 @@ def graphics_update_cycle(
)
profiler('hist `Viz.incr_info()`')
# TODO: track local liv maxmin without doing a recompute all the
# time..plut, just generally the user is more likely to be
# zoomed out enough on the slow chart that this is never an
# issue (the last datum going out of y-range).
# hist_chart = ds.hist_chart
# if (
# hist_liv
@ -758,8 +758,7 @@ def graphics_update_cycle(
# XXX: update this every draw cycle to ensure y-axis auto-ranging
# only adjusts when the in-view data co-domain actually expands or
# contracts.
varz['last_mn'] = mn
varz['last_mx'] = mx
varz['last_mx'], varz['last_mn'] = mx, mn
# TODO: a similar, only-update-full-path-on-px-step approach for all
# fsp overlays and vlm stuff..
@ -767,12 +766,10 @@ def graphics_update_cycle(
# run synchronous update on all `Viz` overlays
for curve_name, viz in chart._vizs.items():
if viz.is_ohlc:
continue
# update any overlayed fsp flows
if (
curve_name != fqsn
and not viz.is_ohlc
):
update_fsp_chart(
viz,
@ -888,9 +885,7 @@ def graphics_update_cycle(
fvb.interact_graphics_cycle(
do_linked_charts=False,
)
profiler(
f'Viz[{viz.name}].plot.vb.interact_graphics_cycle()`'
)
profiler(f'vlm `Viz[{viz.name}].plot.vb._set_yrange()`')
# even if we're downsampled bigly
# draw the last datum in the final
@ -1319,6 +1314,13 @@ async def display_symbol_data(
name=fqsn,
axis_title=fqsn,
)
# only show a singleton bottom-bottom axis by default.
hist_pi.hideAxis('bottom')
# XXX: TODO: THIS WILL CAUSE A GAP ON OVERLAYS,
# i think it needs to be "removed" instead when there
# are none?
hist_pi.hideAxis('left')
hist_viz = hist_chart.draw_curve(
fqsn,
@ -1354,6 +1356,9 @@ async def display_symbol_data(
axis_title=fqsn,
)
rt_pi.hideAxis('left')
rt_pi.hideAxis('bottom')
rt_viz = rt_chart.draw_curve(
fqsn,
ohlcv,

View File

@ -608,7 +608,6 @@ async def open_vlm_displays(
linked: LinkedSplits,
flume: Flume,
dvlm: bool = True,
loglevel: str = 'info',
task_status: TaskStatus[ChartPlotWidget] = trio.TASK_STATUS_IGNORED,
@ -691,7 +690,7 @@ async def open_vlm_displays(
# the axis on the left it's totally not lined up...
# show volume units value on LHS (for dinkus)
# vlm_chart.hideAxis('right')
vlm_chart.hideAxis('left')
# vlm_chart.showAxis('left')
# send back new chart to caller
task_status.started(vlm_chart)
@ -711,9 +710,9 @@ async def open_vlm_displays(
_, _, vlm_curve = vlm_viz.update_graphics()
# size view to data once at outset
# vlm_chart.view._set_yrange(
# viz=vlm_viz
# )
vlm_chart.view._set_yrange(
viz=vlm_viz
)
# add axis title
axis = vlm_chart.getAxis('right')
@ -735,8 +734,22 @@ async def open_vlm_displays(
},
},
},
loglevel,
# loglevel,
)
tasks_ready.append(started)
# FIXME: we should error on starting the same fsp right
# since it might collide with existing shm.. or wait we
# had this before??
# dolla_vlm
tasks_ready.append(started)
# profiler(f'created shm for fsp actor: {display_name}')
# wait for all engine tasks to startup
async with trio.open_nursery() as n:
for event in tasks_ready:
n.start_soon(event.wait)
# dolla vlm overlay
# XXX: the main chart already contains a vlm "units" axis
@ -759,6 +772,10 @@ async def open_vlm_displays(
},
)
# TODO: should this maybe be implicit based on input args to
# `.overlay_plotitem()` above?
dvlm_pi.hideAxis('bottom')
# all to be overlayed curve names
dvlm_fields = [
'dolla_vlm',
@ -808,7 +825,6 @@ async def open_vlm_displays(
)
assert viz.plot is pi
await started.wait()
chart_curves(
dvlm_fields,
dvlm_pi,
@ -817,17 +833,19 @@ async def open_vlm_displays(
step_mode=True,
)
# NOTE: spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
# up since calculating vlm "rates" obvs first requires the
# underlying vlm event feed ;)
# spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is
# up since this one depends on it.
fr_flume, started = await admin.start_engine_task(
flow_rates,
{ # fsp engine conf
'func_name': 'flow_rates',
'zero_on_step': True,
},
loglevel,
# loglevel,
)
await started.wait()
# chart_curves(
# dvlm_rate_fields,
# dvlm_pi,
@ -841,15 +859,10 @@ async def open_vlm_displays(
# liquidity events (well at least on low OHLC periods - 1s).
vlm_curve.hide()
vlm_chart.removeItem(vlm_curve)
# vlm_chart.plotItem.layout.setMinimumWidth(0)
# vlm_chart.removeAxis('left')
vlm_viz = vlm_chart._vizs['volume']
vlm_viz.render = False
# NOTE: DON'T DO THIS.
# WHY: we want range sorting on volume for the RHS label!
# -> if you don't want that then use this but likely you
# only will if we decide to drop unit vlm..
# vlm_viz.render = False
# avoid range sorting on volume once disabled
vlm_chart.view.disable_auto_yrange()
# Trade rate overlay
@ -873,8 +886,8 @@ async def open_vlm_displays(
},
)
tr_pi.hideAxis('bottom')
await started.wait()
chart_curves(
trade_rate_fields,
tr_pi,

View File

@ -956,8 +956,6 @@ class ChartView(ViewBox):
debug_print: bool = False,
do_overlay_scaling: bool = True,
do_linked_charts: bool = True,
yranges: tuple[float, float] | None = None,
):
profiler = Profiler(
msg=f'ChartView.interact_graphics_cycle() for {self.name}',
@ -1046,22 +1044,15 @@ class ChartView(ViewBox):
profiler(f'{viz.name}@{chart_name} `Viz.update_graphics()`')
yrange = yranges.get(viz) if yranges else None
if yrange is not None:
# print(f'INPUT {viz.name} yrange: {yrange}')
read_slc = slice(*i_read_range)
else:
out = viz.maxmin(i_read_range=i_read_range)
if out is None:
log.warning(f'No yrange provided for {name}!?')
return
(
_, # ixrng,
ixrng,
read_slc,
yrange
) = out
profiler(f'{viz.name}@{chart_name} `Viz.maxmin()`')
pi = viz.plot
@ -1086,16 +1077,19 @@ class ChartView(ViewBox):
):
ymn, ymx = yrange
# print(f'adding {viz.name} to overlay')
# mxmn_groups[viz.name] = out
# viz = chart._vizs[viz_name]
# determine start datum in view
arr = viz.shm.array
in_view = arr[read_slc]
if not in_view.size:
log.warning(f'{viz.name} not in view?')
return
row_start = arr[read_slc.start - 1]
# y_med = (ymx - ymn) / 2
# y_med = viz.median_from_range(
# read_slc.start,
# read_slc.stop,
# )
if viz.is_ohlc:
y_start = row_start['open']
else:
@ -1108,6 +1102,7 @@ class ChartView(ViewBox):
y_start,
ymn,
ymx,
# y_med,
read_slc,
in_view,
)
@ -1129,8 +1124,10 @@ class ChartView(ViewBox):
# y_ref = y_med
# up_rng = (ymx - y_ref) / y_ref
# down_rng = (ymn - y_ref) / y_ref
# mx_up_rng = max(mx_up_rng, up_rng)
# mn_down_rng = min(mn_down_rng, down_rng)
# print(
# f'{viz.name}@{chart_name} group mxmn calc\n'
# '--------------------\n'
@ -1161,10 +1158,10 @@ class ChartView(ViewBox):
len(start_datums) < 2
or not do_overlay_scaling
):
# print(f'ONLY ranging major: {viz.name}')
if not major_viz:
major_viz = viz
# print(f'ONLY ranging major: {viz.name}')
major_viz.plot.vb._set_yrange(
yrange=yrange,
)